diff --git a/repub/jobs.py b/repub/jobs.py
index b5441ac..3e7ef3a 100644
--- a/repub/jobs.py
+++ b/repub/jobs.py
@@ -14,6 +14,7 @@ from typing import Callable, TextIO, cast
 
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.cron import CronTrigger
+from peewee import IntegrityError
 
 from repub.config import feed_output_dir, feed_output_path
 from repub.model import (
@@ -143,6 +144,7 @@ class JobRuntime:
         )
         self.sync_jobs()
         self._started = True
+        self._start_queued_jobs()
 
     def shutdown(self) -> None:
         for execution_id in tuple(self._workers):
@@ -183,20 +185,84 @@ class JobRuntime:
             self.scheduler.remove_job(scheduled_job.id)
 
     def run_scheduled_job(self, job_id: int) -> None:
-        self.run_job_now(job_id, reason="scheduled")
+        self.enqueue_job_run(job_id, reason="scheduled")
 
     def run_job_now(self, job_id: int, *, reason: str) -> int | None:
+        return self.enqueue_job_run(job_id, reason=reason)
+
+    def enqueue_job_run(self, job_id: int, *, reason: str) -> int | None:
         del reason
         self.start()
         with self._run_lock:
-            with database.connection_context():
+            execution_id = self._enqueue_job_run_locked(job_id)
+            self._start_queued_jobs_locked()
+
+        if execution_id is not None:
+            self._trigger_refresh()
+        return execution_id
+
+    def _enqueue_job_run_locked(self, job_id: int) -> int | None:
+        with database.connection_context():
+            with database.atomic():
                 job = Job.get_or_none(id=job_id)
                 if job is None:
                     return None
-                if self._max_concurrent_jobs_reached():
-                    return None
+                pending_execution = JobExecution.get_or_none(
+                    (JobExecution.job == job)
+                    & (JobExecution.running_status == JobExecutionStatus.PENDING)
+                )
+                if pending_execution is not None:
+                    return _execution_id(pending_execution)
+                try:
+                    execution = JobExecution.create(
+                        job=job,
+                        running_status=JobExecutionStatus.PENDING,
+                    )
+                except IntegrityError:
+                    pending_execution = JobExecution.get_or_none(
+                        (JobExecution.job == job)
+                        & (JobExecution.running_status == JobExecutionStatus.PENDING)
+                    )
+                    return (
+                        _execution_id(pending_execution)
+                        if pending_execution is not None
+                        else None
+                    )
+                return _execution_id(execution)
+
+    def _start_queued_jobs(self) -> None:
+        with self._run_lock:
+            self._start_queued_jobs_locked()
+
+    def _start_queued_jobs_locked(self) -> None:
+        while True:
+            if self._max_concurrent_jobs_reached():
+                return
+
+            claimed_execution = self._claim_next_pending_execution()
+            if claimed_execution is None:
+                return
+
+            job = cast(Job, claimed_execution.job)
+            self._start_worker_for_execution(
+                job_id=_job_id(job),
+                execution_id=_execution_id(claimed_execution),
+            )
+
+    def _claim_next_pending_execution(self) -> JobExecution | None:
+        with database.connection_context():
+            execution_primary_key = getattr(JobExecution, "_meta").primary_key
+            pending_executions = tuple(
+                JobExecution.select(JobExecution, Job)
+                .join(Job)
+                .where(JobExecution.running_status == JobExecutionStatus.PENDING)
+                .order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
+            )
+
+            for execution in pending_executions:
+                job = cast(Job, execution.job)
                 already_running = (
                     JobExecution.select()
                     .where(
@@ -206,15 +272,25 @@ class JobRuntime:
                     .exists()
                 )
                 if already_running:
-                    return None
+                    continue
 
-                execution = JobExecution.create(
-                    job=job,
-                    started_at=utc_now(),
-                    running_status=JobExecutionStatus.RUNNING,
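+                # The conditional UPDATE below is the claim step: it matches
+                # only while the row is still PENDING, so a concurrent
+                # claimer leaves claimed == 0 and this execution is skipped.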
+                started_at = utc_now()
+                claimed = (
+                    JobExecution.update(
+                        started_at=started_at,
+                        running_status=JobExecutionStatus.RUNNING,
+                    )
+                    .where(
+                        (execution_primary_key == _execution_id(execution))
+                        & (JobExecution.running_status == JobExecutionStatus.PENDING)
+                    )
+                    .execute()
                 )
-        execution_id = _execution_id(execution)
+                if claimed:
+                    return JobExecution.get_by_id(_execution_id(execution))
+            return None
 
+    def _start_worker_for_execution(self, *, job_id: int, execution_id: int) -> None:
         artifacts = JobArtifacts.for_execution(
             log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
         )
@@ -250,8 +326,6 @@ class JobRuntime:
             log_handle=log_handle,
             artifacts=artifacts,
         )
-        self._trigger_refresh()
-        return execution_id
 
     def _max_concurrent_jobs_reached(self) -> bool:
         return (
@@ -282,18 +356,51 @@ class JobRuntime:
         self._trigger_refresh()
         return True
 
+    def cancel_queued_execution(self, execution_id: int) -> bool:
+        with self._run_lock:
+            with database.connection_context():
+                execution_primary_key = getattr(JobExecution, "_meta").primary_key
+                deleted = (
+                    JobExecution.delete()
+                    .where(
+                        (execution_primary_key == execution_id)
+                        & (JobExecution.running_status == JobExecutionStatus.PENDING)
+                    )
+                    .execute()
+                )
+
+        if not deleted:
+            return False
+
+        self._trigger_refresh()
+        return True
+
     def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
         with database.connection_context():
-            job = Job.get_or_none(id=job_id)
-            if job is None:
-                return False
-            job.enabled = enabled
-            job.save()
+            with database.atomic():
+                job = Job.get_or_none(id=job_id)
+                if job is None:
+                    return False
+                job.enabled = enabled
+                job.save()
+                if not enabled:
+                    (
+                        JobExecution.delete()
+                        .where(
+                            (JobExecution.job == job)
+                            & (
+                                JobExecution.running_status
+                                == JobExecutionStatus.PENDING
+                            )
+                        )
+                        .execute()
+                    )
         self.sync_jobs()
         self._trigger_refresh()
         return True
 
     def poll_workers(self) -> None:
+        any_finished = False
         for execution_id in tuple(self._workers):
             worker = self._workers[execution_id]
             self._apply_stats(worker)
@@ -315,8 +422,12 @@ class JobRuntime:
             worker.log_handle.close()
             del self._workers[execution_id]
+            any_finished = True
 
         self._trigger_refresh()
 
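+        # A finished worker frees a concurrency slot, so drain the queue
+        # right away instead of waiting for the next enqueue.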
+        if any_finished:
+            self._start_queued_jobs()
+
     def _apply_stats(self, worker: RunningWorker) -> None:
         if not worker.artifacts.stats_path.exists():
             return
@@ -451,7 +562,15 @@ def load_runs_view(
     reference_time = now or datetime.now(UTC)
     resolved_log_dir = Path(log_dir)
     with database.connection_context():
+        execution_primary_key = getattr(JobExecution, "_meta").primary_key
         jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
+        queued_executions = tuple(
+            JobExecution.select(JobExecution, Job, Source)
+            .join(Job)
+            .join(Source)
+            .where(JobExecution.running_status == JobExecutionStatus.PENDING)
+            .order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
+        )
         running_executions = tuple(
             JobExecution.select(JobExecution, Job, Source)
             .join(Job)
@@ -477,16 +596,31 @@ def load_runs_view(
         )
 
     running_by_job = {
-        _job_id(execution.job): execution for execution in running_executions
+        _job_id(cast(Job, execution.job)): execution
+        for execution in running_executions
+    }
+    queued_by_job = {
+        _job_id(cast(Job, execution.job)): execution
+        for execution in queued_executions
     }
 
     return {
         "running": tuple(
-            _project_running_execution(execution, resolved_log_dir, reference_time)
+            _project_running_execution(
+                execution,
+                resolved_log_dir,
+                reference_time,
+                queued_follow_up=queued_by_job.get(_job_id(cast(Job, execution.job))),
+            )
             for execution in running_executions
        ),
+        "queued": tuple(
+            _project_queued_execution(execution, reference_time, position=position)
+            for position, execution in enumerate(queued_executions, start=1)
+        ),
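+        # Jobs with a PENDING execution are listed under "queued"; the
+        # filter on "upcoming" below keeps them from appearing twice.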
         "upcoming": tuple(
             _project_upcoming_job(job, running_by_job.get(job.id), reference_time)
             for job in jobs
+            if job.id not in queued_by_job
         ),
         "completed": tuple(
             _project_completed_execution(execution, resolved_log_dir, reference_time)
@@ -596,7 +730,11 @@ def _scheduler_job_id(job_id: int) -> str:
 
 
 def _project_running_execution(
-    execution: JobExecution, log_dir: Path, reference_time: datetime
+    execution: JobExecution,
+    log_dir: Path,
+    reference_time: datetime,
+    *,
+    queued_follow_up: JobExecution | None = None,
 ) -> dict[str, object]:
     job = cast(Job, execution.job)
     job_id = _job_id(job)
@@ -624,7 +762,36 @@ def _project_running_execution(
         ),
         "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
         "log_exists": artifacts.log_path.exists(),
-        "cancel_post_path": f"/actions/executions/{execution_id}/cancel",
+        "cancel_label": "Cancel" if queued_follow_up is not None else "Stop",
+        "cancel_post_path": (
+            f"/actions/queued-executions/{_execution_id(queued_follow_up)}/cancel"
+            if queued_follow_up is not None
+            else f"/actions/executions/{execution_id}/cancel"
+        ),
+    }
+
+
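+# Row payload for the queued table; position is the 1-based FIFO index
+# assigned by load_runs_view.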
+def _project_queued_execution(
+    execution: JobExecution, reference_time: datetime, *, position: int
+) -> dict[str, object]:
+    job = cast(Job, execution.job)
+    queued_at = _coerce_datetime(cast(datetime | str, execution.created_at))
+    return {
+        "source": job.source.name,
+        "slug": job.source.slug,
+        "job_id": _job_id(job),
+        "execution_id": _execution_id(execution),
+        "queued_at": _humanize_relative_time(reference_time, queued_at),
+        "queued_at_iso": queued_at.isoformat(),
+        "queue_position": position,
+        "status": "Queued",
+        "status_tone": "idle",
+        "run_label": "Queued",
+        "run_disabled": True,
+        "run_post_path": f"/actions/jobs/{_job_id(job)}/run-now",
+        "cancel_post_path": (
+            f"/actions/queued-executions/{_execution_id(execution)}/cancel"
+        ),
     }
diff --git a/repub/pages/runs.py b/repub/pages/runs.py
index 058f1bf..ca9cce9 100644
--- a/repub/pages/runs.py
+++ b/repub/pages/runs.py
@@ -87,7 +87,7 @@ def _running_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
             tone="amber",
         ),
         _action_button(
-            label="Stop",
+            label=_text(execution, "cancel_label"),
             tone="danger",
             post_path=_maybe_text(execution, "cancel_post_path"),
         ),
     )
@@ -95,6 +95,54 @@ def _running_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
 
 
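+# Renders one row of the queued-executions table; the Run button is shown
+# disabled because a run for this job is already waiting.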
+def _queued_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
+    queued_at = _maybe_text(execution, "queued_at_iso")
+    queued_label: Node = h.p(class_="font-medium text-slate-900")[
+        _text(execution, "queued_at")
+    ]
+    if queued_at is not None:
+        queued_label = h.time(
+            {
+                "data-queued-at": queued_at,
+                "title": queued_at,
+            },
+            datetime=queued_at,
+            class_="font-medium text-slate-900",
+        )[_text(execution, "queued_at")]
+
+    return (
+        h.div[
+            h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
+            h.p(class_="mt-1 font-mono text-xs text-slate-500")[
+                _text(execution, "slug")
+            ],
+        ],
+        h.div[
+            h.p(class_="font-medium text-slate-900")[
+                f"#{_text(execution, 'execution_id')}"
+            ],
+        ],
+        queued_label,
+        h.div[
+            h.p(class_="font-medium text-slate-900")[
+                f"#{_text(execution, 'queue_position')}"
+            ],
+        ],
+        _action_button(
+            label=_text(execution, "run_label"),
+            disabled=_flag(execution, "run_disabled"),
+            post_path=_maybe_text(execution, "run_post_path"),
+        ),
+        h.div(class_="flex flex-nowrap items-center gap-2")[
+            _action_button(
+                label="Cancel",
+                tone="danger",
+                post_path=_maybe_text(execution, "cancel_post_path"),
+            )
+        ],
+    )
+
+
 def _upcoming_row(job: Mapping[str, object]) -> tuple[Node, ...]:
     next_run_at = _maybe_text(job, "next_run_at")
     next_run_label: Node = h.p(class_="font-medium text-slate-900")[
         _text(job, "next_run_at")
     ]
@@ -192,14 +240,17 @@ def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
 def runs_page(
     *,
     running_executions: tuple[Mapping[str, object], ...] | None = None,
+    queued_executions: tuple[Mapping[str, object], ...] | None = None,
     upcoming_jobs: tuple[Mapping[str, object], ...] | None = None,
     completed_executions: tuple[Mapping[str, object], ...] | None = None,
     source_count: int = 0,
 ) -> Renderable:
     running_items = running_executions or ()
+    queued_items = queued_executions or ()
     upcoming_items = upcoming_jobs or ()
     completed_items = completed_executions or ()
     running_rows = tuple(_running_row(execution) for execution in running_items)
+    queued_rows = tuple(_queued_row(execution) for execution in queued_items)
     upcoming_rows = tuple(_upcoming_row(job) for job in upcoming_items)
     completed_rows = tuple(_completed_row(execution) for execution in completed_items)
@@ -227,7 +278,21 @@ def runs_page(
         ),
         table_section(
             eyebrow="Queue",
-            title="Upcoming jobs",
+            title="Queued job executions",
+            empty_message="No queued executions are waiting.",
+            headers=(
+                "Source",
+                "Execution",
+                "Queued",
+                "Position",
+                "Run now",
+                "Actions",
+            ),
+            rows=queued_rows,
+        ),
+        table_section(
+            eyebrow="Schedule",
+            title="Scheduled jobs",
             empty_message="No jobs are scheduled.",
             headers=(
                 "Source",
diff --git a/repub/sql/003_job_execution_queue.sql b/repub/sql/003_job_execution_queue.sql
new file mode 100644
index 0000000..b8dbb51
--- /dev/null
+++ b/repub/sql/003_job_execution_queue.sql
@@ -0,0 +1,6 @@
+CREATE INDEX IF NOT EXISTS job_execution_pending_created_at_idx
+ON job_execution (running_status, created_at ASC);
+
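+-- Partial unique index: at most one PENDING row (running_status = 0) per
+-- job; this is the invariant enqueue_job_run relies on for deduplication.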
+CREATE UNIQUE INDEX IF NOT EXISTS job_execution_pending_unique_job_idx
+ON job_execution (job_id)
+WHERE running_status = 0;
diff --git a/repub/web.py b/repub/web.py
index ae8b832..05e0cc4 100644
--- a/repub/web.py
+++ b/repub/web.py
@@ -276,6 +276,12 @@
         trigger_refresh(app)
         return Response(status=204)
 
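+    # Cancelling a queued run deletes only the PENDING row; a running
+    # execution for the same job keeps going.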
+    @app.post("/actions/queued-executions/<int:execution_id>/cancel")
+    async def cancel_queued_execution_action(execution_id: int) -> Response:
+        get_job_runtime(app).cancel_queued_execution(execution_id)
+        trigger_refresh(app)
+        return Response(status=204)
+
     @app.post("/job/<int:job_id>/execution/<int:execution_id>/logs")
     async def logs_patch(job_id: int, execution_id: int) -> DatastarResponse:
         async def render() -> Renderable:
@@ -370,6 +376,7 @@ async def render_runs(app: Quart | None = None) -> Renderable:
     view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])
     return runs_page(
         running_executions=cast(tuple[dict[str, object], ...], view["running"]),
+        queued_executions=cast(tuple[dict[str, object], ...], view["queued"]),
         upcoming_jobs=cast(tuple[dict[str, object], ...], view["upcoming"]),
         completed_executions=cast(tuple[dict[str, object], ...], view["completed"]),
         source_count=len(load_sources()),
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index fa3a70d..5450cf1 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from datetime import UTC, datetime
+from datetime import UTC, datetime, timedelta
 from pathlib import Path
 
 from repub.jobs import load_runs_view
@@ -83,3 +83,160 @@ def test_load_runs_view_humanizes_running_execution_summary_bytes(
     )
 
     assert view["running"][0]["stats"] == "14 requests • 11 items • 1.5 KiB"
+
+
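+# Queue ordering is created_at ascending with the primary key as a
+# tiebreaker, so the projected view is strictly FIFO.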
+def test_load_runs_view_projects_queued_executions_in_fifo_order(
+    tmp_path: Path,
+) -> None:
+    initialize_database(tmp_path / "jobs-queued.db")
+    first_source = create_source(
+        name="First queued source",
+        slug="first-queued-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/first.xml",
+    )
+    second_source = create_source(
+        name="Second queued source",
+        slug="second-queued-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/second.xml",
+    )
+    first_job = Job.get(Job.source == first_source)
+    second_job = Job.get(Job.source == second_source)
+    reference_time = datetime(2026, 3, 30, 12, 30, tzinfo=UTC)
+    first_created_at = reference_time - timedelta(minutes=7)
+    second_created_at = reference_time - timedelta(minutes=3)
+    first_execution = JobExecution.create(
+        job=first_job,
+        created_at=first_created_at,
+        running_status=JobExecutionStatus.PENDING,
+    )
+    second_execution = JobExecution.create(
+        job=second_job,
+        created_at=second_created_at,
+        running_status=JobExecutionStatus.PENDING,
+    )
+
+    view = load_runs_view(
+        log_dir=tmp_path / "out" / "logs",
+        now=reference_time,
+    )
+
+    assert tuple(row["execution_id"] for row in view["queued"]) == (
+        int(first_execution.get_id()),
+        int(second_execution.get_id()),
+    )
+    assert tuple(row["queue_position"] for row in view["queued"]) == (1, 2)
+    assert tuple(row["queued_at"] for row in view["queued"]) == (
+        "7 minutes ago",
+        "3 minutes ago",
+    )
+
+
+def test_load_runs_view_separates_queued_jobs_from_scheduled_jobs(
+    tmp_path: Path,
+) -> None:
+    initialize_database(tmp_path / "jobs-queue-separation.db")
+    queued_source = create_source(
+        name="Queued source",
+        slug="queued-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/queued.xml",
+    )
+    scheduled_source = create_source(
+        name="Scheduled source",
+        slug="scheduled-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/scheduled.xml",
+    )
+    queued_job = Job.get(Job.source == queued_source)
+    Job.get(Job.source == scheduled_source)
+    JobExecution.create(
+        job=queued_job,
+        running_status=JobExecutionStatus.PENDING,
+    )
+
+    view = load_runs_view(
+        log_dir=tmp_path / "out" / "logs",
+        now=datetime(2026, 3, 30, 12, 30, tzinfo=UTC),
+    )
+
+    assert tuple(row["slug"] for row in view["queued"]) == ("queued-source",)
+    assert all(row["slug"] != "queued-source" for row in view["upcoming"])
+    assert tuple(row["slug"] for row in view["upcoming"]) == ("scheduled-source",)
+    assert view["upcoming"][0]["run_reason"] == "Ready"
+    assert view["upcoming"][0]["run_disabled"] is False
+
+
+def test_load_runs_view_running_row_targets_queued_follow_up_cancel(
+    tmp_path: Path,
+) -> None:
+    initialize_database(tmp_path / "jobs-running-cancel.db")
+    source = create_source(
+        name="Running source",
+        slug="running-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/running.xml",
+    )
+    job = Job.get(Job.source == source)
+    JobExecution.create(
+        job=job,
+        started_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC),
+        running_status=JobExecutionStatus.RUNNING,
+    )
+    pending_execution = JobExecution.create(
+        job=job,
+        created_at=datetime(2026, 3, 30, 12, 5, tzinfo=UTC),
+        running_status=JobExecutionStatus.PENDING,
+    )
+
+    view = load_runs_view(
+        log_dir=tmp_path / "out" / "logs",
+        now=datetime(2026, 3, 30, 12, 30, tzinfo=UTC),
+    )
+
+    running_row = view["running"][0]
+    assert running_row["cancel_label"] == "Cancel"
+    assert running_row["cancel_post_path"] == (
+        f"/actions/queued-executions/{int(pending_execution.get_id())}/cancel"
+    )
diff --git a/tests/test_model.py b/tests/test_model.py
index 3d0729d..4ff67f6 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -169,6 +169,40 @@ def test_initialize_database_creates_scheduler_and_execution_indexes(
     connection.close()
 
 
+def test_initialize_database_creates_run_queue_indexes(tmp_path: Path) -> None:
+    db_path = tmp_path / "queue-indexes.db"
+
+    initialize_database(db_path)
+
+    connection = sqlite3.connect(db_path)
+    try:
+        indexes = {
+            row[0]: row[1]
+            for row in connection.execute(
+                """
+                SELECT name, sql
+                FROM sqlite_master
+                WHERE type = 'index'
+                  AND name IN (
+                    'job_execution_pending_created_at_idx',
+                    'job_execution_pending_unique_job_idx'
+                  )
+                """
+            )
+        }
+        assert set(indexes) == {
+            "job_execution_pending_created_at_idx",
+            "job_execution_pending_unique_job_idx",
+        }
+        assert indexes["job_execution_pending_unique_job_idx"] is not None
+        assert (
+            "WHERE running_status = 0"
+            in indexes["job_execution_pending_unique_job_idx"]
+        )
+    finally:
+        connection.close()
+
+
 def test_job_table_allows_exactly_one_job_per_source(tmp_path: Path) -> None:
     initialize_database(tmp_path / "jobs.db")
diff --git a/tests/test_scheduler_runtime.py b/tests/test_scheduler_runtime.py
index 2af4326..3fda18a 100644
--- a/tests/test_scheduler_runtime.py
+++ b/tests/test_scheduler_runtime.py
@@ -186,13 +186,24 @@ def test_job_runtime_respects_max_concurrent_jobs_setting(tmp_path: Path) -> Non
 
     second_execution_id = runtime.run_job_now(second_job.id, reason="manual")
 
-    assert second_execution_id is None
+    assert second_execution_id is not None
+    second_execution = _wait_for_execution_status(
+        second_execution_id,
+        JobExecutionStatus.PENDING,
+    )
     assert (
         JobExecution.select()
         .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
         .count()
         == 1
     )
+    assert second_execution.started_at is None
+    assert (
+        JobExecution.select()
+        .where(JobExecution.running_status == JobExecutionStatus.PENDING)
+        .count()
+        == 1
+    )
     runtime.request_execution_cancel(first_execution_id)
     finished_execution = _wait_for_terminal_execution(first_execution_id)
     assert finished_execution.running_status == JobExecutionStatus.CANCELED
@@ -200,6 +211,332 @@ def test_job_runtime_respects_max_concurrent_jobs_setting(tmp_path: Path) -> Non
         runtime.shutdown()
 
 
+def test_job_runtime_starts_queued_execution_after_capacity_opens(
+    tmp_path: Path,
+) -> None:
+    db_path = tmp_path / "drain-queue.db"
+    log_dir = tmp_path / "out" / "logs"
+    initialize_database(db_path)
+    save_setting("max_concurrent_jobs", 1)
+
+    with _slow_feed_server() as feed_url:
+        first_source = create_source(
+            name="First source",
+            slug="first-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=False,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url=feed_url,
+        )
+        second_source = create_source(
+            name="Second source",
+            slug="second-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=False,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url=FIXTURE_FEED_PATH.as_uri(),
+        )
+        first_job = Job.get(Job.source == first_source)
+        second_job = Job.get(Job.source == second_source)
+
+        runtime = JobRuntime(log_dir=log_dir)
+        try:
+            runtime.start()
+            first_execution_id = runtime.run_job_now(first_job.id, reason="manual")
+            assert first_execution_id is not None
+            _wait_for_running_execution(first_execution_id)
+
+            second_execution_id = runtime.run_job_now(second_job.id, reason="manual")
+            assert second_execution_id is not None
+            _wait_for_execution_status(second_execution_id, JobExecutionStatus.PENDING)
+
+            runtime.request_execution_cancel(first_execution_id)
+            finished_execution = _wait_for_terminal_execution(first_execution_id)
+            assert finished_execution.running_status == JobExecutionStatus.CANCELED
+
+            _wait_for_running_execution(second_execution_id)
+            drained_execution = _wait_for_terminal_execution(second_execution_id)
+            assert drained_execution.running_status == JobExecutionStatus.SUCCEEDED
+            assert drained_execution.started_at is not None
+        finally:
+            runtime.shutdown()
+
+
+def test_job_runtime_deduplicates_manual_queue_requests(tmp_path: Path) -> None:
+    db_path = tmp_path / "queue-dedup.db"
+    log_dir = tmp_path / "out" / "logs"
+    initialize_database(db_path)
+    save_setting("max_concurrent_jobs", 1)
+
+    with _slow_feed_server() as feed_url:
+        blocking_source = create_source(
+            name="Blocking source",
+            slug="blocking-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=False,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url=feed_url,
+        )
+        queued_source = create_source(
+            name="Queued source",
+            slug="queued-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=False,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url="https://example.com/queued.xml",
+        )
+        blocking_job = Job.get(Job.source == blocking_source)
+        queued_job = Job.get(Job.source == queued_source)
+
+        runtime = JobRuntime(log_dir=log_dir)
+        try:
+            runtime.start()
+            blocking_execution_id = runtime.run_job_now(
+                blocking_job.id, reason="manual"
+            )
+            assert blocking_execution_id is not None
+            _wait_for_running_execution(blocking_execution_id)
+
+            first_pending_id = runtime.run_job_now(queued_job.id, reason="manual")
+            second_pending_id = runtime.run_job_now(queued_job.id, reason="manual")
+
+            assert first_pending_id is not None
+            assert second_pending_id == first_pending_id
+            assert (
+                JobExecution.select()
+                .where(
+                    (JobExecution.job == queued_job)
+                    & (JobExecution.running_status == JobExecutionStatus.PENDING)
+                )
+                .count()
+                == 1
+            )
+        finally:
+            runtime.shutdown()
+
+
+def test_job_runtime_allows_one_running_and_one_pending_per_job(
+    tmp_path: Path,
+) -> None:
+    db_path = tmp_path / "running-plus-pending.db"
+    log_dir = tmp_path / "out" / "logs"
+    initialize_database(db_path)
+    save_setting("max_concurrent_jobs", 1)
+
+    with _slow_feed_server() as feed_url:
+        source = create_source(
+            name="Busy source",
+            slug="busy-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=False,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url=feed_url,
+        )
+        job = Job.get(Job.source == source)
+
+        runtime = JobRuntime(log_dir=log_dir)
+        try:
+            runtime.start()
+            running_execution_id = runtime.run_job_now(job.id, reason="manual")
+            assert running_execution_id is not None
+            _wait_for_running_execution(running_execution_id)
+
+            pending_execution_id = runtime.run_job_now(job.id, reason="manual")
+            duplicate_pending_id = runtime.run_job_now(job.id, reason="manual")
+            runtime.run_scheduled_job(job.id)
+
+            assert pending_execution_id is not None
+            assert duplicate_pending_id == pending_execution_id
+            assert (
+                JobExecution.select()
+                .where(JobExecution.job == job)
+                .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
+                .count()
+                == 1
+            )
+            assert (
+                JobExecution.select()
+                .where(JobExecution.job == job)
+                .where(JobExecution.running_status == JobExecutionStatus.PENDING)
+                .count()
+                == 1
+            )
+        finally:
+            runtime.shutdown()
+
+
+def test_job_runtime_start_drains_pending_rows_created_before_start(
+    tmp_path: Path,
+) -> None:
+    db_path = tmp_path / "startup-drain.db"
+    log_dir = tmp_path / "out" / "logs"
+    initialize_database(db_path)
+    source = create_source(
+        name="Queued source",
+        slug="queued-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=False,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url=FIXTURE_FEED_PATH.as_uri(),
+    )
+    job = Job.get(Job.source == source)
+    pending_execution = JobExecution.create(
+        job=job,
+        running_status=JobExecutionStatus.PENDING,
+    )
+
+    runtime = JobRuntime(log_dir=log_dir)
+    try:
+        runtime.start()
+        _wait_for_running_execution(int(pending_execution.get_id()))
+        drained_execution = _wait_for_terminal_execution(
+            int(pending_execution.get_id())
+        )
+
+        assert drained_execution.running_status == JobExecutionStatus.SUCCEEDED
+        assert drained_execution.started_at is not None
+    finally:
+        runtime.shutdown()
+
+
+def test_job_runtime_scheduled_runs_use_the_persistent_queue(
+    tmp_path: Path,
+) -> None:
+    db_path = tmp_path / "scheduled-queue.db"
+    log_dir = tmp_path / "out" / "logs"
+    initialize_database(db_path)
+    save_setting("max_concurrent_jobs", 1)
+
+    with _slow_feed_server() as feed_url:
+        first_source = create_source(
+            name="First scheduled source",
+            slug="first-scheduled-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=True,
+            cron_minute="*",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url=feed_url,
+        )
+        second_source = create_source(
+            name="Second scheduled source",
+            slug="second-scheduled-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=True,
+            cron_minute="*",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url="https://example.com/second-scheduled.xml",
+        )
+        first_job = Job.get(Job.source == first_source)
+        second_job = Job.get(Job.source == second_source)
+
+        runtime = JobRuntime(log_dir=log_dir)
+        try:
+            runtime.start()
+            runtime.run_scheduled_job(first_job.id)
+            first_execution = JobExecution.get(JobExecution.job == first_job)
+            _wait_for_running_execution(int(first_execution.get_id()))
+
+            runtime.run_scheduled_job(second_job.id)
+            second_execution = JobExecution.get(JobExecution.job == second_job)
+
+            assert second_execution.running_status == JobExecutionStatus.PENDING
+            assert second_execution.started_at is None
+        finally:
+            runtime.shutdown()
+
+
+def test_job_runtime_cancel_pending_follow_up_keeps_running_worker_alive(
+    tmp_path: Path,
+) -> None:
+    db_path = tmp_path / "cancel-pending.db"
+    log_dir = tmp_path / "out" / "logs"
+    initialize_database(db_path)
+    save_setting("max_concurrent_jobs", 1)
+
+    with _slow_feed_server() as feed_url:
+        source = create_source(
+            name="Cancelable queued source",
+            slug="cancelable-queued-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=False,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url=feed_url,
+        )
+        job = Job.get(Job.source == source)
+
+        runtime = JobRuntime(log_dir=log_dir)
+        try:
+            runtime.start()
+            running_execution_id = runtime.run_job_now(job.id, reason="manual")
+            assert running_execution_id is not None
+            _wait_for_running_execution(running_execution_id)
+
+            pending_execution_id = runtime.run_job_now(job.id, reason="manual")
+            assert pending_execution_id is not None
+            _wait_for_execution_status(pending_execution_id, JobExecutionStatus.PENDING)
+
+            assert runtime.cancel_queued_execution(pending_execution_id) is True
+            assert JobExecution.get_or_none(id=pending_execution_id) is None
+            assert (
+                JobExecution.get_by_id(running_execution_id).running_status
+                == JobExecutionStatus.RUNNING
+            )
+        finally:
+            runtime.shutdown()
+
+
 def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None:
     initialize_database(tmp_path / "cancel.db")
     with _slow_feed_server() as feed_url:
@@ -571,7 +908,7 @@ def test_render_runs_uses_database_backed_jobs_and_executions(
 
     assert "runs-page-source" in body
     assert "Running job executions" in body
-    assert "Upcoming jobs" in body
+    assert "Scheduled jobs" in body
     assert "Completed job executions" in body
     assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
    assert "Succeeded" in body
@@ -719,6 +1056,21 @@ def _wait_for_running_execution(
     raise AssertionError(f"execution {execution_id} never entered RUNNING state")
 
 
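+# Companion to _wait_for_running_execution: polls until the execution
+# reaches an arbitrary status (e.g. PENDING).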
+def _wait_for_execution_status(
+    execution_id: int,
+    status: JobExecutionStatus,
+    *,
+    timeout_seconds: float = 2.0,
+) -> JobExecution:
+    deadline = time.monotonic() + timeout_seconds
+    while time.monotonic() < deadline:
+        execution = JobExecution.get_by_id(execution_id)
+        if execution.running_status == status:
+            return execution
+        time.sleep(0.02)
+    raise AssertionError(f"execution {execution_id} never entered {status.name}")
+
+
 def _wait_for_terminal_execution(
     execution_id: int, *, timeout_seconds: float = 4.0
 ) -> JobExecution:
diff --git a/tests/test_web.py b/tests/test_web.py
index 70a5bb5..da4daef 100644
--- a/tests/test_web.py
+++ b/tests/test_web.py
@@ -92,6 +92,35 @@ def test_runs_page_renders_completed_execution_end_time_as_relative_hoverable_ti
     assert ">2 hours ago<" in body
 
 
+def test_runs_page_renders_queued_execution_table() -> None:
+    body = str(
+        runs_page(
+            queued_executions=(
+                {
+                    "source": "Queued source",
+                    "slug": "queued-source",
+                    "job_id": 7,
+                    "execution_id": 42,
+                    "queued_at": "2 minutes ago",
+                    "queued_at_iso": "2026-03-30T12:28:00+00:00",
+                    "queue_position": 1,
+                    "status": "Queued",
+                    "status_tone": "idle",
+                    "run_label": "Queued",
+                    "run_disabled": True,
+                    "run_post_path": "/actions/jobs/7/run-now",
+                    "cancel_post_path": "/actions/queued-executions/42/cancel",
+                },
+            )
+        )
+    )
+
+    assert "Queued job executions" in body
+    assert "queued-source" in body
+    assert ">Queued<" in body
+    assert "/actions/queued-executions/42/cancel" in body
+
+
 def test_root_get_serves_datastar_shim() -> None:
     async def run() -> None:
         client = create_app().test_client()
@@ -1069,7 +1098,8 @@ def test_render_runs_shows_running_upcoming_and_completed_tables(
         body = str(await render_runs(app))
 
         assert "Running job executions" in body
-        assert "Upcoming jobs" in body
+        assert "Queued job executions" in body
+        assert "Scheduled jobs" in body
         assert "Completed job executions" in body
         assert "runs-render-source" in body
         assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
@@ -1089,12 +1119,214 @@ def test_render_runs_shows_empty_state_rows(monkeypatch, tmp_path: Path) -> None
         body = str(await render_runs(app))
 
         assert body.count("No job executions are running.") == 1
+        assert "No queued executions are waiting." in body
         assert "No jobs are scheduled." in body
         assert "No job executions have completed yet." in body
 
     asyncio.run(run())
 
 
+def test_render_runs_shows_queued_execution_separately_from_scheduled_jobs(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "runs-queued-render.db"
+    log_dir = tmp_path / "out" / "logs"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+    app = create_app()
+    app.config["REPUB_LOG_DIR"] = log_dir
+
+    queued_source = create_source(
+        name="Queued source",
+        slug="queued-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/queued.xml",
+    )
+    create_source(
+        name="Scheduled source",
+        slug="scheduled-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/scheduled.xml",
+    )
+    queued_job = Job.get(Job.source == queued_source)
+    queued_execution = JobExecution.create(
+        job=queued_job,
+        running_status=JobExecutionStatus.PENDING,
+    )
+
+    async def run() -> None:
+        body = str(await render_runs(app))
+
+        assert "Queued job executions" in body
+        assert "Scheduled jobs" in body
+        assert "queued-source" in body
+        assert "scheduled-source" in body
+        assert (
+            f"/actions/queued-executions/{int(queued_execution.get_id())}/cancel"
+            in body
+        )
+
+    asyncio.run(run())
+
+
+def test_render_runs_shows_cancel_button_for_running_row_with_queued_follow_up(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "runs-cancel-follow-up.db"
+    log_dir = tmp_path / "out" / "logs"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+    app = create_app()
+    app.config["REPUB_LOG_DIR"] = log_dir
+
+    source = create_source(
+        name="Busy source",
+        slug="busy-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/busy.xml",
+    )
+    job = Job.get(Job.source == source)
+    running_execution = JobExecution.create(
+        job=job,
+        started_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC),
+        running_status=JobExecutionStatus.RUNNING,
+    )
+    pending_execution = JobExecution.create(
+        job=job,
+        running_status=JobExecutionStatus.PENDING,
+    )
+
+    async def run() -> None:
+        body = str(await render_runs(app))
+
+        assert f"/job/{job.id}/execution/{int(running_execution.get_id())}/logs" in body
+        assert (
+            f"/actions/queued-executions/{int(pending_execution.get_id())}/cancel"
+            in body
+        )
+        assert ">Cancel<" in body
+
+    asyncio.run(run())
+
+
+def test_cancel_queued_execution_action_deletes_pending_row_without_touching_running_execution(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "cancel-queued-action.db"
+    log_dir = tmp_path / "out" / "logs"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+
+    async def run() -> None:
+        app = create_app()
+        app.config["REPUB_LOG_DIR"] = log_dir
+        client = app.test_client()
+
+        source = create_source(
+            name="Busy source",
+            slug="busy-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=True,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url="https://example.com/busy.xml",
+        )
+        job = Job.get(Job.source == source)
+        running_execution = JobExecution.create(
+            job=job,
+            started_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC),
+            running_status=JobExecutionStatus.RUNNING,
+        )
+        pending_execution = JobExecution.create(
+            job=job,
+            running_status=JobExecutionStatus.PENDING,
+        )
+
+        response = await client.post(
+            f"/actions/queued-executions/{int(pending_execution.get_id())}/cancel"
+        )
+
+        assert response.status_code == 204
+        assert JobExecution.get_or_none(id=int(pending_execution.get_id())) is None
+        assert (
+            JobExecution.get_by_id(int(running_execution.get_id())).running_status
+            == JobExecutionStatus.RUNNING
+        )
+
+    asyncio.run(run())
+
+
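+# Disabling a job must also clear its queued execution; see
+# JobRuntime.set_job_enabled.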
+def test_toggle_job_enabled_action_removes_queued_execution(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "toggle-removes-queue.db"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+
+    async def run() -> None:
+        app = create_app()
+        client = app.test_client()
+
+        source = create_source(
+            name="Queued source",
+            slug="queued-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=True,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url="https://example.com/queued.xml",
+        )
+        job = Job.get(Job.source == source)
+        queued_execution = JobExecution.create(
+            job=job,
+            running_status=JobExecutionStatus.PENDING,
+        )
+
+        response = await client.post(f"/actions/jobs/{job.id}/toggle-enabled")
+
+        assert response.status_code == 204
+        assert Job.get_by_id(job.id).enabled is False
+        assert JobExecution.get_or_none(id=int(queued_execution.get_id())) is None
+        body = str(await render_runs(app))
+        assert (
+            f"/actions/queued-executions/{int(queued_execution.get_id())}/cancel"
+            not in body
+        )
+        assert "Disabled" in body
+
+    asyncio.run(run())
+
+
 def test_render_execution_logs_uses_app_route(monkeypatch, tmp_path: Path) -> None:
     db_path = tmp_path / "logs-render.db"
     monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))