Use Hypercorn for republisher serve
This commit is contained in:
parent
73617cd40c
commit
c04efeb189
7 changed files with 133 additions and 9 deletions
|
|
@ -15,6 +15,7 @@ dependencies = [
|
|||
"pillow>=10.3.0,<11.0.0",
|
||||
"ffmpeg-python>=0.2.0,<0.3.0",
|
||||
"Quart>=0.20.0,<0.21.0",
|
||||
"hypercorn>=0.18.0,<0.19.0",
|
||||
"apscheduler>=3.11.0,<4.0.0",
|
||||
"aiosqlite>=0.21.0,<0.22.0",
|
||||
"datastar-py>=0.8.0,<0.9.0",
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
|||
import asyncio
|
||||
import hashlib
|
||||
from collections.abc import AsyncGenerator, Awaitable, Callable
|
||||
from contextlib import suppress
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Protocol
|
||||
|
|
@ -151,6 +152,7 @@ async def render_stream(
|
|||
*,
|
||||
last_event_id: str | None = None,
|
||||
render_on_connect: bool = True,
|
||||
shutdown_event: asyncio.Event | None = None,
|
||||
) -> AsyncGenerator[DatastarEvent, None]:
|
||||
if render_on_connect:
|
||||
last_event_id, event = await render_sse_event(
|
||||
|
|
@ -160,7 +162,27 @@ async def render_stream(
|
|||
yield event
|
||||
|
||||
while True:
|
||||
event_name = await queue.get()
|
||||
if shutdown_event is None:
|
||||
event_name = await queue.get()
|
||||
else:
|
||||
if shutdown_event.is_set():
|
||||
return
|
||||
queue_task = asyncio.create_task(queue.get())
|
||||
shutdown_task = asyncio.create_task(shutdown_event.wait())
|
||||
done, pending = await asyncio.wait(
|
||||
{queue_task, shutdown_task},
|
||||
return_when=asyncio.FIRST_COMPLETED,
|
||||
)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
for task in pending:
|
||||
with suppress(asyncio.CancelledError):
|
||||
await task
|
||||
if shutdown_task in done:
|
||||
with suppress(asyncio.CancelledError):
|
||||
await queue_task
|
||||
return
|
||||
event_name = queue_task.result()
|
||||
last_event_id, event = await render_sse_event(
|
||||
render,
|
||||
last_event_id=last_event_id,
|
||||
|
|
|
|||
|
|
@ -1,12 +1,18 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from contextlib import suppress
|
||||
|
||||
from hypercorn.asyncio import serve as hypercorn_serve
|
||||
from hypercorn.config import Config as HypercornConfig
|
||||
|
||||
import repub.crawl as crawl_module
|
||||
from repub.web import create_app
|
||||
from repub.web import SHUTDOWN_EVENT_KEY, create_app
|
||||
|
||||
FeedNameFilter = crawl_module.FeedNameFilter
|
||||
check_runtime = crawl_module.check_runtime
|
||||
|
|
@ -67,6 +73,42 @@ def parse_args(argv: list[str] | None = None) -> tuple[str, argparse.Namespace]:
|
|||
return command, args
|
||||
|
||||
|
||||
def _install_signal_handlers(stop_event: asyncio.Event) -> None:
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def request_stop(*_: object) -> None:
|
||||
if not stop_event.is_set():
|
||||
stop_event.set()
|
||||
|
||||
for signum in (signal.SIGINT, signal.SIGTERM):
|
||||
try:
|
||||
loop.add_signal_handler(signum, request_stop)
|
||||
except NotImplementedError:
|
||||
signal.signal(signum, request_stop)
|
||||
|
||||
|
||||
async def _serve_app(*, host: str, port: int, dev_mode: bool) -> None:
|
||||
stop_event = asyncio.Event()
|
||||
_install_signal_handlers(stop_event)
|
||||
|
||||
app = create_app(dev_mode=dev_mode)
|
||||
app.extensions[SHUTDOWN_EVENT_KEY] = stop_event
|
||||
|
||||
config = HypercornConfig()
|
||||
config.bind = [f"{host}:{port}"]
|
||||
config.use_reloader = False
|
||||
config.accesslog = "-"
|
||||
config.errorlog = "-"
|
||||
|
||||
async def shutdown_trigger() -> None:
|
||||
await stop_event.wait()
|
||||
|
||||
try:
|
||||
await hypercorn_serve(app, config, shutdown_trigger=shutdown_trigger)
|
||||
finally:
|
||||
stop_event.set()
|
||||
|
||||
|
||||
def entrypoint(argv: list[str] | None = None) -> int:
|
||||
command, args = parse_args(argv)
|
||||
|
||||
|
|
@ -80,8 +122,8 @@ def entrypoint(argv: list[str] | None = None) -> int:
|
|||
logger.error("Invalid REPUBLISHER_PORT/--port value: %s", args.port)
|
||||
return 2
|
||||
|
||||
app = create_app(dev_mode=bool(args.dev_mode))
|
||||
app.run(host=args.host, port=port)
|
||||
with suppress(KeyboardInterrupt):
|
||||
asyncio.run(_serve_app(host=args.host, port=port, dev_mode=bool(args.dev_mode)))
|
||||
return 0
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ REFRESH_BROKER_KEY = "repub.refresh_broker"
|
|||
JOB_RUNTIME_KEY = "repub.job_runtime"
|
||||
TAB_STATE_STORE_KEY = "repub.tab_state_store"
|
||||
TAB_STATE_CLEANER_TASK_KEY = "repub.tab_state_cleaner_task"
|
||||
SHUTDOWN_EVENT_KEY = "repub.shutdown_event"
|
||||
DEFAULT_LOG_DIR = Path("out/logs")
|
||||
DEFAULT_FEEDS_DIR = Path("out/feeds")
|
||||
RUNS_TAB_STATE_KEY = "runs"
|
||||
|
|
@ -146,6 +147,7 @@ def create_app(*, dev_mode: bool = False) -> Quart:
|
|||
app.extensions[JOB_RUNTIME_KEY] = None
|
||||
app.extensions[TAB_STATE_STORE_KEY] = TabStateStore()
|
||||
app.extensions[TAB_STATE_CLEANER_TASK_KEY] = None
|
||||
app.extensions[SHUTDOWN_EVENT_KEY] = None
|
||||
|
||||
@app.get("/feeds/<path:feed_path>")
|
||||
async def published_feed(feed_path: str) -> Response:
|
||||
|
|
@ -402,6 +404,10 @@ def get_refresh_broker(app: Quart) -> RefreshBroker:
|
|||
return cast(RefreshBroker, app.extensions[REFRESH_BROKER_KEY])
|
||||
|
||||
|
||||
def get_shutdown_event(app: Quart) -> asyncio.Event | None:
|
||||
return cast(asyncio.Event | None, app.extensions.get(SHUTDOWN_EVENT_KEY))
|
||||
|
||||
|
||||
def get_tab_state_store(app: Quart) -> TabStateStore:
|
||||
return cast(TabStateStore, app.extensions[TAB_STATE_STORE_KEY])
|
||||
|
||||
|
|
@ -545,6 +551,7 @@ async def _page_patch_response(
|
|||
queue,
|
||||
render=lambda: render(tab_id),
|
||||
last_event_id=request.headers.get("last-event-id"),
|
||||
shutdown_event=get_shutdown_event(app),
|
||||
)
|
||||
return DatastarResponse(_unsubscribe_on_close(queue, stream, app, tab_id=tab_id))
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
import io
|
||||
import logging
|
||||
from collections.abc import Awaitable, Callable
|
||||
from types import SimpleNamespace
|
||||
from typing import cast
|
||||
from typing import Any, cast
|
||||
|
||||
from repub.entrypoint import FeedNameFilter, entrypoint, logger, parse_args
|
||||
|
||||
|
|
@ -69,19 +70,44 @@ def test_entrypoint_passes_dev_mode_to_create_app(monkeypatch) -> None:
|
|||
recorded: dict[str, object] = {}
|
||||
|
||||
class StubApp:
|
||||
def run(self, *, host: str, port: int) -> None:
|
||||
recorded["host"] = host
|
||||
recorded["port"] = port
|
||||
def __init__(self) -> None:
|
||||
self.extensions: dict[str, object] = {}
|
||||
|
||||
def fake_create_app(*, dev_mode: bool) -> StubApp:
|
||||
recorded["dev_mode"] = dev_mode
|
||||
return StubApp()
|
||||
|
||||
def fake_install_signal_handlers(stop_event: object) -> None:
|
||||
recorded["stop_event"] = stop_event
|
||||
|
||||
async def fake_hypercorn_serve(
|
||||
app: StubApp,
|
||||
config: Any,
|
||||
*,
|
||||
shutdown_trigger: Callable[[], Awaitable[None]],
|
||||
) -> None:
|
||||
recorded["app"] = app
|
||||
recorded["host"] = config.bind[0].split(":")[0]
|
||||
recorded["port"] = int(config.bind[0].split(":")[1])
|
||||
recorded["shutdown_trigger"] = shutdown_trigger
|
||||
shutdown_event = cast(Any, app.extensions["repub.shutdown_event"])
|
||||
recorded["app_shutdown_event"] = shutdown_event
|
||||
shutdown_event.set()
|
||||
await shutdown_trigger()
|
||||
|
||||
monkeypatch.setattr("repub.entrypoint.create_app", fake_create_app)
|
||||
monkeypatch.setattr(
|
||||
"repub.entrypoint._install_signal_handlers", fake_install_signal_handlers
|
||||
)
|
||||
monkeypatch.setattr("repub.entrypoint.hypercorn_serve", fake_hypercorn_serve)
|
||||
|
||||
exit_code = entrypoint(
|
||||
["serve", "--dev-mode", "--host", "0.0.0.0", "--port", "9090"]
|
||||
)
|
||||
|
||||
assert exit_code == 0
|
||||
assert recorded == {"dev_mode": True, "host": "0.0.0.0", "port": 9090}
|
||||
assert recorded["dev_mode"] is True
|
||||
assert recorded["host"] == "0.0.0.0"
|
||||
assert recorded["port"] == 9090
|
||||
assert recorded["stop_event"] is recorded["app_shutdown_event"]
|
||||
assert callable(recorded["shutdown_trigger"])
|
||||
|
|
|
|||
|
|
@ -548,6 +548,30 @@ def test_render_stream_uses_view_transition_for_queue_reorders() -> None:
|
|||
asyncio.run(run())
|
||||
|
||||
|
||||
def test_render_stream_stops_when_shutdown_is_requested() -> None:
|
||||
async def run() -> None:
|
||||
queue = RefreshBroker().subscribe()
|
||||
shutdown_event = asyncio.Event()
|
||||
|
||||
async def render() -> str:
|
||||
return '<main id="morph">queue</main>'
|
||||
|
||||
stream = render_stream(
|
||||
queue,
|
||||
render,
|
||||
render_on_connect=False,
|
||||
shutdown_event=shutdown_event,
|
||||
)
|
||||
next_event = asyncio.create_task(anext(stream))
|
||||
await asyncio.sleep(0)
|
||||
shutdown_event.set()
|
||||
|
||||
with pytest.raises(StopAsyncIteration):
|
||||
await asyncio.wait_for(next_event, timeout=1)
|
||||
|
||||
await stream.aclose()
|
||||
|
||||
asyncio.run(run())
|
||||
def test_render_dashboard_shows_dashboard_information_architecture(
|
||||
monkeypatch, tmp_path: Path
|
||||
) -> None:
|
||||
|
|
|
|||
2
uv.lock
generated
2
uv.lock
generated
|
|
@ -1090,6 +1090,7 @@ dependencies = [
|
|||
{ name = "ffmpeg-python" },
|
||||
{ name = "greenlet" },
|
||||
{ name = "htpy" },
|
||||
{ name = "hypercorn" },
|
||||
{ name = "lxml" },
|
||||
{ name = "peewee" },
|
||||
{ name = "pillow" },
|
||||
|
|
@ -1122,6 +1123,7 @@ requires-dist = [
|
|||
{ name = "ffmpeg-python", specifier = ">=0.2.0,<0.3.0" },
|
||||
{ name = "greenlet", specifier = ">=3.2.4,<4.0.0" },
|
||||
{ name = "htpy", specifier = ">=25.12.0,<26.0.0" },
|
||||
{ name = "hypercorn", specifier = ">=0.18.0,<0.19.0" },
|
||||
{ name = "lxml", specifier = ">=5.2.1,<6.0.0" },
|
||||
{ name = "peewee", specifier = ">=3.19.0,<4.0.0" },
|
||||
{ name = "pillow", specifier = ">=10.3.0,<11.0.0" },
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue