From 847aeae7721a5de1dd23685eaf29130f7e0b8e8e Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Mon, 30 Mar 2026 13:37:25 +0200 Subject: [PATCH] db backed source creation --- repub/model.py | 126 +++++++++++++++++++++++++++++++++++++++++ repub/pages/sources.py | 55 ++++-------------- repub/web.py | 117 ++++++++++++++++++++++++-------------- tests/test_model.py | 1 - tests/test_web.py | 112 ++++++++++++++++++++++++++++++++---- 5 files changed, 312 insertions(+), 99 deletions(-) diff --git a/repub/model.py b/repub/model.py index 8b26934..60c4bb4 100644 --- a/repub/model.py +++ b/repub/model.py @@ -84,6 +84,132 @@ def initialize_database(db_path: str | Path | None = None) -> Path: return resolved_path +def source_slug_exists(slug: str) -> bool: + with database.connection_context(): + return Source.select().where(Source.slug == slug).exists() + + +def create_source( + *, + name: str, + slug: str, + source_type: str, + notes: str, + spider_arguments: str, + enabled: bool, + cron_minute: str, + cron_hour: str, + cron_day_of_month: str, + cron_day_of_week: str, + cron_month: str, + feed_url: str = "", + pangea_domain: str = "", + pangea_category: str = "", + content_type: str = "", + only_newest: bool = True, + max_articles: int | None = None, + oldest_article: int | None = None, + include_authors: bool = True, + exclude_media: bool = False, + include_content: bool = True, + content_format: str = "", +) -> Source: + with database.connection_context(): + with database.atomic(): + source = Source.create( + name=name, + slug=slug, + source_type=source_type, + notes=notes, + ) + if source_type == "feed": + SourceFeed.create( + source=source, + feed_url=feed_url, + ) + else: + SourcePangea.create( + source=source, + domain=pangea_domain, + category_name=pangea_category, + content_type=content_type, + only_newest=only_newest, + max_articles=max_articles, + oldest_article=oldest_article, + include_authors=include_authors, + exclude_media=exclude_media, + include_content=include_content, + content_format=content_format, + ) + Job.create( + source=source, + enabled=enabled, + spider_arguments=spider_arguments, + cron_minute=cron_minute, + cron_hour=cron_hour, + cron_day_of_month=cron_day_of_month, + cron_day_of_week=cron_day_of_week, + cron_month=cron_month, + ) + return source + + +def load_sources() -> tuple[dict[str, object], ...]: + with database.connection_context(): + sources = tuple(Source.select().order_by(Source.created_at.desc())) + source_ids = tuple(int(source.get_id()) for source in sources) + if not source_ids: + return () + jobs = { + job.source_id: job for job in Job.select().where(Job.source.in_(source_ids)) + } + feed_configs = { + config.source_id: config + for config in SourceFeed.select().where(SourceFeed.source.in_(source_ids)) + } + pangea_configs = { + config.source_id: config + for config in SourcePangea.select().where( + SourcePangea.source.in_(source_ids) + ) + } + return tuple( + _project_source(source, jobs, feed_configs, pangea_configs) + for source in sources + ) + + +def _project_source( + source: "Source", + jobs: dict[int, "Job"], + feed_configs: dict[int, "SourceFeed"], + pangea_configs: dict[int, "SourcePangea"], +) -> dict[str, object]: + source_id = int(source.get_id()) + job = jobs[source_id] + if source.source_type == "feed": + upstream = feed_configs[source_id].feed_url + source_type = "Feed" + else: + pangea = pangea_configs[source_id] + upstream = f"{pangea.domain} / {pangea.category_name}" + source_type = "Pangea" + + return { + "name": source.name, + "slug": source.slug, + "source_type": source_type, + "upstream": upstream, + "schedule": ( + f"cron: {job.cron_minute} {job.cron_hour} {job.cron_day_of_month} " + f"{job.cron_month} {job.cron_day_of_week}" + ), + "last_run": "Never run", + "state": "Enabled" if job.enabled else "Disabled", + "state_tone": "scheduled" if job.enabled else "idle", + } + + class BaseModel(Model): class Meta: database = database diff --git a/repub/pages/sources.py b/repub/pages/sources.py index ea2af13..2da44f9 100644 --- a/repub/pages/sources.py +++ b/repub/pages/sources.py @@ -40,39 +40,6 @@ PANGEA_CONTENT_TYPES = ( "topstories", ) -DEFAULT_SOURCES: tuple[dict[str, str], ...] = ( - { - "name": "Guardian feed mirror", - "slug": "guardian-feed", - "source_type": "Feed", - "upstream": "https://guardianproject.info/feed.xml", - "schedule": "Every 30 minutes", - "last_run": "Succeeded 53m ago", - "state": "Enabled", - "state_tone": "scheduled", - }, - { - "name": "Pangea mobile articles", - "slug": "pangea-mobile", - "source_type": "Pangea", - "upstream": "guardianproject.info / News", - "schedule": "Every 4 hours", - "last_run": "Running now", - "state": "Enabled", - "state_tone": "running", - }, - { - "name": "Podcast enclosure mirror", - "slug": "podcast-audio", - "source_type": "Feed", - "upstream": "https://guardianproject.info/podcast/podcast.xml", - "schedule": "Paused", - "last_run": "Failed 2h ago", - "state": "Disabled", - "state_tone": "idle", - }, -) - def _source_row(source: Mapping[str, object]) -> tuple[Node, ...]: return ( @@ -106,7 +73,7 @@ def _source_row(source: Mapping[str, object]) -> tuple[Node, ...]: def sources_table( *, sources: tuple[Mapping[str, object], ...] | None = None ) -> Renderable: - rows = tuple(_source_row(source) for source in (sources or DEFAULT_SOURCES)) + rows = tuple(_source_row(source) for source in (sources or ())) return table_section( eyebrow="Inventory", title="Sources", @@ -175,13 +142,11 @@ def create_source_form(*, action_path: str = "/actions/sources/create") -> Rende input_field( label="Source name", field_id="source-name", - value="Pangea mobile articles", signal_name="sourceName", ), input_field( label="Slug", field_id="source-slug", - value="pangea-mobile", help_text="Immutable after creation.", signal_name="sourceSlug", ), @@ -244,13 +209,11 @@ def create_source_form(*, action_path: str = "/actions/sources/create") -> Rende input_field( label="Pangea domain", field_id="pangea-domain", - value="guardianproject.info", signal_name="pangeaDomain", ), input_field( label="Category name", field_id="pangea-category", - value="News", signal_name="pangeaCategory", ), select_field( @@ -299,19 +262,25 @@ def create_source_form(*, action_path: str = "/actions/sources/create") -> Rende signal_name="excludeMedia", checked=False, ), + toggle_field( + label="Include content", + description="Store article body content in mirrored output when the upstream provides it.", + signal_name="includeContent", + checked=True, + ), ], ], h.div(class_="grid gap-4 lg:grid-cols-2")[ textarea_field( label="Notes", field_id="source-notes", - value="Primary Pangea mobile article mirror for the operator landing page.", + value="", signal_name="sourceNotes", ), textarea_field( label="Spider arguments", field_id="spider-arguments", - value="language=en,download_media=true", + value="language=en\ndownload_media=true", signal_name="spiderArguments", ), ], @@ -331,13 +300,13 @@ def create_source_form(*, action_path: str = "/actions/sources/create") -> Rende input_field( label="Minute", field_id="cron-minute", - value="15", + value="*/30", signal_name="cronMinute", ), input_field( label="Hour", field_id="cron-hour", - value="*/4", + value="*", signal_name="cronHour", ), input_field( @@ -349,7 +318,7 @@ def create_source_form(*, action_path: str = "/actions/sources/create") -> Rende input_field( label="Day of week", field_id="cron-day-of-week", - value="1-6", + value="*", signal_name="cronDayOfWeek", ), input_field( diff --git a/repub/web.py b/repub/web.py index 86c9a71..350de6a 100644 --- a/repub/web.py +++ b/repub/web.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio import hashlib from collections.abc import AsyncGenerator, Awaitable, Callable -from typing import cast +from typing import TypedDict, cast from urllib.parse import urlparse import htpy as h @@ -11,10 +11,16 @@ from datastar_py import ServerSentEventGenerator as SSE from datastar_py.quart import DatastarResponse, read_signals from datastar_py.sse import DatastarEvent from htpy import Renderable +from peewee import IntegrityError from quart import Quart, Response, request, url_for from repub.datastar import RefreshBroker, render_stream -from repub.model import initialize_database +from repub.model import ( + create_source, + initialize_database, + load_sources, + source_slug_exists, +) from repub.pages import ( create_source_page, dashboard_page, @@ -23,18 +29,44 @@ from repub.pages import ( shim_page, sources_page, ) -from repub.pages.sources import ( - DEFAULT_SOURCES, - PANGEA_CONTENT_FORMATS, - PANGEA_CONTENT_TYPES, -) +from repub.pages.sources import PANGEA_CONTENT_FORMATS, PANGEA_CONTENT_TYPES REFRESH_BROKER_KEY = "repub.refresh_broker" -SOURCES_KEY = "repub.sources" RenderFunction = Callable[[], Awaitable[Renderable]] +class SourceFormData(TypedDict): + name: str + slug: str + source_type: str + notes: str + spider_arguments: str + enabled: bool + cron_minute: str + cron_hour: str + cron_day_of_month: str + cron_day_of_week: str + cron_month: str + feed_url: str + pangea_domain: str + pangea_category: str + content_format: str + content_type: str + max_articles: int | None + oldest_article: int | None + only_newest: bool + include_authors: bool + exclude_media: bool + include_content: bool + + +DEFAULT_PANGEA_CONTENT_FORMAT = "MOBILE_3" +DEFAULT_PANGEA_CONTENT_TYPE = "articles" +DEFAULT_PANGEA_MAX_ARTICLES = "10" +DEFAULT_PANGEA_OLDEST_ARTICLE = "3" + + def _render_shim_page(*, stylesheet_href: str, datastar_src: str) -> tuple[str, str]: head = ( h.title["Republisher Admin UI"], @@ -49,7 +81,6 @@ def create_app() -> Quart: app = Quart(__name__) app.config["REPUB_DB_PATH"] = str(initialize_database()) app.extensions[REFRESH_BROKER_KEY] = RefreshBroker() - app.extensions[SOURCES_KEY] = _default_sources_dict() @app.get("/") @app.get("/sources") @@ -90,7 +121,7 @@ def create_app() -> Quart: signals = cast(dict[str, object], await read_signals()) source, error = validate_source_form( signals, - existing_sources=get_sources_dict(app), + slug_exists=source_slug_exists, ) if error is not None: return DatastarResponse( @@ -98,7 +129,14 @@ def create_app() -> Quart: ) assert source is not None - get_sources_dict(app)[str(source["slug"])] = source + try: + create_source(**source) + except IntegrityError: + return DatastarResponse( + SSE.patch_signals( + {"_formError": "Slug must be unique.", "_formSuccess": ""} + ) + ) trigger_refresh(app) return DatastarResponse(SSE.redirect("/sources")) @@ -128,12 +166,8 @@ async def render_dashboard() -> Renderable: return dashboard_page() -def get_sources_dict(app: Quart) -> dict[str, dict[str, object]]: - return cast(dict[str, dict[str, object]], app.extensions[SOURCES_KEY]) - - async def render_sources(app: Quart | None = None) -> Renderable: - sources = None if app is None else tuple(get_sources_dict(app).values()) + sources = None if app is None else load_sources() return sources_page(sources=sources) @@ -170,15 +204,11 @@ async def _unsubscribe_on_close( get_refresh_broker(app).unsubscribe(cast(asyncio.Queue[object], queue)) -def _default_sources_dict() -> dict[str, dict[str, object]]: - return {source["slug"]: dict(source) for source in DEFAULT_SOURCES} - - def validate_source_form( signals: dict[str, object] | None, *, - existing_sources: dict[str, dict[str, object]], -) -> tuple[dict[str, object] | None, str | None]: + slug_exists: Callable[[str], bool], +) -> tuple[SourceFormData | None, str | None]: if signals is None: return None, "Missing form data." @@ -193,7 +223,7 @@ def validate_source_form( max_articles = _read_string(signals, "maxArticles") oldest_article = _read_string(signals, "oldestArticle") source_notes = _read_string(signals, "sourceNotes") - spider_arguments = _read_string(signals, "spiderArguments") + spider_arguments = _normalize_multiline(_read_string(signals, "spiderArguments")) cron_minute = _read_string(signals, "cronMinute") cron_hour = _read_string(signals, "cronHour") cron_day_of_month = _read_string(signals, "cronDayOfMonth") @@ -205,7 +235,7 @@ def validate_source_form( errors.append("Source name is required.") if source_slug == "": errors.append("Slug is required.") - elif source_slug in existing_sources: + elif slug_exists(source_slug): errors.append("Slug must be unique.") if source_type not in {"feed", "pangea"}: @@ -218,6 +248,10 @@ def validate_source_form( errors.append("Feed URL must be a valid URL.") if source_type == "pangea": + content_format = content_format or DEFAULT_PANGEA_CONTENT_FORMAT + content_type = content_type or DEFAULT_PANGEA_CONTENT_TYPE + max_articles = max_articles or DEFAULT_PANGEA_MAX_ARTICLES + oldest_article = oldest_article or DEFAULT_PANGEA_OLDEST_ARTICLE if pangea_domain == "": errors.append("Pangea domain is required.") if pangea_category == "": @@ -245,33 +279,24 @@ def validate_source_form( return None, " ".join(errors) enabled = _read_bool(signals, "jobEnabled") - source = { + source: SourceFormData = { "name": source_name, "slug": source_slug, - "source_type": "Feed" if source_type == "feed" else "Pangea", - "upstream": ( - feed_url - if source_type == "feed" - else f"{pangea_domain} / {pangea_category}" - ), - "schedule": f"cron: {cron_minute} {cron_hour} {cron_day_of_month} {cron_month} {cron_day_of_week}", - "last_run": "Never run", - "state": "Enabled" if enabled else "Disabled", - "state_tone": "scheduled" if enabled else "idle", + "source_type": source_type, "notes": source_notes, "spider_arguments": spider_arguments, - "source_kind": source_type, "feed_url": feed_url, "pangea_domain": pangea_domain, "pangea_category": pangea_category, "content_format": content_format, "content_type": content_type, - "max_articles": max_articles, - "oldest_article": oldest_article, - "job_enabled": enabled, - "only_newest": _read_bool(signals, "onlyNewest"), - "include_authors": _read_bool(signals, "includeAuthors"), - "exclude_media": _read_bool(signals, "excludeMedia"), + "max_articles": _parse_int(max_articles), + "oldest_article": _parse_int(oldest_article), + "enabled": enabled, + "only_newest": _read_bool(signals, "onlyNewest", default=True), + "include_authors": _read_bool(signals, "includeAuthors", default=True), + "exclude_media": _read_bool(signals, "excludeMedia", default=False), + "include_content": _read_bool(signals, "includeContent", default=True), "cron_minute": cron_minute, "cron_hour": cron_hour, "cron_day_of_month": cron_day_of_month, @@ -285,8 +310,8 @@ def _read_string(signals: dict[str, object], key: str) -> str: return str(signals.get(key, "")).strip() -def _read_bool(signals: dict[str, object], key: str) -> bool: - value = signals.get(key, False) +def _read_bool(signals: dict[str, object], key: str, *, default: bool = False) -> bool: + value = signals.get(key, default) if isinstance(value, bool): return value if isinstance(value, str): @@ -294,6 +319,10 @@ def _read_bool(signals: dict[str, object], key: str) -> bool: return bool(value) +def _normalize_multiline(value: str) -> str: + return value.replace("\r\n", "\n").replace("\r", "\n") + + def _parse_int(value: str) -> int | None: try: return int(value) diff --git a/tests/test_model.py b/tests/test_model.py index b27bf8d..2df0b8f 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -53,7 +53,6 @@ def test_initialize_database_bootstraps_schema_from_sql_files(tmp_path: Path) -> assert table_names == { "job", "job_execution", - "settings", "source", "source_feed", "source_pangea", diff --git a/tests/test_web.py b/tests/test_web.py index 9f0475f..b866934 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -5,10 +5,10 @@ from pathlib import Path from typing import Any, cast from repub.datastar import RefreshBroker, render_sse_event, render_stream +from repub.model import Job, Source, SourceFeed, SourcePangea from repub.web import ( create_app, get_refresh_broker, - get_sources_dict, render_create_source, render_dashboard, render_execution_logs, @@ -161,8 +161,8 @@ def test_render_sources_shows_table_and_create_link() -> None: assert "Configured feed and Pangea sources live here as tables" in body assert ">Sources<" in body assert 'href="/sources/create"' in body - assert "guardian-feed" in body - assert "podcast-audio" in body + assert "guardian-feed" not in body + assert "podcast-audio" not in body asyncio.run(run()) @@ -181,17 +181,37 @@ def test_render_create_source_shows_dedicated_form_page() -> None: assert "onlyNewest" in body assert "includeAuthors" in body assert "excludeMedia" in body + assert "includeContent" in body assert "TEXT_ONLY" in body assert "breakingnews" in body assert "Pangea domain" in body assert "Feed URL" in body assert "Cron schedule" in body assert "Initial job state" in body + assert "Pangea mobile articles" not in body + assert "pangea-mobile" not in body + assert "guardianproject.info" not in body + assert ( + "Primary Pangea mobile article mirror for the operator landing page." + not in body + ) + assert "language=en,download_media=true" not in body + assert "language=en\ndownload_media=true" in body + assert 'value="articles"' in body + assert 'value="10"' in body + assert 'value="3"' in body + assert 'value="*/30"' in body + assert 'value="*"' in body asyncio.run(run()) -def test_create_source_action_adds_new_source_to_in_memory_store() -> None: +def test_create_source_action_creates_pangea_source_and_job_in_database( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "sources.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + async def run() -> None: app = create_app() client = app.test_client() @@ -210,7 +230,7 @@ def test_create_source_action_adds_new_source_to_in_memory_store() -> None: "maxArticles": "12", "oldestArticle": "5", "sourceNotes": "Regional health alerts.", - "spiderArguments": "language=en", + "spiderArguments": "language=en\ndownload_media=true", "cronMinute": "0", "cronHour": "*/6", "cronDayOfMonth": "*", @@ -226,17 +246,89 @@ def test_create_source_action_adds_new_source_to_in_memory_store() -> None: assert response.status_code == 200 assert "window.location = '/sources'" in body - assert "kenya-health" in get_sources_dict(app) - assert get_sources_dict(app)["kenya-health"]["content_type"] == "breakingnews" + + source = Source.get(Source.slug == "kenya-health") + pangea = SourcePangea.get(SourcePangea.source == source) + job = Job.get(Job.source == source) + rendered_sources = str(await render_sources(app)) + + assert source.name == "Kenya health desk" + assert source.source_type == "pangea" + assert pangea.content_type == "breakingnews" + assert pangea.include_content is True + assert job.enabled is True + assert job.spider_arguments == "language=en\ndownload_media=true" + assert job.cron_hour == "*/6" + assert "kenya-health" in rendered_sources + assert "example.org / Health" in rendered_sources + assert "Enabled" in rendered_sources asyncio.run(run()) -def test_create_source_action_validates_duplicate_slug_and_pangea_type() -> None: +def test_create_source_action_creates_feed_source_and_job_in_database( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "feed-sources.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + async def run() -> None: app = create_app() client = app.test_client() + response = await client.post( + "/actions/sources/create", + headers={"Datastar-Request": "true"}, + json={ + "sourceName": "NASA feed", + "sourceSlug": "nasa-feed", + "sourceType": "feed", + "feedUrl": "https://www.nasa.gov/rss/dyn/breaking_news.rss", + "sourceNotes": "Primary NASA mirror.", + "spiderArguments": "", + "cronMinute": "30", + "cronHour": "*", + "cronDayOfMonth": "*", + "cronDayOfWeek": "*", + "cronMonth": "*", + "jobEnabled": False, + }, + ) + body = await response.get_data(as_text=True) + + assert response.status_code == 200 + assert "window.location = '/sources'" in body + + source = Source.get(Source.slug == "nasa-feed") + feed = SourceFeed.get(SourceFeed.source == source) + job = Job.get(Job.source == source) + rendered_sources = str(await render_sources(app)) + + assert source.source_type == "feed" + assert feed.feed_url == "https://www.nasa.gov/rss/dyn/breaking_news.rss" + assert job.enabled is False + assert "nasa-feed" in rendered_sources + assert "https://www.nasa.gov/rss/dyn/breaking_news.rss" in rendered_sources + assert "Disabled" in rendered_sources + + asyncio.run(run()) + + +def test_create_source_action_validates_duplicate_slug_and_pangea_type( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "duplicate.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + async def run() -> None: + app = create_app() + Source.create( + name="Guardian feed mirror", + slug="guardian-feed", + source_type="feed", + ) + client = app.test_client() + response = await client.post( "/actions/sources/create", headers={"Datastar-Request": "true"}, @@ -265,9 +357,7 @@ def test_create_source_action_validates_duplicate_slug_and_pangea_type() -> None assert "Content format is invalid." in body assert "Content type is invalid." in body assert "Max articles must be an integer." in body - assert "Duplicate guardian" not in { - str(source["name"]) for source in get_sources_dict(app).values() - } + assert Source.select().where(Source.name == "Duplicate guardian").count() == 0 asyncio.run(run())