from __future__ import annotations import asyncio from typing import Any, cast from repub.datastar import RefreshBroker, render_sse_event, render_stream from repub.web import ( create_app, get_active_jobs, get_refresh_broker, render_dashboard, set_active_jobs, ) def test_root_get_serves_datastar_shim() -> None: async def run() -> None: client = create_app(enable_demo_refresh=False).test_client() response = await client.get("/") body = await response.get_data(as_text=True) assert response.status_code == 200 assert response.headers["ETag"] assert body.startswith("") assert ( '' in body ) assert 'data-signals:tabid="self.crypto.randomUUID().substring(0,8)"' in body assert 'data-init="@post(window.location.pathname +' in body assert "retryMaxCount: Infinity" in body assert "data-on:online__window=" in body assert '
' in body asyncio.run(run()) def test_root_get_honors_if_none_match() -> None: async def run() -> None: client = create_app(enable_demo_refresh=False).test_client() initial = await client.get("/") etag = initial.headers["ETag"] response = await client.get("/", headers={"If-None-Match": etag}) assert response.status_code == 304 assert response.headers["ETag"] == etag asyncio.run(run()) def test_root_post_serves_morph_component() -> None: async def run() -> None: client = create_app(enable_demo_refresh=False).test_client() async with client.request("/?u=shim", method="POST") as connection: await connection.send_complete() chunk = await asyncio.wait_for(connection.receive(), timeout=1) raw_connection = cast(Any, connection) assert raw_connection.status_code == 200 assert raw_connection.headers["Content-Type"] == "text/event-stream" assert b"event: datastar-patch-elements" in chunk assert b"id: " in chunk assert b'
None: async def run() -> None: async def render() -> str: return '
same
' event_id, event = await render_sse_event(render) repeated_id, repeated_event = await render_sse_event( render, last_event_id=event_id ) assert repeated_id == event_id assert event is not None assert repeated_event is None asyncio.run(run()) def test_app_refresh_broker_publishes_events() -> None: async def run() -> None: app = create_app(enable_demo_refresh=False) broker = get_refresh_broker(app) queue = broker.subscribe() broker.publish() event = await asyncio.wait_for(queue.get(), timeout=1) assert event == "refresh-event" broker.unsubscribe(queue) asyncio.run(run()) def test_render_stream_yields_on_connect_and_refresh() -> None: async def run() -> None: queue = RefreshBroker().subscribe() renders = 0 async def render() -> str: nonlocal renders renders += 1 return f'
{renders}
' stream = render_stream(queue, render) first = await anext(stream) await queue.put("refresh-event") second = await anext(stream) await stream.aclose() assert "1
" in first assert "2" in second asyncio.run(run()) def test_render_dashboard_uses_active_jobs_from_app_state() -> None: async def run() -> None: app = create_app(enable_demo_refresh=False) assert get_active_jobs(app) == 12 set_active_jobs(app, 27) async with app.app_context(): body = str(await render_dashboard(app)) assert "27" in body assert "Temporary live demo counter for Datastar refresh testing" in body asyncio.run(run())