Implement tab-scoped runs pagination state

This commit is contained in:
Abel Luck 2026-03-31 12:12:36 +02:00
parent dce67ea9e3
commit c834c3c254
6 changed files with 444 additions and 62 deletions

View file

@ -3,6 +3,8 @@ from __future__ import annotations
import asyncio
import hashlib
from collections.abc import AsyncGenerator, Awaitable, Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Protocol
from datastar_py import ServerSentEventGenerator as SSE
@ -15,22 +17,102 @@ class HtmlRenderable(Protocol):
RenderResult = str | HtmlRenderable
RenderFunction = Callable[[], Awaitable[RenderResult]]
PageState = dict[str, object]
TabState = dict[str, PageState]
@dataclass
class _TabSession:
data: TabState = field(default_factory=dict)
created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
modified_at: datetime = field(default_factory=lambda: datetime.now(UTC))
connections: int = 0
class TabStateStore:
def __init__(self, *, clean_age_threshold: timedelta = timedelta(hours=24)) -> None:
self._sessions: dict[str, _TabSession] = {}
self.clean_age_threshold = clean_age_threshold
def connect(self, tab_id: str, *, now: datetime | None = None) -> TabState:
session = self._sessions.get(tab_id)
current_time = _now(now)
if session is None:
session = _TabSession(created_at=current_time, modified_at=current_time)
self._sessions[tab_id] = session
session.connections += 1
return session.data
def disconnect(self, tab_id: str) -> None:
session = self._sessions.get(tab_id)
if session is None:
return
session.connections = max(0, session.connections - 1)
if session.connections == 0:
self._sessions.pop(tab_id, None)
def get_tab_state(self, tab_id: str) -> TabState | None:
session = self._sessions.get(tab_id)
return None if session is None else session.data
def get_page_state(self, tab_id: str | None, page_key: str) -> PageState:
if tab_id is None:
return {}
session = self._sessions.get(tab_id)
if session is None:
return {}
return session.data.get(page_key, {})
def update_page_state(
self,
tab_id: str,
page_key: str,
update: Callable[[PageState], PageState],
*,
now: datetime | None = None,
) -> PageState:
current_time = _now(now)
session = self._sessions.get(tab_id)
if session is None:
session = _TabSession(created_at=current_time, modified_at=current_time)
self._sessions[tab_id] = session
page_state = dict(session.data.get(page_key, {}))
session.data[page_key] = update(page_state)
session.modified_at = current_time
return session.data[page_key]
def cleanup_stale(self, *, now: datetime | None = None) -> set[str]:
current_time = _now(now)
removed: set[str] = set()
for tab_id, session in tuple(self._sessions.items()):
if current_time - session.modified_at < self.clean_age_threshold:
continue
self._sessions.pop(tab_id, None)
removed.add(tab_id)
return removed
class RefreshBroker:
def __init__(self) -> None:
self._subscribers: dict[asyncio.Queue[object], asyncio.AbstractEventLoop] = {}
self._subscribers: dict[
asyncio.Queue[object], tuple[asyncio.AbstractEventLoop, str | None]
] = {}
def subscribe(self) -> asyncio.Queue[object]:
def subscribe(self, *, tab_id: str | None = None) -> asyncio.Queue[object]:
queue: asyncio.Queue[object] = asyncio.Queue(maxsize=1)
self._subscribers[queue] = asyncio.get_running_loop()
self._subscribers[queue] = (asyncio.get_running_loop(), tab_id)
return queue
def unsubscribe(self, queue: asyncio.Queue[object]) -> None:
self._subscribers.pop(queue, None)
def publish(self, event: object = "refresh-event") -> None:
for queue, loop in tuple(self._subscribers.items()):
def publish(
self, event: object = "refresh-event", *, tab_id: str | None = None
) -> None:
for queue, (loop, subscriber_tab_id) in tuple(self._subscribers.items()):
if tab_id is not None and subscriber_tab_id != tab_id:
continue
loop.call_soon_threadsafe(_publish_event, queue, event)
@ -96,3 +178,7 @@ def _coerce_html(view: RenderResult) -> str:
def _render_hash(html: str) -> str:
return hashlib.blake2s(html.encode("utf-8"), digest_size=16).hexdigest()
def _now(now: datetime | None) -> datetime:
return now or datetime.now(UTC)

View file

