from __future__ import annotations import asyncio from typing import Any, cast from repub.datastar import RefreshBroker, render_sse_event, render_stream from repub.web import ( create_app, get_refresh_broker, render_create_source, render_dashboard, render_execution_logs, render_runs, render_sources, ) def test_root_get_serves_datastar_shim() -> None: async def run() -> None: client = create_app().test_client() response = await client.get("/") body = await response.get_data(as_text=True) assert response.status_code == 200 assert response.headers["ETag"] assert body.startswith("") assert ( '' in body ) assert 'data-signals:tabid="self.crypto.randomUUID().substring(0,8)"' in body assert 'data-init="@post(window.location.pathname +' in body assert "retryMaxCount: Infinity" in body assert "data-on:online__window=" in body assert '
' in body asyncio.run(run()) def test_root_get_honors_if_none_match() -> None: async def run() -> None: client = create_app().test_client() initial = await client.get("/") etag = initial.headers["ETag"] response = await client.get("/", headers={"If-None-Match": etag}) assert response.status_code == 304 assert response.headers["ETag"] == etag asyncio.run(run()) def test_dashboard_post_serves_morph_component() -> None: async def run() -> None: client = create_app().test_client() async with client.request("/?u=shim", method="POST") as connection: await connection.send_complete() chunk = await asyncio.wait_for(connection.receive(), timeout=1) raw_connection = cast(Any, connection) assert raw_connection.status_code == 200 assert raw_connection.headers["Content-Type"] == "text/event-stream" assert b"event: datastar-patch-elements" in chunk assert b"id: " in chunk assert b'
None: async def run() -> None: async def render() -> str: return '
same
' event_id, event = await render_sse_event(render) repeated_id, repeated_event = await render_sse_event( render, last_event_id=event_id ) assert repeated_id == event_id assert event is not None assert repeated_event is None asyncio.run(run()) def test_app_refresh_broker_publishes_events() -> None: async def run() -> None: app = create_app() broker = get_refresh_broker(app) queue = broker.subscribe() broker.publish() event = await asyncio.wait_for(queue.get(), timeout=1) assert event == "refresh-event" broker.unsubscribe(queue) asyncio.run(run()) def test_render_stream_yields_on_connect_and_refresh() -> None: async def run() -> None: queue = RefreshBroker().subscribe() renders = 0 async def render() -> str: nonlocal renders renders += 1 return f'
{renders}
' stream = render_stream(queue, render) first = await anext(stream) await queue.put("refresh-event") second = await anext(stream) await stream.aclose() assert "1
" in first assert "2" in second asyncio.run(run()) def test_render_dashboard_shows_dashboard_information_architecture() -> None: async def run() -> None: body = str(await render_dashboard()) assert "Operational snapshot" in body assert "Running executions" in body assert 'href="/sources"' in body assert 'href="/runs"' in body assert "/job/7/execution/104/logs" in body assert "Create source" in body asyncio.run(run()) def test_render_sources_shows_table_and_create_link() -> None: async def run() -> None: body = str(await render_sources()) assert "Configured feed and Pangea sources live here as tables" in body assert ">Sources<" in body assert 'href="/sources/create"' in body assert "guardian-feed" in body assert "podcast-audio" in body asyncio.run(run()) def test_render_create_source_shows_dedicated_form_page() -> None: async def run() -> None: body = str(await render_create_source()) assert "Dedicated create page for the source form" in body assert "Source and job setup" in body assert "data-signals__ifmissing" in body assert 'data-show="$sourceType === 'feed'"' in body assert 'data-show="$sourceType === 'pangea'"' in body assert "jobEnabled" in body assert "onlyNewest" in body assert "includeAuthors" in body assert "excludeMedia" in body assert "Pangea domain" in body assert "Feed URL" in body assert "Cron schedule" in body assert "Initial job state" in body asyncio.run(run()) def test_render_runs_shows_running_upcoming_and_completed_tables() -> None: async def run() -> None: body = str(await render_runs()) assert "Running job executions" in body assert "Upcoming jobs" in body assert "Completed job executions" in body assert "Delete confirmation" in body assert "/job/11/execution/101/logs" in body assert "Already running" in body asyncio.run(run()) def test_render_execution_logs_uses_app_route() -> None: async def run() -> None: body = str(await render_execution_logs(job_id=7, execution_id=104)) assert "Job 7 / execution 104" in body assert "/job/7/execution/104/logs" in body assert "Streaming text log view" in body assert "waiting for more log lines" in body asyncio.run(run())