# republisher/repub/jobs.py

from __future__ import annotations

import json
import math
import os
import signal
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Callable, TextIO, TypedDict, cast

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from peewee import IntegrityError

from repub.config import feed_output_dir, feed_output_path
from repub.model import (
    Job,
    JobExecution,
    JobExecutionStatus,
    Source,
    database,
    load_max_concurrent_jobs,
    utc_now,
)

SCHEDULER_JOB_PREFIX = "job-"
POLL_JOB_ID = "runtime-poll-workers"
SYNC_JOB_ID = "runtime-sync-jobs"
COMPLETED_EXECUTION_PAGE_SIZE = 20
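

# Each execution's artifacts share the "job-<id>-execution-<id>" name prefix,
# so one glob over the log directory can find everything a run produced (see
# clear_completed_executions).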
@dataclass(frozen=True)
class JobArtifacts:
    log_path: Path
    stats_path: Path

    @classmethod
    def for_execution(
        cls, *, log_dir: Path, job_id: int, execution_id: int
    ) -> "JobArtifacts":
        prefix = f"job-{job_id}-execution-{execution_id}"
        return cls(
            log_path=log_dir / f"{prefix}.log",
            stats_path=log_dir / f"{prefix}.jsonl",
        )


@dataclass
class RunningWorker:
    execution_id: int
    process: subprocess.Popen[str] | DetachedProcess
    log_handle: TextIO
    artifacts: JobArtifacts
    stats_offset: int = 0
    detached: bool = False
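

# A minimal stand-in for subprocess.Popen, used when the scheduler reattaches
# to a worker process that survived an app restart. It can only observe and
# signal the process by pid; the real exit code is unrecoverable, so poll()
# and wait() report 0 once the pid is gone.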
@dataclass(frozen=True)
class DetachedProcess:
pid: int
def poll(self) -> int | None:
return None if _pid_is_running(self.pid) else 0
def terminate(self) -> None:
_send_signal(self.pid, signal.SIGTERM)
def kill(self) -> None:
_send_signal(self.pid, signal.SIGKILL)
def wait(self, timeout: float | None = None) -> int:
deadline = None if timeout is None else time.monotonic() + timeout
while self.poll() is None:
if deadline is not None and time.monotonic() >= deadline:
raise subprocess.TimeoutExpired(
cmd=f"pid {self.pid}",
timeout=timeout if timeout is not None else 0.0,
)
time.sleep(0.05)
return 0
@dataclass(frozen=True)
class LiveWorker:
job_id: int
execution_id: int
pid: int
@dataclass(frozen=True)
class ExecutionLogView:
job_id: int
execution_id: int
title: str
description: str
status_label: str
status_tone: str
log_text: str
error_message: str | None = None
class RunsView(TypedDict):
running: tuple[dict[str, object], ...]
queued: tuple[dict[str, object], ...]
upcoming: tuple[dict[str, object], ...]
completed: tuple[dict[str, object], ...]
completed_page: int
completed_page_size: int
completed_total_count: int
completed_total_pages: int


class JobRuntime:
    def __init__(
        self,
        *,
        log_dir: str | Path,
        refresh_callback: Callable[[object], None] | None = None,
        graceful_stop_seconds: float = 15.0,
    ) -> None:
        self.log_dir = Path(log_dir)
        self.refresh_callback = refresh_callback
        self.graceful_stop_seconds = graceful_stop_seconds
        self.scheduler = BackgroundScheduler(timezone=UTC)
        self._workers: dict[int, RunningWorker] = {}
        self._run_lock = threading.Lock()
        self._started = False
        self._last_runtime_refresh_at = 0.0

    def start(self) -> None:
        if self._started:
            return
        self._reconcile_stale_executions()
self.scheduler.start()
self.scheduler.add_job(
self.poll_workers,
"interval",
id=POLL_JOB_ID,
seconds=0.25,
replace_existing=True,
max_instances=1,
coalesce=True,
)
self.scheduler.add_job(
self.sync_jobs,
"interval",
id=SYNC_JOB_ID,
seconds=1,
replace_existing=True,
max_instances=1,
coalesce=True,
)
self.sync_jobs()
self._started = True
        self._start_queued_jobs()
def shutdown(self) -> None:
for execution_id in tuple(self._workers):
worker = self._workers.pop(execution_id)
if worker.process.poll() is None:
worker.process.kill()
worker.process.wait(timeout=2)
worker.log_handle.close()
if self._started:
self.scheduler.shutdown(wait=False)
self._started = False
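
    # Mirrors the enabled Job rows into APScheduler: adds or refreshes one cron
    # trigger per enabled job and drops scheduler entries whose job row was
    # disabled or deleted.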
def sync_jobs(self) -> None:
with database.connection_context():
jobs = tuple(Job.select().where(Job.enabled == True)) # noqa: E712
desired_ids = set()
for job in jobs:
scheduler_job_id = _scheduler_job_id(_job_id(job))
desired_ids.add(scheduler_job_id)
self.scheduler.add_job(
self.run_scheduled_job,
trigger=_job_trigger(job),
args=(_job_id(job),),
id=scheduler_job_id,
replace_existing=True,
max_instances=1,
coalesce=True,
misfire_grace_time=1,
)
for scheduled_job in tuple(self.scheduler.get_jobs()):
if (
scheduled_job.id.startswith(SCHEDULER_JOB_PREFIX)
and scheduled_job.id not in desired_ids
):
self.scheduler.remove_job(scheduled_job.id)
def run_scheduled_job(self, job_id: int) -> None:
        self.enqueue_job_run(job_id, reason="scheduled")

    def run_job_now(self, job_id: int, *, reason: str) -> int | None:
        return self.enqueue_job_run(job_id, reason=reason)

    def enqueue_job_run(self, job_id: int, *, reason: str) -> int | None:
        del reason
        self.start()
        with self._run_lock:
            execution_id = self._enqueue_job_run_locked(job_id)
            self._start_queued_jobs_locked()
        if execution_id is not None:
            self._trigger_refresh()
        return execution_id

    def _enqueue_job_run_locked(self, job_id: int) -> int | None:
        with database.connection_context():
            with database.atomic():
                job = Job.get_or_none(id=job_id)
                if job is None:
                    return None
pending_execution = JobExecution.get_or_none(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
if pending_execution is not None:
return _execution_id(pending_execution)
try:
execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.PENDING,
)
except IntegrityError:
pending_execution = JobExecution.get_or_none(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
return (
_execution_id(pending_execution)
if pending_execution is not None
else None
)
return _execution_id(execution)
def _start_queued_jobs(self) -> None:
with self._run_lock:
self._start_queued_jobs_locked()
def _start_queued_jobs_locked(self) -> None:
while True:
if self._max_concurrent_jobs_reached():
return
claimed_execution = self._claim_next_pending_execution()
if claimed_execution is None:
return
job = cast(Job, claimed_execution.job)
self._start_worker_for_execution(
job_id=_job_id(job),
execution_id=_execution_id(claimed_execution),
)
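
    # The claim below is a guarded UPDATE (status must still be PENDING), so
    # two concurrent callers cannot both start the same execution: only the
    # writer that flips PENDING -> RUNNING wins.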
def _claim_next_pending_execution(self) -> JobExecution | None:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
pending_executions = tuple(
JobExecution.select(JobExecution, Job)
.join(Job)
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
)
for execution in pending_executions:
job = cast(Job, execution.job)
                already_running = (
                    JobExecution.select()
                    .where(
                        (JobExecution.job == job)
                        & (JobExecution.running_status == JobExecutionStatus.RUNNING)
                    )
                    .exists()
                )
                if already_running:
                    continue
                started_at = utc_now()
claimed = (
JobExecution.update(
started_at=started_at,
running_status=JobExecutionStatus.RUNNING,
)
.where(
(execution_primary_key == _execution_id(execution))
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.execute()
                )
                if claimed:
                    return JobExecution.get_by_id(_execution_id(execution))
            return None

    def _start_worker_for_execution(self, *, job_id: int, execution_id: int) -> None:
artifacts = JobArtifacts.for_execution(
log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
)
artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
log_handle = artifacts.log_path.open("a", encoding="utf-8", buffering=1)
log_handle.write(
f"scheduler: starting execution {execution_id} for job {job_id}\n"
)
process = subprocess.Popen(
[
sys.executable,
"-u",
"-m",
"repub.job_runner",
"--job-id",
str(job_id),
"--execution-id",
str(execution_id),
                "--db-path",
                str(database.database),
                "--out-dir",
                str(self.log_dir.parent),
"--stats-path",
str(artifacts.stats_path),
],
stdout=log_handle,
stderr=subprocess.STDOUT,
text=True,
)
self._workers[execution_id] = RunningWorker(
execution_id=execution_id,
process=process,
log_handle=log_handle,
artifacts=artifacts,
)

    def _max_concurrent_jobs_reached(self) -> bool:
        return (
            JobExecution.select()
            .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            .count()
            >= load_max_concurrent_jobs()
        )

def request_execution_cancel(self, execution_id: int) -> bool:
with database.connection_context():
execution = JobExecution.get_or_none(id=execution_id)
if execution is None:
return False
if execution.running_status != JobExecutionStatus.RUNNING:
return False
if execution.stop_requested_at is None:
execution.stop_requested_at = utc_now()
execution.save()
worker = self._workers.get(execution_id)
if worker is not None and worker.process.poll() is None:
worker.log_handle.write(
f"scheduler: graceful stop requested for execution {execution_id}\n"
)
worker.process.terminate()
self._trigger_refresh("queue-reordered")
return True
def cancel_queued_execution(self, execution_id: int) -> bool:
with self._run_lock:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
deleted = (
JobExecution.delete()
.where(
(execution_primary_key == execution_id)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.execute()
)
if not deleted:
return False
self._trigger_refresh()
return True
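
    # Queue order derives from created_at (ties broken by primary key), so a
    # move swaps created_at with the neighbouring entry, nudging by one
    # microsecond when both share the same timestamp.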
def move_queued_execution(self, execution_id: int, *, direction: str) -> bool:
offset = -1 if direction == "up" else 1
with self._run_lock:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
queued_executions = tuple(
JobExecution.select()
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(
JobExecution.created_at.asc(), execution_primary_key.asc()
)
)
current_index = next(
(
index
for index, execution in enumerate(queued_executions)
if _execution_id(execution) == execution_id
),
None,
)
if current_index is None:
return False
target_index = current_index + offset
if target_index < 0 or target_index >= len(queued_executions):
return False
current_execution = queued_executions[current_index]
target_execution = queued_executions[target_index]
current_created_at = _coerce_datetime(
cast(datetime | str, current_execution.created_at)
)
target_created_at = _coerce_datetime(
cast(datetime | str, target_execution.created_at)
)
with database.atomic():
if current_created_at == target_created_at:
adjusted_created_at = target_created_at + timedelta(
microseconds=-1 if offset < 0 else 1
)
(
JobExecution.update(created_at=adjusted_created_at)
.where(
execution_primary_key
== _execution_id(current_execution)
)
.execute()
)
else:
(
JobExecution.update(created_at=target_created_at)
.where(
execution_primary_key
== _execution_id(current_execution)
)
.execute()
)
(
JobExecution.update(created_at=current_created_at)
.where(
execution_primary_key == _execution_id(target_execution)
)
.execute()
)
self._trigger_refresh()
return True

    def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
        with database.connection_context():
            with database.atomic():
                job = Job.get_or_none(id=job_id)
                if job is None:
                    return False
                job.enabled = enabled
                job.save()
                if not enabled:
                    (
                        JobExecution.delete()
                        .where(
                            (JobExecution.job == job)
                            & (
                                JobExecution.running_status
                                == JobExecutionStatus.PENDING
                            )
                        )
                        .execute()
                    )
        self.sync_jobs()
        self._trigger_refresh()
        return True

    def poll_workers(self) -> None:
        any_finished = False
        for execution_id in tuple(self._workers):
            worker = self._workers[execution_id]
            self._apply_stats(worker)
            self._enforce_graceful_stop(worker)
            returncode = worker.process.poll()
            if returncode is None:
                continue
            # The worker has exited: drain any stats written since the last
            # poll, then record the final status.
            self._apply_stats(worker)
            with database.connection_context():
                execution = JobExecution.get_by_id(execution_id)
                execution.ended_at = utc_now()
                execution.running_status = _worker_final_status(
                    execution=execution,
                    worker=worker,
                    returncode=returncode,
                )
                execution.save()
            worker.log_handle.close()
            del self._workers[execution_id]
            any_finished = True
            self._trigger_refresh()
        if any_finished:
            self._start_queued_jobs()
        self._refresh_running_runtime()
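
    # Stats arrive as a JSONL file the worker appends to. Only the newest line
    # matters, so a byte offset is kept per worker and just the fresh tail is
    # read on each poll.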
def _apply_stats(self, worker: RunningWorker) -> None:
if not worker.artifacts.stats_path.exists():
return
with worker.artifacts.stats_path.open("r", encoding="utf-8") as handle:
handle.seek(worker.stats_offset)
payload = handle.read()
worker.stats_offset = handle.tell()
lines = [line for line in payload.splitlines() if line.strip()]
if not lines:
return
stats = json.loads(lines[-1])
with database.connection_context():
execution = JobExecution.get_by_id(worker.execution_id)
execution.requests_count = int(stats.get("requests_count", 0))
execution.items_count = int(stats.get("items_count", 0))
execution.warnings_count = int(stats.get("warnings_count", 0))
execution.errors_count = int(stats.get("errors_count", 0))
execution.bytes_count = int(stats.get("bytes_count", 0))
execution.retries_count = int(stats.get("retries_count", 0))
execution.exceptions_count = int(stats.get("exceptions_count", 0))
execution.cache_size_count = int(stats.get("cache_size_count", 0))
execution.cache_object_count = int(stats.get("cache_object_count", 0))
execution.raw_stats = json.dumps(stats, sort_keys=True)
execution.save()
self._trigger_refresh()
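
    # Graceful stop flow: request_execution_cancel sends SIGTERM; if the worker
    # is still alive graceful_stop_seconds later, it is killed here.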
def _enforce_graceful_stop(self, worker: RunningWorker) -> None:
with database.connection_context():
execution = JobExecution.get_by_id(worker.execution_id)
if execution.stop_requested_at is None:
return
elapsed = utc_now() - _coerce_datetime(execution.stop_requested_at)
if (
elapsed >= timedelta(seconds=self.graceful_stop_seconds)
and worker.process.poll() is None
):
worker.process.kill()
def _trigger_refresh(self, event: object = "refresh-event") -> None:
if self.refresh_callback is not None:
self.refresh_callback(event)
def _refresh_running_runtime(self) -> None:
if not self._has_running_executions():
return
current_time = time.monotonic()
if current_time - self._last_runtime_refresh_at < 1.0:
return
self._last_runtime_refresh_at = current_time
self._trigger_refresh()
def _has_running_executions(self) -> bool:
return (
JobExecution.select()
.where(JobExecution.running_status == JobExecutionStatus.RUNNING)
.exists()
)
    def _reconcile_stale_executions(self) -> None:
        live_workers = _find_live_workers()
        recovered_execution_ids: set[int] = set()
        with database.connection_context():
            execution_primary_key = getattr(JobExecution, "_meta").primary_key
            if live_workers:
                live_executions = tuple(
                    JobExecution.select(JobExecution, Job)
                    .join(Job)
                    .where(execution_primary_key.in_(tuple(live_workers)))
                )
            else:
                live_executions = ()
            for execution in live_executions:
                job = cast(Job, execution.job)
                execution_id = _execution_id(execution)
                live_worker = live_workers.get(execution_id)
                if live_worker is None or live_worker.job_id != _job_id(job):
                    continue
                artifacts = JobArtifacts.for_execution(
                    log_dir=self.log_dir,
                    job_id=live_worker.job_id,
                    execution_id=execution_id,
                )
                artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
                if execution.running_status != JobExecutionStatus.RUNNING:
                    execution.running_status = JobExecutionStatus.RUNNING
                    execution.ended_at = None
                    execution.save()
                    message = (
                        "scheduler: restored execution state from live worker "
                        f"pid {live_worker.pid} after app restart\n"
                    )
                else:
                    message = (
                        f"scheduler: reattached to worker pid {live_worker.pid} "
                        "after app restart\n"
                    )
                log_handle = artifacts.log_path.open(
                    "a", encoding="utf-8", buffering=1
                )
                log_handle.write(message)
                self._workers[execution_id] = RunningWorker(
                    execution_id=execution_id,
                    process=DetachedProcess(pid=live_worker.pid),
                    log_handle=log_handle,
                    artifacts=artifacts,
                    detached=True,
                )
                recovered_execution_ids.add(execution_id)
            stale_executions = tuple(
                JobExecution.select(JobExecution, Job)
                .join(Job)
                .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            )
            for execution in stale_executions:
                job = cast(Job, execution.job)
                execution_id = _execution_id(execution)
                if execution_id in recovered_execution_ids:
                    continue
                job_id = _job_id(job)
                artifacts = JobArtifacts.for_execution(
                    log_dir=self.log_dir,
                    job_id=job_id,
                    execution_id=execution_id,
                )
                artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
                execution.ended_at = utc_now()
                execution.running_status = (
                    JobExecutionStatus.CANCELED
                    if execution.stop_requested_at is not None
                    else JobExecutionStatus.FAILED
                )
                execution.save()
                outcome = (
                    "canceled"
                    if execution.running_status == JobExecutionStatus.CANCELED
                    else "failed"
                )
                with artifacts.log_path.open("a", encoding="utf-8") as log_handle:
                    log_handle.write(
                        f"scheduler: execution marked {outcome} after app restart\n"
                    )
        if stale_executions or recovered_execution_ids:
            self._trigger_refresh()
def load_runs_view(
*,
log_dir: str | Path,
now: datetime | None = None,
completed_page: int = 1,
completed_page_size: int = COMPLETED_EXECUTION_PAGE_SIZE,
) -> RunsView:
    reference_time = now or datetime.now(UTC)
    resolved_log_dir = Path(log_dir)
    sanitized_page_size = max(1, completed_page_size)
    with database.connection_context():
        execution_primary_key = getattr(JobExecution, "_meta").primary_key
        jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
        queued_executions = tuple(
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(JobExecution.running_status == JobExecutionStatus.PENDING)
            .order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
        )
        running_executions = tuple(
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            .order_by(JobExecution.started_at.desc())
        )
        completed_query = (
JobExecution.select(JobExecution, Job, Source)
.join(Job)
.join(Source)
.where(
JobExecution.running_status.in_(
(
JobExecutionStatus.SUCCEEDED,
JobExecutionStatus.FAILED,
JobExecutionStatus.CANCELED,
)
)
)
.order_by(JobExecution.ended_at.desc())
)
completed_total_count = completed_query.count()
completed_total_pages = max(
1, math.ceil(completed_total_count / sanitized_page_size)
)
sanitized_completed_page = min(max(1, completed_page), completed_total_pages)
completed_executions = tuple(
completed_query.paginate(sanitized_completed_page, sanitized_page_size)
        )
    running_by_job = {
        _job_id(cast(Job, execution.job)): execution
        for execution in running_executions
    }
    queued_by_job = {
        _job_id(cast(Job, execution.job)): execution
        for execution in queued_executions
    }
    return {
        "running": tuple(
            _project_running_execution(
                execution,
                resolved_log_dir,
                reference_time,
                queued_follow_up=queued_by_job.get(_job_id(cast(Job, execution.job))),
            )
            for execution in running_executions
        ),
        "queued": tuple(
            _project_queued_execution(
                execution,
                reference_time,
                position=position,
                total_count=len(queued_executions),
            )
            for position, execution in enumerate(queued_executions, start=1)
        ),
        "upcoming": tuple(
            _project_upcoming_job(
                job,
                running_by_job.get(job.id),
                queued_by_job.get(job.id),
                reference_time,
            )
            for job in jobs
        ),
        "completed": tuple(
            _project_completed_execution(execution, resolved_log_dir, reference_time)
            for execution in completed_executions
        ),
        "completed_page": sanitized_completed_page,
        "completed_page_size": sanitized_page_size,
        "completed_total_count": completed_total_count,
        "completed_total_pages": completed_total_pages,
    }

def clear_completed_executions(*, log_dir: str | Path) -> int:
resolved_log_dir = Path(log_dir)
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
completed_executions = tuple(
JobExecution.select(JobExecution, Job)
.join(Job)
.where(
JobExecution.running_status.in_(
(
JobExecutionStatus.SUCCEEDED,
JobExecutionStatus.FAILED,
JobExecutionStatus.CANCELED,
)
)
)
)
if not completed_executions:
return 0
for execution in completed_executions:
job = cast(Job, execution.job)
prefix = f"job-{_job_id(job)}-execution-{_execution_id(execution)}"
for artifact_path in resolved_log_dir.glob(f"{prefix}.*"):
artifact_path.unlink(missing_ok=True)
execution_ids = tuple(
_execution_id(execution) for execution in completed_executions
)
return (
JobExecution.delete()
.where(execution_primary_key.in_(execution_ids))
.execute()
)


def load_dashboard_view(
    *, log_dir: str | Path, now: datetime | None = None
) -> dict[str, object]:
    reference_time = now or datetime.now(UTC)
    runs_view = load_runs_view(log_dir=log_dir, now=reference_time)
    output_dir = Path(log_dir).parent
    running_by_job_id = {
        int(cast(int, execution["job_id"])): execution
        for execution in runs_view["running"]
    }
    queued_by_job_id = {
        int(cast(int, execution["job_id"])): execution
        for execution in runs_view["queued"]
    }
    upcoming_by_job_id = {
        int(cast(int, job["job_id"])): job for job in runs_view["upcoming"]
    }
    with database.connection_context():
        jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
        failed_last_day = (
            JobExecution.select()
            .where(
                (JobExecution.running_status == JobExecutionStatus.FAILED)
                & (JobExecution.ended_at.is_null(False))
                & (JobExecution.ended_at >= reference_time - timedelta(days=1))
            )
            .count()
        )
upcoming_ready = sum(
1 for job in runs_view["upcoming"] if str(job["run_reason"]) == "Ready"
)
    footprint_bytes = _directory_size(output_dir)
    return {
        "running": runs_view["running"],
        "queued": runs_view["queued"],
        "source_feeds": tuple(
            _project_source_feed(
                cast(Job, job),
                output_dir,
                reference_time,
                running_execution=running_by_job_id.get(_job_id(cast(Job, job))),
                queued_execution=queued_by_job_id.get(_job_id(cast(Job, job))),
                upcoming_job=upcoming_by_job_id.get(_job_id(cast(Job, job))),
            )
            for job in jobs
        ),
        "snapshot": {
            "running_now": str(len(runs_view["running"])),
            "upcoming_today": str(upcoming_ready),
            "failures_24h": str(failed_last_day),
            "artifact_footprint": _format_bytes(footprint_bytes),
        },
    }

def load_execution_log_view(
*, log_dir: str | Path, job_id: int, execution_id: int
) -> ExecutionLogView:
with database.connection_context():
execution = JobExecution.get_or_none(id=execution_id)
route = f"/job/{job_id}/execution/{execution_id}/logs"
if execution is None or _job_id(cast(Job, execution.job)) != job_id:
return ExecutionLogView(
job_id=job_id,
execution_id=execution_id,
title=f"Job {job_id} / execution {execution_id}",
description="Plain text log view routed through the app.",
status_label="Unavailable",
status_tone="failed",
log_text="",
error_message="Execution does not exist.",
)
artifacts = JobArtifacts.for_execution(
log_dir=Path(log_dir),
job_id=job_id,
execution_id=execution_id,
)
if not artifacts.log_path.exists():
return ExecutionLogView(
job_id=job_id,
execution_id=execution_id,
title=f"Job {job_id} / execution {execution_id}",
description="Plain text log view routed through the app.",
status_label=_execution_status_label(execution),
status_tone=_execution_status_tone(execution),
log_text="",
error_message="Log file has not been created yet.",
)
return ExecutionLogView(
job_id=job_id,
execution_id=execution_id,
title=f"Job {job_id} / execution {execution_id}",
description=f"Route: {route}",
status_label=_execution_status_label(execution),
status_tone=_execution_status_tone(execution),
log_text=artifacts.log_path.read_text(encoding="utf-8"),
)
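

# The five cron fields are stored on the Job row and joined back into a
# standard crontab expression, always evaluated in UTC.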
def _job_trigger(job: Job) -> CronTrigger:
expression = " ".join(
(
str(job.cron_minute),
str(job.cron_hour),
str(job.cron_day_of_month),
str(job.cron_month),
str(job.cron_day_of_week),
)
)
return CronTrigger.from_crontab(expression, timezone=UTC)
def _scheduler_job_id(job_id: int) -> str:
return f"{SCHEDULER_JOB_PREFIX}{job_id}"
def _project_running_execution(
    execution: JobExecution,
    log_dir: Path,
    reference_time: datetime,
    *,
    queued_follow_up: JobExecution | None = None,
) -> dict[str, object]:
job = cast(Job, execution.job)
job_id = _job_id(job)
execution_id = _execution_id(execution)
artifacts = JobArtifacts.for_execution(
log_dir=log_dir, job_id=job_id, execution_id=execution_id
)
started_at = _coerce_datetime(
cast(datetime | str, execution.started_at or execution.created_at)
)
runtime = reference_time - started_at
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": job_id,
"execution_id": execution_id,
        "duration": _format_duration(started_at, reference_time),
        "started_at": _humanize_relative_time(reference_time, started_at),
        "started_at_iso": started_at.isoformat(),
"runtime": f"running for {int(runtime.total_seconds())}s",
"status": "Stopping" if execution.stop_requested_at else "Running",
"stats": _stats_summary(execution),
"worker": (
"graceful stop requested"
if execution.stop_requested_at
            else "streaming stats from worker"
),
"log_href": f"/job/{job_id}/execution/{execution_id}/logs",
"log_exists": artifacts.log_path.exists(),
"cancel_label": "Cancel" if queued_follow_up is not None else "Stop",
"cancel_post_path": (
f"/actions/queued-executions/{_execution_id(queued_follow_up)}/cancel"
if queued_follow_up is not None
else f"/actions/executions/{execution_id}/cancel"
),
}
def _project_queued_execution(
execution: JobExecution,
reference_time: datetime,
*,
position: int,
total_count: int,
) -> dict[str, object]:
job = cast(Job, execution.job)
queued_at = _coerce_datetime(cast(datetime | str, execution.created_at))
execution_id = _execution_id(execution)
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": _job_id(job),
"execution_id": execution_id,
"queued_at": _humanize_relative_time(reference_time, queued_at),
"queued_at_iso": queued_at.isoformat(),
"queue_position": position,
"status": "Queued",
"status_tone": "idle",
"run_label": "Queued",
"run_disabled": True,
"run_post_path": f"/actions/jobs/{_job_id(job)}/run-now",
"cancel_post_path": (f"/actions/queued-executions/{execution_id}/cancel"),
"move_up_disabled": position == 1,
"move_up_post_path": (
None
if position == 1
else f"/actions/queued-executions/{execution_id}/move-up"
),
"move_down_disabled": position == total_count,
"move_down_post_path": (
None
if position == total_count
else f"/actions/queued-executions/{execution_id}/move-down"
        ),
}
def _project_upcoming_job(
job: Job,
running_execution: JobExecution | None,
queued_execution: JobExecution | None,
reference_time: datetime,
) -> dict[str, object]:
job_id = _job_id(job)
trigger = _job_trigger(job)
next_run = (
trigger.get_next_fire_time(None, reference_time)
if job.enabled and running_execution is None
else None
)
run_disabled = running_execution is not None or queued_execution is not None
run_reason = (
"Already running"
if running_execution is not None
else ("Queued" if queued_execution is not None else "Ready")
)
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": job_id,
"next_run": (
            _humanize_relative_time(reference_time, next_run)
if next_run is not None
else ("Running now" if running_execution is not None else "Not scheduled")
),
        "next_run_at": next_run.isoformat() if next_run is not None else None,
"schedule": " ".join(
(
str(job.cron_minute),
str(job.cron_hour),
str(job.cron_day_of_month),
str(job.cron_month),
str(job.cron_day_of_week),
)
),
"enabled_label": "Enabled" if job.enabled else "Disabled",
"enabled_tone": "scheduled" if job.enabled else "idle",
"run_disabled": run_disabled,
"run_reason": run_reason,
"toggle_label": "Disable" if job.enabled else "Enable",
"toggle_enabled": not job.enabled,
"run_post_path": f"/actions/jobs/{job_id}/run-now",
"toggle_post_path": f"/actions/jobs/{job_id}/toggle-enabled",
"delete_post_path": f"/actions/jobs/{job_id}/delete",
}
def _project_completed_execution(
    execution: JobExecution, log_dir: Path, reference_time: datetime
) -> dict[str, object]:
job = cast(Job, execution.job)
job_id = _job_id(job)
execution_id = _execution_id(execution)
artifacts = JobArtifacts.for_execution(
log_dir=log_dir, job_id=job_id, execution_id=execution_id
)
ended_at = (
_coerce_datetime(cast(datetime | str, execution.ended_at))
if execution.ended_at is not None
else None
)
started_at = (
_coerce_datetime(cast(datetime | str, execution.started_at))
if execution.started_at is not None
else None
)
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": job_id,
"execution_id": execution_id,
        "duration": _format_duration(started_at, ended_at),
"ended_at": (
_humanize_relative_time(reference_time, ended_at)
if ended_at is not None
else "Pending"
),
        "ended_at_iso": ended_at.isoformat() if ended_at is not None else None,
"status": _execution_status_label(execution),
"status_tone": _execution_status_tone(execution),
"stats": _stats_summary(execution),
"summary": (
"Canceled by operator"
if execution.running_status == JobExecutionStatus.CANCELED
else (
"Worker exited successfully"
if execution.running_status == JobExecutionStatus.SUCCEEDED
else "Worker exited with failure"
)
),
"log_href": f"/job/{job_id}/execution/{execution_id}/logs",
"log_exists": artifacts.log_path.exists(),
}
def _project_source_feed(
job: Job,
output_dir: Path,
reference_time: datetime,
*,
running_execution: dict[str, object] | None = None,
queued_execution: dict[str, object] | None = None,
upcoming_job: dict[str, object] | None = None,
) -> dict[str, object]:
source = cast(Source, job.source)
source_slug = str(source.slug)
source_dir = feed_output_dir(out_dir=output_dir, feed_slug=source_slug)
feed_path = feed_output_path(out_dir=output_dir, feed_slug=source_slug)
feed_exists = feed_path.exists()
updated_at = (
datetime.fromtimestamp(feed_path.stat().st_mtime, tz=UTC)
if feed_exists
else None
)
if running_execution is not None:
feed_status_label = str(running_execution["status"])
feed_status_tone = "scheduled"
elif queued_execution is not None:
feed_status_label = "Queued"
feed_status_tone = "queued"
else:
feed_status_label = "Available" if feed_exists else "Missing"
feed_status_tone = "done" if feed_exists else "failed"
return {
"source": source.name,
"slug": source_slug,
"feed_href": f"/feeds/{source_slug}/feed.rss",
"feed_status_label": feed_status_label,
"feed_status_tone": feed_status_tone,
"feed_exists": feed_exists,
"last_updated": (
_humanize_relative_time(reference_time, updated_at)
if updated_at is not None
else "Never published"
),
"last_updated_iso": updated_at.isoformat() if updated_at is not None else None,
"next_run": (
str(upcoming_job["next_run"])
if upcoming_job is not None
else "Not scheduled"
),
"next_run_at": (
cast(str | None, upcoming_job["next_run_at"])
if upcoming_job is not None
else None
),
"run_disabled": (
bool(upcoming_job["run_disabled"]) if upcoming_job is not None else False
),
"run_post_path": (
str(upcoming_job["run_post_path"])
if upcoming_job is not None
else f"/actions/jobs/{_job_id(job)}/run-now"
),
"artifact_footprint": _format_bytes(_directory_size(source_dir)),
}
def _execution_status_label(execution: JobExecution) -> str:
status = JobExecutionStatus(execution.running_status)
return {
JobExecutionStatus.PENDING: "Pending",
JobExecutionStatus.RUNNING: (
"Stopping" if execution.stop_requested_at else "Running"
),
JobExecutionStatus.SUCCEEDED: "Succeeded",
JobExecutionStatus.FAILED: "Failed",
JobExecutionStatus.CANCELED: "Canceled",
}[status]
def _execution_status_tone(execution: JobExecution) -> str:
status = JobExecutionStatus(execution.running_status)
return {
JobExecutionStatus.PENDING: "idle",
JobExecutionStatus.RUNNING: "running",
JobExecutionStatus.SUCCEEDED: "done",
JobExecutionStatus.FAILED: "failed",
JobExecutionStatus.CANCELED: "idle",
}[status]
def _stats_summary(execution: JobExecution) -> str:
    bytes_count = cast(int, execution.bytes_count)
    return (
        f"{execution.requests_count} requests, "
        f"{execution.items_count} items, "
        f"{_format_summary_bytes(bytes_count)}"
    )
def _final_status(*, execution: JobExecution, returncode: int) -> JobExecutionStatus:
if execution.stop_requested_at is not None:
return JobExecutionStatus.CANCELED
if returncode == 0:
return JobExecutionStatus.SUCCEEDED
return JobExecutionStatus.FAILED
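

# For detached workers (reattached after an app restart) the true exit code is
# unrecoverable, so the final status falls back to inspecting the log tail.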
def _worker_final_status(
*,
execution: JobExecution,
worker: RunningWorker,
returncode: int,
) -> JobExecutionStatus:
if worker.detached:
return _detached_final_status(execution=execution, artifacts=worker.artifacts)
return _final_status(execution=execution, returncode=returncode)
def _detached_final_status(
*, execution: JobExecution, artifacts: JobArtifacts
) -> JobExecutionStatus:
if execution.stop_requested_at is not None:
return JobExecutionStatus.CANCELED
log_tail = _read_log_tail(artifacts.log_path)
if "completed successfully" in log_tail:
return JobExecutionStatus.SUCCEEDED
return JobExecutionStatus.FAILED
def _coerce_datetime(value: datetime | str) -> datetime:
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
parsed = datetime.fromisoformat(value)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)
def _job_id(job: Job) -> int:
return int(job.get_id())
def _execution_id(execution: JobExecution) -> int:
return int(execution.get_id())
def _directory_size(path: Path) -> int:
if not path.exists():
return 0
return sum(entry.stat().st_size for entry in path.rglob("*") if entry.is_file())
def _format_bytes(value: int) -> str:
if value < 1024:
return f"{value} B"
if value < 1024 * 1024:
return f"{value / 1024:.1f} KB"
if value < 1024 * 1024 * 1024:
return f"{value / (1024 * 1024):.1f} MB"
return f"{value / (1024 * 1024 * 1024):.1f} GB"
def _format_summary_bytes(value: int) -> str:
if value == 1:
return "1 byte"
if value < 1024:
return f"{value} bytes"
if value < 1024 * 1024:
return f"{value / 1024:.1f} KiB"
if value < 1024 * 1024 * 1024:
return f"{value / (1024 * 1024):.1f} MiB"
return f"{value / (1024 * 1024 * 1024):.1f} GiB"


def _humanize_relative_time(reference_time: datetime, target_time: datetime) -> str:
    delta_seconds = int(round((target_time - reference_time).total_seconds()))
    if delta_seconds == 0:
        return "now"
    absolute_delta_seconds = abs(delta_seconds)
    units = (
        ("day", 24 * 60 * 60),
        ("hour", 60 * 60),
        ("minute", 60),
    )
    for label, size in units:
        if absolute_delta_seconds >= size:
            count = max(1, round(absolute_delta_seconds / size))
            suffix = "" if count == 1 else "s"
            if delta_seconds > 0:
                return f"in {count} {label}{suffix}"
            return f"{count} {label}{suffix} ago"
    if delta_seconds > 0:
        return f"in {absolute_delta_seconds} seconds"
    return f"{absolute_delta_seconds} seconds ago"

def _format_duration(
started_at: datetime | None, ended_at: datetime | None
) -> str | None:
if started_at is None or ended_at is None:
return None
total_seconds = max(0, int((ended_at - started_at).total_seconds()))
hours, remainder = divmod(total_seconds, 60 * 60)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def _find_live_workers() -> dict[int, LiveWorker]:
proc_dir = Path("/proc")
if not proc_dir.exists():
return {}
live_workers: dict[int, LiveWorker] = {}
for cmdline_path in proc_dir.glob("[0-9]*/cmdline"):
try:
argv = [
part
for part in cmdline_path.read_bytes()
.decode("utf-8", errors="ignore")
.split("\x00")
if part != ""
]
except OSError:
continue
live_worker = _parse_live_worker(argv, pid=int(cmdline_path.parent.name))
if live_worker is None or not _pid_is_running(live_worker.pid):
continue
live_workers[live_worker.execution_id] = live_worker
return live_workers
def _parse_live_worker(argv: list[str], *, pid: int) -> LiveWorker | None:
if "repub.job_runner" not in argv:
return None
job_id = _argv_flag_value(argv, "--job-id")
execution_id = _argv_flag_value(argv, "--execution-id")
if job_id is None or execution_id is None:
return None
return LiveWorker(job_id=int(job_id), execution_id=int(execution_id), pid=pid)
def _argv_flag_value(argv: list[str], flag: str) -> str | None:
try:
index = argv.index(flag)
except ValueError:
return None
value_index = index + 1
if value_index >= len(argv):
return None
return argv[value_index]
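

# os.kill(pid, 0) only checks that the pid exists; a zombie still "exists", so
# the /proc stat state field (first token after the closing paren of the comm
# field) is also checked to treat Z (zombie) processes as not running.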
def _pid_is_running(pid: int) -> bool:
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
stat_path = Path(f"/proc/{pid}/stat")
if stat_path.exists():
try:
stat_text = stat_path.read_text(encoding="utf-8")
except OSError:
return True
state_parts = stat_text.split(") ", 1)
if len(state_parts) == 2 and state_parts[1].startswith("Z"):
return False
return True
def _send_signal(pid: int, signum: int) -> None:
try:
os.kill(pid, signum)
except ProcessLookupError:
return
def _read_log_tail(path: Path, *, max_bytes: int = 8192) -> str:
if not path.exists():
return ""
with path.open("rb") as handle:
handle.seek(0, os.SEEK_END)
size = handle.tell()
handle.seek(max(0, size - max_bytes))
return handle.read().decode("utf-8", errors="replace")