diff --git a/repub/datastar.py b/repub/datastar.py
index 2ae30a8..d11efe5 100644
--- a/repub/datastar.py
+++ b/repub/datastar.py
@@ -19,27 +19,31 @@ RenderFunction = Callable[[], Awaitable[RenderResult]]
 
 class RefreshBroker:
     def __init__(self) -> None:
-        self._subscribers: set[asyncio.Queue[object]] = set()
+        self._subscribers: dict[asyncio.Queue[object], asyncio.AbstractEventLoop] = {}
 
     def subscribe(self) -> asyncio.Queue[object]:
         queue: asyncio.Queue[object] = asyncio.Queue(maxsize=1)
-        self._subscribers.add(queue)
+        self._subscribers[queue] = asyncio.get_running_loop()
         return queue
 
     def unsubscribe(self, queue: asyncio.Queue[object]) -> None:
-        self._subscribers.discard(queue)
+        self._subscribers.pop(queue, None)
 
     def publish(self, event: object = "refresh-event") -> None:
-        for queue in tuple(self._subscribers):
-            if queue.full():
-                try:
-                    queue.get_nowait()
-                except asyncio.QueueEmpty:
-                    pass
-            try:
-                queue.put_nowait(event)
-            except asyncio.QueueFull:
-                continue
+        for queue, loop in tuple(self._subscribers.items()):
+            loop.call_soon_threadsafe(_publish_event, queue, event)
+
+
+def _publish_event(queue: asyncio.Queue[object], event: object) -> None:
+    if queue.full():
+        try:
+            queue.get_nowait()
+        except asyncio.QueueEmpty:
+            pass
+    try:
+        queue.put_nowait(event)
+    except asyncio.QueueFull:
+        return
 
 
 async def render_sse_event(
diff --git a/repub/job_runner.py b/repub/job_runner.py
new file mode 100644
index 0000000..61abacb
--- /dev/null
+++ b/repub/job_runner.py
@@ -0,0 +1,123 @@
+from __future__ import annotations
+
+import argparse
+import json
+import random
+import signal
+import sys
+import time
+from datetime import UTC, datetime
+from pathlib import Path
+
+
+def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description="Simulated republisher worker")
+    parser.add_argument("--job-id", type=int, required=True)
+    parser.add_argument("--execution-id", type=int, required=True)
+    parser.add_argument("--stats-path", required=True)
+    parser.add_argument("--duration-seconds", type=float, required=True)
+    parser.add_argument("--interval-seconds", type=float, required=True)
+    parser.add_argument("--failure-probability", type=float, required=True)
+    return parser.parse_args(argv)
+
+
+def main(argv: list[str] | None = None) -> int:
+    args = parse_args(argv)
+    rng = random.Random(f"{args.job_id}:{args.execution_id}")
+    stats_path = Path(args.stats_path)
+    stats_path.parent.mkdir(parents=True, exist_ok=True)
+    stop_requested = False
+
+    def request_stop(signum: int, frame: object | None) -> None:
+        del signum, frame
+        nonlocal stop_requested
+        if stop_requested:
+            return
+        stop_requested = True
+        print(
+            f"worker[{args.job_id}:{args.execution_id}]: graceful stop requested",
+            flush=True,
+        )
+
+    signal.signal(signal.SIGTERM, request_stop)
+    signal.signal(signal.SIGINT, request_stop)
+
+    counters = {
+        "requests_count": 0,
+        "items_count": 0,
+        "warnings_count": 0,
+        "errors_count": 0,
+        "bytes_count": 0,
+        "retries_count": 0,
+        "exceptions_count": 0,
+        "cache_size_count": 0,
+        "cache_object_count": 0,
+    }
+
+    print(
+        f"worker[{args.job_id}:{args.execution_id}]: starting simulated crawl",
+        flush=True,
+    )
+    started = time.monotonic()
+    iteration = 0
+    with stats_path.open("a", encoding="utf-8") as stats_file:
+        while time.monotonic() - started < args.duration_seconds:
+            time.sleep(args.interval_seconds)
+            iteration += 1
+            counters["requests_count"] += rng.randint(1, 5)
+            counters["items_count"] += rng.randint(0, 2)
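+            # Synthetic traffic only: the byte and cache figures below are
+            # random draws, not real transfer sizes.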
counters["bytes_count"] += rng.randint(500, 3000) + counters["cache_size_count"] += rng.randint(0, 1) + counters["cache_object_count"] += rng.randint(0, 2) + if rng.random() < 0.1: + counters["warnings_count"] += 1 + if rng.random() < 0.05: + counters["retries_count"] += 1 + + snapshot = { + "timestamp": datetime.now(UTC).isoformat(), + "iteration": iteration, + **counters, + } + stats_file.write(json.dumps(snapshot, sort_keys=True) + "\n") + stats_file.flush() + print( + "stats: " + f"requests={counters['requests_count']} " + f"items={counters['items_count']} " + f"bytes={counters['bytes_count']}", + flush=True, + ) + if stop_requested: + print( + f"worker[{args.job_id}:{args.execution_id}]: stopping after graceful request", + flush=True, + ) + return 130 + + if rng.random() < args.failure_probability: + counters["errors_count"] += 1 + counters["exceptions_count"] += 1 + stats_file.write( + json.dumps( + {"timestamp": datetime.now(UTC).isoformat(), **counters}, + sort_keys=True, + ) + + "\n" + ) + stats_file.flush() + print( + f"worker[{args.job_id}:{args.execution_id}]: simulated failure", + flush=True, + ) + return 1 + + print( + f"worker[{args.job_id}:{args.execution_id}]: completed successfully", + flush=True, + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/repub/jobs.py b/repub/jobs.py new file mode 100644 index 0000000..6e306af --- /dev/null +++ b/repub/jobs.py @@ -0,0 +1,643 @@ +from __future__ import annotations + +import json +import subprocess +import sys +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Callable, TextIO, cast + +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger + +from repub.model import Job, JobExecution, JobExecutionStatus, Source, database, utc_now + +SCHEDULER_JOB_PREFIX = "job-" +POLL_JOB_ID = "runtime-poll-workers" +SYNC_JOB_ID = "runtime-sync-jobs" + + +@dataclass(frozen=True) +class JobArtifacts: + log_path: Path + stats_path: Path + + @classmethod + def for_execution( + cls, *, log_dir: Path, job_id: int, execution_id: int + ) -> "JobArtifacts": + prefix = f"job-{job_id}-execution-{execution_id}" + return cls( + log_path=log_dir / f"{prefix}.log", + stats_path=log_dir / f"{prefix}.jsonl", + ) + + +@dataclass +class RunningWorker: + execution_id: int + process: subprocess.Popen[str] + log_handle: TextIO + artifacts: JobArtifacts + stats_offset: int = 0 + + +@dataclass(frozen=True) +class ExecutionLogView: + job_id: int + execution_id: int + title: str + description: str + status_label: str + status_tone: str + log_text: str + error_message: str | None = None + + +class JobRuntime: + def __init__( + self, + *, + log_dir: str | Path, + worker_duration_seconds: float = 20.0, + worker_stats_interval_seconds: float = 1.0, + worker_failure_probability: float = 0.3, + refresh_callback: Callable[[], None] | None = None, + graceful_stop_seconds: float = 15.0, + ) -> None: + self.log_dir = Path(log_dir) + self.worker_duration_seconds = worker_duration_seconds + self.worker_stats_interval_seconds = worker_stats_interval_seconds + self.worker_failure_probability = worker_failure_probability + self.refresh_callback = refresh_callback + self.graceful_stop_seconds = graceful_stop_seconds + self.scheduler = BackgroundScheduler(timezone=UTC) + self._workers: dict[int, RunningWorker] = {} + self._started = False + + def start(self) -> None: + if self._started: + return + + self.scheduler.start() + 
+        self.scheduler.add_job(
+            self.poll_workers,
+            "interval",
+            id=POLL_JOB_ID,
+            seconds=0.25,
+            replace_existing=True,
+            max_instances=1,
+            coalesce=True,
+        )
+        self.scheduler.add_job(
+            self.sync_jobs,
+            "interval",
+            id=SYNC_JOB_ID,
+            seconds=1,
+            replace_existing=True,
+            max_instances=1,
+            coalesce=True,
+        )
+        self.sync_jobs()
+        self._started = True
+
+    def shutdown(self) -> None:
+        for execution_id in tuple(self._workers):
+            worker = self._workers.pop(execution_id)
+            if worker.process.poll() is None:
+                worker.process.kill()
+                worker.process.wait(timeout=2)
+            worker.log_handle.close()
+
+        if self._started:
+            self.scheduler.shutdown(wait=False)
+            self._started = False
+
+    def sync_jobs(self) -> None:
+        with database.connection_context():
+            jobs = tuple(Job.select().where(Job.enabled == True))  # noqa: E712
+
+        desired_ids = set()
+        for job in jobs:
+            scheduler_job_id = _scheduler_job_id(_job_id(job))
+            desired_ids.add(scheduler_job_id)
+            self.scheduler.add_job(
+                self.run_scheduled_job,
+                trigger=_job_trigger(job),
+                args=(_job_id(job),),
+                id=scheduler_job_id,
+                replace_existing=True,
+                max_instances=1,
+                coalesce=True,
+                misfire_grace_time=1,
+            )
+
+        for scheduled_job in tuple(self.scheduler.get_jobs()):
+            if (
+                scheduled_job.id.startswith(SCHEDULER_JOB_PREFIX)
+                and scheduled_job.id not in desired_ids
+            ):
+                self.scheduler.remove_job(scheduled_job.id)
+
+    def run_scheduled_job(self, job_id: int) -> None:
+        self.run_job_now(job_id, reason="scheduled")
+
+    def run_job_now(self, job_id: int, *, reason: str) -> int | None:
+        del reason
+        self.start()
+        with database.connection_context():
+            job = Job.get_or_none(id=job_id)
+            if job is None:
+                return None
+
+            already_running = (
+                JobExecution.select()
+                .where(
+                    (JobExecution.job == job)
+                    & (JobExecution.running_status == JobExecutionStatus.RUNNING)
+                )
+                .exists()
+            )
+            if already_running:
+                return None
+
+            execution = JobExecution.create(
+                job=job,
+                started_at=utc_now(),
+                running_status=JobExecutionStatus.RUNNING,
+            )
+            execution_id = _execution_id(execution)
+
+        artifacts = JobArtifacts.for_execution(
+            log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
+        )
+        artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
+        log_handle = artifacts.log_path.open("a", encoding="utf-8", buffering=1)
+        log_handle.write(
+            f"scheduler: starting execution {execution_id} for job {job_id}\n"
+        )
+        process = subprocess.Popen(
+            [
+                sys.executable,
+                "-u",
+                "-m",
+                "repub.job_runner",
+                "--job-id",
+                str(job_id),
+                "--execution-id",
+                str(execution_id),
+                "--stats-path",
+                str(artifacts.stats_path),
+                "--duration-seconds",
+                str(self.worker_duration_seconds),
+                "--interval-seconds",
+                str(self.worker_stats_interval_seconds),
+                "--failure-probability",
+                str(self.worker_failure_probability),
+            ],
+            stdout=log_handle,
+            stderr=subprocess.STDOUT,
+            text=True,
+        )
+        self._workers[execution_id] = RunningWorker(
+            execution_id=execution_id,
+            process=process,
+            log_handle=log_handle,
+            artifacts=artifacts,
+        )
+        self._trigger_refresh()
+        return execution_id
+
+    def request_execution_cancel(self, execution_id: int) -> bool:
+        with database.connection_context():
+            execution = JobExecution.get_or_none(id=execution_id)
+            if execution is None:
+                return False
+            if execution.running_status != JobExecutionStatus.RUNNING:
+                return False
+            if execution.stop_requested_at is None:
+                execution.stop_requested_at = utc_now()
+                execution.save()
+
+        worker = self._workers.get(execution_id)
+        if worker is not None and worker.process.poll() is None:
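+            # terminate() requests a graceful stop; _enforce_graceful_stop
+            # escalates to kill() once the grace window elapses.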
+            worker.log_handle.write(
+                f"scheduler: graceful stop requested for execution {execution_id}\n"
+            )
+            worker.process.terminate()
+
+        self._trigger_refresh()
+        return True
+
+    def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
+        with database.connection_context():
+            job = Job.get_or_none(id=job_id)
+            if job is None:
+                return False
+            job.enabled = enabled
+            job.save()
+        self.sync_jobs()
+        self._trigger_refresh()
+        return True
+
+    def poll_workers(self) -> None:
+        for execution_id in tuple(self._workers):
+            worker = self._workers[execution_id]
+            self._apply_stats(worker)
+            self._enforce_graceful_stop(worker)
+            returncode = worker.process.poll()
+            if returncode is None:
+                continue
+
+            self._apply_stats(worker)
+            with database.connection_context():
+                execution = JobExecution.get_by_id(execution_id)
+                execution.ended_at = utc_now()
+                execution.running_status = _final_status(
+                    execution=execution,
+                    returncode=returncode,
+                )
+                execution.save()
+
+            worker.log_handle.close()
+            del self._workers[execution_id]
+            self._trigger_refresh()
+
+    def _apply_stats(self, worker: RunningWorker) -> None:
+        if not worker.artifacts.stats_path.exists():
+            return
+
+        with worker.artifacts.stats_path.open("r", encoding="utf-8") as handle:
+            handle.seek(worker.stats_offset)
+            payload = handle.read()
+            worker.stats_offset = handle.tell()
+
+        lines = [line for line in payload.splitlines() if line.strip()]
+        if not lines:
+            return
+
+        stats = json.loads(lines[-1])
+        with database.connection_context():
+            execution = JobExecution.get_by_id(worker.execution_id)
+            execution.requests_count = int(stats.get("requests_count", 0))
+            execution.items_count = int(stats.get("items_count", 0))
+            execution.warnings_count = int(stats.get("warnings_count", 0))
+            execution.errors_count = int(stats.get("errors_count", 0))
+            execution.bytes_count = int(stats.get("bytes_count", 0))
+            execution.retries_count = int(stats.get("retries_count", 0))
+            execution.exceptions_count = int(stats.get("exceptions_count", 0))
+            execution.cache_size_count = int(stats.get("cache_size_count", 0))
+            execution.cache_object_count = int(stats.get("cache_object_count", 0))
+            execution.raw_stats = json.dumps(stats, sort_keys=True)
+            execution.save()
+
+        self._trigger_refresh()
+
+    def _enforce_graceful_stop(self, worker: RunningWorker) -> None:
+        with database.connection_context():
+            execution = JobExecution.get_by_id(worker.execution_id)
+            if execution.stop_requested_at is None:
+                return
+            elapsed = utc_now() - _coerce_datetime(execution.stop_requested_at)
+
+        if (
+            elapsed >= timedelta(seconds=self.graceful_stop_seconds)
+            and worker.process.poll() is None
+        ):
+            worker.process.kill()
+
+    def _trigger_refresh(self) -> None:
+        if self.refresh_callback is not None:
+            self.refresh_callback()
+
+
+def load_runs_view(
+    *, log_dir: str | Path, now: datetime | None = None
+) -> dict[str, tuple[dict[str, object], ...]]:
+    reference_time = now or datetime.now(UTC)
+    resolved_log_dir = Path(log_dir)
+    with database.connection_context():
+        jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
+        running_executions = tuple(
+            JobExecution.select(JobExecution, Job, Source)
+            .join(Job)
+            .join(Source)
+            .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
+            .order_by(JobExecution.started_at.desc())
+        )
+        completed_executions = tuple(
+            JobExecution.select(JobExecution, Job, Source)
+            .join(Job)
+            .join(Source)
+            .where(
+                JobExecution.running_status.in_(
+                    (
+                        JobExecutionStatus.SUCCEEDED,
+                        JobExecutionStatus.FAILED,
+                        JobExecutionStatus.CANCELED,
+                    )
+                )
+            )
+            .order_by(JobExecution.ended_at.desc())
+            .limit(20)
+        )
+
+    running_by_job = {
+        _job_id(execution.job): execution for execution in running_executions
+    }
+    return {
+        "running": tuple(
+            _project_running_execution(execution, resolved_log_dir, reference_time)
+            for execution in running_executions
+        ),
+        "upcoming": tuple(
+            _project_upcoming_job(job, running_by_job.get(job.id), reference_time)
+            for job in jobs
+        ),
+        "completed": tuple(
+            _project_completed_execution(execution, resolved_log_dir)
+            for execution in completed_executions
+        ),
+    }
+
+
+def load_dashboard_view(
+    *, log_dir: str | Path, now: datetime | None = None
+) -> dict[str, object]:
+    reference_time = now or datetime.now(UTC)
+    runs_view = load_runs_view(log_dir=log_dir, now=reference_time)
+    with database.connection_context():
+        failed_last_day = (
+            JobExecution.select()
+            .where(
+                (JobExecution.running_status == JobExecutionStatus.FAILED)
+                & (JobExecution.ended_at.is_null(False))
+                & (JobExecution.ended_at >= reference_time - timedelta(days=1))
+            )
+            .count()
+        )
+
+    upcoming_ready = sum(
+        1 for job in runs_view["upcoming"] if str(job["run_reason"]) == "Ready"
+    )
+    footprint_bytes = _directory_size(Path(log_dir))
+    return {
+        "running": runs_view["running"],
+        "snapshot": {
+            "running_now": str(len(runs_view["running"])),
+            "upcoming_today": str(upcoming_ready),
+            "failures_24h": str(failed_last_day),
+            "artifact_footprint": _format_bytes(footprint_bytes),
+        },
+    }
+
+
+def load_execution_log_view(
+    *, log_dir: str | Path, job_id: int, execution_id: int
+) -> ExecutionLogView:
+    with database.connection_context():
+        execution = JobExecution.get_or_none(id=execution_id)
+
+    route = f"/job/{job_id}/execution/{execution_id}/logs"
+    if execution is None or _job_id(cast(Job, execution.job)) != job_id:
+        return ExecutionLogView(
+            job_id=job_id,
+            execution_id=execution_id,
+            title=f"Job {job_id} / execution {execution_id}",
+            description="Plain text log view routed through the app.",
+            status_label="Unavailable",
+            status_tone="failed",
+            log_text="",
+            error_message="Execution does not exist.",
+        )
+
+    artifacts = JobArtifacts.for_execution(
+        log_dir=Path(log_dir),
+        job_id=job_id,
+        execution_id=execution_id,
+    )
+    if not artifacts.log_path.exists():
+        return ExecutionLogView(
+            job_id=job_id,
+            execution_id=execution_id,
+            title=f"Job {job_id} / execution {execution_id}",
+            description="Plain text log view routed through the app.",
+            status_label=_execution_status_label(execution),
+            status_tone=_execution_status_tone(execution),
+            log_text="",
+            error_message="Log file has not been created yet.",
+        )
+
+    return ExecutionLogView(
+        job_id=job_id,
+        execution_id=execution_id,
+        title=f"Job {job_id} / execution {execution_id}",
+        description=f"Route: {route}",
+        status_label=_execution_status_label(execution),
+        status_tone=_execution_status_tone(execution),
+        log_text=artifacts.log_path.read_text(encoding="utf-8"),
+    )
+
+
+def _job_trigger(job: Job) -> CronTrigger:
+    expression = " ".join(
+        (
+            str(job.cron_minute),
+            str(job.cron_hour),
+            str(job.cron_day_of_month),
+            str(job.cron_month),
+            str(job.cron_day_of_week),
+        )
+    )
+    return CronTrigger.from_crontab(expression, timezone=UTC)
+
+
+def _scheduler_job_id(job_id: int) -> str:
+    return f"{SCHEDULER_JOB_PREFIX}{job_id}"
+
+
+def _project_running_execution(
+    execution: JobExecution, log_dir: Path, reference_time: datetime
+) -> dict[str, object]:
+    job = cast(Job, execution.job)
+    job_id = _job_id(job)
+    execution_id = _execution_id(execution)
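+    # Artifact paths are derived from ids, so no user-supplied filesystem
+    # path ever reaches the page.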
+    artifacts = JobArtifacts.for_execution(
+        log_dir=log_dir, job_id=job_id, execution_id=execution_id
+    )
+    started_at = _coerce_datetime(
+        cast(datetime | str, execution.started_at or execution.created_at)
+    )
+    runtime = reference_time - started_at
+    return {
+        "source": job.source.name,
+        "slug": job.source.slug,
+        "job_id": job_id,
+        "execution_id": execution_id,
+        "started_at": started_at.strftime("%Y-%m-%d %H:%M UTC"),
+        "runtime": f"running for {int(runtime.total_seconds())}s",
+        "status": "Stopping" if execution.stop_requested_at else "Running",
+        "stats": _stats_summary(execution),
+        "worker": (
+            "graceful stop requested"
+            if execution.stop_requested_at
+            else "streaming stats from worker jsonl"
+        ),
+        "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
+        "log_exists": artifacts.log_path.exists(),
+        "cancel_post_path": f"/actions/executions/{execution_id}/cancel",
+    }
+
+
+def _project_upcoming_job(
+    job: Job, running_execution: JobExecution | None, reference_time: datetime
+) -> dict[str, object]:
+    job_id = _job_id(job)
+    trigger = _job_trigger(job)
+    next_run = (
+        trigger.get_next_fire_time(None, reference_time)
+        if job.enabled and running_execution is None
+        else None
+    )
+    return {
+        "source": job.source.name,
+        "slug": job.source.slug,
+        "job_id": job_id,
+        "next_run": (
+            next_run.strftime("%Y-%m-%d %H:%M UTC")
+            if next_run is not None
+            else ("Running now" if running_execution is not None else "Not scheduled")
+        ),
+        "schedule": " ".join(
+            (
+                str(job.cron_minute),
+                str(job.cron_hour),
+                str(job.cron_day_of_month),
+                str(job.cron_month),
+                str(job.cron_day_of_week),
+            )
+        ),
+        "enabled_label": "Enabled" if job.enabled else "Disabled",
+        "enabled_tone": "scheduled" if job.enabled else "idle",
+        "run_disabled": running_execution is not None,
+        "run_reason": "Already running" if running_execution is not None else "Ready",
+        "toggle_label": "Disable" if job.enabled else "Enable",
+        "toggle_enabled": not job.enabled,
+        "run_post_path": f"/actions/jobs/{job_id}/run-now",
+        "toggle_post_path": f"/actions/jobs/{job_id}/toggle-enabled",
+        "delete_post_path": f"/actions/jobs/{job_id}/delete",
+    }
+
+
+def _project_completed_execution(
+    execution: JobExecution, log_dir: Path
+) -> dict[str, object]:
+    job = cast(Job, execution.job)
+    job_id = _job_id(job)
+    execution_id = _execution_id(execution)
+    artifacts = JobArtifacts.for_execution(
+        log_dir=log_dir, job_id=job_id, execution_id=execution_id
+    )
+    return {
+        "source": job.source.name,
+        "slug": job.source.slug,
+        "job_id": job_id,
+        "execution_id": execution_id,
+        "ended_at": (
+            _coerce_datetime(cast(datetime | str, execution.ended_at)).strftime(
+                "%Y-%m-%d %H:%M UTC"
+            )
+            if execution.ended_at is not None
+            else "Pending"
+        ),
+        "status": _execution_status_label(execution),
+        "status_tone": _execution_status_tone(execution),
+        "stats": _stats_summary(execution),
+        "summary": (
+            "Canceled by operator"
+            if execution.running_status == JobExecutionStatus.CANCELED
+            else (
+                "Worker exited successfully"
+                if execution.running_status == JobExecutionStatus.SUCCEEDED
+                else "Worker exited with failure"
+            )
+        ),
+        "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
+        "log_exists": artifacts.log_path.exists(),
+    }
+
+
+def _execution_status_label(execution: JobExecution) -> str:
+    status = JobExecutionStatus(execution.running_status)
+    return {
+        JobExecutionStatus.PENDING: "Pending",
+        JobExecutionStatus.RUNNING: (
+            "Stopping" if execution.stop_requested_at else "Running"
+        ),
"Succeeded", + JobExecutionStatus.FAILED: "Failed", + JobExecutionStatus.CANCELED: "Canceled", + }[status] + + +def _execution_status_tone(execution: JobExecution) -> str: + status = JobExecutionStatus(execution.running_status) + return { + JobExecutionStatus.PENDING: "idle", + JobExecutionStatus.RUNNING: "running", + JobExecutionStatus.SUCCEEDED: "done", + JobExecutionStatus.FAILED: "failed", + JobExecutionStatus.CANCELED: "idle", + }[status] + + +def _stats_summary(execution: JobExecution) -> str: + return ( + f"{execution.requests_count} requests" + f" • {execution.items_count} items" + f" • {execution.bytes_count} bytes" + ) + + +def _final_status(*, execution: JobExecution, returncode: int) -> JobExecutionStatus: + if execution.stop_requested_at is not None: + return JobExecutionStatus.CANCELED + if returncode == 0: + return JobExecutionStatus.SUCCEEDED + return JobExecutionStatus.FAILED + + +def _coerce_datetime(value: datetime | str) -> datetime: + if isinstance(value, datetime): + if value.tzinfo is None: + return value.replace(tzinfo=UTC) + return value.astimezone(UTC) + + parsed = datetime.fromisoformat(value) + if parsed.tzinfo is None: + return parsed.replace(tzinfo=UTC) + return parsed.astimezone(UTC) + + +def _job_id(job: Job) -> int: + return int(job.get_id()) + + +def _execution_id(execution: JobExecution) -> int: + return int(execution.get_id()) + + +def _directory_size(path: Path) -> int: + if not path.exists(): + return 0 + return sum(entry.stat().st_size for entry in path.rglob("*") if entry.is_file()) + + +def _format_bytes(value: int) -> str: + if value < 1024: + return f"{value} B" + if value < 1024 * 1024: + return f"{value / 1024:.1f} KB" + if value < 1024 * 1024 * 1024: + return f"{value / (1024 * 1024):.1f} MB" + return f"{value / (1024 * 1024 * 1024):.1f} GB" diff --git a/repub/model.py b/repub/model.py index de62329..41d31d6 100644 --- a/repub/model.py +++ b/repub/model.py @@ -295,6 +295,16 @@ def update_source( return source +def delete_job_source(job_id: int) -> bool: + with database.connection_context(): + with database.atomic(): + job = Job.get_or_none(id=job_id) + if job is None: + return False + source = Source.get_by_id(job.source_id) + return source.delete_instance() > 0 + + def load_sources() -> tuple[dict[str, object], ...]: with database.connection_context(): sources = tuple(Source.select().order_by(Source.created_at.desc())) @@ -416,6 +426,7 @@ class JobExecution(BaseModel): created_at = DateTimeField(default=utc_now) started_at = DateTimeField(null=True) ended_at = DateTimeField(null=True) + stop_requested_at = DateTimeField(null=True) running_status = IntegerField( default=JobExecutionStatus.PENDING, constraints=[Check("running_status BETWEEN 0 AND 4")], diff --git a/repub/pages/__init__.py b/repub/pages/__init__.py index 55612a1..bc914b7 100644 --- a/repub/pages/__init__.py +++ b/repub/pages/__init__.py @@ -1,4 +1,4 @@ -from repub.pages.dashboard import dashboard_page +from repub.pages.dashboard import dashboard_page, dashboard_page_with_data from repub.pages.runs import execution_logs_page, runs_page from repub.pages.shim import shim_page from repub.pages.sources import create_source_page, edit_source_page, sources_page @@ -6,6 +6,7 @@ from repub.pages.sources import create_source_page, edit_source_page, sources_pa __all__ = [ "create_source_page", "dashboard_page", + "dashboard_page_with_data", "edit_source_page", "execution_logs_page", "runs_page", diff --git a/repub/pages/dashboard.py b/repub/pages/dashboard.py index 1267fba..67b93a8 100644 
--- a/repub/pages/dashboard.py
+++ b/repub/pages/dashboard.py
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+from collections.abc import Mapping
+
 import htpy as h
 from htpy import Node, Renderable
 
@@ -12,36 +14,43 @@ from repub.components import (
     stat_card,
     status_badge,
 )
-from repub.pages.runs import RUNNING_EXECUTIONS
 
 
-def _running_execution_row(execution: dict[str, str | bool]) -> tuple[Node, ...]:
-    status_tone = "running" if execution["is_running"] else "done"
+def _text(values: Mapping[str, object], key: str) -> str:
+    return str(values[key])
+
+
+def _running_execution_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
+    status_tone = "running" if _text(execution, "status") != "Succeeded" else "done"
     return (
         h.div[
-            h.div(class_="font-semibold text-slate-950")[execution["source"]],
+            h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
             h.p(class_="mt-0.5 font-mono text-[11px] text-slate-500")[
-                execution["slug"]
+                _text(execution, "slug")
             ],
         ],
         h.div[
-            h.p(class_="font-medium text-slate-900")[f"#{execution['execution_id']}"],
+            h.p(class_="font-medium text-slate-900")[
+                f"#{_text(execution, 'execution_id')}"
+            ],
             h.p(class_="mt-0.5 text-[11px] text-slate-500")[
-                f"job {execution['job_id']}"
+                f"job {_text(execution, 'job_id')}"
             ],
         ],
         h.div[
-            h.p(class_="font-medium text-slate-900")[execution["started_at"]],
-            h.p(class_="mt-0.5 text-[11px] text-slate-500")[execution["runtime"]],
+            h.p(class_="font-medium text-slate-900")[_text(execution, "started_at")],
+            h.p(class_="mt-0.5 text-[11px] text-slate-500")[
+                _text(execution, "runtime")
+            ],
         ],
-        status_badge(label=str(execution["status"]), tone=status_tone),
+        status_badge(label=_text(execution, "status"), tone=status_tone),
         h.div(class_="min-w-56 whitespace-normal")[
-            h.p(class_="font-medium text-slate-900")[execution["stats"]],
-            h.p(class_="mt-0.5 text-[11px] text-slate-500")[execution["worker"]],
+            h.p(class_="font-medium text-slate-900")[_text(execution, "stats")],
+            h.p(class_="mt-0.5 text-[11px] text-slate-500")[_text(execution, "worker")],
         ],
         h.div(class_="flex flex-nowrap items-center gap-3")[
             inline_link(
-                href=str(execution["log_href"]),
+                href=_text(execution, "log_href"),
                 label="View log",
                 tone="amber",
             ),
@@ -71,7 +80,13 @@ def dashboard_header() -> Renderable:
     ]
 
 
-def operational_snapshot() -> Renderable:
+def operational_snapshot(*, snapshot: Mapping[str, str] | None = None) -> Renderable:
+    values = snapshot or {
+        "running_now": "0",
+        "upcoming_today": "0",
+        "failures_24h": "0",
+        "artifact_footprint": "0 B",
+    }
     return h.section[
         h.div(class_="mb-3 flex items-end justify-between gap-4")[
             h.div[
@@ -82,37 +97,39 @@ def operational_snapshot() -> Renderable:
                     "Operational snapshot"
                 ],
             ],
-            h.p(class_="text-xs text-slate-500")[
-                "Static fixture data shaped around the intended operator dashboard"
-            ],
+            h.p(class_="text-xs text-slate-500")["Live values from the database"],
         ],
         h.dl(class_="grid gap-3 md:grid-cols-2 xl:grid-cols-4")[
             stat_card(
                 label="Running now",
-                value="3",
-                detail="Two feed workers and one Pangea worker are active.",
+                value=values["running_now"],
+                detail="Currently active job executions.",
             ),
             stat_card(
                 label="Upcoming today",
-                value="11",
-                detail="Next scheduled job fires in 13 minutes.",
+                value=values["upcoming_today"],
+                detail="Enabled jobs that are ready for their next run.",
             ),
             stat_card(
                 label="Failures in 24h",
-                value="2",
-                detail="One network timeout and one source parsing error.",
+                value=values["failures_24h"],
executions recorded by the scheduler.", ), stat_card( - label="Output footprint", - value="18.4 GB", - detail="Mirrored feeds, media, logs, and execution stats.", + label="Artifact footprint", + value=values["artifact_footprint"], + detail="Current log and stats artifact size under out/logs.", ), ], ] -def running_executions_table() -> Renderable: - rows = tuple(_running_execution_row(execution) for execution in RUNNING_EXECUTIONS) +def running_executions_table( + *, running_executions: tuple[Mapping[str, object], ...] | None = None +) -> Renderable: + rows = tuple( + _running_execution_row(execution) for execution in (running_executions or ()) + ) headers = ("Source", "Execution", "Started", "Status", "Stats", "Actions") def render_row(row: tuple[Node, ...]) -> Renderable: @@ -172,6 +189,14 @@ def running_executions_table() -> Renderable: def dashboard_page() -> Renderable: + return dashboard_page_with_data() + + +def dashboard_page_with_data( + *, + snapshot: Mapping[str, str] | None = None, + running_executions: tuple[Mapping[str, object], ...] | None = None, +) -> Renderable: return h.main( id="morph", class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]", @@ -180,8 +205,8 @@ def dashboard_page() -> Renderable: h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[ h.div(class_="mx-auto max-w-7xl space-y-5")[ dashboard_header(), - operational_snapshot(), - running_executions_table(), + operational_snapshot(snapshot=snapshot), + running_executions_table(running_executions=running_executions), ] ], ] diff --git a/repub/pages/runs.py b/repub/pages/runs.py index f2a70c0..c76f5c0 100644 --- a/repub/pages/runs.py +++ b/repub/pages/runs.py @@ -1,10 +1,11 @@ from __future__ import annotations +from collections.abc import Mapping + import htpy as h from htpy import Node, Renderable from repub.components import ( - inline_button, inline_link, muted_action_link, page_shell, @@ -13,254 +14,174 @@ from repub.components import ( table_section, ) -RUNNING_EXECUTIONS: tuple[dict[str, str | bool], ...] = ( - { - "source": "Pangea mobile articles", - "slug": "pangea-mobile", - "job_id": "7", - "execution_id": "104", - "started_at": "Today, 11:42 UTC", - "runtime": "running for 8m", - "status": "Running", - "stats": "26 requests • 7 items • 2.6 MB", - "worker": "graceful stop after current item", - "log_href": "/job/7/execution/104/logs", - "is_running": True, - }, - { - "source": "Guardian feed mirror", - "slug": "guardian-feed", - "job_id": "3", - "execution_id": "103", - "started_at": "Today, 11:33 UTC", - "runtime": "running for 17m", - "status": "Running", - "stats": "91 requests • 13 items • 5.1 MB", - "worker": "streaming stats from worker jsonl", - "log_href": "/job/3/execution/103/logs", - "is_running": True, - }, - { - "source": "Podcast enclosure mirror", - "slug": "podcast-audio", - "job_id": "11", - "execution_id": "105", - "started_at": "Today, 11:48 UTC", - "runtime": "running for 2m", - "status": "Stopping", - "stats": "4 requests • 0 items • 0.8 MB", - "worker": "waiting for 15s graceful shutdown window", - "log_href": "/job/11/execution/105/logs", - "is_running": True, - }, -) -UPCOMING_JOBS: tuple[dict[str, str | bool], ...] 
-UPCOMING_JOBS: tuple[dict[str, str | bool], ...] = (
-    {
-        "source": "Podcast enclosure mirror",
-        "slug": "podcast-audio",
-        "job_id": "11",
-        "next_run": "Today, 12:15 local",
-        "schedule": "15 */4 * * 1-6",
-        "enabled_label": "Enabled",
-        "enabled_tone": "scheduled",
-        "run_disabled": True,
-        "run_reason": "Already running",
-        "toggle_label": "Disable",
-    },
-    {
-        "source": "Weekly digest feed",
-        "slug": "weekly-digest",
-        "job_id": "18",
-        "next_run": "Tomorrow, 08:00 local",
-        "schedule": "0 8 * * 1",
-        "enabled_label": "Disabled",
-        "enabled_tone": "idle",
-        "run_disabled": False,
-        "run_reason": "Ready",
-        "toggle_label": "Enable",
-    },
-    {
-        "source": "Kenya health desk",
-        "slug": "kenya-health",
-        "job_id": "22",
-        "next_run": "Today, 13:00 local",
-        "schedule": "0 */6 * * *",
-        "enabled_label": "Enabled",
-        "enabled_tone": "scheduled",
-        "run_disabled": False,
-        "run_reason": "Ready",
-        "toggle_label": "Disable",
-    },
-)
-
-COMPLETED_EXECUTIONS: tuple[dict[str, str], ...] = (
-    {
-        "source": "Guardian feed mirror",
-        "slug": "guardian-feed",
-        "job_id": "3",
-        "execution_id": "102",
-        "ended_at": "Today, 10:57 UTC",
-        "status": "Succeeded",
-        "status_tone": "done",
-        "stats": "204 requests • 28 items • 9.4 MB",
-        "summary": "Finished on schedule",
-        "log_href": "/job/3/execution/102/logs",
-    },
-    {
-        "source": "Podcast enclosure mirror",
-        "slug": "podcast-audio",
-        "job_id": "11",
-        "execution_id": "101",
-        "ended_at": "Today, 09:12 UTC",
-        "status": "Failed",
-        "status_tone": "failed",
-        "stats": "timeout after 3 retries",
-        "summary": "Worker exited with failure",
-        "log_href": "/job/11/execution/101/logs",
-    },
-    {
-        "source": "Pangea mobile articles",
-        "slug": "pangea-mobile",
-        "job_id": "7",
-        "execution_id": "100",
-        "ended_at": "Today, 05:48 UTC",
-        "status": "Canceled",
-        "status_tone": "idle",
-        "stats": "stopped by operator after 11m",
-        "summary": "Graceful stop completed",
-        "log_href": "/job/7/execution/100/logs",
-    },
-)
+def _action_button(
+    *,
+    label: str,
+    tone: str = "default",
+    disabled: bool = False,
+    post_path: str | None = None,
+) -> Renderable:
+    classes = {
+        "default": "bg-stone-100 text-slate-700 hover:bg-stone-200",
+        "danger": "bg-rose-50 text-rose-700 hover:bg-rose-100",
+    }
+    class_name = (
+        "cursor-not-allowed bg-slate-100 text-slate-400" if disabled else classes[tone]
+    )
+    attributes: dict[str, str] = {}
+    if post_path is not None and not disabled:
+        attributes["data-on:pointerdown"] = f"@post('{post_path}')"
+    return h.button(
+        attributes,
+        type="button",
+        disabled=disabled,
+        class_=(
+            "inline-flex items-center whitespace-nowrap rounded-full px-3 py-1.5 "
+            f"text-sm font-semibold transition {class_name}"
+        ),
+    )[label]
 
 
-def _running_row(execution: dict[str, str | bool]) -> tuple[Node, ...]:
+def _text(values: Mapping[str, object], key: str) -> str:
+    return str(values[key])
+
+
+def _maybe_text(values: Mapping[str, object], key: str) -> str | None:
+    value = values.get(key)
+    if value in {None, ""}:
+        return None
+    return str(value)
+
+
+def _flag(values: Mapping[str, object], key: str) -> bool:
+    return bool(values[key])
+
+
+def _running_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
     return (
         h.div[
-            h.div(class_="font-semibold text-slate-950")[execution["source"]],
-            h.p(class_="mt-1 font-mono text-xs text-slate-500")[execution["slug"]],
+            h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
+            h.p(class_="mt-1 font-mono text-xs text-slate-500")[
+                _text(execution, "slug")
+            ],
         ],
         h.div[
text-slate-900")[f"#{execution['execution_id']}"], - h.p(class_="mt-1 text-xs text-slate-500")[f"job {execution['job_id']}"], + h.p(class_="font-medium text-slate-900")[ + f"#{_text(execution, 'execution_id')}" + ], + h.p(class_="mt-1 text-xs text-slate-500")[ + f"job {_text(execution, 'job_id')}" + ], ], h.div[ - h.p(class_="font-medium text-slate-900")[execution["started_at"]], - h.p(class_="mt-1 text-xs text-slate-500")[execution["runtime"]], + h.p(class_="font-medium text-slate-900")[_text(execution, "started_at")], + h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "runtime")], ], - status_badge(label=str(execution["status"]), tone="running"), + status_badge(label=_text(execution, "status"), tone="running"), h.div(class_="min-w-56 whitespace-normal")[ - h.p(class_="font-medium text-slate-900")[execution["stats"]], - h.p(class_="mt-1 text-xs text-slate-500")[execution["worker"]], + h.p(class_="font-medium text-slate-900")[_text(execution, "stats")], + h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "worker")], ], h.div(class_="flex flex-nowrap items-center gap-3")[ inline_link( - href=str(execution["log_href"]), + href=_text(execution, "log_href"), label="View log", tone="amber", ), - inline_button(label="Stop", tone="danger"), + _action_button( + label="Stop", + tone="danger", + post_path=_maybe_text(execution, "cancel_post_path"), + ), ], ) -def _upcoming_row(job: dict[str, str | bool]) -> tuple[Node, ...]: +def _upcoming_row(job: Mapping[str, object]) -> tuple[Node, ...]: return ( h.div[ - h.div(class_="font-semibold text-slate-950")[job["source"]], - h.p(class_="mt-1 font-mono text-xs text-slate-500")[job["slug"]], + h.div(class_="font-semibold text-slate-950")[_text(job, "source")], + h.p(class_="mt-1 font-mono text-xs text-slate-500")[_text(job, "slug")], ], h.div[ - h.p(class_="font-medium text-slate-900")[job["next_run"]], - h.p(class_="mt-1 text-xs text-slate-500")[f"job {job['job_id']}"], + h.p(class_="font-medium text-slate-900")[_text(job, "next_run")], + h.p(class_="mt-1 text-xs text-slate-500")[f"job {_text(job, 'job_id')}"], ], - h.p(class_="font-mono text-xs text-slate-600")[job["schedule"]], + h.p(class_="font-mono text-xs text-slate-600")[_text(job, "schedule")], status_badge( - label=str(job["enabled_label"]), - tone=str(job["enabled_tone"]), + label=_text(job, "enabled_label"), + tone=_text(job, "enabled_tone"), ), h.p(class_="max-w-40 whitespace-normal text-sm text-slate-500")[ - job["run_reason"] + _text(job, "run_reason") ], h.div(class_="flex flex-nowrap items-center gap-2")[ - inline_button(label="Run now", disabled=bool(job["run_disabled"])), - inline_button(label=str(job["toggle_label"])), - inline_button(label="Delete", tone="danger"), + _action_button( + label="Run now", + disabled=_flag(job, "run_disabled"), + post_path=_maybe_text(job, "run_post_path"), + ), + _action_button( + label=_text(job, "toggle_label"), + post_path=_maybe_text(job, "toggle_post_path"), + ), + _action_button( + label="Delete", + tone="danger", + post_path=_maybe_text(job, "delete_post_path"), + ), ], ) -def _completed_row(execution: dict[str, str]) -> tuple[Node, ...]: +def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]: return ( h.div[ - h.div(class_="font-semibold text-slate-950")[execution["source"]], - h.p(class_="mt-1 font-mono text-xs text-slate-500")[execution["slug"]], + h.div(class_="font-semibold text-slate-950")[_text(execution, "source")], + h.p(class_="mt-1 font-mono text-xs text-slate-500")[ + _text(execution, "slug") + ], ], 
         h.div[
-            h.p(class_="font-medium text-slate-900")[f"#{execution['execution_id']}"],
-            h.p(class_="mt-1 text-xs text-slate-500")[f"job {execution['job_id']}"],
+            h.p(class_="font-medium text-slate-900")[
+                f"#{_text(execution, 'execution_id')}"
+            ],
+            h.p(class_="mt-1 text-xs text-slate-500")[
+                f"job {_text(execution, 'job_id')}"
+            ],
         ],
         h.div[
-            h.p(class_="font-medium text-slate-900")[execution["ended_at"]],
-            h.p(class_="mt-1 text-xs text-slate-500")[execution["summary"]],
+            h.p(class_="font-medium text-slate-900")[_text(execution, "ended_at")],
+            h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "summary")],
         ],
         status_badge(
-            label=execution["status"],
-            tone=execution["status_tone"],
+            label=_text(execution, "status"),
+            tone=_text(execution, "status_tone"),
         ),
         h.div(class_="min-w-48 whitespace-normal")[
-            h.p(class_="font-medium text-slate-900")[execution["stats"]]
+            h.p(class_="font-medium text-slate-900")[_text(execution, "stats")]
         ],
         inline_link(
-            href=execution["log_href"],
+            href=_text(execution, "log_href"),
             label="View log",
             tone="amber",
         ),
     )
 
 
-def delete_confirmation_preview() -> Renderable:
-    return section_card(
-        content=(
-            h.div(class_="flex items-center justify-between gap-4")[
-                h.div[
-                    h.p(
-                        class_="text-xs font-semibold uppercase tracking-[0.22em] text-amber-600"
-                    )["Modal preview"],
-                    h.h2(class_="mt-2 text-xl font-semibold text-slate-950")[
-                        "Delete confirmation"
-                    ],
-                    h.p(class_="mt-2 max-w-2xl text-sm text-slate-600")[
-                        "Upcoming jobs use a confirmation modal before deleting a job. This is the intended open state, placed inline for the static UI pass."
-                    ],
-                ],
-                status_badge(label="Preview", tone="scheduled"),
-            ],
-            h.div(class_="mt-3 rounded-[1.5rem] bg-stone-50 p-5")[
-                h.p(
-                    class_="text-sm font-semibold uppercase tracking-[0.18em] text-slate-500"
-                )["Delete job"],
-                h.h3(class_="mt-2 text-lg font-semibold text-slate-950")[
-                    "Delete Weekly digest feed?"
-                ],
-                h.p(class_="mt-3 max-w-2xl text-sm leading-6 text-slate-600")[
-                    "This removes the source-linked job record and its schedule. Existing execution history and log files remain available for inspection."
-                ],
-                h.div(class_="mt-6 flex flex-wrap gap-3")[
-                    inline_button(label="Cancel"),
-                    inline_button(label="Delete job", tone="danger"),
-                ],
-            ],
-        )
-    )
-
-
-def runs_page() -> Renderable:
-    running_rows = tuple(_running_row(execution) for execution in RUNNING_EXECUTIONS)
-    upcoming_rows = tuple(_upcoming_row(job) for job in UPCOMING_JOBS)
-    completed_rows = tuple(
-        _completed_row(execution) for execution in COMPLETED_EXECUTIONS
-    )
+def runs_page(
+    *,
+    running_executions: tuple[Mapping[str, object], ...] | None = None,
+    upcoming_jobs: tuple[Mapping[str, object], ...] | None = None,
+    completed_executions: tuple[Mapping[str, object], ...] | None = None,
+) -> Renderable:
+    running_items = running_executions or ()
+    upcoming_items = upcoming_jobs or ()
+    completed_items = completed_executions or ()
+    running_rows = tuple(_running_row(execution) for execution in running_items)
+    upcoming_rows = tuple(_upcoming_row(job) for job in upcoming_items)
+    completed_rows = tuple(_completed_row(execution) for execution in completed_items)
 
     return page_shell(
         current_path="/runs",
@@ -286,7 +207,7 @@ def runs_page() -> Renderable:
         table_section(
             eyebrow="Queue",
             title="Upcoming jobs",
-            subtitle="Scheduled work shows enable or disable state, run-now affordances, and delete controls. Run now is disabled while the job is already running.",
+            subtitle="Scheduled work shows enable or disable state, run-now affordances, and destructive delete controls. Deleting removes the source-linked job and its execution history.",
             headers=(
                 "Source",
                 "Next run",
@@ -311,17 +232,43 @@ def runs_page() -> Renderable:
             ),
             rows=completed_rows,
         ),
-        delete_confirmation_preview(),
     ),
 )
 
 
-def execution_logs_page(*, job_id: int, execution_id: int) -> Renderable:
+def execution_logs_page(
+    *,
+    job_id: int,
+    execution_id: int,
+    log_view: Mapping[str, object] | None = None,
+) -> Renderable:
+    if log_view is None:
+        log_view = {
+            "title": f"Job {job_id} / execution {execution_id}",
+            "description": "Plain text log view routed through the app.",
+            "status_label": "Unavailable",
+            "status_tone": "failed",
+            "log_text": "",
+            "error_message": "Execution log is only available from persisted job runs.",
+        }
+
+    error_message = _maybe_text(log_view, "error_message")
+    error_notice = (
+        h.div(
+            class_="mt-3 rounded-2xl bg-rose-50 px-4 py-3 text-sm font-medium text-rose-800"
+        )[
+            h.p["Execution log unavailable"],
+            h.p(class_="mt-1 font-normal")[error_message],
+        ]
+        if error_message is not None
+        else None
+    )
+
     return page_shell(
         current_path=f"/job/{job_id}/execution/{execution_id}/logs",
         eyebrow="Execution log",
-        title=f"Job {job_id} / execution {execution_id}",
-        description="Plain text log view routed through the app. The final version will stream appended lines while the worker is still active.",
+        title=_text(log_view, "title"),
+        description=_text(log_view, "description"),
         actions=muted_action_link(href="/runs", label="Back to runs"),
         content=(
             section_card(
@@ -335,25 +282,18 @@ def execution_logs_page(*, job_id: int, execution_id: int) -> Renderable:
                         f"/job/{job_id}/execution/{execution_id}/logs"
                     ],
                     h.p(class_="mt-2 text-sm text-slate-600")[
-                        "Streaming text log view. No arbitrary file paths are exposed in the UI."
+ _text(log_view, "description") ], ], - status_badge(label="Streaming", tone="running"), + status_badge( + label=_text(log_view, "status_label"), + tone=_text(log_view, "status_tone"), + ), ], + error_notice, h.pre( class_="mt-3 overflow-x-auto rounded-[1.5rem] bg-slate-950 p-5 text-xs leading-6 text-emerald-200" - )[ - "\n".join( - ( - "11:42:01 scheduler: run_now requested for job 7", - "11:42:02 worker[7]: starting pangea-mobile", - "11:42:08 stats: requests=18 items=4 bytes=1.8MB", - "11:42:11 stats: requests=26 items=7 bytes=2.6MB", - "11:42:17 stats: requests=31 items=9 bytes=3.0MB", - "11:42:24 worker[7]: waiting for more log lines ...", - ) - ) - ], + )[_text(log_view, "log_text")], ) ), ), diff --git a/repub/sql/001_initial.sql b/repub/sql/001_initial.sql index 12f3d41..43ad445 100644 --- a/repub/sql/001_initial.sql +++ b/repub/sql/001_initial.sql @@ -52,6 +52,7 @@ CREATE TABLE IF NOT EXISTS job_execution ( created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, started_at TEXT, ended_at TEXT, + stop_requested_at TEXT, running_status INTEGER NOT NULL DEFAULT 0 CHECK (running_status BETWEEN 0 AND 4), requests_count INTEGER NOT NULL DEFAULT 0, items_count INTEGER NOT NULL DEFAULT 0, diff --git a/repub/web.py b/repub/web.py index fdb24fa..a216448 100644 --- a/repub/web.py +++ b/repub/web.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import hashlib from collections.abc import AsyncGenerator, Awaitable, Callable +from pathlib import Path from typing import TypedDict, cast from urllib.parse import urlparse @@ -15,8 +16,16 @@ from peewee import IntegrityError from quart import Quart, Response, request, url_for from repub.datastar import RefreshBroker, render_stream +from repub.jobs import ( + JobRuntime, + load_dashboard_view, + load_execution_log_view, + load_runs_view, +) from repub.model import ( + Job, create_source, + delete_job_source, initialize_database, load_source_form, load_sources, @@ -25,7 +34,7 @@ from repub.model import ( ) from repub.pages import ( create_source_page, - dashboard_page, + dashboard_page_with_data, edit_source_page, execution_logs_page, runs_page, @@ -35,6 +44,8 @@ from repub.pages import ( from repub.pages.sources import PANGEA_CONTENT_FORMATS, PANGEA_CONTENT_TYPES REFRESH_BROKER_KEY = "repub.refresh_broker" +JOB_RUNTIME_KEY = "repub.job_runtime" +DEFAULT_LOG_DIR = Path("out/logs") RenderFunction = Callable[[], Awaitable[Renderable]] @@ -83,7 +94,12 @@ def _render_shim_page(*, stylesheet_href: str, datastar_src: str) -> tuple[str, def create_app() -> Quart: app = Quart(__name__) app.config["REPUB_DB_PATH"] = str(initialize_database()) + app.config.setdefault("REPUB_LOG_DIR", DEFAULT_LOG_DIR) + app.config.setdefault("REPUB_JOB_WORKER_DURATION_SECONDS", 20.0) + app.config.setdefault("REPUB_JOB_WORKER_STATS_INTERVAL_SECONDS", 1.0) + app.config.setdefault("REPUB_JOB_WORKER_FAILURE_PROBABILITY", 0.3) app.extensions[REFRESH_BROKER_KEY] = RefreshBroker() + app.extensions[JOB_RUNTIME_KEY] = None @app.get("/") @app.get("/sources") @@ -112,7 +128,7 @@ def create_app() -> Quart: @app.post("/") async def dashboard_patch() -> DatastarResponse: - return _page_patch_response(app, render_dashboard) + return _page_patch_response(app, lambda: render_dashboard(app)) @app.post("/sources") async def sources_patch() -> DatastarResponse: @@ -147,6 +163,7 @@ def create_app() -> Quart: {"_formError": "Slug must be unique.", "_formSuccess": ""} ) ) + get_job_runtime(app).sync_jobs() trigger_refresh(app) return DatastarResponse(SSE.redirect("/sources")) @@ 
@@ -171,20 +188,58 @@ def create_app() -> Quart:
                 {"_formError": "Source does not exist.", "_formSuccess": ""}
             )
         )
+        get_job_runtime(app).sync_jobs()
         trigger_refresh(app)
         return DatastarResponse(SSE.redirect("/sources"))
 
     @app.post("/runs")
     async def runs_patch() -> DatastarResponse:
-        return _page_patch_response(app, render_runs)
+        return _page_patch_response(app, lambda: render_runs(app))
+
+    @app.post("/actions/jobs/<int:job_id>/run-now")
+    async def run_job_now_action(job_id: int) -> Response:
+        get_job_runtime(app).run_job_now(job_id, reason="manual")
+        trigger_refresh(app)
+        return Response(status=204)
+
+    @app.post("/actions/jobs/<int:job_id>/toggle-enabled")
+    async def toggle_job_enabled_action(job_id: int) -> Response:
+        job = Job.get_or_none(id=job_id)
+        if job is not None:
+            get_job_runtime(app).set_job_enabled(job_id, enabled=not job.enabled)
+        trigger_refresh(app)
+        return Response(status=204)
+
+    @app.post("/actions/jobs/<int:job_id>/delete")
+    async def delete_job_action(job_id: int) -> Response:
+        delete_job_source(job_id)
+        get_job_runtime(app).sync_jobs()
+        trigger_refresh(app)
+        return Response(status=204)
+
+    @app.post("/actions/executions/<int:execution_id>/cancel")
+    async def cancel_execution_action(execution_id: int) -> Response:
+        get_job_runtime(app).request_execution_cancel(execution_id)
+        trigger_refresh(app)
+        return Response(status=204)
 
     @app.post("/job/<int:job_id>/execution/<int:execution_id>/logs")
     async def logs_patch(job_id: int, execution_id: int) -> DatastarResponse:
         async def render() -> Renderable:
-            return await render_execution_logs(job_id=job_id, execution_id=execution_id)
+            return await render_execution_logs(
+                app, job_id=job_id, execution_id=execution_id
+            )
 
         return _page_patch_response(app, render)
 
+    @app.before_serving
+    async def start_runtime() -> None:
+        get_job_runtime(app).start()
+
+    @app.after_serving
+    async def stop_runtime() -> None:
+        get_job_runtime(app).shutdown()
+
     return app
 
 
@@ -192,12 +247,39 @@ def get_refresh_broker(app: Quart) -> RefreshBroker:
     return cast(RefreshBroker, app.extensions[REFRESH_BROKER_KEY])
 
 
+def get_job_runtime(app: Quart) -> JobRuntime:
+    runtime = cast(JobRuntime | None, app.extensions.get(JOB_RUNTIME_KEY))
+    if runtime is None:
+        runtime = JobRuntime(
+            log_dir=app.config["REPUB_LOG_DIR"],
+            worker_duration_seconds=float(
+                app.config["REPUB_JOB_WORKER_DURATION_SECONDS"]
+            ),
+            worker_stats_interval_seconds=float(
+                app.config["REPUB_JOB_WORKER_STATS_INTERVAL_SECONDS"]
+            ),
+            worker_failure_probability=float(
+                app.config["REPUB_JOB_WORKER_FAILURE_PROBABILITY"]
+            ),
+            refresh_callback=lambda: trigger_refresh(app),
+        )
+        app.extensions[JOB_RUNTIME_KEY] = runtime
+    return runtime
+
+
 def trigger_refresh(app: Quart, event: object = "refresh-event") -> None:
     get_refresh_broker(app).publish(event)
 
 
-async def render_dashboard() -> Renderable:
-    return dashboard_page()
+async def render_dashboard(app: Quart | None = None) -> Renderable:
+    if app is None:
+        return dashboard_page_with_data()
+
+    view = load_dashboard_view(log_dir=app.config["REPUB_LOG_DIR"])
+    return dashboard_page_with_data(
+        snapshot=cast(dict[str, str], view["snapshot"]),
+        running_executions=cast(tuple[dict[str, object], ...], view["running"]),
+    )
 
 
 async def render_sources(app: Quart | None = None) -> Renderable:
@@ -221,12 +303,41 @@ async def render_edit_source(slug: str) -> Renderable:
     )
 
 
-async def render_runs() -> Renderable:
-    return runs_page()
+async def render_runs(app: Quart | None = None) -> Renderable:
+    if app is None:
+        return runs_page()
+
+    view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])
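+    # Hand the page plain mappings so it renders the same from fixtures or
+    # live database rows.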
+    return runs_page(
+        running_executions=cast(tuple[dict[str, object], ...], view["running"]),
+        upcoming_jobs=cast(tuple[dict[str, object], ...], view["upcoming"]),
+        completed_executions=cast(tuple[dict[str, object], ...], view["completed"]),
+    )
 
 
-async def render_execution_logs(*, job_id: int, execution_id: int) -> Renderable:
-    return execution_logs_page(job_id=job_id, execution_id=execution_id)
+async def render_execution_logs(
+    app: Quart | None = None, *, job_id: int, execution_id: int
+) -> Renderable:
+    if app is None:
+        return execution_logs_page(job_id=job_id, execution_id=execution_id)
+
+    log_view = load_execution_log_view(
+        log_dir=app.config["REPUB_LOG_DIR"],
+        job_id=job_id,
+        execution_id=execution_id,
+    )
+    return execution_logs_page(
+        job_id=job_id,
+        execution_id=execution_id,
+        log_view={
+            "title": log_view.title,
+            "description": log_view.description,
+            "status_label": log_view.status_label,
+            "status_tone": log_view.status_tone,
+            "log_text": log_view.log_text,
+            "error_message": log_view.error_message,
+        },
+    )
 
 
 def _page_patch_response(app: Quart, render: RenderFunction) -> DatastarResponse:
diff --git a/tests/test_scheduler_runtime.py b/tests/test_scheduler_runtime.py
new file mode 100644
index 0000000..df226dc
--- /dev/null
+++ b/tests/test_scheduler_runtime.py
@@ -0,0 +1,346 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import time
+from pathlib import Path
+
+from repub.jobs import JobArtifacts, JobRuntime
+from repub.model import (
+    Job,
+    JobExecution,
+    JobExecutionStatus,
+    Source,
+    create_source,
+    initialize_database,
+)
+from repub.web import create_app, get_job_runtime, render_execution_logs, render_runs
+
+
+def test_job_runtime_syncs_enabled_jobs_into_apscheduler(tmp_path: Path) -> None:
+    initialize_database(tmp_path / "scheduler.db")
+    enabled_source = create_source(
+        name="Enabled source",
+        slug="enabled-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=True,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/enabled.xml",
+    )
+    disabled_source = create_source(
+        name="Disabled source",
+        slug="disabled-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=False,
+        cron_minute="15",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/disabled.xml",
+    )
+    enabled_job = Job.get(Job.source == enabled_source)
+    disabled_job = Job.get(Job.source == disabled_source)
+
+    runtime = JobRuntime(
+        log_dir=tmp_path / "out" / "logs",
+        worker_duration_seconds=0.4,
+        worker_stats_interval_seconds=0.05,
+        worker_failure_probability=0.0,
+    )
+    try:
+        runtime.start()
+        runtime.sync_jobs()
+
+        scheduled_ids = {job.id for job in runtime.scheduler.get_jobs()}
+
+        assert f"job-{enabled_job.id}" in scheduled_ids
+        assert f"job-{disabled_job.id}" not in scheduled_ids
+
+        enabled_job.enabled = False
+        enabled_job.save()
+        runtime.sync_jobs()
+
+        scheduled_ids = {job.id for job in runtime.scheduler.get_jobs()}
+        assert f"job-{enabled_job.id}" not in scheduled_ids
+    finally:
+        runtime.shutdown()
+
+
+def test_job_runtime_run_now_writes_log_and_stats_and_marks_success(
+    tmp_path: Path,
+) -> None:
+    initialize_database(tmp_path / "run-now.db")
+    source = create_source(
+        name="Manual source",
+        slug="manual-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=False,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/manual.xml", + ) + job = Job.get(Job.source == source) + + runtime = JobRuntime( + log_dir=tmp_path / "out" / "logs", + worker_duration_seconds=0.35, + worker_stats_interval_seconds=0.05, + worker_failure_probability=0.0, + ) + try: + runtime.start() + execution_id = runtime.run_job_now(job.id, reason="manual") + assert execution_id is not None + execution = _wait_for_terminal_execution(execution_id) + artifacts = JobArtifacts.for_execution( + log_dir=tmp_path / "out" / "logs", + job_id=job.id, + execution_id=execution_id, + ) + + assert execution.running_status == JobExecutionStatus.SUCCEEDED + assert execution.started_at is not None + assert execution.ended_at is not None + assert execution.requests_count > 0 + assert execution.items_count > 0 + assert execution.bytes_count > 0 + assert artifacts.log_path.exists() + assert artifacts.stats_path.exists() + assert "starting simulated crawl" in artifacts.log_path.read_text( + encoding="utf-8" + ) + + stats_lines = [ + json.loads(line) + for line in artifacts.stats_path.read_text(encoding="utf-8").splitlines() + ] + assert len(stats_lines) >= 2 + assert stats_lines[-1]["requests_count"] == execution.requests_count + finally: + runtime.shutdown() + + +def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None: + initialize_database(tmp_path / "cancel.db") + source = create_source( + name="Cancelable source", + slug="cancelable-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=False, + cron_minute="*/5", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/cancelable.xml", + ) + job = Job.get(Job.source == source) + + runtime = JobRuntime( + log_dir=tmp_path / "out" / "logs", + worker_duration_seconds=2.0, + worker_stats_interval_seconds=0.1, + worker_failure_probability=0.0, + ) + try: + runtime.start() + execution_id = runtime.run_job_now(job.id, reason="manual") + assert execution_id is not None + _wait_for_running_execution(execution_id) + + runtime.request_execution_cancel(execution_id) + execution = _wait_for_terminal_execution(execution_id) + artifacts = JobArtifacts.for_execution( + log_dir=tmp_path / "out" / "logs", + job_id=job.id, + execution_id=execution_id, + ) + + assert execution.running_status == JobExecutionStatus.CANCELED + assert execution.ended_at is not None + assert execution.stop_requested_at is not None + assert "graceful stop requested" in artifacts.log_path.read_text( + encoding="utf-8" + ) + finally: + runtime.shutdown() + + +def test_render_runs_uses_database_backed_jobs_and_executions( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "runs-page.db" + log_dir = tmp_path / "out" / "logs" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + app = create_app() + app.config["REPUB_LOG_DIR"] = log_dir + app.config["REPUB_JOB_WORKER_DURATION_SECONDS"] = 0.35 + app.config["REPUB_JOB_WORKER_STATS_INTERVAL_SECONDS"] = 0.05 + app.config["REPUB_JOB_WORKER_FAILURE_PROBABILITY"] = 0.0 + + source = create_source( + name="Runs page source", + slug="runs-page-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=True, + cron_minute="*/5", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/runs-page.xml", + ) + job = Job.get(Job.source == source) + runtime = get_job_runtime(app) + runtime.start() + try: + execution_id = runtime.run_job_now(job.id, 
reason="manual") + assert execution_id is not None + execution = _wait_for_terminal_execution(execution_id) + + async def run() -> None: + body = str(await render_runs(app)) + + assert "runs-page-source" in body + assert "Running job executions" in body + assert "Upcoming jobs" in body + assert "Completed job executions" in body + assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body + assert "Succeeded" in body + assert "Run now" in body + + asyncio.run(run()) + finally: + runtime.shutdown() + + +def test_render_execution_logs_handles_missing_execution_and_missing_log_file( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "log-errors.db" + log_dir = tmp_path / "out" / "logs" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + app = create_app() + app.config["REPUB_LOG_DIR"] = log_dir + source = create_source( + name="Log source", + slug="log-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=False, + cron_minute="*/5", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/log-source.xml", + ) + job = Job.get(Job.source == source) + execution = JobExecution.create( + job=job, + running_status=JobExecutionStatus.FAILED, + ) + + async def run() -> None: + missing_execution = str( + await render_execution_logs(app, job_id=job.id, execution_id=9999) + ) + missing_log = str( + await render_execution_logs(app, job_id=job.id, execution_id=execution.id) + ) + + assert "Execution log unavailable" in missing_execution + assert "Execution does not exist." in missing_execution + assert "Execution log unavailable" in missing_log + assert "Log file has not been created yet." in missing_log + + asyncio.run(run()) + + +def test_delete_job_action_removes_source_job_and_execution_history( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "delete-job.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + async def run() -> None: + app = create_app() + client = app.test_client() + + source = create_source( + name="Delete source", + slug="delete-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=True, + cron_minute="*/30", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/delete.xml", + ) + job = Job.get(Job.source == source) + execution = JobExecution.create( + job=job, + running_status=JobExecutionStatus.SUCCEEDED, + ) + + response = await client.post(f"/actions/jobs/{job.id}/delete") + + assert response.status_code == 204 + assert Source.get_or_none(Source.slug == "delete-source") is None + assert Job.get_or_none(id=job.id) is None + assert JobExecution.get_or_none(id=int(execution.get_id())) is None + + asyncio.run(run()) + + +def _wait_for_running_execution( + execution_id: int, *, timeout_seconds: float = 2.0 +) -> JobExecution: + deadline = time.monotonic() + timeout_seconds + while time.monotonic() < deadline: + execution = JobExecution.get_by_id(execution_id) + if execution.running_status == JobExecutionStatus.RUNNING: + return execution + time.sleep(0.02) + raise AssertionError(f"execution {execution_id} never entered RUNNING state") + + +def _wait_for_terminal_execution( + execution_id: int, *, timeout_seconds: float = 4.0 +) -> JobExecution: + deadline = time.monotonic() + timeout_seconds + while time.monotonic() < deadline: + execution = JobExecution.get_by_id(execution_id) + if execution.running_status in { + JobExecutionStatus.SUCCEEDED, + 
+def _wait_for_terminal_execution(
+    execution_id: int, *, timeout_seconds: float = 4.0
+) -> JobExecution:
+    deadline = time.monotonic() + timeout_seconds
+    while time.monotonic() < deadline:
+        execution = JobExecution.get_by_id(execution_id)
+        if execution.running_status in {
+            JobExecutionStatus.SUCCEEDED,
+            JobExecutionStatus.FAILED,
+            JobExecutionStatus.CANCELED,
+        }:
+            return execution
+        time.sleep(0.02)
+    raise AssertionError(f"execution {execution_id} did not finish in time")
diff --git a/tests/test_web.py b/tests/test_web.py
index 3952930..51d469d 100644
--- a/tests/test_web.py
+++ b/tests/test_web.py
@@ -5,7 +5,15 @@ from pathlib import Path
 from typing import Any, cast
 
 from repub.datastar import RefreshBroker, render_sse_event, render_stream
-from repub.model import Job, Source, SourceFeed, SourcePangea, create_source
+from repub.model import (
+    Job,
+    JobExecution,
+    JobExecutionStatus,
+    Source,
+    SourceFeed,
+    SourcePangea,
+    create_source,
+)
 from repub.web import (
     create_app,
     get_refresh_broker,
@@ -141,15 +149,20 @@ def test_render_stream_yields_on_connect_and_refresh() -> None:
     asyncio.run(run())
 
 
-def test_render_dashboard_shows_dashboard_information_architecture() -> None:
+def test_render_dashboard_shows_dashboard_information_architecture(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "dashboard-render.db"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+
     async def run() -> None:
-        body = str(await render_dashboard())
+        app = create_app()
+        body = str(await render_dashboard(app))
 
         assert "Operational snapshot" in body
         assert "Running executions" in body
         assert 'href="/sources"' in body
         assert 'href="/runs"' in body
-        assert "/job/7/execution/104/logs" in body
         assert "Create source" in body
 
     asyncio.run(run())
@@ -569,27 +582,97 @@ def test_create_source_action_validates_duplicate_slug_and_pangea_type(
     asyncio.run(run())
 
 
-def test_render_runs_shows_running_upcoming_and_completed_tables() -> None:
+def test_render_runs_shows_running_upcoming_and_completed_tables(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "runs-render.db"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+
     async def run() -> None:
-        body = str(await render_runs())
+        app = create_app()
+
+        source = create_source(
+            name="Runs render source",
+            slug="runs-render-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=True,
+            cron_minute="*/30",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url="https://example.com/runs.xml",
+        )
+        job = Job.get(Job.source == source)
+        execution = JobExecution.create(
+            job=job,
+            running_status=JobExecutionStatus.SUCCEEDED,
+        )
+
+        body = str(await render_runs(app))
 
         assert "Running job executions" in body
         assert "Upcoming jobs" in body
         assert "Completed job executions" in body
-        assert "Delete confirmation" in body
-        assert "/job/11/execution/101/logs" in body
-        assert "Already running" in body
+        assert "runs-render-source" in body
+        assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
+        assert "Already running" not in body
 
     asyncio.run(run())
 
 
-def test_render_execution_logs_uses_app_route() -> None:
-    async def run() -> None:
-        body = str(await render_execution_logs(job_id=7, execution_id=104))
+def test_render_execution_logs_uses_app_route(monkeypatch, tmp_path: Path) -> None:
+    db_path = tmp_path / "logs-render.db"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
 
-        assert "Job 7 / execution 104" in body
-        assert "/job/7/execution/104/logs" in body
-        assert "Streaming text log view" in body
+    async def run() -> None:
+        log_dir = tmp_path / "out" / "logs"
+        app = create_app()
+        app.config["REPUB_LOG_DIR"] = log_dir
+
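+        # render_execution_logs is expected to read the log file that the
+        # runtime's JobArtifacts naming scheme produces under REPUB_LOG_DIR,
+        # so the test pre-writes job-<id>-execution-<id>.log before rendering.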
+        source = create_source(
+            name="Log render source",
+            slug="log-render-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=False,
+            cron_minute="*/30",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url="https://example.com/logs.xml",
+        )
+        job = Job.get(Job.source == source)
+        execution = JobExecution.create(
+            job=job,
+            running_status=JobExecutionStatus.RUNNING,
+        )
+        log_path = log_dir / f"job-{job.id}-execution-{execution.get_id()}.log"
+        log_path.parent.mkdir(parents=True, exist_ok=True)
+        log_path.write_text(
+            "\n".join(
+                (
+                    "scheduler: run_now requested",
+                    "worker: starting simulated crawl",
+                    "worker: waiting for more log lines ...",
+                )
+            ),
+            encoding="utf-8",
+        )
+
+        body = str(
+            await render_execution_logs(
+                app, job_id=job.id, execution_id=int(execution.get_id())
+            )
+        )
+
+        assert f"Job {job.id} / execution {execution.get_id()}" in body
+        assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
+        assert "Route: /job/" in body
         assert "waiting for more log lines" in body
 
     asyncio.run(run())