"""Job runtime for repub: schedules jobs with APScheduler, launches worker
subprocesses, streams their stats back into the database, and projects
view-model dictionaries for the dashboard and runs pages."""

from __future__ import annotations

import json
import subprocess
import sys
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import TextIO, cast

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

from repub.config import feed_output_dir, feed_output_path
from repub.model import (
    Job,
    JobExecution,
    JobExecutionStatus,
    Source,
    database,
    utc_now,
)

SCHEDULER_JOB_PREFIX = "job-"
POLL_JOB_ID = "runtime-poll-workers"
SYNC_JOB_ID = "runtime-sync-jobs"


@dataclass(frozen=True)
class JobArtifacts:
    """Filesystem artifacts (log and stats stream) for one execution."""

    log_path: Path
    stats_path: Path

    @classmethod
    def for_execution(
        cls, *, log_dir: Path, job_id: int, execution_id: int
    ) -> "JobArtifacts":
        prefix = f"job-{job_id}-execution-{execution_id}"
        return cls(
            log_path=log_dir / f"{prefix}.log",
            stats_path=log_dir / f"{prefix}.jsonl",
        )


@dataclass
class RunningWorker:
    """A live worker subprocess plus the handles needed to track it."""

    execution_id: int
    process: subprocess.Popen[str]
    log_handle: TextIO
    artifacts: JobArtifacts
    stats_offset: int = 0


@dataclass(frozen=True)
class ExecutionLogView:
    job_id: int
    execution_id: int
    title: str
    description: str
    status_label: str
    status_tone: str
    log_text: str
    error_message: str | None = None
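

# Artifact naming, for reference: JobArtifacts.for_execution(
#     log_dir=Path("out/logs"), job_id=3, execution_id=17
# ) yields out/logs/job-3-execution-17.log for the worker's combined
# stdout/stderr and out/logs/job-3-execution-17.jsonl for its stats stream
# (the job_id/execution_id values here are illustrative).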


class JobRuntime:
    """Owns the APScheduler instance and the pool of worker subprocesses."""

    def __init__(
        self,
        *,
        log_dir: str | Path,
        refresh_callback: Callable[[], None] | None = None,
        graceful_stop_seconds: float = 15.0,
    ) -> None:
        self.log_dir = Path(log_dir)
        self.refresh_callback = refresh_callback
        self.graceful_stop_seconds = graceful_stop_seconds
        self.scheduler = BackgroundScheduler(timezone=UTC)
        self._workers: dict[int, RunningWorker] = {}
        self._started = False

    def start(self) -> None:
        if self._started:
            return
        # Mark started before wiring anything up so re-entrant calls (for
        # example run_job_now firing from a scheduler thread) are no-ops
        # instead of starting the scheduler twice.
        self._started = True
        self._reconcile_stale_executions()
        self.scheduler.start()
        self.scheduler.add_job(
            self.poll_workers,
            "interval",
            id=POLL_JOB_ID,
            seconds=0.25,
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        self.scheduler.add_job(
            self.sync_jobs,
            "interval",
            id=SYNC_JOB_ID,
            seconds=1,
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        self.sync_jobs()

    def shutdown(self) -> None:
        for execution_id in tuple(self._workers):
            worker = self._workers.pop(execution_id)
            if worker.process.poll() is None:
                worker.process.kill()
                worker.process.wait(timeout=2)
            worker.log_handle.close()
        if self._started:
            self.scheduler.shutdown(wait=False)
            self._started = False

    def sync_jobs(self) -> None:
        """Mirror enabled jobs into the scheduler and drop stale schedules."""
        with database.connection_context():
            jobs = tuple(Job.select().where(Job.enabled == True))  # noqa: E712
        desired_ids = set()
        for job in jobs:
            scheduler_job_id = _scheduler_job_id(_job_id(job))
            desired_ids.add(scheduler_job_id)
            self.scheduler.add_job(
                self.run_scheduled_job,
                trigger=_job_trigger(job),
                args=(_job_id(job),),
                id=scheduler_job_id,
                replace_existing=True,
                max_instances=1,
                coalesce=True,
                misfire_grace_time=1,
            )
        for scheduled_job in tuple(self.scheduler.get_jobs()):
            if (
                scheduled_job.id.startswith(SCHEDULER_JOB_PREFIX)
                and scheduled_job.id not in desired_ids
            ):
                self.scheduler.remove_job(scheduled_job.id)

    def run_scheduled_job(self, job_id: int) -> None:
        self.run_job_now(job_id, reason="scheduled")

    def run_job_now(self, job_id: int, *, reason: str) -> int | None:
        del reason
        self.start()
        with database.connection_context():
            job = Job.get_or_none(id=job_id)
            if job is None:
                return None
            already_running = (
                JobExecution.select()
                .where(
                    (JobExecution.job == job)
                    & (JobExecution.running_status == JobExecutionStatus.RUNNING)
                )
                .exists()
            )
            if already_running:
                return None
            execution = JobExecution.create(
                job=job,
                started_at=utc_now(),
                running_status=JobExecutionStatus.RUNNING,
            )
        execution_id = _execution_id(execution)
        artifacts = JobArtifacts.for_execution(
            log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
        )
        artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
        log_handle = artifacts.log_path.open("a", encoding="utf-8", buffering=1)
        log_handle.write(
            f"scheduler: starting execution {execution_id} for job {job_id}\n"
        )
        process = subprocess.Popen(
            [
                sys.executable,
                "-u",
                "-m",
                "repub.job_runner",
                "--job-id",
                str(job_id),
                "--execution-id",
                str(execution_id),
                "--db-path",
                str(database.database),
                "--out-dir",
                str(self.log_dir.parent),
                "--stats-path",
                str(artifacts.stats_path),
            ],
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            text=True,
        )
        self._workers[execution_id] = RunningWorker(
            execution_id=execution_id,
            process=process,
            log_handle=log_handle,
            artifacts=artifacts,
        )
        self._trigger_refresh()
        return execution_id

    def request_execution_cancel(self, execution_id: int) -> bool:
        with database.connection_context():
            execution = JobExecution.get_or_none(id=execution_id)
            if execution is None:
                return False
            if execution.running_status != JobExecutionStatus.RUNNING:
                return False
            if execution.stop_requested_at is None:
                execution.stop_requested_at = utc_now()
                execution.save()
        worker = self._workers.get(execution_id)
        if worker is not None and worker.process.poll() is None:
            worker.log_handle.write(
                f"scheduler: graceful stop requested for execution {execution_id}\n"
            )
            worker.process.terminate()
        self._trigger_refresh()
        return True

    def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
        with database.connection_context():
            job = Job.get_or_none(id=job_id)
            if job is None:
                return False
            job.enabled = enabled
            job.save()
        self.sync_jobs()
        self._trigger_refresh()
        return True

    def poll_workers(self) -> None:
        """Stream stats from live workers and reap the ones that exited."""
        for execution_id in tuple(self._workers):
            worker = self._workers[execution_id]
            self._apply_stats(worker)
            self._enforce_graceful_stop(worker)
            returncode = worker.process.poll()
            if returncode is None:
                continue
            # One final stats pass so the last snapshot is not lost.
            self._apply_stats(worker)
            with database.connection_context():
                execution = JobExecution.get_by_id(execution_id)
                execution.ended_at = utc_now()
                execution.running_status = _final_status(
                    execution=execution,
                    returncode=returncode,
                )
                execution.save()
            worker.log_handle.close()
            del self._workers[execution_id]
            self._trigger_refresh()

    def _apply_stats(self, worker: RunningWorker) -> None:
        if not worker.artifacts.stats_path.exists():
            return
        # Read incrementally in binary so only fully terminated lines are
        # consumed; a partially written trailing line stays unconsumed until
        # the next poll instead of tripping json.loads mid-write.
        with worker.artifacts.stats_path.open("rb") as handle:
            handle.seek(worker.stats_offset)
            payload = handle.read()
        complete, newline, _partial = payload.rpartition(b"\n")
        if not newline:
            return
        worker.stats_offset += len(complete) + len(newline)
        lines = [
            line for line in complete.decode("utf-8").splitlines() if line.strip()
        ]
        if not lines:
            return
        # The worker appends cumulative snapshots; the newest line wins.
        stats = json.loads(lines[-1])
        with database.connection_context():
            execution = JobExecution.get_by_id(worker.execution_id)
            execution.requests_count = int(stats.get("requests_count", 0))
            execution.items_count = int(stats.get("items_count", 0))
            execution.warnings_count = int(stats.get("warnings_count", 0))
            execution.errors_count = int(stats.get("errors_count", 0))
            execution.bytes_count = int(stats.get("bytes_count", 0))
            execution.retries_count = int(stats.get("retries_count", 0))
            execution.exceptions_count = int(stats.get("exceptions_count", 0))
            execution.cache_size_count = int(stats.get("cache_size_count", 0))
            execution.cache_object_count = int(stats.get("cache_object_count", 0))
            execution.raw_stats = json.dumps(stats, sort_keys=True)
            execution.save()
        self._trigger_refresh()
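
    # Shape of a stats line as consumed by _apply_stats above: one JSON object
    # per line, last complete line wins. The keys mirror the assignments in
    # _apply_stats; the values below are illustrative only:
    #
    #   {"requests_count": 42, "items_count": 10, "warnings_count": 1,
    #    "errors_count": 0, "bytes_count": 1048576, "retries_count": 2,
    #    "exceptions_count": 0, "cache_size_count": 0, "cache_object_count": 0}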

    def _enforce_graceful_stop(self, worker: RunningWorker) -> None:
        with database.connection_context():
            execution = JobExecution.get_by_id(worker.execution_id)
        if execution.stop_requested_at is None:
            return
        elapsed = utc_now() - _coerce_datetime(execution.stop_requested_at)
        if (
            elapsed >= timedelta(seconds=self.graceful_stop_seconds)
            and worker.process.poll() is None
        ):
            # The worker ignored SIGTERM for the grace period; force it down.
            worker.process.kill()

    def _trigger_refresh(self) -> None:
        if self.refresh_callback is not None:
            self.refresh_callback()

    def _reconcile_stale_executions(self) -> None:
        """Close out executions left RUNNING by a previous process."""
        with database.connection_context():
            stale_executions = tuple(
                JobExecution.select(JobExecution, Job)
                .join(Job)
                .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            )
            for execution in stale_executions:
                job = cast(Job, execution.job)
                execution_id = _execution_id(execution)
                artifacts = JobArtifacts.for_execution(
                    log_dir=self.log_dir,
                    job_id=_job_id(job),
                    execution_id=execution_id,
                )
                artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
                with artifacts.log_path.open("a", encoding="utf-8") as log_handle:
                    log_handle.write(
                        "scheduler: execution marked failed after app restart\n"
                    )
                execution.ended_at = utc_now()
                execution.running_status = (
                    JobExecutionStatus.CANCELED
                    if execution.stop_requested_at is not None
                    else JobExecutionStatus.FAILED
                )
                execution.save()
        if stale_executions:
            self._trigger_refresh()


def load_runs_view(
    *, log_dir: str | Path, now: datetime | None = None
) -> dict[str, tuple[dict[str, object], ...]]:
    """Project running, upcoming, and completed rows for the runs page."""
    reference_time = now or datetime.now(UTC)
    resolved_log_dir = Path(log_dir)
    with database.connection_context():
        jobs = tuple(
            Job.select(Job, Source).join(Source).order_by(Source.name.asc())
        )
        running_executions = tuple(
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            .order_by(JobExecution.started_at.desc())
        )
        completed_executions = tuple(
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(
                JobExecution.running_status.in_(
                    (
                        JobExecutionStatus.SUCCEEDED,
                        JobExecutionStatus.FAILED,
                        JobExecutionStatus.CANCELED,
                    )
                )
            )
            .order_by(JobExecution.ended_at.desc())
            .limit(20)
        )
    running_by_job = {
        _job_id(execution.job): execution for execution in running_executions
    }
    return {
        "running": tuple(
            _project_running_execution(execution, resolved_log_dir, reference_time)
            for execution in running_executions
        ),
        "upcoming": tuple(
            _project_upcoming_job(
                job, running_by_job.get(_job_id(job)), reference_time
            )
            for job in jobs
        ),
        "completed": tuple(
            _project_completed_execution(execution, resolved_log_dir, reference_time)
            for execution in completed_executions
        ),
    }
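

# Caller's-eye sketch of the runs view (illustrative; the keys come from the
# projection helpers below, and the handler context is an assumption, not
# repub's actual web layer):
#
#   view = load_runs_view(log_dir="out/logs")
#   for row in view["running"]:
#       print(row["source"], row["runtime"], row["log_href"])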


def load_dashboard_view(
    *, log_dir: str | Path, now: datetime | None = None
) -> dict[str, object]:
    """Project the dashboard: running rows, source feeds, and a snapshot."""
    reference_time = now or datetime.now(UTC)
    runs_view = load_runs_view(log_dir=log_dir, now=reference_time)
    output_dir = Path(log_dir).parent
    with database.connection_context():
        sources = tuple(Source.select().order_by(Source.name.asc()))
        # Restrict the count to the trailing 24 hours so it matches the
        # "failures_24h" label below.
        day_ago = reference_time - timedelta(days=1)
        failed_last_day = (
            JobExecution.select()
            .where(
                (JobExecution.running_status == JobExecutionStatus.FAILED)
                & (JobExecution.ended_at.is_null(False))
                & (JobExecution.ended_at >= day_ago)
            )
            .count()
        )
    upcoming_ready = sum(
        1 for job in runs_view["upcoming"] if str(job["run_reason"]) == "Ready"
    )
    footprint_bytes = _directory_size(output_dir)
    return {
        "running": runs_view["running"],
        "source_feeds": tuple(
            _project_source_feed(source, output_dir, reference_time)
            for source in sources
        ),
        "snapshot": {
            "running_now": str(len(runs_view["running"])),
            "upcoming_today": str(upcoming_ready),
            "failures_24h": str(failed_last_day),
            "artifact_footprint": _format_bytes(footprint_bytes),
        },
    }


def load_execution_log_view(
    *, log_dir: str | Path, job_id: int, execution_id: int
) -> ExecutionLogView:
    """Load one execution's log file for plain-text display."""
    with database.connection_context():
        execution = JobExecution.get_or_none(id=execution_id)
        # Resolve the owning job while the connection is open; lazily touching
        # the foreign key later would query outside the context.
        owning_job_id = (
            _job_id(cast(Job, execution.job)) if execution is not None else None
        )
    route = f"/job/{job_id}/execution/{execution_id}/logs"
    if execution is None or owning_job_id != job_id:
        return ExecutionLogView(
            job_id=job_id,
            execution_id=execution_id,
            title=f"Job {job_id} / execution {execution_id}",
            description="Plain text log view routed through the app.",
            status_label="Unavailable",
            status_tone="failed",
            log_text="",
            error_message="Execution does not exist.",
        )
    artifacts = JobArtifacts.for_execution(
        log_dir=Path(log_dir),
        job_id=job_id,
        execution_id=execution_id,
    )
    if not artifacts.log_path.exists():
        return ExecutionLogView(
            job_id=job_id,
            execution_id=execution_id,
            title=f"Job {job_id} / execution {execution_id}",
            description="Plain text log view routed through the app.",
            status_label=_execution_status_label(execution),
            status_tone=_execution_status_tone(execution),
            log_text="",
            error_message="Log file has not been created yet.",
        )
    return ExecutionLogView(
        job_id=job_id,
        execution_id=execution_id,
        title=f"Job {job_id} / execution {execution_id}",
        description=f"Route: {route}",
        status_label=_execution_status_label(execution),
        status_tone=_execution_status_tone(execution),
        log_text=artifacts.log_path.read_text(encoding="utf-8"),
    )


def _job_trigger(job: Job) -> CronTrigger:
    expression = " ".join(
        (
            str(job.cron_minute),
            str(job.cron_hour),
            str(job.cron_day_of_month),
            str(job.cron_month),
            str(job.cron_day_of_week),
        )
    )
    return CronTrigger.from_crontab(expression, timezone=UTC)


def _scheduler_job_id(job_id: int) -> str:
    return f"{SCHEDULER_JOB_PREFIX}{job_id}"
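

# The cron fields are joined into a standard five-field crontab expression by
# _job_trigger above; e.g. a job with cron_minute="0", cron_hour="6", and "*"
# in the remaining fields becomes "0 6 * * *", firing daily at 06:00 UTC
# (field values here are illustrative).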


def _project_running_execution(
    execution: JobExecution, log_dir: Path, reference_time: datetime
) -> dict[str, object]:
    job = cast(Job, execution.job)
    job_id = _job_id(job)
    execution_id = _execution_id(execution)
    artifacts = JobArtifacts.for_execution(
        log_dir=log_dir, job_id=job_id, execution_id=execution_id
    )
    started_at = _coerce_datetime(
        cast(datetime | str, execution.started_at or execution.created_at)
    )
    runtime = reference_time - started_at
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "execution_id": execution_id,
        "started_at": started_at.strftime("%Y-%m-%d %H:%M UTC"),
        "runtime": f"running for {int(runtime.total_seconds())}s",
        "status": "Stopping" if execution.stop_requested_at else "Running",
        "stats": _stats_summary(execution),
        "worker": (
            "graceful stop requested"
            if execution.stop_requested_at
            else "streaming stats from worker jsonl"
        ),
        "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
        "log_exists": artifacts.log_path.exists(),
        "cancel_post_path": f"/actions/executions/{execution_id}/cancel",
    }


def _project_upcoming_job(
    job: Job, running_execution: JobExecution | None, reference_time: datetime
) -> dict[str, object]:
    job_id = _job_id(job)
    trigger = _job_trigger(job)
    next_run = (
        trigger.get_next_fire_time(None, reference_time)
        if job.enabled and running_execution is None
        else None
    )
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "next_run": (
            _humanize_relative_time(reference_time, next_run)
            if next_run is not None
            else ("Running now" if running_execution is not None else "Not scheduled")
        ),
        "next_run_at": next_run.isoformat() if next_run is not None else None,
        "schedule": " ".join(
            (
                str(job.cron_minute),
                str(job.cron_hour),
                str(job.cron_day_of_month),
                str(job.cron_month),
                str(job.cron_day_of_week),
            )
        ),
        "enabled_label": "Enabled" if job.enabled else "Disabled",
        "enabled_tone": "scheduled" if job.enabled else "idle",
        "run_disabled": running_execution is not None,
        "run_reason": "Already running" if running_execution is not None else "Ready",
        "toggle_label": "Disable" if job.enabled else "Enable",
        "toggle_enabled": not job.enabled,
        "run_post_path": f"/actions/jobs/{job_id}/run-now",
        "toggle_post_path": f"/actions/jobs/{job_id}/toggle-enabled",
        "delete_post_path": f"/actions/jobs/{job_id}/delete",
    }


def _project_completed_execution(
    execution: JobExecution, log_dir: Path, reference_time: datetime
) -> dict[str, object]:
    job = cast(Job, execution.job)
    job_id = _job_id(job)
    execution_id = _execution_id(execution)
    artifacts = JobArtifacts.for_execution(
        log_dir=log_dir, job_id=job_id, execution_id=execution_id
    )
    ended_at = (
        _coerce_datetime(cast(datetime | str, execution.ended_at))
        if execution.ended_at is not None
        else None
    )
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "execution_id": execution_id,
        "ended_at": (
            _humanize_relative_time(reference_time, ended_at)
            if ended_at is not None
            else "Pending"
        ),
        "ended_at_iso": ended_at.isoformat() if ended_at is not None else None,
        "status": _execution_status_label(execution),
        "status_tone": _execution_status_tone(execution),
        "stats": _stats_summary(execution),
        "summary": (
            "Canceled by operator"
            if execution.running_status == JobExecutionStatus.CANCELED
            else (
                "Worker exited successfully"
                if execution.running_status == JobExecutionStatus.SUCCEEDED
                else "Worker exited with failure"
            )
        ),
        "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
        "log_exists": artifacts.log_path.exists(),
    }


def _project_source_feed(
    source: Source, output_dir: Path, reference_time: datetime
) -> dict[str, object]:
    source_slug = str(source.slug)
    source_dir = feed_output_dir(out_dir=output_dir, feed_slug=source_slug)
    feed_path = feed_output_path(out_dir=output_dir, feed_slug=source_slug)
    feed_exists = feed_path.exists()
    updated_at = (
        datetime.fromtimestamp(feed_path.stat().st_mtime, tz=UTC)
        if feed_exists
        else None
    )
    return {
        "source": source.name,
        "slug": source_slug,
        "feed_href": f"/feeds/{source_slug}/feed.rss",
        "feed_status_label": "Available" if feed_exists else "Missing",
        "feed_status_tone": "done" if feed_exists else "failed",
        "feed_exists": feed_exists,
        "last_updated": (
            _humanize_relative_time(reference_time, updated_at)
            if updated_at is not None
            else "Never published"
        ),
        "last_updated_iso": updated_at.isoformat() if updated_at is not None else None,
        "artifact_footprint": _format_bytes(_directory_size(source_dir)),
    }


def _execution_status_label(execution: JobExecution) -> str:
    status = JobExecutionStatus(execution.running_status)
    return {
        JobExecutionStatus.PENDING: "Pending",
        JobExecutionStatus.RUNNING: (
            "Stopping" if execution.stop_requested_at else "Running"
        ),
        JobExecutionStatus.SUCCEEDED: "Succeeded",
        JobExecutionStatus.FAILED: "Failed",
        JobExecutionStatus.CANCELED: "Canceled",
    }[status]


def _execution_status_tone(execution: JobExecution) -> str:
    status = JobExecutionStatus(execution.running_status)
    return {
        JobExecutionStatus.PENDING: "idle",
        JobExecutionStatus.RUNNING: "running",
        JobExecutionStatus.SUCCEEDED: "done",
        JobExecutionStatus.FAILED: "failed",
        JobExecutionStatus.CANCELED: "idle",
    }[status]


def _stats_summary(execution: JobExecution) -> str:
    bytes_count = cast(int, execution.bytes_count)
    return (
        f"{execution.requests_count} requests"
        f" • {execution.items_count} items"
        f" • {_format_summary_bytes(bytes_count)}"
    )
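

# Example _stats_summary output for requests_count=42, items_count=10, and
# bytes_count=1048576 (illustrative values): "42 requests • 10 items • 1.0 MiB".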


def _final_status(*, execution: JobExecution, returncode: int) -> JobExecutionStatus:
    if execution.stop_requested_at is not None:
        return JobExecutionStatus.CANCELED
    if returncode == 0:
        return JobExecutionStatus.SUCCEEDED
    return JobExecutionStatus.FAILED


def _coerce_datetime(value: datetime | str) -> datetime:
    if isinstance(value, datetime):
        if value.tzinfo is None:
            return value.replace(tzinfo=UTC)
        return value.astimezone(UTC)
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=UTC)
    return parsed.astimezone(UTC)


def _job_id(job: Job) -> int:
    return int(job.get_id())


def _execution_id(execution: JobExecution) -> int:
    return int(execution.get_id())


def _directory_size(path: Path) -> int:
    if not path.exists():
        return 0
    return sum(entry.stat().st_size for entry in path.rglob("*") if entry.is_file())


def _format_bytes(value: int) -> str:
    if value < 1024:
        return f"{value} B"
    if value < 1024 * 1024:
        return f"{value / 1024:.1f} KB"
    if value < 1024 * 1024 * 1024:
        return f"{value / (1024 * 1024):.1f} MB"
    return f"{value / (1024 * 1024 * 1024):.1f} GB"


def _format_summary_bytes(value: int) -> str:
    if value == 1:
        return "1 byte"
    if value < 1024:
        return f"{value} bytes"
    if value < 1024 * 1024:
        return f"{value / 1024:.1f} KiB"
    if value < 1024 * 1024 * 1024:
        return f"{value / (1024 * 1024):.1f} MiB"
    return f"{value / (1024 * 1024 * 1024):.1f} GiB"


def _humanize_relative_time(reference_time: datetime, target_time: datetime) -> str:
    delta_seconds = int(round((target_time - reference_time).total_seconds()))
    if delta_seconds == 0:
        return "now"
    absolute_delta_seconds = abs(delta_seconds)
    units = (
        ("day", 24 * 60 * 60),
        ("hour", 60 * 60),
        ("minute", 60),
    )
    for label, size in units:
        if absolute_delta_seconds >= size:
            count = max(1, round(absolute_delta_seconds / size))
            suffix = "" if count == 1 else "s"
            if delta_seconds > 0:
                return f"in {count} {label}{suffix}"
            return f"{count} {label}{suffix} ago"
    if delta_seconds > 0:
        return f"in {absolute_delta_seconds} seconds"
    return f"{absolute_delta_seconds} seconds ago"
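

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): one way a host process might wire the
# runtime together. The log directory and the blocking wait loop below are
# assumptions for demonstration, not repub's actual entry point.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import time

    runtime = JobRuntime(log_dir=Path("out/logs"))
    runtime.start()
    try:
        while True:
            time.sleep(1.0)  # keep the main thread alive for the scheduler
    except KeyboardInterrupt:
        runtime.shutdown()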