From c834c3c2548942b9a638c2f2eed2fc7b4933be5e Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Tue, 31 Mar 2026 12:12:36 +0200 Subject: [PATCH] Implement tab-scoped runs pagination state --- AGENTS.md | 15 +++-- repub/datastar.py | 96 ++++++++++++++++++++++++-- repub/pages/runs.py | 65 ++++++++++++------ repub/web.py | 145 +++++++++++++++++++++++++++++++--------- tests/test_tab_state.py | 41 ++++++++++++ tests/test_web.py | 144 ++++++++++++++++++++++++++++++++++++++- 6 files changed, 444 insertions(+), 62 deletions(-) create mode 100644 tests/test_tab_state.py diff --git a/AGENTS.md b/AGENTS.md index f56a2d9..7082a72 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -27,7 +27,7 @@ The views are pure functions data in -> html out. By only using data: mode morph and always targeting the main element of the document the API can be massively simplified. This avoids having the explosion of endpoints you get with HTMX and makes reasoning about your app much simpler. - we only have a single render function per page - By having a single render function per page you can simplify the reasoning about your app to view = f(state). You can then reason about your pushed updates as a continuous signal rather than discrete event stream. The benefit of this is you don't have to handle missed events, disconnects and reconnects. When the state changes on the server you push down the latest view, not the delta between views. On the client idiomorph can translate that into fine grained dom updates. + By having a single render function per page you can simplify the reasoning about your app to view = f(state). In immediate-mode terms the server is re-running the whole page render against the latest state, like a game loop, rather than trying to patch the view incrementally by hand. You can then reason about your pushed updates as a continuous signal rather than discrete event stream. The benefit of this is you don't have to handle missed events, disconnects and reconnects. When the state changes on the server you push down the latest view, not the delta between views. On the client idiomorph can translate that into fine grained dom updates. - any database change -> re render all connected users with 200ms throttle @@ -52,11 +52,18 @@ The views are pure functions data in -> html out. Actions should not update the view via patch elements. This is because the changes they make would get overwritten on the next render-fn that pushes a new view down the updates SSE connection. However, they can still be used to update signals as those won't be changed by elements patch. This allows you to do things like validation on the server. - Stateless views -The only way for actions to affect the view returned by the render-fn running in a connection is via the database. The ensures CQRS. This means there is no connection state that needs to be persisted or maintained (so missed events and shutdowns/deploys will not lead to lost state). Even when you are running in a single process there is no way for an action (command) to communicate with/affect a view render (query) without going through the database. +The state passed to a render-fn should be thought of as `{persistent db state, ephemeral tab state}`. The database is the source of truth for durable application state. Ephemeral tab state is server-owned in-memory state keyed by tab id for non-persistent UI concerns like pagination, sort order, expanded panels, wizard step, etc. + + This tab state is not a client signal and not a database row. It exists so that non-persistent actions can still participate in the same immediate-mode render model: an action mutates server-side tab state, then the render-fn re-runs with the new `{db state, tab state}` and sends the latest full page view. + + Tab state must be scoped to a single tab/SSE connection, initialized when the long-lived page stream connects, cleaned up when that stream closes, and periodically reaped for stale entries so memory cannot grow without bound. + + Nothing else should influence a render. Do not smuggle view state through ad hoc globals, connection-local mutable objects, or client-owned signals that the server "trusts". If state should survive reconnects, restarts, or be shared across users, it belongs in the database. If it is purely per-tab and ephemeral, it belongs in tab state. - CQRS - Actions modify the database and return a 204 or a 200 if they patch-signals. - Render functions re-render when the database changes and send an update down the updates SSE connection. + Persistent actions modify the database and return a 204 or a 200 if they patch-signals. + Ephemeral actions modify tab state and return a 204 or a 200 if they patch-signals. + Render functions re-render from the combined `{db state, tab state}` and send an update down the updates SSE connection. - Work sharing (caching) Work sharing is the term I'm using for sharing renders between connected users. This can be useful when a lot of connected users share the same view. For example a leader board, game board, presence indicator etc. It ensures the work (eg: query and html generation) for that view is only done once regardless of the number of connected users. The simplest way to do this is to recalculate and cache values after after a batch has been run. diff --git a/repub/datastar.py b/repub/datastar.py index 8c63b02..19cff02 100644 --- a/repub/datastar.py +++ b/repub/datastar.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio import hashlib from collections.abc import AsyncGenerator, Awaitable, Callable +from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta from typing import Protocol from datastar_py import ServerSentEventGenerator as SSE @@ -15,22 +17,102 @@ class HtmlRenderable(Protocol): RenderResult = str | HtmlRenderable RenderFunction = Callable[[], Awaitable[RenderResult]] +PageState = dict[str, object] +TabState = dict[str, PageState] + + +@dataclass +class _TabSession: + data: TabState = field(default_factory=dict) + created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) + modified_at: datetime = field(default_factory=lambda: datetime.now(UTC)) + connections: int = 0 + + +class TabStateStore: + def __init__(self, *, clean_age_threshold: timedelta = timedelta(hours=24)) -> None: + self._sessions: dict[str, _TabSession] = {} + self.clean_age_threshold = clean_age_threshold + + def connect(self, tab_id: str, *, now: datetime | None = None) -> TabState: + session = self._sessions.get(tab_id) + current_time = _now(now) + if session is None: + session = _TabSession(created_at=current_time, modified_at=current_time) + self._sessions[tab_id] = session + session.connections += 1 + return session.data + + def disconnect(self, tab_id: str) -> None: + session = self._sessions.get(tab_id) + if session is None: + return + session.connections = max(0, session.connections - 1) + if session.connections == 0: + self._sessions.pop(tab_id, None) + + def get_tab_state(self, tab_id: str) -> TabState | None: + session = self._sessions.get(tab_id) + return None if session is None else session.data + + def get_page_state(self, tab_id: str | None, page_key: str) -> PageState: + if tab_id is None: + return {} + session = self._sessions.get(tab_id) + if session is None: + return {} + return session.data.get(page_key, {}) + + def update_page_state( + self, + tab_id: str, + page_key: str, + update: Callable[[PageState], PageState], + *, + now: datetime | None = None, + ) -> PageState: + current_time = _now(now) + session = self._sessions.get(tab_id) + if session is None: + session = _TabSession(created_at=current_time, modified_at=current_time) + self._sessions[tab_id] = session + + page_state = dict(session.data.get(page_key, {})) + session.data[page_key] = update(page_state) + session.modified_at = current_time + return session.data[page_key] + + def cleanup_stale(self, *, now: datetime | None = None) -> set[str]: + current_time = _now(now) + removed: set[str] = set() + for tab_id, session in tuple(self._sessions.items()): + if current_time - session.modified_at < self.clean_age_threshold: + continue + self._sessions.pop(tab_id, None) + removed.add(tab_id) + return removed class RefreshBroker: def __init__(self) -> None: - self._subscribers: dict[asyncio.Queue[object], asyncio.AbstractEventLoop] = {} + self._subscribers: dict[ + asyncio.Queue[object], tuple[asyncio.AbstractEventLoop, str | None] + ] = {} - def subscribe(self) -> asyncio.Queue[object]: + def subscribe(self, *, tab_id: str | None = None) -> asyncio.Queue[object]: queue: asyncio.Queue[object] = asyncio.Queue(maxsize=1) - self._subscribers[queue] = asyncio.get_running_loop() + self._subscribers[queue] = (asyncio.get_running_loop(), tab_id) return queue def unsubscribe(self, queue: asyncio.Queue[object]) -> None: self._subscribers.pop(queue, None) - def publish(self, event: object = "refresh-event") -> None: - for queue, loop in tuple(self._subscribers.items()): + def publish( + self, event: object = "refresh-event", *, tab_id: str | None = None + ) -> None: + for queue, (loop, subscriber_tab_id) in tuple(self._subscribers.items()): + if tab_id is not None and subscriber_tab_id != tab_id: + continue loop.call_soon_threadsafe(_publish_event, queue, event) @@ -96,3 +178,7 @@ def _coerce_html(view: RenderResult) -> str: def _render_hash(html: str) -> str: return hashlib.blake2s(html.encode("utf-8"), digest_size=16).hexdigest() + + +def _now(now: datetime | None) -> datetime: + return now or datetime.now(UTC) diff --git a/repub/pages/runs.py b/repub/pages/runs.py index cecd089..6fd4416 100644 --- a/repub/pages/runs.py +++ b/repub/pages/runs.py @@ -242,8 +242,27 @@ def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]: ) -def _completed_page_href(page: int) -> str: - return f"/runs?completed_page={page}" +def _completed_page_action_path(page: int) -> str: + return f"/actions/runs/completed-page/{page}" + + +def _pagination_button( + *, + label: str, + page: int, + current: bool = False, + class_name: str, +) -> Renderable: + attributes = { + "data-on:pointerdown": f"@post('{_completed_page_action_path(page)}')", + } + if current: + attributes["aria-current"] = "page" + return h.button( + attributes, + type="button", + class_=class_name, + )[label] def _completed_history_pagination( @@ -258,8 +277,9 @@ def _completed_history_pagination( start_result = ((completed_page - 1) * completed_page_size) + 1 end_result = min(completed_total_count, completed_page * completed_page_size) - link_class = ( + button_class = ( "relative inline-flex items-center px-4 py-2 text-sm font-semibold text-slate-700 " + "cursor-pointer " "ring-1 ring-inset ring-slate-200 hover:bg-stone-50" ) @@ -267,16 +287,22 @@ def _completed_history_pagination( class_="flex items-center justify-between border-t border-slate-200 bg-white px-4 py-3 sm:px-6" )[ h.div(class_="flex flex-1 justify-between sm:hidden")[ - h.a( - href=_completed_page_href(max(1, completed_page - 1)), - class_="relative inline-flex items-center rounded-xl border border-slate-200 bg-white px-4 py-2 text-sm font-medium text-slate-700 hover:bg-stone-50", - )["Previous"], - h.a( - href=_completed_page_href( - min(completed_total_pages, completed_page + 1) + _pagination_button( + label="Previous", + page=max(1, completed_page - 1), + class_name=( + "relative inline-flex items-center rounded-xl border border-slate-200 " + "bg-white px-4 py-2 text-sm font-medium text-slate-700 hover:bg-stone-50" ), - class_="relative ml-3 inline-flex items-center rounded-xl border border-slate-200 bg-white px-4 py-2 text-sm font-medium text-slate-700 hover:bg-stone-50", - )["Next"], + ), + _pagination_button( + label="Next", + page=min(completed_total_pages, completed_page + 1), + class_name=( + "relative ml-3 inline-flex items-center rounded-xl border border-slate-200 " + "bg-white px-4 py-2 text-sm font-medium text-slate-700 hover:bg-stone-50" + ), + ), ], h.div(class_="hidden sm:flex sm:flex-1 sm:items-center sm:justify-between")[ h.p(class_="text-sm text-slate-600")[ @@ -293,17 +319,16 @@ def _completed_history_pagination( class_="isolate inline-flex -space-x-px rounded-xl shadow-xs", )[ ( - h.a( - href=_completed_page_href(page_number), - aria_current=( - "page" if page_number == completed_page else None - ), - class_=( + _pagination_button( + label=str(page_number), + page=page_number, + current=page_number == completed_page, + class_name=( "relative z-10 inline-flex items-center bg-amber-500 px-4 py-2 text-sm font-semibold text-slate-950" if page_number == completed_page - else link_class + else button_class ), - )[str(page_number)] + ) for page_number in range(1, completed_total_pages + 1) ) ], diff --git a/repub/web.py b/repub/web.py index 5f0c065..032cb2e 100644 --- a/repub/web.py +++ b/repub/web.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio import hashlib from collections.abc import AsyncGenerator, Awaitable, Callable +from contextlib import suppress +from datetime import timedelta from pathlib import Path from typing import TypedDict, cast from urllib.parse import urlparse @@ -13,16 +15,9 @@ from datastar_py.quart import DatastarResponse, read_signals from datastar_py.sse import DatastarEvent from htpy import Renderable from peewee import IntegrityError -from quart import ( - Quart, - Response, - has_request_context, - request, - send_from_directory, - url_for, -) +from quart import Quart, Response, request, send_from_directory, url_for -from repub.datastar import RefreshBroker, render_stream +from repub.datastar import RefreshBroker, TabStateStore, render_stream from repub.jobs import ( COMPLETED_EXECUTION_PAGE_SIZE, JobRuntime, @@ -58,10 +53,14 @@ from repub.pages.sources import PANGEA_CONTENT_FORMATS, PANGEA_CONTENT_TYPES REFRESH_BROKER_KEY = "repub.refresh_broker" JOB_RUNTIME_KEY = "repub.job_runtime" +TAB_STATE_STORE_KEY = "repub.tab_state_store" +TAB_STATE_CLEANER_TASK_KEY = "repub.tab_state_cleaner_task" DEFAULT_LOG_DIR = Path("out/logs") DEFAULT_FEEDS_DIR = Path("out/feeds") +RUNS_TAB_STATE_KEY = "runs" +TAB_STATE_CLEAN_INTERVAL = timedelta(seconds=10) -RenderFunction = Callable[[], Awaitable[Renderable]] +PatchRenderFunction = Callable[[str | None], Awaitable[Renderable]] class SourceFormData(TypedDict): @@ -144,6 +143,8 @@ def create_app(*, dev_mode: bool = False) -> Quart: app.config["REPUB_DEV_MODE"] = dev_mode app.extensions[REFRESH_BROKER_KEY] = RefreshBroker() app.extensions[JOB_RUNTIME_KEY] = None + app.extensions[TAB_STATE_STORE_KEY] = TabStateStore() + app.extensions[TAB_STATE_CLEANER_TASK_KEY] = None @app.get("/feeds/") async def published_feed(feed_path: str) -> Response: @@ -202,23 +203,27 @@ def create_app(*, dev_mode: bool = False) -> Quart: @app.post("/") async def dashboard_patch() -> DatastarResponse: - return _page_patch_response(app, lambda: render_dashboard(app)) + return await _page_patch_response(app, lambda _tab_id: render_dashboard(app)) @app.post("/sources") async def sources_patch() -> DatastarResponse: - return _page_patch_response(app, lambda: render_sources(app)) + return await _page_patch_response(app, lambda _tab_id: render_sources(app)) @app.post("/sources/create") async def create_source_patch() -> DatastarResponse: - return _page_patch_response(app, lambda: render_create_source(app)) + return await _page_patch_response( + app, lambda _tab_id: render_create_source(app) + ) @app.post("/sources//edit") async def edit_source_patch(slug: str) -> DatastarResponse: - return _page_patch_response(app, lambda: render_edit_source(slug, app)) + return await _page_patch_response( + app, lambda _tab_id: render_edit_source(slug, app) + ) @app.post("/settings") async def settings_patch() -> DatastarResponse: - return _page_patch_response(app, lambda: render_settings(app)) + return await _page_patch_response(app, lambda _tab_id: render_settings(app)) @app.post("/actions/sources/create") async def create_source_action() -> DatastarResponse: @@ -293,7 +298,24 @@ def create_app(*, dev_mode: bool = False) -> Quart: @app.post("/runs") async def runs_patch() -> DatastarResponse: - return _page_patch_response(app, lambda: render_runs(app)) + return await _page_patch_response( + app, + lambda tab_id: render_runs(app, tab_id=tab_id), + ) + + @app.post("/actions/runs/completed-page/") + async def set_completed_runs_page_action(page: int) -> Response: + signals = await _read_optional_signals() + tab_id = _read_tab_id(signals) + if tab_id is None: + return Response(status=400) + get_tab_state_store(app).update_page_state( + tab_id, + RUNS_TAB_STATE_KEY, + lambda state: {**state, "completed_page": max(1, page)}, + ) + trigger_refresh(app, tab_id=tab_id) + return Response(status=204) @app.post("/actions/jobs//run-now") async def run_job_now_action(job_id: int) -> Response: @@ -351,14 +373,24 @@ def create_app(*, dev_mode: bool = False) -> Quart: app, job_id=job_id, execution_id=execution_id ) - return _page_patch_response(app, render) + return await _page_patch_response(app, lambda _tab_id: render()) @app.before_serving async def start_runtime() -> None: get_job_runtime(app).start() + app.extensions[TAB_STATE_CLEANER_TASK_KEY] = asyncio.create_task( + _clean_tab_state_periodically(app) + ) @app.after_serving async def stop_runtime() -> None: + cleaner = cast( + asyncio.Task[None] | None, app.extensions.get(TAB_STATE_CLEANER_TASK_KEY) + ) + if cleaner is not None: + cleaner.cancel() + with suppress(asyncio.CancelledError): + await cleaner get_job_runtime(app).shutdown() return app @@ -368,6 +400,10 @@ def get_refresh_broker(app: Quart) -> RefreshBroker: return cast(RefreshBroker, app.extensions[REFRESH_BROKER_KEY]) +def get_tab_state_store(app: Quart) -> TabStateStore: + return cast(TabStateStore, app.extensions[TAB_STATE_STORE_KEY]) + + def get_job_runtime(app: Quart) -> JobRuntime: runtime = cast(JobRuntime | None, app.extensions.get(JOB_RUNTIME_KEY)) if runtime is None: @@ -379,8 +415,10 @@ def get_job_runtime(app: Quart) -> JobRuntime: return runtime -def trigger_refresh(app: Quart, event: object = "refresh-event") -> None: - get_refresh_broker(app).publish(event) +def trigger_refresh( + app: Quart, event: object = "refresh-event", *, tab_id: str | None = None +) -> None: + get_refresh_broker(app).publish(event, tab_id=tab_id) async def render_dashboard(app: Quart | None = None) -> Renderable: @@ -431,18 +469,17 @@ async def render_edit_source(slug: str, app: Quart | None = None) -> Renderable: ) -async def render_runs(app: Quart | None = None) -> Renderable: +async def render_runs( + app: Quart | None = None, *, tab_id: str | None = None +) -> Renderable: if app is None: return runs_page() - completed_page = ( - max(1, request.args.get("completed_page", 1, type=int) or 1) - if has_request_context() - else 1 - ) + tab_state = get_tab_state_store(app).get_page_state(tab_id, RUNS_TAB_STATE_KEY) + resolved_completed_page = max(1, _read_int(tab_state.get("completed_page"), 1)) view = load_runs_view( log_dir=app.config["REPUB_LOG_DIR"], - completed_page=completed_page, + completed_page=resolved_completed_page, completed_page_size=COMPLETED_EXECUTION_PAGE_SIZE, ) return runs_page( @@ -494,24 +531,36 @@ async def render_execution_logs( ) -def _page_patch_response(app: Quart, render: RenderFunction) -> DatastarResponse: - queue = get_refresh_broker(app).subscribe() +async def _page_patch_response( + app: Quart, render: PatchRenderFunction +) -> DatastarResponse: + signals = await _read_optional_signals() + tab_id = _read_tab_id(signals) + if tab_id is not None: + get_tab_state_store(app).connect(tab_id) + queue = get_refresh_broker(app).subscribe(tab_id=tab_id) stream = render_stream( queue, - render=render, + render=lambda: render(tab_id), last_event_id=request.headers.get("last-event-id"), ) - return DatastarResponse(_unsubscribe_on_close(queue, stream, app)) + return DatastarResponse(_unsubscribe_on_close(queue, stream, app, tab_id=tab_id)) async def _unsubscribe_on_close( - queue: object, stream: AsyncGenerator[DatastarEvent, None], app: Quart + queue: object, + stream: AsyncGenerator[DatastarEvent, None], + app: Quart, + *, + tab_id: str | None, ) -> AsyncGenerator[DatastarEvent, None]: try: async for event in stream: yield event finally: get_refresh_broker(app).unsubscribe(cast(asyncio.Queue[object], queue)) + if tab_id is not None: + get_tab_state_store(app).disconnect(tab_id) def _load_sidebar_counts(app: Quart) -> dict[str, int]: @@ -523,6 +572,29 @@ def _load_sidebar_counts(app: Quart) -> dict[str, int]: } +async def _clean_tab_state_periodically(app: Quart) -> None: + while True: + await asyncio.sleep(TAB_STATE_CLEAN_INTERVAL.total_seconds()) + get_tab_state_store(app).cleanup_stale() + + +async def _read_optional_signals() -> dict[str, object] | None: + content_type = request.headers.get("Content-Type", "") + if request.content_length in (None, 0) and "application/json" not in content_type: + return None + try: + return cast(dict[str, object] | None, await read_signals()) + except Exception: + return None + + +def _read_tab_id(signals: dict[str, object] | None) -> str | None: + if signals is None: + return None + tab_id = _read_string(signals, "tabid") + return tab_id or None + + def validate_source_form( signals: dict[str, object] | None, *, @@ -669,6 +741,17 @@ def _parse_int(value: str) -> int | None: return None +def _read_int(value: object, default: int) -> int: + if isinstance(value, bool): + return int(value) + if isinstance(value, int): + return value + if isinstance(value, str): + parsed = _parse_int(value) + return default if parsed is None else parsed + return default + + def _is_valid_url(value: str) -> bool: parsed = urlparse(value) return parsed.scheme in {"http", "https"} and parsed.netloc != "" diff --git a/tests/test_tab_state.py b/tests/test_tab_state.py new file mode 100644 index 0000000..0499e2c --- /dev/null +++ b/tests/test_tab_state.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +from repub.datastar import TabStateStore + + +def test_tab_state_store_tracks_page_state_per_tab_and_connection_count() -> None: + store = TabStateStore() + + store.connect("tab-1") + store.connect("tab-1") + store.update_page_state( + "tab-1", + "runs", + lambda state: {**state, "completed_page": 2}, + ) + + assert store.get_tab_state("tab-1") == {"runs": {"completed_page": 2}} + + store.disconnect("tab-1") + + assert store.get_tab_state("tab-1") == {"runs": {"completed_page": 2}} + + store.disconnect("tab-1") + + assert store.get_tab_state("tab-1") is None + + +def test_tab_state_store_cleans_only_stale_disconnected_tabs() -> None: + store = TabStateStore(clean_age_threshold=timedelta(hours=24)) + now = datetime(2026, 3, 31, 12, 0, tzinfo=UTC) + + store.connect("stale-tab", now=now - timedelta(days=2)) + store.connect("fresh-tab", now=now) + + removed = store.cleanup_stale(now=now) + + assert removed == {"stale-tab"} + assert store.get_tab_state("stale-tab") is None + assert store.get_tab_state("fresh-tab") == {} diff --git a/tests/test_web.py b/tests/test_web.py index c309f48..dd144c2 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -1,12 +1,15 @@ from __future__ import annotations import asyncio +import json import os import re from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Any, cast +import pytest + from repub.components import action_button, status_badge, toggle_field from repub.datastar import RefreshBroker, render_sse_event, render_stream from repub.jobs import load_dashboard_view @@ -26,6 +29,7 @@ from repub.pages.sources import sources_page from repub.web import ( create_app, get_refresh_broker, + get_tab_state_store, render_create_source, render_dashboard, render_edit_source, @@ -228,8 +232,8 @@ def test_runs_page_renders_clear_completed_button_and_pagination() -> None: assert ">Clear history<" in body assert "Showing" in body assert "21" in body - assert 'href="/runs?completed_page=1"' in body - assert 'href="/runs?completed_page=2"' in body + assert "@post('/actions/runs/completed-page/1')" in body + assert "@post('/actions/runs/completed-page/2')" in body assert 'aria-current="page"' in body @@ -1314,6 +1318,142 @@ def test_render_runs_shows_empty_state_rows(monkeypatch, tmp_path: Path) -> None asyncio.run(run()) +def test_runs_pagination_action_updates_only_the_current_tab( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "runs-tab-pagination.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + async def run() -> None: + app = create_app() + client = app.test_client() + + source = create_source( + name="Paged runs source", + slug="paged-runs-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=True, + cron_minute="*/30", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/paged-runs.xml", + ) + job = Job.get(Job.source == source) + + for minute in range(21): + JobExecution.create( + job=job, + ended_at=datetime(2026, 3, 30, 12, minute, tzinfo=UTC), + running_status=JobExecutionStatus.SUCCEEDED, + ) + + async with client.request( + "/runs?u=shim", + method="POST", + headers={ + "Datastar-Request": "true", + "Content-Type": "application/json", + }, + ) as first_connection: + async with client.request( + "/runs?u=shim", + method="POST", + headers={ + "Datastar-Request": "true", + "Content-Type": "application/json", + }, + ) as second_connection: + await first_connection.send(json.dumps({"tabid": "tab-1"}).encode()) + await second_connection.send(json.dumps({"tabid": "tab-2"}).encode()) + await first_connection.send_complete() + await second_connection.send_complete() + + first_body = ( + await asyncio.wait_for(first_connection.receive(), timeout=1) + ).decode() + second_body = ( + await asyncio.wait_for(second_connection.receive(), timeout=1) + ).decode() + + assert ( + 'href="/runs?completed_page=1" aria-current="page"' + not in first_body + ) + assert ( + 'Showing 1 to ' + '20 of ' + '21 results' + ) in first_body + assert ( + 'Showing 1 to ' + '20 of ' + '21 results' + ) in second_body + + response = await client.post( + "/actions/runs/completed-page/2", + headers={"Datastar-Request": "true"}, + json={"tabid": "tab-1"}, + ) + + assert response.status_code == 204 + + updated_first_body = ( + await asyncio.wait_for(first_connection.receive(), timeout=1) + ).decode() + + assert ( + 'Showing 21 to ' + '21 of ' + '21 results' + ) in updated_first_body + assert 'aria-current="page"' in updated_first_body + + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(second_connection.receive(), timeout=0.2) + + await second_connection.disconnect() + await first_connection.disconnect() + + asyncio.run(run()) + + +def test_runs_patch_creates_and_cleans_up_tab_state( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "runs-tab-state.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + async def run() -> None: + app = create_app() + client = app.test_client() + + async with client.request( + "/runs?u=shim", + method="POST", + headers={ + "Datastar-Request": "true", + "Content-Type": "application/json", + }, + ) as connection: + await connection.send(json.dumps({"tabid": "tab-1"}).encode()) + await connection.send_complete() + await asyncio.wait_for(connection.receive(), timeout=1) + + assert get_tab_state_store(app).get_tab_state("tab-1") == {} + + await connection.disconnect() + await asyncio.sleep(0) + + assert get_tab_state_store(app).get_tab_state("tab-1") is None + + asyncio.run(run()) + + def test_render_runs_keeps_queued_execution_in_scheduled_jobs_table( monkeypatch, tmp_path: Path ) -> None: