Clean up cancelled SSE wait tasks
This commit is contained in:
parent
ca3d34053f
commit
3f33994cdc
2 changed files with 50 additions and 14 deletions
|
|
@ -169,20 +169,23 @@ async def render_stream(
|
||||||
return
|
return
|
||||||
queue_task = asyncio.create_task(queue.get())
|
queue_task = asyncio.create_task(queue.get())
|
||||||
shutdown_task = asyncio.create_task(shutdown_event.wait())
|
shutdown_task = asyncio.create_task(shutdown_event.wait())
|
||||||
done, pending = await asyncio.wait(
|
try:
|
||||||
{queue_task, shutdown_task},
|
done, _pending = await asyncio.wait(
|
||||||
return_when=asyncio.FIRST_COMPLETED,
|
{queue_task, shutdown_task},
|
||||||
)
|
return_when=asyncio.FIRST_COMPLETED,
|
||||||
for task in pending:
|
)
|
||||||
task.cancel()
|
if shutdown_task in done:
|
||||||
for task in pending:
|
return
|
||||||
with suppress(asyncio.CancelledError):
|
event_name = queue_task.result()
|
||||||
await task
|
finally:
|
||||||
if shutdown_task in done:
|
for task in (queue_task, shutdown_task):
|
||||||
with suppress(asyncio.CancelledError):
|
if not task.done():
|
||||||
await queue_task
|
task.cancel()
|
||||||
return
|
for task in (queue_task, shutdown_task):
|
||||||
event_name = queue_task.result()
|
if task.done() and not task.cancelled():
|
||||||
|
continue
|
||||||
|
with suppress(asyncio.CancelledError):
|
||||||
|
await task
|
||||||
last_event_id, event = await render_sse_event(
|
last_event_id, event = await render_sse_event(
|
||||||
render,
|
render,
|
||||||
last_event_id=last_event_id,
|
last_event_id=last_event_id,
|
||||||
|
|
|
||||||
|
|
@ -575,6 +575,39 @@ def test_render_stream_stops_when_shutdown_is_requested() -> None:
|
||||||
asyncio.run(run())
|
asyncio.run(run())
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_stream_cleans_up_child_tasks_when_cancelled() -> None:
|
||||||
|
async def run() -> None:
|
||||||
|
queue = RefreshBroker().subscribe()
|
||||||
|
shutdown_event = asyncio.Event()
|
||||||
|
|
||||||
|
async def render() -> str:
|
||||||
|
return '<main id="morph">queue</main>'
|
||||||
|
|
||||||
|
stream = render_stream(
|
||||||
|
queue,
|
||||||
|
render,
|
||||||
|
render_on_connect=False,
|
||||||
|
shutdown_event=shutdown_event,
|
||||||
|
)
|
||||||
|
next_event = asyncio.create_task(anext(stream))
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
next_event.cancel()
|
||||||
|
|
||||||
|
with pytest.raises(asyncio.CancelledError):
|
||||||
|
await next_event
|
||||||
|
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
pending = tuple(
|
||||||
|
task
|
||||||
|
for task in asyncio.all_tasks()
|
||||||
|
if task is not asyncio.current_task() and not task.done()
|
||||||
|
)
|
||||||
|
assert pending == ()
|
||||||
|
|
||||||
|
asyncio.run(run())
|
||||||
|
|
||||||
|
|
||||||
def test_render_dashboard_shows_dashboard_information_architecture(
|
def test_render_dashboard_shows_dashboard_information_architecture(
|
||||||
monkeypatch, tmp_path: Path
|
monkeypatch, tmp_path: Path
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue