Add settings and live sidebar counts

This commit is contained in:
Abel Luck 2026-03-30 18:26:02 +02:00
parent 2a99edeec3
commit a809bde16c
16 changed files with 696 additions and 51 deletions

View file

@ -39,7 +39,9 @@ def nav_link(
]
def admin_sidebar(*, current_path: str) -> Renderable:
def admin_sidebar(
*, current_path: str, source_count: int = 0, running_count: int = 0
) -> Renderable:
return h.aside(
class_="relative overflow-hidden bg-slate-950 px-6 py-8 text-white lg:min-h-screen"
)[
@ -68,14 +70,20 @@ def admin_sidebar(*, current_path: str) -> Renderable:
label="Sources",
href="/sources",
active=current_path.startswith("/sources"),
badge="12",
badge=str(source_count),
),
nav_link(
label="Runs",
href="/runs",
active=current_path.startswith("/runs")
or current_path.startswith("/job/"),
badge="3",
badge=str(running_count),
),
nav_link(
label="Settings",
href="/settings",
active=current_path.startswith("/settings"),
badge="App",
),
],
h.div(class_="mt-auto rounded-3xl bg-white/5 p-5 ring-1 ring-white/10")[
@ -148,13 +156,19 @@ def page_shell(
title: str,
description: str | None = None,
actions: Node | None = None,
source_count: int = 0,
running_count: int = 0,
content: Node,
) -> Renderable:
return h.main(
id="morph",
class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]",
)[
admin_sidebar(current_path=current_path),
admin_sidebar(
current_path=current_path,
source_count=source_count,
running_count=running_count,
),
h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[
h.div(class_="mx-auto max-w-7xl space-y-5")[
h.section[

View file

@ -180,6 +180,8 @@ def build_feed_settings(
*,
out_dir: Path,
feed_slug: str,
convert_images: bool = True,
convert_video: bool = True,
) -> Settings:
feed_dir = feed_output_dir(out_dir=out_dir, feed_slug=feed_slug)
image_dir = base_settings.get("REPUBLISHER_IMAGE_DIR", IMAGE_DIR)
@ -187,14 +189,20 @@ def build_feed_settings(
audio_dir = base_settings.get("REPUBLISHER_AUDIO_DIR", AUDIO_DIR)
file_dir = base_settings.get("REPUBLISHER_FILE_DIR", FILE_DIR)
item_pipelines = dict(base_settings.getdict("ITEM_PIPELINES"))
item_pipelines.pop("repub.pipelines.ImagePipeline", None)
item_pipelines.pop("repub.pipelines.AudioPipeline", None)
item_pipelines.pop("repub.pipelines.VideoPipeline", None)
item_pipelines.pop("repub.pipelines.FilePipeline", None)
item_pipelines.update(
{
"repub.pipelines.ImagePipeline": 1,
"repub.pipelines.AudioPipeline": 2,
"repub.pipelines.VideoPipeline": 3,
"repub.pipelines.FilePipeline": 4,
}
)
if convert_images:
item_pipelines["repub.pipelines.ImagePipeline"] = 1
if convert_video:
item_pipelines["repub.pipelines.VideoPipeline"] = 3
settings = base_settings.copy()
settings.setdict(
{

View file

@ -186,6 +186,8 @@ class JobSourceConfig:
source_slug: str
source_type: str
spider_arguments: dict[str, str]
convert_images: bool = True
convert_video: bool = True
feed_url: str | None = None
pangea_domain: str | None = None
pangea_category: str | None = None
@ -267,6 +269,8 @@ def main(argv: list[str] | None = None) -> int:
out_dir=out_dir,
feed=feed,
stats_path=stats_path,
convert_images=source_config.convert_images,
convert_video=source_config.convert_video,
)
)
print(
@ -326,6 +330,8 @@ def _load_job_source_config(*, db_path: str, job_id: int) -> JobSourceConfig:
source_slug=source.slug,
source_type=source.source_type,
spider_arguments=spider_arguments,
convert_images=bool(job.convert_images),
convert_video=bool(job.convert_video),
feed_url=feed.feed_url,
)
@ -339,6 +345,8 @@ def _load_job_source_config(*, db_path: str, job_id: int) -> JobSourceConfig:
source_slug=source.slug,
source_type=source.source_type,
spider_arguments=spider_arguments,
convert_images=bool(job.convert_images),
convert_video=bool(job.convert_video),
pangea_domain=pangea.domain,
pangea_category=pangea.category_name,
content_type=pangea.content_type,
@ -409,7 +417,14 @@ def _resolve_feed(
)
def _build_crawl_settings(*, out_dir: Path, feed: FeedConfig, stats_path: Path):
def _build_crawl_settings(
*,
out_dir: Path,
feed: FeedConfig,
stats_path: Path,
convert_images: bool = True,
convert_video: bool = True,
):
base_settings = build_base_settings(
RepublisherConfig(
config_path=out_dir / "job-runner.toml",
@ -419,7 +434,13 @@ def _build_crawl_settings(*, out_dir: Path, feed: FeedConfig, stats_path: Path):
)
)
prepare_output_dirs(out_dir, feed.slug)
settings = build_feed_settings(base_settings, out_dir=out_dir, feed_slug=feed.slug)
settings = build_feed_settings(
base_settings,
out_dir=out_dir,
feed_slug=feed.slug,
convert_images=convert_images,
convert_video=convert_video,
)
settings.set("LOG_FILE", None, priority="cmdline")
settings.set(
"STATS_CLASS",

View file

@ -5,6 +5,7 @@ import os
import signal
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
@ -15,7 +16,15 @@ from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from repub.config import feed_output_dir, feed_output_path
from repub.model import Job, JobExecution, JobExecutionStatus, Source, database, utc_now
from repub.model import (
Job,
JobExecution,
JobExecutionStatus,
Source,
database,
load_max_concurrent_jobs,
utc_now,
)
SCHEDULER_JOB_PREFIX = "job-"
POLL_JOB_ID = "runtime-poll-workers"
@ -105,6 +114,7 @@ class JobRuntime:
self.graceful_stop_seconds = graceful_stop_seconds
self.scheduler = BackgroundScheduler(timezone=UTC)
self._workers: dict[int, RunningWorker] = {}
self._run_lock = threading.Lock()
self._started = False
def start(self) -> None:
@ -178,28 +188,32 @@ class JobRuntime:
def run_job_now(self, job_id: int, *, reason: str) -> int | None:
del reason
self.start()
with database.connection_context():
job = Job.get_or_none(id=job_id)
if job is None:
return None
with self._run_lock:
with database.connection_context():
job = Job.get_or_none(id=job_id)
if job is None:
return None
already_running = (
JobExecution.select()
.where(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.RUNNING)
if self._max_concurrent_jobs_reached():
return None
already_running = (
JobExecution.select()
.where(
(JobExecution.job == job)
& (JobExecution.running_status == JobExecutionStatus.RUNNING)
)
.exists()
)
.exists()
)
if already_running:
return None
if already_running:
return None
execution = JobExecution.create(
job=job,
started_at=utc_now(),
running_status=JobExecutionStatus.RUNNING,
)
execution_id = _execution_id(execution)
execution = JobExecution.create(
job=job,
started_at=utc_now(),
running_status=JobExecutionStatus.RUNNING,
)
execution_id = _execution_id(execution)
artifacts = JobArtifacts.for_execution(
log_dir=self.log_dir, job_id=job_id, execution_id=execution_id
@ -239,6 +253,14 @@ class JobRuntime:
self._trigger_refresh()
return execution_id
def _max_concurrent_jobs_reached(self) -> bool:
    """Return True when the count of RUNNING executions has hit the configured cap."""
    limit = load_max_concurrent_jobs()
    running = (
        JobExecution.select()
        .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
        .count()
    )
    return running >= limit
def request_execution_cancel(self, execution_id: int) -> bool:
with database.connection_context():
execution = JobExecution.get_or_none(id=execution_id)

View file

@ -1,11 +1,13 @@
from __future__ import annotations
import json
import os
from datetime import UTC, datetime
from enum import IntEnum
from importlib import resources
from importlib.resources.abc import Traversable
from pathlib import Path
from typing import Any
from peewee import (
BooleanField,
@ -29,6 +31,8 @@ DATABASE_PRAGMAS = {
"temp_store": "memory",
}
SCHEMA_GLOB = "*.sql"
MAX_CONCURRENT_JOBS_SETTING_KEY = "max_concurrent_jobs"
DEFAULT_MAX_CONCURRENT_JOBS = 1
database = SqliteDatabase(None, pragmas=DATABASE_PRAGMAS)
@ -78,17 +82,85 @@ def initialize_database(db_path: str | Path | None = None) -> Path:
connection = database.connection()
for path in schema_paths():
connection.executescript(path.read_text(encoding="utf-8"))
_ensure_schema(connection)
finally:
database.close()
return resolved_path
def _ensure_schema(connection: Any) -> None:
connection.execute(
"""
CREATE TABLE IF NOT EXISTS app_setting (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
"""
)
job_columns = {
row[1] for row in connection.execute("PRAGMA table_info('job')").fetchall()
}
if "convert_images" not in job_columns:
connection.execute(
"""
ALTER TABLE job
ADD COLUMN convert_images INTEGER NOT NULL DEFAULT 1
CHECK (convert_images IN (0, 1))
"""
)
if "convert_video" not in job_columns:
connection.execute(
"""
ALTER TABLE job
ADD COLUMN convert_video INTEGER NOT NULL DEFAULT 1
CHECK (convert_video IN (0, 1))
"""
)
def source_slug_exists(slug: str) -> bool:
with database.connection_context():
return Source.select().where(Source.slug == slug).exists()
def save_setting(key: str, value: Any) -> None:
    """Persist *value* under *key* as canonical JSON, inserting or updating in place."""
    payload = json.dumps(value, sort_keys=True)
    with database.connection_context(), database.atomic():
        existing = AppSetting.get_or_none(AppSetting.key == key)
        if existing is not None:
            existing.value = payload
            existing.save()
        else:
            AppSetting.create(key=key, value=payload)
def load_setting(key: str, default: Any) -> Any:
    """Return the JSON-decoded value stored for *key*, or *default* if absent/corrupt."""
    with database.connection_context():
        row = AppSetting.get_or_none(AppSetting.key == key)
    if row is None:
        return default
    try:
        return json.loads(row.value)
    except json.JSONDecodeError:
        # A hand-edited or corrupted value should not crash callers.
        return default
def load_max_concurrent_jobs() -> int:
    """Return the configured concurrency cap, falling back to the default (>= 1)."""
    raw = load_setting(MAX_CONCURRENT_JOBS_SETTING_KEY, DEFAULT_MAX_CONCURRENT_JOBS)
    try:
        limit = int(raw)
    except (TypeError, ValueError):
        return DEFAULT_MAX_CONCURRENT_JOBS
    if limit < 1:
        return DEFAULT_MAX_CONCURRENT_JOBS
    return limit
def load_settings_form() -> dict[str, object]:
    """Build the form model shown on the settings page."""
    return {MAX_CONCURRENT_JOBS_SETTING_KEY: load_max_concurrent_jobs()}
def load_source_form(slug: str) -> dict[str, object] | None:
with database.connection_context():
source = Source.get_or_none(Source.slug == slug)
@ -103,6 +175,8 @@ def load_source_form(slug: str) -> dict[str, object] | None:
"notes": source.notes,
"spider_arguments": job.spider_arguments,
"enabled": job.enabled,
"convert_images": job.convert_images,
"convert_video": job.convert_video,
"cron_minute": job.cron_minute,
"cron_hour": job.cron_hour,
"cron_day_of_month": job.cron_day_of_month,
@ -155,6 +229,8 @@ def create_source(
cron_day_of_month: str,
cron_day_of_week: str,
cron_month: str,
convert_images: bool = True,
convert_video: bool = True,
feed_url: str = "",
pangea_domain: str = "",
pangea_category: str = "",
@ -197,6 +273,8 @@ def create_source(
Job.create(
source=source,
enabled=enabled,
convert_images=convert_images,
convert_video=convert_video,
spider_arguments=spider_arguments,
cron_minute=cron_minute,
cron_hour=cron_hour,
@ -221,6 +299,8 @@ def update_source(
cron_day_of_month: str,
cron_day_of_week: str,
cron_month: str,
convert_images: bool = True,
convert_video: bool = True,
feed_url: str = "",
pangea_domain: str = "",
pangea_category: str = "",
@ -246,6 +326,8 @@ def update_source(
job = Job.get(Job.source == source)
job.enabled = enabled
job.convert_images = convert_images
job.convert_video = convert_video
job.spider_arguments = spider_arguments
job.cron_minute = cron_minute
job.cron_hour = cron_hour
@ -375,6 +457,14 @@ class BaseModel(Model):
database = database
class AppSetting(BaseModel):
    """One application setting row; values are JSON text (see save_setting/load_setting)."""

    # Setting name, e.g. "max_concurrent_jobs".
    key = TextField(primary_key=True)
    # JSON-serialized payload written by save_setting.
    value = TextField()

    class Meta:
        table_name = "app_setting"
class Source(BaseModel):
created_at = DateTimeField(default=utc_now)
updated_at = DateTimeField(default=utc_now)
@ -419,6 +509,8 @@ class Job(BaseModel):
created_at = DateTimeField(default=utc_now)
updated_at = DateTimeField(default=utc_now)
enabled = BooleanField()
convert_images = BooleanField(default=True)
convert_video = BooleanField(default=True)
spider_arguments = TextField(default="")
cron_minute = TextField()
cron_hour = TextField()

View file

@ -1,5 +1,6 @@
from repub.pages.dashboard import dashboard_page, dashboard_page_with_data
from repub.pages.runs import execution_logs_page, runs_page
from repub.pages.settings import settings_page
from repub.pages.shim import shim_page
from repub.pages.sources import create_source_page, edit_source_page, sources_page
@ -10,6 +11,7 @@ __all__ = [
"edit_source_page",
"execution_logs_page",
"runs_page",
"settings_page",
"shim_page",
"sources_page",
]

View file

@ -251,17 +251,23 @@ def dashboard_page_with_data(
running_executions: tuple[Mapping[str, object], ...] | None = None,
source_feeds: tuple[Mapping[str, object], ...] | None = None,
) -> Renderable:
running_items = running_executions or ()
source_items = source_feeds or ()
return h.main(
id="morph",
class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]",
)[
admin_sidebar(current_path="/"),
admin_sidebar(
current_path="/",
source_count=len(source_items),
running_count=len(running_items),
),
h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[
h.div(class_="mx-auto max-w-7xl space-y-5")[
dashboard_header(),
operational_snapshot(snapshot=snapshot),
running_executions_table(running_executions=running_executions),
published_feeds_table(source_feeds=source_feeds),
running_executions_table(running_executions=running_items),
published_feeds_table(source_feeds=source_items),
]
],
]

View file

@ -194,6 +194,7 @@ def runs_page(
running_executions: tuple[Mapping[str, object], ...] | None = None,
upcoming_jobs: tuple[Mapping[str, object], ...] | None = None,
completed_executions: tuple[Mapping[str, object], ...] | None = None,
source_count: int = 0,
) -> Renderable:
running_items = running_executions or ()
upcoming_items = upcoming_jobs or ()
@ -207,6 +208,8 @@ def runs_page(
eyebrow="Execution control",
title="Runs",
actions=muted_action_link(href="/sources", label="Back to sources"),
source_count=source_count,
running_count=len(running_items),
content=(
table_section(
eyebrow="Live work",

82
repub/pages/settings.py Normal file
View file

@ -0,0 +1,82 @@
from __future__ import annotations
from collections.abc import Mapping
import htpy as h
from htpy import Renderable
from repub.components import input_field, muted_action_link, page_shell, section_card
def _value(settings: Mapping[str, object] | None, key: str, default: str = "") -> str:
if settings is None:
return default
return str(settings.get(key, default))
def settings_page(
    *,
    settings: Mapping[str, object] | None = None,
    action_path: str = "/actions/settings",
    source_count: int = 0,
    running_count: int = 0,
) -> Renderable:
    """Render the global settings page.

    ``settings`` is the form model (as produced by the model layer; may be
    ``None`` for an empty form), ``action_path`` is where the form posts,
    and the two counts feed the sidebar badges via ``page_shell``.
    """
    return page_shell(
        current_path="/settings",
        eyebrow="Configuration",
        title="Settings",
        description="Global runtime controls for the republisher.",
        source_count=source_count,
        running_count=running_count,
        content=section_card(
            content=(
                h.form(
                    {
                        # Datastar signals backing the error/success banners.
                        "data-signals": "{_formError: '', _formSuccess: ''}",
                        # Seed the input's signal from the stored value only
                        # when the signal is not already present client-side.
                        "data-signals__ifmissing": (
                            "{"
                            f"maxConcurrentJobs: '{_value(settings, 'max_concurrent_jobs', '1')}'"
                            "}"
                        ),
                        "data-on:submit": f"@post('{action_path}')",
                    },
                    class_="space-y-6 rounded-[1.5rem] bg-white p-6 shadow-sm ring-1 ring-slate-200",
                )[
                    h.div[
                        h.p(
                            class_="text-xs font-semibold uppercase tracking-[0.22em] text-amber-600"
                        )["Scheduler"],
                        h.h2(class_="mt-2 text-xl font-semibold text-slate-950")[
                            "Runtime settings"
                        ],
                        h.p(class_="mt-2 text-sm text-slate-600")[
                            "Limit how many jobs the scheduler and manual runs can execute at the same time."
                        ],
                    ],
                    # Error banner; visible only while _formError is non-empty.
                    h.div(
                        {
                            "data-show": "$_formError !== ''",
                            "data-text": "$_formError",
                        },
                        class_="rounded-2xl bg-rose-50 px-4 py-3 text-sm font-medium text-rose-800",
                    ),
                    input_field(
                        label="Max concurrent jobs",
                        field_id="max-concurrent-jobs",
                        value=_value(settings, "max_concurrent_jobs", "1"),
                        help_text="Must be an integer greater than or equal to 1.",
                        signal_name="maxConcurrentJobs",
                    ),
                    h.div(
                        class_="flex flex-wrap justify-end gap-3 border-t border-slate-200 pt-6"
                    )[
                        muted_action_link(href="/", label="Back to dashboard"),
                        h.button(
                            type="submit",
                            class_="rounded-full bg-slate-950 px-4 py-2.5 text-sm font-semibold text-white transition hover:bg-slate-800",
                        )["Save settings"],
                    ],
                ],
            )
        ),
    )

View file

@ -37,7 +37,11 @@ def shim_page(
id="morph",
class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]",
)[
admin_sidebar(current_path=current_path),
admin_sidebar(
current_path=current_path,
source_count=0,
running_count=0,
),
h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[
h.div(class_="mx-auto max-w-7xl space-y-5")[
h.section[

View file

@ -128,13 +128,18 @@ def sources_table(
def sources_page(
*, sources: tuple[Mapping[str, object], ...] | None = None
*,
sources: tuple[Mapping[str, object], ...] | None = None,
running_count: int = 0,
) -> Renderable:
source_items = sources or ()
return page_shell(
current_path="/sources",
eyebrow="Source management",
title="Sources",
content=sources_table(sources=sources),
source_count=len(source_items),
running_count=running_count,
content=sources_table(sources=source_items),
)
@ -398,6 +403,18 @@ def source_form(
signal_name="jobEnabled",
checked=_checked(source, "enabled", True),
),
toggle_field(
label="Convert images",
description="Normalize mirrored images through the image conversion pipeline for this source.",
signal_name="convertImages",
checked=_checked(source, "convert_images", True),
),
toggle_field(
label="Convert video",
description="Run mirrored videos through the video conversion pipeline for this source.",
signal_name="convertVideo",
checked=_checked(source, "convert_video", True),
),
],
],
],
@ -415,7 +432,12 @@ def source_form(
)
def create_source_page(*, action_path: str = "/actions/sources/create") -> Renderable:
def create_source_page(
*,
action_path: str = "/actions/sources/create",
source_count: int = 0,
running_count: int = 0,
) -> Renderable:
actions = (
muted_action_link(href="/sources", label="Back to sources"),
header_action_link(href="/runs", label="View runs"),
@ -425,6 +447,8 @@ def create_source_page(*, action_path: str = "/actions/sources/create") -> Rende
eyebrow="Source creation",
title="Create source",
actions=actions,
source_count=source_count,
running_count=running_count,
content=source_form(mode="create", action_path=action_path),
)
@ -434,6 +458,8 @@ def edit_source_page(
slug: str,
source: Mapping[str, object],
action_path: str,
source_count: int = 0,
running_count: int = 0,
) -> Renderable:
actions = (
muted_action_link(href="/sources", label="Back to sources"),
@ -444,5 +470,7 @@ def edit_source_page(
eyebrow="Source editing",
title="Edit source",
actions=actions,
source_count=source_count,
running_count=running_count,
content=source_form(mode="edit", action_path=action_path, source=source),
)

View file

@ -28,8 +28,10 @@ from repub.model import (
delete_job_source,
delete_source,
initialize_database,
load_settings_form,
load_source_form,
load_sources,
save_setting,
source_slug_exists,
update_source,
)
@ -39,6 +41,7 @@ from repub.pages import (
edit_source_page,
execution_logs_page,
runs_page,
settings_page,
shim_page,
sources_page,
)
@ -59,6 +62,8 @@ class SourceFormData(TypedDict):
notes: str
spider_arguments: str
enabled: bool
convert_images: bool
convert_video: bool
cron_minute: str
cron_hour: str
cron_day_of_month: str
@ -77,6 +82,10 @@ class SourceFormData(TypedDict):
include_content: bool
class SettingsFormData(TypedDict):
max_concurrent_jobs: int
DEFAULT_PANGEA_CONTENT_FORMAT = "MOBILE_3"
DEFAULT_PANGEA_CONTENT_TYPE = "articles"
DEFAULT_PANGEA_MAX_ARTICLES = "10"
@ -123,6 +132,7 @@ def create_app(*, dev_mode: bool = False) -> Quart:
@app.get("/sources/create")
@app.get("/sources/<string:slug>/edit")
@app.get("/runs")
@app.get("/settings")
@app.get("/job/<int:job_id>/execution/<int:execution_id>/logs")
async def page_shim(
slug: str | None = None,
@ -158,7 +168,11 @@ def create_app(*, dev_mode: bool = False) -> Quart:
@app.post("/sources/<string:slug>/edit")
async def edit_source_patch(slug: str) -> DatastarResponse:
return _page_patch_response(app, lambda: render_edit_source(slug))
return _page_patch_response(app, lambda: render_edit_source(slug, app))
@app.post("/settings")
async def settings_patch() -> DatastarResponse:
return _page_patch_response(app, lambda: render_settings(app))
@app.post("/actions/sources/create")
async def create_source_action() -> DatastarResponse:
@ -217,6 +231,20 @@ def create_app(*, dev_mode: bool = False) -> Quart:
trigger_refresh(app)
return Response(status=204)
@app.post("/actions/settings")
async def update_settings_action() -> DatastarResponse:
signals = cast(dict[str, object], await read_signals())
settings, error = validate_settings_form(signals)
if error is not None:
return DatastarResponse(
SSE.patch_signals({"_formError": error, "_formSuccess": ""})
)
assert settings is not None
save_setting("max_concurrent_jobs", settings["max_concurrent_jobs"])
trigger_refresh(app)
return DatastarResponse(SSE.redirect("/settings"))
@app.post("/runs")
async def runs_patch() -> DatastarResponse:
return _page_patch_response(app, lambda: render_runs(app))
@ -300,16 +328,30 @@ async def render_dashboard(app: Quart | None = None) -> Renderable:
async def render_sources(app: Quart | None = None) -> Renderable:
sources = None if app is None else load_sources()
return sources_page(sources=sources)
if app is None:
return sources_page()
sources = load_sources()
return sources_page(
sources=sources,
running_count=len(
load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])["running"]
),
)
async def render_create_source(app: Quart | None = None) -> Renderable:
del app
return create_source_page()
if app is None:
return create_source_page()
sidebar_counts = _load_sidebar_counts(app)
return create_source_page(
source_count=sidebar_counts["source_count"],
running_count=sidebar_counts["running_count"],
)
async def render_edit_source(slug: str) -> Renderable:
async def render_edit_source(slug: str, app: Quart | None = None) -> Renderable:
source = load_source_form(slug)
if source is None:
return sources_page(sources=())
@ -317,6 +359,7 @@ async def render_edit_source(slug: str) -> Renderable:
slug=slug,
source=source,
action_path=f"/actions/sources/{slug}/edit",
**({} if app is None else _load_sidebar_counts(app)),
)
@ -329,6 +372,18 @@ async def render_runs(app: Quart | None = None) -> Renderable:
running_executions=cast(tuple[dict[str, object], ...], view["running"]),
upcoming_jobs=cast(tuple[dict[str, object], ...], view["upcoming"]),
completed_executions=cast(tuple[dict[str, object], ...], view["completed"]),
source_count=len(load_sources()),
)
async def render_settings(app: Quart | None = None) -> Renderable:
    """Render the settings page; includes live sidebar counts when *app* is given."""
    form = load_settings_form()
    if app is None:
        return settings_page(settings=form)
    counts = _load_sidebar_counts(app)
    return settings_page(
        settings=form,
        source_count=counts["source_count"],
        running_count=counts["running_count"],
    )
@ -377,6 +432,15 @@ async def _unsubscribe_on_close(
get_refresh_broker(app).unsubscribe(cast(asyncio.Queue[object], queue))
def _load_sidebar_counts(app: Quart) -> dict[str, int]:
    """Compute the sidebar badge counts: total sources and running executions."""
    running = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])["running"]
    return {
        "source_count": len(load_sources()),
        "running_count": len(running),
    }
def validate_source_form(
signals: dict[str, object] | None,
*,
@ -469,6 +533,8 @@ def validate_source_form(
"max_articles": _parse_int(max_articles),
"oldest_article": _parse_int(oldest_article),
"enabled": enabled,
"convert_images": _read_bool(signals, "convertImages", default=True),
"convert_video": _read_bool(signals, "convertVideo", default=True),
"only_newest": _read_bool(signals, "onlyNewest", default=True),
"include_authors": _read_bool(signals, "includeAuthors", default=True),
"exclude_media": _read_bool(signals, "excludeMedia", default=False),
@ -482,6 +548,20 @@ def validate_source_form(
return source, None
def validate_settings_form(
    signals: dict[str, object] | None,
) -> tuple[SettingsFormData | None, str | None]:
    """Validate the settings form signals.

    Returns ``(data, None)`` on success, or ``(None, error_message)`` when
    the payload is missing or the limit is not a positive integer.
    """
    if signals is None:
        return None, "Missing form data."
    parsed = _parse_int(_read_string(signals, "maxConcurrentJobs"))
    if parsed is None:
        return None, "Max concurrent jobs must be an integer."
    if parsed < 1:
        return None, "Max concurrent jobs must be at least 1."
    data: SettingsFormData = {"max_concurrent_jobs": parsed}
    return data, None
def _read_string(signals: dict[str, object], key: str, *, strip: bool = True) -> str:
value = str(signals.get(key, ""))
return value.strip() if strip else value

View file

@ -195,3 +195,35 @@ def test_build_feed_settings_uses_runtime_media_dir_overrides(tmp_path: Path) ->
assert feed_settings["AUDIO_STORE"] == str(
out_dir / "feeds" / "gp-pod" / "audio-custom"
)
def test_build_feed_settings_can_disable_image_and_video_conversion(
    tmp_path: Path,
) -> None:
    """Disabling the convert flags must drop the matching pipelines entirely."""
    out_dir = (tmp_path / "mirror").resolve()
    config = RepublisherConfig(
        config_path=tmp_path / "repub.toml",
        out_dir=out_dir,
        feeds=(
            FeedConfig(
                name="Guardian Project Podcast",
                slug="gp-pod",
                url="https://guardianproject.info/podcast/podcast.xml",
            ),
        ),
        scrapy_settings={},
    )
    base_settings = build_base_settings(config)
    feed_settings = build_feed_settings(
        base_settings,
        out_dir=out_dir,
        feed_slug="gp-pod",
        convert_images=False,
        convert_video=False,
    )
    # Image/video pipelines are controlled by the flags; audio/file pipelines
    # must keep their fixed priorities regardless.
    assert "repub.pipelines.ImagePipeline" not in feed_settings["ITEM_PIPELINES"]
    assert "repub.pipelines.VideoPipeline" not in feed_settings["ITEM_PIPELINES"]
    assert feed_settings["ITEM_PIPELINES"]["repub.pipelines.AudioPipeline"] == 2
    assert feed_settings["ITEM_PIPELINES"]["repub.pipelines.FilePipeline"] == 4

View file

@ -7,11 +7,14 @@ import pytest
from peewee import IntegrityError
from repub.model import (
AppSetting,
Job,
Source,
database,
initialize_database,
load_max_concurrent_jobs,
resolve_database_path,
save_setting,
)
@ -51,6 +54,7 @@ def test_initialize_database_bootstraps_schema_from_sql_files(tmp_path: Path) ->
)
}
assert table_names == {
"app_setting",
"job",
"job_execution",
"source",
@ -59,17 +63,10 @@ def test_initialize_database_bootstraps_schema_from_sql_files(tmp_path: Path) ->
}
defaults = {
row[1]: row[4]
for row in connection.execute("PRAGMA table_info('source_pangea')")
row[1]: row[4] for row in connection.execute("PRAGMA table_info('job')")
}
assert defaults["content_type"] is None
assert defaults["only_newest"] is None
assert defaults["max_articles"] is None
assert defaults["oldest_article"] is None
assert defaults["include_authors"] is None
assert defaults["exclude_media"] is None
assert defaults["include_content"] is None
assert defaults["content_format"] is None
assert defaults["convert_images"] == "1"
assert defaults["convert_video"] == "1"
finally:
connection.close()
@ -168,3 +165,20 @@ def test_job_table_allows_exactly_one_job_per_source(tmp_path: Path) -> None:
cron_day_of_week="*",
cron_month="*",
)
def test_load_max_concurrent_jobs_defaults_to_one(tmp_path: Path) -> None:
    """A fresh database with no saved setting falls back to a limit of 1."""
    initialize_database(tmp_path / "settings-defaults.db")
    default_limit = load_max_concurrent_jobs()
    assert default_limit == 1
def test_save_setting_persists_json_value(tmp_path: Path) -> None:
    """save_setting stores JSON text that load_max_concurrent_jobs reads back."""
    initialize_database(tmp_path / "settings-roundtrip.db")
    save_setting("max_concurrent_jobs", 4)
    stored = AppSetting.get(AppSetting.key == "max_concurrent_jobs")
    assert stored.value == "4"
    assert load_max_concurrent_jobs() == 4

View file

@ -20,6 +20,7 @@ from repub.model import (
Source,
create_source,
initialize_database,
save_setting,
)
from repub.web import create_app, get_job_runtime, render_execution_logs, render_runs
@ -137,6 +138,68 @@ def test_job_runtime_run_now_writes_log_and_stats_and_marks_success(
runtime.shutdown()
def test_job_runtime_respects_max_concurrent_jobs_setting(tmp_path: Path) -> None:
    """With max_concurrent_jobs=1, a second manual run is refused while one runs."""
    db_path = tmp_path / "max-concurrency.db"
    log_dir = tmp_path / "out" / "logs"
    initialize_database(db_path)
    save_setting("max_concurrent_jobs", 1)

    with _slow_feed_server() as feed_url:
        # Two sources -> two jobs, so a second run can be attempted while the
        # first is still crawling the _slow_feed_server feed.
        first_source = create_source(
            name="First source",
            slug="first-source",
            source_type="feed",
            notes="",
            spider_arguments="",
            enabled=False,
            cron_minute="*/5",
            cron_hour="*",
            cron_day_of_month="*",
            cron_day_of_week="*",
            cron_month="*",
            feed_url=feed_url,
        )
        second_source = create_source(
            name="Second source",
            slug="second-source",
            source_type="feed",
            notes="",
            spider_arguments="",
            enabled=False,
            cron_minute="*/5",
            cron_hour="*",
            cron_day_of_month="*",
            cron_day_of_week="*",
            cron_month="*",
            feed_url=feed_url,
        )
        first_job = Job.get(Job.source == first_source)
        second_job = Job.get(Job.source == second_source)

        runtime = JobRuntime(log_dir=log_dir)
        try:
            runtime.start()
            first_execution_id = runtime.run_job_now(first_job.id, reason="manual")
            assert first_execution_id is not None
            _wait_for_running_execution(first_execution_id)
            # Limit is 1 and one execution is RUNNING -> refusal is signalled
            # by run_job_now returning None.
            second_execution_id = runtime.run_job_now(second_job.id, reason="manual")
            assert second_execution_id is None
            assert (
                JobExecution.select()
                .where(JobExecution.running_status == JobExecutionStatus.RUNNING)
                .count()
                == 1
            )
            runtime.request_execution_cancel(first_execution_id)
            finished_execution = _wait_for_terminal_execution(first_execution_id)
            assert finished_execution.running_status == JobExecutionStatus.CANCELED
        finally:
            runtime.shutdown()
def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None:
initialize_database(tmp_path / "cancel.db")
with _slow_feed_server() as feed_url:

View file

@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import os
import re
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any, cast
@ -17,6 +18,8 @@ from repub.model import (
SourceFeed,
SourcePangea,
create_source,
load_max_concurrent_jobs,
save_setting,
)
from repub.pages.runs import runs_page
from repub.web import (
@ -27,6 +30,7 @@ from repub.web import (
render_edit_source,
render_execution_logs,
render_runs,
render_settings,
render_sources,
)
@ -109,6 +113,7 @@ def test_root_get_serves_datastar_shim() -> None:
assert '<main id="morph"' in body
assert 'href="/sources"' in body
assert 'href="/runs"' in body
assert 'href="/settings"' in body
assert "Connecting" in body
asyncio.run(run())
@ -430,6 +435,94 @@ def test_render_sources_shows_table_and_create_link() -> None:
asyncio.run(run())
def test_render_sources_shows_live_sidebar_badges(monkeypatch, tmp_path: Path) -> None:
    """The sources page sidebar badges show live counts, not placeholders."""
    db_path = tmp_path / "sources-sidebar.db"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
    app = create_app()
    create_source(
        name="First source",
        slug="first-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=True,
        cron_minute="0",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/first.xml",
    )
    create_source(
        name="Second source",
        slug="second-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=True,
        cron_minute="0",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/second.xml",
    )

    async def run() -> None:
        body = str(await render_sources(app))
        # Two sources created above -> Sources badge "2"; nothing running -> Runs badge "0".
        assert re.search(
            r'href="/sources"[^>]*>.*?<span>Sources</span>\s*<span[^>]*>2</span>',
            body,
            re.S,
        )
        assert re.search(
            r'href="/runs"[^>]*>.*?<span>Runs</span>\s*<span[^>]*>0</span>',
            body,
            re.S,
        )

    asyncio.run(run())
def test_render_dashboard_shows_live_sidebar_badges(
    monkeypatch, tmp_path: Path
) -> None:
    """The dashboard sidebar badges reflect live database counts."""
    db_path = tmp_path / "dashboard-sidebar.db"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
    app = create_app()
    create_source(
        name="Dashboard source",
        slug="dashboard-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=True,
        cron_minute="0",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/dashboard.xml",
    )

    async def run() -> None:
        body = str(await render_dashboard(app))
        # One source -> Sources badge "1"; no running executions -> Runs badge "0".
        assert re.search(
            r'href="/sources"[^>]*>.*?<span>Sources</span>\s*<span[^>]*>1</span>',
            body,
            re.S,
        )
        assert re.search(
            r'href="/runs"[^>]*>.*?<span>Runs</span>\s*<span[^>]*>0</span>',
            body,
            re.S,
        )

    asyncio.run(run())
def test_render_sources_shows_delete_action_for_each_source(
monkeypatch, tmp_path: Path
) -> None:
@ -476,6 +569,8 @@ def test_render_create_source_shows_dedicated_form_page() -> None:
assert "includeAuthors" in body
assert "excludeMedia" in body
assert "includeContent" in body
assert "convertImages" in body
assert "convertVideo" in body
assert "TEXT_ONLY" in body
assert "breakingnews" in body
assert "Pangea domain" in body
@ -512,6 +607,8 @@ def test_render_edit_source_shows_existing_values(monkeypatch, tmp_path: Path) -
notes="Regional health alerts.",
spider_arguments="language=en\ndownload_media=true",
enabled=True,
convert_images=False,
convert_video=False,
cron_minute="0",
cron_hour="*/6",
cron_day_of_month="*",
@ -546,6 +643,28 @@ def test_render_edit_source_shows_existing_values(monkeypatch, tmp_path: Path) -
assert "example.org" in body
assert "Health" in body
assert "language=en\ndownload_media=true" in body
assert "convertImages: false" in body
assert "convertVideo: false" in body
asyncio.run(run())
def test_render_settings_shows_current_max_concurrent_jobs(
    monkeypatch, tmp_path: Path
) -> None:
    """The settings page pre-fills the stored max_concurrent_jobs value."""
    db_path = tmp_path / "settings-page.db"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
    # NOTE(review): create_app() is called up front so the database exists
    # before save_setting — presumably it initializes the DB; confirm.
    create_app()
    save_setting("max_concurrent_jobs", 3)

    async def run() -> None:
        app = create_app()
        body = str(await render_settings(app))
        assert ">Settings<" in body
        assert "/actions/settings" in body
        assert 'value="3"' in body
        assert "Max concurrent jobs" in body

    asyncio.run(run())
@ -602,6 +721,8 @@ def test_create_source_action_creates_pangea_source_and_job_in_database(
assert pangea.content_type == "breakingnews"
assert pangea.include_content is True
assert job.enabled is True
assert job.convert_images is True
assert job.convert_video is True
assert job.spider_arguments == "language=en\ndownload_media=true"
assert job.cron_hour == "*/6"
assert "kenya-health" in rendered_sources
@ -713,6 +834,8 @@ def test_edit_source_action_updates_existing_source_and_job_in_database(
"cronDayOfWeek": "*",
"cronMonth": "*",
"jobEnabled": False,
"convertImages": False,
"convertVideo": False,
"onlyNewest": False,
"includeAuthors": False,
"excludeMedia": True,
@ -737,6 +860,8 @@ def test_edit_source_action_updates_existing_source_and_job_in_database(
assert pangea.include_authors is False
assert pangea.exclude_media is True
assert job.enabled is False
assert job.convert_images is False
assert job.convert_video is False
assert job.spider_arguments == "language=sw\ninclude_audio=false"
assert job.cron_hour == "2"
assert "Kenya health desk nightly" in rendered_sources
@ -863,6 +988,55 @@ def test_create_source_action_validates_duplicate_slug_and_pangea_type(
asyncio.run(run())
def test_settings_action_updates_max_concurrent_jobs(
    monkeypatch, tmp_path: Path
) -> None:
    """POST /actions/settings persists the new limit and redirects back."""
    db_path = tmp_path / "settings-action.db"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))

    async def run() -> None:
        app = create_app()
        client = app.test_client()
        response = await client.post(
            "/actions/settings",
            headers={"Datastar-Request": "true"},
            json={"maxConcurrentJobs": "3"},
        )
        body = await response.get_data(as_text=True)
        assert response.status_code == 200
        # The redirect is delivered as a client-side script in the SSE body.
        assert "window.location = '/settings'" in body
        assert load_max_concurrent_jobs() == 3
        assert 'value="3"' in str(await render_settings(app))

    asyncio.run(run())
def test_settings_action_rejects_non_positive_max_concurrent_jobs(
    monkeypatch, tmp_path: Path
) -> None:
    """Limits below 1 are rejected and the stored value stays at the default."""
    db_path = tmp_path / "settings-invalid.db"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))

    async def run() -> None:
        app = create_app()
        client = app.test_client()
        response = await client.post(
            "/actions/settings",
            headers={"Datastar-Request": "true"},
            json={"maxConcurrentJobs": "0"},
        )
        body = await response.get_data(as_text=True)
        # The endpoint answers 200 with an error signal patch, not a 4xx.
        assert response.status_code == 200
        assert "Max concurrent jobs must be at least 1." in body
        assert load_max_concurrent_jobs() == 1

    asyncio.run(run())
def test_render_runs_shows_running_upcoming_and_completed_tables(
monkeypatch, tmp_path: Path
) -> None: