create sources in memory

This commit is contained in:
Abel Luck 2026-03-30 13:23:36 +02:00
parent 9e826fcee8
commit 06066c2394
4 changed files with 392 additions and 46 deletions

View file

@ -272,12 +272,14 @@ def input_field(
value: str = "", value: str = "",
placeholder: str = "", placeholder: str = "",
help_text: str | None = None, help_text: str | None = None,
signal_name: str | None = None,
) -> Renderable: ) -> Renderable:
return h.div[ return h.div[
h.label(for_=field_id, class_="block text-sm font-medium text-slate-900")[ h.label(for_=field_id, class_="block text-sm font-medium text-slate-900")[
label label
], ],
h.input( h.input(
{"data-bind": signal_name} if signal_name is not None else {},
id=field_id, id=field_id,
name=field_id, name=field_id,
type="text", type="text",
@ -296,12 +298,14 @@ def select_field(
options: tuple[str, ...], options: tuple[str, ...],
selected: str, selected: str,
help_text: str | None = None, help_text: str | None = None,
signal_name: str | None = None,
) -> Renderable: ) -> Renderable:
return h.div[ return h.div[
h.label(for_=field_id, class_="block text-sm font-medium text-slate-900")[ h.label(for_=field_id, class_="block text-sm font-medium text-slate-900")[
label label
], ],
h.select( h.select(
{"data-bind": signal_name} if signal_name is not None else {},
id=field_id, id=field_id,
name=field_id, name=field_id,
class_="mt-2 block w-full rounded-2xl border-0 bg-white px-3.5 py-2.5 text-sm text-slate-900 shadow-sm ring-1 ring-slate-200 focus:outline-hidden focus:ring-2 focus:ring-amber-500", class_="mt-2 block w-full rounded-2xl border-0 bg-white px-3.5 py-2.5 text-sm text-slate-900 shadow-sm ring-1 ring-slate-200 focus:outline-hidden focus:ring-2 focus:ring-amber-500",
@ -316,13 +320,19 @@ def select_field(
def textarea_field( def textarea_field(
*, label: str, field_id: str, value: str, rows: str = "4" *,
label: str,
field_id: str,
value: str,
rows: str = "4",
signal_name: str | None = None,
) -> Renderable: ) -> Renderable:
return h.div[ return h.div[
h.label(for_=field_id, class_="block text-sm font-medium text-slate-900")[ h.label(for_=field_id, class_="block text-sm font-medium text-slate-900")[
label label
], ],
h.textarea( h.textarea(
{"data-bind": signal_name} if signal_name is not None else {},
id=field_id, id=field_id,
name=field_id, name=field_id,
rows=rows, rows=rows,

View file

@ -1,5 +1,7 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
import htpy as h import htpy as h
from htpy import Node, Renderable from htpy import Node, Renderable
@ -17,7 +19,28 @@ from repub.components import (
toggle_field, toggle_field,
) )
SOURCES: tuple[dict[str, str], ...] = ( PANGEA_CONTENT_FORMATS = (
"WTF_0",
"TEXT_ONLY",
"WTF_1",
"MOBILE_1",
"MOBILE_2",
"MOBILE_3",
"WTF_2",
"XML_TX",
"JSON",
)
# Content types offered by the "Content type" select on the create-source
# form; the create-source action rejects any submitted value not listed here.
PANGEA_CONTENT_TYPES = (
    "articles",
    "audioclips",
    "videoclips",
    "breakingnews",
    "mostpopular",
    "topstories",
)
DEFAULT_SOURCES: tuple[dict[str, str], ...] = (
{ {
"name": "Guardian feed mirror", "name": "Guardian feed mirror",
"slug": "guardian-feed", "slug": "guardian-feed",
@ -51,22 +74,27 @@ SOURCES: tuple[dict[str, str], ...] = (
) )
def _source_row(source: dict[str, str]) -> tuple[Node, ...]: def _source_row(source: Mapping[str, object]) -> tuple[Node, ...]:
return ( return (
h.div[ h.div[
h.div(class_="font-semibold text-slate-950")[source["name"]], h.div(class_="font-semibold text-slate-950")[str(source["name"])],
h.p(class_="mt-1 font-mono text-xs text-slate-500")[source["slug"]], h.p(class_="mt-1 font-mono text-xs text-slate-500")[str(source["slug"])],
], ],
h.p(class_="font-medium whitespace-nowrap text-slate-900")[ h.p(class_="font-medium whitespace-nowrap text-slate-900")[
source["source_type"] str(source["source_type"])
], ],
h.p(class_="max-w-sm truncate font-mono text-xs text-slate-600")[ h.p(class_="max-w-sm truncate font-mono text-xs text-slate-600")[
source["upstream"] str(source["upstream"])
],
h.p(class_="font-medium whitespace-nowrap text-slate-900")[
str(source["schedule"])
], ],
h.p(class_="font-medium whitespace-nowrap text-slate-900")[source["schedule"]],
h.div(class_="min-w-32 whitespace-normal")[ h.div(class_="min-w-32 whitespace-normal")[
status_badge(label=source["state"], tone=source["state_tone"]), status_badge(
h.p(class_="mt-2 text-xs text-slate-500")[source["last_run"]], label=str(source["state"]),
tone=str(source["state_tone"]),
),
h.p(class_="mt-2 text-xs text-slate-500")[str(source["last_run"])],
], ],
h.div(class_="flex flex-nowrap items-center gap-3")[ h.div(class_="flex flex-nowrap items-center gap-3")[
inline_link(href="/sources/create", label="Edit", tone="amber"), inline_link(href="/sources/create", label="Edit", tone="amber"),
@ -75,8 +103,10 @@ def _source_row(source: dict[str, str]) -> tuple[Node, ...]:
) )
def sources_table() -> Renderable: def sources_table(
rows = tuple(_source_row(source) for source in SOURCES) *, sources: tuple[Mapping[str, object], ...] | None = None
) -> Renderable:
rows = tuple(_source_row(source) for source in (sources or DEFAULT_SOURCES))
return table_section( return table_section(
eyebrow="Inventory", eyebrow="Inventory",
title="Sources", title="Sources",
@ -87,18 +117,20 @@ def sources_table() -> Renderable:
) )
def sources_page() -> Renderable: def sources_page(
*, sources: tuple[Mapping[str, object], ...] | None = None
) -> Renderable:
return page_shell( return page_shell(
current_path="/sources", current_path="/sources",
eyebrow="Source management", eyebrow="Source management",
title="Sources", title="Sources",
description="Configured feed and Pangea sources live here as tables, with clear schedule and job state visibility instead of card-based CRUD.", description="Configured feed and Pangea sources live here as tables, with clear schedule and job state visibility instead of card-based CRUD.",
actions=header_action_link(href="/sources/create", label="Create source"), actions=header_action_link(href="/sources/create", label="Create source"),
content=sources_table(), content=sources_table(sources=sources),
) )
def create_source_form() -> Renderable: def create_source_form(*, action_path: str = "/actions/sources/create") -> Renderable:
return section_card( return section_card(
content=( content=(
h.div( h.div(
@ -118,20 +150,40 @@ def create_source_form() -> Renderable:
status_badge(label="New source", tone="scheduled"), status_badge(label="New source", tone="scheduled"),
], ],
h.form( h.form(
{"data-signals__ifmissing": "{sourceType: 'pangea'}"}, {
"data-signals": "{_formError: '', _formSuccess: ''}",
"data-signals__ifmissing": "{sourceType: 'pangea'}",
"data-on:submit": f"@post('{action_path}')",
},
class_="mt-5 space-y-6", class_="mt-5 space-y-6",
)[ )[
h.div(
{
"data-show": "$_formError !== ''",
"data-text": "$_formError",
},
class_="rounded-2xl bg-rose-50 px-4 py-3 text-sm font-medium text-rose-800",
),
h.div(
{
"data-show": "$_formSuccess !== ''",
"data-text": "$_formSuccess",
},
class_="rounded-2xl bg-emerald-100 px-4 py-3 text-sm font-medium text-emerald-800",
),
h.div(class_="grid gap-4 md:grid-cols-2")[ h.div(class_="grid gap-4 md:grid-cols-2")[
input_field( input_field(
label="Source name", label="Source name",
field_id="source-name", field_id="source-name",
value="Pangea mobile articles", value="Pangea mobile articles",
signal_name="sourceName",
), ),
input_field( input_field(
label="Slug", label="Slug",
field_id="source-slug", field_id="source-slug",
value="pangea-mobile", value="pangea-mobile",
help_text="Immutable after creation.", help_text="Immutable after creation.",
signal_name="sourceSlug",
), ),
h.div[ h.div[
h.label( h.label(
@ -169,6 +221,7 @@ def create_source_form() -> Renderable:
label="Feed URL", label="Feed URL",
field_id="feed-url", field_id="feed-url",
placeholder="https://example.com/feed.xml", placeholder="https://example.com/feed.xml",
signal_name="feedUrl",
), ),
], ],
], ],
@ -192,101 +245,42 @@ def create_source_form() -> Renderable:
label="Pangea domain", label="Pangea domain",
field_id="pangea-domain", field_id="pangea-domain",
value="guardianproject.info", value="guardianproject.info",
signal_name="pangeaDomain",
), ),
input_field( input_field(
label="Category name", label="Category name",
field_id="pangea-category", field_id="pangea-category",
value="News", value="News",
signal_name="pangeaCategory",
), ),
select_field( select_field(
label="Content format", label="Content format",
field_id="content-format", field_id="content-format",
options=("MOBILE_3", "MOBILE_2", "WEB"), options=PANGEA_CONTENT_FORMATS,
selected="MOBILE_3", selected="MOBILE_3",
signal_name="contentFormat",
), ),
input_field( select_field(
label="Content type", label="Content type",
field_id="content-type", field_id="content-type",
value="articles", options=PANGEA_CONTENT_TYPES,
selected="articles",
signal_name="contentType",
), ),
input_field( input_field(
label="Max articles", label="Max articles",
field_id="max-articles", field_id="max-articles",
value="10", value="10",
signal_name="maxArticles",
), ),
input_field( input_field(
label="Oldest article (days)", label="Oldest article (days)",
field_id="oldest-article", field_id="oldest-article",
value="3", value="3",
signal_name="oldestArticle",
), ),
], ],
], h.div(class_="grid gap-4 lg:grid-cols-3")[
h.div(class_="grid gap-4 lg:grid-cols-2")[
textarea_field(
label="Notes",
field_id="source-notes",
value="Primary Pangea mobile article mirror for the operator landing page.",
),
textarea_field(
label="Spider arguments",
field_id="spider-arguments",
value="language=en,download_media=true",
),
],
h.div(
class_="grid gap-6 xl:grid-cols-[minmax(0,1.3fr)_minmax(20rem,0.9fr)]"
)[
h.div(class_="rounded-[1.5rem] bg-stone-50 p-5")[
h.div[
h.h3(class_="text-lg font-semibold text-slate-950")[
"Cron schedule"
],
h.p(class_="mt-1 text-sm text-slate-600")[
"Stored in UTC and displayed in the browser timezone."
],
],
h.div(class_="mt-5 grid gap-4 sm:grid-cols-2 xl:grid-cols-5")[
input_field(
label="Minute",
field_id="cron-minute",
value="15",
),
input_field(
label="Hour",
field_id="cron-hour",
value="*/4",
),
input_field(
label="Day of month",
field_id="cron-day-of-month",
value="*",
),
input_field(
label="Day of week",
field_id="cron-day-of-week",
value="1-6",
),
input_field(
label="Month",
field_id="cron-month",
value="*",
),
],
],
h.div(class_="rounded-[1.5rem] bg-stone-50 p-5")[
h.p(
class_="text-xs font-semibold uppercase tracking-[0.22em] text-amber-600"
)["Job defaults"],
h.h3(class_="mt-2 text-lg font-semibold text-slate-950")[
"Initial job state"
],
h.div(class_="mt-5 grid gap-4")[
toggle_field(
label="Job enabled",
description="Scheduler will consider the new job immediately after creation.",
signal_name="jobEnabled",
checked=True,
),
toggle_field( toggle_field(
label="Only newest", label="Only newest",
description="Limit Pangea syncs to the newest material available in the selected category.", description="Limit Pangea syncs to the newest material available in the selected category.",
@ -307,13 +301,88 @@ def create_source_form() -> Renderable:
), ),
], ],
], ],
h.div(class_="grid gap-4 lg:grid-cols-2")[
textarea_field(
label="Notes",
field_id="source-notes",
value="Primary Pangea mobile article mirror for the operator landing page.",
signal_name="sourceNotes",
),
textarea_field(
label="Spider arguments",
field_id="spider-arguments",
value="language=en,download_media=true",
signal_name="spiderArguments",
),
],
h.div(
class_="grid gap-6 xl:grid-cols-[minmax(0,1.3fr)_minmax(20rem,0.9fr)]"
)[
h.div(class_="rounded-[1.5rem] bg-stone-50 p-5")[
h.div[
h.h3(class_="text-lg font-semibold text-slate-950")[
"Cron schedule"
],
h.p(class_="mt-1 text-sm text-slate-600")[
"Stored in UTC and displayed in the browser timezone."
],
],
h.div(class_="mt-5 grid gap-4 sm:grid-cols-2 xl:grid-cols-5")[
input_field(
label="Minute",
field_id="cron-minute",
value="15",
signal_name="cronMinute",
),
input_field(
label="Hour",
field_id="cron-hour",
value="*/4",
signal_name="cronHour",
),
input_field(
label="Day of month",
field_id="cron-day-of-month",
value="*",
signal_name="cronDayOfMonth",
),
input_field(
label="Day of week",
field_id="cron-day-of-week",
value="1-6",
signal_name="cronDayOfWeek",
),
input_field(
label="Month",
field_id="cron-month",
value="*",
signal_name="cronMonth",
),
],
],
h.div(class_="rounded-[1.5rem] bg-stone-50 p-5")[
h.p(
class_="text-xs font-semibold uppercase tracking-[0.22em] text-amber-600"
)["Job defaults"],
h.h3(class_="mt-2 text-lg font-semibold text-slate-950")[
"Initial job state"
],
h.div(class_="mt-5 grid gap-4")[
toggle_field(
label="Job enabled",
description="Scheduler will consider the new job immediately after creation.",
signal_name="jobEnabled",
checked=True,
),
],
],
], ],
h.div( h.div(
class_="flex flex-wrap justify-end gap-3 border-t border-slate-200 pt-6" class_="flex flex-wrap justify-end gap-3 border-t border-slate-200 pt-6"
)[ )[
muted_action_link(href="/sources", label="Cancel"), muted_action_link(href="/sources", label="Cancel"),
h.button( h.button(
type="button", type="submit",
class_="rounded-full bg-slate-950 px-4 py-2.5 text-sm font-semibold text-white transition hover:bg-slate-800", class_="rounded-full bg-slate-950 px-4 py-2.5 text-sm font-semibold text-white transition hover:bg-slate-800",
)["Create source"], )["Create source"],
], ],
@ -322,7 +391,7 @@ def create_source_form() -> Renderable:
) )
def create_source_page() -> Renderable: def create_source_page(*, action_path: str = "/actions/sources/create") -> Renderable:
actions = ( actions = (
muted_action_link(href="/sources", label="Back to sources"), muted_action_link(href="/sources", label="Back to sources"),
header_action_link(href="/runs", label="View runs"), header_action_link(href="/runs", label="View runs"),
@ -333,5 +402,5 @@ def create_source_page() -> Renderable:
title="Create source", title="Create source",
description="Dedicated create page for the source form. The list page stays focused on scanning existing sources, while this page handles the new source and job configuration flow.", description="Dedicated create page for the source form. The list page stays focused on scanning existing sources, while this page handles the new source and job configuration flow.",
actions=actions, actions=actions,
content=create_source_form(), content=create_source_form(action_path=action_path),
) )

View file

@ -4,14 +4,17 @@ import asyncio
import hashlib import hashlib
from collections.abc import AsyncGenerator, Awaitable, Callable from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import cast from typing import cast
from urllib.parse import urlparse
import htpy as h import htpy as h
from datastar_py.quart import DatastarResponse from datastar_py import ServerSentEventGenerator as SSE
from datastar_py.quart import DatastarResponse, read_signals
from datastar_py.sse import DatastarEvent from datastar_py.sse import DatastarEvent
from htpy import Renderable from htpy import Renderable
from quart import Quart, Response, request, url_for from quart import Quart, Response, request, url_for
from repub.datastar import RefreshBroker, render_stream from repub.datastar import RefreshBroker, render_stream
from repub.model import initialize_database
from repub.pages import ( from repub.pages import (
create_source_page, create_source_page,
dashboard_page, dashboard_page,
@ -20,8 +23,14 @@ from repub.pages import (
shim_page, shim_page,
sources_page, sources_page,
) )
from repub.pages.sources import (
DEFAULT_SOURCES,
PANGEA_CONTENT_FORMATS,
PANGEA_CONTENT_TYPES,
)
REFRESH_BROKER_KEY = "repub.refresh_broker" REFRESH_BROKER_KEY = "repub.refresh_broker"
SOURCES_KEY = "repub.sources"
RenderFunction = Callable[[], Awaitable[Renderable]] RenderFunction = Callable[[], Awaitable[Renderable]]
@ -38,7 +47,9 @@ def _render_shim_page(*, stylesheet_href: str, datastar_src: str) -> tuple[str,
def create_app() -> Quart: def create_app() -> Quart:
app = Quart(__name__) app = Quart(__name__)
app.config["REPUB_DB_PATH"] = str(initialize_database())
app.extensions[REFRESH_BROKER_KEY] = RefreshBroker() app.extensions[REFRESH_BROKER_KEY] = RefreshBroker()
app.extensions[SOURCES_KEY] = _default_sources_dict()
@app.get("/") @app.get("/")
@app.get("/sources") @app.get("/sources")
@ -68,11 +79,28 @@ def create_app() -> Quart:
@app.post("/sources") @app.post("/sources")
async def sources_patch() -> DatastarResponse: async def sources_patch() -> DatastarResponse:
return _page_patch_response(app, render_sources) return _page_patch_response(app, lambda: render_sources(app))
@app.post("/sources/create") @app.post("/sources/create")
async def create_source_patch() -> DatastarResponse: async def create_source_patch() -> DatastarResponse:
return _page_patch_response(app, render_create_source) return _page_patch_response(app, lambda: render_create_source(app))
@app.post("/actions/sources/create")
async def create_source_action() -> DatastarResponse:
signals = cast(dict[str, object], await read_signals())
source, error = validate_source_form(
signals,
existing_sources=get_sources_dict(app),
)
if error is not None:
return DatastarResponse(
SSE.patch_signals({"_formError": error, "_formSuccess": ""})
)
assert source is not None
get_sources_dict(app)[str(source["slug"])] = source
trigger_refresh(app)
return DatastarResponse(SSE.redirect("/sources"))
@app.post("/runs") @app.post("/runs")
async def runs_patch() -> DatastarResponse: async def runs_patch() -> DatastarResponse:
@ -100,11 +128,17 @@ async def render_dashboard() -> Renderable:
return dashboard_page() return dashboard_page()
async def render_sources() -> Renderable: def get_sources_dict(app: Quart) -> dict[str, dict[str, object]]:
return sources_page() return cast(dict[str, dict[str, object]], app.extensions[SOURCES_KEY])
async def render_create_source() -> Renderable: async def render_sources(app: Quart | None = None) -> Renderable:
sources = None if app is None else tuple(get_sources_dict(app).values())
return sources_page(sources=sources)
async def render_create_source(app: Quart | None = None) -> Renderable:
del app
return create_source_page() return create_source_page()
@ -134,3 +168,139 @@ async def _unsubscribe_on_close(
yield event yield event
finally: finally:
get_refresh_broker(app).unsubscribe(cast(asyncio.Queue[object], queue)) get_refresh_broker(app).unsubscribe(cast(asyncio.Queue[object], queue))
def _default_sources_dict() -> dict[str, dict[str, object]]:
    """Build the initial in-memory source store from ``DEFAULT_SOURCES``.

    Each default entry is shallow-copied and keyed by its ``"slug"`` value, so
    later additions to the store never mutate the module-level defaults.
    """
    store: dict[str, dict[str, object]] = {}
    for entry in DEFAULT_SOURCES:
        store[entry["slug"]] = dict(entry)
    return store
def validate_source_form(
    signals: dict[str, object] | None,
    *,
    existing_sources: dict[str, dict[str, object]],
) -> tuple[dict[str, object] | None, str | None]:
    """Validate submitted create-source signals and build a source record.

    Args:
        signals: Signal values keyed by the camelCase names bound in the
            create-source form (``sourceName``, ``sourceSlug``, ...), or
            ``None`` when no data was submitted.
        existing_sources: Current source store keyed by slug; used only for
            the slug uniqueness check.

    Returns:
        ``(source, None)`` when every check passes, otherwise
        ``(None, message)`` where ``message`` is all validation errors joined
        with single spaces, in the order the checks run.
    """
    if signals is None:
        return None, "Missing form data."

    # Pull every text field up front; each value arrives stripped.
    name = _read_string(signals, "sourceName")
    slug = _read_string(signals, "sourceSlug")
    kind = _read_string(signals, "sourceType")
    feed_url = _read_string(signals, "feedUrl")
    pangea_domain = _read_string(signals, "pangeaDomain")
    pangea_category = _read_string(signals, "pangeaCategory")
    content_format = _read_string(signals, "contentFormat")
    content_type = _read_string(signals, "contentType")
    max_articles = _read_string(signals, "maxArticles")
    oldest_article = _read_string(signals, "oldestArticle")
    notes = _read_string(signals, "sourceNotes")
    spider_arguments = _read_string(signals, "spiderArguments")
    cron_minute = _read_string(signals, "cronMinute")
    cron_hour = _read_string(signals, "cronHour")
    cron_day_of_month = _read_string(signals, "cronDayOfMonth")
    cron_day_of_week = _read_string(signals, "cronDayOfWeek")
    cron_month = _read_string(signals, "cronMonth")

    is_feed = kind == "feed"
    is_pangea = kind == "pangea"

    problems: list[str] = []

    if not name:
        problems.append("Source name is required.")

    if not slug:
        problems.append("Slug is required.")
    elif slug in existing_sources:
        problems.append("Slug must be unique.")

    if not (is_feed or is_pangea):
        problems.append("Source type must be feed or pangea.")

    if is_feed:
        if not feed_url:
            problems.append("Feed URL is required for feed sources.")
        elif not _is_valid_url(feed_url):
            problems.append("Feed URL must be a valid URL.")

    if is_pangea:
        # (failed?, message) pairs, listed in the order they are reported.
        pangea_checks = (
            (pangea_domain == "", "Pangea domain is required."),
            (pangea_category == "", "Category name is required."),
            (
                content_format not in PANGEA_CONTENT_FORMATS,
                "Content format is invalid.",
            ),
            (
                content_type not in PANGEA_CONTENT_TYPES,
                "Content type is invalid.",
            ),
            (
                _parse_int(max_articles) is None,
                "Max articles must be an integer.",
            ),
            (
                _parse_int(oldest_article) is None,
                "Oldest article must be an integer.",
            ),
        )
        problems.extend(message for failed, message in pangea_checks if failed)

    cron_parts = (
        cron_minute,
        cron_hour,
        cron_day_of_month,
        cron_day_of_week,
        cron_month,
    )
    # Cron fields are only checked for presence, not for cron syntax.
    if not all(cron_parts):
        problems.append("All cron fields are required.")

    if problems:
        return None, " ".join(problems)

    enabled = _read_bool(signals, "jobEnabled")
    if is_feed:
        type_label = "Feed"
        upstream = feed_url
    else:
        type_label = "Pangea"
        upstream = f"{pangea_domain} / {pangea_category}"

    # The first eight keys are the ones the sources table row renders; the
    # remainder keeps the raw submitted configuration.
    record = {
        "name": name,
        "slug": slug,
        "source_type": type_label,
        "upstream": upstream,
        "schedule": f"cron: {cron_minute} {cron_hour} {cron_day_of_month} {cron_month} {cron_day_of_week}",
        "last_run": "Never run",
        "state": "Enabled" if enabled else "Disabled",
        "state_tone": "scheduled" if enabled else "idle",
        "notes": notes,
        "spider_arguments": spider_arguments,
        "source_kind": kind,
        "feed_url": feed_url,
        "pangea_domain": pangea_domain,
        "pangea_category": pangea_category,
        "content_format": content_format,
        "content_type": content_type,
        "max_articles": max_articles,
        "oldest_article": oldest_article,
        "job_enabled": enabled,
        "only_newest": _read_bool(signals, "onlyNewest"),
        "include_authors": _read_bool(signals, "includeAuthors"),
        "exclude_media": _read_bool(signals, "excludeMedia"),
        "cron_minute": cron_minute,
        "cron_hour": cron_hour,
        "cron_day_of_month": cron_day_of_month,
        "cron_day_of_week": cron_day_of_week,
        "cron_month": cron_month,
    }
    return record, None
def _read_string(signals: dict[str, object], key: str) -> str:
return str(signals.get(key, "")).strip()
def _read_bool(signals: dict[str, object], key: str) -> bool:
value = signals.get(key, False)
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in {"true", "1", "on", "yes"}
return bool(value)
def _parse_int(value: str) -> int | None:
try:
return int(value)
except ValueError:
return None
def _is_valid_url(value: str) -> bool:
parsed = urlparse(value)
return parsed.scheme in {"http", "https"} and parsed.netloc != ""

View file

@ -1,12 +1,14 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from pathlib import Path
from typing import Any, cast from typing import Any, cast
from repub.datastar import RefreshBroker, render_sse_event, render_stream from repub.datastar import RefreshBroker, render_sse_event, render_stream
from repub.web import ( from repub.web import (
create_app, create_app,
get_refresh_broker, get_refresh_broker,
get_sources_dict,
render_create_source, render_create_source,
render_dashboard, render_dashboard,
render_execution_logs, render_execution_logs,
@ -38,6 +40,17 @@ def test_root_get_serves_datastar_shim() -> None:
asyncio.run(run()) asyncio.run(run())
def test_create_app_bootstraps_default_database_path(
    monkeypatch, tmp_path: Path
) -> None:
    """create_app() records the database path in config and creates the file in the cwd."""
    monkeypatch.chdir(tmp_path)
    expected_db = tmp_path / "republisher.db"

    app = create_app()

    assert Path(app.config["REPUB_DB_PATH"]) == expected_db
    assert expected_db.exists()
def test_root_get_honors_if_none_match() -> None: def test_root_get_honors_if_none_match() -> None:
async def run() -> None: async def run() -> None:
client = create_app().test_client() client = create_app().test_client()
@ -161,12 +174,15 @@ def test_render_create_source_shows_dedicated_form_page() -> None:
assert "Dedicated create page for the source form" in body assert "Dedicated create page for the source form" in body
assert "Source and job setup" in body assert "Source and job setup" in body
assert "data-signals__ifmissing" in body assert "data-signals__ifmissing" in body
assert "/actions/sources/create" in body
assert 'data-show="$sourceType === 'feed'"' in body assert 'data-show="$sourceType === 'feed'"' in body
assert 'data-show="$sourceType === 'pangea'"' in body assert 'data-show="$sourceType === 'pangea'"' in body
assert "jobEnabled" in body assert "jobEnabled" in body
assert "onlyNewest" in body assert "onlyNewest" in body
assert "includeAuthors" in body assert "includeAuthors" in body
assert "excludeMedia" in body assert "excludeMedia" in body
assert "TEXT_ONLY" in body
assert "breakingnews" in body
assert "Pangea domain" in body assert "Pangea domain" in body
assert "Feed URL" in body assert "Feed URL" in body
assert "Cron schedule" in body assert "Cron schedule" in body
@ -175,6 +191,87 @@ def test_render_create_source_shows_dedicated_form_page() -> None:
asyncio.run(run()) asyncio.run(run())
def test_create_source_action_adds_new_source_to_in_memory_store() -> None:
    """A valid Pangea submission redirects to /sources and is stored under its slug."""
    form_signals = {
        "sourceName": "Kenya health desk",
        "sourceSlug": "kenya-health",
        "sourceType": "pangea",
        "pangeaDomain": "example.org",
        "pangeaCategory": "Health",
        "contentFormat": "MOBILE_3",
        "contentType": "breakingnews",
        "maxArticles": "12",
        "oldestArticle": "5",
        "sourceNotes": "Regional health alerts.",
        "spiderArguments": "language=en",
        "cronMinute": "0",
        "cronHour": "*/6",
        "cronDayOfMonth": "*",
        "cronDayOfWeek": "*",
        "cronMonth": "*",
        "jobEnabled": True,
        "onlyNewest": True,
        "includeAuthors": True,
        "excludeMedia": False,
    }

    async def scenario() -> None:
        app = create_app()
        response = await app.test_client().post(
            "/actions/sources/create",
            headers={"Datastar-Request": "true"},
            json=form_signals,
        )
        body = await response.get_data(as_text=True)
        stored = get_sources_dict(app)

        assert response.status_code == 200
        assert "window.location = '/sources'" in body
        assert "kenya-health" in stored
        assert stored["kenya-health"]["content_type"] == "breakingnews"

    asyncio.run(scenario())
def test_create_source_action_validates_duplicate_slug_and_pangea_type() -> None:
    """An invalid submission reports every failed check and leaves the store untouched."""
    form_signals = {
        "sourceName": "Duplicate guardian",
        "sourceSlug": "guardian-feed",
        "sourceType": "pangea",
        "pangeaDomain": "example.org",
        "pangeaCategory": "News",
        "contentFormat": "WEB",
        "contentType": "not-a-real-type",
        "maxArticles": "ten",
        "oldestArticle": "3",
        "cronMinute": "0",
        "cronHour": "*",
        "cronDayOfMonth": "*",
        "cronDayOfWeek": "*",
        "cronMonth": "*",
        "jobEnabled": True,
    }
    expected_messages = (
        "Slug must be unique.",
        "Content format is invalid.",
        "Content type is invalid.",
        "Max articles must be an integer.",
    )

    async def scenario() -> None:
        app = create_app()
        response = await app.test_client().post(
            "/actions/sources/create",
            headers={"Datastar-Request": "true"},
            json=form_signals,
        )
        body = await response.get_data(as_text=True)

        assert response.status_code == 200
        for message in expected_messages:
            assert message in body
        stored_names = {
            str(source["name"]) for source in get_sources_dict(app).values()
        }
        assert "Duplicate guardian" not in stored_names

    asyncio.run(scenario())
def test_render_runs_shows_running_upcoming_and_completed_tables() -> None: def test_render_runs_shows_running_upcoming_and_completed_tables() -> None:
async def run() -> None: async def run() -> None:
body = str(await render_runs()) body = str(await render_runs())