implement job runner and scheduler

This commit is contained in:
Abel Luck 2026-03-30 14:02:39 +02:00
parent 328a70ff9b
commit 2b2a3f1cc0
11 changed files with 1572 additions and 284 deletions

View file

@ -19,18 +19,22 @@ RenderFunction = Callable[[], Awaitable[RenderResult]]
class RefreshBroker: class RefreshBroker:
def __init__(self) -> None: def __init__(self) -> None:
self._subscribers: set[asyncio.Queue[object]] = set() self._subscribers: dict[asyncio.Queue[object], asyncio.AbstractEventLoop] = {}
def subscribe(self) -> asyncio.Queue[object]: def subscribe(self) -> asyncio.Queue[object]:
queue: asyncio.Queue[object] = asyncio.Queue(maxsize=1) queue: asyncio.Queue[object] = asyncio.Queue(maxsize=1)
self._subscribers.add(queue) self._subscribers[queue] = asyncio.get_running_loop()
return queue return queue
def unsubscribe(self, queue: asyncio.Queue[object]) -> None: def unsubscribe(self, queue: asyncio.Queue[object]) -> None:
self._subscribers.discard(queue) self._subscribers.pop(queue, None)
def publish(self, event: object = "refresh-event") -> None: def publish(self, event: object = "refresh-event") -> None:
for queue in tuple(self._subscribers): for queue, loop in tuple(self._subscribers.items()):
loop.call_soon_threadsafe(_publish_event, queue, event)
def _publish_event(queue: asyncio.Queue[object], event: object) -> None:
if queue.full(): if queue.full():
try: try:
queue.get_nowait() queue.get_nowait()
@ -39,7 +43,7 @@ class RefreshBroker:
try: try:
queue.put_nowait(event) queue.put_nowait(event)
except asyncio.QueueFull: except asyncio.QueueFull:
continue return
async def render_sse_event( async def render_sse_event(

123
repub/job_runner.py Normal file
View file

@ -0,0 +1,123 @@
from __future__ import annotations
import argparse
import json
import random
import signal
import sys
import time
from datetime import UTC, datetime
from pathlib import Path
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the worker's command-line flags (all required).

    *argv* defaults to ``sys.argv[1:]`` when ``None``.
    """
    parser = argparse.ArgumentParser(description="Simulated republisher worker")
    # Flag name paired with an optional value converter; every flag is required.
    flag_specs: tuple[tuple[str, type | None], ...] = (
        ("--job-id", int),
        ("--execution-id", int),
        ("--stats-path", None),
        ("--duration-seconds", float),
        ("--interval-seconds", float),
        ("--failure-probability", float),
    )
    for flag, converter in flag_specs:
        if converter is None:
            parser.add_argument(flag, required=True)
        else:
            parser.add_argument(flag, type=converter, required=True)
    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Run one simulated crawl and exit with a scraper-like status code.

    Returns 0 on success, 1 on a simulated failure, and 130 when a graceful
    stop (SIGTERM/SIGINT) was honored mid-run.
    """
    args = parse_args(argv)
    # Deterministic per-execution randomness: the same job/execution id pair
    # replays the same counter increments and failure rolls.
    rng = random.Random(f"{args.job_id}:{args.execution_id}")
    stats_path = Path(args.stats_path)
    stats_path.parent.mkdir(parents=True, exist_ok=True)
    stop_requested = False

    def request_stop(signum: int, frame: object | None) -> None:
        # Signal handler: flip the flag once; repeated signals are ignored.
        del signum, frame
        nonlocal stop_requested
        if stop_requested:
            return
        stop_requested = True
        print(
            f"worker[{args.job_id}:{args.execution_id}]: graceful stop requested",
            flush=True,
        )

    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)
    # Cumulative counters mirrored into every JSONL stats snapshot.
    counters = {
        "requests_count": 0,
        "items_count": 0,
        "warnings_count": 0,
        "errors_count": 0,
        "bytes_count": 0,
        "retries_count": 0,
        "exceptions_count": 0,
        "cache_size_count": 0,
        "cache_object_count": 0,
    }
    print(
        f"worker[{args.job_id}:{args.execution_id}]: starting simulated crawl",
        flush=True,
    )
    started = time.monotonic()
    iteration = 0
    # Append mode: a re-run against the same stats path never truncates
    # previously written snapshots.
    with stats_path.open("a", encoding="utf-8") as stats_file:
        while time.monotonic() - started < args.duration_seconds:
            time.sleep(args.interval_seconds)
            iteration += 1
            counters["requests_count"] += rng.randint(1, 5)
            counters["items_count"] += rng.randint(0, 2)
            counters["bytes_count"] += rng.randint(500, 3000)
            counters["cache_size_count"] += rng.randint(0, 1)
            counters["cache_object_count"] += rng.randint(0, 2)
            if rng.random() < 0.1:
                counters["warnings_count"] += 1
            if rng.random() < 0.05:
                counters["retries_count"] += 1
            # One JSONL line per iteration; the scheduler tails this file.
            snapshot = {
                "timestamp": datetime.now(UTC).isoformat(),
                "iteration": iteration,
                **counters,
            }
            stats_file.write(json.dumps(snapshot, sort_keys=True) + "\n")
            stats_file.flush()
            print(
                "stats: "
                f"requests={counters['requests_count']} "
                f"items={counters['items_count']} "
                f"bytes={counters['bytes_count']}",
                flush=True,
            )
            if stop_requested:
                print(
                    f"worker[{args.job_id}:{args.execution_id}]: stopping after graceful request",
                    flush=True,
                )
                # 130 is the conventional exit code for interrupt-style stops.
                return 130
            if rng.random() < args.failure_probability:
                counters["errors_count"] += 1
                counters["exceptions_count"] += 1
                # Record the failing counters before exiting non-zero.
                stats_file.write(
                    json.dumps(
                        {"timestamp": datetime.now(UTC).isoformat(), **counters},
                        sort_keys=True,
                    )
                    + "\n"
                )
                stats_file.flush()
                print(
                    f"worker[{args.job_id}:{args.execution_id}]: simulated failure",
                    flush=True,
                )
                return 1
    print(
        f"worker[{args.job_id}:{args.execution_id}]: completed successfully",
        flush=True,
    )
    return 0
# Script entry point: exit with the worker's status code.
if __name__ == "__main__":
    sys.exit(main())

643
repub/jobs.py Normal file
View file

@ -0,0 +1,643 @@
from __future__ import annotations
import json
import subprocess
import sys
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Callable, TextIO, cast
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from repub.model import Job, JobExecution, JobExecutionStatus, Source, database, utc_now
# Prefix for APScheduler entries that mirror database Job rows.
SCHEDULER_JOB_PREFIX = "job-"
# Fixed scheduler ids for the runtime's own housekeeping jobs.
POLL_JOB_ID = "runtime-poll-workers"
SYNC_JOB_ID = "runtime-sync-jobs"
@dataclass(frozen=True)
class JobArtifacts:
    """Filesystem locations of one execution's log and stats artifacts."""

    log_path: Path
    stats_path: Path

    @classmethod
    def for_execution(
        cls, *, log_dir: Path, job_id: int, execution_id: int
    ) -> "JobArtifacts":
        """Derive the deterministic artifact paths for an execution."""
        stem = f"job-{job_id}-execution-{execution_id}"
        return cls(
            log_path=log_dir / (stem + ".log"),
            stats_path=log_dir / (stem + ".jsonl"),
        )
@dataclass
class RunningWorker:
    """Mutable bookkeeping for one live worker subprocess."""

    # Primary key of the JobExecution row this worker backs.
    execution_id: int
    # The spawned repub.job_runner process (text mode).
    process: subprocess.Popen[str]
    # Open handle receiving the worker's stdout/stderr.
    log_handle: TextIO
    # Paths of the execution's log and stats files.
    artifacts: JobArtifacts
    # Byte offset already consumed from the stats jsonl file.
    stats_offset: int = 0
@dataclass(frozen=True)
class ExecutionLogView:
    """View model for the per-execution log page."""

    job_id: int
    execution_id: int
    # Page heading.
    title: str
    description: str
    # Status badge text and its visual tone (e.g. "running", "failed").
    status_label: str
    status_tone: str
    # Full log file contents; empty when unavailable.
    log_text: str
    # Set when the execution or its log file cannot be shown.
    error_message: str | None = None
class JobRuntime:
    """Owns the APScheduler instance and the per-execution worker subprocesses.

    Database jobs are mirrored into the scheduler; each run spawns a
    ``repub.job_runner`` subprocess whose stats jsonl is tailed back into
    the JobExecution row. Two housekeeping jobs poll worker state and
    re-sync schedules.
    """

    def __init__(
        self,
        *,
        log_dir: str | Path,
        worker_duration_seconds: float = 20.0,
        worker_stats_interval_seconds: float = 1.0,
        worker_failure_probability: float = 0.3,
        refresh_callback: Callable[[], None] | None = None,
        graceful_stop_seconds: float = 15.0,
    ) -> None:
        # Directory for per-execution log and stats artifacts.
        self.log_dir = Path(log_dir)
        # Parameters forwarded to every spawned worker process.
        self.worker_duration_seconds = worker_duration_seconds
        self.worker_stats_interval_seconds = worker_stats_interval_seconds
        self.worker_failure_probability = worker_failure_probability
        # Invoked after state changes so the UI can refresh (may be None).
        self.refresh_callback = refresh_callback
        # How long a canceled worker gets before it is force-killed.
        self.graceful_stop_seconds = graceful_stop_seconds
        self.scheduler = BackgroundScheduler(timezone=UTC)
        # Live workers keyed by execution id.
        self._workers: dict[int, RunningWorker] = {}
        self._started = False

    def start(self) -> None:
        """Start the scheduler and register housekeeping jobs (idempotent)."""
        if self._started:
            return
        self.scheduler.start()
        # Frequent poll keeps worker-exit handling and stats ingestion snappy.
        self.scheduler.add_job(
            self.poll_workers,
            "interval",
            id=POLL_JOB_ID,
            seconds=0.25,
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        # Periodic re-sync picks up jobs added/removed/toggled at runtime.
        self.scheduler.add_job(
            self.sync_jobs,
            "interval",
            id=SYNC_JOB_ID,
            seconds=1,
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        self.sync_jobs()
        self._started = True

    def shutdown(self) -> None:
        """Kill any remaining workers, close their logs, and stop the scheduler."""
        for execution_id in tuple(self._workers):
            worker = self._workers.pop(execution_id)
            if worker.process.poll() is None:
                worker.process.kill()
                worker.process.wait(timeout=2)
            worker.log_handle.close()
        if self._started:
            self.scheduler.shutdown(wait=False)
        self._started = False

    def sync_jobs(self) -> None:
        """Mirror enabled database jobs into the scheduler; drop stale entries."""
        with database.connection_context():
            jobs = tuple(Job.select().where(Job.enabled == True))  # noqa: E712
        desired_ids = set()
        for job in jobs:
            scheduler_job_id = _scheduler_job_id(_job_id(job))
            desired_ids.add(scheduler_job_id)
            self.scheduler.add_job(
                self.run_scheduled_job,
                trigger=_job_trigger(job),
                args=(_job_id(job),),
                id=scheduler_job_id,
                replace_existing=True,
                max_instances=1,
                coalesce=True,
                misfire_grace_time=1,
            )
        # Remove scheduler entries whose database job vanished or was disabled.
        for scheduled_job in tuple(self.scheduler.get_jobs()):
            if (
                scheduled_job.id.startswith(SCHEDULER_JOB_PREFIX)
                and scheduled_job.id not in desired_ids
            ):
                self.scheduler.remove_job(scheduled_job.id)

    def run_scheduled_job(self, job_id: int) -> None:
        """Scheduler entry point for cron-triggered runs."""
        self.run_job_now(job_id, reason="scheduled")

    def run_job_now(self, job_id: int, *, reason: str) -> int | None:
        """Spawn a worker for *job_id* unless one is already running.

        Returns the new execution id, or None when the job is missing or
        already has a RUNNING execution. *reason* is accepted for caller
        symmetry but currently unused.
        """
        del reason
        self.start()
        with database.connection_context():
            job = Job.get_or_none(id=job_id)
            if job is None:
                return None
            already_running = (
                JobExecution.select()
                .where(
                    (JobExecution.job == job)
                    & (JobExecution.running_status == JobExecutionStatus.RUNNING)
                )
                .exists()
            )
            if already_running:
                return None
            execution = JobExecution.create(
                job=job,
                started_at=utc_now(),
                running_status=JobExecutionStatus.RUNNING,
            )
        execution_id = _execution_id(execution)
        artifacts = JobArtifacts.for_execution(
            log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
        )
        artifacts.log_path.parent.mkdir(parents=True, exist_ok=True)
        # Line-buffered so log lines appear promptly in the log view.
        log_handle = artifacts.log_path.open("a", encoding="utf-8", buffering=1)
        log_handle.write(
            f"scheduler: starting execution {execution_id} for job {job_id}\n"
        )
        # Worker stdout and stderr both stream into the execution log file.
        process = subprocess.Popen(
            [
                sys.executable,
                "-u",
                "-m",
                "repub.job_runner",
                "--job-id",
                str(job_id),
                "--execution-id",
                str(execution_id),
                "--stats-path",
                str(artifacts.stats_path),
                "--duration-seconds",
                str(self.worker_duration_seconds),
                "--interval-seconds",
                str(self.worker_stats_interval_seconds),
                "--failure-probability",
                str(self.worker_failure_probability),
            ],
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            text=True,
        )
        self._workers[execution_id] = RunningWorker(
            execution_id=execution_id,
            process=process,
            log_handle=log_handle,
            artifacts=artifacts,
        )
        self._trigger_refresh()
        return execution_id

    def request_execution_cancel(self, execution_id: int) -> bool:
        """Ask a running execution to stop gracefully (SIGTERM).

        Returns False when the execution is missing or not RUNNING.
        The stop-request timestamp is recorded once; _enforce_graceful_stop
        escalates to SIGKILL after graceful_stop_seconds.
        """
        with database.connection_context():
            execution = JobExecution.get_or_none(id=execution_id)
            if execution is None:
                return False
            if execution.running_status != JobExecutionStatus.RUNNING:
                return False
            if execution.stop_requested_at is None:
                execution.stop_requested_at = utc_now()
                execution.save()
        worker = self._workers.get(execution_id)
        if worker is not None and worker.process.poll() is None:
            worker.log_handle.write(
                f"scheduler: graceful stop requested for execution {execution_id}\n"
            )
            worker.process.terminate()
        self._trigger_refresh()
        return True

    def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
        """Persist the enabled flag and immediately re-sync the scheduler."""
        with database.connection_context():
            job = Job.get_or_none(id=job_id)
            if job is None:
                return False
            job.enabled = enabled
            job.save()
        self.sync_jobs()
        self._trigger_refresh()
        return True

    def poll_workers(self) -> None:
        """Ingest stats, enforce stop deadlines, and finalize exited workers."""
        for execution_id in tuple(self._workers):
            worker = self._workers[execution_id]
            self._apply_stats(worker)
            self._enforce_graceful_stop(worker)
            returncode = worker.process.poll()
            if returncode is None:
                continue
            # Drain any stats written between the first read and process exit.
            self._apply_stats(worker)
            with database.connection_context():
                execution = JobExecution.get_by_id(execution_id)
                execution.ended_at = utc_now()
                execution.running_status = _final_status(
                    execution=execution,
                    returncode=returncode,
                )
                execution.save()
            worker.log_handle.close()
            del self._workers[execution_id]
            self._trigger_refresh()

    def _apply_stats(self, worker: RunningWorker) -> None:
        """Tail new lines from the worker's stats jsonl into its execution row."""
        if not worker.artifacts.stats_path.exists():
            return
        # Resume from the previous offset so each line is read at most once.
        with worker.artifacts.stats_path.open("r", encoding="utf-8") as handle:
            handle.seek(worker.stats_offset)
            payload = handle.read()
            worker.stats_offset = handle.tell()
        lines = [line for line in payload.splitlines() if line.strip()]
        if not lines:
            return
        # Counters are cumulative, so only the newest snapshot matters.
        stats = json.loads(lines[-1])
        with database.connection_context():
            execution = JobExecution.get_by_id(worker.execution_id)
            execution.requests_count = int(stats.get("requests_count", 0))
            execution.items_count = int(stats.get("items_count", 0))
            execution.warnings_count = int(stats.get("warnings_count", 0))
            execution.errors_count = int(stats.get("errors_count", 0))
            execution.bytes_count = int(stats.get("bytes_count", 0))
            execution.retries_count = int(stats.get("retries_count", 0))
            execution.exceptions_count = int(stats.get("exceptions_count", 0))
            execution.cache_size_count = int(stats.get("cache_size_count", 0))
            execution.cache_object_count = int(stats.get("cache_object_count", 0))
            execution.raw_stats = json.dumps(stats, sort_keys=True)
            execution.save()
        self._trigger_refresh()

    def _enforce_graceful_stop(self, worker: RunningWorker) -> None:
        """SIGKILL a worker that ignored its stop request past the deadline."""
        with database.connection_context():
            execution = JobExecution.get_by_id(worker.execution_id)
            if execution.stop_requested_at is None:
                return
            elapsed = utc_now() - _coerce_datetime(execution.stop_requested_at)
            if (
                elapsed >= timedelta(seconds=self.graceful_stop_seconds)
                and worker.process.poll() is None
            ):
                worker.process.kill()

    def _trigger_refresh(self) -> None:
        """Notify the UI layer, if a callback was provided."""
        if self.refresh_callback is not None:
            self.refresh_callback()
def load_runs_view(
    *, log_dir: str | Path, now: datetime | None = None
) -> dict[str, tuple[dict[str, object], ...]]:
    """Query and project running/upcoming/completed data for the runs page.

    *now* overrides the reference time used for runtime and next-run
    calculations; defaults to the current UTC time.
    """
    reference_time = now or datetime.now(UTC)
    resolved_log_dir = Path(log_dir)
    with database.connection_context():
        jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
        running_executions = tuple(
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
            .order_by(JobExecution.started_at.desc())
        )
        # Only the 20 most recently finished executions are shown.
        completed_executions = tuple(
            JobExecution.select(JobExecution, Job, Source)
            .join(Job)
            .join(Source)
            .where(
                JobExecution.running_status.in_(
                    (
                        JobExecutionStatus.SUCCEEDED,
                        JobExecutionStatus.FAILED,
                        JobExecutionStatus.CANCELED,
                    )
                )
            )
            .order_by(JobExecution.ended_at.desc())
            .limit(20)
        )
    # Lets the upcoming table flag jobs that already have a live execution.
    running_by_job = {
        _job_id(execution.job): execution for execution in running_executions
    }
    return {
        "running": tuple(
            _project_running_execution(execution, resolved_log_dir, reference_time)
            for execution in running_executions
        ),
        "upcoming": tuple(
            _project_upcoming_job(job, running_by_job.get(job.id), reference_time)
            for job in jobs
        ),
        "completed": tuple(
            _project_completed_execution(execution, resolved_log_dir)
            for execution in completed_executions
        ),
    }
def load_dashboard_view(
    *, log_dir: str | Path, now: datetime | None = None
) -> dict[str, object]:
    """Assemble the dashboard payload: running executions plus snapshot stats.

    *now* overrides the reference time (useful in tests); defaults to the
    current UTC time.
    """
    reference_time = now or datetime.now(UTC)
    runs_view = load_runs_view(log_dir=log_dir, now=reference_time)
    # The card is labeled "Failures in 24h", so bound the query to the last
    # day instead of counting every failed execution ever recorded.
    failure_window_start = reference_time - timedelta(days=1)
    with database.connection_context():
        failed_last_day = (
            JobExecution.select()
            .where(
                (JobExecution.running_status == JobExecutionStatus.FAILED)
                & (JobExecution.ended_at.is_null(False))
                & (JobExecution.ended_at >= failure_window_start)
            )
            .count()
        )
    upcoming_ready = sum(
        1 for job in runs_view["upcoming"] if str(job["run_reason"]) == "Ready"
    )
    footprint_bytes = _directory_size(Path(log_dir))
    return {
        "running": runs_view["running"],
        "snapshot": {
            "running_now": str(len(runs_view["running"])),
            "upcoming_today": str(upcoming_ready),
            "failures_24h": str(failed_last_day),
            "artifact_footprint": _format_bytes(footprint_bytes),
        },
    }
def load_execution_log_view(
    *, log_dir: str | Path, job_id: int, execution_id: int
) -> ExecutionLogView:
    """Build the log-page view model for one execution.

    Returns an error-flavored view when the execution is missing, belongs
    to a different job, or has not produced a log file yet.
    """
    with database.connection_context():
        execution = JobExecution.get_or_none(id=execution_id)
    route = f"/job/{job_id}/execution/{execution_id}/logs"
    title = f"Job {job_id} / execution {execution_id}"
    if execution is None or _job_id(cast(Job, execution.job)) != job_id:
        return ExecutionLogView(
            job_id=job_id,
            execution_id=execution_id,
            title=title,
            description="Plain text log view routed through the app.",
            status_label="Unavailable",
            status_tone="failed",
            log_text="",
            error_message="Execution does not exist.",
        )
    artifacts = JobArtifacts.for_execution(
        log_dir=Path(log_dir),
        job_id=job_id,
        execution_id=execution_id,
    )
    if not artifacts.log_path.exists():
        return ExecutionLogView(
            job_id=job_id,
            execution_id=execution_id,
            title=title,
            description="Plain text log view routed through the app.",
            status_label=_execution_status_label(execution),
            status_tone=_execution_status_tone(execution),
            log_text="",
            error_message="Log file has not been created yet.",
        )
    return ExecutionLogView(
        job_id=job_id,
        execution_id=execution_id,
        title=title,
        description=f"Route: {route}",
        status_label=_execution_status_label(execution),
        status_tone=_execution_status_tone(execution),
        log_text=artifacts.log_path.read_text(encoding="utf-8"),
    )
def _job_trigger(job: Job) -> CronTrigger:
    """Build a UTC cron trigger from the job's five stored cron fields."""
    fields = (
        job.cron_minute,
        job.cron_hour,
        job.cron_day_of_month,
        job.cron_month,
        job.cron_day_of_week,
    )
    crontab = " ".join(str(field) for field in fields)
    return CronTrigger.from_crontab(crontab, timezone=UTC)
def _scheduler_job_id(job_id: int) -> str:
    """APScheduler entry id that mirrors database job *job_id*."""
    return SCHEDULER_JOB_PREFIX + str(job_id)
def _project_running_execution(
    execution: JobExecution, log_dir: Path, reference_time: datetime
) -> dict[str, object]:
    """Shape one running execution into the dict the runs/dashboard tables render."""
    job = cast(Job, execution.job)
    job_id = _job_id(job)
    execution_id = _execution_id(execution)
    artifacts = JobArtifacts.for_execution(
        log_dir=log_dir, job_id=job_id, execution_id=execution_id
    )
    # Fall back to created_at when the row has no recorded start time.
    started_at = _coerce_datetime(
        cast(datetime | str, execution.started_at or execution.created_at)
    )
    runtime = reference_time - started_at
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "execution_id": execution_id,
        "started_at": started_at.strftime("%Y-%m-%d %H:%M UTC"),
        "runtime": f"running for {int(runtime.total_seconds())}s",
        # A pending cancel shows as "Stopping" until the worker exits.
        "status": "Stopping" if execution.stop_requested_at else "Running",
        "stats": _stats_summary(execution),
        "worker": (
            "graceful stop requested"
            if execution.stop_requested_at
            else "streaming stats from worker jsonl"
        ),
        "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
        "log_exists": artifacts.log_path.exists(),
        "cancel_post_path": f"/actions/executions/{execution_id}/cancel",
    }
def _project_upcoming_job(
    job: Job, running_execution: JobExecution | None, reference_time: datetime
) -> dict[str, object]:
    """Shape one job into the dict rendered by the upcoming-jobs table.

    *running_execution* is the job's live execution, if any; it suppresses
    the next-run computation and disables the run action.
    """
    job_id = _job_id(job)
    trigger = _job_trigger(job)
    # Next fire time only applies when the job is enabled and not running.
    next_run = (
        trigger.get_next_fire_time(None, reference_time)
        if job.enabled and running_execution is None
        else None
    )
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "next_run": (
            next_run.strftime("%Y-%m-%d %H:%M UTC")
            if next_run is not None
            else ("Running now" if running_execution is not None else "Not scheduled")
        ),
        # Raw five-field crontab string for display.
        "schedule": " ".join(
            (
                str(job.cron_minute),
                str(job.cron_hour),
                str(job.cron_day_of_month),
                str(job.cron_month),
                str(job.cron_day_of_week),
            )
        ),
        "enabled_label": "Enabled" if job.enabled else "Disabled",
        "enabled_tone": "scheduled" if job.enabled else "idle",
        "run_disabled": running_execution is not None,
        "run_reason": "Already running" if running_execution is not None else "Ready",
        "toggle_label": "Disable" if job.enabled else "Enable",
        "toggle_enabled": not job.enabled,
        "run_post_path": f"/actions/jobs/{job_id}/run-now",
        "toggle_post_path": f"/actions/jobs/{job_id}/toggle-enabled",
        "delete_post_path": f"/actions/jobs/{job_id}/delete",
    }
def _project_completed_execution(
    execution: JobExecution, log_dir: Path
) -> dict[str, object]:
    """Shape one finished execution into the dict rendered by the history table."""
    job = cast(Job, execution.job)
    job_id = _job_id(job)
    execution_id = _execution_id(execution)
    artifacts = JobArtifacts.for_execution(
        log_dir=log_dir, job_id=job_id, execution_id=execution_id
    )
    return {
        "source": job.source.name,
        "slug": job.source.slug,
        "job_id": job_id,
        "execution_id": execution_id,
        # ended_at may be absent if finalization has not run yet.
        "ended_at": (
            _coerce_datetime(cast(datetime | str, execution.ended_at)).strftime(
                "%Y-%m-%d %H:%M UTC"
            )
            if execution.ended_at is not None
            else "Pending"
        ),
        "status": _execution_status_label(execution),
        "status_tone": _execution_status_tone(execution),
        "stats": _stats_summary(execution),
        "summary": (
            "Canceled by operator"
            if execution.running_status == JobExecutionStatus.CANCELED
            else (
                "Worker exited successfully"
                if execution.running_status == JobExecutionStatus.SUCCEEDED
                else "Worker exited with failure"
            )
        ),
        "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
        "log_exists": artifacts.log_path.exists(),
    }
def _execution_status_label(execution: JobExecution) -> str:
    """Human-readable status; RUNNING shows "Stopping" once a cancel is pending."""
    status = JobExecutionStatus(execution.running_status)
    if status == JobExecutionStatus.RUNNING:
        return "Stopping" if execution.stop_requested_at else "Running"
    return {
        JobExecutionStatus.PENDING: "Pending",
        JobExecutionStatus.SUCCEEDED: "Succeeded",
        JobExecutionStatus.FAILED: "Failed",
        JobExecutionStatus.CANCELED: "Canceled",
    }[status]
def _execution_status_tone(execution: JobExecution) -> str:
    """Badge tone (UI color class) for the execution's status."""
    status = JobExecutionStatus(execution.running_status)
    if status == JobExecutionStatus.RUNNING:
        return "running"
    if status == JobExecutionStatus.SUCCEEDED:
        return "done"
    if status == JobExecutionStatus.FAILED:
        return "failed"
    # PENDING and CANCELED share the muted tone.
    return "idle"
def _stats_summary(execution: JobExecution) -> str:
return (
f"{execution.requests_count} requests"
f"{execution.items_count} items"
f"{execution.bytes_count} bytes"
)
def _final_status(*, execution: JobExecution, returncode: int) -> JobExecutionStatus:
    """Terminal status for a finished worker: cancel wins, then the exit code."""
    if execution.stop_requested_at is not None:
        return JobExecutionStatus.CANCELED
    return (
        JobExecutionStatus.SUCCEEDED
        if returncode == 0
        else JobExecutionStatus.FAILED
    )
def _coerce_datetime(value: datetime | str) -> datetime:
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
parsed = datetime.fromisoformat(value)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)
def _job_id(job: Job) -> int:
return int(job.get_id())
def _execution_id(execution: JobExecution) -> int:
return int(execution.get_id())
def _directory_size(path: Path) -> int:
if not path.exists():
return 0
return sum(entry.stat().st_size for entry in path.rglob("*") if entry.is_file())
def _format_bytes(value: int) -> str:
if value < 1024:
return f"{value} B"
if value < 1024 * 1024:
return f"{value / 1024:.1f} KB"
if value < 1024 * 1024 * 1024:
return f"{value / (1024 * 1024):.1f} MB"
return f"{value / (1024 * 1024 * 1024):.1f} GB"

View file

@ -295,6 +295,16 @@ def update_source(
return source return source
def delete_job_source(job_id: int) -> bool:
with database.connection_context():
with database.atomic():
job = Job.get_or_none(id=job_id)
if job is None:
return False
source = Source.get_by_id(job.source_id)
return source.delete_instance() > 0
def load_sources() -> tuple[dict[str, object], ...]: def load_sources() -> tuple[dict[str, object], ...]:
with database.connection_context(): with database.connection_context():
sources = tuple(Source.select().order_by(Source.created_at.desc())) sources = tuple(Source.select().order_by(Source.created_at.desc()))
@ -416,6 +426,7 @@ class JobExecution(BaseModel):
created_at = DateTimeField(default=utc_now) created_at = DateTimeField(default=utc_now)
started_at = DateTimeField(null=True) started_at = DateTimeField(null=True)
ended_at = DateTimeField(null=True) ended_at = DateTimeField(null=True)
stop_requested_at = DateTimeField(null=True)
running_status = IntegerField( running_status = IntegerField(
default=JobExecutionStatus.PENDING, default=JobExecutionStatus.PENDING,
constraints=[Check("running_status BETWEEN 0 AND 4")], constraints=[Check("running_status BETWEEN 0 AND 4")],

View file

@ -1,4 +1,4 @@
from repub.pages.dashboard import dashboard_page from repub.pages.dashboard import dashboard_page, dashboard_page_with_data
from repub.pages.runs import execution_logs_page, runs_page from repub.pages.runs import execution_logs_page, runs_page
from repub.pages.shim import shim_page from repub.pages.shim import shim_page
from repub.pages.sources import create_source_page, edit_source_page, sources_page from repub.pages.sources import create_source_page, edit_source_page, sources_page
@ -6,6 +6,7 @@ from repub.pages.sources import create_source_page, edit_source_page, sources_pa
__all__ = [ __all__ = [
"create_source_page", "create_source_page",
"dashboard_page", "dashboard_page",
"dashboard_page_with_data",
"edit_source_page", "edit_source_page",
"execution_logs_page", "execution_logs_page",
"runs_page", "runs_page",

View file

@ -1,5 +1,7 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
import htpy as h import htpy as h
from htpy import Node, Renderable from htpy import Node, Renderable
@ -12,36 +14,43 @@ from repub.components import (
stat_card, stat_card,
status_badge, status_badge,
) )
from repub.pages.runs import RUNNING_EXECUTIONS
def _running_execution_row(execution: dict[str, str | bool]) -> tuple[Node, ...]: def _text(values: Mapping[str, object], key: str) -> str:
status_tone = "running" if execution["is_running"] else "done" return str(values[key])
def _running_execution_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
status_tone = "running" if _text(execution, "status") != "Succeeded" else "done"
return ( return (
h.div[ h.div[
h.div(class_="font-semibold text-slate-950")[execution["source"]], h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
h.p(class_="mt-0.5 font-mono text-[11px] text-slate-500")[ h.p(class_="mt-0.5 font-mono text-[11px] text-slate-500")[
execution["slug"] _text(execution, "slug")
], ],
], ],
h.div[ h.div[
h.p(class_="font-medium text-slate-900")[f"#{execution['execution_id']}"], h.p(class_="font-medium text-slate-900")[
f"#{_text(execution, 'execution_id')}"
],
h.p(class_="mt-0.5 text-[11px] text-slate-500")[ h.p(class_="mt-0.5 text-[11px] text-slate-500")[
f"job {execution['job_id']}" f"job {_text(execution, 'job_id')}"
], ],
], ],
h.div[ h.div[
h.p(class_="font-medium text-slate-900")[execution["started_at"]], h.p(class_="font-medium text-slate-900")[_text(execution, "started_at")],
h.p(class_="mt-0.5 text-[11px] text-slate-500")[execution["runtime"]], h.p(class_="mt-0.5 text-[11px] text-slate-500")[
_text(execution, "runtime")
], ],
status_badge(label=str(execution["status"]), tone=status_tone), ],
status_badge(label=_text(execution, "status"), tone=status_tone),
h.div(class_="min-w-56 whitespace-normal")[ h.div(class_="min-w-56 whitespace-normal")[
h.p(class_="font-medium text-slate-900")[execution["stats"]], h.p(class_="font-medium text-slate-900")[_text(execution, "stats")],
h.p(class_="mt-0.5 text-[11px] text-slate-500")[execution["worker"]], h.p(class_="mt-0.5 text-[11px] text-slate-500")[_text(execution, "worker")],
], ],
h.div(class_="flex flex-nowrap items-center gap-3")[ h.div(class_="flex flex-nowrap items-center gap-3")[
inline_link( inline_link(
href=str(execution["log_href"]), href=_text(execution, "log_href"),
label="View log", label="View log",
tone="amber", tone="amber",
), ),
@ -71,7 +80,13 @@ def dashboard_header() -> Renderable:
] ]
def operational_snapshot() -> Renderable: def operational_snapshot(*, snapshot: Mapping[str, str] | None = None) -> Renderable:
values = snapshot or {
"running_now": "0",
"upcoming_today": "0",
"failures_24h": "0",
"artifact_footprint": "0 B",
}
return h.section[ return h.section[
h.div(class_="mb-3 flex items-end justify-between gap-4")[ h.div(class_="mb-3 flex items-end justify-between gap-4")[
h.div[ h.div[
@ -82,37 +97,39 @@ def operational_snapshot() -> Renderable:
"Operational snapshot" "Operational snapshot"
], ],
], ],
h.p(class_="text-xs text-slate-500")[ h.p(class_="text-xs text-slate-500")["Live values from the database"],
"Static fixture data shaped around the intended operator dashboard"
],
], ],
h.dl(class_="grid gap-3 md:grid-cols-2 xl:grid-cols-4")[ h.dl(class_="grid gap-3 md:grid-cols-2 xl:grid-cols-4")[
stat_card( stat_card(
label="Running now", label="Running now",
value="3", value=values["running_now"],
detail="Two feed workers and one Pangea worker are active.", detail="Currently active job executions.",
), ),
stat_card( stat_card(
label="Upcoming today", label="Upcoming today",
value="11", value=values["upcoming_today"],
detail="Next scheduled job fires in 13 minutes.", detail="Enabled jobs that are ready for their next run.",
), ),
stat_card( stat_card(
label="Failures in 24h", label="Failures in 24h",
value="2", value=values["failures_24h"],
detail="One network timeout and one source parsing error.", detail="Recent failed executions recorded by the scheduler.",
), ),
stat_card( stat_card(
label="Output footprint", label="Artifact footprint",
value="18.4 GB", value=values["artifact_footprint"],
detail="Mirrored feeds, media, logs, and execution stats.", detail="Current log and stats artifact size under out/logs.",
), ),
], ],
] ]
def running_executions_table() -> Renderable: def running_executions_table(
rows = tuple(_running_execution_row(execution) for execution in RUNNING_EXECUTIONS) *, running_executions: tuple[Mapping[str, object], ...] | None = None
) -> Renderable:
rows = tuple(
_running_execution_row(execution) for execution in (running_executions or ())
)
headers = ("Source", "Execution", "Started", "Status", "Stats", "Actions") headers = ("Source", "Execution", "Started", "Status", "Stats", "Actions")
def render_row(row: tuple[Node, ...]) -> Renderable: def render_row(row: tuple[Node, ...]) -> Renderable:
@ -172,6 +189,14 @@ def running_executions_table() -> Renderable:
def dashboard_page() -> Renderable: def dashboard_page() -> Renderable:
return dashboard_page_with_data()
def dashboard_page_with_data(
*,
snapshot: Mapping[str, str] | None = None,
running_executions: tuple[Mapping[str, object], ...] | None = None,
) -> Renderable:
return h.main( return h.main(
id="morph", id="morph",
class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]", class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]",
@ -180,8 +205,8 @@ def dashboard_page() -> Renderable:
h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[ h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[
h.div(class_="mx-auto max-w-7xl space-y-5")[ h.div(class_="mx-auto max-w-7xl space-y-5")[
dashboard_header(), dashboard_header(),
operational_snapshot(), operational_snapshot(snapshot=snapshot),
running_executions_table(), running_executions_table(running_executions=running_executions),
] ]
], ],
] ]

View file

@ -1,10 +1,11 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
import htpy as h import htpy as h
from htpy import Node, Renderable from htpy import Node, Renderable
from repub.components import ( from repub.components import (
inline_button,
inline_link, inline_link,
muted_action_link, muted_action_link,
page_shell, page_shell,
@ -13,254 +14,174 @@ from repub.components import (
table_section, table_section,
) )
RUNNING_EXECUTIONS: tuple[dict[str, str | bool], ...] = (
{
"source": "Pangea mobile articles",
"slug": "pangea-mobile",
"job_id": "7",
"execution_id": "104",
"started_at": "Today, 11:42 UTC",
"runtime": "running for 8m",
"status": "Running",
"stats": "26 requests • 7 items • 2.6 MB",
"worker": "graceful stop after current item",
"log_href": "/job/7/execution/104/logs",
"is_running": True,
},
{
"source": "Guardian feed mirror",
"slug": "guardian-feed",
"job_id": "3",
"execution_id": "103",
"started_at": "Today, 11:33 UTC",
"runtime": "running for 17m",
"status": "Running",
"stats": "91 requests • 13 items • 5.1 MB",
"worker": "streaming stats from worker jsonl",
"log_href": "/job/3/execution/103/logs",
"is_running": True,
},
{
"source": "Podcast enclosure mirror",
"slug": "podcast-audio",
"job_id": "11",
"execution_id": "105",
"started_at": "Today, 11:48 UTC",
"runtime": "running for 2m",
"status": "Stopping",
"stats": "4 requests • 0 items • 0.8 MB",
"worker": "waiting for 15s graceful shutdown window",
"log_href": "/job/11/execution/105/logs",
"is_running": True,
},
)
UPCOMING_JOBS: tuple[dict[str, str | bool], ...] = ( def _action_button(
{ *,
"source": "Podcast enclosure mirror", label: str,
"slug": "podcast-audio", tone: str = "default",
"job_id": "11", disabled: bool = False,
"next_run": "Today, 12:15 local", post_path: str | None = None,
"schedule": "15 */4 * * 1-6", ) -> Renderable:
"enabled_label": "Enabled", classes = {
"enabled_tone": "scheduled", "default": "bg-stone-100 text-slate-700 hover:bg-stone-200",
"run_disabled": True, "danger": "bg-rose-50 text-rose-700 hover:bg-rose-100",
"run_reason": "Already running", }
"toggle_label": "Disable", class_name = (
}, "cursor-not-allowed bg-slate-100 text-slate-400" if disabled else classes[tone]
{ )
"source": "Weekly digest feed", attributes: dict[str, str] = {}
"slug": "weekly-digest", if post_path is not None and not disabled:
"job_id": "18", attributes["data-on:pointerdown"] = f"@post('{post_path}')"
"next_run": "Tomorrow, 08:00 local", return h.button(
"schedule": "0 8 * * 1", attributes,
"enabled_label": "Disabled", type="button",
"enabled_tone": "idle", disabled=disabled,
"run_disabled": False, class_=(
"run_reason": "Ready", "inline-flex items-center whitespace-nowrap rounded-full px-3 py-1.5 "
"toggle_label": "Enable", f"text-sm font-semibold transition {class_name}"
}, ),
{ )[label]
"source": "Kenya health desk",
"slug": "kenya-health",
"job_id": "22",
"next_run": "Today, 13:00 local",
"schedule": "0 */6 * * *",
"enabled_label": "Enabled",
"enabled_tone": "scheduled",
"run_disabled": False,
"run_reason": "Ready",
"toggle_label": "Disable",
},
)
COMPLETED_EXECUTIONS: tuple[dict[str, str], ...] = (
{
"source": "Guardian feed mirror",
"slug": "guardian-feed",
"job_id": "3",
"execution_id": "102",
"ended_at": "Today, 10:57 UTC",
"status": "Succeeded",
"status_tone": "done",
"stats": "204 requests • 28 items • 9.4 MB",
"summary": "Finished on schedule",
"log_href": "/job/3/execution/102/logs",
},
{
"source": "Podcast enclosure mirror",
"slug": "podcast-audio",
"job_id": "11",
"execution_id": "101",
"ended_at": "Today, 09:12 UTC",
"status": "Failed",
"status_tone": "failed",
"stats": "timeout after 3 retries",
"summary": "Worker exited with failure",
"log_href": "/job/11/execution/101/logs",
},
{
"source": "Pangea mobile articles",
"slug": "pangea-mobile",
"job_id": "7",
"execution_id": "100",
"ended_at": "Today, 05:48 UTC",
"status": "Canceled",
"status_tone": "idle",
"stats": "stopped by operator after 11m",
"summary": "Graceful stop completed",
"log_href": "/job/7/execution/100/logs",
},
)
def _running_row(execution: dict[str, str | bool]) -> tuple[Node, ...]: def _text(values: Mapping[str, object], key: str) -> str:
return str(values[key])
def _maybe_text(values: Mapping[str, object], key: str) -> str | None:
value = values.get(key)
if value in {None, ""}:
return None
return str(value)
def _flag(values: Mapping[str, object], key: str) -> bool:
return bool(values[key])
def _running_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
return ( return (
h.div[ h.div[
h.div(class_="font-semibold text-slate-950")[execution["source"]], h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
h.p(class_="mt-1 font-mono text-xs text-slate-500")[execution["slug"]], h.p(class_="mt-1 font-mono text-xs text-slate-500")[
_text(execution, "slug")
],
], ],
h.div[ h.div[
h.p(class_="font-medium text-slate-900")[f"#{execution['execution_id']}"], h.p(class_="font-medium text-slate-900")[
h.p(class_="mt-1 text-xs text-slate-500")[f"job {execution['job_id']}"], f"#{_text(execution, 'execution_id')}"
],
h.p(class_="mt-1 text-xs text-slate-500")[
f"job {_text(execution, 'job_id')}"
],
], ],
h.div[ h.div[
h.p(class_="font-medium text-slate-900")[execution["started_at"]], h.p(class_="font-medium text-slate-900")[_text(execution, "started_at")],
h.p(class_="mt-1 text-xs text-slate-500")[execution["runtime"]], h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "runtime")],
], ],
status_badge(label=str(execution["status"]), tone="running"), status_badge(label=_text(execution, "status"), tone="running"),
h.div(class_="min-w-56 whitespace-normal")[ h.div(class_="min-w-56 whitespace-normal")[
h.p(class_="font-medium text-slate-900")[execution["stats"]], h.p(class_="font-medium text-slate-900")[_text(execution, "stats")],
h.p(class_="mt-1 text-xs text-slate-500")[execution["worker"]], h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "worker")],
], ],
h.div(class_="flex flex-nowrap items-center gap-3")[ h.div(class_="flex flex-nowrap items-center gap-3")[
inline_link( inline_link(
href=str(execution["log_href"]), href=_text(execution, "log_href"),
label="View log", label="View log",
tone="amber", tone="amber",
), ),
inline_button(label="Stop", tone="danger"), _action_button(
label="Stop",
tone="danger",
post_path=_maybe_text(execution, "cancel_post_path"),
),
], ],
) )
def _upcoming_row(job: dict[str, str | bool]) -> tuple[Node, ...]: def _upcoming_row(job: Mapping[str, object]) -> tuple[Node, ...]:
return ( return (
h.div[ h.div[
h.div(class_="font-semibold text-slate-950")[job["source"]], h.div(class_="font-semibold text-slate-950")[_text(job, "source")],
h.p(class_="mt-1 font-mono text-xs text-slate-500")[job["slug"]], h.p(class_="mt-1 font-mono text-xs text-slate-500")[_text(job, "slug")],
], ],
h.div[ h.div[
h.p(class_="font-medium text-slate-900")[job["next_run"]], h.p(class_="font-medium text-slate-900")[_text(job, "next_run")],
h.p(class_="mt-1 text-xs text-slate-500")[f"job {job['job_id']}"], h.p(class_="mt-1 text-xs text-slate-500")[f"job {_text(job, 'job_id')}"],
], ],
h.p(class_="font-mono text-xs text-slate-600")[job["schedule"]], h.p(class_="font-mono text-xs text-slate-600")[_text(job, "schedule")],
status_badge( status_badge(
label=str(job["enabled_label"]), label=_text(job, "enabled_label"),
tone=str(job["enabled_tone"]), tone=_text(job, "enabled_tone"),
), ),
h.p(class_="max-w-40 whitespace-normal text-sm text-slate-500")[ h.p(class_="max-w-40 whitespace-normal text-sm text-slate-500")[
job["run_reason"] _text(job, "run_reason")
], ],
h.div(class_="flex flex-nowrap items-center gap-2")[ h.div(class_="flex flex-nowrap items-center gap-2")[
inline_button(label="Run now", disabled=bool(job["run_disabled"])), _action_button(
inline_button(label=str(job["toggle_label"])), label="Run now",
inline_button(label="Delete", tone="danger"), disabled=_flag(job, "run_disabled"),
post_path=_maybe_text(job, "run_post_path"),
),
_action_button(
label=_text(job, "toggle_label"),
post_path=_maybe_text(job, "toggle_post_path"),
),
_action_button(
label="Delete",
tone="danger",
post_path=_maybe_text(job, "delete_post_path"),
),
], ],
) )
def _completed_row(execution: dict[str, str]) -> tuple[Node, ...]: def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
return ( return (
h.div[ h.div[
h.div(class_="font-semibold text-slate-950")[execution["source"]], h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
h.p(class_="mt-1 font-mono text-xs text-slate-500")[execution["slug"]], h.p(class_="mt-1 font-mono text-xs text-slate-500")[
_text(execution, "slug")
],
], ],
h.div[ h.div[
h.p(class_="font-medium text-slate-900")[f"#{execution['execution_id']}"], h.p(class_="font-medium text-slate-900")[
h.p(class_="mt-1 text-xs text-slate-500")[f"job {execution['job_id']}"], f"#{_text(execution, 'execution_id')}"
],
h.p(class_="mt-1 text-xs text-slate-500")[
f"job {_text(execution, 'job_id')}"
],
], ],
h.div[ h.div[
h.p(class_="font-medium text-slate-900")[execution["ended_at"]], h.p(class_="font-medium text-slate-900")[_text(execution, "ended_at")],
h.p(class_="mt-1 text-xs text-slate-500")[execution["summary"]], h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "summary")],
], ],
status_badge( status_badge(
label=execution["status"], label=_text(execution, "status"),
tone=execution["status_tone"], tone=_text(execution, "status_tone"),
), ),
h.div(class_="min-w-48 whitespace-normal")[ h.div(class_="min-w-48 whitespace-normal")[
h.p(class_="font-medium text-slate-900")[execution["stats"]] h.p(class_="font-medium text-slate-900")[_text(execution, "stats")]
], ],
inline_link( inline_link(
href=execution["log_href"], href=_text(execution, "log_href"),
label="View log", label="View log",
tone="amber", tone="amber",
), ),
) )
def delete_confirmation_preview() -> Renderable: def runs_page(
return section_card( *,
content=( running_executions: tuple[Mapping[str, object], ...] | None = None,
h.div(class_="flex items-center justify-between gap-4")[ upcoming_jobs: tuple[Mapping[str, object], ...] | None = None,
h.div[ completed_executions: tuple[Mapping[str, object], ...] | None = None,
h.p( ) -> Renderable:
class_="text-xs font-semibold uppercase tracking-[0.22em] text-amber-600" running_items = running_executions or ()
)["Modal preview"], upcoming_items = upcoming_jobs or ()
h.h2(class_="mt-2 text-xl font-semibold text-slate-950")[ completed_items = completed_executions or ()
"Delete confirmation" running_rows = tuple(_running_row(execution) for execution in running_items)
], upcoming_rows = tuple(_upcoming_row(job) for job in upcoming_items)
h.p(class_="mt-2 max-w-2xl text-sm text-slate-600")[ completed_rows = tuple(_completed_row(execution) for execution in completed_items)
"Upcoming jobs use a confirmation modal before deleting a job. This is the intended open state, placed inline for the static UI pass."
],
],
status_badge(label="Preview", tone="scheduled"),
],
h.div(class_="mt-3 rounded-[1.5rem] bg-stone-50 p-5")[
h.p(
class_="text-sm font-semibold uppercase tracking-[0.18em] text-slate-500"
)["Delete job"],
h.h3(class_="mt-2 text-lg font-semibold text-slate-950")[
"Delete Weekly digest feed?"
],
h.p(class_="mt-3 max-w-2xl text-sm leading-6 text-slate-600")[
"This removes the source-linked job record and its schedule. Existing execution history and log files remain available for inspection."
],
h.div(class_="mt-6 flex flex-wrap gap-3")[
inline_button(label="Cancel"),
inline_button(label="Delete job", tone="danger"),
],
],
)
)
def runs_page() -> Renderable:
running_rows = tuple(_running_row(execution) for execution in RUNNING_EXECUTIONS)
upcoming_rows = tuple(_upcoming_row(job) for job in UPCOMING_JOBS)
completed_rows = tuple(
_completed_row(execution) for execution in COMPLETED_EXECUTIONS
)
return page_shell( return page_shell(
current_path="/runs", current_path="/runs",
@ -286,7 +207,7 @@ def runs_page() -> Renderable:
table_section( table_section(
eyebrow="Queue", eyebrow="Queue",
title="Upcoming jobs", title="Upcoming jobs",
subtitle="Scheduled work shows enable or disable state, run-now affordances, and delete controls. Run now is disabled while the job is already running.", subtitle="Scheduled work shows enable or disable state, run-now affordances, and destructive delete controls. Deleting removes the source-linked job and its execution history.",
headers=( headers=(
"Source", "Source",
"Next run", "Next run",
@ -311,17 +232,43 @@ def runs_page() -> Renderable:
), ),
rows=completed_rows, rows=completed_rows,
), ),
delete_confirmation_preview(),
), ),
) )
def execution_logs_page(*, job_id: int, execution_id: int) -> Renderable: def execution_logs_page(
*,
job_id: int,
execution_id: int,
log_view: Mapping[str, object] | None = None,
) -> Renderable:
if log_view is None:
log_view = {
"title": f"Job {job_id} / execution {execution_id}",
"description": "Plain text log view routed through the app.",
"status_label": "Unavailable",
"status_tone": "failed",
"log_text": "",
"error_message": "Execution log is only available from persisted job runs.",
}
error_message = _maybe_text(log_view, "error_message")
error_notice = (
h.div(
class_="mt-3 rounded-2xl bg-rose-50 px-4 py-3 text-sm font-medium text-rose-800"
)[
h.p["Execution log unavailable"],
h.p(class_="mt-1 font-normal")[error_message],
]
if error_message is not None
else None
)
return page_shell( return page_shell(
current_path=f"/job/{job_id}/execution/{execution_id}/logs", current_path=f"/job/{job_id}/execution/{execution_id}/logs",
eyebrow="Execution log", eyebrow="Execution log",
title=f"Job {job_id} / execution {execution_id}", title=_text(log_view, "title"),
description="Plain text log view routed through the app. The final version will stream appended lines while the worker is still active.", description=_text(log_view, "description"),
actions=muted_action_link(href="/runs", label="Back to runs"), actions=muted_action_link(href="/runs", label="Back to runs"),
content=( content=(
section_card( section_card(
@ -335,25 +282,18 @@ def execution_logs_page(*, job_id: int, execution_id: int) -> Renderable:
f"/job/{job_id}/execution/{execution_id}/logs" f"/job/{job_id}/execution/{execution_id}/logs"
], ],
h.p(class_="mt-2 text-sm text-slate-600")[ h.p(class_="mt-2 text-sm text-slate-600")[
"Streaming text log view. No arbitrary file paths are exposed in the UI." _text(log_view, "description")
], ],
], ],
status_badge(label="Streaming", tone="running"), status_badge(
label=_text(log_view, "status_label"),
tone=_text(log_view, "status_tone"),
),
], ],
error_notice,
h.pre( h.pre(
class_="mt-3 overflow-x-auto rounded-[1.5rem] bg-slate-950 p-5 text-xs leading-6 text-emerald-200" class_="mt-3 overflow-x-auto rounded-[1.5rem] bg-slate-950 p-5 text-xs leading-6 text-emerald-200"
)[ )[_text(log_view, "log_text")],
"\n".join(
(
"11:42:01 scheduler: run_now requested for job 7",
"11:42:02 worker[7]: starting pangea-mobile",
"11:42:08 stats: requests=18 items=4 bytes=1.8MB",
"11:42:11 stats: requests=26 items=7 bytes=2.6MB",
"11:42:17 stats: requests=31 items=9 bytes=3.0MB",
"11:42:24 worker[7]: waiting for more log lines ...",
)
)
],
) )
), ),
), ),

View file

@ -52,6 +52,7 @@ CREATE TABLE IF NOT EXISTS job_execution (
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
started_at TEXT, started_at TEXT,
ended_at TEXT, ended_at TEXT,
stop_requested_at TEXT,
running_status INTEGER NOT NULL DEFAULT 0 CHECK (running_status BETWEEN 0 AND 4), running_status INTEGER NOT NULL DEFAULT 0 CHECK (running_status BETWEEN 0 AND 4),
requests_count INTEGER NOT NULL DEFAULT 0, requests_count INTEGER NOT NULL DEFAULT 0,
items_count INTEGER NOT NULL DEFAULT 0, items_count INTEGER NOT NULL DEFAULT 0,

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio import asyncio
import hashlib import hashlib
from collections.abc import AsyncGenerator, Awaitable, Callable from collections.abc import AsyncGenerator, Awaitable, Callable
from pathlib import Path
from typing import TypedDict, cast from typing import TypedDict, cast
from urllib.parse import urlparse from urllib.parse import urlparse
@ -15,8 +16,16 @@ from peewee import IntegrityError
from quart import Quart, Response, request, url_for from quart import Quart, Response, request, url_for
from repub.datastar import RefreshBroker, render_stream from repub.datastar import RefreshBroker, render_stream
from repub.jobs import (
JobRuntime,
load_dashboard_view,
load_execution_log_view,
load_runs_view,
)
from repub.model import ( from repub.model import (
Job,
create_source, create_source,
delete_job_source,
initialize_database, initialize_database,
load_source_form, load_source_form,
load_sources, load_sources,
@ -25,7 +34,7 @@ from repub.model import (
) )
from repub.pages import ( from repub.pages import (
create_source_page, create_source_page,
dashboard_page, dashboard_page_with_data,
edit_source_page, edit_source_page,
execution_logs_page, execution_logs_page,
runs_page, runs_page,
@ -35,6 +44,8 @@ from repub.pages import (
from repub.pages.sources import PANGEA_CONTENT_FORMATS, PANGEA_CONTENT_TYPES from repub.pages.sources import PANGEA_CONTENT_FORMATS, PANGEA_CONTENT_TYPES
REFRESH_BROKER_KEY = "repub.refresh_broker" REFRESH_BROKER_KEY = "repub.refresh_broker"
JOB_RUNTIME_KEY = "repub.job_runtime"
DEFAULT_LOG_DIR = Path("out/logs")
RenderFunction = Callable[[], Awaitable[Renderable]] RenderFunction = Callable[[], Awaitable[Renderable]]
@ -83,7 +94,12 @@ def _render_shim_page(*, stylesheet_href: str, datastar_src: str) -> tuple[str,
def create_app() -> Quart: def create_app() -> Quart:
app = Quart(__name__) app = Quart(__name__)
app.config["REPUB_DB_PATH"] = str(initialize_database()) app.config["REPUB_DB_PATH"] = str(initialize_database())
app.config.setdefault("REPUB_LOG_DIR", DEFAULT_LOG_DIR)
app.config.setdefault("REPUB_JOB_WORKER_DURATION_SECONDS", 20.0)
app.config.setdefault("REPUB_JOB_WORKER_STATS_INTERVAL_SECONDS", 1.0)
app.config.setdefault("REPUB_JOB_WORKER_FAILURE_PROBABILITY", 0.3)
app.extensions[REFRESH_BROKER_KEY] = RefreshBroker() app.extensions[REFRESH_BROKER_KEY] = RefreshBroker()
app.extensions[JOB_RUNTIME_KEY] = None
@app.get("/") @app.get("/")
@app.get("/sources") @app.get("/sources")
@ -112,7 +128,7 @@ def create_app() -> Quart:
@app.post("/") @app.post("/")
async def dashboard_patch() -> DatastarResponse: async def dashboard_patch() -> DatastarResponse:
return _page_patch_response(app, render_dashboard) return _page_patch_response(app, lambda: render_dashboard(app))
@app.post("/sources") @app.post("/sources")
async def sources_patch() -> DatastarResponse: async def sources_patch() -> DatastarResponse:
@ -147,6 +163,7 @@ def create_app() -> Quart:
{"_formError": "Slug must be unique.", "_formSuccess": ""} {"_formError": "Slug must be unique.", "_formSuccess": ""}
) )
) )
get_job_runtime(app).sync_jobs()
trigger_refresh(app) trigger_refresh(app)
return DatastarResponse(SSE.redirect("/sources")) return DatastarResponse(SSE.redirect("/sources"))
@ -171,20 +188,58 @@ def create_app() -> Quart:
{"_formError": "Source does not exist.", "_formSuccess": ""} {"_formError": "Source does not exist.", "_formSuccess": ""}
) )
) )
get_job_runtime(app).sync_jobs()
trigger_refresh(app) trigger_refresh(app)
return DatastarResponse(SSE.redirect("/sources")) return DatastarResponse(SSE.redirect("/sources"))
@app.post("/runs") @app.post("/runs")
async def runs_patch() -> DatastarResponse: async def runs_patch() -> DatastarResponse:
return _page_patch_response(app, render_runs) return _page_patch_response(app, lambda: render_runs(app))
@app.post("/actions/jobs/<int:job_id>/run-now")
async def run_job_now_action(job_id: int) -> Response:
get_job_runtime(app).run_job_now(job_id, reason="manual")
trigger_refresh(app)
return Response(status=204)
@app.post("/actions/jobs/<int:job_id>/toggle-enabled")
async def toggle_job_enabled_action(job_id: int) -> Response:
job = Job.get_or_none(id=job_id)
if job is not None:
get_job_runtime(app).set_job_enabled(job_id, enabled=not job.enabled)
trigger_refresh(app)
return Response(status=204)
@app.post("/actions/jobs/<int:job_id>/delete")
async def delete_job_action(job_id: int) -> Response:
delete_job_source(job_id)
get_job_runtime(app).sync_jobs()
trigger_refresh(app)
return Response(status=204)
@app.post("/actions/executions/<int:execution_id>/cancel")
async def cancel_execution_action(execution_id: int) -> Response:
get_job_runtime(app).request_execution_cancel(execution_id)
trigger_refresh(app)
return Response(status=204)
@app.post("/job/<int:job_id>/execution/<int:execution_id>/logs") @app.post("/job/<int:job_id>/execution/<int:execution_id>/logs")
async def logs_patch(job_id: int, execution_id: int) -> DatastarResponse: async def logs_patch(job_id: int, execution_id: int) -> DatastarResponse:
async def render() -> Renderable: async def render() -> Renderable:
return await render_execution_logs(job_id=job_id, execution_id=execution_id) return await render_execution_logs(
app, job_id=job_id, execution_id=execution_id
)
return _page_patch_response(app, render) return _page_patch_response(app, render)
@app.before_serving
async def start_runtime() -> None:
get_job_runtime(app).start()
@app.after_serving
async def stop_runtime() -> None:
get_job_runtime(app).shutdown()
return app return app
@ -192,12 +247,39 @@ def get_refresh_broker(app: Quart) -> RefreshBroker:
return cast(RefreshBroker, app.extensions[REFRESH_BROKER_KEY]) return cast(RefreshBroker, app.extensions[REFRESH_BROKER_KEY])
def get_job_runtime(app: Quart) -> JobRuntime:
runtime = cast(JobRuntime | None, app.extensions.get(JOB_RUNTIME_KEY))
if runtime is None:
runtime = JobRuntime(
log_dir=app.config["REPUB_LOG_DIR"],
worker_duration_seconds=float(
app.config["REPUB_JOB_WORKER_DURATION_SECONDS"]
),
worker_stats_interval_seconds=float(
app.config["REPUB_JOB_WORKER_STATS_INTERVAL_SECONDS"]
),
worker_failure_probability=float(
app.config["REPUB_JOB_WORKER_FAILURE_PROBABILITY"]
),
refresh_callback=lambda: trigger_refresh(app),
)
app.extensions[JOB_RUNTIME_KEY] = runtime
return runtime
def trigger_refresh(app: Quart, event: object = "refresh-event") -> None: def trigger_refresh(app: Quart, event: object = "refresh-event") -> None:
get_refresh_broker(app).publish(event) get_refresh_broker(app).publish(event)
async def render_dashboard() -> Renderable: async def render_dashboard(app: Quart | None = None) -> Renderable:
return dashboard_page() if app is None:
return dashboard_page_with_data()
view = load_dashboard_view(log_dir=app.config["REPUB_LOG_DIR"])
return dashboard_page_with_data(
snapshot=cast(dict[str, str], view["snapshot"]),
running_executions=cast(tuple[dict[str, object], ...], view["running"]),
)
async def render_sources(app: Quart | None = None) -> Renderable: async def render_sources(app: Quart | None = None) -> Renderable:
@ -221,13 +303,42 @@ async def render_edit_source(slug: str) -> Renderable:
) )
async def render_runs() -> Renderable: async def render_runs(app: Quart | None = None) -> Renderable:
if app is None:
return runs_page() return runs_page()
view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])
return runs_page(
running_executions=cast(tuple[dict[str, object], ...], view["running"]),
upcoming_jobs=cast(tuple[dict[str, object], ...], view["upcoming"]),
completed_executions=cast(tuple[dict[str, object], ...], view["completed"]),
)
async def render_execution_logs(*, job_id: int, execution_id: int) -> Renderable:
async def render_execution_logs(
app: Quart | None = None, *, job_id: int, execution_id: int
) -> Renderable:
if app is None:
return execution_logs_page(job_id=job_id, execution_id=execution_id) return execution_logs_page(job_id=job_id, execution_id=execution_id)
log_view = load_execution_log_view(
log_dir=app.config["REPUB_LOG_DIR"],
job_id=job_id,
execution_id=execution_id,
)
return execution_logs_page(
job_id=job_id,
execution_id=execution_id,
log_view={
"title": log_view.title,
"description": log_view.description,
"status_label": log_view.status_label,
"status_tone": log_view.status_tone,
"log_text": log_view.log_text,
"error_message": log_view.error_message,
},
)
def _page_patch_response(app: Quart, render: RenderFunction) -> DatastarResponse: def _page_patch_response(app: Quart, render: RenderFunction) -> DatastarResponse:
queue = get_refresh_broker(app).subscribe() queue = get_refresh_broker(app).subscribe()

View file

@ -0,0 +1,346 @@
from __future__ import annotations
import asyncio
import json
import time
from pathlib import Path
from repub.jobs import JobArtifacts, JobRuntime
from repub.model import (
Job,
JobExecution,
JobExecutionStatus,
Source,
create_source,
initialize_database,
)
from repub.web import create_app, get_job_runtime, render_execution_logs, render_runs
def test_job_runtime_syncs_enabled_jobs_into_apscheduler(tmp_path: Path) -> None:
    """sync_jobs() mirrors each Job's enabled flag into the scheduler's job set.

    Creates one enabled and one disabled source (create_source also creates the
    backing Job rows), verifies only the enabled job is scheduled, then flips
    the enabled job off in the database and re-syncs to confirm its schedule
    entry is removed again.
    """
    initialize_database(tmp_path / "scheduler.db")
    enabled_source = create_source(
        name="Enabled source",
        slug="enabled-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=True,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/enabled.xml",
    )
    disabled_source = create_source(
        name="Disabled source",
        slug="disabled-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="15",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/disabled.xml",
    )
    enabled_job = Job.get(Job.source == enabled_source)
    disabled_job = Job.get(Job.source == disabled_source)
    runtime = JobRuntime(
        log_dir=tmp_path / "out" / "logs",
        worker_duration_seconds=0.4,
        worker_stats_interval_seconds=0.05,
        worker_failure_probability=0.0,
    )
    try:
        runtime.start()
        runtime.sync_jobs()
        # Scheduler entries are keyed "job-<db id>" (asserted below); collect
        # the ids currently registered with the scheduler.
        scheduled_ids = {job.id for job in runtime.scheduler.get_jobs()}
        assert f"job-{enabled_job.id}" in scheduled_ids
        assert f"job-{disabled_job.id}" not in scheduled_ids
        # Disabling the job in the DB must be picked up by the next sync.
        enabled_job.enabled = False
        enabled_job.save()
        runtime.sync_jobs()
        scheduled_ids = {job.id for job in runtime.scheduler.get_jobs()}
        assert f"job-{enabled_job.id}" not in scheduled_ids
    finally:
        # Always shut the runtime down, even if an assertion above fails.
        runtime.shutdown()
def test_job_runtime_run_now_writes_log_and_stats_and_marks_success(
    tmp_path: Path,
) -> None:
    """run_job_now() drives an execution to completion and persists artifacts.

    With the failure probability forced to 0, the execution must finish
    SUCCEEDED, record start/end timestamps and non-zero counters, and leave
    both a log file and a JSON-lines stats file under the configured log dir.
    """
    initialize_database(tmp_path / "run-now.db")
    # enabled=False so the scheduler never runs this job on its own; only the
    # explicit run_job_now() call below should start an execution.
    source = create_source(
        name="Manual source",
        slug="manual-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/manual.xml",
    )
    job = Job.get(Job.source == source)
    # Short worker duration + tight stats interval keep the test fast while
    # still producing at least two stats snapshots.
    runtime = JobRuntime(
        log_dir=tmp_path / "out" / "logs",
        worker_duration_seconds=0.35,
        worker_stats_interval_seconds=0.05,
        worker_failure_probability=0.0,
    )
    try:
        runtime.start()
        execution_id = runtime.run_job_now(job.id, reason="manual")
        assert execution_id is not None
        execution = _wait_for_terminal_execution(execution_id)
        artifacts = JobArtifacts.for_execution(
            log_dir=tmp_path / "out" / "logs",
            job_id=job.id,
            execution_id=execution_id,
        )
        assert execution.running_status == JobExecutionStatus.SUCCEEDED
        assert execution.started_at is not None
        assert execution.ended_at is not None
        assert execution.requests_count > 0
        assert execution.items_count > 0
        assert execution.bytes_count > 0
        assert artifacts.log_path.exists()
        assert artifacts.stats_path.exists()
        assert "starting simulated crawl" in artifacts.log_path.read_text(
            encoding="utf-8"
        )
        # The stats file is JSONL; the final snapshot must agree with the
        # counters persisted on the execution row.
        stats_lines = [
            json.loads(line)
            for line in artifacts.stats_path.read_text(encoding="utf-8").splitlines()
        ]
        assert len(stats_lines) >= 2
        assert stats_lines[-1]["requests_count"] == execution.requests_count
    finally:
        runtime.shutdown()
def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None:
    """Canceling a running execution ends it CANCELED with a stop timestamp.

    Uses a deliberately long (2s) worker so the execution is reliably still
    running when the cancel request lands, then waits for the graceful stop to
    complete and checks the log records the stop request.
    """
    initialize_database(tmp_path / "cancel.db")
    source = create_source(
        name="Cancelable source",
        slug="cancelable-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/cancelable.xml",
    )
    job = Job.get(Job.source == source)
    runtime = JobRuntime(
        log_dir=tmp_path / "out" / "logs",
        worker_duration_seconds=2.0,
        worker_stats_interval_seconds=0.1,
        worker_failure_probability=0.0,
    )
    try:
        runtime.start()
        execution_id = runtime.run_job_now(job.id, reason="manual")
        assert execution_id is not None
        # Only request the cancel once the worker has actually started.
        _wait_for_running_execution(execution_id)
        runtime.request_execution_cancel(execution_id)
        execution = _wait_for_terminal_execution(execution_id)
        artifacts = JobArtifacts.for_execution(
            log_dir=tmp_path / "out" / "logs",
            job_id=job.id,
            execution_id=execution_id,
        )
        assert execution.running_status == JobExecutionStatus.CANCELED
        assert execution.ended_at is not None
        assert execution.stop_requested_at is not None
        assert "graceful stop requested" in artifacts.log_path.read_text(
            encoding="utf-8"
        )
    finally:
        runtime.shutdown()
def test_render_runs_uses_database_backed_jobs_and_executions(
    monkeypatch, tmp_path: Path
) -> None:
    """render_runs(app) renders real DB-backed jobs and executions as HTML.

    Runs one job to completion through the app's JobRuntime, renders the runs
    page, and checks the source slug, all three table sections, the execution
    log link, the terminal status, and the run-now control appear in the body.
    """
    db_path = tmp_path / "runs-page.db"
    log_dir = tmp_path / "out" / "logs"
    # create_app() reads the database path from this environment variable.
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
    app = create_app()
    app.config["REPUB_LOG_DIR"] = log_dir
    # Shrink the simulated worker so the execution finishes quickly and never
    # fails (probability 0.0).
    app.config["REPUB_JOB_WORKER_DURATION_SECONDS"] = 0.35
    app.config["REPUB_JOB_WORKER_STATS_INTERVAL_SECONDS"] = 0.05
    app.config["REPUB_JOB_WORKER_FAILURE_PROBABILITY"] = 0.0
    source = create_source(
        name="Runs page source",
        slug="runs-page-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=True,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/runs-page.xml",
    )
    job = Job.get(Job.source == source)
    runtime = get_job_runtime(app)
    runtime.start()
    try:
        execution_id = runtime.run_job_now(job.id, reason="manual")
        assert execution_id is not None
        execution = _wait_for_terminal_execution(execution_id)

        async def run() -> None:
            body = str(await render_runs(app))
            assert "runs-page-source" in body
            assert "Running job executions" in body
            assert "Upcoming jobs" in body
            assert "Completed job executions" in body
            assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
            assert "Succeeded" in body
            assert "Run now" in body

        asyncio.run(run())
    finally:
        runtime.shutdown()
def test_render_execution_logs_handles_missing_execution_and_missing_log_file(
    monkeypatch, tmp_path: Path
) -> None:
    """The execution-log page degrades gracefully on both error paths.

    An unknown execution id must render the "does not exist" notice; a known
    execution whose log file was never written must render the "not created
    yet" notice. Neither case may raise.
    """
    db_path = tmp_path / "log-errors.db"
    log_dir = tmp_path / "out" / "logs"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
    app = create_app()
    app.config["REPUB_LOG_DIR"] = log_dir
    source = create_source(
        name="Log source",
        slug="log-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/log-source.xml",
    )
    job = Job.get(Job.source == source)
    # A FAILED execution row with no artifacts on disk.
    execution = JobExecution.create(
        job=job,
        running_status=JobExecutionStatus.FAILED,
    )

    async def run() -> None:
        # Execution id 9999 does not exist in the database.
        missing_execution = str(
            await render_execution_logs(app, job_id=job.id, execution_id=9999)
        )
        # Execution exists, but no log file was written under log_dir.
        missing_log = str(
            await render_execution_logs(app, job_id=job.id, execution_id=execution.id)
        )
        assert "Execution log unavailable" in missing_execution
        assert "Execution does not exist." in missing_execution
        assert "Execution log unavailable" in missing_log
        assert "Log file has not been created yet." in missing_log

    asyncio.run(run())
def test_delete_job_action_removes_source_job_and_execution_history(
    monkeypatch, tmp_path: Path
) -> None:
    """POST /actions/jobs/<id>/delete removes the source, job, and executions.

    Exercises the route through the Quart test client and verifies the delete
    cascades: the Source row, the Job row, and the JobExecution history must
    all be gone afterwards.
    """
    db_path = tmp_path / "delete-job.db"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))

    async def run() -> None:
        app = create_app()
        client = app.test_client()
        source = create_source(
            name="Delete source",
            slug="delete-source",
            source_type="feed",
            notes="",
            spider_arguments="",
            enabled=True,
            cron_minute="*/30",
            cron_hour="*",
            cron_day_of_month="*",
            cron_day_of_week="*",
            cron_month="*",
            feed_url="https://example.com/delete.xml",
        )
        job = Job.get(Job.source == source)
        execution = JobExecution.create(
            job=job,
            running_status=JobExecutionStatus.SUCCEEDED,
        )
        response = await client.post(f"/actions/jobs/{job.id}/delete")
        # Action routes respond 204 No Content.
        assert response.status_code == 204
        assert Source.get_or_none(Source.slug == "delete-source") is None
        assert Job.get_or_none(id=job.id) is None
        assert JobExecution.get_or_none(id=int(execution.get_id())) is None

    asyncio.run(run())
def _wait_for_running_execution(
    execution_id: int, *, timeout_seconds: float = 2.0
) -> JobExecution:
    """Poll the database until the execution reports RUNNING.

    Returns the fresh `JobExecution` row once it is running; raises
    `AssertionError` if the state is not reached within `timeout_seconds`.
    """
    give_up_at = time.monotonic() + timeout_seconds
    while time.monotonic() < give_up_at:
        snapshot = JobExecution.get_by_id(execution_id)
        if snapshot.running_status != JobExecutionStatus.RUNNING:
            # Not there yet -- back off briefly before re-reading the row.
            time.sleep(0.02)
            continue
        return snapshot
    raise AssertionError(f"execution {execution_id} never entered RUNNING state")
def _wait_for_terminal_execution(
    execution_id: int, *, timeout_seconds: float = 4.0
) -> JobExecution:
    """Poll the database until the execution reaches a terminal status.

    Terminal means SUCCEEDED, FAILED, or CANCELED. Returns the fresh
    `JobExecution` row; raises `AssertionError` on timeout.
    """
    # Hoisted out of the loop: the terminal set never changes while polling.
    terminal_states = {
        JobExecutionStatus.SUCCEEDED,
        JobExecutionStatus.FAILED,
        JobExecutionStatus.CANCELED,
    }
    cutoff = time.monotonic() + timeout_seconds
    while time.monotonic() < cutoff:
        snapshot = JobExecution.get_by_id(execution_id)
        if snapshot.running_status in terminal_states:
            return snapshot
        time.sleep(0.02)
    raise AssertionError(f"execution {execution_id} did not finish in time")

View file

@ -5,7 +5,15 @@ from pathlib import Path
from typing import Any, cast from typing import Any, cast
from repub.datastar import RefreshBroker, render_sse_event, render_stream from repub.datastar import RefreshBroker, render_sse_event, render_stream
from repub.model import Job, Source, SourceFeed, SourcePangea, create_source from repub.model import (
Job,
JobExecution,
JobExecutionStatus,
Source,
SourceFeed,
SourcePangea,
create_source,
)
from repub.web import ( from repub.web import (
create_app, create_app,
get_refresh_broker, get_refresh_broker,
@ -141,15 +149,20 @@ def test_render_stream_yields_on_connect_and_refresh() -> None:
asyncio.run(run()) asyncio.run(run())
def test_render_dashboard_shows_dashboard_information_architecture() -> None: def test_render_dashboard_shows_dashboard_information_architecture(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "dashboard-render.db"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
async def run() -> None: async def run() -> None:
body = str(await render_dashboard()) app = create_app()
body = str(await render_dashboard(app))
assert "Operational snapshot" in body assert "Operational snapshot" in body
assert "Running executions" in body assert "Running executions" in body
assert 'href="/sources"' in body assert 'href="/sources"' in body
assert 'href="/runs"' in body assert 'href="/runs"' in body
assert "/job/7/execution/104/logs" in body
assert "Create source" in body assert "Create source" in body
asyncio.run(run()) asyncio.run(run())
@ -569,27 +582,97 @@ def test_create_source_action_validates_duplicate_slug_and_pangea_type(
asyncio.run(run()) asyncio.run(run())
def test_render_runs_shows_running_upcoming_and_completed_tables() -> None: def test_render_runs_shows_running_upcoming_and_completed_tables(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "runs-render.db"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
async def run() -> None: async def run() -> None:
body = str(await render_runs()) app = create_app()
source = create_source(
name="Runs render source",
slug="runs-render-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/30",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/runs.xml",
)
job = Job.get(Job.source == source)
execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.SUCCEEDED,
)
body = str(await render_runs(app))
assert "Running job executions" in body assert "Running job executions" in body
assert "Upcoming jobs" in body assert "Upcoming jobs" in body
assert "Completed job executions" in body assert "Completed job executions" in body
assert "Delete confirmation" in body assert "runs-render-source" in body
assert "/job/11/execution/101/logs" in body assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
assert "Already running" in body assert "Already running" not in body
asyncio.run(run()) asyncio.run(run())
def test_render_execution_logs_uses_app_route() -> None: def test_render_execution_logs_uses_app_route(monkeypatch, tmp_path: Path) -> None:
async def run() -> None: db_path = tmp_path / "logs-render.db"
body = str(await render_execution_logs(job_id=7, execution_id=104)) monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
assert "Job 7 / execution 104" in body async def run() -> None:
assert "/job/7/execution/104/logs" in body log_dir = tmp_path / "out" / "logs"
assert "Streaming text log view" in body app = create_app()
app.config["REPUB_LOG_DIR"] = log_dir
source = create_source(
name="Log render source",
slug="log-render-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/30",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/logs.xml",
)
job = Job.get(Job.source == source)
execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.RUNNING,
)
log_path = log_dir / f"job-{job.id}-execution-{execution.get_id()}.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
log_path.write_text(
"\n".join(
(
"scheduler: run_now requested",
"worker: starting simulated crawl",
"worker: waiting for more log lines ...",
)
),
encoding="utf-8",
)
body = str(
await render_execution_logs(
app, job_id=job.id, execution_id=int(execution.get_id())
)
)
assert f"Job {job.id} / execution {execution.get_id()}" in body
assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
assert "Route: /job/" in body
assert "waiting for more log lines" in body assert "waiting for more log lines" in body
asyncio.run(run()) asyncio.run(run())