Compare commits

..

3 commits

17 changed files with 1897 additions and 306 deletions

View file

@ -80,7 +80,7 @@ The only way for actions to affect the view returned by the render-fn running in
- Enter the dev environment with `nix develop` if you are not already inside it - Enter the dev environment with `nix develop` if you are not already inside it
- Sync Python dependencies with `uv sync --all-groups`. - Sync Python dependencies with `uv sync --all-groups`.
- Run the app with `uv run repub`. - Run the app with `uv run repub`.
- Generate CSS with `tailwindcss -i ./path/to/input.css -o ./path/to/output.css` and add `--watch` when you need live rebuilds. - Generate CSS with `tailwindcss -i ./repub/static/app.tailwind.css -o ./repub/static/app.css` and add `--watch` when you need live rebuilds.
```sh ```sh
uv sync --all-groups uv sync --all-groups

View file

@ -1,9 +1,45 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
import htpy as h import htpy as h
from htpy import Node, Renderable from htpy import Node, Renderable
def _button_classes(*, tone: str, emphasis: str, disabled: bool = False) -> str:
base = "inline-flex shrink-0 items-center justify-center rounded-full font-semibold transition "
emphasis_classes = {
"compact": "px-3 py-1.5 text-sm",
"regular": "px-4 py-2.5 text-sm",
"soft": "px-3.5 py-2 text-sm",
"icon": "size-8 p-0",
}
tone_classes = {
"amber": "bg-amber-400 text-slate-950 hover:bg-amber-300",
"header-secondary": (
"border border-white/15 bg-white/5 text-white hover:bg-white/10"
),
"muted": "border border-slate-200 bg-white text-slate-700 shadow-sm hover:bg-slate-50",
"default": "bg-stone-100 text-slate-700 hover:bg-stone-200",
"danger": "bg-rose-50 text-rose-700 hover:bg-rose-100",
"success": "bg-emerald-100 text-emerald-800 hover:bg-emerald-200",
"dark": "bg-slate-950 text-white hover:bg-slate-800",
}
disabled_classes = {
"default": "bg-slate-100 text-slate-400",
"danger": "bg-slate-100 text-slate-400",
"success": "bg-slate-100 text-slate-400",
"dark": "bg-slate-300 text-white/80",
}
interactive = "cursor-not-allowed" if disabled else "cursor-pointer"
colors = (
disabled_classes.get(tone, "bg-slate-100 text-slate-400")
if disabled
else tone_classes[tone]
)
return f"{base}{emphasis_classes[emphasis]} {interactive} {colors}"
def base_layout(*, page_title: str, stylesheet_href: str, content: Node) -> Renderable: def base_layout(*, page_title: str, stylesheet_href: str, content: Node) -> Renderable:
return h.html(lang="en", class_="h-full bg-slate-100")[ return h.html(lang="en", class_="h-full bg-slate-100")[
h.head[ h.head[
@ -43,15 +79,15 @@ def admin_sidebar(
*, current_path: str, source_count: int = 0, running_count: int = 0 *, current_path: str, source_count: int = 0, running_count: int = 0
) -> Renderable: ) -> Renderable:
return h.aside( return h.aside(
class_="relative overflow-hidden bg-slate-950 px-6 py-8 text-white lg:min-h-screen" class_="relative overflow-hidden bg-slate-950 px-4 py-6 text-white lg:min-h-screen"
)[ )[
h.div( h.div(
class_="absolute inset-x-0 top-0 h-40 bg-radial from-amber-400/25 via-amber-400/10 to-transparent" class_="absolute inset-x-0 top-0 h-40 bg-radial from-amber-400/25 via-amber-400/10 to-transparent"
), ),
h.div(class_="relative flex h-full flex-col")[ h.div(class_="relative flex h-full flex-col")[
h.div(class_="flex items-center gap-3")[ h.div(class_="flex items-center gap-2.5")[
h.div( h.div(
class_="flex size-11 items-center justify-center rounded-2xl bg-amber-400 text-base font-black text-slate-950" class_="flex size-10 items-center justify-center rounded-2xl bg-amber-400 text-base font-black text-slate-950"
)["AR"], )["AR"],
h.div[ h.div[
h.p( h.p(
@ -59,7 +95,7 @@ def admin_sidebar(
)["Republisher"], )["Republisher"],
], ],
], ],
h.nav(class_="mt-10 space-y-2")[ h.nav(class_="mt-8 space-y-2")[
nav_link( nav_link(
label="Dashboard", label="Dashboard",
href="/", href="/",
@ -86,7 +122,7 @@ def admin_sidebar(
badge="App", badge="App",
), ),
], ],
h.div(class_="mt-auto rounded-3xl bg-white/5 p-5 ring-1 ring-white/10")[ h.div(class_="mt-auto rounded-3xl bg-white/5 p-4 ring-1 ring-white/10")[
h.p(class_="text-sm font-semibold text-white")[ h.p(class_="text-sm font-semibold text-white")[
"AnyNews Republisher v2.0" "AnyNews Republisher v2.0"
], ],
@ -101,21 +137,21 @@ def admin_sidebar(
def header_action_link(*, href: str, label: str) -> Renderable: def header_action_link(*, href: str, label: str) -> Renderable:
return h.a( return h.a(
href=href, href=href,
class_="inline-flex items-center rounded-full bg-amber-400 px-4 py-2.5 text-sm font-semibold text-slate-950 shadow-sm transition hover:bg-amber-300", class_=_button_classes(tone="amber", emphasis="regular"),
)[label] )[label]
def header_secondary_link(*, href: str, label: str) -> Renderable: def header_secondary_link(*, href: str, label: str) -> Renderable:
return h.a( return h.a(
href=href, href=href,
class_="inline-flex items-center rounded-full border border-white/15 bg-white/5 px-4 py-2.5 text-sm font-semibold text-white transition hover:bg-white/10", class_=_button_classes(tone="header-secondary", emphasis="regular"),
)[label] )[label]
def muted_action_link(*, href: str, label: str) -> Renderable: def muted_action_link(*, href: str, label: str) -> Renderable:
return h.a( return h.a(
href=href, href=href,
class_="inline-flex items-center rounded-full border border-slate-200 bg-white px-3.5 py-2 text-sm font-semibold text-slate-700 shadow-sm transition hover:bg-slate-50", class_=_button_classes(tone="muted", emphasis="soft"),
)[label] )[label]
@ -131,22 +167,62 @@ def inline_link(*, href: str, label: str, tone: str = "default") -> Renderable:
)[label] )[label]
def action_button(
*,
label: Node,
tone: str = "default",
emphasis: str = "compact",
disabled: bool = False,
button_type: str = "button",
post_path: str | None = None,
title: str | None = None,
) -> Renderable:
attributes: dict[str, str] = {}
if post_path is not None and not disabled:
attributes["data-on:pointerdown"] = f"@post('{post_path}')"
if title is not None:
attributes["aria-label"] = title
return h.button(
attributes,
type=button_type,
disabled=disabled,
title=title,
class_=_button_classes(tone=tone, emphasis=emphasis, disabled=disabled),
)[label]
def inline_button( def inline_button(
*, label: str, tone: str = "default", disabled: bool = False *, label: str, tone: str = "default", disabled: bool = False
) -> Renderable: ) -> Renderable:
classes = { return action_button(
"default": "bg-stone-100 text-slate-700 hover:bg-stone-200", label=label,
"danger": "bg-rose-50 text-rose-700 hover:bg-rose-100", tone=tone,
"success": "bg-emerald-100 text-emerald-800 hover:bg-emerald-200", emphasis="compact",
} button_type="button",
class_name = (
"cursor-not-allowed bg-slate-100 text-slate-400" if disabled else classes[tone]
)
return h.button(
type="button",
disabled=disabled, disabled=disabled,
class_=f"inline-flex items-center whitespace-nowrap rounded-full px-3 py-1.5 text-sm font-semibold transition {class_name}", )
)[label]
def app_shell(
*,
current_path: str,
source_count: int = 0,
running_count: int = 0,
content: Node,
) -> Renderable:
return h.main(
id="morph",
class_="min-h-screen lg:grid lg:grid-cols-[14rem_minmax(0,1fr)]",
)[
admin_sidebar(
current_path=current_path,
source_count=source_count,
running_count=running_count,
),
h.div(class_="px-4 py-4 sm:px-4 lg:px-5 lg:py-4")[
h.div(class_="mx-auto max-w-7xl space-y-4")[content]
],
]
def page_shell( def page_shell(
@ -160,17 +236,11 @@ def page_shell(
running_count: int = 0, running_count: int = 0,
content: Node, content: Node,
) -> Renderable: ) -> Renderable:
return h.main( return app_shell(
id="morph",
class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]",
)[
admin_sidebar(
current_path=current_path, current_path=current_path,
source_count=source_count, source_count=source_count,
running_count=running_count, running_count=running_count,
), content=(
h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[
h.div(class_="mx-auto max-w-7xl space-y-5")[
h.section[ h.section[
h.div( h.div(
class_="flex flex-col gap-4 sm:flex-row sm:items-start sm:justify-between" class_="flex flex-col gap-4 sm:flex-row sm:items-start sm:justify-between"
@ -181,18 +251,15 @@ def page_shell(
)[title], )[title],
( (
description description
and h.p(class_="mt-1 text-sm text-slate-600")[ and h.p(class_="mt-1 text-sm text-slate-600")[description]
description
]
), ),
], ],
actions and h.div(class_="flex flex-wrap gap-2")[actions], actions and h.div(class_="flex flex-wrap gap-2")[actions],
] ]
], ],
content, content,
] ),
], )
]
def section_card(*, content: Node) -> Renderable: def section_card(*, content: Node) -> Renderable:
@ -207,17 +274,27 @@ def table_section(
empty_message: str, empty_message: str,
headers: tuple[str, ...], headers: tuple[str, ...],
rows: tuple[tuple[Node, ...], ...], rows: tuple[tuple[Node, ...], ...],
row_attrs: tuple[Mapping[str, str], ...] | None = None,
first_header_class: str | None = None,
first_cell_class: str | None = None,
actions: Node | None = None, actions: Node | None = None,
) -> Renderable: ) -> Renderable:
def render_row(row: tuple[Node, ...]) -> Renderable: def render_row(
row: tuple[Node, ...], attrs: Mapping[str, str] | None = None
) -> Renderable:
first_cell, *other_cells = row first_cell, *other_cells = row
return h.tr(class_="align-top")[ row_attributes = dict(attrs or {})
h.td(class_="py-4 pr-6 pl-4 text-sm font-medium text-slate-950 sm:pl-6")[ row_attributes["class"] = f"align-top {row_attributes.get('class', '')}".strip()
first_cell return h.tr(row_attributes)[
], h.td(
class_=(
first_cell_class
or "py-3 pr-5 pl-3 text-sm font-medium text-slate-950 sm:pl-4"
)
)[first_cell],
( (
h.td( h.td(
class_="px-3 py-4 align-top text-sm whitespace-nowrap text-slate-600" class_="px-2.5 py-3 align-top text-sm whitespace-nowrap text-slate-600"
)[cell] )[cell]
for cell in other_cells for cell in other_cells
), ),
@ -225,17 +302,23 @@ def table_section(
body_rows: Node body_rows: Node
if rows: if rows:
body_rows = (render_row(row) for row in rows) row_attributes = row_attrs or tuple({} for _ in rows)
body_rows = (
render_row(row, attrs)
for row, attrs in zip(rows, row_attributes, strict=False)
)
else: else:
body_rows = h.tr[ body_rows = h.tr[
h.td( h.td(
colspan=str(len(headers)), colspan=str(len(headers)),
class_="px-4 py-8 text-center text-sm text-slate-500 sm:px-6", class_="px-3 py-8 text-center text-sm text-slate-500 sm:px-4",
)[empty_message] )[empty_message]
] ]
return h.section[ return h.section[
h.div(class_="flex flex-col gap-3 sm:flex-row sm:items-end sm:justify-between")[ h.div(
class_="flex flex-col gap-2.5 sm:flex-row sm:items-end sm:justify-between"
)[
h.div[ h.div[
eyebrow eyebrow
and h.p( and h.p(
@ -251,16 +334,20 @@ def table_section(
)[ )[
h.div(class_="overflow-x-auto")[ h.div(class_="overflow-x-auto")[
h.table( h.table(
class_="relative w-full min-w-[72rem] divide-y divide-slate-200 table-auto" class_="relative w-full min-w-[64rem] divide-y divide-slate-200 table-auto"
)[ )[
h.thead(class_="bg-stone-50")[ h.thead(class_="bg-stone-50")[
h.tr[ h.tr[
( (
h.th( h.th(
scope="col", scope="col",
class_="px-3 py-2.5 text-left text-xs font-semibold uppercase tracking-[0.18em] whitespace-nowrap text-slate-500 first:pl-4 sm:first:pl-6", class_=(
first_header_class
if index == 0 and first_header_class is not None
else "px-2.5 py-2.5 text-left text-xs font-semibold uppercase tracking-[0.18em] whitespace-nowrap text-slate-500 first:pl-3 sm:first:pl-4"
),
)[header] )[header]
for header in headers for index, header in enumerate(headers)
) )
] ]
], ],

View file

@ -47,13 +47,20 @@ def _publish_event(queue: asyncio.Queue[object], event: object) -> None:
async def render_sse_event( async def render_sse_event(
render: RenderFunction, *, last_event_id: str | None = None render: RenderFunction,
*,
last_event_id: str | None = None,
use_view_transition: bool = False,
) -> tuple[str | None, DatastarEvent | None]: ) -> tuple[str | None, DatastarEvent | None]:
html = _coerce_html(await render()) html = _coerce_html(await render())
event_id = _render_hash(html) event_id = _render_hash(html)
if event_id == last_event_id: if event_id == last_event_id:
return last_event_id, None return last_event_id, None
return event_id, SSE.patch_elements(html, event_id=event_id) return event_id, SSE.patch_elements(
html,
event_id=event_id,
use_view_transition=use_view_transition,
)
async def render_stream( async def render_stream(
@ -71,9 +78,11 @@ async def render_stream(
yield event yield event
while True: while True:
await queue.get() event_name = await queue.get()
last_event_id, event = await render_sse_event( last_event_id, event = await render_sse_event(
render, last_event_id=last_event_id render,
last_event_id=last_event_id,
use_view_transition=event_name == "queue-reordered",
) )
if event is not None: if event is not None:
yield event yield event

View file

@ -14,6 +14,7 @@ from typing import Callable, TextIO, cast
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
from peewee import IntegrityError
from repub.config import feed_output_dir, feed_output_path from repub.config import feed_output_dir, feed_output_path
from repub.model import ( from repub.model import (
@ -106,7 +107,7 @@ class JobRuntime:
self, self,
*, *,
log_dir: str | Path, log_dir: str | Path,
refresh_callback: Callable[[], None] | None = None, refresh_callback: Callable[[object], None] | None = None,
graceful_stop_seconds: float = 15.0, graceful_stop_seconds: float = 15.0,
) -> None: ) -> None:
self.log_dir = Path(log_dir) self.log_dir = Path(log_dir)
@ -116,6 +117,7 @@ class JobRuntime:
self._workers: dict[int, RunningWorker] = {} self._workers: dict[int, RunningWorker] = {}
self._run_lock = threading.Lock() self._run_lock = threading.Lock()
self._started = False self._started = False
self._last_runtime_refresh_at = 0.0
def start(self) -> None: def start(self) -> None:
if self._started: if self._started:
@ -143,6 +145,7 @@ class JobRuntime:
) )
self.sync_jobs() self.sync_jobs()
self._started = True self._started = True
self._start_queued_jobs()
def shutdown(self) -> None: def shutdown(self) -> None:
for execution_id in tuple(self._workers): for execution_id in tuple(self._workers):
@ -183,20 +186,84 @@ class JobRuntime:
self.scheduler.remove_job(scheduled_job.id) self.scheduler.remove_job(scheduled_job.id)
def run_scheduled_job(self, job_id: int) -> None: def run_scheduled_job(self, job_id: int) -> None:
self.run_job_now(job_id, reason="scheduled") self.enqueue_job_run(job_id, reason="scheduled")
def run_job_now(self, job_id: int, *, reason: str) -> int | None: def run_job_now(self, job_id: int, *, reason: str) -> int | None:
return self.enqueue_job_run(job_id, reason=reason)
def enqueue_job_run(self, job_id: int, *, reason: str) -> int | None:
del reason del reason
self.start() self.start()
with self._run_lock: with self._run_lock:
execution_id = self._enqueue_job_run_locked(job_id)
self._start_queued_jobs_locked()
if execution_id is not None:
self._trigger_refresh()
return execution_id
def _enqueue_job_run_locked(self, job_id: int) -> int | None:
with database.connection_context(): with database.connection_context():
with database.atomic():
job = Job.get_or_none(id=job_id) job = Job.get_or_none(id=job_id)
if job is None: if job is None:
return None return None
if self._max_concurrent_jobs_reached(): pending_execution = JobExecution.get_or_none(
return None (JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
if pending_execution is not None:
return _execution_id(pending_execution)
try:
execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.PENDING,
)
except IntegrityError:
pending_execution = JobExecution.get_or_none(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
return (
_execution_id(pending_execution)
if pending_execution is not None
else None
)
return _execution_id(execution)
def _start_queued_jobs(self) -> None:
with self._run_lock:
self._start_queued_jobs_locked()
def _start_queued_jobs_locked(self) -> None:
while True:
if self._max_concurrent_jobs_reached():
return
claimed_execution = self._claim_next_pending_execution()
if claimed_execution is None:
return
job = cast(Job, claimed_execution.job)
self._start_worker_for_execution(
job_id=_job_id(job),
execution_id=_execution_id(claimed_execution),
)
def _claim_next_pending_execution(self) -> JobExecution | None:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
pending_executions = tuple(
JobExecution.select(JobExecution, Job)
.join(Job)
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
)
for execution in pending_executions:
job = cast(Job, execution.job)
already_running = ( already_running = (
JobExecution.select() JobExecution.select()
.where( .where(
@ -206,15 +273,25 @@ class JobRuntime:
.exists() .exists()
) )
if already_running: if already_running:
return None continue
execution = JobExecution.create( started_at = utc_now()
job=job, claimed = (
started_at=utc_now(), JobExecution.update(
started_at=started_at,
running_status=JobExecutionStatus.RUNNING, running_status=JobExecutionStatus.RUNNING,
) )
execution_id = _execution_id(execution) .where(
(execution_primary_key == _execution_id(execution))
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.execute()
)
if claimed:
return JobExecution.get_by_id(_execution_id(execution))
return None
def _start_worker_for_execution(self, *, job_id: int, execution_id: int) -> None:
artifacts = JobArtifacts.for_execution( artifacts = JobArtifacts.for_execution(
log_dir=self.log_dir, job_id=job_id, execution_id=execution_id log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
) )
@ -250,8 +327,6 @@ class JobRuntime:
log_handle=log_handle, log_handle=log_handle,
artifacts=artifacts, artifacts=artifacts,
) )
self._trigger_refresh()
return execution_id
def _max_concurrent_jobs_reached(self) -> bool: def _max_concurrent_jobs_reached(self) -> bool:
return ( return (
@ -279,21 +354,123 @@ class JobRuntime:
) )
worker.process.terminate() worker.process.terminate()
self._trigger_refresh("queue-reordered")
return True
def cancel_queued_execution(self, execution_id: int) -> bool:
with self._run_lock:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
deleted = (
JobExecution.delete()
.where(
(execution_primary_key == execution_id)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.execute()
)
if not deleted:
return False
self._trigger_refresh()
return True
def move_queued_execution(self, execution_id: int, *, direction: str) -> bool:
offset = -1 if direction == "up" else 1
with self._run_lock:
with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
queued_executions = tuple(
JobExecution.select()
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(
JobExecution.created_at.asc(), execution_primary_key.asc()
)
)
current_index = next(
(
index
for index, execution in enumerate(queued_executions)
if _execution_id(execution) == execution_id
),
None,
)
if current_index is None:
return False
target_index = current_index + offset
if target_index < 0 or target_index >= len(queued_executions):
return False
current_execution = queued_executions[current_index]
target_execution = queued_executions[target_index]
current_created_at = _coerce_datetime(
cast(datetime | str, current_execution.created_at)
)
target_created_at = _coerce_datetime(
cast(datetime | str, target_execution.created_at)
)
with database.atomic():
if current_created_at == target_created_at:
adjusted_created_at = target_created_at + timedelta(
microseconds=-1 if offset < 0 else 1
)
(
JobExecution.update(created_at=adjusted_created_at)
.where(
execution_primary_key
== _execution_id(current_execution)
)
.execute()
)
else:
(
JobExecution.update(created_at=target_created_at)
.where(
execution_primary_key
== _execution_id(current_execution)
)
.execute()
)
(
JobExecution.update(created_at=current_created_at)
.where(
execution_primary_key == _execution_id(target_execution)
)
.execute()
)
self._trigger_refresh() self._trigger_refresh()
return True return True
def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool: def set_job_enabled(self, job_id: int, *, enabled: bool) -> bool:
with database.connection_context(): with database.connection_context():
with database.atomic():
job = Job.get_or_none(id=job_id) job = Job.get_or_none(id=job_id)
if job is None: if job is None:
return False return False
job.enabled = enabled job.enabled = enabled
job.save() job.save()
if not enabled:
(
JobExecution.delete()
.where(
(JobExecution.job == job)
& (
JobExecution.running_status
== JobExecutionStatus.PENDING
)
)
.execute()
)
self.sync_jobs() self.sync_jobs()
self._trigger_refresh() self._trigger_refresh()
return True return True
def poll_workers(self) -> None: def poll_workers(self) -> None:
any_finished = False
for execution_id in tuple(self._workers): for execution_id in tuple(self._workers):
worker = self._workers[execution_id] worker = self._workers[execution_id]
self._apply_stats(worker) self._apply_stats(worker)
@ -315,8 +492,14 @@ class JobRuntime:
worker.log_handle.close() worker.log_handle.close()
del self._workers[execution_id] del self._workers[execution_id]
any_finished = True
self._trigger_refresh() self._trigger_refresh()
if any_finished:
self._start_queued_jobs()
self._refresh_running_runtime()
def _apply_stats(self, worker: RunningWorker) -> None: def _apply_stats(self, worker: RunningWorker) -> None:
if not worker.artifacts.stats_path.exists(): if not worker.artifacts.stats_path.exists():
return return
@ -360,9 +543,27 @@ class JobRuntime:
): ):
worker.process.kill() worker.process.kill()
def _trigger_refresh(self) -> None: def _trigger_refresh(self, event: object = "refresh-event") -> None:
if self.refresh_callback is not None: if self.refresh_callback is not None:
self.refresh_callback() self.refresh_callback(event)
def _refresh_running_runtime(self) -> None:
if not self._has_running_executions():
return
current_time = time.monotonic()
if current_time - self._last_runtime_refresh_at < 1.0:
return
self._last_runtime_refresh_at = current_time
self._trigger_refresh()
def _has_running_executions(self) -> bool:
return (
JobExecution.select()
.where(JobExecution.running_status == JobExecutionStatus.RUNNING)
.exists()
)
def _reconcile_stale_executions(self) -> None: def _reconcile_stale_executions(self) -> None:
live_workers = _find_live_workers() live_workers = _find_live_workers()
@ -451,7 +652,15 @@ def load_runs_view(
reference_time = now or datetime.now(UTC) reference_time = now or datetime.now(UTC)
resolved_log_dir = Path(log_dir) resolved_log_dir = Path(log_dir)
with database.connection_context(): with database.connection_context():
execution_primary_key = getattr(JobExecution, "_meta").primary_key
jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc())) jobs = tuple(Job.select(Job, Source).join(Source).order_by(Source.name.asc()))
queued_executions = tuple(
JobExecution.select(JobExecution, Job, Source)
.join(Job)
.join(Source)
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.order_by(JobExecution.created_at.asc(), execution_primary_key.asc())
)
running_executions = tuple( running_executions = tuple(
JobExecution.select(JobExecution, Job, Source) JobExecution.select(JobExecution, Job, Source)
.join(Job) .join(Job)
@ -477,15 +686,39 @@ def load_runs_view(
) )
running_by_job = { running_by_job = {
_job_id(execution.job): execution for execution in running_executions _job_id(cast(Job, execution.job)): execution
for execution in running_executions
}
queued_by_job = {
_job_id(cast(Job, execution.job)): execution
for execution in queued_executions
} }
return { return {
"running": tuple( "running": tuple(
_project_running_execution(execution, resolved_log_dir, reference_time) _project_running_execution(
execution,
resolved_log_dir,
reference_time,
queued_follow_up=queued_by_job.get(_job_id(cast(Job, execution.job))),
)
for execution in running_executions for execution in running_executions
), ),
"queued": tuple(
_project_queued_execution(
execution,
reference_time,
position=position,
total_count=len(queued_executions),
)
for position, execution in enumerate(queued_executions, start=1)
),
"upcoming": tuple( "upcoming": tuple(
_project_upcoming_job(job, running_by_job.get(job.id), reference_time) _project_upcoming_job(
job,
running_by_job.get(job.id),
queued_by_job.get(job.id),
reference_time,
)
for job in jobs for job in jobs
), ),
"completed": tuple( "completed": tuple(
@ -596,7 +829,11 @@ def _scheduler_job_id(job_id: int) -> str:
def _project_running_execution( def _project_running_execution(
execution: JobExecution, log_dir: Path, reference_time: datetime execution: JobExecution,
log_dir: Path,
reference_time: datetime,
*,
queued_follow_up: JobExecution | None = None,
) -> dict[str, object]: ) -> dict[str, object]:
job = cast(Job, execution.job) job = cast(Job, execution.job)
job_id = _job_id(job) job_id = _job_id(job)
@ -624,12 +861,59 @@ def _project_running_execution(
), ),
"log_href": f"/job/{job_id}/execution/{execution_id}/logs", "log_href": f"/job/{job_id}/execution/{execution_id}/logs",
"log_exists": artifacts.log_path.exists(), "log_exists": artifacts.log_path.exists(),
"cancel_post_path": f"/actions/executions/{execution_id}/cancel", "cancel_label": "Cancel" if queued_follow_up is not None else "Stop",
"cancel_post_path": (
f"/actions/queued-executions/{_execution_id(queued_follow_up)}/cancel"
if queued_follow_up is not None
else f"/actions/executions/{execution_id}/cancel"
),
}
def _project_queued_execution(
execution: JobExecution,
reference_time: datetime,
*,
position: int,
total_count: int,
) -> dict[str, object]:
job = cast(Job, execution.job)
queued_at = _coerce_datetime(cast(datetime | str, execution.created_at))
execution_id = _execution_id(execution)
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": _job_id(job),
"execution_id": execution_id,
"queued_at": _humanize_relative_time(reference_time, queued_at),
"queued_at_iso": queued_at.isoformat(),
"queue_position": position,
"status": "Queued",
"status_tone": "idle",
"run_label": "Queued",
"run_disabled": True,
"run_post_path": f"/actions/jobs/{_job_id(job)}/run-now",
"cancel_post_path": (f"/actions/queued-executions/{execution_id}/cancel"),
"move_up_disabled": position == 1,
"move_up_post_path": (
None
if position == 1
else f"/actions/queued-executions/{execution_id}/move-up"
),
"move_down_disabled": position == total_count,
"move_down_post_path": (
None
if position == total_count
else f"/actions/queued-executions/{execution_id}/move-down"
),
} }
def _project_upcoming_job( def _project_upcoming_job(
job: Job, running_execution: JobExecution | None, reference_time: datetime job: Job,
running_execution: JobExecution | None,
queued_execution: JobExecution | None,
reference_time: datetime,
) -> dict[str, object]: ) -> dict[str, object]:
job_id = _job_id(job) job_id = _job_id(job)
trigger = _job_trigger(job) trigger = _job_trigger(job)
@ -638,6 +922,12 @@ def _project_upcoming_job(
if job.enabled and running_execution is None if job.enabled and running_execution is None
else None else None
) )
run_disabled = running_execution is not None or queued_execution is not None
run_reason = (
"Already running"
if running_execution is not None
else ("Queued" if queued_execution is not None else "Ready")
)
return { return {
"source": job.source.name, "source": job.source.name,
"slug": job.source.slug, "slug": job.source.slug,
@ -659,8 +949,8 @@ def _project_upcoming_job(
), ),
"enabled_label": "Enabled" if job.enabled else "Disabled", "enabled_label": "Enabled" if job.enabled else "Disabled",
"enabled_tone": "scheduled" if job.enabled else "idle", "enabled_tone": "scheduled" if job.enabled else "idle",
"run_disabled": running_execution is not None, "run_disabled": run_disabled,
"run_reason": "Already running" if running_execution is not None else "Ready", "run_reason": run_reason,
"toggle_label": "Disable" if job.enabled else "Enable", "toggle_label": "Disable" if job.enabled else "Enable",
"toggle_enabled": not job.enabled, "toggle_enabled": not job.enabled,
"run_post_path": f"/actions/jobs/{job_id}/run-now", "run_post_path": f"/actions/jobs/{job_id}/run-now",

View file

@ -6,7 +6,7 @@ import htpy as h
from htpy import Node, Renderable from htpy import Node, Renderable
from repub.components import ( from repub.components import (
admin_sidebar, app_shell,
header_action_link, header_action_link,
inline_button, inline_button,
inline_link, inline_link,
@ -253,21 +253,14 @@ def dashboard_page_with_data(
) -> Renderable: ) -> Renderable:
running_items = running_executions or () running_items = running_executions or ()
source_items = source_feeds or () source_items = source_feeds or ()
return h.main( return app_shell(
id="morph",
class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]",
)[
admin_sidebar(
current_path="/", current_path="/",
source_count=len(source_items), source_count=len(source_items),
running_count=len(running_items), running_count=len(running_items),
), content=(
h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[
h.div(class_="mx-auto max-w-7xl space-y-5")[
dashboard_header(), dashboard_header(),
operational_snapshot(snapshot=snapshot), operational_snapshot(snapshot=snapshot),
running_executions_table(running_executions=running_items), running_executions_table(running_executions=running_items),
published_feeds_table(source_feeds=source_items), published_feeds_table(source_feeds=source_items),
] ),
], )
]

View file

@ -6,6 +6,7 @@ import htpy as h
from htpy import Node, Renderable from htpy import Node, Renderable
from repub.components import ( from repub.components import (
action_button,
inline_link, inline_link,
muted_action_link, muted_action_link,
page_shell, page_shell,
@ -15,34 +16,6 @@ from repub.components import (
) )
def _action_button(
*,
label: str,
tone: str = "default",
disabled: bool = False,
post_path: str | None = None,
) -> Renderable:
classes = {
"default": "bg-stone-100 text-slate-700 hover:bg-stone-200",
"danger": "bg-rose-50 text-rose-700 hover:bg-rose-100",
}
class_name = (
"cursor-not-allowed bg-slate-100 text-slate-400" if disabled else classes[tone]
)
attributes: dict[str, str] = {}
if post_path is not None and not disabled:
attributes["data-on:pointerdown"] = f"@post('{post_path}')"
return h.button(
attributes,
type="button",
disabled=disabled,
class_=(
"inline-flex items-center whitespace-nowrap rounded-full px-3 py-1.5 "
f"text-sm font-semibold transition {class_name}"
),
)[label]
def _text(values: Mapping[str, object], key: str) -> str: def _text(values: Mapping[str, object], key: str) -> str:
return str(values[key]) return str(values[key])
@ -58,36 +31,121 @@ def _flag(values: Mapping[str, object], key: str) -> bool:
return bool(values[key]) return bool(values[key])
def _queue_icon(direction: str) -> Renderable:
path = (
"M4.5 10.5 12 3m0 0 7.5 7.5M12 3v18"
if direction == "up"
else "M19.5 13.5 12 21m0 0-7.5-7.5M12 21V3"
)
return h.svg(
xmlns="http://www.w3.org/2000/svg",
fill="none",
viewBox="0 0 24 24",
stroke_width="1.5",
stroke="currentColor",
class_="size-4",
)[
h.path(
stroke_linecap="round",
stroke_linejoin="round",
d=path,
)
]
def _queue_row_attrs(execution: Mapping[str, object]) -> dict[str, str]:
return {
"style": (
"view-transition-name: " f"running-job-{_text(execution, 'execution_id')};"
)
}
def _running_row(execution: Mapping[str, object]) -> tuple[Node, ...]: def _running_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
return ( return (
h.p(class_="w-px whitespace-nowrap font-medium text-slate-900")[
f"#{_text(execution, 'execution_id')}"
],
h.div[ h.div[
h.div(class_="font-semibold text-slate-950")[_text(execution, "source")], h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
h.p(class_="mt-1 font-mono text-xs text-slate-500")[ h.p(class_="mt-0.5 font-mono text-xs text-slate-500")[
_text(execution, "slug") _text(execution, "slug")
], ],
], ],
h.div[
h.p(class_="font-medium text-slate-900")[
f"#{_text(execution, 'execution_id')}"
],
],
h.div[ h.div[
h.p(class_="font-medium text-slate-900")[_text(execution, "started_at")], h.p(class_="font-medium text-slate-900")[_text(execution, "started_at")],
h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "runtime")], h.p(class_="mt-0.5 text-xs text-slate-500")[_text(execution, "runtime")],
], ],
status_badge(label=_text(execution, "status"), tone="running"), status_badge(label=_text(execution, "status"), tone="running"),
h.div(class_="min-w-56 whitespace-normal")[ h.div(class_="max-w-xs whitespace-normal")[
h.p(class_="font-medium text-slate-900")[_text(execution, "stats")], h.p(class_="font-medium text-slate-900")[_text(execution, "stats")],
h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "worker")], h.p(class_="mt-0.5 text-xs text-slate-500")[_text(execution, "worker")],
], ],
h.div(class_="flex flex-nowrap items-center gap-3")[ h.div(class_="flex flex-wrap items-center gap-2")[
inline_link( inline_link(
href=_text(execution, "log_href"), href=_text(execution, "log_href"),
label="View log", label="View log",
tone="amber", tone="amber",
), ),
_action_button( action_button(
label="Stop", label=_text(execution, "cancel_label"),
tone="danger",
post_path=_maybe_text(execution, "cancel_post_path"),
),
],
)
def _queued_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
queued_at = _maybe_text(execution, "queued_at_iso")
queued_label: Node = h.p(class_="font-medium text-slate-900")[
_text(execution, "queued_at")
]
if queued_at is not None:
queued_label = h.time(
{
"data-queued-at": queued_at,
"title": queued_at,
},
datetime=queued_at,
class_="font-medium text-slate-900",
)[_text(execution, "queued_at")]
return (
h.p(class_="w-px whitespace-nowrap font-medium text-slate-900")[
f"#{_text(execution, 'execution_id')}"
],
h.div[
h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
h.p(class_="mt-0.5 font-mono text-xs text-slate-500")[
_text(execution, "slug")
],
],
queued_label,
status_badge(label="Queued", tone="idle"),
h.div(class_="max-w-xs whitespace-normal")[
h.p(class_="font-medium text-slate-900")[
f"Queue position #{_text(execution, 'queue_position')}"
],
h.p(class_="mt-0.5 text-xs text-slate-500")["waiting for capacity"],
],
h.div(class_="flex flex-wrap items-center gap-2")[
action_button(
label=_queue_icon("up"),
emphasis="icon",
title="Move up",
disabled=_flag(execution, "move_up_disabled"),
post_path=_maybe_text(execution, "move_up_post_path"),
),
action_button(
label=_queue_icon("down"),
emphasis="icon",
title="Move down",
disabled=_flag(execution, "move_down_disabled"),
post_path=_maybe_text(execution, "move_down_post_path"),
),
action_button(
label="Cancel",
tone="danger", tone="danger",
post_path=_maybe_text(execution, "cancel_post_path"), post_path=_maybe_text(execution, "cancel_post_path"),
), ),
@ -113,7 +171,7 @@ def _upcoming_row(job: Mapping[str, object]) -> tuple[Node, ...]:
return ( return (
h.div[ h.div[
h.div(class_="font-semibold text-slate-950")[_text(job, "source")], h.div(class_="font-semibold text-slate-950")[_text(job, "source")],
h.p(class_="mt-1 font-mono text-xs text-slate-500")[_text(job, "slug")], h.p(class_="mt-0.5 font-mono text-xs text-slate-500")[_text(job, "slug")],
], ],
h.div[next_run_label,], h.div[next_run_label,],
h.p(class_="font-mono text-xs text-slate-600")[_text(job, "schedule")], h.p(class_="font-mono text-xs text-slate-600")[_text(job, "schedule")],
@ -121,20 +179,20 @@ def _upcoming_row(job: Mapping[str, object]) -> tuple[Node, ...]:
label=_text(job, "enabled_label"), label=_text(job, "enabled_label"),
tone=_text(job, "enabled_tone"), tone=_text(job, "enabled_tone"),
), ),
h.p(class_="max-w-40 whitespace-normal text-sm text-slate-500")[ h.p(class_="max-w-32 whitespace-normal text-sm text-slate-500")[
_text(job, "run_reason") _text(job, "run_reason")
], ],
h.div(class_="flex flex-nowrap items-center gap-2")[ h.div(class_="flex flex-wrap items-center gap-2")[
_action_button( action_button(
label="Run now", label="Run now",
disabled=_flag(job, "run_disabled"), disabled=_flag(job, "run_disabled"),
post_path=_maybe_text(job, "run_post_path"), post_path=_maybe_text(job, "run_post_path"),
), ),
_action_button( action_button(
label=_text(job, "toggle_label"), label=_text(job, "toggle_label"),
post_path=_maybe_text(job, "toggle_post_path"), post_path=_maybe_text(job, "toggle_post_path"),
), ),
_action_button( action_button(
label="Delete", label="Delete",
tone="danger", tone="danger",
post_path=_maybe_text(job, "delete_post_path"), post_path=_maybe_text(job, "delete_post_path"),
@ -159,26 +217,21 @@ def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
)[_text(execution, "ended_at")] )[_text(execution, "ended_at")]
return ( return (
h.p(class_="w-px whitespace-nowrap font-medium text-slate-900")[
f"#{_text(execution, 'execution_id')}"
],
h.div[ h.div[
h.div(class_="font-semibold text-slate-950")[_text(execution, "source")], h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
h.p(class_="mt-1 font-mono text-xs text-slate-500")[ h.p(class_="mt-0.5 font-mono text-xs text-slate-500")[
_text(execution, "slug") _text(execution, "slug")
], ],
], ],
h.div[ h.div[ended_at_label,],
h.p(class_="font-medium text-slate-900")[
f"#{_text(execution, 'execution_id')}"
],
],
h.div[
ended_at_label,
h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "summary")],
],
status_badge( status_badge(
label=_text(execution, "status"), label=_text(execution, "status"),
tone=_text(execution, "status_tone"), tone=_text(execution, "status_tone"),
), ),
h.div(class_="min-w-48 whitespace-normal")[ h.div(class_="max-w-[14rem] whitespace-normal")[
h.p(class_="font-medium text-slate-900")[_text(execution, "stats")] h.p(class_="font-medium text-slate-900")[_text(execution, "stats")]
], ],
inline_link( inline_link(
@ -192,14 +245,21 @@ def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
def runs_page( def runs_page(
*, *,
running_executions: tuple[Mapping[str, object], ...] | None = None, running_executions: tuple[Mapping[str, object], ...] | None = None,
queued_executions: tuple[Mapping[str, object], ...] | None = None,
upcoming_jobs: tuple[Mapping[str, object], ...] | None = None, upcoming_jobs: tuple[Mapping[str, object], ...] | None = None,
completed_executions: tuple[Mapping[str, object], ...] | None = None, completed_executions: tuple[Mapping[str, object], ...] | None = None,
source_count: int = 0, source_count: int = 0,
) -> Renderable: ) -> Renderable:
running_items = running_executions or () running_items = running_executions or ()
queued_items = queued_executions or ()
upcoming_items = upcoming_jobs or () upcoming_items = upcoming_jobs or ()
completed_items = completed_executions or () completed_items = completed_executions or ()
running_rows = tuple(_running_row(execution) for execution in running_items) running_rows = tuple(_running_row(execution) for execution in running_items)
queued_rows = tuple(_queued_row(execution) for execution in queued_items)
live_rows = running_rows + queued_rows
live_row_attrs = tuple(
_queue_row_attrs(execution) for execution in running_items + queued_items
)
upcoming_rows = tuple(_upcoming_row(job) for job in upcoming_items) upcoming_rows = tuple(_upcoming_row(job) for job in upcoming_items)
completed_rows = tuple(_completed_row(execution) for execution in completed_items) completed_rows = tuple(_completed_row(execution) for execution in completed_items)
@ -213,21 +273,24 @@ def runs_page(
content=( content=(
table_section( table_section(
eyebrow="Live work", eyebrow="Live work",
title="Running job executions", title="Running jobs",
empty_message="No job executions are running.", empty_message="No jobs are running or queued.",
headers=( headers=(
"#",
"Source", "Source",
"Execution", "Activity",
"Started", "State",
"Status", "Details",
"Stats",
"Actions", "Actions",
), ),
rows=running_rows, rows=live_rows,
row_attrs=live_row_attrs,
first_header_class="w-px py-2.5 pr-2 pl-3 text-left text-xs font-semibold uppercase tracking-[0.18em] whitespace-nowrap text-slate-500 sm:pl-3",
first_cell_class="w-px py-3 pr-2 pl-3 text-sm font-medium text-slate-950 sm:pl-3",
), ),
table_section( table_section(
eyebrow="Queue", eyebrow="Schedule",
title="Upcoming jobs", title="Scheduled jobs",
empty_message="No jobs are scheduled.", empty_message="No jobs are scheduled.",
headers=( headers=(
"Source", "Source",
@ -244,14 +307,16 @@ def runs_page(
title="Completed job executions", title="Completed job executions",
empty_message="No job executions have completed yet.", empty_message="No job executions have completed yet.",
headers=( headers=(
"#",
"Source", "Source",
"Execution",
"Ended", "Ended",
"Status", "State",
"Summary", "Summary",
"Log", "Log",
), ),
rows=completed_rows, rows=completed_rows,
first_header_class="w-px py-2.5 pr-2 pl-3 text-left text-xs font-semibold uppercase tracking-[0.18em] whitespace-nowrap text-slate-500 sm:pl-3",
first_cell_class="w-px py-3 pr-2 pl-3 text-sm font-medium text-slate-950 sm:pl-3",
), ),
h.script[ h.script[
""" """

View file

@ -5,7 +5,13 @@ from collections.abc import Mapping
import htpy as h import htpy as h
from htpy import Renderable from htpy import Renderable
from repub.components import input_field, muted_action_link, page_shell, section_card from repub.components import (
action_button,
input_field,
muted_action_link,
page_shell,
section_card,
)
def _value(settings: Mapping[str, object] | None, key: str, default: str = "") -> str: def _value(settings: Mapping[str, object] | None, key: str, default: str = "") -> str:
@ -71,10 +77,12 @@ def settings_page(
], ],
h.div(class_="flex flex-wrap justify-end gap-3 pt-2")[ h.div(class_="flex flex-wrap justify-end gap-3 pt-2")[
muted_action_link(href="/", label="Back to dashboard"), muted_action_link(href="/", label="Back to dashboard"),
h.button( action_button(
type="submit", label="Save settings",
class_="rounded-full bg-slate-950 px-4 py-2.5 text-sm font-semibold text-white transition hover:bg-slate-800", tone="dark",
)["Save settings"], emphasis="regular",
button_type="submit",
),
], ],
], ],
) )

View file

@ -3,7 +3,7 @@ from __future__ import annotations
import htpy as h import htpy as h
from htpy import Node, Renderable from htpy import Node, Renderable
from repub.components import admin_sidebar from repub.components import app_shell
ON_LOAD_JS = ( ON_LOAD_JS = (
"@post(window.location.pathname + " "@post(window.location.pathname + "
@ -33,17 +33,9 @@ def shim_page(
} }
), ),
h.noscript["Your browser does not support JavaScript!"], h.noscript["Your browser does not support JavaScript!"],
h.main( app_shell(
id="morph",
class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]",
)[
admin_sidebar(
current_path=current_path, current_path=current_path,
source_count=0, content=(
running_count=0,
),
h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[
h.div(class_="mx-auto max-w-7xl space-y-5")[
h.section[ h.section[
h.div( h.div(
class_="flex flex-col gap-4 sm:flex-row sm:items-start sm:justify-between" class_="flex flex-col gap-4 sm:flex-row sm:items-start sm:justify-between"
@ -68,8 +60,7 @@ def shim_page(
h.div(class_="h-12 rounded-2xl bg-stone-100"), h.div(class_="h-12 rounded-2xl bg-stone-100"),
] ]
], ],
] ),
], ),
],
], ],
] ]

View file

@ -6,6 +6,7 @@ import htpy as h
from htpy import Node, Renderable from htpy import Node, Renderable
from repub.components import ( from repub.components import (
action_button,
header_action_link, header_action_link,
inline_link, inline_link,
input_field, input_field,
@ -54,29 +55,6 @@ def _checked(source: Mapping[str, object] | None, key: str, default: bool) -> bo
return bool(value) return bool(value)
def _action_button(
*,
label: str,
tone: str = "default",
post_path: str | None = None,
) -> Renderable:
classes = {
"default": "bg-stone-100 text-slate-700 hover:bg-stone-200",
"danger": "bg-rose-50 text-rose-700 hover:bg-rose-100",
}
attributes: dict[str, str] = {}
if post_path is not None:
attributes["data-on:pointerdown"] = f"@post('{post_path}')"
return h.button(
attributes,
type="button",
class_=(
"inline-flex items-center whitespace-nowrap rounded-full px-3 py-1.5 "
f"text-sm font-semibold transition {classes[tone]}"
),
)[label]
def _source_row(source: Mapping[str, object]) -> tuple[Node, ...]: def _source_row(source: Mapping[str, object]) -> tuple[Node, ...]:
return ( return (
h.div[ h.div[
@ -99,12 +77,12 @@ def _source_row(source: Mapping[str, object]) -> tuple[Node, ...]:
), ),
h.p(class_="mt-2 text-xs text-slate-500")[str(source["last_run"])], h.p(class_="mt-2 text-xs text-slate-500")[str(source["last_run"])],
], ],
h.div(class_="flex flex-nowrap items-center gap-3")[ h.div(class_="flex flex-wrap items-center gap-2")[
inline_link( inline_link(
href=f"/sources/{source['slug']}/edit", label="Edit", tone="amber" href=f"/sources/{source['slug']}/edit", label="Edit", tone="amber"
), ),
inline_link(href="/runs", label="View runs"), inline_link(href="/runs", label="View runs"),
_action_button( action_button(
label="Delete", label="Delete",
tone="danger", tone="danger",
post_path=f"/actions/sources/{source['slug']}/delete", post_path=f"/actions/sources/{source['slug']}/delete",
@ -422,10 +400,12 @@ def source_form(
class_="flex flex-wrap justify-end gap-3 border-t border-slate-200 pt-6" class_="flex flex-wrap justify-end gap-3 border-t border-slate-200 pt-6"
)[ )[
muted_action_link(href="/sources", label="Cancel"), muted_action_link(href="/sources", label="Cancel"),
h.button( action_button(
type="submit", label=submit_label,
class_="rounded-full bg-slate-950 px-4 py-2.5 text-sm font-semibold text-white transition hover:bg-slate-800", tone="dark",
)[submit_label], emphasis="regular",
button_type="submit",
),
], ],
], ],
) )

View file

@ -0,0 +1,6 @@
CREATE INDEX IF NOT EXISTS job_execution_pending_created_at_idx
ON job_execution (running_status, created_at ASC);
CREATE UNIQUE INDEX IF NOT EXISTS job_execution_pending_unique_job_idx
ON job_execution (job_id)
WHERE running_status = 0;

View file

@ -42,6 +42,7 @@
--color-stone-200: oklch(92.3% 0.003 48.717); --color-stone-200: oklch(92.3% 0.003 48.717);
--color-white: #fff; --color-white: #fff;
--spacing: 0.25rem; --spacing: 0.25rem;
--container-xs: 20rem;
--container-sm: 24rem; --container-sm: 24rem;
--container-3xl: 48rem; --container-3xl: 48rem;
--container-7xl: 80rem; --container-7xl: 80rem;
@ -289,8 +290,8 @@
.mt-5 { .mt-5 {
margin-top: calc(var(--spacing) * 5); margin-top: calc(var(--spacing) * 5);
} }
.mt-10 { .mt-8 {
margin-top: calc(var(--spacing) * 10); margin-top: calc(var(--spacing) * 8);
} }
.mt-auto { .mt-auto {
margin-top: auto; margin-top: auto;
@ -316,13 +317,21 @@
.table { .table {
display: table; display: table;
} }
.size-4 {
width: calc(var(--spacing) * 4);
height: calc(var(--spacing) * 4);
}
.size-5 { .size-5 {
width: calc(var(--spacing) * 5); width: calc(var(--spacing) * 5);
height: calc(var(--spacing) * 5); height: calc(var(--spacing) * 5);
} }
.size-11 { .size-8 {
width: calc(var(--spacing) * 11); width: calc(var(--spacing) * 8);
height: calc(var(--spacing) * 11); height: calc(var(--spacing) * 8);
}
.size-10 {
width: calc(var(--spacing) * 10);
height: calc(var(--spacing) * 10);
} }
.h-5 { .h-5 {
height: calc(var(--spacing) * 5); height: calc(var(--spacing) * 5);
@ -348,36 +357,42 @@
.w-full { .w-full {
width: 100%; width: 100%;
} }
.w-px {
width: 1px;
}
.max-w-3xl { .max-w-3xl {
max-width: var(--container-3xl); max-width: var(--container-3xl);
} }
.max-w-7xl { .max-w-7xl {
max-width: var(--container-7xl); max-width: var(--container-7xl);
} }
.max-w-40 { .max-w-32 {
max-width: calc(var(--spacing) * 40); max-width: calc(var(--spacing) * 32);
}
.max-w-\[14rem\] {
max-width: 14rem;
} }
.max-w-sm { .max-w-sm {
max-width: var(--container-sm); max-width: var(--container-sm);
} }
.max-w-xs {
max-width: var(--container-xs);
}
.min-w-32 { .min-w-32 {
min-width: calc(var(--spacing) * 32); min-width: calc(var(--spacing) * 32);
} }
.min-w-48 {
min-width: calc(var(--spacing) * 48);
}
.min-w-56 { .min-w-56 {
min-width: calc(var(--spacing) * 56); min-width: calc(var(--spacing) * 56);
} }
.min-w-64 { .min-w-64 {
min-width: calc(var(--spacing) * 64); min-width: calc(var(--spacing) * 64);
} }
.min-w-\[64rem\] {
min-width: 64rem;
}
.min-w-\[70rem\] { .min-w-\[70rem\] {
min-width: 70rem; min-width: 70rem;
} }
.min-w-\[72rem\] {
min-width: 72rem;
}
.shrink-0 { .shrink-0 {
flex-shrink: 0; flex-shrink: 0;
} }
@ -427,6 +442,9 @@
.gap-2 { .gap-2 {
gap: calc(var(--spacing) * 2); gap: calc(var(--spacing) * 2);
} }
.gap-2\.5 {
gap: calc(var(--spacing) * 2.5);
}
.gap-3 { .gap-3 {
gap: calc(var(--spacing) * 3); gap: calc(var(--spacing) * 3);
} }
@ -450,13 +468,6 @@
margin-block-end: calc(calc(var(--spacing) * 4) * calc(1 - var(--tw-space-y-reverse))); margin-block-end: calc(calc(var(--spacing) * 4) * calc(1 - var(--tw-space-y-reverse)));
} }
} }
.space-y-5 {
:where(& > :not(:last-child)) {
--tw-space-y-reverse: 0;
margin-block-start: calc(calc(var(--spacing) * 5) * var(--tw-space-y-reverse));
margin-block-end: calc(calc(var(--spacing) * 5) * calc(1 - var(--tw-space-y-reverse)));
}
}
.space-y-6 { .space-y-6 {
:where(& > :not(:last-child)) { :where(& > :not(:last-child)) {
--tw-space-y-reverse: 0; --tw-space-y-reverse: 0;
@ -552,6 +563,9 @@
.bg-slate-200 { .bg-slate-200 {
background-color: var(--color-slate-200); background-color: var(--color-slate-200);
} }
.bg-slate-300 {
background-color: var(--color-slate-300);
}
.bg-slate-800 { .bg-slate-800 {
background-color: var(--color-slate-800); background-color: var(--color-slate-800);
} }
@ -622,6 +636,9 @@
--tw-gradient-to: transparent; --tw-gradient-to: transparent;
--tw-gradient-stops: var(--tw-gradient-via-stops, var(--tw-gradient-position), var(--tw-gradient-from) var(--tw-gradient-from-position), var(--tw-gradient-to) var(--tw-gradient-to-position)); --tw-gradient-stops: var(--tw-gradient-via-stops, var(--tw-gradient-position), var(--tw-gradient-from) var(--tw-gradient-from-position), var(--tw-gradient-to) var(--tw-gradient-to-position));
} }
.p-0 {
padding: calc(var(--spacing) * 0);
}
.p-0\.5 { .p-0\.5 {
padding: calc(var(--spacing) * 0.5); padding: calc(var(--spacing) * 0.5);
} }
@ -649,9 +666,6 @@
.px-4 { .px-4 {
padding-inline: calc(var(--spacing) * 4); padding-inline: calc(var(--spacing) * 4);
} }
.px-6 {
padding-inline: calc(var(--spacing) * 6);
}
.py-0\.5 { .py-0\.5 {
padding-block: calc(var(--spacing) * 0.5); padding-block: calc(var(--spacing) * 0.5);
} }
@ -673,6 +687,9 @@
.py-4 { .py-4 {
padding-block: calc(var(--spacing) * 4); padding-block: calc(var(--spacing) * 4);
} }
.py-6 {
padding-block: calc(var(--spacing) * 6);
}
.py-8 { .py-8 {
padding-block: calc(var(--spacing) * 8); padding-block: calc(var(--spacing) * 8);
} }
@ -682,9 +699,24 @@
.pt-6 { .pt-6 {
padding-top: calc(var(--spacing) * 6); padding-top: calc(var(--spacing) * 6);
} }
.pr-1 {
padding-right: calc(var(--spacing) * 1);
}
.pr-2 {
padding-right: calc(var(--spacing) * 2);
}
.pr-5 {
padding-right: calc(var(--spacing) * 5);
}
.pr-6 { .pr-6 {
padding-right: calc(var(--spacing) * 6); padding-right: calc(var(--spacing) * 6);
} }
.pl-2 {
padding-left: calc(var(--spacing) * 2);
}
.pl-3 {
padding-left: calc(var(--spacing) * 3);
}
.pl-4 { .pl-4 {
padding-left: calc(var(--spacing) * 4); padding-left: calc(var(--spacing) * 4);
} }
@ -820,6 +852,12 @@
.text-white { .text-white {
color: var(--color-white); color: var(--color-white);
} }
.text-white\/80 {
color: color-mix(in srgb, #fff 80%, transparent);
@supports (color: color-mix(in lab, red, red)) {
color: color-mix(in oklab, var(--color-white) 80%, transparent);
}
}
.uppercase { .uppercase {
text-transform: uppercase; text-transform: uppercase;
} }
@ -879,6 +917,11 @@
color: var(--color-slate-400); color: var(--color-slate-400);
} }
} }
.first\:pl-3 {
&:first-child {
padding-left: calc(var(--spacing) * 3);
}
}
.first\:pl-4 { .first\:pl-4 {
&:first-child { &:first-child {
padding-left: calc(var(--spacing) * 4); padding-left: calc(var(--spacing) * 4);
@ -1036,14 +1079,19 @@
justify-content: space-between; justify-content: space-between;
} }
} }
.sm\:px-5 { .sm\:px-4 {
@media (width >= 40rem) { @media (width >= 40rem) {
padding-inline: calc(var(--spacing) * 5); padding-inline: calc(var(--spacing) * 4);
} }
} }
.sm\:px-6 { .sm\:pl-2\.5 {
@media (width >= 40rem) { @media (width >= 40rem) {
padding-inline: calc(var(--spacing) * 6); padding-left: calc(var(--spacing) * 2.5);
}
}
.sm\:pl-3 {
@media (width >= 40rem) {
padding-left: calc(var(--spacing) * 3);
} }
} }
.sm\:pl-4 { .sm\:pl-4 {
@ -1051,15 +1099,10 @@
padding-left: calc(var(--spacing) * 4); padding-left: calc(var(--spacing) * 4);
} }
} }
.sm\:pl-6 { .sm\:first\:pl-4 {
@media (width >= 40rem) {
padding-left: calc(var(--spacing) * 6);
}
}
.sm\:first\:pl-6 {
@media (width >= 40rem) { @media (width >= 40rem) {
&:first-child { &:first-child {
padding-left: calc(var(--spacing) * 6); padding-left: calc(var(--spacing) * 4);
} }
} }
} }
@ -1088,19 +1131,19 @@
grid-template-columns: repeat(3, minmax(0, 1fr)); grid-template-columns: repeat(3, minmax(0, 1fr));
} }
} }
.lg\:grid-cols-\[18rem_minmax\(0\,1fr\)\] { .lg\:grid-cols-\[14rem_minmax\(0\,1fr\)\] {
@media (width >= 64rem) { @media (width >= 64rem) {
grid-template-columns: 18rem minmax(0,1fr); grid-template-columns: 14rem minmax(0,1fr);
} }
} }
.lg\:px-6 { .lg\:px-5 {
@media (width >= 64rem) { @media (width >= 64rem) {
padding-inline: calc(var(--spacing) * 6); padding-inline: calc(var(--spacing) * 5);
} }
} }
.lg\:py-5 { .lg\:py-4 {
@media (width >= 64rem) { @media (width >= 64rem) {
padding-block: calc(var(--spacing) * 5); padding-block: calc(var(--spacing) * 4);
} }
} }
.xl\:grid-cols-4 { .xl\:grid-cols-4 {
@ -1119,6 +1162,12 @@
} }
} }
} }
@layer base {
::view-transition-group(*) {
animation-duration: 180ms;
animation-timing-function: ease;
}
}
@property --tw-translate-x { @property --tw-translate-x {
syntax: "*"; syntax: "*";
inherits: false; inherits: false;

View file

@ -1,2 +1,9 @@
@import "tailwindcss" source("../"); @import "tailwindcss" source("../");
@source inline("bg-amber-500 translate-x-5"); @source inline("bg-amber-500 translate-x-5");
@layer base {
::view-transition-group(*) {
animation-duration: 180ms;
animation-timing-function: ease;
}
}

View file

@ -276,6 +276,22 @@ def create_app(*, dev_mode: bool = False) -> Quart:
trigger_refresh(app) trigger_refresh(app)
return Response(status=204) return Response(status=204)
@app.post("/actions/queued-executions/<int:execution_id>/cancel")
async def cancel_queued_execution_action(execution_id: int) -> Response:
get_job_runtime(app).cancel_queued_execution(execution_id)
trigger_refresh(app)
return Response(status=204)
@app.post("/actions/queued-executions/<int:execution_id>/move-up")
async def move_queued_execution_up_action(execution_id: int) -> Response:
get_job_runtime(app).move_queued_execution(execution_id, direction="up")
return Response(status=204)
@app.post("/actions/queued-executions/<int:execution_id>/move-down")
async def move_queued_execution_down_action(execution_id: int) -> Response:
get_job_runtime(app).move_queued_execution(execution_id, direction="down")
return Response(status=204)
@app.post("/job/<int:job_id>/execution/<int:execution_id>/logs") @app.post("/job/<int:job_id>/execution/<int:execution_id>/logs")
async def logs_patch(job_id: int, execution_id: int) -> DatastarResponse: async def logs_patch(job_id: int, execution_id: int) -> DatastarResponse:
async def render() -> Renderable: async def render() -> Renderable:
@ -305,7 +321,7 @@ def get_job_runtime(app: Quart) -> JobRuntime:
if runtime is None: if runtime is None:
runtime = JobRuntime( runtime = JobRuntime(
log_dir=app.config["REPUB_LOG_DIR"], log_dir=app.config["REPUB_LOG_DIR"],
refresh_callback=lambda: trigger_refresh(app), refresh_callback=lambda event="refresh-event": trigger_refresh(app, event),
) )
app.extensions[JOB_RUNTIME_KEY] = runtime app.extensions[JOB_RUNTIME_KEY] = runtime
return runtime return runtime
@ -370,6 +386,7 @@ async def render_runs(app: Quart | None = None) -> Renderable:
view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"]) view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])
return runs_page( return runs_page(
running_executions=cast(tuple[dict[str, object], ...], view["running"]), running_executions=cast(tuple[dict[str, object], ...], view["running"]),
queued_executions=cast(tuple[dict[str, object], ...], view["queued"]),
upcoming_jobs=cast(tuple[dict[str, object], ...], view["upcoming"]), upcoming_jobs=cast(tuple[dict[str, object], ...], view["upcoming"]),
completed_executions=cast(tuple[dict[str, object], ...], view["completed"]), completed_executions=cast(tuple[dict[str, object], ...], view["completed"]),
source_count=len(load_sources()), source_count=len(load_sources()),

View file

@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from datetime import UTC, datetime from datetime import UTC, datetime, timedelta
from pathlib import Path from pathlib import Path
from repub.jobs import load_runs_view from repub.jobs import load_runs_view
@ -83,3 +83,174 @@ def test_load_runs_view_humanizes_running_execution_summary_bytes(
) )
assert view["running"][0]["stats"] == "14 requests • 11 items • 1.5 KiB" assert view["running"][0]["stats"] == "14 requests • 11 items • 1.5 KiB"
def test_load_runs_view_projects_queued_executions_in_fifo_order(
tmp_path: Path,
) -> None:
initialize_database(tmp_path / "jobs-queued.db")
first_source = create_source(
name="First queued source",
slug="first-queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/first.xml",
)
second_source = create_source(
name="Second queued source",
slug="second-queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/second.xml",
)
first_job = Job.get(Job.source == first_source)
second_job = Job.get(Job.source == second_source)
reference_time = datetime(2026, 3, 30, 12, 30, tzinfo=UTC)
first_created_at = reference_time - timedelta(minutes=7)
second_created_at = reference_time - timedelta(minutes=3)
first_execution = JobExecution.create(
job=first_job,
created_at=first_created_at,
running_status=JobExecutionStatus.PENDING,
)
second_execution = JobExecution.create(
job=second_job,
created_at=second_created_at,
running_status=JobExecutionStatus.PENDING,
)
view = load_runs_view(
log_dir=tmp_path / "out" / "logs",
now=reference_time,
)
assert tuple(row["execution_id"] for row in view["queued"]) == (
int(first_execution.get_id()),
int(second_execution.get_id()),
)
assert tuple(row["queue_position"] for row in view["queued"]) == (1, 2)
assert tuple(row["queued_at"] for row in view["queued"]) == (
"7 minutes ago",
"3 minutes ago",
)
assert view["queued"][0]["move_up_disabled"] is True
assert (
view["queued"][0]["move_down_post_path"]
== f"/actions/queued-executions/{int(first_execution.get_id())}/move-down"
)
assert (
view["queued"][1]["move_up_post_path"]
== f"/actions/queued-executions/{int(second_execution.get_id())}/move-up"
)
assert view["queued"][1]["move_down_disabled"] is True
def test_load_runs_view_keeps_queued_jobs_in_scheduled_jobs(
tmp_path: Path,
) -> None:
initialize_database(tmp_path / "jobs-queue-separation.db")
queued_source = create_source(
name="Queued source",
slug="queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/queued.xml",
)
scheduled_source = create_source(
name="Scheduled source",
slug="scheduled-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/scheduled.xml",
)
queued_job = Job.get(Job.source == queued_source)
Job.get(Job.source == scheduled_source)
JobExecution.create(
job=queued_job,
running_status=JobExecutionStatus.PENDING,
)
view = load_runs_view(
log_dir=tmp_path / "out" / "logs",
now=datetime(2026, 3, 30, 12, 30, tzinfo=UTC),
)
assert tuple(row["slug"] for row in view["queued"]) == ("queued-source",)
assert tuple(row["slug"] for row in view["upcoming"]) == (
"queued-source",
"scheduled-source",
)
assert view["upcoming"][0]["run_reason"] == "Queued"
assert view["upcoming"][0]["run_disabled"] is True
assert view["upcoming"][1]["run_reason"] == "Ready"
assert view["upcoming"][1]["run_disabled"] is False
def test_load_runs_view_running_row_targets_queued_follow_up_cancel(
tmp_path: Path,
) -> None:
initialize_database(tmp_path / "jobs-running-cancel.db")
source = create_source(
name="Running source",
slug="running-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/running.xml",
)
job = Job.get(Job.source == source)
JobExecution.create(
job=job,
started_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC),
running_status=JobExecutionStatus.RUNNING,
)
pending_execution = JobExecution.create(
job=job,
created_at=datetime(2026, 3, 30, 12, 5, tzinfo=UTC),
running_status=JobExecutionStatus.PENDING,
)
view = load_runs_view(
log_dir=tmp_path / "out" / "logs",
now=datetime(2026, 3, 30, 12, 30, tzinfo=UTC),
)
running_row = view["running"][0]
assert running_row["cancel_label"] == "Cancel"
assert running_row["cancel_post_path"] == (
f"/actions/queued-executions/{int(pending_execution.get_id())}/cancel"
)

View file

@ -169,6 +169,40 @@ def test_initialize_database_creates_scheduler_and_execution_indexes(
connection.close() connection.close()
def test_initialize_database_creates_run_queue_indexes(tmp_path: Path) -> None:
db_path = tmp_path / "queue-indexes.db"
initialize_database(db_path)
connection = sqlite3.connect(db_path)
try:
indexes = {
row[0]: row[1]
for row in connection.execute(
"""
SELECT name, sql
FROM sqlite_master
WHERE type = 'index'
AND name IN (
'job_execution_pending_created_at_idx',
'job_execution_pending_unique_job_idx'
)
"""
)
}
assert set(indexes) == {
"job_execution_pending_created_at_idx",
"job_execution_pending_unique_job_idx",
}
assert indexes["job_execution_pending_unique_job_idx"] is not None
assert (
"WHERE running_status = 0"
in indexes["job_execution_pending_unique_job_idx"]
)
finally:
connection.close()
def test_job_table_allows_exactly_one_job_per_source(tmp_path: Path) -> None: def test_job_table_allows_exactly_one_job_per_source(tmp_path: Path) -> None:
initialize_database(tmp_path / "jobs.db") initialize_database(tmp_path / "jobs.db")

View file

@ -186,13 +186,24 @@ def test_job_runtime_respects_max_concurrent_jobs_setting(tmp_path: Path) -> Non
second_execution_id = runtime.run_job_now(second_job.id, reason="manual") second_execution_id = runtime.run_job_now(second_job.id, reason="manual")
assert second_execution_id is None assert second_execution_id is not None
second_execution = _wait_for_execution_status(
second_execution_id,
JobExecutionStatus.PENDING,
)
assert ( assert (
JobExecution.select() JobExecution.select()
.where(JobExecution.running_status == JobExecutionStatus.RUNNING) .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
.count() .count()
== 1 == 1
) )
assert second_execution.started_at is None
assert (
JobExecution.select()
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.count()
== 1
)
runtime.request_execution_cancel(first_execution_id) runtime.request_execution_cancel(first_execution_id)
finished_execution = _wait_for_terminal_execution(first_execution_id) finished_execution = _wait_for_terminal_execution(first_execution_id)
assert finished_execution.running_status == JobExecutionStatus.CANCELED assert finished_execution.running_status == JobExecutionStatus.CANCELED
@ -200,6 +211,332 @@ def test_job_runtime_respects_max_concurrent_jobs_setting(tmp_path: Path) -> Non
runtime.shutdown() runtime.shutdown()
def test_job_runtime_starts_queued_execution_after_capacity_opens(
tmp_path: Path,
) -> None:
db_path = tmp_path / "drain-queue.db"
log_dir = tmp_path / "out" / "logs"
initialize_database(db_path)
save_setting("max_concurrent_jobs", 1)
with _slow_feed_server() as feed_url:
first_source = create_source(
name="First source",
slug="first-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url=feed_url,
)
second_source = create_source(
name="Second source",
slug="second-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url=FIXTURE_FEED_PATH.as_uri(),
)
first_job = Job.get(Job.source == first_source)
second_job = Job.get(Job.source == second_source)
runtime = JobRuntime(log_dir=log_dir)
try:
runtime.start()
first_execution_id = runtime.run_job_now(first_job.id, reason="manual")
assert first_execution_id is not None
_wait_for_running_execution(first_execution_id)
second_execution_id = runtime.run_job_now(second_job.id, reason="manual")
assert second_execution_id is not None
_wait_for_execution_status(second_execution_id, JobExecutionStatus.PENDING)
runtime.request_execution_cancel(first_execution_id)
finished_execution = _wait_for_terminal_execution(first_execution_id)
assert finished_execution.running_status == JobExecutionStatus.CANCELED
_wait_for_running_execution(second_execution_id)
drained_execution = _wait_for_terminal_execution(second_execution_id)
assert drained_execution.running_status == JobExecutionStatus.SUCCEEDED
assert drained_execution.started_at is not None
finally:
runtime.shutdown()
def test_job_runtime_deduplicates_manual_queue_requests(tmp_path: Path) -> None:
db_path = tmp_path / "queue-dedup.db"
log_dir = tmp_path / "out" / "logs"
initialize_database(db_path)
save_setting("max_concurrent_jobs", 1)
with _slow_feed_server() as feed_url:
blocking_source = create_source(
name="Blocking source",
slug="blocking-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url=feed_url,
)
queued_source = create_source(
name="Queued source",
slug="queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/queued.xml",
)
blocking_job = Job.get(Job.source == blocking_source)
queued_job = Job.get(Job.source == queued_source)
runtime = JobRuntime(log_dir=log_dir)
try:
runtime.start()
blocking_execution_id = runtime.run_job_now(
blocking_job.id, reason="manual"
)
assert blocking_execution_id is not None
_wait_for_running_execution(blocking_execution_id)
first_pending_id = runtime.run_job_now(queued_job.id, reason="manual")
second_pending_id = runtime.run_job_now(queued_job.id, reason="manual")
assert first_pending_id is not None
assert second_pending_id == first_pending_id
assert (
JobExecution.select()
.where(
(JobExecution.job == queued_job)
& (JobExecution.running_status == JobExecutionStatus.PENDING)
)
.count()
== 1
)
finally:
runtime.shutdown()
def test_job_runtime_allows_one_running_and_one_pending_per_job(
tmp_path: Path,
) -> None:
db_path = tmp_path / "running-plus-pending.db"
log_dir = tmp_path / "out" / "logs"
initialize_database(db_path)
save_setting("max_concurrent_jobs", 1)
with _slow_feed_server() as feed_url:
source = create_source(
name="Busy source",
slug="busy-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url=feed_url,
)
job = Job.get(Job.source == source)
runtime = JobRuntime(log_dir=log_dir)
try:
runtime.start()
running_execution_id = runtime.run_job_now(job.id, reason="manual")
assert running_execution_id is not None
_wait_for_running_execution(running_execution_id)
pending_execution_id = runtime.run_job_now(job.id, reason="manual")
duplicate_pending_id = runtime.run_job_now(job.id, reason="manual")
runtime.run_scheduled_job(job.id)
assert pending_execution_id is not None
assert duplicate_pending_id == pending_execution_id
assert (
JobExecution.select()
.where(JobExecution.job == job)
.where(JobExecution.running_status == JobExecutionStatus.RUNNING)
.count()
== 1
)
assert (
JobExecution.select()
.where(JobExecution.job == job)
.where(JobExecution.running_status == JobExecutionStatus.PENDING)
.count()
== 1
)
finally:
runtime.shutdown()
def test_job_runtime_start_drains_pending_rows_created_before_start(
tmp_path: Path,
) -> None:
db_path = tmp_path / "startup-drain.db"
log_dir = tmp_path / "out" / "logs"
initialize_database(db_path)
source = create_source(
name="Queued source",
slug="queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url=FIXTURE_FEED_PATH.as_uri(),
)
job = Job.get(Job.source == source)
pending_execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.PENDING,
)
runtime = JobRuntime(log_dir=log_dir)
try:
runtime.start()
_wait_for_running_execution(int(pending_execution.get_id()))
drained_execution = _wait_for_terminal_execution(
int(pending_execution.get_id())
)
assert drained_execution.running_status == JobExecutionStatus.SUCCEEDED
assert drained_execution.started_at is not None
finally:
runtime.shutdown()
def test_job_runtime_scheduled_runs_use_the_persistent_queue(
tmp_path: Path,
) -> None:
db_path = tmp_path / "scheduled-queue.db"
log_dir = tmp_path / "out" / "logs"
initialize_database(db_path)
save_setting("max_concurrent_jobs", 1)
with _slow_feed_server() as feed_url:
first_source = create_source(
name="First scheduled source",
slug="first-scheduled-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url=feed_url,
)
second_source = create_source(
name="Second scheduled source",
slug="second-scheduled-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/second-scheduled.xml",
)
first_job = Job.get(Job.source == first_source)
second_job = Job.get(Job.source == second_source)
runtime = JobRuntime(log_dir=log_dir)
try:
runtime.start()
runtime.run_scheduled_job(first_job.id)
first_execution = JobExecution.get(JobExecution.job == first_job)
_wait_for_running_execution(int(first_execution.get_id()))
runtime.run_scheduled_job(second_job.id)
second_execution = JobExecution.get(JobExecution.job == second_job)
assert second_execution.running_status == JobExecutionStatus.PENDING
assert second_execution.started_at is None
finally:
runtime.shutdown()
def test_job_runtime_cancel_pending_follow_up_keeps_running_worker_alive(
tmp_path: Path,
) -> None:
db_path = tmp_path / "cancel-pending.db"
log_dir = tmp_path / "out" / "logs"
initialize_database(db_path)
save_setting("max_concurrent_jobs", 1)
with _slow_feed_server() as feed_url:
source = create_source(
name="Cancelable queued source",
slug="cancelable-queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url=feed_url,
)
job = Job.get(Job.source == source)
runtime = JobRuntime(log_dir=log_dir)
try:
runtime.start()
running_execution_id = runtime.run_job_now(job.id, reason="manual")
assert running_execution_id is not None
_wait_for_running_execution(running_execution_id)
pending_execution_id = runtime.run_job_now(job.id, reason="manual")
assert pending_execution_id is not None
_wait_for_execution_status(pending_execution_id, JobExecutionStatus.PENDING)
assert runtime.cancel_queued_execution(pending_execution_id) is True
assert JobExecution.get_or_none(id=pending_execution_id) is None
assert (
JobExecution.get_by_id(running_execution_id).running_status
== JobExecutionStatus.RUNNING
)
finally:
runtime.shutdown()
def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None: def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None:
initialize_database(tmp_path / "cancel.db") initialize_database(tmp_path / "cancel.db")
with _slow_feed_server() as feed_url: with _slow_feed_server() as feed_url:
@ -291,6 +628,40 @@ def test_job_runtime_start_reconciles_stale_running_execution(tmp_path: Path) ->
runtime.shutdown() runtime.shutdown()
def test_job_runtime_publishes_refresh_while_jobs_are_running(tmp_path: Path) -> None:
initialize_database(tmp_path / "runtime-refresh.db")
source = create_source(
name="Running source",
slug="running-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/running.xml",
)
job = Job.get(Job.source == source)
JobExecution.create(
job=job,
started_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC),
running_status=JobExecutionStatus.RUNNING,
)
events: list[object] = []
runtime = JobRuntime(
log_dir=tmp_path / "out" / "logs",
refresh_callback=events.append,
)
runtime._last_runtime_refresh_at = time.monotonic() - 2.0
runtime.poll_workers()
assert "refresh-event" in events
def test_job_runtime_start_reattaches_live_worker_after_app_restart( def test_job_runtime_start_reattaches_live_worker_after_app_restart(
tmp_path: Path, tmp_path: Path,
) -> None: ) -> None:
@ -570,8 +941,8 @@ def test_render_runs_uses_database_backed_jobs_and_executions(
body = str(await render_runs(app)) body = str(await render_runs(app))
assert "runs-page-source" in body assert "runs-page-source" in body
assert "Running job executions" in body assert "Running jobs" in body
assert "Upcoming jobs" in body assert "Scheduled jobs" in body
assert "Completed job executions" in body assert "Completed job executions" in body
assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
assert "Succeeded" in body assert "Succeeded" in body
@ -719,6 +1090,21 @@ def _wait_for_running_execution(
raise AssertionError(f"execution {execution_id} never entered RUNNING state") raise AssertionError(f"execution {execution_id} never entered RUNNING state")
def _wait_for_execution_status(
execution_id: int,
status: JobExecutionStatus,
*,
timeout_seconds: float = 2.0,
) -> JobExecution:
deadline = time.monotonic() + timeout_seconds
while time.monotonic() < deadline:
execution = JobExecution.get_by_id(execution_id)
if execution.running_status == status:
return execution
time.sleep(0.02)
raise AssertionError(f"execution {execution_id} never entered {status.name}")
def _wait_for_terminal_execution( def _wait_for_terminal_execution(
execution_id: int, *, timeout_seconds: float = 4.0 execution_id: int, *, timeout_seconds: float = 4.0
) -> JobExecution: ) -> JobExecution:

View file

@ -7,7 +7,7 @@ from datetime import UTC, datetime, timedelta
from pathlib import Path from pathlib import Path
from typing import Any, cast from typing import Any, cast
from repub.components import status_badge, toggle_field from repub.components import action_button, status_badge, toggle_field
from repub.datastar import RefreshBroker, render_sse_event, render_stream from repub.datastar import RefreshBroker, render_sse_event, render_stream
from repub.jobs import load_dashboard_view from repub.jobs import load_dashboard_view
from repub.model import ( from repub.model import (
@ -61,6 +61,52 @@ def test_toggle_field_active_state_utilities_exist_in_built_css() -> None:
assert ".translate-x-5" in css assert ".translate-x-5" in css
def test_action_button_adds_cursor_pointer_for_active_buttons() -> None:
markup = str(action_button(label="Run now"))
assert "cursor-pointer" in markup
assert 'type="button"' in markup
def test_action_button_omits_post_handler_when_disabled() -> None:
markup = str(
action_button(
label="Queued",
disabled=True,
post_path="/actions/jobs/7/run-now",
)
)
assert "cursor-not-allowed" in markup
assert "@post(" not in markup
def test_action_button_supports_submit_variant() -> None:
markup = str(
action_button(
label="Save settings",
tone="dark",
button_type="submit",
)
)
assert 'type="submit"' in markup
assert "bg-slate-950" in markup
assert "cursor-pointer" in markup
def test_action_button_supports_datastar_pointerdown_post() -> None:
markup = str(
action_button(
label="Delete",
tone="danger",
post_path="/actions/jobs/7/delete",
)
)
assert 'data-on:pointerdown="@post(&#39;/actions/jobs/7/delete&#39;)"' in markup
def test_runs_page_renders_completed_execution_end_time_as_relative_hoverable_time() -> ( def test_runs_page_renders_completed_execution_end_time_as_relative_hoverable_time() -> (
None None
): ):
@ -92,6 +138,39 @@ def test_runs_page_renders_completed_execution_end_time_as_relative_hoverable_ti
assert ">2 hours ago<" in body assert ">2 hours ago<" in body
def test_runs_page_renders_combined_running_jobs_table() -> None:
body = str(
runs_page(
queued_executions=(
{
"source": "Queued source",
"slug": "queued-source",
"job_id": 7,
"execution_id": 42,
"queued_at": "2 minutes ago",
"queued_at_iso": "2026-03-30T12:28:00+00:00",
"queue_position": 1,
"status": "Queued",
"status_tone": "idle",
"run_label": "Queued",
"run_disabled": True,
"run_post_path": "/actions/jobs/7/run-now",
"cancel_post_path": "/actions/queued-executions/42/cancel",
"move_up_disabled": True,
"move_up_post_path": None,
"move_down_disabled": True,
"move_down_post_path": None,
},
)
)
)
assert "Running jobs" in body
assert "queued-source" in body
assert ">Queued<" in body
assert "/actions/queued-executions/42/cancel" in body
def test_root_get_serves_datastar_shim() -> None: def test_root_get_serves_datastar_shim() -> None:
async def run() -> None: async def run() -> None:
client = create_app().test_client() client = create_app().test_client()
@ -111,6 +190,8 @@ def test_root_get_serves_datastar_shim() -> None:
assert "retryMaxCount: Infinity" in body assert "retryMaxCount: Infinity" in body
assert "data-on:online__window=" in body assert "data-on:online__window=" in body
assert '<main id="morph"' in body assert '<main id="morph"' in body
assert "lg:grid-cols-[14rem_minmax(0,1fr)]" in body
assert "lg:px-5 lg:py-4" in body
assert 'href="/sources"' in body assert 'href="/sources"' in body
assert 'href="/runs"' in body assert 'href="/runs"' in body
assert 'href="/settings"' in body assert 'href="/settings"' in body
@ -219,6 +300,23 @@ def test_render_stream_yields_on_connect_and_refresh() -> None:
asyncio.run(run()) asyncio.run(run())
def test_render_stream_uses_view_transition_for_queue_reorders() -> None:
async def run() -> None:
queue = RefreshBroker().subscribe()
async def render() -> str:
return '<main id="morph">queue</main>'
stream = render_stream(queue, render, render_on_connect=False)
await queue.put("queue-reordered")
event = await anext(stream)
await stream.aclose()
assert "useViewTransition true" in str(event)
asyncio.run(run())
def test_render_dashboard_shows_dashboard_information_architecture( def test_render_dashboard_shows_dashboard_information_architecture(
monkeypatch, tmp_path: Path monkeypatch, tmp_path: Path
) -> None: ) -> None:
@ -235,6 +333,8 @@ def test_render_dashboard_shows_dashboard_information_architecture(
assert 'href="/sources"' in body assert 'href="/sources"' in body
assert 'href="/runs"' in body assert 'href="/runs"' in body
assert "Create source" in body assert "Create source" in body
assert "lg:grid-cols-[14rem_minmax(0,1fr)]" in body
assert "lg:px-5 lg:py-4" in body
asyncio.run(run()) asyncio.run(run())
@ -665,6 +765,8 @@ def test_render_settings_shows_current_max_concurrent_jobs(
assert "/actions/settings" in body assert "/actions/settings" in body
assert 'value="3"' in body assert 'value="3"' in body
assert "Max concurrent jobs" in body assert "Max concurrent jobs" in body
assert 'type="submit"' in body
assert "cursor-pointer" in body
asyncio.run(run()) asyncio.run(run())
@ -1037,7 +1139,7 @@ def test_settings_action_rejects_non_positive_max_concurrent_jobs(
asyncio.run(run()) asyncio.run(run())
def test_render_runs_shows_running_upcoming_and_completed_tables( def test_render_runs_shows_running_scheduled_and_completed_tables(
monkeypatch, tmp_path: Path monkeypatch, tmp_path: Path
) -> None: ) -> None:
db_path = tmp_path / "runs-render.db" db_path = tmp_path / "runs-render.db"
@ -1068,14 +1170,30 @@ def test_render_runs_shows_running_upcoming_and_completed_tables(
body = str(await render_runs(app)) body = str(await render_runs(app))
assert "Running job executions" in body assert "Running jobs" in body
assert "Upcoming jobs" in body assert "Scheduled jobs" in body
assert "Completed job executions" in body assert "Completed job executions" in body
assert "runs-render-source" in body assert "runs-render-source" in body
assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
assert "data-next-run-at" in body assert "data-next-run-at" in body
assert "in " in body assert "in " in body
assert "Already running" not in body
asyncio.run(run())
def test_render_runs_uses_compact_shell_and_table_classes(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "runs-compact.db"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
async def run() -> None:
app = create_app()
body = str(await render_runs(app))
assert "lg:grid-cols-[14rem_minmax(0,1fr)]" in body
assert "lg:px-5 lg:py-4" in body
assert "min-w-[64rem]" in body
asyncio.run(run()) asyncio.run(run())
@ -1088,13 +1206,393 @@ def test_render_runs_shows_empty_state_rows(monkeypatch, tmp_path: Path) -> None
app = create_app() app = create_app()
body = str(await render_runs(app)) body = str(await render_runs(app))
assert body.count("No job executions are running.") == 1 assert body.count("No jobs are running or queued.") == 1
assert "No jobs are scheduled." in body assert "No jobs are scheduled." in body
assert "No job executions have completed yet." in body assert "No job executions have completed yet." in body
asyncio.run(run()) asyncio.run(run())
def test_render_runs_keeps_queued_execution_in_scheduled_jobs_table(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "runs-queued-render.db"
log_dir = tmp_path / "out" / "logs"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
app = create_app()
app.config["REPUB_LOG_DIR"] = log_dir
queued_source = create_source(
name="Queued source",
slug="queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/queued.xml",
)
create_source(
name="Scheduled source",
slug="scheduled-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/scheduled.xml",
)
queued_job = Job.get(Job.source == queued_source)
queued_execution = JobExecution.create(
job=queued_job,
running_status=JobExecutionStatus.PENDING,
)
async def run() -> None:
body = str(await render_runs(app))
assert "Running jobs" in body
assert "Scheduled jobs" in body
assert "queued-source" in body
assert "scheduled-source" in body
assert ">Queued<" in body
assert (
f"/actions/queued-executions/{int(queued_execution.get_id())}/cancel"
in body
)
assert "Ready" in body
asyncio.run(run())
def test_render_runs_shows_cancel_button_for_running_row_with_queued_follow_up(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "runs-cancel-follow-up.db"
log_dir = tmp_path / "out" / "logs"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
app = create_app()
app.config["REPUB_LOG_DIR"] = log_dir
source = create_source(
name="Busy source",
slug="busy-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/busy.xml",
)
job = Job.get(Job.source == source)
running_execution = JobExecution.create(
job=job,
started_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC),
running_status=JobExecutionStatus.RUNNING,
)
pending_execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.PENDING,
)
async def run() -> None:
body = str(await render_runs(app))
assert f"/job/{job.id}/execution/{int(running_execution.get_id())}/logs" in body
assert (
f"/actions/queued-executions/{int(pending_execution.get_id())}/cancel"
in body
)
assert ">Cancel<" in body
assert "Running jobs" in body
asyncio.run(run())
def test_render_runs_keeps_all_action_controls_visible_in_html_after_compaction() -> (
None
):
body = str(
runs_page(
running_executions=(
{
"source": "Running source",
"slug": "running-source",
"job_id": 1,
"execution_id": 11,
"started_at": "2026-03-30 12:00 UTC",
"runtime": "running for 10s",
"status": "Running",
"stats": "1 requests • 1 items • 1 byte",
"worker": "streaming stats from worker",
"log_href": "/job/1/execution/11/logs",
"cancel_label": "Stop",
"cancel_post_path": "/actions/executions/11/cancel",
},
),
queued_executions=(
{
"source": "Queued source",
"slug": "queued-source",
"job_id": 2,
"execution_id": 22,
"queued_at": "2 minutes ago",
"queued_at_iso": "2026-03-30T12:28:00+00:00",
"queue_position": 1,
"status": "Queued",
"status_tone": "idle",
"run_label": "Queued",
"run_disabled": True,
"run_post_path": "/actions/jobs/2/run-now",
"cancel_post_path": "/actions/queued-executions/22/cancel",
"move_up_disabled": True,
"move_up_post_path": None,
"move_down_disabled": True,
"move_down_post_path": None,
},
),
upcoming_jobs=(
{
"source": "Scheduled source",
"slug": "scheduled-source",
"job_id": 3,
"next_run": "in 5 minutes",
"next_run_at": "2026-03-30T12:35:00+00:00",
"schedule": "*/5 * * * *",
"enabled_label": "Enabled",
"enabled_tone": "scheduled",
"run_disabled": False,
"run_reason": "Ready",
"toggle_label": "Disable",
"toggle_post_path": "/actions/jobs/3/toggle-enabled",
"run_post_path": "/actions/jobs/3/run-now",
"delete_post_path": "/actions/jobs/3/delete",
},
),
completed_executions=(
{
"source": "Completed source",
"slug": "completed-source",
"job_id": 4,
"execution_id": 44,
"ended_at": "2 minutes ago",
"ended_at_iso": "2026-03-30T12:28:00+00:00",
"status": "Succeeded",
"status_tone": "done",
"stats": "1 requests • 1 items • 1 byte",
"summary": "Worker exited successfully",
"log_href": "/job/4/execution/44/logs",
},
),
)
)
assert "Running jobs" in body
assert ">Stop<" in body
assert ">Cancel<" in body
assert ">Run now<" in body
assert ">Disable<" in body
assert "/job/4/execution/44/logs" in body
def test_cancel_queued_execution_action_deletes_pending_row_without_touching_running_execution(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "cancel-queued-action.db"
log_dir = tmp_path / "out" / "logs"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
async def run() -> None:
app = create_app()
app.config["REPUB_LOG_DIR"] = log_dir
client = app.test_client()
source = create_source(
name="Busy source",
slug="busy-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/busy.xml",
)
job = Job.get(Job.source == source)
running_execution = JobExecution.create(
job=job,
started_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC),
running_status=JobExecutionStatus.RUNNING,
)
pending_execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.PENDING,
)
response = await client.post(
f"/actions/queued-executions/{int(pending_execution.get_id())}/cancel"
)
assert response.status_code == 204
assert JobExecution.get_or_none(id=int(pending_execution.get_id())) is None
assert (
JobExecution.get_by_id(int(running_execution.get_id())).running_status
== JobExecutionStatus.RUNNING
)
asyncio.run(run())
def test_move_queued_execution_action_reorders_queue(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "move-queued-action.db"
log_dir = tmp_path / "out" / "logs"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
async def run() -> None:
app = create_app()
app.config["REPUB_LOG_DIR"] = log_dir
client = app.test_client()
first_source = create_source(
name="First queued source",
slug="first-queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/first.xml",
)
second_source = create_source(
name="Second queued source",
slug="second-queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/second.xml",
)
first_job = Job.get(Job.source == first_source)
second_job = Job.get(Job.source == second_source)
first_execution = JobExecution.create(
job=first_job,
created_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC),
running_status=JobExecutionStatus.PENDING,
)
second_execution = JobExecution.create(
job=second_job,
created_at=datetime(2026, 3, 30, 12, 5, tzinfo=UTC),
running_status=JobExecutionStatus.PENDING,
)
response = await client.post(
f"/actions/queued-executions/{int(second_execution.get_id())}/move-up"
)
assert response.status_code == 204
body = str(await render_runs(app))
assert body.index("second-queued-source") < body.index("first-queued-source")
assert (
f"/actions/queued-executions/{int(second_execution.get_id())}/move-down"
in body
)
assert (
f"/actions/queued-executions/{int(first_execution.get_id())}/move-up"
in body
)
asyncio.run(run())
def test_toggle_job_enabled_action_removes_queued_execution(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "toggle-removes-queue.db"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
async def run() -> None:
app = create_app()
client = app.test_client()
source = create_source(
name="Queued source",
slug="queued-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=True,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/queued.xml",
)
job = Job.get(Job.source == source)
queued_execution = JobExecution.create(
job=job,
running_status=JobExecutionStatus.PENDING,
)
response = await client.post(f"/actions/jobs/{job.id}/toggle-enabled")
assert response.status_code == 204
assert Job.get_by_id(job.id).enabled is False
assert JobExecution.get_or_none(id=int(queued_execution.get_id())) is None
body = str(await render_runs(app))
assert (
f"/actions/queued-executions/{int(queued_execution.get_id())}/cancel"
not in body
)
assert "Disabled" in body
asyncio.run(run())
def test_render_create_source_uses_shared_submit_button(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "create-source-shared-submit.db"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
async def run() -> None:
app = create_app()
body = str(await render_create_source(app))
assert 'type="submit"' in body
assert "Create source" in body
assert "cursor-pointer" in body
assert "bg-slate-950" in body
asyncio.run(run())
def test_render_execution_logs_uses_app_route(monkeypatch, tmp_path: Path) -> None: def test_render_execution_logs_uses_app_route(monkeypatch, tmp_path: Path) -> None:
db_path = tmp_path / "logs-render.db" db_path = tmp_path / "logs-render.db"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))