# NOTE(review): this file is corrupted — the original newlines were collapsed onto a
# few giant physical lines, and markup fragments inside string literals (the HTML
# snippets these tests assert on, e.g. after `body.startswith("` and `assert b'`)
# appear stripped, leaving unterminated quotes at several line breaks. The lost
# content cannot be reconstructed from this copy; restore the file from version
# control instead of editing it here. TODO confirm against VCS history.
# This region holds: the module imports; test_status_badge_uses_green_done_tone
# (emerald tone classes); the runs_page relative/hoverable end-time markup test;
# test_root_get_serves_datastar_shim (ETag + Datastar signals/init attributes);
# a DB-path-in-cwd test (name lost at the L2/L3 break); If-None-Match 304 handling;
# the dashboard POST SSE morph stream; render_sse_event de-duplication via
# last_event_id; RefreshBroker publish/subscribe; render_stream yielding on connect
# and on a "refresh-event"; and render_dashboard information-architecture checks.
from __future__ import annotations import asyncio import os from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Any, cast from repub.components import status_badge from repub.datastar import RefreshBroker, render_sse_event, render_stream from repub.jobs import load_dashboard_view from repub.model import ( Job, JobExecution, JobExecutionStatus, Source, SourceFeed, SourcePangea, create_source, ) from repub.pages.runs import runs_page from repub.web import ( create_app, get_refresh_broker, render_create_source, render_dashboard, render_edit_source, render_execution_logs, render_runs, render_sources, ) def test_status_badge_uses_green_done_tone() -> None: badge = str(status_badge(label="Succeeded", tone="done")) assert "bg-emerald-100 text-emerald-800" in badge assert "Succeeded" in badge def test_runs_page_renders_completed_execution_end_time_as_relative_hoverable_time() -> ( None ): ended_at = "2026-01-15T10:00:00+00:00" body = str( runs_page( completed_executions=( { "source": "Completed source", "slug": "completed-source", "job_id": 7, "execution_id": 42, "ended_at": "2 hours ago", "ended_at_iso": ended_at, "status": "Succeeded", "status_tone": "done", "stats": "1 requests • 1 items • 1 bytes", "summary": "Worker exited successfully", "log_href": "/job/7/execution/42/logs", }, ) ) ) assert "data-ended-at" in body assert f'data-ended-at="{ended_at}"' in body assert f'datetime="{ended_at}"' in body assert f'title="{ended_at}"' in body assert ">2 hours ago<" in body def test_root_get_serves_datastar_shim() -> None: async def run() -> None: client = create_app().test_client() response = await client.get("/") body = await response.get_data(as_text=True) assert response.status_code == 200 assert response.headers["ETag"] assert body.startswith("") assert ( '' in body ) assert 'data-signals:tabid="self.crypto.randomUUID().substring(0,8)"' in body assert 'data-init="@post(window.location.pathname +' in body assert "retryMaxCount: 
Infinity" in body assert "data-on:online__window=" in body assert '
None: monkeypatch.chdir(tmp_path) app = create_app() assert Path(app.config["REPUB_DB_PATH"]) == tmp_path / "republisher.db" assert (tmp_path / "republisher.db").exists() def test_root_get_honors_if_none_match() -> None: async def run() -> None: client = create_app().test_client() initial = await client.get("/") etag = initial.headers["ETag"] response = await client.get("/", headers={"If-None-Match": etag}) assert response.status_code == 304 assert response.headers["ETag"] == etag asyncio.run(run()) def test_dashboard_post_serves_morph_component() -> None: async def run() -> None: client = create_app().test_client() async with client.request("/?u=shim", method="POST") as connection: await connection.send_complete() chunk = await asyncio.wait_for(connection.receive(), timeout=1) raw_connection = cast(Any, connection) assert raw_connection.status_code == 200 assert raw_connection.headers["Content-Type"] == "text/event-stream" assert b"event: datastar-patch-elements" in chunk assert b"id: " in chunk assert b'
None: async def run() -> None: async def render() -> str: return '
same
' event_id, event = await render_sse_event(render) repeated_id, repeated_event = await render_sse_event( render, last_event_id=event_id ) assert repeated_id == event_id assert event is not None assert repeated_event is None asyncio.run(run()) def test_app_refresh_broker_publishes_events() -> None: async def run() -> None: app = create_app() broker = get_refresh_broker(app) queue = broker.subscribe() broker.publish() event = await asyncio.wait_for(queue.get(), timeout=1) assert event == "refresh-event" broker.unsubscribe(queue) asyncio.run(run()) def test_render_stream_yields_on_connect_and_refresh() -> None: async def run() -> None: queue = RefreshBroker().subscribe() renders = 0 async def render() -> str: nonlocal renders renders += 1 return f'
{renders}
' stream = render_stream(queue, render) first = await anext(stream) await queue.put("refresh-event") second = await anext(stream) await stream.aclose() assert "1
" in first assert "2
" in second asyncio.run(run()) def test_render_dashboard_shows_dashboard_information_architecture( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "dashboard-render.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) async def run() -> None: app = create_app() body = str(await render_dashboard(app)) assert "Operational snapshot" in body assert "Running executions" in body assert "Published feeds" in body assert 'href="/sources"' in body assert 'href="/runs"' in body assert "Create source" in body asyncio.run(run()) def test_load_dashboard_view_measures_log_artifact_path( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "dashboard-footprint.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) create_app() out_dir = tmp_path / "out" log_dir = out_dir / "logs" cache_dir = out_dir / "httpcache" log_dir.mkdir(parents=True) cache_dir.mkdir(parents=True) (log_dir / "run.log").write_bytes(b"x" * 1024) (cache_dir / "cache.bin").write_bytes(b"y" * 2048) snapshot = load_dashboard_view(log_dir=log_dir)["snapshot"] assert cast(dict[str, str], snapshot)["artifact_footprint"] == "3.0 KB" def test_render_dashboard_describes_log_artifact_footprint( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "dashboard-footprint-copy.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) async def run() -> None: app = create_app() body = str(await render_dashboard(app)) assert "Current artifact size under the output path." 
# NOTE(review): collapsed/mangled span — the statement is cut at both physical line
# breaks; restore original formatting from VCS before editing this copy.
# Contains: the tail of test_render_dashboard_describes_log_artifact_footprint, then
# test_load_dashboard_view_lists_source_feed_artifacts — creates an "available" and a
# "missing" feed source, writes feed.rss (1 KB) + audio.mp3 (2 KB) for the available
# one, backdates feed.rss's mtime 32 minutes via os.utime, and asserts the view's
# "source_feeds" tuple: Available/done with "32 minutes ago" and "3.0 KB", versus
# Missing/failed with "Never published" (continues on the next physical line).
in body asyncio.run(run()) def test_load_dashboard_view_lists_source_feed_artifacts( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "dashboard-feeds.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) app = create_app() out_dir = tmp_path / "out" log_dir = out_dir / "logs" app.config["REPUB_LOG_DIR"] = log_dir log_dir.mkdir(parents=True) create_source( name="Available source", slug="available-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/available.xml", ) create_source( name="Missing source", slug="missing-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/missing.xml", ) feed_dir = out_dir / "feeds" / "available-source" feed_dir.mkdir(parents=True) feed_path = feed_dir / "feed.rss" feed_path.write_bytes(b"x" * 1024) (feed_dir / "audio.mp3").write_bytes(b"y" * 2048) reference_time = datetime(2026, 3, 30, 12, 30, tzinfo=UTC) updated_at = reference_time - timedelta(minutes=32) updated_at_epoch = updated_at.timestamp() os.utime(feed_path, (updated_at_epoch, updated_at_epoch)) source_feeds = cast( tuple[dict[str, object], ...], load_dashboard_view(log_dir=log_dir, now=reference_time)["source_feeds"], ) assert source_feeds == ( { "source": "Available source", "slug": "available-source", "feed_href": "/feeds/available-source/feed.rss", "feed_status_label": "Available", "feed_status_tone": "done", "feed_exists": True, "last_updated": "32 minutes ago", "last_updated_iso": updated_at.isoformat(), "artifact_footprint": "3.0 KB", }, { "source": "Missing source", "slug": "missing-source", "feed_href": "/feeds/missing-source/feed.rss", "feed_status_label": "Missing", "feed_status_tone": "failed", "feed_exists": False, "last_updated": "Never published", 
"last_updated_iso": None, "artifact_footprint": "0 B", }, ) def test_render_dashboard_shows_source_feed_links_and_statuses( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "dashboard-feed-links.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) app = create_app() app.config["REPUB_LOG_DIR"] = tmp_path / "out" / "logs" create_source( name="Published source", slug="published-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/published.xml", ) create_source( name="Missing source", slug="missing-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/missing.xml", ) async def run() -> None: published_feed = tmp_path / "out" / "feeds" / "published-source" / "feed.rss" published_feed.parent.mkdir(parents=True) published_feed.write_text("\n", encoding="utf-8") body = str(await render_dashboard(app)) assert "Published feeds" in body assert 'href="/feeds/published-source/feed.rss"' in body assert 'href="/feeds/missing-source/feed.rss"' in body assert "Available" in body assert "Missing" in body assert "Never published" in body asyncio.run(run()) def test_render_sources_shows_table_and_create_link() -> None: async def run() -> None: body = str(await render_sources()) assert ">Sources<" in body assert 'href="/sources/create"' in body assert "guardian-feed" not in body assert "podcast-audio" not in body asyncio.run(run()) def test_render_create_source_shows_dedicated_form_page() -> None: async def run() -> None: body = str(await render_create_source()) assert ">Create source<" in body assert "Source and job setup" in body assert "data-signals__ifmissing" in body assert "/actions/sources/create" in body assert 'data-show="$sourceType === 'feed'"' in body assert 
# NOTE(review): collapsed/mangled span — cut mid-expression at both physical line
# breaks; restore original formatting from VCS before editing this copy.
# Contains: the remaining create-source form assertions (checkbox/signal names,
# defaults such as value="articles"/"10"/"3"/"*/30", and negative assertions that no
# seeded example source leaks into the blank form), then
# test_render_edit_source_shows_existing_values — seeds a pangea source
# "kenya-health" and asserts the edit page shows its values with a disabled,
# visually locked slug input (assertion continues on the next physical line).
'data-show="$sourceType === 'pangea'"' in body assert "jobEnabled" in body assert "onlyNewest" in body assert "includeAuthors" in body assert "excludeMedia" in body assert "includeContent" in body assert "TEXT_ONLY" in body assert "breakingnews" in body assert "Pangea domain" in body assert "Feed URL" in body assert "Cron schedule" in body assert "Initial job state" in body assert "Pangea mobile articles" not in body assert "pangea-mobile" not in body assert "guardianproject.info" not in body assert ( "Primary Pangea mobile article mirror for the operator landing page." not in body ) assert "language=en,download_media=true" not in body assert "language=en\ndownload_media=true" in body assert 'value="articles"' in body assert 'value="10"' in body assert 'value="3"' in body assert 'value="*/30"' in body assert 'value="*"' in body asyncio.run(run()) def test_render_edit_source_shows_existing_values(monkeypatch, tmp_path: Path) -> None: db_path = tmp_path / "edit-page.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) create_app() create_source( name="Kenya health desk", slug="kenya-health", source_type="pangea", notes="Regional health alerts.", spider_arguments="language=en\ndownload_media=true", enabled=True, cron_minute="0", cron_hour="*/6", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", pangea_domain="example.org", pangea_category="Health", content_type="breakingnews", only_newest=True, max_articles=12, oldest_article=5, include_authors=True, exclude_media=False, include_content=True, content_format="MOBILE_3", ) async def run() -> None: body = str(await render_edit_source("kenya-health")) assert "Edit source" in body assert "/actions/sources/kenya-health/edit" in body assert "Kenya health desk" in body assert "kenya-health" in body assert 'id="source-slug"' in body assert ( 'id="source-slug" name="source-slug" type="text" value="kenya-health"' in body ) assert " disabled " in body assert "cursor-not-allowed bg-slate-100 text-slate-500" in 
body assert "example.org" in body assert "Health" in body assert "language=en\ndownload_media=true" in body asyncio.run(run()) def test_create_source_action_creates_pangea_source_and_job_in_database( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "sources.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) async def run() -> None: app = create_app() client = app.test_client() response = await client.post( "/actions/sources/create", headers={"Datastar-Request": "true"}, json={ "sourceName": "Kenya health desk", "sourceSlug": "kenya-health", "sourceType": "pangea", "pangeaDomain": "example.org", "pangeaCategory": "Health", "contentFormat": "MOBILE_3", "contentType": "breakingnews", "maxArticles": "12", "oldestArticle": "5", "sourceNotes": "Regional health alerts.", "spiderArguments": "language=en\ndownload_media=true", "cronMinute": "0", "cronHour": "*/6", "cronDayOfMonth": "*", "cronDayOfWeek": "*", "cronMonth": "*", "jobEnabled": True, "onlyNewest": True, "includeAuthors": True, "excludeMedia": False, }, ) body = await response.get_data(as_text=True) assert response.status_code == 200 assert "window.location = '/sources'" in body source = Source.get(Source.slug == "kenya-health") pangea = SourcePangea.get(SourcePangea.source == source) job = Job.get(Job.source == source) rendered_sources = str(await render_sources(app)) assert source.name == "Kenya health desk" assert source.source_type == "pangea" assert pangea.content_type == "breakingnews" assert pangea.include_content is True assert job.enabled is True assert job.spider_arguments == "language=en\ndownload_media=true" assert job.cron_hour == "*/6" assert "kenya-health" in rendered_sources assert "example.org / Health" in rendered_sources assert "Enabled" in rendered_sources asyncio.run(run()) def test_create_source_action_creates_feed_source_and_job_in_database( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "feed-sources.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", 
# NOTE(review): collapsed/mangled span — cut mid-call at both physical line breaks;
# restore original formatting from VCS before editing this copy.
# Contains: the body of test_create_source_action_creates_feed_source_and_job_in_database
# (feed-type creation with jobEnabled False, verifying SourceFeed.feed_url, the
# disabled Job, and the rendered sources table), then the setup of
# test_edit_source_action_updates_existing_source_and_job_in_database — seeds the
# "kenya-health" pangea source and starts the POST to
# /actions/sources/kenya-health/edit (payload continues on the next physical line).
str(db_path)) async def run() -> None: app = create_app() client = app.test_client() response = await client.post( "/actions/sources/create", headers={"Datastar-Request": "true"}, json={ "sourceName": "NASA feed", "sourceSlug": "nasa-feed", "sourceType": "feed", "feedUrl": "https://www.nasa.gov/rss/dyn/breaking_news.rss", "sourceNotes": "Primary NASA mirror.", "spiderArguments": "", "cronMinute": "30", "cronHour": "*", "cronDayOfMonth": "*", "cronDayOfWeek": "*", "cronMonth": "*", "jobEnabled": False, }, ) body = await response.get_data(as_text=True) assert response.status_code == 200 assert "window.location = '/sources'" in body source = Source.get(Source.slug == "nasa-feed") feed = SourceFeed.get(SourceFeed.source == source) job = Job.get(Job.source == source) rendered_sources = str(await render_sources(app)) assert source.source_type == "feed" assert feed.feed_url == "https://www.nasa.gov/rss/dyn/breaking_news.rss" assert job.enabled is False assert "nasa-feed" in rendered_sources assert "https://www.nasa.gov/rss/dyn/breaking_news.rss" in rendered_sources assert "Disabled" in rendered_sources asyncio.run(run()) def test_edit_source_action_updates_existing_source_and_job_in_database( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "edit-source.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) create_app() create_source( name="Kenya health desk", slug="kenya-health", source_type="pangea", notes="Regional health alerts.", spider_arguments="language=en\ndownload_media=true", enabled=True, cron_minute="0", cron_hour="*/6", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", pangea_domain="example.org", pangea_category="Health", content_type="breakingnews", only_newest=True, max_articles=12, oldest_article=5, include_authors=True, exclude_media=False, include_content=True, content_format="MOBILE_3", ) async def run() -> None: app = create_app() client = app.test_client() response = await client.post( "/actions/sources/kenya-health/edit", 
headers={"Datastar-Request": "true"}, json={ "sourceName": "Kenya health desk nightly", "sourceSlug": "kenya-health", "sourceType": "pangea", "pangeaDomain": "example.org", "pangeaCategory": "Nightly", "contentFormat": "TEXT_ONLY", "contentType": "articles", "maxArticles": "25", "oldestArticle": "7", "sourceNotes": "Updated nightly run.", "spiderArguments": "language=sw\ninclude_audio=false", "cronMinute": "15", "cronHour": "2", "cronDayOfMonth": "*", "cronDayOfWeek": "*", "cronMonth": "*", "jobEnabled": False, "onlyNewest": False, "includeAuthors": False, "excludeMedia": True, "includeContent": True, }, ) body = await response.get_data(as_text=True) assert response.status_code == 200 assert "window.location = '/sources'" in body source = Source.get(Source.slug == "kenya-health") pangea = SourcePangea.get(SourcePangea.source == source) job = Job.get(Job.source == source) rendered_sources = str(await render_sources(app)) assert source.name == "Kenya health desk nightly" assert source.notes == "Updated nightly run." 
# NOTE(review): collapsed/mangled span — cut mid-statement at both physical line
# breaks; restore original formatting from VCS before editing this copy.
# Contains: the tail assertions of the edit-action test (updated pangea fields, the
# disabled job, and the re-rendered sources table), then
# test_edit_source_action_rejects_slug_changes — seeds "kenya-health" and POSTs an
# edit payload whose sourceSlug is "kenya-health-renamed"; the response assertions
# continue on the next physical line.
assert pangea.category_name == "Nightly" assert pangea.content_format == "TEXT_ONLY" assert pangea.max_articles == 25 assert pangea.include_authors is False assert pangea.exclude_media is True assert job.enabled is False assert job.spider_arguments == "language=sw\ninclude_audio=false" assert job.cron_hour == "2" assert "Kenya health desk nightly" in rendered_sources assert "example.org / Nightly" in rendered_sources assert "Disabled" in rendered_sources asyncio.run(run()) def test_edit_source_action_rejects_slug_changes(monkeypatch, tmp_path: Path) -> None: db_path = tmp_path / "edit-invalid.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) create_app() create_source( name="Kenya health desk", slug="kenya-health", source_type="pangea", notes="Regional health alerts.", spider_arguments="language=en\ndownload_media=true", enabled=True, cron_minute="0", cron_hour="*/6", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", pangea_domain="example.org", pangea_category="Health", content_type="breakingnews", only_newest=True, max_articles=12, oldest_article=5, include_authors=True, exclude_media=False, include_content=True, content_format="MOBILE_3", ) async def run() -> None: app = create_app() client = app.test_client() response = await client.post( "/actions/sources/kenya-health/edit", headers={"Datastar-Request": "true"}, json={ "sourceName": "Kenya health desk", "sourceSlug": "kenya-health-renamed", "sourceType": "pangea", "pangeaDomain": "example.org", "pangeaCategory": "Health", "contentFormat": "MOBILE_3", "contentType": "breakingnews", "maxArticles": "12", "oldestArticle": "5", "sourceNotes": "Regional health alerts.", "spiderArguments": "language=en\ndownload_media=true", "cronMinute": "0", "cronHour": "*/6", "cronDayOfMonth": "*", "cronDayOfWeek": "*", "cronMonth": "*", "jobEnabled": True, "onlyNewest": True, "includeAuthors": True, "excludeMedia": False, "includeContent": True, }, ) body = await response.get_data(as_text=True) assert 
# NOTE(review): collapsed/mangled span — cut mid-statement at both physical line
# breaks; restore original formatting from VCS before editing this copy.
# Contains: the slug-rejection assertions ("Slug is immutable.", no renamed row
# persisted), then test_create_source_action_validates_duplicate_slug_and_pangea_type
# — pre-creates "guardian-feed", POSTs a payload with a duplicate slug, invalid
# contentFormat/contentType, and non-integer maxArticles, and asserts all four
# validation messages appear and nothing is persisted (last assertion continues on
# the next physical line).
response.status_code == 200 assert "Slug is immutable." in body assert Source.get(Source.slug == "kenya-health").name == "Kenya health desk" assert Source.select().where(Source.slug == "kenya-health-renamed").count() == 0 asyncio.run(run()) def test_create_source_action_validates_duplicate_slug_and_pangea_type( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "duplicate.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) async def run() -> None: app = create_app() Source.create( name="Guardian feed mirror", slug="guardian-feed", source_type="feed", ) client = app.test_client() response = await client.post( "/actions/sources/create", headers={"Datastar-Request": "true"}, json={ "sourceName": "Duplicate guardian", "sourceSlug": "guardian-feed", "sourceType": "pangea", "pangeaDomain": "example.org", "pangeaCategory": "News", "contentFormat": "WEB", "contentType": "not-a-real-type", "maxArticles": "ten", "oldestArticle": "3", "cronMinute": "0", "cronHour": "*", "cronDayOfMonth": "*", "cronDayOfWeek": "*", "cronMonth": "*", "jobEnabled": True, }, ) body = await response.get_data(as_text=True) assert response.status_code == 200 assert "Slug must be unique." in body assert "Content format is invalid." in body assert "Content type is invalid." in body assert "Max articles must be an integer." 
# NOTE(review): collapsed/mangled span — cut mid-expression at the leading physical
# line break, and the internal break falls inside the "scheduler: run_now requested"
# string literal; restore original formatting from VCS before editing this copy.
# Contains: the tail of the duplicate-slug validation test, then
# test_render_runs_shows_running_upcoming_and_completed_tables — seeds an enabled
# feed source with a SUCCEEDED execution and asserts the runs page shows the
# running/upcoming/completed tables, the execution log link, and next-run metadata —
# and test_render_execution_logs_uses_app_route — seeds a RUNNING execution, writes
# a job-{id}-execution-{id}.log file under REPUB_LOG_DIR, and asserts the rendered
# log view shows the job/execution heading, the logs route, and the log content.
in body assert Source.select().where(Source.name == "Duplicate guardian").count() == 0 asyncio.run(run()) def test_render_runs_shows_running_upcoming_and_completed_tables( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "runs-render.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) async def run() -> None: app = create_app() source = create_source( name="Runs render source", slug="runs-render-source", source_type="feed", notes="", spider_arguments="", enabled=True, cron_minute="*/30", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/runs.xml", ) job = Job.get(Job.source == source) execution = JobExecution.create( job=job, running_status=JobExecutionStatus.SUCCEEDED, ) body = str(await render_runs(app)) assert "Running job executions" in body assert "Upcoming jobs" in body assert "Completed job executions" in body assert "runs-render-source" in body assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body assert "data-next-run-at" in body assert "in " in body assert "Already running" not in body asyncio.run(run()) def test_render_execution_logs_uses_app_route(monkeypatch, tmp_path: Path) -> None: db_path = tmp_path / "logs-render.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) async def run() -> None: log_dir = tmp_path / "out" / "logs" app = create_app() app.config["REPUB_LOG_DIR"] = log_dir source = create_source( name="Log render source", slug="log-render-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/30", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/logs.xml", ) job = Job.get(Job.source == source) execution = JobExecution.create( job=job, running_status=JobExecutionStatus.RUNNING, ) log_path = log_dir / f"job-{job.id}-execution-{execution.get_id()}.log" log_path.parent.mkdir(parents=True, exist_ok=True) log_path.write_text( "\n".join( ( "scheduler: run_now 
requested", "worker: starting simulated crawl", "worker: waiting for more log lines ...", ) ), encoding="utf-8", ) body = str( await render_execution_logs( app, job_id=job.id, execution_id=int(execution.get_id()) ) ) assert f"Job {job.id} / execution {execution.get_id()}" in body assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body assert "waiting for more log lines" in body asyncio.run(run())