# republisher/repub/jobs.py
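"""Job scheduling runtime for the republisher app.

An APScheduler BackgroundScheduler fires cron-defined jobs; each run is
recorded as a JobExecution row and executed by a dedicated
"python -m repub.job_runner" subprocess whose log (.log) and stats (.jsonl)
artifacts live under the runtime's log directory. After an unclean restart,
still-live workers are reattached by scanning /proc.
"""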
from __future__ import annotations
import json
import math
import os
import signal
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Callable, TextIO, TypedDict, cast
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from peewee import IntegrityError
from repub.config import feed_output_dir, feed_output_path
from repub.model import (
Job,
JobExecution,
JobExecutionStatus,
Source,
database,
load_max_concurrent_jobs,
utc_now,
)
SCHEDULER_JOB_PREFIX = "job-"
POLL_JOB_ID = "runtime-poll-workers"
SYNC_JOB_ID = "runtime-sync-jobs"
COMPLETED_EXECUTION_PAGE_SIZE = 20
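# Persisted jobs are mirrored into APScheduler under ids namespaced
# "job-<id>" (see _scheduler_job_id), which lets sync_jobs prune stale cron
# entries without disturbing the two built-in runtime jobs above.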
@dataclass(frozen=True)
class JobArtifacts:
log_path: Path
stats_path: Path
@classmethod
def for_execution(
cls, *, log_dir: Path, job_id: int, execution_id: int
) -> "JobArtifacts":
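        """Derive the per-execution artifact paths.

        For log_dir="logs", job_id=7, execution_id=42 (hypothetical values)
        this yields logs/job-7-execution-42.log and
        logs/job-7-execution-42.jsonl.
        """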
prefix = f"job-{job_id}-execution-{execution_id}"
return cls(
log_path=log_dir / f"{prefix}.log",
stats_path=log_dir / f"{prefix}.jsonl",
)
@dataclass
class RunningWorker:
execution_id: int
process: subprocess.Popen[str] | DetachedProcess
log_handle: TextIO
artifacts: JobArtifacts
stats_offset: int = 0
detached: bool = False
@dataclass(frozen=True)
class DetachedProcess:
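    """Stand-in for a worker process that survived an app restart.

    It mirrors the subset of the subprocess.Popen API this runtime uses
    (poll/terminate/kill/wait). The true exit code is unknowable once the
    original parent is gone, so poll() reports 0 and the final status is
    instead inferred from the log tail in _detached_final_status().
    """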
pid: int
def poll(self) -> int | None:
return None if _pid_is_running(self.pid) else 0
def terminate(self) -> None:
_send_signal(self.pid, signal.SIGTERM)
def kill(self) -> None:
_send_signal(self.pid, signal.SIGKILL)
def wait(self, timeout: float | None = None) -> int:
deadline = None if timeout is None else time.monotonic() + timeout
while self.poll() is None:
if deadline is not None and time.monotonic() >= deadline:
raise subprocess.TimeoutExpired(
cmd=f"pid {self.pid}",
timeout=timeout if timeout is not None else 0.0,
)
time.sleep(0.05)
return 0
@dataclass(frozen=True)
class LiveWorker:
job_id: int
execution_id: int
pid: int
@dataclass(frozen=True)
class ExecutionLogView:
job_id: int
execution_id: int
title: str
description: str
status_label: str
status_tone: str
log_text: str
error_message: str | None = None
class RunsView(TypedDict):
running: tuple[dict[str, object], ...]
queued: tuple[dict[str, object], ...]
upcoming: tuple[dict[str, object], ...]
completed: tuple[dict[str, object], ...]
completed_page: int
completed_page_size: int
completed_total_count: int
completed_total_pages: int
class JobRuntime:
def __init__(
self,
*,
log_dir: str | Path,
refresh_callback: Callable[[object], None] | None = None,
graceful_stop_seconds: float = 15.0,
) -> None:
self.log_dir = Path(log_dir)
self.refresh_callback = refresh_callback
self.graceful_stop_seconds = graceful_stop_seconds
self.scheduler = BackgroundScheduler(timezone=UTC)
self._workers: dict[int, RunningWorker] = {}
self._run_lock = threading.Lock()
self._started = False
self._last_runtime_refresh_at = 0.0
def start(self) -> None:
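        """Reconcile stale state, start the scheduler, and drain the queue.

        Two internal interval jobs drive the runtime: poll_workers every
        0.25s reaps workers and streams stats, while sync_jobs every second
        mirrors enabled Job rows into APScheduler cron entries.
        """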
if self._started:
return
self._reconcile_stale_executions()
self.scheduler.start()
self.scheduler.add_job(
self.poll_workers,
"interval",
id=POLL_JOB_ID,
seconds=0.25,
replace_existing=True,
max_instances=1,
coalesce=True,
)
self.scheduler.add_job(
self.sync_jobs,
"interval",
id=SYNC_JOB_ID,
seconds=1,
replace_existing=True,
max_instances=1,
coalesce=True,
)
self.sync_jobs()
self._started = True
self._start_queued_jobs()
def shutdown(self) -> None:
for execution_id in tuple(self._workers):
worker = self._workers.pop(execution_id)
            if worker.process.poll() is None:
                worker.process.kill()
                try:
                    worker.process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    pass  # Best effort during shutdown; do not block exit.
worker.log_handle.close()
if self._started:
self.scheduler.shutdown(wait=False)
self._started = False
def sync_jobs(self) -> None:
with database.connection_context():
jobs = tuple(Job.select().where(Job.enabled == True)) # noqa: E712
desired_ids = set()
for job in jobs:
scheduler_job_id = _scheduler_job_id(_job_id(job))
desired_ids.add(scheduler_job_id)
self.scheduler.add_job(
self.run_scheduled_job,
trigger=_job_trigger(job),
args=(_job_id(job),),
id=scheduler_job_id,
replace_existing=True,
max_instances=1,
coalesce=True,
misfire_grace_time=1,
)
for scheduled_job in tuple(self.scheduler.get_jobs()):
if (
scheduled_job.id.startswith(SCHEDULER_JOB_PREFIX)
and scheduled_job.id not in desired_ids
):
self.scheduler.remove_job(scheduled_job.id)
def run_scheduled_job(self, job_id: int) -> None:
self.enqueue_job_run(job_id, reason="scheduled")
def run_job_now(self, job_id: int, *, reason: str) -> int | None:
return self.enqueue_job_run(job_id, reason=reason)
def enqueue_job_run(self, job_id: int, *, reason: str) -> int | None:
del reason
self.start()
with self._run_lock:
execution_id = self._enqueue_job_run_locked(job_id)
self._start_queued_jobs_locked()
if execution_id is not None:
self._trigger_refresh()
return execution_id
def _enqueue_job_run_locked(self, job_id: int) -> int | None:
with database.connection_context():
with database.atomic():
job = Job.get_or_none(id=job_id)
if job is None:
return None
pending_execution = JobExecution.get_or_none(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
if pending_execution is not None:
return _execution_id(pending_execution)
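                # Assumes the schema allows at most one PENDING execution per
                # job (e.g. via a unique constraint), so the create below can
                # race and the IntegrityError branch resolves the loser.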
try:
execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.PENDING,
)
except IntegrityError:
pending_execution = JobExecution.get_or_none(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
return (
_execution_id(pending_execution)
if pending_execution is not None
else None
)
return _execution_id(execution)
def _start_queued_jobs(self) -> None:
with self._run_lock:
self._start_queued_jobs_locked()
def _start_queued_jobs_locked(self) -> None:
while True:
if self._max_concurrent_jobs_reached():
return
claimed_execution = self._claim_next_pending_execution()
if claimed_execution is None:
return
job = cast(Job, claimed_execution.job)
self._start_worker_for_execution(
job_id=_job_id(job),
execution_id=_execution_id(claimed_execution),
)
def _claim_next_pending_execution(self) -> JobExecution | None:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
pending_executions = tuple(
JobExecution.select(JobExecution, Job)
.join(Job)
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
)
for execution in pending_executions:
job = cast(Job, execution.job)
already_running = (
JobExecution.select()
.where(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.RUNNING)
)
.exists()
)
if already_running:
continue
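                # Claim with a compare-and-set UPDATE: it only wins while the
                # row is still PENDING, so concurrent claimers cannot both
                # start the same execution.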
started_at = utc_now()
claimed = (
JobExecution.update(
started_at=started_at,
running_status=JobExecutionStatus.RUNNING,
)
.where(
(execution_primary_key == _execution_id(execution))
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.execute()
)
if claimed:
return JobExecution.get_by_id(_execution_id(execution))
return None
def _start_worker_for_execution(self, *, job_id: int, execution_id: int) -> None:
artifacts = JobArtifacts.for_execution(
log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
)
artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
log_handle = artifacts.log_path.open("a", encoding="utf-8", buffering=1)
log_handle.write(
f"scheduler: starting execution {execution_id} for job {job_id}\n"
)
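        # Each execution gets its own interpreter (-u keeps the log
        # unbuffered) so a crash in one source's worker cannot take down the
        # scheduler process.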
process = subprocess.Popen(
[
sys.executable,
"-u",
"-m",
"repub.job_runner",
"--job-id",
str(job_id),
"--execution-id",
str(execution_id),
"--db-path",
str(database.database),
"--out-dir",
str(self.log_dir.parent),
"--stats-path",
str(artifacts.stats_path),
],
stdout=log_handle,
stderr=subprocess.STDOUT,
text=True,
)
self._workers[execution_id] = RunningWorker(
execution_id=execution_id,
process=process,
log_handle=log_handle,
artifacts=artifacts,
)
    def _max_concurrent_jobs_reached(self) -> bool:
        # Wrapped in a connection context like every other DB touch in this
        # module; callers hold only the run lock, not an open connection.
        with database.connection_context():
            return (
                JobExecution.select()
                .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
                .count()
                >= load_max_concurrent_jobs()
            )
def request_execution_cancel(self, execution_id: int) -> bool:
with database.connection_context():
execution = JobExecution.get_or_none(id=execution_id)
if execution is None:
return False
if execution.running_status != JobExecutionStatus.RUNNING:
return False
if execution.stop_requested_at is None:
execution.stop_requested_at = utc_now()
execution.save()
worker = self._workers.get(execution_id)
if worker is not None and worker.process.poll() is None:
worker.log_handle.write(
f"scheduler: graceful stop requested for execution {execution_id}\n"
)
worker.process.terminate()
        self._trigger_refresh()
return True
def cancel_queued_execution(self, execution_id: int) -> bool:
with self._run_lock:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
deleted = (
JobExecution.delete()
.where(
(execution_primary_key == execution_id)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.execute()
)
if not deleted:
return False
self._trigger_refresh()
return True
def move_queued_execution(self, execution_id: int, *, direction: str) -> bool:
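        """Reorder the pending queue by swapping created_at sort keys.

        When the two rows share a timestamp, the moved row is instead nudged
        one microsecond past its neighbour so the reorder cannot be a no-op.
        """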
offset = -1 if direction == "up" else 1
with self._run_lock:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
queued_executions = tuple(
JobExecution.select()
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(
JobExecution.created_at.asc(), execution_primary_key.asc()
)
)
current_index = next(
(
index
for index, execution in enumerate(queued_executions)
if _execution_id(execution) == execution_id
),
None,
)
if current_index is None:
return False
target_index = current_index + offset
if target_index < 0 or target_index >= len(queued_executions):
return False
current_execution = queued_executions[current_index]
target_execution = queued_executions[target_index]
current_created_at = _coerce_datetime(
cast(datetime | str, current_execution.created_at)
)
target_created_at = _coerce_datetime(
cast(datetime | str, target_execution.created_at)
)
with database.atomic():
if current_created_at == target_created_at:
adjusted_created_at = target_created_at + timedelta(
microseconds=-1 if offset < 0 else 1
)
(
JobExecution.update(created_at=adjusted_created_at)
.where(
execution_primary_key
== _execution_id(current_execution)
)
.execute()
)
else:
(
JobExecution.update(created_at=target_created_at)
.where(
execution_primary_key
== _execution_id(current_execution)
)
.execute()
)
(
JobExecution.update(created_at=current_created_at)
.where(
execution_primary_key == _execution_id(target_execution)
)
.execute()
)
self._trigger_refresh()
return True
def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
with database.connection_context():
with database.atomic():
job = Job.get_or_none(id=job_id)
if job is None:
return False
job.enabled = enabled
job.save()
if not enabled:
(
JobExecution.delete()
.where(
(JobExecution.job == job)
& (
JobExecution.running_status
== JobExecutionStatus.PENDING
)
)
.execute()
)
self.sync_jobs()
self._trigger_refresh()
return True
def poll_workers(self) -> None:
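        """Reap finished workers and finalize their executions.

        Runs every 0.25s: it streams worker stats, escalates overdue graceful
        stops to SIGKILL, records each final status, and then backfills freed
        slots from the pending queue.
        """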
any_finished = False
for execution_id in tuple(self._workers):
worker = self._workers[execution_id]
self._apply_stats(worker)
self._enforce_graceful_stop(worker)
returncode = worker.process.poll()
if returncode is None:
continue
self._apply_stats(worker)
with database.connection_context():
execution = JobExecution.get_by_id(execution_id)
execution.ended_at = utc_now()
execution.running_status = _worker_final_status(
execution=execution,
worker=worker,
returncode=returncode,
)
execution.save()
worker.log_handle.close()
del self._workers[execution_id]
any_finished = True
self._trigger_refresh()
if any_finished:
self._start_queued_jobs()
self._refresh_running_runtime()
def _apply_stats(self, worker: RunningWorker) -> None:
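        """Fold the worker's newest stats snapshot into its JobExecution row.

        The worker appends one JSON object per line to the .jsonl stats file,
        along the lines of {"requests_count": 12, "items_count": 4, ...}
        (illustrative values); only the newest complete line is applied.
        """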
if not worker.artifacts.stats_path.exists():
return
        with worker.artifacts.stats_path.open("rb") as handle:
            handle.seek(worker.stats_offset)
            payload = handle.read()
        # Only consume fully terminated lines; a trailing partial line stays
        # unread so a JSON object the worker is mid-writing is never parsed.
        complete, newline, _partial = payload.rpartition(b"\n")
        if not newline:
            return
        worker.stats_offset += len(complete) + 1
        text = complete.decode("utf-8", errors="replace")
        lines = [line for line in text.splitlines() if line.strip()]
        if not lines:
            return
        stats = json.loads(lines[-1])
with database.connection_context():
execution = JobExecution.get_by_id(worker.execution_id)
execution.requests_count = int(stats.get("requests_count", 0))
execution.items_count = int(stats.get("items_count", 0))
execution.warnings_count = int(stats.get("warnings_count", 0))
execution.errors_count = int(stats.get("errors_count", 0))
execution.bytes_count = int(stats.get("bytes_count", 0))
execution.retries_count = int(stats.get("retries_count", 0))
execution.exceptions_count = int(stats.get("exceptions_count", 0))
execution.cache_size_count = int(stats.get("cache_size_count", 0))
execution.cache_object_count = int(stats.get("cache_object_count", 0))
execution.raw_stats = json.dumps(stats, sort_keys=True)
execution.save()
self._trigger_refresh()
def _enforce_graceful_stop(self, worker: RunningWorker) -> None:
with database.connection_context():
execution = JobExecution.get_by_id(worker.execution_id)
if execution.stop_requested_at is None:
return
elapsed = utc_now() - _coerce_datetime(execution.stop_requested_at)
if (
elapsed >= timedelta(seconds=self.graceful_stop_seconds)
and worker.process.poll() is None
):
worker.process.kill()
def _trigger_refresh(self, event: object = "refresh-event") -> None:
if self.refresh_callback is not None:
self.refresh_callback(event)
def _refresh_running_runtime(self) -> None:
if not self._has_running_executions():
return
current_time = time.monotonic()
if current_time - self._last_runtime_refresh_at < 1.0:
return
self._last_runtime_refresh_at = current_time
self._trigger_refresh()
    def _has_running_executions(self) -> bool:
        with database.connection_context():
            return (
                JobExecution.select()
                .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
                .exists()
            )
def _reconcile_stale_executions(self) -> None:
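        """Recover execution state after an unclean app restart.

        Executions whose worker process is still alive (found by scanning
        /proc) are reattached as DetachedProcess workers; any other row stuck
        in RUNNING is finalized as FAILED, or CANCELED if a stop had already
        been requested.
        """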
live_workers = _find_live_workers()
recovered_execution_ids: set[int] = set()
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
if live_workers:
live_executions = tuple(
JobExecution.select(JobExecution, Job)
.join(Job)
.where(execution_primary_key.in_(tuple(live_workers)))
)
else:
live_executions = ()
for execution in live_executions:
job = cast(Job, execution.job)
execution_id = _execution_id(execution)
live_worker = live_workers.get(execution_id)
if live_worker is None or live_worker.job_id != _job_id(job):
continue
artifacts = JobArtifacts.for_execution(
log_dir=self.log_dir,
job_id=live_worker.job_id,
execution_id=execution_id,
)
artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
if execution.running_status != JobExecutionStatus.RUNNING:
execution.running_status = JobExecutionStatus.RUNNING
execution.ended_at = None
execution.save()
message = f"scheduler: restored execution state from live worker pid {live_worker.pid} after app restart\n"
else:
message = f"scheduler: reattached to worker pid {live_worker.pid} after app restart\n"
log_handle = artifacts.log_path.open("a", encoding="utf-8", buffering=1)
log_handle.write(message)
self._workers[execution_id] = RunningWorker(
execution_id=execution_id,
process=DetachedProcess(pid=live_worker.pid),
log_handle=log_handle,
artifacts=artifacts,
detached=True,
)
recovered_execution_ids.add(execution_id)
stale_executions = tuple(
JobExecution.select(JobExecution, Job)
.join(Job)
.where(JobExecution.running_status == JobExecutionStatus.RUNNING)
)
for execution in stale_executions:
job = cast(Job, execution.job)
execution_id = _execution_id(execution)
if execution_id in recovered_execution_ids:
continue
job_id = _job_id(job)
artifacts = JobArtifacts.for_execution(
log_dir=self.log_dir,
job_id=job_id,
execution_id=execution_id,
)
artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
with artifacts.log_path.open("a", encoding="utf-8") as log_handle:
log_handle.write(
"scheduler: execution marked failed after app restart\n"
)
execution.ended_at = utc_now()
execution.running_status = (
JobExecutionStatus.CANCELED
if execution.stop_requested_at is not None
else JobExecutionStatus.FAILED
)
execution.save()
if stale_executions or recovered_execution_ids:
self._trigger_refresh()
def load_runs_view(
*,
log_dir: str | Path,
now: datetime | None = None,
completed_page: int = 1,
completed_page_size: int = COMPLETED_EXECUTION_PAGE_SIZE,
) -> RunsView:
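    """Assemble the runs page model: running, queued, upcoming, completed.

    A hypothetical call like load_runs_view(log_dir="out/logs",
    completed_page=2) returns the second page of completed executions
    (COMPLETED_EXECUTION_PAGE_SIZE rows) alongside the full live sections.
    """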
reference_time = now or datetime.now(UTC)
resolved_log_dir = Path(log_dir)
sanitized_page_size = max(1, completed_page_size)
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
queued_executions = tuple(
JobExecution.select(JobExecution, Job, Source)
.join(Job)
.join(Source)
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
)
running_executions = tuple(
JobExecution.select(JobExecution, Job, Source)
.join(Job)
.join(Source)
.where(JobExecution.running_status == JobExecutionStatus.RUNNING)
.order_by(JobExecution.started_at.desc())
)
completed_query = (
JobExecution.select(JobExecution, Job, Source)
.join(Job)
.join(Source)
.where(
JobExecution.running_status.in_(
(
JobExecutionStatus.SUCCEEDED,
JobExecutionStatus.FAILED,
JobExecutionStatus.CANCELED,
)
)
)
.order_by(JobExecution.ended_at.desc())
)
completed_total_count = completed_query.count()
completed_total_pages = max(
1, math.ceil(completed_total_count / sanitized_page_size)
)
sanitized_completed_page = min(max(1, completed_page), completed_total_pages)
completed_executions = tuple(
completed_query.paginate(sanitized_completed_page, sanitized_page_size)
)
running_by_job = {
_job_id(cast(Job, execution.job)): execution
for execution in running_executions
}
queued_by_job = {
_job_id(cast(Job, execution.job)): execution
for execution in queued_executions
}
return {
"running": tuple(
_project_running_execution(
execution,
resolved_log_dir,
reference_time,
queued_follow_up=queued_by_job.get(_job_id(cast(Job, execution.job))),
)
for execution in running_executions
),
"queued": tuple(
_project_queued_execution(
execution,
reference_time,
position=position,
total_count=len(queued_executions),
)
for position, execution in enumerate(queued_executions, start=1)
),
"upcoming": tuple(
_project_upcoming_job(
job,
                running_by_job.get(_job_id(job)),
                queued_by_job.get(_job_id(job)),
reference_time,
)
for job in jobs
),
"completed": tuple(
_project_completed_execution(execution, resolved_log_dir, reference_time)
for execution in completed_executions
),
"completed_page": sanitized_completed_page,
"completed_page_size": sanitized_page_size,
"completed_total_count": completed_total_count,
"completed_total_pages": completed_total_pages,
}
def clear_completed_executions(*, log_dir: str | Path) -> int:
resolved_log_dir = Path(log_dir)
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
completed_executions = tuple(
JobExecution.select(JobExecution, Job)
.join(Job)
.where(
JobExecution.running_status.in_(
(
JobExecutionStatus.SUCCEEDED,
JobExecutionStatus.FAILED,
JobExecutionStatus.CANCELED,
)
)
)
)
if not completed_executions:
return 0
for execution in completed_executions:
job = cast(Job, execution.job)
prefix = f"job-{_job_id(job)}-execution-{_execution_id(execution)}"
for artifact_path in resolved_log_dir.glob(f"{prefix}.*"):
artifact_path.unlink(missing_ok=True)
execution_ids = tuple(
_execution_id(execution) for execution in completed_executions
)
return (
JobExecution.delete()
.where(execution_primary_key.in_(execution_ids))
.execute()
)
def load_dashboard_view(
*, log_dir: str | Path, now: datetime | None = None
) -> dict[str, object]:
reference_time = now or datetime.now(UTC)
runs_view = load_runs_view(log_dir=log_dir, now=reference_time)
output_dir = Path(log_dir).parent
with database.connection_context():
sources = tuple(Source.select().order_by(Source.name.asc()))
        failed_last_day = (
            JobExecution.select()
            .where(
                (JobExecution.running_status == JobExecutionStatus.FAILED)
                & (JobExecution.ended_at.is_null(False))
                & (JobExecution.ended_at >= reference_time - timedelta(days=1))
            )
            .count()
        )
upcoming_ready = sum(
1 for job in runs_view["upcoming"] if str(job["run_reason"]) == "Ready"
)
footprint_bytes = _directory_size(output_dir)
return {
"running": runs_view["running"],
"queued": runs_view["queued"],
"source_feeds": tuple(
_project_source_feed(source, output_dir, reference_time)
for source in sources
),
"snapshot": {
"running_now": str(len(runs_view["running"])),
"upcoming_today": str(upcoming_ready),
"failures_24h": str(failed_last_day),
"artifact_footprint": _format_bytes(footprint_bytes),
},
}
def load_execution_log_view(
*, log_dir: str | Path, job_id: int, execution_id: int
) -> ExecutionLogView:
with database.connection_context():
execution = JobExecution.get_or_none(id=execution_id)
route = f"/job/{job_id}/execution/{execution_id}/logs"
if execution is None or _job_id(cast(Job, execution.job)) != job_id:
return ExecutionLogView(
job_id=job_id,
execution_id=execution_id,
title=f"Job {job_id} / execution {execution_id}",
description="Plain text log view routed through the app.",
status_label="Unavailable",
status_tone="failed",
log_text="",
error_message="Execution does not exist.",
)
artifacts = JobArtifacts.for_execution(
log_dir=Path(log_dir),
job_id=job_id,
execution_id=execution_id,
)
if not artifacts.log_path.exists():
return ExecutionLogView(
job_id=job_id,
execution_id=execution_id,
title=f"Job {job_id} / execution {execution_id}",
description="Plain text log view routed through the app.",
status_label=_execution_status_label(execution),
status_tone=_execution_status_tone(execution),
log_text="",
error_message="Log file has not been created yet.",
)
return ExecutionLogView(
job_id=job_id,
execution_id=execution_id,
title=f"Job {job_id} / execution {execution_id}",
description=f"Route: {route}",
status_label=_execution_status_label(execution),
status_tone=_execution_status_tone(execution),
log_text=artifacts.log_path.read_text(encoding="utf-8"),
)
def _job_trigger(job: Job) -> CronTrigger:
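    """Build the job's CronTrigger from its five stored cron fields.

    A job whose fields were, say, ("0", "*/6", "*", "*", "*") becomes
    CronTrigger.from_crontab("0 */6 * * *", timezone=UTC), i.e. every six
    hours on the hour.
    """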
expression = " ".join(
(
str(job.cron_minute),
str(job.cron_hour),
str(job.cron_day_of_month),
str(job.cron_month),
str(job.cron_day_of_week),
)
)
return CronTrigger.from_crontab(expression, timezone=UTC)
def _scheduler_job_id(job_id: int) -> str:
return f"{SCHEDULER_JOB_PREFIX}{job_id}"
def _project_running_execution(
execution: JobExecution,
log_dir: Path,
reference_time: datetime,
*,
queued_follow_up: JobExecution | None = None,
) -> dict[str, object]:
job = cast(Job, execution.job)
job_id = _job_id(job)
execution_id = _execution_id(execution)
artifacts = JobArtifacts.for_execution(
log_dir=log_dir, job_id=job_id, execution_id=execution_id
)
started_at = _coerce_datetime(
cast(datetime | str, execution.started_at or execution.created_at)
)
runtime = reference_time - started_at
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": job_id,
"execution_id": execution_id,
"duration": _format_duration(started_at, reference_time),
"started_at": _humanize_relative_time(reference_time, started_at),
"started_at_iso": started_at.isoformat(),
"runtime": f"running for {int(runtime.total_seconds())}s",
"status": "Stopping" if execution.stop_requested_at else "Running",
"stats": _stats_summary(execution),
"worker": (
"graceful stop requested"
if execution.stop_requested_at
else "streaming stats from worker"
),
"log_href": f"/job/{job_id}/execution/{execution_id}/logs",
"log_exists": artifacts.log_path.exists(),
"cancel_label": "Cancel" if queued_follow_up is not None else "Stop",
"cancel_post_path": (
f"/actions/queued-executions/{_execution_id(queued_follow_up)}/cancel"
if queued_follow_up is not None
else f"/actions/executions/{execution_id}/cancel"
),
}
def _project_queued_execution(
execution: JobExecution,
reference_time: datetime,
*,
position: int,
total_count: int,
) -> dict[str, object]:
job = cast(Job, execution.job)
queued_at = _coerce_datetime(cast(datetime | str, execution.created_at))
execution_id = _execution_id(execution)
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": _job_id(job),
"execution_id": execution_id,
"queued_at": _humanize_relative_time(reference_time, queued_at),
"queued_at_iso": queued_at.isoformat(),
"queue_position": position,
"status": "Queued",
"status_tone": "idle",
"run_label": "Queued",
"run_disabled": True,
"run_post_path": f"/actions/jobs/{_job_id(job)}/run-now",
"cancel_post_path": (f"/actions/queued-executions/{execution_id}/cancel"),
"move_up_disabled": position == 1,
"move_up_post_path": (
None
if position == 1
else f"/actions/queued-executions/{execution_id}/move-up"
),
"move_down_disabled": position == total_count,
"move_down_post_path": (
None
if position == total_count
else f"/actions/queued-executions/{execution_id}/move-down"
),
}
def _project_upcoming_job(
job: Job,
running_execution: JobExecution | None,
queued_execution: JobExecution | None,
reference_time: datetime,
) -> dict[str, object]:
job_id = _job_id(job)
trigger = _job_trigger(job)
next_run = (
trigger.get_next_fire_time(None, reference_time)
if job.enabled and running_execution is None
else None
)
run_disabled = running_execution is not None or queued_execution is not None
run_reason = (
"Already running"
if running_execution is not None
else ("Queued" if queued_execution is not None else "Ready")
)
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": job_id,
"next_run": (
_humanize_relative_time(reference_time, next_run)
if next_run is not None
else ("Running now" if running_execution is not None else "Not scheduled")
),
"next_run_at": next_run.isoformat() if next_run is not None else None,
"schedule": " ".join(
(
str(job.cron_minute),
str(job.cron_hour),
str(job.cron_day_of_month),
str(job.cron_month),
str(job.cron_day_of_week),
)
),
"enabled_label": "Enabled" if job.enabled else "Disabled",
"enabled_tone": "scheduled" if job.enabled else "idle",
"run_disabled": run_disabled,
"run_reason": run_reason,
"toggle_label": "Disable" if job.enabled else "Enable",
"toggle_enabled": not job.enabled,
"run_post_path": f"/actions/jobs/{job_id}/run-now",
"toggle_post_path": f"/actions/jobs/{job_id}/toggle-enabled",
"delete_post_path": f"/actions/jobs/{job_id}/delete",
}
def _project_completed_execution(
execution: JobExecution, log_dir: Path, reference_time: datetime
) -> dict[str, object]:
job = cast(Job, execution.job)
job_id = _job_id(job)
execution_id = _execution_id(execution)
artifacts = JobArtifacts.for_execution(
log_dir=log_dir, job_id=job_id, execution_id=execution_id
)
ended_at = (
_coerce_datetime(cast(datetime | str, execution.ended_at))
if execution.ended_at is not None
else None
)
started_at = (
_coerce_datetime(cast(datetime | str, execution.started_at))
if execution.started_at is not None
else None
)
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": job_id,
"execution_id": execution_id,
"duration": _format_duration(started_at, ended_at),
"ended_at": (
_humanize_relative_time(reference_time, ended_at)
if ended_at is not None
else "Pending"
),
"ended_at_iso": ended_at.isoformat() if ended_at is not None else None,
"status": _execution_status_label(execution),
"status_tone": _execution_status_tone(execution),
"stats": _stats_summary(execution),
"summary": (
"Canceled by operator"
if execution.running_status == JobExecutionStatus.CANCELED
else (
"Worker exited successfully"
if execution.running_status == JobExecutionStatus.SUCCEEDED
else "Worker exited with failure"
)
),
"log_href": f"/job/{job_id}/execution/{execution_id}/logs",
"log_exists": artifacts.log_path.exists(),
}
def _project_source_feed(
source: Source, output_dir: Path, reference_time: datetime
) -> dict[str, object]:
source_slug = str(source.slug)
source_dir = feed_output_dir(out_dir=output_dir, feed_slug=source_slug)
feed_path = feed_output_path(out_dir=output_dir, feed_slug=source_slug)
feed_exists = feed_path.exists()
updated_at = (
datetime.fromtimestamp(feed_path.stat().st_mtime, tz=UTC)
if feed_exists
else None
)
return {
"source": source.name,
"slug": source_slug,
"feed_href": f"/feeds/{source_slug}/feed.rss",
"feed_status_label": "Available" if feed_exists else "Missing",
"feed_status_tone": "done" if feed_exists else "failed",
"feed_exists": feed_exists,
"last_updated": (
_humanize_relative_time(reference_time, updated_at)
if updated_at is not None
else "Never published"
),
"last_updated_iso": updated_at.isoformat() if updated_at is not None else None,
"artifact_footprint": _format_bytes(_directory_size(source_dir)),
}
def _execution_status_label(execution: JobExecution) -> str:
status = JobExecutionStatus(execution.running_status)
return {
JobExecutionStatus.PENDING: "Pending",
JobExecutionStatus.RUNNING: (
"Stopping" if execution.stop_requested_at else "Running"
),
JobExecutionStatus.SUCCEEDED: "Succeeded",
JobExecutionStatus.FAILED: "Failed",
JobExecutionStatus.CANCELED: "Canceled",
}[status]
def _execution_status_tone(execution: JobExecution) -> str:
status = JobExecutionStatus(execution.running_status)
return {
JobExecutionStatus.PENDING: "idle",
JobExecutionStatus.RUNNING: "running",
JobExecutionStatus.SUCCEEDED: "done",
JobExecutionStatus.FAILED: "failed",
JobExecutionStatus.CANCELED: "idle",
}[status]
def _stats_summary(execution: JobExecution) -> str:
bytes_count = cast(int, execution.bytes_count)
    return (
        f"{execution.requests_count} requests, "
        f"{execution.items_count} items, "
        f"{_format_summary_bytes(bytes_count)}"
    )
def _final_status(*, execution: JobExecution, returncode: int) -> JobExecutionStatus:
if execution.stop_requested_at is not None:
return JobExecutionStatus.CANCELED
if returncode == 0:
return JobExecutionStatus.SUCCEEDED
return JobExecutionStatus.FAILED
def _worker_final_status(
*,
execution: JobExecution,
worker: RunningWorker,
returncode: int,
) -> JobExecutionStatus:
if worker.detached:
return _detached_final_status(execution=execution, artifacts=worker.artifacts)
return _final_status(execution=execution, returncode=returncode)
def _detached_final_status(
*, execution: JobExecution, artifacts: JobArtifacts
) -> JobExecutionStatus:
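    """Infer a detached worker's outcome without an exit code.

    Assumes the repub.job_runner worker logs a line containing "completed
    successfully" on success; anything else, absent a stop request, counts
    as FAILED.
    """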
if execution.stop_requested_at is not None:
return JobExecutionStatus.CANCELED
log_tail = _read_log_tail(artifacts.log_path)
if "completed successfully" in log_tail:
return JobExecutionStatus.SUCCEEDED
return JobExecutionStatus.FAILED
def _coerce_datetime(value: datetime | str) -> datetime:
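    """Normalize stored values to timezone-aware UTC datetimes.

    Naive inputs, e.g. "2024-01-01T12:00:00" or the equivalent datetime, are
    assumed to already be in UTC; aware inputs are converted.
    """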
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
parsed = datetime.fromisoformat(value)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)
def _job_id(job: Job) -> int:
return int(job.get_id())
def _execution_id(execution: JobExecution) -> int:
return int(execution.get_id())
def _directory_size(path: Path) -> int:
if not path.exists():
return 0
return sum(entry.stat().st_size for entry in path.rglob("*") if entry.is_file())
def _format_bytes(value: int) -> str:
    # Thresholds are 1024-based, so label them with binary units to match
    # _format_summary_bytes below.
    if value < 1024:
        return f"{value} B"
    if value < 1024 * 1024:
        return f"{value / 1024:.1f} KiB"
    if value < 1024 * 1024 * 1024:
        return f"{value / (1024 * 1024):.1f} MiB"
    return f"{value / (1024 * 1024 * 1024):.1f} GiB"
def _format_summary_bytes(value: int) -> str:
if value == 1:
return "1 byte"
if value < 1024:
return f"{value} bytes"
if value < 1024 * 1024:
return f"{value / 1024:.1f} KiB"
if value < 1024 * 1024 * 1024:
return f"{value / (1024 * 1024):.1f} MiB"
return f"{value / (1024 * 1024 * 1024):.1f} GiB"
def _humanize_relative_time(reference_time: datetime, target_time: datetime) -> str:
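    """Render the offset between two datetimes as coarse human text.

    With a reference of 12:00:00, a target of 12:01:30 yields "in 2 minutes"
    (rounded) and 11:59:15 yields "45 seconds ago".
    """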
delta_seconds = int(round((target_time - reference_time).total_seconds()))
if delta_seconds == 0:
return "now"
absolute_delta_seconds = abs(delta_seconds)
units = (
("day", 24 * 60 * 60),
("hour", 60 * 60),
("minute", 60),
)
for label, size in units:
if absolute_delta_seconds >= size:
count = max(1, round(absolute_delta_seconds / size))
suffix = "" if count == 1 else "s"
if delta_seconds > 0:
return f"in {count} {label}{suffix}"
return f"{count} {label}{suffix} ago"
if delta_seconds > 0:
return f"in {absolute_delta_seconds} seconds"
return f"{absolute_delta_seconds} seconds ago"
def _format_duration(
started_at: datetime | None, ended_at: datetime | None
) -> str | None:
if started_at is None or ended_at is None:
return None
total_seconds = max(0, int((ended_at - started_at).total_seconds()))
hours, remainder = divmod(total_seconds, 60 * 60)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def _find_live_workers() -> dict[int, LiveWorker]:
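    """Scan /proc for surviving repub.job_runner workers, keyed by execution id.

    This is Linux-specific; where /proc is absent it returns {} and restart
    recovery simply marks stale RUNNING rows as failed.
    """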
proc_dir = Path("/proc")
if not proc_dir.exists():
return {}
live_workers: dict[int, LiveWorker] = {}
for cmdline_path in proc_dir.glob("[0-9]*/cmdline"):
try:
argv = [
part
for part in cmdline_path.read_bytes()
.decode("utf-8", errors="ignore")
.split("\x00")
if part != ""
]
except OSError:
continue
live_worker = _parse_live_worker(argv, pid=int(cmdline_path.parent.name))
if live_worker is None or not _pid_is_running(live_worker.pid):
continue
live_workers[live_worker.execution_id] = live_worker
return live_workers
def _parse_live_worker(argv: list[str], *, pid: int) -> LiveWorker | None:
if "repub.job_runner" not in argv:
return None
job_id = _argv_flag_value(argv, "--job-id")
execution_id = _argv_flag_value(argv, "--execution-id")
if job_id is None or execution_id is None:
return None
return LiveWorker(job_id=int(job_id), execution_id=int(execution_id), pid=pid)
def _argv_flag_value(argv: list[str], flag: str) -> str | None:
try:
index = argv.index(flag)
except ValueError:
return None
value_index = index + 1
if value_index >= len(argv):
return None
return argv[value_index]
def _pid_is_running(pid: int) -> bool:
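    """Check process liveness with signal 0, treating Linux zombies as dead.

    A zombie still accepts signal 0, so the state field in /proc/<pid>/stat
    (the token after the ") " that closes the comm field) is checked to rule
    out "Z" processes.
    """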
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
stat_path = Path(f"/proc/{pid}/stat")
if stat_path.exists():
try:
stat_text = stat_path.read_text(encoding="utf-8")
except OSError:
return True
state_parts = stat_text.split(") ", 1)
if len(state_parts) == 2 and state_parts[1].startswith("Z"):
return False
return True
def _send_signal(pid: int, signum: int) -> None:
try:
os.kill(pid, signum)
except ProcessLookupError:
return
def _read_log_tail(path: Path, *, max_bytes: int = 8192) -> str:
if not path.exists():
return ""
with path.open("rb") as handle:
handle.seek(0, os.SEEK_END)
size = handle.tell()
handle.seek(max(0, size - max_bytes))
return handle.read().decode("utf-8", errors="replace")