1314 lines
45 KiB
Python
1314 lines
45 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import math
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Callable, TextIO, TypedDict, cast
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from peewee import IntegrityError
|
|
|
|
from repub.config import feed_output_dir, feed_output_path
|
|
from repub.model import (
|
|
Job,
|
|
JobExecution,
|
|
JobExecutionStatus,
|
|
Source,
|
|
database,
|
|
load_max_concurrent_jobs,
|
|
utc_now,
|
|
)
|
|
|
|
# Prefix for APScheduler entry ids that mirror database Job rows; used to
# distinguish per-job cron entries from the runtime's housekeeping jobs.
SCHEDULER_JOB_PREFIX = "job-"

# Id of the recurring scheduler task that polls running worker processes.
POLL_JOB_ID = "runtime-poll-workers"

# Id of the recurring scheduler task that syncs DB jobs into the scheduler.
SYNC_JOB_ID = "runtime-sync-jobs"

# Default page size for the completed-executions listing.
COMPLETED_EXECUTION_PAGE_SIZE = 20
|
|
|
|
|
|
@dataclass(frozen=True)
class JobArtifacts:
    """Filesystem locations of one execution's on-disk artifacts.

    log_path receives the worker's combined stdout/stderr; stats_path is the
    JSONL file the worker writes runtime statistics to.
    """

    log_path: Path
    stats_path: Path

    @classmethod
    def for_execution(
        cls, *, log_dir: Path, job_id: int, execution_id: int
    ) -> "JobArtifacts":
        """Derive the canonical artifact paths for a (job, execution) pair."""
        stem = f"job-{job_id}-execution-{execution_id}"
        return cls(
            log_path=log_dir / (stem + ".log"),
            stats_path=log_dir / (stem + ".jsonl"),
        )
|
|
|
|
|
|
@dataclass
class RunningWorker:
    """Mutable record for one live worker process tracked by the runtime."""

    execution_id: int
    # Either a Popen we spawned, or a DetachedProcess adopted after an app restart.
    process: subprocess.Popen[str] | DetachedProcess
    # Open append handle for the execution's log file (receives worker output).
    log_handle: TextIO
    artifacts: JobArtifacts
    # Byte offset already consumed from the stats JSONL file (incremental reads).
    stats_offset: int = 0
    # True when the process was reattached rather than spawned by this runtime.
    detached: bool = False
|
|
|
|
|
|
@dataclass(frozen=True)
class DetachedProcess:
    """Popen-compatible shim around a worker this process did not spawn.

    Only the pid is known (the worker was adopted across an app restart), so
    the real exit status is unavailable: poll()/wait() report 0 once the pid
    disappears.
    """

    pid: int

    def poll(self) -> int | None:
        """Return None while the pid is alive, else a stand-in exit code of 0."""
        if _pid_is_running(self.pid):
            return None
        return 0

    def terminate(self) -> None:
        """Request a graceful stop via SIGTERM."""
        _send_signal(self.pid, signal.SIGTERM)

    def kill(self) -> None:
        """Force-stop via SIGKILL."""
        _send_signal(self.pid, signal.SIGKILL)

    def wait(self, timeout: float | None = None) -> int:
        """Busy-wait (50 ms steps) until the pid exits; 0 on success.

        Raises subprocess.TimeoutExpired when ``timeout`` elapses first,
        mirroring Popen.wait().
        """
        expires_at = time.monotonic() + timeout if timeout is not None else None
        while self.poll() is None:
            if expires_at is not None and time.monotonic() >= expires_at:
                raise subprocess.TimeoutExpired(
                    cmd=f"pid {self.pid}",
                    timeout=timeout if timeout is not None else 0.0,
                )
            time.sleep(0.05)
        return 0
|
|
|
|
|
|
@dataclass(frozen=True)
class LiveWorker:
    """A worker process found to still be alive across an app restart."""

    job_id: int
    execution_id: int
    pid: int
|
|
|
|
|
|
@dataclass(frozen=True)
class ExecutionLogView:
    """View model for the plain-text execution log page."""

    job_id: int
    execution_id: int
    title: str
    description: str
    status_label: str
    # Tone keyword consumed by the UI for badge styling (e.g. "failed").
    status_tone: str
    # Full log contents; empty when the log is unavailable.
    log_text: str
    # Human-readable reason when the log cannot be shown; None on success.
    error_message: str | None = None
|
|
|
|
|
|
class RunsView(TypedDict):
    """Shape of the payload returned by load_runs_view for the runs page."""

    running: tuple[dict[str, object], ...]
    queued: tuple[dict[str, object], ...]
    upcoming: tuple[dict[str, object], ...]
    completed: tuple[dict[str, object], ...]
    # Pagination state for the completed list (1-based page, clamped).
    completed_page: int
    completed_page_size: int
    completed_total_count: int
    completed_total_pages: int
