Add persistent job run queue

This commit is contained in:
Abel Luck 2026-03-31 09:24:46 +02:00
parent 2bd0651478
commit 0b3b1b2731
8 changed files with 1047 additions and 27 deletions

View file

@@ -14,6 +14,7 @@ from typing import Callable, TextIO, cast
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from peewee import IntegrityError
from repub.config import feed_output_dir, feed_output_path
from repub.model import (
@@ -143,6 +144,7 @@ class JobRuntime:
)
self.sync_jobs()
self._started = True
self._start_queued_jobs()
def shutdown(self) -> None:
for execution_id in tuple(self._workers):
@@ -183,20 +185,84 @@ class JobRuntime:
self.scheduler.remove_job(scheduled_job.id)
def run_scheduled_job(self, job_id: int) -> None:
self.run_job_now(job_id, reason="scheduled")
self.enqueue_job_run(job_id, reason="scheduled")
def run_job_now(self, job_id: int, *, reason: str) -> int | None:
return self.enqueue_job_run(job_id, reason=reason)
def enqueue_job_run(self, job_id: int, *, reason: str) -> int | None:
del reason
self.start()
with self._run_lock:
with database.connection_context():
execution_id = self._enqueue_job_run_locked(job_id)
self._start_queued_jobs_locked()
if execution_id is not None:
self._trigger_refresh()
return execution_id
def _enqueue_job_run_locked(self, job_id: int) -> int | None:
with database.connection_context():
with database.atomic():
job = Job.get_or_none(id=job_id)
if job is None:
return None
if self._max_concurrent_jobs_reached():
return None
pending_execution = JobExecution.get_or_none(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
if pending_execution is not None:
return _execution_id(pending_execution)
try:
execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.PENDING,
)
except IntegrityError:
pending_execution = JobExecution.get_or_none(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
return (
_execution_id(pending_execution)
if pending_execution is not None
else None
)
return _execution_id(execution)
def _start_queued_jobs(self) -> None:
with self._run_lock:
self._start_queued_jobs_locked()
def _start_queued_jobs_locked(self) -> None:
while True:
if self._max_concurrent_jobs_reached():
return
claimed_execution = self._claim_next_pending_execution()
if claimed_execution is None:
return
job = cast(Job, claimed_execution.job)
self._start_worker_for_execution(
job_id=_job_id(job),
execution_id=_execution_id(claimed_execution),
)
def _claim_next_pending_execution(self) -> JobExecution | None:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
pending_executions = tuple(
JobExecution.select(JobExecution, Job)
.join(Job)
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
)
for execution in pending_executions:
job = cast(Job, execution.job)
already_running = (
JobExecution.select()
.where(
@@ -206,15 +272,25 @@ class JobRuntime:
.exists()
)
if already_running:
return None
continue
execution = JobExecution.create(
job=job,
started_at=utc_now(),
running_status=JobExecutionStatus.RUNNING,
started_at = utc_now()
claimed = (
JobExecution.update(
started_at=started_at,
running_status=JobExecutionStatus.RUNNING,
)
.where(
(execution_primary_key == _execution_id(execution))
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.execute()
)
execution_id = _execution_id(execution)
if claimed:
return JobExecution.get_by_id(_execution_id(execution))
return None
def _start_worker_for_execution(self, *, job_id: int, execution_id: int) -> None:
artifacts = JobArtifacts.for_execution(
log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
)
@@ -250,8 +326,6 @@ class JobRuntime:
log_handle=log_handle,
artifacts=artifacts,
)
self._trigger_refresh()
return execution_id
def _max_concurrent_jobs_reached(self) -> bool:
return (
@@ -282,18 +356,51 @@ class JobRuntime:
self._trigger_refresh()
return True
def cancel_queued_execution(self, execution_id: int) -> bool:
with self._run_lock:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
deleted = (
JobExecution.delete()
.where(
(execution_primary_key == execution_id)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.execute()
)
if not deleted:
return False
self._trigger_refresh()
return True
def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
with database.connection_context():
job = Job.get_or_none(id=job_id)
if job is None:
return False
job.enabled = enabled
job.save()
with database.atomic():
job = Job.get_or_none(id=job_id)
if job is None:
return False
job.enabled = enabled
job.save()
if not enabled:
(
JobExecution.delete()
.where(
(JobExecution.job == job)
& (
JobExecution.running_status
== JobExecutionStatus.PENDING
)
)
.execute()
)
self.sync_jobs()
self._trigger_refresh()
return True
def poll_workers(self) -> None:
any_finished = False
for execution_id in tuple(self._workers):
worker = self._workers[execution_id]
self._apply_stats(worker)
@@ -315,8 +422,12 @@ class JobRuntime:
worker.log_handle.close()
del self._workers[execution_id]
any_finished = True
self._trigger_refresh()
if any_finished:
self._start_queued_jobs()
def _apply_stats(self, worker: RunningWorker) -> None:
if not worker.artifacts.stats_path.exists():
return
@@ -451,7 +562,15 @@ def load_runs_view(
reference_time = now or datetime.now(UTC)
resolved_log_dir = Path(log_dir)
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
queued_executions = tuple(
JobExecution.select(JobExecution, Job, Source)
.join(Job)
.join(Source)
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
)
running_executions = tuple(
JobExecution.select(JobExecution, Job, Source)
.join(Job)
@@ -477,16 +596,31 @@ def load_runs_view(
)
running_by_job = {
_job_id(execution.job): execution for execution in running_executions
_job_id(cast(Job, execution.job)): execution
for execution in running_executions
}
queued_by_job = {
_job_id(cast(Job, execution.job)): execution
for execution in queued_executions
}
return {
"running": tuple(
_project_running_execution(execution, resolved_log_dir, reference_time)
_project_running_execution(
execution,
resolved_log_dir,
reference_time,
queued_follow_up=queued_by_job.get(_job_id(cast(Job, execution.job))),
)
for execution in running_executions
),
"queued": tuple(
_project_queued_execution(execution, reference_time, position=position)
for position, execution in enumerate(queued_executions, start=1)
),
"upcoming": tuple(
_project_upcoming_job(job, running_by_job.get(job.id), reference_time)
for job in jobs
if job.id not in queued_by_job
),
"completed": tuple(
_project_completed_execution(execution, resolved_log_dir, reference_time)
@@ -596,7 +730,11 @@ def _scheduler_job_id(job_id: int) -> str:
def _project_running_execution(
execution: JobExecution, log_dir: Path, reference_time: datetime
execution: JobExecution,
log_dir: Path,
reference_time: datetime,
*,
queued_follow_up: JobExecution | None = None,
) -> dict[str, object]:
job = cast(Job, execution.job)
job_id = _job_id(job)
@@ -624,7 +762,36 @@ def _project_running_execution(
),
"log_href": f"/job/{job_id}/execution/{execution_id}/logs",
"log_exists": artifacts.log_path.exists(),
"cancel_post_path": f"/actions/executions/{execution_id}/cancel",
"cancel_label": "Cancel" if queued_follow_up is not None else "Stop",
"cancel_post_path": (
f"/actions/queued-executions/{_execution_id(queued_follow_up)}/cancel"
if queued_follow_up is not None
else f"/actions/executions/{execution_id}/cancel"
),
}
def _project_queued_execution(
execution: JobExecution, reference_time: datetime, *, position: int
) -> dict[str, object]:
job = cast(Job, execution.job)
queued_at = _coerce_datetime(cast(datetime | str, execution.created_at))
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": _job_id(job),
"execution_id": _execution_id(execution),
"queued_at": _humanize_relative_time(reference_time, queued_at),
"queued_at_iso": queued_at.isoformat(),
"queue_position": position,
"status": "Queued",
"status_tone": "idle",
"run_label": "Queued",
"run_disabled": True,
"run_post_path": f"/actions/jobs/{_job_id(job)}/run-now",
"cancel_post_path": (
f"/actions/queued-executions/{_execution_id(execution)}/cancel"
),
}

View file

@@ -87,7 +87,7 @@ def _running_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
tone="amber",
),
_action_button(
label="Stop",
label=_text(execution, "cancel_label"),
tone="danger",
post_path=_maybe_text(execution, "cancel_post_path"),
),
@@ -95,6 +95,54 @@ def _running_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
)
def _queued_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
queued_at = _maybe_text(execution, "queued_at_iso")
queued_label: Node = h.p(class_="font-medium text-slate-900")[
_text(execution, "queued_at")
]
if queued_at is not None:
queued_label = h.time(
{
"data-queued-at": queued_at,
"title": queued_at,
},
datetime=queued_at,
class_="font-medium text-slate-900",
)[_text(execution, "queued_at")]
return (
h.div[
h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
h.p(class_="mt-1 font-mono text-xs text-slate-500")[
_text(execution, "slug")
],
],
h.div[
h.p(class_="font-medium text-slate-900")[
f"#{_text(execution, 'execution_id')}"
],
],
queued_label,
h.div[
h.p(class_="font-medium text-slate-900")[
f"#{_text(execution, 'queue_position')}"
],
],
_action_button(
label=_text(execution, "run_label"),
disabled=_flag(execution, "run_disabled"),
post_path=_maybe_text(execution, "run_post_path"),
),
h.div(class_="flex flex-nowrap items-center gap-2")[
_action_button(
label="Cancel",
tone="danger",
post_path=_maybe_text(execution, "cancel_post_path"),
)
],
)
def _upcoming_row(job: Mapping[str, object]) -> tuple[Node, ...]:
next_run_at = _maybe_text(job, "next_run_at")
next_run_label: Node = h.p(class_="font-medium text-slate-900")[
@@ -192,14 +240,17 @@ def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
def runs_page(
*,
running_executions: tuple[Mapping[str, object], ...] | None = None,
queued_executions: tuple[Mapping[str, object], ...] | None = None,
upcoming_jobs: tuple[Mapping[str, object], ...] | None = None,
completed_executions: tuple[Mapping[str, object], ...] | None = None,
source_count: int = 0,
) -> Renderable:
running_items = running_executions or ()
queued_items = queued_executions or ()
upcoming_items = upcoming_jobs or ()
completed_items = completed_executions or ()
running_rows = tuple(_running_row(execution) for execution in running_items)
queued_rows = tuple(_queued_row(execution) for execution in queued_items)
upcoming_rows = tuple(_upcoming_row(job) for job in upcoming_items)
completed_rows = tuple(_completed_row(execution) for execution in completed_items)
@@ -227,7 +278,21 @@ def runs_page(
),
table_section(
eyebrow="Queue",
title="Upcoming jobs",
title="Queued job executions",
empty_message="No queued executions are waiting.",
headers=(
"Source",
"Execution",
"Queued",
"Position",
"Run now",
"Actions",
),
rows=queued_rows,
),
table_section(
eyebrow="Schedule",
title="Scheduled jobs",
empty_message="No jobs are scheduled.",
headers=(
"Source",

View file

@@ -0,0 +1,6 @@
CREATE INDEX IF NOT EXISTS job_execution_pending_created_at_idx
ON job_execution (running_status, created_at ASC);
CREATE UNIQUE INDEX IF NOT EXISTS job_execution_pending_unique_job_idx
ON job_execution (job_id)
WHERE running_status = 0;

View file

@@ -276,6 +276,12 @@ def create_app(*, dev_mode: bool = False) -> Quart:
trigger_refresh(app)
return Response(status=204)
@app.post("/actions/queued-executions/<int:execution_id>/cancel")
async def cancel_queued_execution_action(execution_id: int) -> Response:
get_job_runtime(app).cancel_queued_execution(execution_id)
trigger_refresh(app)
return Response(status=204)
@app.post("/job/<int:job_id>/execution/<int:execution_id>/logs")
async def logs_patch(job_id: int, execution_id: int) -> DatastarResponse:
async def render() -> Renderable:
@@ -370,6 +376,7 @@ async def render_runs(app: Quart | None = None) -> Renderable:
view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])
return runs_page(
running_executions=cast(tuple[dict[str, object], ...], view["running"]),
queued_executions=cast(tuple[dict[str, object], ...], view["queued"]),
upcoming_jobs=cast(tuple[dict[str, object], ...], view["upcoming"]),
completed_executions=cast(tuple[dict[str, object], ...], view["completed"]),
source_count=len(load_sources()),