@ -242,8 +242,27 @@ def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
)
def _completed_page_href(page: int) -> str:
return f"/runs?completed_page={page}"
def _completed_page_action_path(page: int) -> str:
return f"/actions/runs/completed-page/{page}"
def _pagination_button(
*,
label: str,
page: int,
current: bool = False,
class_name: str,
) -> Renderable:
attributes = {
"data-on:pointerdown": f"@post('{_completed_page_action_path(page)}')",
}
if current:
attributes["aria-current"] = "page"
return h.button(
attributes,
type="button",
class_=class_name,
)[label]
def _completed_history_pagination(
@ -258,8 +277,9 @@ def _completed_history_pagination(
start_result = ((completed_page - 1) * completed_page_size) + 1
end_result = min(completed_total_count, completed_page * completed_page_size)
link_class = (
button_class = (
"relative inline-flex items-center px-4 py-2 text-sm font-semibold text-slate-700 "
"cursor-pointer "
"ring-1 ring-inset ring-slate-200 hover:bg-stone-50"
)
@ -267,16 +287,22 @@ def _completed_history_pagination(
class_="flex items-center justify-between border-t border-slate-200 bg-white px-4 py-3 sm:px-6"
)[
h.div(class_="flex flex-1 justify-between sm:hidden")[
h.a(
href=_completed_page_href(max(1, completed_page - 1)),
class_="relative inline-flex items-center rounded-xl border border-slate-200 bg-white px-4 py-2 text-sm font-medium text-slate-700 hover:bg-stone-50",
)["Previous"],
h.a(
href=_completed_page_href(
min(completed_total_pages, completed_page + 1)
_pagination_button(
label="Previous",
page=max(1, completed_page - 1),
class_name=(
"relative inline-flex items-center rounded-xl border border-slate-200 "
"bg-white px-4 py-2 text-sm font-medium text-slate-700 hover:bg-stone-50"
),
class_="relative ml-3 inline-flex items-center rounded-xl border border-slate-200 bg-white px-4 py-2 text-sm font-medium text-slate-700 hover:bg-stone-50",
)["Next"],
),
_pagination_button(
label="Next",
page=min(completed_total_pages, completed_page + 1),
class_name=(
"relative ml-3 inline-flex items-center rounded-xl border border-slate-200 "
"bg-white px-4 py-2 text-sm font-medium text-slate-700 hover:bg-stone-50"
),
),
],
h.div(class_="hidden sm:flex sm:flex-1 sm:items-center sm:justify-between")[
h.p(class_="text-sm text-slate-600")[
@ -293,17 +319,16 @@ def _completed_history_pagination(
class_="isolate inline-flex -space-x-px rounded-xl shadow-xs",
)[
(
h.a(
href=_completed_page_href(page_number),
aria_current=(
"page" if page_number == completed_page else None
),
class_=(
_pagination_button(
label=str(page_number),
page=page_number,
current=page_number == completed_page,
class_name=(
"relative z-10 inline-flex items-center bg-amber-500 px-4 py-2 text-sm font-semibold text-slate-950"
if page_number == completed_page
else link_class
else button_class
),
)[str(page_number)]
)
for page_number in range(1, completed_total_pages + 1)
)
],

View file

@ -3,6 +3,8 @@ from __future__ import annotations
import asyncio
import hashlib
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import suppress
from datetime import timedelta
from pathlib import Path
from typing import TypedDict, cast
from urllib.parse import urlparse
@ -13,16 +15,9 @@ from datastar_py.quart import DatastarResponse, read_signals
from datastar_py.sse import DatastarEvent
from htpy import Renderable
from peewee import IntegrityError
from quart import (
Quart,
Response,
has_request_context,
request,
send_from_directory,
url_for,
)
from quart import Quart, Response, request, send_from_directory, url_for
from repub.datastar import RefreshBroker, render_stream
from repub.datastar import RefreshBroker, TabStateStore, render_stream
from repub.jobs import (
COMPLETED_EXECUTION_PAGE_SIZE,
JobRuntime,
@ -58,10 +53,14 @@ from repub.pages.sources import PANGEA_CONTENT_FORMATS, PANGEA_CONTENT_TYPES
REFRESH_BROKER_KEY = "repub.refresh_broker"
JOB_RUNTIME_KEY = "repub.job_runtime"
TAB_STATE_STORE_KEY = "repub.tab_state_store"
TAB_STATE_CLEANER_TASK_KEY = "repub.tab_state_cleaner_task"
DEFAULT_LOG_DIR = Path("out/logs")
DEFAULT_FEEDS_DIR = Path("out/feeds")
RUNS_TAB_STATE_KEY = "runs"
TAB_STATE_CLEAN_INTERVAL = timedelta(seconds=10)
RenderFunction = Callable[[], Awaitable[Renderable]]
PatchRenderFunction = Callable[[str | None], Awaitable[Renderable]]
class SourceFormData(TypedDict):
@ -144,6 +143,8 @@ def create_app(*, dev_mode: bool = False) -> Quart:
app.config["REPUB_DEV_MODE"] = dev_mode
app.extensions[REFRESH_BROKER_KEY] = RefreshBroker()
app.extensions[JOB_RUNTIME_KEY] = None
app.extensions[TAB_STATE_STORE_KEY] = TabStateStore()
app.extensions[TAB_STATE_CLEANER_TASK_KEY] = None
@app.get("/feeds/<path:feed_path>")
async def published_feed(feed_path: str) -> Response:
@ -202,23 +203,27 @@ def create_app(*, dev_mode: bool = False) -> Quart:
@app.post("/")
async def dashboard_patch() -> DatastarResponse:
return _page_patch_response(app, lambda: render_dashboard(app))
return await _page_patch_response(app, lambda _tab_id: render_dashboard(app))
@app.post("/sources")
async def sources_patch() -> DatastarResponse:
return _page_patch_response(app, lambda: render_sources(app))
return await _page_patch_response(app, lambda _tab_id: render_sources(app))
@app.post("/sources/create")
async def create_source_patch() -> DatastarResponse:
return _page_patch_response(app, lambda: render_create_source(app))
return await _page_patch_response(
app, lambda _tab_id: render_create_source(app)
)
@app.post("/sources/<string:slug>/edit")
async def edit_source_patch(slug: str) -> DatastarResponse:
return _page_patch_response(app, lambda: render_edit_source(slug, app))
return await _page_patch_response(
app, lambda _tab_id: render_edit_source(slug, app)
)
@app.post("/settings")
async def settings_patch() -> DatastarResponse:
return _page_patch_response(app, lambda: render_settings(app))
return await _page_patch_response(app, lambda _tab_id: render_settings(app))
@app.post("/actions/sources/create")
async def create_source_action() -> DatastarResponse:
@ -293,7 +298,24 @@ def create_app(*, dev_mode: bool = False) -> Quart:
@app.post("/runs")
async def runs_patch() -> DatastarResponse:
return _page_patch_response(app, lambda: render_runs(app))
return await _page_patch_response(
app,
lambda tab_id: render_runs(app, tab_id=tab_id),
)
@app.post("/actions/runs/completed-page/<int:page>")
async def set_completed_runs_page_action(page: int) -> Response:
signals = await _read_optional_signals()
tab_id = _read_tab_id(signals)
if tab_id is None:
return Response(status=400)
get_tab_state_store(app).update_page_state(
tab_id,
RUNS_TAB_STATE_KEY,
lambda state: {**state, "completed_page": max(1, page)},
)
trigger_refresh(app, tab_id=tab_id)
return Response(status=204)
@app.post("/actions/jobs/<int:job_id>/run-now")
async def run_job_now_action(job_id: int) -> Response:
@ -351,14 +373,24 @@ def create_app(*, dev_mode: bool = False) -> Quart:
app, job_id=job_id, execution_id=execution_id
)
return _page_patch_response(app, render)
return await _page_patch_response(app, lambda _tab_id: render())
@app.before_serving
async def start_runtime() -> None:
get_job_runtime(app).start()
app.extensions[TAB_STATE_CLEANER_TASK_KEY] = asyncio.create_task(
_clean_tab_state_periodically(app)
)
@app.after_serving
async def stop_runtime() -> None:
cleaner = cast(
asyncio.Task[None] | None, app.extensions.get(TAB_STATE_CLEANER_TASK_KEY)
)
if cleaner is not None:
cleaner.cancel()
with suppress(asyncio.CancelledError):
await cleaner
get_job_runtime(app).shutdown()
return app
@ -368,6 +400,10 @@ def get_refresh_broker(app: Quart) -> RefreshBroker:
return cast(RefreshBroker, app.extensions[REFRESH_BROKER_KEY])
def get_tab_state_store(app: Quart) -> TabStateStore:
return cast(TabStateStore, app.extensions[TAB_STATE_STORE_KEY])
def get_job_runtime(app: Quart) -> JobRuntime:
runtime = cast(JobRuntime | None, app.extensions.get(JOB_RUNTIME_KEY))
if runtime is None:
@ -379,8 +415,10 @@ def get_job_runtime(app: Quart) -> JobRuntime:
return runtime
def trigger_refresh(app: Quart, event: object = "refresh-event") -> None:
get_refresh_broker(app).publish(event)
def trigger_refresh(
app: Quart, event: object = "refresh-event", *, tab_id: str | None = None
) -> None:
get_refresh_broker(app).publish(event, tab_id=tab_id)
async def render_dashboard(app: Quart | None = None) -> Renderable:
@ -431,18 +469,17 @@ async def render_edit_source(slug: str, app: Quart | None = None) -> Renderable:
)
async def render_runs(app: Quart | None = None) -> Renderable:
async def render_runs(
app: Quart | None = None, *, tab_id: str | None = None
) -> Renderable:
if app is None:
return runs_page()
completed_page = (
max(1, request.args.get("completed_page", 1, type=int) or 1)
if has_request_context()
else 1
)
tab_state = get_tab_state_store(app).get_page_state(tab_id, RUNS_TAB_STATE_KEY)
resolved_completed_page = max(1, _read_int(tab_state.get("completed_page"), 1))
view = load_runs_view(
log_dir=app.config["REPUB_LOG_DIR"],
completed_page=completed_page,
completed_page=resolved_completed_page,
completed_page_size=COMPLETED_EXECUTION_PAGE_SIZE,
)
return runs_page(
@ -494,24 +531,36 @@ async def render_execution_logs(
)
def _page_patch_response(app: Quart, render: RenderFunction) -> DatastarResponse:
queue = get_refresh_broker(app).subscribe()
async def _page_patch_response(
app: Quart, render: PatchRenderFunction
) -> DatastarResponse:
signals = await _read_optional_signals()
tab_id = _read_tab_id(signals)
if tab_id is not None:
get_tab_state_store(app).connect(tab_id)
queue = get_refresh_broker(app).subscribe(tab_id=tab_id)
stream = render_stream(
queue,
render=render,
render=lambda: render(tab_id),
last_event_id=request.headers.get("last-event-id"),
)
return DatastarResponse(_unsubscribe_on_close(queue, stream, app))
return DatastarResponse(_unsubscribe_on_close(queue, stream, app, tab_id=tab_id))
async def _unsubscribe_on_close(
queue: object, stream: AsyncGenerator[DatastarEvent, None], app: Quart
queue: object,
stream: AsyncGenerator[DatastarEvent, None],
app: Quart,
*,
tab_id: str | None,
) -> AsyncGenerator[DatastarEvent, None]:
try:
async for event in stream:
yield event
finally:
get_refresh_broker(app).unsubscribe(cast(asyncio.Queue[object], queue))
if tab_id is not None:
get_tab_state_store(app).disconnect(tab_id)
def _load_sidebar_counts(app: Quart) -> dict[str, int]:
@ -523,6 +572,29 @@ def _load_sidebar_counts(app: Quart) -> dict[str, int]:
}
async def _clean_tab_state_periodically(app: Quart) -> None:
while True:
await asyncio.sleep(TAB_STATE_CLEAN_INTERVAL.total_seconds())
get_tab_state_store(app).cleanup_stale()
async def _read_optional_signals() -> dict[str, object] | None:
content_type = request.headers.get("Content-Type", "")
if request.content_length in (None, 0) and "application/json" not in content_type:
return None
try:
return cast(dict[str, object] | None, await read_signals())
except Exception:
return None
def _read_tab_id(signals: dict[str, object] | None) -> str | None:
if signals is None:
return None
tab_id = _read_string(signals, "tabid")
return tab_id or None
def validate_source_form(
signals: dict[str, object] | None,
*,
@ -669,6 +741,17 @@ def _parse_int(value: str) -> int | None:
return None
def _read_int(value: object, default: int) -> int:
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
if isinstance(value, str):
parsed = _parse_int(value)
return default if parsed is None else parsed
return default
def _is_valid_url(value: str) -> bool:
parsed = urlparse(value)
return parsed.scheme in {"http", "https"} and parsed.netloc != ""