|
|
|
|
|
|
class JobRuntime:
    """Owns job scheduling and worker subprocess lifecycle.

    Responsibilities:
      * mirror enabled ``Job`` rows into APScheduler cron entries,
      * queue executions and start worker subprocesses up to the configured
        concurrency limit,
      * poll workers for stats and exit, enforcing graceful stops,
      * reconcile executions left RUNNING by a previous app process.
    """

    def __init__(
        self,
        *,
        log_dir: str | Path,
        refresh_callback: Callable[[object], None] | None = None,
        graceful_stop_seconds: float = 15.0,
    ) -> None:
        # Directory receiving per-execution .log / .jsonl artifacts.
        self.log_dir = Path(log_dir)
        # Optional UI hook invoked whenever visible state changes.
        self.refresh_callback = refresh_callback
        # Grace period between a stop request (SIGTERM) and escalation (SIGKILL).
        self.graceful_stop_seconds = graceful_stop_seconds
        self.scheduler = BackgroundScheduler(timezone=UTC)
        # execution_id -> RunningWorker for processes this runtime tracks.
        self._workers: dict[int, RunningWorker] = {}
        # Serializes enqueue/claim/reorder operations on the pending queue.
        self._run_lock = threading.Lock()
        self._started = False
        # Monotonic timestamp of the last periodic "still running" UI refresh.
        self._last_runtime_refresh_at = 0.0

    def start(self) -> None:
        """Start the scheduler and housekeeping jobs; idempotent."""
        if self._started:
            return

        # Re-adopt live workers / fail stale executions from a prior process.
        self._reconcile_stale_executions()
        self.scheduler.start()
        self.scheduler.add_job(
            self.poll_workers,
            "interval",
            id=POLL_JOB_ID,
            seconds=0.25,
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        self.scheduler.add_job(
            self.sync_jobs,
            "interval",
            id=SYNC_JOB_ID,
            seconds=1,
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        self.sync_jobs()
        self._started = True
        self._start_queued_jobs()

    def shutdown(self) -> None:
        """Hard-kill any tracked workers and stop the scheduler."""
        for execution_id in tuple(self._workers):
            worker = self._workers.pop(execution_id)
            if worker.process.poll() is None:
                # Shutdown does not wait for a graceful stop.
                worker.process.kill()
                worker.process.wait(timeout=2)
            worker.log_handle.close()

        if self._started:
            self.scheduler.shutdown(wait=False)
            self._started = False

    def sync_jobs(self) -> None:
        """Mirror enabled Job rows into APScheduler cron entries.

        Adds/replaces one entry per enabled job and removes entries whose job
        was disabled or deleted.
        """
        with database.connection_context():
            jobs = tuple(Job.select().where(Job.enabled == True))  # noqa: E712

        desired_ids = set()
        for job in jobs:
            scheduler_job_id = _scheduler_job_id(_job_id(job))
            desired_ids.add(scheduler_job_id)
            self.scheduler.add_job(
                self.run_scheduled_job,
                trigger=_job_trigger(job),
                args=(_job_id(job),),
                id=scheduler_job_id,
                replace_existing=True,
                max_instances=1,
                coalesce=True,
                misfire_grace_time=1,
            )

        # Drop scheduler entries for jobs that are no longer enabled.
        for scheduled_job in tuple(self.scheduler.get_jobs()):
            if (
                scheduled_job.id.startswith(SCHEDULER_JOB_PREFIX)
                and scheduled_job.id not in desired_ids
            ):
                self.scheduler.remove_job(scheduled_job.id)

    def run_scheduled_job(self, job_id: int) -> None:
        """Scheduler callback: enqueue a run for a cron-triggered job."""
        self.enqueue_job_run(job_id, reason="scheduled")

    def run_job_now(self, job_id: int, *, reason: str) -> int | None:
        """Manually enqueue a run; returns the execution id, or None."""
        return self.enqueue_job_run(job_id, reason=reason)

    def enqueue_job_run(self, job_id: int, *, reason: str) -> int | None:
        """Create (or reuse) a PENDING execution for the job and try to start it.

        Returns the pending execution's id, or None when the job is unknown.
        """
        # ``reason`` exists for call-site readability; it is not recorded.
        del reason
        self.start()
        with self._run_lock:
            execution_id = self._enqueue_job_run_locked(job_id)
            self._start_queued_jobs_locked()

        if execution_id is not None:
            self._trigger_refresh()
        return execution_id

    def _enqueue_job_run_locked(self, job_id: int) -> int | None:
        """Insert a PENDING execution, deduplicating per job.

        Caller must hold ``self._run_lock``. At most one PENDING execution per
        job is kept; the IntegrityError branch handles a concurrent insert
        race (presumably backed by a DB unique constraint — see model).
        """
        with database.connection_context():
            with database.atomic():
                job = Job.get_or_none(id=job_id)
                if job is None:
                    return None

                pending_execution = JobExecution.get_or_none(
                    (JobExecution.job == job)
                    & (JobExecution.running_status == JobExecutionStatus.PENDING)
                )
                if pending_execution is not None:
                    # Reuse the already-queued run instead of double-queueing.
                    return _execution_id(pending_execution)

                try:
                    execution = JobExecution.create(
                        job=job,
                        running_status=JobExecutionStatus.PENDING,
                    )
                except IntegrityError:
                    # Lost a race with a concurrent enqueue; return the winner.
                    pending_execution = JobExecution.get_or_none(
                        (JobExecution.job == job)
                        & (JobExecution.running_status == JobExecutionStatus.PENDING)
                    )
                    return (
                        _execution_id(pending_execution)
                        if pending_execution is not None
                        else None
                    )
                return _execution_id(execution)

    def _start_queued_jobs(self) -> None:
        """Lock-acquiring wrapper around _start_queued_jobs_locked."""
        with self._run_lock:
            self._start_queued_jobs_locked()

    def _start_queued_jobs_locked(self) -> None:
        """Start pending executions until the concurrency cap or empty queue."""
        while True:
            if self._max_concurrent_jobs_reached():
                return

            claimed_execution = self._claim_next_pending_execution()
            if claimed_execution is None:
                return

            job = cast(Job, claimed_execution.job)
            self._start_worker_for_execution(
                job_id=_job_id(job),
                execution_id=_execution_id(claimed_execution),
            )

    def _claim_next_pending_execution(self) -> JobExecution | None:
        """Atomically flip the oldest eligible PENDING execution to RUNNING.

        Skips jobs that already have a RUNNING execution (one run per job at
        a time). The conditional UPDATE (status still PENDING) guards against
        concurrent claimers. Returns the claimed row, or None.
        """
        with database.connection_context():
            execution_primary_key = getattr(JobExecution, "_meta").primary_key
            pending_executions = tuple(
                JobExecution.select(JobExecution, Job)
                .join(Job)
                .where(JobExecution.running_status == JobExecutionStatus.PENDING)
                .order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
            )

            for execution in pending_executions:
                job = cast(Job, execution.job)
                already_running = (
                    JobExecution.select()
                    .where(
                        (JobExecution.job == job)
                        & (JobExecution.running_status == JobExecutionStatus.RUNNING)
                    )
                    .exists()
                )
                if already_running:
                    continue

                started_at = utc_now()
                claimed = (
                    JobExecution.update(
                        started_at=started_at,
                        running_status=JobExecutionStatus.RUNNING,
                    )
                    .where(
                        (execution_primary_key == _execution_id(execution))
                        & (JobExecution.running_status == JobExecutionStatus.PENDING)
                    )
                    .execute()
                )
                if claimed:
                    return JobExecution.get_by_id(_execution_id(execution))
            return None

    def _start_worker_for_execution(self, *, job_id: int, execution_id: int) -> None:
        """Spawn the worker subprocess and register it in ``self._workers``.

        Worker stdout/stderr stream into the execution's log file; the worker
        writes runtime stats to the .jsonl stats path.
        """
        artifacts = JobArtifacts.for_execution(
            log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
        )
        artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
        # Line-buffered append so log lines show up promptly in the UI.
        log_handle = artifacts.log_path.open("a", encoding="utf-8", buffering=1)
        log_handle.write(
            f"scheduler: starting execution {execution_id} for job {job_id}\n"
        )
        process = subprocess.Popen(
            [
                sys.executable,
                "-u",
                "-m",
                "repub.job_runner",
                "--job-id",
                str(job_id),
                "--execution-id",
                str(execution_id),
                "--db-path",
                str(database.database),
                "--out-dir",
                str(self.log_dir.parent),
                "--stats-path",
                str(artifacts.stats_path),
            ],
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            text=True,
        )
        self._workers[execution_id] = RunningWorker(
            execution_id=execution_id,
            process=process,
            log_handle=log_handle,
            artifacts=artifacts,
        )

    def _max_concurrent_jobs_reached(self) -> bool:
        """True when the number of RUNNING executions is at/over the limit."""
        # NOTE(review): runs without an explicit connection_context, unlike
        # the other queries in this class — presumably relies on peewee's
        # implicit connection handling; confirm.
        return (
            JobExecution.select()
            .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            .count()
            >= load_max_concurrent_jobs()
        )

    def request_execution_cancel(self, execution_id: int) -> bool:
        """Request a graceful stop (SIGTERM) of a RUNNING execution.

        Records ``stop_requested_at`` once; ``_enforce_graceful_stop``
        escalates to SIGKILL after ``graceful_stop_seconds``. Returns False
        when the execution is missing or not running.
        """
        with database.connection_context():
            execution = JobExecution.get_or_none(id=execution_id)
            if execution is None:
                return False
            if execution.running_status != JobExecutionStatus.RUNNING:
                return False
            if execution.stop_requested_at is None:
                execution.stop_requested_at = utc_now()
                execution.save()

        worker = self._workers.get(execution_id)
        if worker is not None and worker.process.poll() is None:
            worker.log_handle.write(
                f"scheduler: graceful stop requested for execution {execution_id}\n"
            )
            worker.process.terminate()

        # NOTE(review): emits "queue-reordered" for a cancel request — looks
        # copy-pasted from the queue-move path; confirm the UI treats refresh
        # events generically.
        self._trigger_refresh("queue-reordered")
        return True

    def cancel_queued_execution(self, execution_id: int) -> bool:
        """Delete a still-PENDING execution from the queue; False if not queued."""
        with self._run_lock:
            with database.connection_context():
                execution_primary_key = getattr(JobExecution, "_meta").primary_key
                # The PENDING condition makes this a no-op for already-claimed rows.
                deleted = (
                    JobExecution.delete()
                    .where(
                        (execution_primary_key == execution_id)
                        & (JobExecution.running_status == JobExecutionStatus.PENDING)
                    )
                    .execute()
                )

        if not deleted:
            return False

        self._trigger_refresh()
        return True

    def move_queued_execution(self, execution_id: int, *, direction: str) -> bool:
        """Move a PENDING execution one slot up or down in the queue.

        Queue order is (created_at, pk). Moving swaps the two rows'
        created_at values, or nudges the moved row by one microsecond when
        both share a timestamp. Returns False when the execution is not
        queued or already at the requested edge.
        """
        offset = -1 if direction == "up" else 1
        with self._run_lock:
            with database.connection_context():
                execution_primary_key = getattr(JobExecution, "_meta").primary_key
                queued_executions = tuple(
                    JobExecution.select()
                    .where(JobExecution.running_status == JobExecutionStatus.PENDING)
                    .order_by(
                        JobExecution.created_at.asc(), execution_primary_key.asc()
                    )
                )
                current_index = next(
                    (
                        index
                        for index, execution in enumerate(queued_executions)
                        if _execution_id(execution) == execution_id
                    ),
                    None,
                )
                if current_index is None:
                    return False

                target_index = current_index + offset
                if target_index < 0 or target_index >= len(queued_executions):
                    return False

                current_execution = queued_executions[current_index]
                target_execution = queued_executions[target_index]
                current_created_at = _coerce_datetime(
                    cast(datetime | str, current_execution.created_at)
                )
                target_created_at = _coerce_datetime(
                    cast(datetime | str, target_execution.created_at)
                )

                with database.atomic():
                    if current_created_at == target_created_at:
                        # Equal timestamps: a swap would be a no-op, so nudge
                        # the moved row past its neighbour by 1 microsecond.
                        adjusted_created_at = target_created_at + timedelta(
                            microseconds=-1 if offset < 0 else 1
                        )
                        (
                            JobExecution.update(created_at=adjusted_created_at)
                            .where(
                                execution_primary_key
                                == _execution_id(current_execution)
                            )
                            .execute()
                        )
                    else:
                        # Swap the two rows' created_at values.
                        (
                            JobExecution.update(created_at=target_created_at)
                            .where(
                                execution_primary_key
                                == _execution_id(current_execution)
                            )
                            .execute()
                        )
                        (
                            JobExecution.update(created_at=current_created_at)
                            .where(
                                execution_primary_key == _execution_id(target_execution)
                            )
                            .execute()
                        )

        self._trigger_refresh()
        return True

    def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
        """Enable/disable a job; disabling also drops its queued execution."""
        with database.connection_context():
            with database.atomic():
                job = Job.get_or_none(id=job_id)
                if job is None:
                    return False
                job.enabled = enabled
                job.save()
                if not enabled:
                    (
                        JobExecution.delete()
                        .where(
                            (JobExecution.job == job)
                            & (
                                JobExecution.running_status
                                == JobExecutionStatus.PENDING
                            )
                        )
                        .execute()
                    )
        self.sync_jobs()
        self._trigger_refresh()
        return True

    def poll_workers(self) -> None:
        """Periodic tick: ingest stats, enforce stops, and reap exited workers."""
        any_finished = False
        for execution_id in tuple(self._workers):
            worker = self._workers[execution_id]
            self._apply_stats(worker)
            self._enforce_graceful_stop(worker)
            returncode = worker.process.poll()
            if returncode is None:
                continue

            # Final stats read so late-written lines are not lost.
            self._apply_stats(worker)
            with database.connection_context():
                execution = JobExecution.get_by_id(execution_id)
                execution.ended_at = utc_now()
                execution.running_status = _worker_final_status(
                    execution=execution,
                    worker=worker,
                    returncode=returncode,
                )
                execution.save()

            worker.log_handle.close()
            del self._workers[execution_id]
            any_finished = True
            self._trigger_refresh()

        if any_finished:
            # Capacity was freed: pull the next pending execution(s).
            self._start_queued_jobs()

        self._refresh_running_runtime()

    def _apply_stats(self, worker: RunningWorker) -> None:
        """Ingest new stats JSONL lines from the worker into the DB row.

        Reads incrementally from ``stats_offset`` and applies only the most
        recent complete JSON line (each line appears to be a full snapshot).
        """
        if not worker.artifacts.stats_path.exists():
            return

        with worker.artifacts.stats_path.open("r", encoding="utf-8") as handle:
            handle.seek(worker.stats_offset)
            payload = handle.read()
            worker.stats_offset = handle.tell()

        lines = [line for line in payload.splitlines() if line.strip()]
        if not lines:
            return

        stats = json.loads(lines[-1])
        with database.connection_context():
            execution = JobExecution.get_by_id(worker.execution_id)
            execution.requests_count = int(stats.get("requests_count", 0))
            execution.items_count = int(stats.get("items_count", 0))
            execution.warnings_count = int(stats.get("warnings_count", 0))
            execution.errors_count = int(stats.get("errors_count", 0))
            execution.bytes_count = int(stats.get("bytes_count", 0))
            execution.retries_count = int(stats.get("retries_count", 0))
            execution.exceptions_count = int(stats.get("exceptions_count", 0))
            execution.cache_size_count = int(stats.get("cache_size_count", 0))
            execution.cache_object_count = int(stats.get("cache_object_count", 0))
            execution.raw_stats = json.dumps(stats, sort_keys=True)
            execution.save()

        self._trigger_refresh()

    def _enforce_graceful_stop(self, worker: RunningWorker) -> None:
        """Escalate to SIGKILL when a stop request outlives the grace period."""
        with database.connection_context():
            execution = JobExecution.get_by_id(worker.execution_id)
            if execution.stop_requested_at is None:
                return
            elapsed = utc_now() - _coerce_datetime(execution.stop_requested_at)

            if (
                elapsed >= timedelta(seconds=self.graceful_stop_seconds)
                and worker.process.poll() is None
            ):
                worker.process.kill()

    def _trigger_refresh(self, event: object = "refresh-event") -> None:
        """Fan out to the UI layer, if a refresh callback was registered."""
        if self.refresh_callback is not None:
            self.refresh_callback(event)

    def _refresh_running_runtime(self) -> None:
        """Emit at most one refresh per second while executions are running,
        so runtime counters tick in the UI without flooding it."""
        if not self._has_running_executions():
            return

        current_time = time.monotonic()
        if current_time - self._last_runtime_refresh_at < 1.0:
            return

        self._last_runtime_refresh_at = current_time
        self._trigger_refresh()

    def _has_running_executions(self) -> bool:
        """True when any execution row is in the RUNNING state."""
        # NOTE(review): no explicit connection_context here either — confirm
        # peewee's implicit connection handling covers this call site.
        return (
            JobExecution.select()
            .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            .exists()
        )

    def _reconcile_stale_executions(self) -> None:
        """Startup pass: re-adopt live workers, fail truly stale executions.

        Executions whose worker pid is still alive are reattached as
        DetachedProcess workers; any other execution left RUNNING is marked
        FAILED (or CANCELED when a stop had already been requested).
        """
        live_workers = _find_live_workers()
        recovered_execution_ids: set[int] = set()
        with database.connection_context():
            execution_primary_key = getattr(JobExecution, "_meta").primary_key
            if live_workers:
                live_executions = tuple(
                    JobExecution.select(JobExecution, Job)
                    .join(Job)
                    .where(execution_primary_key.in_(tuple(live_workers)))
                )
            else:
                live_executions = ()

            for execution in live_executions:
                job = cast(Job, execution.job)
                execution_id = _execution_id(execution)
                live_worker = live_workers.get(execution_id)
                if live_worker is None or live_worker.job_id != _job_id(job):
                    # The pid claims a different job than the DB says: don't adopt.
                    continue

                artifacts = JobArtifacts.for_execution(
                    log_dir=self.log_dir,
                    job_id=live_worker.job_id,
                    execution_id=execution_id,
                )
                artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
                if execution.running_status != JobExecutionStatus.RUNNING:
                    # Row drifted but the worker is alive: trust the live pid.
                    execution.running_status = JobExecutionStatus.RUNNING
                    execution.ended_at = None
                    execution.save()
                    message = f"scheduler: restored execution state from live worker pid {live_worker.pid} after app restart\n"
                else:
                    message = f"scheduler: reattached to worker pid {live_worker.pid} after app restart\n"

                log_handle = artifacts.log_path.open("a", encoding="utf-8", buffering=1)
                log_handle.write(message)
                self._workers[execution_id] = RunningWorker(
                    execution_id=execution_id,
                    process=DetachedProcess(pid=live_worker.pid),
                    log_handle=log_handle,
                    artifacts=artifacts,
                    detached=True,
                )
                recovered_execution_ids.add(execution_id)

            stale_executions = tuple(
                JobExecution.select(JobExecution, Job)
                .join(Job)
                .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            )

            for execution in stale_executions:
                job = cast(Job, execution.job)
                execution_id = _execution_id(execution)
                if execution_id in recovered_execution_ids:
                    continue
                job_id = _job_id(job)
                artifacts = JobArtifacts.for_execution(
                    log_dir=self.log_dir,
                    job_id=job_id,
                    execution_id=execution_id,
                )
                artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
                with artifacts.log_path.open("a", encoding="utf-8") as log_handle:
                    log_handle.write(
                        "scheduler: execution marked failed after app restart\n"
                    )

                execution.ended_at = utc_now()
                execution.running_status = (
                    JobExecutionStatus.CANCELED
                    if execution.stop_requested_at is not None
                    else JobExecutionStatus.FAILED
                )
                execution.save()

        if stale_executions or recovered_execution_ids:
            self._trigger_refresh()
|
|
|
|
|
|
def load_runs_view(
    *,
    log_dir: str | Path,
    now: datetime | None = None,
    completed_page: int = 1,
    completed_page_size: int = COMPLETED_EXECUTION_PAGE_SIZE,
) -> RunsView:
    """Assemble the runs-page model: running, queued, upcoming, completed.

    ``completed_page``/``completed_page_size`` paginate the completed list;
    both are clamped into valid ranges rather than rejected.
    """
    reference_time = now or datetime.now(UTC)
    resolved_log_dir = Path(log_dir)
    # Guard against a non-positive page size from the caller.
    sanitized_page_size = max(1, completed_page_size)
    with database.connection_context():
        execution_primary_key = getattr(JobExecution, "_meta").primary_key
        jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
        queued_executions = tuple(
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(JobExecution.running_status == JobExecutionStatus.PENDING)
            .order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
        )
        running_executions = tuple(
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            .order_by(JobExecution.started_at.desc())
        )
        completed_query = (
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(
                JobExecution.running_status.in_(
                    (
                        JobExecutionStatus.SUCCEEDED,
                        JobExecutionStatus.FAILED,
                        JobExecutionStatus.CANCELED,
                    )
                )
            )
            .order_by(JobExecution.ended_at.desc())
        )
        completed_total_count = completed_query.count()
        completed_total_pages = max(
            1, math.ceil(completed_total_count / sanitized_page_size)
        )
        # Clamp the requested page into [1, total_pages].
        sanitized_completed_page = min(max(1, completed_page), completed_total_pages)
        completed_executions = tuple(
            completed_query.paginate(sanitized_completed_page, sanitized_page_size)
        )

    # Per-job indexes used to cross-reference running/queued state below.
    running_by_job = {
        _job_id(cast(Job, execution.job)): execution
        for execution in running_executions
    }
    queued_by_job = {
        _job_id(cast(Job, execution.job)): execution
        for execution in queued_executions
    }
    return {
        "running": tuple(
            _project_running_execution(
                execution,
                resolved_log_dir,
                reference_time,
                queued_follow_up=queued_by_job.get(_job_id(cast(Job, execution.job))),
            )
            for execution in running_executions
        ),
        "queued": tuple(
            _project_queued_execution(
                execution,
                reference_time,
                position=position,
                total_count=len(queued_executions),
            )
            for position, execution in enumerate(queued_executions, start=1)
        ),
        "upcoming": tuple(
            _project_upcoming_job(
                job,
                running_by_job.get(job.id),
                queued_by_job.get(job.id),
                reference_time,
            )
            for job in jobs
        ),
        "completed": tuple(
            _project_completed_execution(execution, resolved_log_dir, reference_time)
            for execution in completed_executions
        ),
        "completed_page": sanitized_completed_page,
        "completed_page_size": sanitized_page_size,
        "completed_total_count": completed_total_count,
        "completed_total_pages": completed_total_pages,
    }
|
|
|
|
|
|
def clear_completed_executions(*, log_dir: str | Path) -> int:
    """Delete all completed execution rows and their on-disk artifacts.

    Returns the number of deleted ``JobExecution`` rows (0 when there were
    none).
    """
    resolved_log_dir = Path(log_dir)
    with database.connection_context():
        execution_primary_key = getattr(JobExecution, "_meta").primary_key
        completed_executions = tuple(
            JobExecution.select(JobExecution, Job)
            .join(Job)
            .where(
                JobExecution.running_status.in_(
                    (
                        JobExecutionStatus.SUCCEEDED,
                        JobExecutionStatus.FAILED,
                        JobExecutionStatus.CANCELED,
                    )
                )
            )
        )
        if not completed_executions:
            return 0

        # Remove the log/stats files before deleting the rows that name them.
        for execution in completed_executions:
            job = cast(Job, execution.job)
            prefix = f"job-{_job_id(job)}-execution-{_execution_id(execution)}"
            for artifact_path in resolved_log_dir.glob(f"{prefix}.*"):
                artifact_path.unlink(missing_ok=True)

        execution_ids = tuple(
            _execution_id(execution) for execution in completed_executions
        )
        return (
            JobExecution.delete()
            .where(execution_primary_key.in_(execution_ids))
            .execute()
        )
|
|
|
|
|
|
def load_dashboard_view(
    *, log_dir: str | Path, now: datetime | None = None
) -> dict[str, object]:
    """Assemble the dashboard model: running executions, per-source feeds,
    and the summary snapshot counters.

    ``now`` overrides the reference time (useful in tests); defaults to the
    current UTC time.
    """
    reference_time = now or datetime.now(UTC)
    runs_view = load_runs_view(log_dir=log_dir, now=reference_time)
    # Feeds and other artifacts live in the parent of the log directory.
    output_dir = Path(log_dir).parent
    with database.connection_context():
        sources = tuple(Source.select().order_by(Source.name.asc()))
        # Bug fix: the snapshot key is "failures_24h", but the original query
        # counted every failed execution ever recorded. Restrict the count to
        # executions that actually ended within the last 24 hours.
        window_start = reference_time - timedelta(days=1)
        failed_last_day = (
            JobExecution.select()
            .where(
                (JobExecution.running_status == JobExecutionStatus.FAILED)
                & (JobExecution.ended_at.is_null(False))
                & (JobExecution.ended_at >= window_start)
            )
            .count()
        )

    # Jobs that are neither running nor queued count as ready-to-run today.
    upcoming_ready = sum(
        1 for job in runs_view["upcoming"] if str(job["run_reason"]) == "Ready"
    )
    footprint_bytes = _directory_size(output_dir)
    return {
        "running": runs_view["running"],
        "source_feeds": tuple(
            _project_source_feed(source, output_dir, reference_time)
            for source in sources
        ),
        "snapshot": {
            "running_now": str(len(runs_view["running"])),
            "upcoming_today": str(upcoming_ready),
            "failures_24h": str(failed_last_day),
            "artifact_footprint": _format_bytes(footprint_bytes),
        },
    }
|
|
|
|
|
|
def load_execution_log_view(
    *, log_dir: str | Path, job_id: int, execution_id: int
) -> ExecutionLogView:
    """Build the plain-text log page model for one execution.

    Returns an error-bearing view when the execution is missing (or belongs
    to a different job than the URL claims), or when its log file has not
    been written yet; otherwise returns the full log text.
    """
    with database.connection_context():
        execution = JobExecution.get_or_none(id=execution_id)

    route = f"/job/{job_id}/execution/{execution_id}/logs"
    title = f"Job {job_id} / execution {execution_id}"

    if execution is None or _job_id(cast(Job, execution.job)) != job_id:
        # Unknown execution id, or a job/execution mismatch in the URL.
        return ExecutionLogView(
            job_id=job_id,
            execution_id=execution_id,
            title=title,
            description="Plain text log view routed through the app.",
            status_label="Unavailable",
            status_tone="failed",
            log_text="",
            error_message="Execution does not exist.",
        )

    artifacts = JobArtifacts.for_execution(
        log_dir=Path(log_dir),
        job_id=job_id,
        execution_id=execution_id,
    )
    if not artifacts.log_path.exists():
        # Execution exists but its worker has not produced a log file yet.
        return ExecutionLogView(
            job_id=job_id,
            execution_id=execution_id,
            title=title,
            description="Plain text log view routed through the app.",
            status_label=_execution_status_label(execution),
            status_tone=_execution_status_tone(execution),
            log_text="",
            error_message="Log file has not been created yet.",
        )

    return ExecutionLogView(
        job_id=job_id,
        execution_id=execution_id,
        title=title,
        description=f"Route: {route}",
        status_label=_execution_status_label(execution),
        status_tone=_execution_status_tone(execution),
        log_text=artifacts.log_path.read_text(encoding="utf-8"),
    )
|
|
|
|
|
|
def _job_trigger(job: Job) -> CronTrigger:
    """Build the APScheduler cron trigger (UTC) from the job's five cron fields."""
    fields = (
        job.cron_minute,
        job.cron_hour,
        job.cron_day_of_month,
        job.cron_month,
        job.cron_day_of_week,
    )
    expression = " ".join(str(field) for field in fields)
    return CronTrigger.from_crontab(expression, timezone=UTC)
|
|
|
|
|
|
def _scheduler_job_id(job_id: int) -> str:
    """Map a database job id to its APScheduler entry identifier."""
    return SCHEDULER_JOB_PREFIX + str(job_id)
|
|
|
|
|
|
def _project_running_execution(
    execution: JobExecution,
    log_dir: Path,
    reference_time: datetime,
    *,
    queued_follow_up: JobExecution | None = None,
) -> dict[str, object]:
    """Shape one RUNNING execution for the runs/dashboard templates.

    ``queued_follow_up`` is the same job's PENDING execution, if any; when
    present the action button cancels that queued run instead of stopping
    the live one.
    """
    job = cast(Job, execution.job)
    job_id = _job_id(job)
    execution_id = _execution_id(execution)
    artifacts = JobArtifacts.for_execution(
        log_dir=log_dir, job_id=job_id, execution_id=execution_id
    )
    # Fall back to created_at for rows claimed but not yet stamped started_at.
    started_at = _coerce_datetime(
        cast(datetime | str, execution.started_at or execution.created_at)
    )
    runtime = reference_time - started_at
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "execution_id": execution_id,
        "started_at": started_at.strftime("%Y-%m-%d %H:%M UTC"),
        "runtime": f"running for {int(runtime.total_seconds())}s",
        "status": "Stopping" if execution.stop_requested_at else "Running",
        "stats": _stats_summary(execution),
        "worker": (
            "graceful stop requested"
            if execution.stop_requested_at
            else "streaming stats from worker"
        ),
        "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
        "log_exists": artifacts.log_path.exists(),
        "cancel_label": "Cancel" if queued_follow_up is not None else "Stop",
        "cancel_post_path": (
            f"/actions/queued-executions/{_execution_id(queued_follow_up)}/cancel"
            if queued_follow_up is not None
            else f"/actions/executions/{execution_id}/cancel"
        ),
    }
|
|
|
|
|
|
def _project_queued_execution(
    execution: JobExecution,
    reference_time: datetime,
    *,
    position: int,
    total_count: int,
) -> dict[str, object]:
    """Shape one PENDING execution for the queue table.

    ``position`` is the 1-based slot in the queue; ``total_count`` lets the
    projection disable the move-down control on the last row.
    """
    job = cast(Job, execution.job)
    queued_at = _coerce_datetime(cast(datetime | str, execution.created_at))
    execution_id = _execution_id(execution)
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": _job_id(job),
        "execution_id": execution_id,
        "queued_at": _humanize_relative_time(reference_time, queued_at),
        "queued_at_iso": queued_at.isoformat(),
        "queue_position": position,
        "status": "Queued",
        "status_tone": "idle",
        "run_label": "Queued",
        "run_disabled": True,
        "run_post_path": f"/actions/jobs/{_job_id(job)}/run-now",
        "cancel_post_path": (f"/actions/queued-executions/{execution_id}/cancel"),
        # Reorder controls are disabled at the queue's edges.
        "move_up_disabled": position == 1,
        "move_up_post_path": (
            None
            if position == 1
            else f"/actions/queued-executions/{execution_id}/move-up"
        ),
        "move_down_disabled": position == total_count,
        "move_down_post_path": (
            None
            if position == total_count
            else f"/actions/queued-executions/{execution_id}/move-down"
        ),
    }
|
|
|
|
|
|
def _project_upcoming_job(
    job: Job,
    running_execution: JobExecution | None,
    queued_execution: JobExecution | None,
    reference_time: datetime,
) -> dict[str, object]:
    """Shape one Job row for the upcoming-runs table.

    ``running_execution``/``queued_execution`` are the job's current RUNNING
    and PENDING executions (or None); they drive the run button state and
    the displayed next-run text.
    """
    job_id = _job_id(job)
    trigger = _job_trigger(job)
    # No next fire time while disabled or already running.
    next_run = (
        trigger.get_next_fire_time(None, reference_time)
        if job.enabled and running_execution is None
        else None
    )
    run_disabled = running_execution is not None or queued_execution is not None
    run_reason = (
        "Already running"
        if running_execution is not None
        else ("Queued" if queued_execution is not None else "Ready")
    )
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "next_run": (
            _humanize_relative_time(reference_time, next_run)
            if next_run is not None
            else ("Running now" if running_execution is not None else "Not scheduled")
        ),
        "next_run_at": next_run.isoformat() if next_run is not None else None,
        # Five-field crontab string shown verbatim in the UI.
        "schedule": " ".join(
            (
                str(job.cron_minute),
                str(job.cron_hour),
                str(job.cron_day_of_month),
                str(job.cron_month),
                str(job.cron_day_of_week),
            )
        ),
        "enabled_label": "Enabled" if job.enabled else "Disabled",
        "enabled_tone": "scheduled" if job.enabled else "idle",
        "run_disabled": run_disabled,
        "run_reason": run_reason,
        "toggle_label": "Disable" if job.enabled else "Enable",
        "toggle_enabled": not job.enabled,
        "run_post_path": f"/actions/jobs/{job_id}/run-now",
        "toggle_post_path": f"/actions/jobs/{job_id}/toggle-enabled",
        "delete_post_path": f"/actions/jobs/{job_id}/delete",
    }
|
|
|
|
|
|
def _project_completed_execution(
    execution: JobExecution, log_dir: Path, reference_time: datetime
) -> dict[str, object]:
    """Shape one finished (succeeded/failed/canceled) execution for the UI."""
    job = cast(Job, execution.job)
    job_id = _job_id(job)
    execution_id = _execution_id(execution)
    artifacts = JobArtifacts.for_execution(
        log_dir=log_dir, job_id=job_id, execution_id=execution_id
    )
    # ended_at can still be NULL for rows in flight; show "Pending" then.
    ended_at = (
        _coerce_datetime(cast(datetime | str, execution.ended_at))
        if execution.ended_at is not None
        else None
    )
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "execution_id": execution_id,
        "ended_at": (
            _humanize_relative_time(reference_time, ended_at)
            if ended_at is not None
            else "Pending"
        ),
        "ended_at_iso": ended_at.isoformat() if ended_at is not None else None,
        "status": _execution_status_label(execution),
        "status_tone": _execution_status_tone(execution),
        "stats": _stats_summary(execution),
        "summary": (
            "Canceled by operator"
            if execution.running_status == JobExecutionStatus.CANCELED
            else (
                "Worker exited successfully"
                if execution.running_status == JobExecutionStatus.SUCCEEDED
                else "Worker exited with failure"
            )
        ),
        "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
        "log_exists": artifacts.log_path.exists(),
    }
