republisher/repub/web.py

774 lines
27 KiB
Python
Raw Normal View History

2026-03-30 11:42:13 +02:00
from __future__ import annotations
2026-03-30 12:34:38 +02:00
import asyncio
2026-03-30 12:27:45 +02:00
import hashlib
2026-03-30 13:11:37 +02:00
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import suppress
from datetime import timedelta
2026-03-30 14:02:39 +02:00
from pathlib import Path
2026-03-30 13:37:25 +02:00
from typing import TypedDict, cast
2026-03-30 13:23:36 +02:00
from urllib.parse import urlparse
2026-03-30 12:13:04 +02:00
2026-03-30 12:27:45 +02:00
import htpy as h
2026-03-30 13:23:36 +02:00
from datastar_py import ServerSentEventGenerator as SSE
from datastar_py.quart import DatastarResponse, read_signals
2026-03-30 12:34:38 +02:00
from datastar_py.sse import DatastarEvent
from htpy import Renderable
2026-03-30 13:37:25 +02:00
from peewee import IntegrityError
from quart import Quart, Response, request, send_from_directory, url_for
2026-03-30 12:27:45 +02:00
from repub.datastar import RefreshBroker, TabStateStore, render_stream
2026-03-30 14:02:39 +02:00
from repub.jobs import (
COMPLETED_EXECUTION_PAGE_SIZE,
2026-03-30 14:02:39 +02:00
JobRuntime,
clear_completed_executions,
2026-03-30 14:02:39 +02:00
load_dashboard_view,
load_execution_log_view,
load_runs_view,
)
2026-03-30 13:37:25 +02:00
from repub.model import (
create_source,
2026-03-30 14:02:39 +02:00
delete_job_source,
delete_source,
2026-03-30 13:37:25 +02:00
initialize_database,
load_job_enabled,
2026-03-30 18:26:02 +02:00
load_settings_form,
2026-03-30 13:49:00 +02:00
load_source_form,
2026-03-30 13:37:25 +02:00
load_sources,
2026-03-30 18:26:02 +02:00
save_setting,
2026-03-30 13:37:25 +02:00
source_slug_exists,
2026-03-30 13:49:00 +02:00
update_source,
2026-03-30 13:37:25 +02:00
)
2026-03-30 13:11:37 +02:00
from repub.pages import (
create_source_page,
2026-03-30 14:02:39 +02:00
dashboard_page_with_data,
2026-03-30 13:49:00 +02:00
edit_source_page,
2026-03-30 13:11:37 +02:00
execution_logs_page,
runs_page,
2026-03-30 18:26:02 +02:00
settings_page,
2026-03-30 13:11:37 +02:00
shim_page,
sources_page,
)
2026-03-30 13:37:25 +02:00
from repub.pages.sources import PANGEA_CONTENT_FORMATS, PANGEA_CONTENT_TYPES
2026-03-30 12:27:45 +02:00
2026-03-30 12:34:38 +02:00
# Keys under which create_app() stores shared per-app objects in ``app.extensions``.
REFRESH_BROKER_KEY = "repub.refresh_broker"
JOB_RUNTIME_KEY = "repub.job_runtime"
TAB_STATE_STORE_KEY = "repub.tab_state_store"
TAB_STATE_CLEANER_TASK_KEY = "repub.tab_state_cleaner_task"
SHUTDOWN_EVENT_KEY = "repub.shutdown_event"
# Defaults for the REPUB_LOG_DIR / REPUB_FEEDS_DIR config values; these are
# relative paths, so they resolve against the process working directory.
DEFAULT_LOG_DIR = Path("out/logs")
DEFAULT_FEEDS_DIR = Path("out/feeds")
# Page key used in the per-tab state store for the runs page.
RUNS_TAB_STATE_KEY = "runs"
# Delay between TabStateStore.cleanup_stale() calls by the background cleaner.
TAB_STATE_CLEAN_INTERVAL = timedelta(seconds=10)
# Signature of the per-page render callbacks used by _page_patch_response():
# takes the requesting tab id (or None) and returns an awaitable page.
PatchRenderFunction = Callable[[str | None], Awaitable[Renderable]]
2026-03-30 12:34:38 +02:00
2026-03-30 12:27:45 +02:00
2026-03-30 13:37:25 +02:00
class SourceFormData(TypedDict):
    """Validated source form values produced by validate_source_form().

    The keys are passed straight through as keyword arguments to
    create_source() / update_source().
    """

    name: str
    slug: str
    source_type: str  # "feed" or "pangea"
    notes: str
    spider_arguments: str  # line endings normalized to "\n"
    enabled: bool
    convert_images: bool
    convert_video: bool
    cron_minute: str
    cron_hour: str
    cron_day_of_month: str
    cron_day_of_week: str
    cron_month: str
    feed_url: str  # required (and URL-validated) for "feed" sources
    pangea_domain: str  # pangea_* / content_* fields are validated for "pangea" sources
    pangea_category: str
    content_format: str
    content_type: str
    # None when the submitted value is not an integer (only possible for
    # non-pangea sources; pangea sources must supply or default to integers).
    max_articles: int | None
    oldest_article: int | None
    only_newest: bool
    include_authors: bool
    exclude_media: bool
    include_content: bool
2026-03-30 18:26:02 +02:00
class SettingsFormData(TypedDict):
    """Validated settings form values produced by validate_settings_form()."""

    max_concurrent_jobs: int  # always >= 1
    feed_url: str  # "" or an http(s) URL with trailing "/" removed
2026-03-30 18:26:02 +02:00
2026-03-30 13:37:25 +02:00
# Fallbacks applied by validate_source_form() when a pangea source leaves the
# field blank. The numeric defaults are strings because they go through the
# same integer parsing as submitted form values.
DEFAULT_PANGEA_CONTENT_FORMAT = "MOBILE_3"
DEFAULT_PANGEA_CONTENT_TYPE = "articles"
DEFAULT_PANGEA_MAX_ARTICLES = "10"
DEFAULT_PANGEA_OLDEST_ARTICLE = "3"
# Directory with bundled static assets, located next to this module.
STATIC_DIR = Path(__file__).resolve().parent / "static"
# Assets served under content-hashed filenames (see versioned_static_asset_filename).
CACHE_BUSTED_STATIC_ASSETS = frozenset({"app.css"})
# Number of leading hex digits of the SHA-256 digest embedded in hashed filenames.
CACHE_BUSTED_HASH_LENGTH = 12
2026-03-30 13:37:25 +02:00
2026-03-30 14:16:15 +02:00
def _render_shim_page(
    *, stylesheet_href: str, datastar_src: str, current_path: str
) -> tuple[str, str]:
    """Render the HTML shim page and derive a content-based ETag for it.

    Returns:
        ``(html, etag)``, where ``etag`` is the SHA-256 hex digest of the
        UTF-8 encoded HTML.
    """
    page = shim_page(
        datastar_src=datastar_src,
        current_path=current_path,
        head=(
            h.title["Republisher Admin UI"],
            h.link(rel="stylesheet", href=stylesheet_href),
        ),
    )
    html = str(page)
    return html, hashlib.sha256(html.encode("utf-8")).hexdigest()
2026-03-30 11:42:13 +02:00
2026-03-31 10:37:33 +02:00
def versioned_static_asset_filename(filename: str) -> str:
    """Return the content-hashed filename for a cache-busted static asset.

    For example ``app.css`` becomes ``app-<hash>.css``, where ``<hash>`` is
    the first CACHE_BUSTED_HASH_LENGTH hex digits of the file's SHA-256.
    The file is read and hashed on every call.

    Raises:
        ValueError: if ``filename`` is not in CACHE_BUSTED_STATIC_ASSETS.
    """
    _require_cache_busted_static_asset(filename)
    path = STATIC_DIR / filename
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    short_hash = digest[:CACHE_BUSTED_HASH_LENGTH]
    return "{}-{}{}".format(path.stem, short_hash, path.suffix)
def versioned_static_asset_href(filename: str) -> str:
    """Return the ``/static/...`` URL of the content-hashed copy of ``filename``."""
    hashed_name = versioned_static_asset_filename(filename)
    return "/static/" + hashed_name
def _require_cache_busted_static_asset(filename: str) -> None:
    """Raise ValueError unless ``filename`` is a known cache-busted asset."""
    if filename in CACHE_BUSTED_STATIC_ASSETS:
        return
    raise ValueError(f"Unsupported cache-busted static asset: {filename}")
2026-03-30 15:36:12 +02:00
def create_app(*, dev_mode: bool = False) -> Quart:
    """Build and configure the Republisher admin Quart application.

    Initializes the database, seeds config defaults, stores the shared
    extension objects (refresh broker, tab state store, and placeholders for
    the lazily created job runtime, cleaner task and shutdown event), then
    registers every page shim, Datastar patch, action route and the serving
    lifecycle hooks.

    Args:
        dev_mode: When true, ``/feeds/<path>`` serves generated feed files
            from ``REPUB_FEEDS_DIR``; otherwise that route returns 404.

    Returns:
        The configured Quart application.
    """
    app = Quart(__name__)
    app.config["REPUB_DB_PATH"] = str(initialize_database())
    app.config.setdefault("REPUB_LOG_DIR", DEFAULT_LOG_DIR)
    app.config.setdefault("REPUB_FEEDS_DIR", DEFAULT_FEEDS_DIR)
    app.config["REPUB_DEV_MODE"] = dev_mode
    app.extensions[REFRESH_BROKER_KEY] = RefreshBroker()
    # Created on first use by get_job_runtime().
    app.extensions[JOB_RUNTIME_KEY] = None
    app.extensions[TAB_STATE_STORE_KEY] = TabStateStore()
    # Populated by start_runtime() below.
    app.extensions[TAB_STATE_CLEANER_TASK_KEY] = None
    app.extensions[SHUTDOWN_EVENT_KEY] = None

    @app.get("/feeds/<path:feed_path>")
    async def published_feed(feed_path: str) -> Response:
        # Only served by this app in dev mode (presumably served by something
        # else in production — confirm with deployment config).
        if not bool(app.config["REPUB_DEV_MODE"]):
            return Response(status=404)
        response = await send_from_directory(
            str(Path(app.config["REPUB_FEEDS_DIR"])),
            feed_path,
        )
        if Path(feed_path).suffix == ".rss":
            response.mimetype = "application/rss+xml"
        return response

    @app.get("/static/<string:asset_name>-<string:asset_hash>.<string:extension>")
    async def versioned_static_asset(
        asset_name: str, asset_hash: str, extension: str
    ) -> Response:
        logical_filename = f"{asset_name}.{extension}"
        requested_filename = f"{asset_name}-{asset_hash}.{extension}"
        if logical_filename in CACHE_BUSTED_STATIC_ASSETS:
            response = await send_from_directory(str(STATIC_DIR), logical_filename)
            if requested_filename == versioned_static_asset_filename(logical_filename):
                # URL hash matches the current content: safe to cache forever.
                response.cache_control.public = True
                response.cache_control.max_age = 31536000
                response.cache_control.immutable = True
            else:
                # Stale or unknown hash (e.g. during a rolling deploy): serve
                # the current file, but never let caches pin it to this URL.
                response.cache_control.no_cache = True
            return response
        # Not a cache-busted asset; the pattern merely matched a hyphenated
        # filename such as "datastar@1.0.0-RC.8.js", so serve it verbatim.
        return await send_from_directory(str(STATIC_DIR), requested_filename)

    @app.get("/")
    @app.get("/sources")
    @app.get("/sources/create")
    @app.get("/sources/<string:slug>/edit")
    @app.get("/runs")
    @app.get("/settings")
    @app.get("/job/<int:job_id>/execution/<int:execution_id>/logs")
    async def page_shim(
        slug: str | None = None,
        job_id: int | None = None,
        execution_id: int | None = None,
    ) -> Response:
        # Every GET page returns the same shim; page content comes from the
        # POST patch route registered for the same path, so the URL
        # parameters are only needed for routing here.
        del slug, job_id, execution_id
        body, etag = _render_shim_page(
            stylesheet_href=versioned_static_asset_href("app.css"),
            datastar_src=url_for("static", filename="datastar@1.0.0-RC.8.js"),
            current_path=request.path,
        )
        if request.if_none_match.contains(etag):
            response = Response(status=304)
            response.set_etag(etag)
            return response
        response = Response(body, mimetype="text/html")
        response.set_etag(etag)
        return response

    @app.post("/")
    async def dashboard_patch() -> DatastarResponse:
        return await _page_patch_response(app, lambda _tab_id: render_dashboard(app))

    @app.post("/sources")
    async def sources_patch() -> DatastarResponse:
        return await _page_patch_response(app, lambda _tab_id: render_sources(app))

    @app.post("/sources/create")
    async def create_source_patch() -> DatastarResponse:
        return await _page_patch_response(
            app, lambda _tab_id: render_create_source(app)
        )

    @app.post("/sources/<string:slug>/edit")
    async def edit_source_patch(slug: str) -> DatastarResponse:
        return await _page_patch_response(
            app, lambda _tab_id: render_edit_source(slug, app)
        )

    @app.post("/settings")
    async def settings_patch() -> DatastarResponse:
        return await _page_patch_response(app, lambda _tab_id: render_settings(app))

    @app.post("/actions/sources/create")
    async def create_source_action() -> DatastarResponse:
        signals = cast(dict[str, object], await read_signals())
        source, error = validate_source_form(
            signals,
            slug_exists=source_slug_exists,
        )
        if error is not None:
            return DatastarResponse(
                SSE.patch_signals({"_formError": error, "_formSuccess": ""})
            )
        assert source is not None  # error is None implies a validated source
        try:
            create_source(**source)
        except IntegrityError:
            # Lost a race with a concurrent insert of the same slug.
            return DatastarResponse(
                SSE.patch_signals(
                    {"_formError": "Slug must be unique.", "_formSuccess": ""}
                )
            )
        get_job_runtime(app).sync_jobs()
        trigger_refresh(app)
        return DatastarResponse(SSE.redirect("/sources"))

    @app.post("/actions/sources/<string:slug>/edit")
    async def edit_source_action(slug: str) -> DatastarResponse:
        signals = cast(dict[str, object], await read_signals())
        source, error = validate_source_form(
            signals,
            # The source's own slug is not a conflict when editing it.
            slug_exists=lambda candidate: candidate != slug
            and source_slug_exists(candidate),
            immutable_slug=slug,
        )
        if error is not None:
            return DatastarResponse(
                SSE.patch_signals({"_formError": error, "_formSuccess": ""})
            )
        assert source is not None  # error is None implies a validated source
        if update_source(slug, **source) is None:
            return DatastarResponse(
                SSE.patch_signals(
                    {"_formError": "Source does not exist.", "_formSuccess": ""}
                )
            )
        get_job_runtime(app).sync_jobs()
        trigger_refresh(app)
        return DatastarResponse(SSE.redirect("/sources"))

    @app.post("/actions/sources/<string:slug>/delete")
    async def delete_source_action(slug: str) -> Response:
        delete_source(slug)
        get_job_runtime(app).sync_jobs()
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/actions/settings")
    async def update_settings_action() -> DatastarResponse:
        signals = cast(dict[str, object], await read_signals())
        settings, error = validate_settings_form(signals)
        if error is not None:
            return DatastarResponse(
                SSE.patch_signals({"_formError": error, "_formSuccess": ""})
            )
        assert settings is not None  # error is None implies validated settings
        save_setting("max_concurrent_jobs", settings["max_concurrent_jobs"])
        save_setting("feed_url", settings["feed_url"])
        trigger_refresh(app)
        return DatastarResponse(SSE.redirect("/settings"))

    @app.post("/runs")
    async def runs_patch() -> DatastarResponse:
        return await _page_patch_response(
            app,
            lambda tab_id: render_runs(app, tab_id=tab_id),
        )

    @app.post("/actions/runs/completed-page/<int:page>")
    async def set_completed_runs_page_action(page: int) -> Response:
        # Pagination state is per browser tab, so a tab id is mandatory.
        signals = await _read_optional_signals()
        tab_id = _read_tab_id(signals)
        if tab_id is None:
            return Response(status=400)
        get_tab_state_store(app).update_page_state(
            tab_id,
            RUNS_TAB_STATE_KEY,
            lambda state: {**state, "completed_page": max(1, page)},
        )
        trigger_refresh(app, tab_id=tab_id)
        return Response(status=204)

    @app.post("/actions/jobs/<int:job_id>/run-now")
    async def run_job_now_action(job_id: int) -> Response:
        get_job_runtime(app).run_job_now(job_id, reason="manual")
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/actions/jobs/<int:job_id>/toggle-enabled")
    async def toggle_job_enabled_action(job_id: int) -> Response:
        enabled = load_job_enabled(job_id)
        # None means the job is unknown; nothing to toggle.
        if enabled is not None:
            get_job_runtime(app).set_job_enabled(job_id, enabled=not enabled)
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/actions/jobs/<int:job_id>/delete")
    async def delete_job_action(job_id: int) -> Response:
        delete_job_source(job_id)
        get_job_runtime(app).sync_jobs()
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/actions/executions/<int:execution_id>/cancel")
    async def cancel_execution_action(execution_id: int) -> Response:
        get_job_runtime(app).request_execution_cancel(execution_id)
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/actions/queued-executions/<int:execution_id>/cancel")
    async def cancel_queued_execution_action(execution_id: int) -> Response:
        get_job_runtime(app).cancel_queued_execution(execution_id)
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/actions/queued-executions/<int:execution_id>/move-up")
    async def move_queued_execution_up_action(execution_id: int) -> Response:
        get_job_runtime(app).move_queued_execution(execution_id, direction="up")
        # Refresh like every other queue action so open pages show the new order.
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/actions/queued-executions/<int:execution_id>/move-down")
    async def move_queued_execution_down_action(execution_id: int) -> Response:
        get_job_runtime(app).move_queued_execution(execution_id, direction="down")
        # Refresh like every other queue action so open pages show the new order.
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/actions/completed-executions/clear")
    async def clear_completed_executions_action() -> Response:
        clear_completed_executions(log_dir=app.config["REPUB_LOG_DIR"])
        trigger_refresh(app)
        return Response(status=204)

    @app.post("/job/<int:job_id>/execution/<int:execution_id>/logs")
    async def logs_patch(job_id: int, execution_id: int) -> DatastarResponse:
        async def render() -> Renderable:
            return await render_execution_logs(
                app, job_id=job_id, execution_id=execution_id
            )

        return await _page_patch_response(app, lambda _tab_id: render())

    @app.before_serving
    async def start_runtime() -> None:
        get_job_runtime(app).start()
        app.extensions[TAB_STATE_CLEANER_TASK_KEY] = asyncio.create_task(
            _clean_tab_state_periodically(app)
        )

    @app.after_serving
    async def stop_runtime() -> None:
        cleaner = cast(
            asyncio.Task[None] | None, app.extensions.get(TAB_STATE_CLEANER_TASK_KEY)
        )
        try:
            if cleaner is not None:
                cleaner.cancel()
                with suppress(asyncio.CancelledError):
                    await cleaner
        finally:
            # If the cleaner had already died with an exception, ``await
            # cleaner`` re-raises it; the job runtime must be stopped anyway.
            get_job_runtime(app).shutdown()

    return app
2026-03-30 12:34:38 +02:00
def get_refresh_broker(app: Quart) -> RefreshBroker:
    """Return the RefreshBroker that create_app() stored on ``app``."""
    broker = app.extensions[REFRESH_BROKER_KEY]
    return cast(RefreshBroker, broker)
2026-03-31 12:47:36 +02:00
def get_shutdown_event(app: Quart) -> asyncio.Event | None:
    """Return the shutdown event stored on ``app``, or None if unset."""
    event = app.extensions.get(SHUTDOWN_EVENT_KEY)
    return cast(asyncio.Event | None, event)
def get_tab_state_store(app: Quart) -> TabStateStore:
    """Return the TabStateStore that create_app() stored on ``app``."""
    store = app.extensions[TAB_STATE_STORE_KEY]
    return cast(TabStateStore, store)
2026-03-30 14:02:39 +02:00
def get_job_runtime(app: Quart) -> JobRuntime:
    """Return the app's JobRuntime, creating and caching it on first use.

    The runtime receives a refresh callback that forwards to
    trigger_refresh(app, event). Presumably the runtime calls it when job
    state changes; confirm in repub.jobs.
    """
    existing = cast(JobRuntime | None, app.extensions.get(JOB_RUNTIME_KEY))
    if existing is not None:
        return existing

    def _forward_refresh(event: object = "refresh-event") -> None:
        trigger_refresh(app, event)

    created = JobRuntime(
        log_dir=app.config["REPUB_LOG_DIR"],
        refresh_callback=_forward_refresh,
    )
    app.extensions[JOB_RUNTIME_KEY] = created
    return created
def trigger_refresh(
    app: Quart, event: object = "refresh-event", *, tab_id: str | None = None
) -> None:
    """Publish ``event`` on the app's refresh broker.

    ``tab_id`` is passed through to RefreshBroker.publish(). Presumably None
    broadcasts to all subscribers and a value targets one tab's subscription;
    confirm in repub.datastar.
    """
    broker = get_refresh_broker(app)
    broker.publish(event, tab_id=tab_id)
2026-03-30 12:34:38 +02:00
2026-03-30 14:02:39 +02:00
async def render_dashboard(app: Quart | None = None) -> Renderable:
    """Render the dashboard; without an app, render it with no data."""
    if app is not None:
        view = load_dashboard_view(log_dir=app.config["REPUB_LOG_DIR"])
        return dashboard_page_with_data(
            snapshot=cast(dict[str, str], view["snapshot"]),
            running_executions=cast(tuple[dict[str, object], ...], view["running"]),
            queued_executions=cast(tuple[dict[str, object], ...], view["queued"]),
            source_feeds=cast(tuple[dict[str, object], ...], view["source_feeds"]),
        )
    return dashboard_page_with_data()
2026-03-30 12:34:38 +02:00
2026-03-30 13:23:36 +02:00
async def render_sources(app: Quart | None = None) -> Renderable:
    """Render the sources list; live data and running count need an app."""
    if app is not None:
        all_sources = load_sources()
        runs_view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])
        return sources_page(
            sources=all_sources,
            running_count=len(runs_view["running"]),
        )
    return sources_page()
2026-03-30 13:23:36 +02:00
async def render_create_source(app: Quart | None = None) -> Renderable:
    """Render the "create source" form, with sidebar counts when an app is given."""
    if app is None:
        return create_source_page()
    counts = _load_sidebar_counts(app)
    return create_source_page(
        source_count=counts["source_count"],
        running_count=counts["running_count"],
    )
2026-03-30 12:34:38 +02:00
2026-03-30 18:26:02 +02:00
async def render_edit_source(slug: str, app: Quart | None = None) -> Renderable:
    """Render the edit form for the source identified by ``slug``.

    If no such source exists, the regular sources list is rendered instead,
    with live data when ``app`` is given. Previously an empty list was shown,
    which wrongly suggested that no sources exist at all.
    """
    source = load_source_form(slug)
    if source is None:
        if app is None:
            return sources_page(sources=())
        return await render_sources(app)
    sidebar_counts = {} if app is None else _load_sidebar_counts(app)
    return edit_source_page(
        slug=slug,
        source=source,
        action_path=f"/actions/sources/{slug}/edit",
        **sidebar_counts,
    )
async def render_runs(
    app: Quart | None = None, *, tab_id: str | None = None
) -> Renderable:
    """Render the runs page.

    The completed-executions page number comes from the requesting tab's
    stored state (default 1, clamped to at least 1).
    """
    if app is None:
        return runs_page()
    store = get_tab_state_store(app)
    state = store.get_page_state(tab_id, RUNS_TAB_STATE_KEY)
    page_number = max(1, _read_int(state.get("completed_page"), 1))
    view = load_runs_view(
        log_dir=app.config["REPUB_LOG_DIR"],
        completed_page=page_number,
        completed_page_size=COMPLETED_EXECUTION_PAGE_SIZE,
    )
    running = cast(tuple[dict[str, object], ...], view["running"])
    queued = cast(tuple[dict[str, object], ...], view["queued"])
    upcoming = cast(tuple[dict[str, object], ...], view["upcoming"])
    completed = cast(tuple[dict[str, object], ...], view["completed"])
    return runs_page(
        running_executions=running,
        queued_executions=queued,
        upcoming_jobs=upcoming,
        completed_executions=completed,
        completed_page=cast(int, view["completed_page"]),
        completed_page_size=cast(int, view["completed_page_size"]),
        completed_total_count=cast(int, view["completed_total_count"]),
        completed_total_pages=cast(int, view["completed_total_pages"]),
        source_count=len(load_sources()),
    )
async def render_settings(app: Quart | None = None) -> Renderable:
    """Render the settings form, with sidebar counts when an app is given."""
    if app is None:
        return settings_page(settings=load_settings_form())
    counts = _load_sidebar_counts(app)
    current_settings = load_settings_form()
    return settings_page(
        settings=current_settings,
        source_count=counts["source_count"],
        running_count=counts["running_count"],
    )
2026-03-30 12:34:38 +02:00
2026-03-30 14:02:39 +02:00
async def render_execution_logs(
    app: Quart | None = None, *, job_id: int, execution_id: int
) -> Renderable:
    """Render the log page for one execution of a job.

    Without an app, only the page skeleton (no log data) is rendered.
    """
    if app is None:
        return execution_logs_page(job_id=job_id, execution_id=execution_id)
    view = load_execution_log_view(
        log_dir=app.config["REPUB_LOG_DIR"],
        job_id=job_id,
        execution_id=execution_id,
    )
    exposed_fields = (
        "title",
        "description",
        "status_label",
        "status_tone",
        "log_text",
        "error_message",
    )
    return execution_logs_page(
        job_id=job_id,
        execution_id=execution_id,
        log_view={field: getattr(view, field) for field in exposed_fields},
    )
2026-03-30 12:48:32 +02:00
async def _page_patch_response(
    app: Quart, render: PatchRenderFunction
) -> DatastarResponse:
    """Open a Datastar SSE stream that re-renders a page on refresh events.

    If the request carries a tab id, it is registered with the tab state
    store and used to scope the broker subscription. Both are released by
    _unsubscribe_on_close() when the stream ends.
    """
    tab_id = _read_tab_id(await _read_optional_signals())
    if tab_id is not None:
        get_tab_state_store(app).connect(tab_id)
    subscription = get_refresh_broker(app).subscribe(tab_id=tab_id)

    def render_for_tab() -> Awaitable[Renderable]:
        return render(tab_id)

    events = render_stream(
        subscription,
        render=render_for_tab,
        last_event_id=request.headers.get("last-event-id"),
        shutdown_event=get_shutdown_event(app),
    )
    return DatastarResponse(
        _unsubscribe_on_close(subscription, events, app, tab_id=tab_id)
    )
2026-03-30 12:48:32 +02:00
2026-03-30 13:11:37 +02:00
async def _unsubscribe_on_close(
    queue: object,
    stream: AsyncGenerator[DatastarEvent, None],
    app: Quart,
    *,
    tab_id: str | None,
) -> AsyncGenerator[DatastarEvent, None]:
    """Forward events from ``stream`` and release its resources when done.

    On normal exhaustion, error, or client disconnect (the generator being
    closed), the wrapped stream is closed explicitly, the broker
    subscription ``queue`` is removed, and the tab (if any) is disconnected
    from the tab state store.
    """
    try:
        async for event in stream:
            yield event
    finally:
        try:
            # Close the inner generator deterministically; otherwise it stays
            # suspended until garbage collection finalizes it.
            # aclose() is a no-op if it has already finished.
            await stream.aclose()
        finally:
            get_refresh_broker(app).unsubscribe(cast(asyncio.Queue[object], queue))
            if tab_id is not None:
                get_tab_state_store(app).disconnect(tab_id)
2026-03-30 13:23:36 +02:00
2026-03-30 18:26:02 +02:00
def _load_sidebar_counts(app: Quart) -> dict[str, int]:
    """Return the ``source_count`` and ``running_count`` shown in the sidebar."""
    source_count = len(load_sources())
    running = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])["running"]
    return {"source_count": source_count, "running_count": len(running)}
async def _clean_tab_state_periodically(app: Quart) -> None:
    """Call TabStateStore.cleanup_stale() every TAB_STATE_CLEAN_INTERVAL, forever.

    Runs until cancelled. A failing cleanup pass is logged and retried on the
    next tick instead of ending the task. A dead cleaner would stop all
    cleanup for the rest of the process, and its stored exception would only
    surface when the task is awaited at shutdown.
    """
    import logging

    logger = logging.getLogger(__name__)
    interval = TAB_STATE_CLEAN_INTERVAL.total_seconds()
    while True:
        await asyncio.sleep(interval)
        try:
            get_tab_state_store(app).cleanup_stale()
        except Exception:
            # CancelledError is a BaseException, so cancellation still stops the loop.
            logger.exception("Failed to clean up stale tab state")
async def _read_optional_signals() -> dict[str, object] | None:
    """Return the request's Datastar signals, or None if absent or unreadable.

    A request with no body and no JSON content type is treated as having no
    signals. Any failure while reading them also yields None; this is a
    deliberate best-effort read.
    """
    has_body = request.content_length not in (None, 0)
    is_json = "application/json" in request.headers.get("Content-Type", "")
    if not (has_body or is_json):
        return None
    try:
        signals = await read_signals()
    except Exception:
        return None
    return cast(dict[str, object] | None, signals)
def _read_tab_id(signals: dict[str, object] | None) -> str | None:
if signals is None:
return None
tab_id = _read_string(signals, "tabid")
return tab_id or None
2026-03-30 13:23:36 +02:00
def validate_source_form(
    signals: dict[str, object] | None,
    *,
    slug_exists: Callable[[str], bool],
    immutable_slug: str | None = None,
) -> tuple[SourceFormData | None, str | None]:
    """Validate submitted source-form signals.

    Args:
        signals: Datastar signals from the request, or None if none were sent.
        slug_exists: Predicate reporting whether a slug is already taken.
        immutable_slug: When editing, the existing slug; the submitted slug
            must equal it.

    Returns:
        ``(source, None)`` on success, or ``(None, message)`` where
        ``message`` joins every validation error with single spaces.
    """
    if signals is None:
        return None, "Missing form data."
    source_name = _read_string(signals, "sourceName")
    source_slug = _read_string(signals, "sourceSlug")
    source_type = _read_string(signals, "sourceType")
    feed_url = _read_string(signals, "feedUrl")
    pangea_domain = _read_string(signals, "pangeaDomain")
    # Kept verbatim (not stripped); emptiness is checked on the stripped value.
    pangea_category = _read_string(signals, "pangeaCategory", strip=False)
    content_format = _read_string(signals, "contentFormat")
    content_type = _read_string(signals, "contentType")
    max_articles = _read_string(signals, "maxArticles")
    oldest_article = _read_string(signals, "oldestArticle")
    source_notes = _read_string(signals, "sourceNotes")
    spider_arguments = _normalize_multiline(_read_string(signals, "spiderArguments"))
    cron_minute = _read_string(signals, "cronMinute")
    cron_hour = _read_string(signals, "cronHour")
    cron_day_of_month = _read_string(signals, "cronDayOfMonth")
    cron_day_of_week = _read_string(signals, "cronDayOfWeek")
    cron_month = _read_string(signals, "cronMonth")

    errors: list[str] = []
    if source_name == "":
        errors.append("Source name is required.")
    if source_slug == "":
        errors.append("Slug is required.")
    elif immutable_slug is not None and source_slug != immutable_slug:
        errors.append("Slug is immutable.")
    elif slug_exists(source_slug):
        errors.append("Slug must be unique.")
    if source_type not in {"feed", "pangea"}:
        errors.append("Source type must be feed or pangea.")
    if source_type == "feed":
        if feed_url == "":
            errors.append("Feed URL is required for feed sources.")
        elif not _is_valid_url(feed_url):
            errors.append("Feed URL must be a valid URL.")
    if source_type == "pangea":
        # Blank optional pangea fields fall back to the module-level defaults.
        content_format = content_format or DEFAULT_PANGEA_CONTENT_FORMAT
        content_type = content_type or DEFAULT_PANGEA_CONTENT_TYPE
        max_articles = max_articles or DEFAULT_PANGEA_MAX_ARTICLES
        oldest_article = oldest_article or DEFAULT_PANGEA_OLDEST_ARTICLE
        if pangea_domain == "":
            errors.append("Pangea domain is required.")
        # A whitespace-only category is as missing as an empty one.
        if pangea_category.strip() == "":
            errors.append("Category name is required.")
        if content_format not in PANGEA_CONTENT_FORMATS:
            errors.append("Content format is invalid.")
        if content_type not in PANGEA_CONTENT_TYPES:
            errors.append("Content type is invalid.")
        if _parse_int(max_articles) is None:
            errors.append("Max articles must be an integer.")
        if _parse_int(oldest_article) is None:
            errors.append("Oldest article must be an integer.")
    cron_values = (
        cron_minute,
        cron_hour,
        cron_day_of_month,
        cron_day_of_week,
        cron_month,
    )
    if any(value == "" for value in cron_values):
        errors.append("All cron fields are required.")
    if errors:
        return None, " ".join(errors)

    enabled = _read_bool(signals, "jobEnabled")
    source: SourceFormData = {
        "name": source_name,
        "slug": source_slug,
        "source_type": source_type,
        "notes": source_notes,
        "spider_arguments": spider_arguments,
        "feed_url": feed_url,
        "pangea_domain": pangea_domain,
        "pangea_category": pangea_category,
        "content_format": content_format,
        "content_type": content_type,
        # None for non-pangea sources whose value is blank or not an integer.
        "max_articles": _parse_int(max_articles),
        "oldest_article": _parse_int(oldest_article),
        "enabled": enabled,
        "convert_images": _read_bool(signals, "convertImages", default=True),
        "convert_video": _read_bool(signals, "convertVideo", default=True),
        "only_newest": _read_bool(signals, "onlyNewest", default=True),
        "include_authors": _read_bool(signals, "includeAuthors", default=True),
        "exclude_media": _read_bool(signals, "excludeMedia", default=False),
        "include_content": _read_bool(signals, "includeContent", default=True),
        "cron_minute": cron_minute,
        "cron_hour": cron_hour,
        "cron_day_of_month": cron_day_of_month,
        "cron_day_of_week": cron_day_of_week,
        "cron_month": cron_month,
    }
    return source, None
2026-03-30 18:26:02 +02:00
def validate_settings_form(
    signals: dict[str, object] | None,
) -> tuple[SettingsFormData | None, str | None]:
    """Validate the settings form signals.

    Returns ``(settings, None)`` on success, or ``(None, message)`` for the
    first failing check. Trailing slashes are stripped from the feed URL, and
    an empty feed URL is allowed.
    """
    if signals is None:
        return None, "Missing form data."
    job_limit = _parse_int(_read_string(signals, "maxConcurrentJobs"))
    base_url = _read_string(signals, "feedUrl").rstrip("/")
    if job_limit is None:
        return None, "Max concurrent jobs must be an integer."
    if job_limit < 1:
        return None, "Max concurrent jobs must be at least 1."
    if base_url and not _is_valid_url(base_url):
        return None, "Feed URL must be a valid URL."
    validated: SettingsFormData = {
        "max_concurrent_jobs": job_limit,
        "feed_url": base_url,
    }
    return validated, None
2026-03-30 18:26:02 +02:00
def _read_string(signals: dict[str, object], key: str, *, strip: bool = True) -> str:
value = str(signals.get(key, ""))
return value.strip() if strip else value
2026-03-30 13:23:36 +02:00
2026-03-30 13:37:25 +02:00
def _read_bool(signals: dict[str, object], key: str, *, default: bool = False) -> bool:
value = signals.get(key, default)
2026-03-30 13:23:36 +02:00
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in {"true", "1", "on", "yes"}
return bool(value)
2026-03-30 13:37:25 +02:00
def _normalize_multiline(value: str) -> str:
return value.replace("\r\n", "\n").replace("\r", "\n")
2026-03-30 13:23:36 +02:00
def _parse_int(value: str) -> int | None:
try:
return int(value)
except ValueError:
return None
def _read_int(value: object, default: int) -> int:
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
if isinstance(value, str):
parsed = _parse_int(value)
return default if parsed is None else parsed
return default
2026-03-30 13:23:36 +02:00
def _is_valid_url(value: str) -> bool:
parsed = urlparse(value)
return parsed.scheme in {"http", "https"} and parsed.netloc != ""