from __future__ import annotations
import asyncio
from typing import Any, cast
from repub.datastar import RefreshBroker, render_sse_event, render_stream
from repub.web import (
create_app,
get_active_jobs,
get_refresh_broker,
render_dashboard,
set_active_jobs,
)
def test_root_get_serves_datastar_shim() -> None:
    """GET / answers 200 with an ETag and a body carrying the Datastar bootstrap attributes."""

    async def scenario() -> None:
        http = create_app(enable_demo_refresh=False).test_client()
        reply = await http.get("/")
        html = await reply.get_data(as_text=True)

        assert reply.status_code == 200
        assert reply.headers["ETag"]

        # NOTE(review): the two checks below compare against empty literals and
        # are therefore always true; the intended markup is not visible in this
        # file -- TODO confirm and restore the expected strings.
        assert html.startswith("")
        assert '' in html

        # Attributes the page needs in order to open and keep alive its stream.
        required_fragments = (
            'data-signals:tabid="self.crypto.randomUUID().substring(0,8)"',
            'data-init="@post(window.location.pathname +',
            "retryMaxCount: Infinity",
            "data-on:online__window=",
        )
        for fragment in required_fragments:
            assert fragment in html

        # NOTE(review): empty literal again -- trivially true, TODO confirm.
        assert '' in html

    asyncio.run(scenario())
def test_root_get_honors_if_none_match() -> None:
    """A GET / that echoes the ETag in If-None-Match gets 304 with the same ETag."""

    async def scenario() -> None:
        http = create_app(enable_demo_refresh=False).test_client()

        # First request only exists to learn the current ETag.
        first_reply = await http.get("/")
        tag = first_reply.headers["ETag"]

        conditional_headers = {"If-None-Match": tag}
        second_reply = await http.get("/", headers=conditional_headers)

        assert second_reply.status_code == 304
        assert second_reply.headers["ETag"] == tag

    asyncio.run(scenario())
# POST "/?u=shim" is expected to answer with a text/event-stream response whose
# first chunk is a datastar-patch-elements event carrying an id field.
def test_root_post_serves_morph_component() -> None:
async def run() -> None:
client = create_app(enable_demo_refresh=False).test_client()
# Issue the POST through client.request(...) and read from the resulting connection.
async with client.request("/?u=shim", method="POST") as connection:
await connection.send_complete()
# Fail after one second instead of hanging if no chunk arrives.
chunk = await asyncio.wait_for(connection.receive(), timeout=1)
# NOTE(review): the cast to Any presumably keeps the type checker quiet about
# status_code/headers on the connection object -- confirm.
raw_connection = cast(Any, connection)
assert raw_connection.status_code == 200
assert raw_connection.headers["Content-Type"] == "text/event-stream"
assert b"event: datastar-patch-elements" in chunk
assert b"id: " in chunk
# NOTE(review): the next line is truncated in this view: an unterminated bytes
# literal runs straight into what looks like the tail of another test's
# `def ... -> None:` header. The rest of this test and the name of the test
# that follows cannot be recovered from here -- restore both from version control.
assert b' None:
# The statements below call render_sse_event twice with the same renderer; the
# second call passes the id returned by the first call as last_event_id.
async def run() -> None:
async def render() -> str:
return 'same'
event_id, event = await render_sse_event(render)
repeated_id, repeated_event = await render_sse_event(
render, last_event_id=event_id
)
# Same id both times; an event payload only on the first call.
assert repeated_id == event_id
assert event is not None
assert repeated_event is None
asyncio.run(run())
def test_app_refresh_broker_publishes_events() -> None:
    """publish() on the app's refresh broker delivers "refresh-event" to a subscriber."""

    async def scenario() -> None:
        application = create_app(enable_demo_refresh=False)
        refresh_broker = get_refresh_broker(application)
        inbox = refresh_broker.subscribe()

        refresh_broker.publish()

        # Bounded wait so a missing event fails the test instead of hanging it.
        received = await asyncio.wait_for(inbox.get(), timeout=1)
        assert received == "refresh-event"

        refresh_broker.unsubscribe(inbox)

    asyncio.run(scenario())
def test_render_stream_yields_on_connect_and_refresh() -> None:
    """render_stream yields once up front and once more after a refresh event is queued."""

    async def scenario() -> None:
        inbox = RefreshBroker().subscribe()
        renders = 0

        async def render() -> str:
            # Each call returns the running call count so the yields are distinguishable.
            nonlocal renders
            renders += 1
            return f'{renders}'

        events = render_stream(inbox, render)

        on_connect = await anext(events)
        await inbox.put("refresh-event")
        on_refresh = await anext(events)
        await events.aclose()

        assert "1" in on_connect
        assert "2" in on_refresh

    asyncio.run(scenario())
def test_render_dashboard_uses_active_jobs_from_app_state() -> None:
    """render_dashboard shows the active-jobs value stored on the app and the demo controls."""

    async def scenario() -> None:
        application = create_app(enable_demo_refresh=False)

        # A fresh app reports 12 active jobs; overwrite it so the render must
        # reflect the new value.
        assert get_active_jobs(application) == 12
        set_active_jobs(application, 27)

        async with application.app_context():
            rendered = await render_dashboard(application)
            markup = str(rendered)

        expected_fragments = (
            "27",
            "Temporary live demo counter for Datastar refresh testing",
            "/demo/decrement",
            "data-bind:decrement-amount",
        )
        for fragment in expected_fragments:
            assert fragment in markup

    asyncio.run(scenario())
def test_demo_decrement_action_decrements_active_jobs() -> None:
    """POSTing decrementAmount "3" lowers active jobs from 12 to 9 and publishes a refresh."""

    async def scenario() -> None:
        application = create_app(enable_demo_refresh=False)
        refresh_broker = get_refresh_broker(application)
        # Subscribe before the request so the published refresh is captured.
        inbox = refresh_broker.subscribe()
        http = application.test_client()

        request_headers = {"Datastar-Request": "true"}
        request_payload = {"decrementAmount": "3"}
        reply = await http.post(
            "/demo/decrement",
            headers=request_headers,
            json=request_payload,
        )
        text = await reply.get_data(as_text=True)
        received = await asyncio.wait_for(inbox.get(), timeout=1)

        assert reply.status_code == 200
        assert get_active_jobs(application) == 9
        assert received == "refresh-event"
        # The response is expected to carry a signals patch with an empty error.
        assert 'data: signals {"decrementError":""}' in text

        refresh_broker.unsubscribe(inbox)

    asyncio.run(scenario())
def test_demo_decrement_action_validates_odd_amount() -> None:
    """POSTing the even decrementAmount "2" is rejected without side effects.

    The response is still 200, its body mentions "odd integer", the active-jobs
    value stays at 12, and no refresh event is published.
    """

    async def run() -> None:
        app = create_app(enable_demo_refresh=False)
        broker = get_refresh_broker(app)
        # Subscribe before the request so any (unwanted) refresh would be seen.
        queue = broker.subscribe()
        client = app.test_client()
        response = await client.post(
            "/demo/decrement",
            headers={"Datastar-Request": "true"},
            json={"decrementAmount": "2"},
        )
        body = await response.get_data(as_text=True)
        assert response.status_code == 200
        assert get_active_jobs(app) == 12
        assert "odd integer" in body
        try:
            await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            # Expected outcome: nothing was published. asyncio.TimeoutError is
            # caught (rather than the builtin TimeoutError) because the two are
            # only the same class from Python 3.11 onward; on 3.10 wait_for
            # raises the asyncio-specific class and the builtin would miss it.
            pass
        else:
            raise AssertionError("invalid decrement should not publish a refresh")
        finally:
            broker.unsubscribe(queue)

    asyncio.run(run())