|
|
|
|
|
|
def _project_source_feed(
    source: Source, output_dir: Path, reference_time: datetime
) -> dict[str, object]:
    """Shape a source's published-feed state into the template context."""
    source_slug = str(source.slug)
    source_dir = feed_output_dir(out_dir=output_dir, feed_slug=source_slug)
    feed_path = feed_output_path(out_dir=output_dir, feed_slug=source_slug)

    feed_exists = feed_path.exists()
    updated_at: datetime | None = None
    if feed_exists:
        # The feed file's mtime doubles as the publication timestamp.
        updated_at = datetime.fromtimestamp(feed_path.stat().st_mtime, tz=UTC)

    if updated_at is None:
        last_updated = "Never published"
        last_updated_iso = None
    else:
        last_updated = _humanize_relative_time(reference_time, updated_at)
        last_updated_iso = updated_at.isoformat()

    return {
        "source": source.name,
        "slug": source_slug,
        "feed_href": f"/feeds/{source_slug}/feed.rss",
        "feed_status_label": "Available" if feed_exists else "Missing",
        "feed_status_tone": "done" if feed_exists else "failed",
        "feed_exists": feed_exists,
        "last_updated": last_updated,
        "last_updated_iso": last_updated_iso,
        "artifact_footprint": _format_bytes(_directory_size(source_dir)),
    }
|
|
|
|
|
|
def _execution_status_label(execution: JobExecution) -> str:
    """Human-readable label for an execution's current status."""
    status = JobExecutionStatus(execution.running_status)
    if status == JobExecutionStatus.RUNNING:
        # A pending stop request flips the visible label before the worker exits.
        return "Stopping" if execution.stop_requested_at else "Running"
    labels = {
        JobExecutionStatus.PENDING: "Pending",
        JobExecutionStatus.SUCCEEDED: "Succeeded",
        JobExecutionStatus.FAILED: "Failed",
        JobExecutionStatus.CANCELED: "Canceled",
    }
    return labels[status]
|
|
|
|
|
|
def _execution_status_tone(execution: JobExecution) -> str:
    """UI tone keyword corresponding to an execution's current status."""
    status = JobExecutionStatus(execution.running_status)
    if status == JobExecutionStatus.RUNNING:
        return "running"
    if status == JobExecutionStatus.SUCCEEDED:
        return "done"
    if status == JobExecutionStatus.FAILED:
        return "failed"
    # PENDING and CANCELED both render with the neutral tone.
    return "idle"
|
|
|
|
|
|
def _stats_summary(execution: JobExecution) -> str:
    """One-line summary of an execution's request/item/byte counters."""
    total_bytes = cast(int, execution.bytes_count)
    parts = (
        f"{execution.requests_count} requests",
        f"{execution.items_count} items",
        _format_summary_bytes(total_bytes),
    )
    return " • ".join(parts)
