From c04efeb1894aef3529c90f98ac1576483bb199af Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Tue, 31 Mar 2026 12:47:36 +0200 Subject: [PATCH] Use Hypercorn for republisher serve --- pyproject.toml | 1 + repub/datastar.py | 24 +++++++++++++++++++- repub/entrypoint.py | 48 +++++++++++++++++++++++++++++++++++++--- repub/web.py | 7 ++++++ tests/test_entrypoint.py | 36 +++++++++++++++++++++++++----- tests/test_web.py | 24 ++++++++++++++++++++ uv.lock | 2 ++ 7 files changed, 133 insertions(+), 9 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1de1d45..668db40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "pillow>=10.3.0,<11.0.0", "ffmpeg-python>=0.2.0,<0.3.0", "Quart>=0.20.0,<0.21.0", + "hypercorn>=0.18.0,<0.19.0", "apscheduler>=3.11.0,<4.0.0", "aiosqlite>=0.21.0,<0.22.0", "datastar-py>=0.8.0,<0.9.0", diff --git a/repub/datastar.py b/repub/datastar.py index 19cff02..49b95c0 100644 --- a/repub/datastar.py +++ b/repub/datastar.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import hashlib from collections.abc import AsyncGenerator, Awaitable, Callable +from contextlib import suppress from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from typing import Protocol @@ -151,6 +152,7 @@ async def render_stream( *, last_event_id: str | None = None, render_on_connect: bool = True, + shutdown_event: asyncio.Event | None = None, ) -> AsyncGenerator[DatastarEvent, None]: if render_on_connect: last_event_id, event = await render_sse_event( @@ -160,7 +162,27 @@ async def render_stream( yield event while True: - event_name = await queue.get() + if shutdown_event is None: + event_name = await queue.get() + else: + if shutdown_event.is_set(): + return + queue_task = asyncio.create_task(queue.get()) + shutdown_task = asyncio.create_task(shutdown_event.wait()) + done, pending = await asyncio.wait( + {queue_task, shutdown_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + for task in pending: + with suppress(asyncio.CancelledError): + await task + if shutdown_task in done: + with suppress(asyncio.CancelledError): + await queue_task + return + event_name = queue_task.result() last_event_id, event = await render_sse_event( render, last_event_id=last_event_id, diff --git a/repub/entrypoint.py b/repub/entrypoint.py index 71861a6..1cc7932 100644 --- a/repub/entrypoint.py +++ b/repub/entrypoint.py @@ -1,12 +1,18 @@ from __future__ import annotations import argparse +import asyncio import logging import os +import signal import sys +from contextlib import suppress + +from hypercorn.asyncio import serve as hypercorn_serve +from hypercorn.config import Config as HypercornConfig import repub.crawl as crawl_module -from repub.web import create_app +from repub.web import SHUTDOWN_EVENT_KEY, create_app FeedNameFilter = crawl_module.FeedNameFilter check_runtime = crawl_module.check_runtime @@ -67,6 +73,42 @@ def parse_args(argv: list[str] | None = None) -> tuple[str, argparse.Namespace]: return command, args +def _install_signal_handlers(stop_event: asyncio.Event) -> None: + loop = asyncio.get_running_loop() + + def request_stop(*_: object) -> None: + if not stop_event.is_set(): + stop_event.set() + + for signum in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(signum, request_stop) + except NotImplementedError: + signal.signal(signum, request_stop) + + +async def _serve_app(*, host: str, port: int, dev_mode: bool) -> None: + stop_event = asyncio.Event() + _install_signal_handlers(stop_event) + + app = create_app(dev_mode=dev_mode) + app.extensions[SHUTDOWN_EVENT_KEY] = stop_event + + config = HypercornConfig() + config.bind = [f"{host}:{port}"] + config.use_reloader = False + config.accesslog = "-" + config.errorlog = "-" + + async def shutdown_trigger() -> None: + await stop_event.wait() + + try: + await hypercorn_serve(app, config, shutdown_trigger=shutdown_trigger) + finally: + stop_event.set() + + def entrypoint(argv: list[str] | None = None) -> int: command, args = parse_args(argv) @@ -80,8 +122,8 @@ def entrypoint(argv: list[str] | None = None) -> int: logger.error("Invalid REPUBLISHER_PORT/--port value: %s", args.port) return 2 - app = create_app(dev_mode=bool(args.dev_mode)) - app.run(host=args.host, port=port) + with suppress(KeyboardInterrupt): + asyncio.run(_serve_app(host=args.host, port=port, dev_mode=bool(args.dev_mode))) return 0 diff --git a/repub/web.py b/repub/web.py index db9128f..f5d2339 100644 --- a/repub/web.py +++ b/repub/web.py @@ -55,6 +55,7 @@ REFRESH_BROKER_KEY = "repub.refresh_broker" JOB_RUNTIME_KEY = "repub.job_runtime" TAB_STATE_STORE_KEY = "repub.tab_state_store" TAB_STATE_CLEANER_TASK_KEY = "repub.tab_state_cleaner_task" +SHUTDOWN_EVENT_KEY = "repub.shutdown_event" DEFAULT_LOG_DIR = Path("out/logs") DEFAULT_FEEDS_DIR = Path("out/feeds") RUNS_TAB_STATE_KEY = "runs" @@ -146,6 +147,7 @@ def create_app(*, dev_mode: bool = False) -> Quart: app.extensions[JOB_RUNTIME_KEY] = None app.extensions[TAB_STATE_STORE_KEY] = TabStateStore() app.extensions[TAB_STATE_CLEANER_TASK_KEY] = None + app.extensions[SHUTDOWN_EVENT_KEY] = None @app.get("/feeds/") async def published_feed(feed_path: str) -> Response: @@ -402,6 +404,10 @@ def get_refresh_broker(app: Quart) -> RefreshBroker: return cast(RefreshBroker, app.extensions[REFRESH_BROKER_KEY]) +def get_shutdown_event(app: Quart) -> asyncio.Event | None: + return cast(asyncio.Event | None, app.extensions.get(SHUTDOWN_EVENT_KEY)) + + def get_tab_state_store(app: Quart) -> TabStateStore: return cast(TabStateStore, app.extensions[TAB_STATE_STORE_KEY]) @@ -545,6 +551,7 @@ async def _page_patch_response( queue, render=lambda: render(tab_id), last_event_id=request.headers.get("last-event-id"), + shutdown_event=get_shutdown_event(app), ) return DatastarResponse(_unsubscribe_on_close(queue, stream, app, tab_id=tab_id)) diff --git a/tests/test_entrypoint.py b/tests/test_entrypoint.py index bc0e6a0..87edd9b 100644 --- a/tests/test_entrypoint.py +++ b/tests/test_entrypoint.py @@ -1,7 +1,8 @@ import io import logging +from collections.abc import Awaitable, Callable from types import SimpleNamespace -from typing import cast +from typing import Any, cast from repub.entrypoint import FeedNameFilter, entrypoint, logger, parse_args @@ -69,19 +70,44 @@ def test_entrypoint_passes_dev_mode_to_create_app(monkeypatch) -> None: recorded: dict[str, object] = {} class StubApp: - def run(self, *, host: str, port: int) -> None: - recorded["host"] = host - recorded["port"] = port + def __init__(self) -> None: + self.extensions: dict[str, object] = {} def fake_create_app(*, dev_mode: bool) -> StubApp: recorded["dev_mode"] = dev_mode return StubApp() + def fake_install_signal_handlers(stop_event: object) -> None: + recorded["stop_event"] = stop_event + + async def fake_hypercorn_serve( + app: StubApp, + config: Any, + *, + shutdown_trigger: Callable[[], Awaitable[None]], + ) -> None: + recorded["app"] = app + recorded["host"] = config.bind[0].split(":")[0] + recorded["port"] = int(config.bind[0].split(":")[1]) + recorded["shutdown_trigger"] = shutdown_trigger + shutdown_event = cast(Any, app.extensions["repub.shutdown_event"]) + recorded["app_shutdown_event"] = shutdown_event + shutdown_event.set() + await shutdown_trigger() + monkeypatch.setattr("repub.entrypoint.create_app", fake_create_app) + monkeypatch.setattr( + "repub.entrypoint._install_signal_handlers", fake_install_signal_handlers + ) + monkeypatch.setattr("repub.entrypoint.hypercorn_serve", fake_hypercorn_serve) exit_code = entrypoint( ["serve", "--dev-mode", "--host", "0.0.0.0", "--port", "9090"] ) assert exit_code == 0 - assert recorded == {"dev_mode": True, "host": "0.0.0.0", "port": 9090} + assert recorded["dev_mode"] is True + assert recorded["host"] == "0.0.0.0" + assert recorded["port"] == 9090 + assert recorded["stop_event"] is recorded["app_shutdown_event"] + assert callable(recorded["shutdown_trigger"]) diff --git a/tests/test_web.py b/tests/test_web.py index 11b2443..1ebae13 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -548,6 +548,30 @@ def test_render_stream_uses_view_transition_for_queue_reorders() -> None: asyncio.run(run()) +def test_render_stream_stops_when_shutdown_is_requested() -> None: + async def run() -> None: + queue = RefreshBroker().subscribe() + shutdown_event = asyncio.Event() + + async def render() -> str: + return '
queue
' + + stream = render_stream( + queue, + render, + render_on_connect=False, + shutdown_event=shutdown_event, + ) + next_event = asyncio.create_task(anext(stream)) + await asyncio.sleep(0) + shutdown_event.set() + + with pytest.raises(StopAsyncIteration): + await asyncio.wait_for(next_event, timeout=1) + + await stream.aclose() + + asyncio.run(run()) def test_render_dashboard_shows_dashboard_information_architecture( monkeypatch, tmp_path: Path ) -> None: diff --git a/uv.lock b/uv.lock index bfa042b..99baa03 100644 --- a/uv.lock +++ b/uv.lock @@ -1090,6 +1090,7 @@ dependencies = [ { name = "ffmpeg-python" }, { name = "greenlet" }, { name = "htpy" }, + { name = "hypercorn" }, { name = "lxml" }, { name = "peewee" }, { name = "pillow" }, @@ -1122,6 +1123,7 @@ requires-dist = [ { name = "ffmpeg-python", specifier = ">=0.2.0,<0.3.0" }, { name = "greenlet", specifier = ">=3.2.4,<4.0.0" }, { name = "htpy", specifier = ">=25.12.0,<26.0.0" }, + { name = "hypercorn", specifier = ">=0.18.0,<0.19.0" }, { name = "lxml", specifier = ">=5.2.1,<6.0.0" }, { name = "peewee", specifier = ">=3.19.0,<4.0.0" }, { name = "pillow", specifier = ">=10.3.0,<11.0.0" },