|
|
|
|
|
|
def _final_status(*, execution: JobExecution, returncode: int) -> JobExecutionStatus:
    """Map a worker's exit code to a terminal status, honoring stop requests."""
    if execution.stop_requested_at is not None:
        # An operator-initiated stop always wins, whatever the exit code was.
        return JobExecutionStatus.CANCELED
    return (
        JobExecutionStatus.SUCCEEDED
        if returncode == 0
        else JobExecutionStatus.FAILED
    )
|
|
|
|
|
|
def _worker_final_status(
    *,
    execution: JobExecution,
    worker: RunningWorker,
    returncode: int,
) -> JobExecutionStatus:
    """Terminal status for *worker*; detached workers are judged by artifacts."""
    if not worker.detached:
        return _final_status(execution=execution, returncode=returncode)
    # A reattached process yields no real exit code, so inspect its artifacts.
    return _detached_final_status(execution=execution, artifacts=worker.artifacts)
|
|
|
|
|
|
def _detached_final_status(
    *, execution: JobExecution, artifacts: JobArtifacts
) -> JobExecutionStatus:
    """Infer a detached worker's terminal status from its log tail."""
    if execution.stop_requested_at is not None:
        return JobExecutionStatus.CANCELED
    # No exit code is available for a reattached process; sniff the log tail
    # for the success marker emitted on clean completion.
    tail = _read_log_tail(artifacts.log_path)
    return (
        JobExecutionStatus.SUCCEEDED
        if "completed successfully" in tail
        else JobExecutionStatus.FAILED
    )
|
|
|
|
|
|
def _coerce_datetime(value: datetime | str) -> datetime:
|
|
if isinstance(value, datetime):
|
|
if value.tzinfo is None:
|
|
return value.replace(tzinfo=UTC)
|
|
return value.astimezone(UTC)
|
|
|
|
parsed = datetime.fromisoformat(value)
|
|
if parsed.tzinfo is None:
|
|
return parsed.replace(tzinfo=UTC)
|
|
return parsed.astimezone(UTC)
|
|
|
|
|
|
def _job_id(job: Job) -> int:
|
|
return int(job.get_id())
|
|
|
|
|
|
def _execution_id(execution: JobExecution) -> int:
|
|
return int(execution.get_id())
|
|
|
|
|
|
def _directory_size(path: Path) -> int:
|
|
if not path.exists():
|
|
return 0
|
|
return sum(entry.stat().st_size for entry in path.rglob("*") if entry.is_file())
|
|
|
|
|
|
def _format_bytes(value: int) -> str:
|
|
if value < 1024:
|
|
return f"{value} B"
|
|
if value < 1024 * 1024:
|
|
return f"{value / 1024:.1f} KB"
|
|
if value < 1024 * 1024 * 1024:
|
|
return f"{value / (1024 * 1024):.1f} MB"
|
|
return f"{value / (1024 * 1024 * 1024):.1f} GB"
|
|
|
|
|
|
def _format_summary_bytes(value: int) -> str:
|
|
if value == 1:
|
|
return "1 byte"
|
|
if value < 1024:
|
|
return f"{value} bytes"
|
|
if value < 1024 * 1024:
|
|
return f"{value / 1024:.1f} KiB"
|
|
if value < 1024 * 1024 * 1024:
|
|
return f"{value / (1024 * 1024):.1f} MiB"
|
|
return f"{value / (1024 * 1024 * 1024):.1f} GiB"
|
|
|
|
|
|
def _humanize_relative_time(reference_time: datetime, target_time: datetime) -> str:
|
|
delta_seconds = int(round((target_time - reference_time).total_seconds()))
|
|
if delta_seconds == 0:
|
|
return "now"
|
|
|
|
absolute_delta_seconds = abs(delta_seconds)
|
|
units = (
|
|
("day", 24 * 60 * 60),
|
|
("hour", 60 * 60),
|
|
("minute", 60),
|
|
)
|
|
for label, size in units:
|
|
if absolute_delta_seconds >= size:
|
|
count = max(1, round(absolute_delta_seconds / size))
|
|
suffix = "" if count == 1 else "s"
|
|
if delta_seconds > 0:
|
|
return f"in {count} {label}{suffix}"
|
|
return f"{count} {label}{suffix} ago"
|
|
|
|
if delta_seconds > 0:
|
|
return f"in {absolute_delta_seconds} seconds"
|
|
return f"{absolute_delta_seconds} seconds ago"
|
|
|
|
|
|
def _find_live_workers() -> dict[int, LiveWorker]:
    """Scan /proc for running job-runner workers, keyed by execution id."""
    proc_dir = Path("/proc")
    if not proc_dir.exists():
        # Non-Linux hosts have no procfs; report no reattachable workers.
        return {}

    found: dict[int, LiveWorker] = {}
    for cmdline_path in proc_dir.glob("[0-9]*/cmdline"):
        try:
            raw_cmdline = cmdline_path.read_bytes()
        except OSError:
            # The process may have exited between the glob and the read.
            continue

        # /proc/<pid>/cmdline is NUL-separated; drop empty segments
        # (there is normally a trailing NUL).
        argv = [
            segment
            for segment in raw_cmdline.decode("utf-8", errors="ignore").split("\x00")
            if segment
        ]

        worker = _parse_live_worker(argv, pid=int(cmdline_path.parent.name))
        if worker is not None and _pid_is_running(worker.pid):
            found[worker.execution_id] = worker
    return found
|
|
|
|
|
|
def _parse_live_worker(argv: list[str], *, pid: int) -> LiveWorker | None:
    """Build a LiveWorker from a process argv, or None if it isn't a worker."""
    if "repub.job_runner" not in argv:
        return None
    raw_job_id = _argv_flag_value(argv, "--job-id")
    raw_execution_id = _argv_flag_value(argv, "--execution-id")
    # Both identifiers are required to associate the process with a record.
    if raw_job_id is None or raw_execution_id is None:
        return None
    return LiveWorker(
        job_id=int(raw_job_id),
        execution_id=int(raw_execution_id),
        pid=pid,
    )
|
|
|
|
|
|
def _argv_flag_value(argv: list[str], flag: str) -> str | None:
|
|
try:
|
|
index = argv.index(flag)
|
|
except ValueError:
|
|
return None
|
|
value_index = index + 1
|
|
if value_index >= len(argv):
|
|
return None
|
|
return argv[value_index]
|
|
|
|
|
|
def _pid_is_running(pid: int) -> bool:
|
|
try:
|
|
os.kill(pid, 0)
|
|
except ProcessLookupError:
|
|
return False
|
|
except PermissionError:
|
|
return True
|
|
stat_path = Path(f"/proc/{pid}/stat")
|
|
if stat_path.exists():
|
|
try:
|
|
stat_text = stat_path.read_text(encoding="utf-8")
|
|
except OSError:
|
|
return True
|
|
state_parts = stat_text.split(") ", 1)
|
|
if len(state_parts) == 2 and state_parts[1].startswith("Z"):
|
|
return False
|
|
return True
|
|
|
|
|
|
def _send_signal(pid: int, signum: int) -> None:
|
|
try:
|
|
os.kill(pid, signum)
|
|
except ProcessLookupError:
|
|
return
|
|
|
|
|
|
def _read_log_tail(path: Path, *, max_bytes: int = 8192) -> str:
|
|
if not path.exists():
|
|
return ""
|
|
|
|
with path.open("rb") as handle:
|
|
handle.seek(0, os.SEEK_END)
|
|
size = handle.tell()
|
|
handle.seek(max(0, size - max_bytes))
|
|
return handle.read().decode("utf-8", errors="replace")
|