diff --git a/repub/components.py b/repub/components.py index 6ecf837..af9e631 100644 --- a/repub/components.py +++ b/repub/components.py @@ -39,7 +39,9 @@ def nav_link( ] -def admin_sidebar(*, current_path: str) -> Renderable: +def admin_sidebar( + *, current_path: str, source_count: int = 0, running_count: int = 0 +) -> Renderable: return h.aside( class_="relative overflow-hidden bg-slate-950 px-6 py-8 text-white lg:min-h-screen" )[ @@ -68,14 +70,20 @@ def admin_sidebar(*, current_path: str) -> Renderable: label="Sources", href="/sources", active=current_path.startswith("/sources"), - badge="12", + badge=str(source_count), ), nav_link( label="Runs", href="/runs", active=current_path.startswith("/runs") or current_path.startswith("/job/"), - badge="3", + badge=str(running_count), + ), + nav_link( + label="Settings", + href="/settings", + active=current_path.startswith("/settings"), + badge="App", ), ], h.div(class_="mt-auto rounded-3xl bg-white/5 p-5 ring-1 ring-white/10")[ @@ -148,13 +156,19 @@ def page_shell( title: str, description: str | None = None, actions: Node | None = None, + source_count: int = 0, + running_count: int = 0, content: Node, ) -> Renderable: return h.main( id="morph", class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]", )[ - admin_sidebar(current_path=current_path), + admin_sidebar( + current_path=current_path, + source_count=source_count, + running_count=running_count, + ), h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[ h.div(class_="mx-auto max-w-7xl space-y-5")[ h.section[ diff --git a/repub/config.py b/repub/config.py index 62a8376..e14e0d2 100644 --- a/repub/config.py +++ b/repub/config.py @@ -180,6 +180,8 @@ def build_feed_settings( *, out_dir: Path, feed_slug: str, + convert_images: bool = True, + convert_video: bool = True, ) -> Settings: feed_dir = feed_output_dir(out_dir=out_dir, feed_slug=feed_slug) image_dir = base_settings.get("REPUBLISHER_IMAGE_DIR", IMAGE_DIR) @@ -187,14 +189,20 @@ def build_feed_settings( audio_dir = base_settings.get("REPUBLISHER_AUDIO_DIR", AUDIO_DIR) file_dir = base_settings.get("REPUBLISHER_FILE_DIR", FILE_DIR) item_pipelines = dict(base_settings.getdict("ITEM_PIPELINES")) + item_pipelines.pop("repub.pipelines.ImagePipeline", None) + item_pipelines.pop("repub.pipelines.AudioPipeline", None) + item_pipelines.pop("repub.pipelines.VideoPipeline", None) + item_pipelines.pop("repub.pipelines.FilePipeline", None) item_pipelines.update( { - "repub.pipelines.ImagePipeline": 1, "repub.pipelines.AudioPipeline": 2, - "repub.pipelines.VideoPipeline": 3, "repub.pipelines.FilePipeline": 4, } ) + if convert_images: + item_pipelines["repub.pipelines.ImagePipeline"] = 1 + if convert_video: + item_pipelines["repub.pipelines.VideoPipeline"] = 3 settings = base_settings.copy() settings.setdict( { diff --git a/repub/job_runner.py b/repub/job_runner.py index 5419cbd..f95b5d7 100644 --- a/repub/job_runner.py +++ b/repub/job_runner.py @@ -186,6 +186,8 @@ class JobSourceConfig: source_slug: str source_type: str spider_arguments: dict[str, str] + convert_images: bool = True + convert_video: bool = True feed_url: str | None = None pangea_domain: str | None = None pangea_category: str | None = None @@ -267,6 +269,8 @@ def main(argv: list[str] | None = None) -> int: out_dir=out_dir, feed=feed, stats_path=stats_path, + convert_images=source_config.convert_images, + convert_video=source_config.convert_video, ) ) print( @@ -326,6 +330,8 @@ def _load_job_source_config(*, db_path: str, job_id: int) -> JobSourceConfig: source_slug=source.slug, source_type=source.source_type, spider_arguments=spider_arguments, + convert_images=bool(job.convert_images), + convert_video=bool(job.convert_video), feed_url=feed.feed_url, ) @@ -339,6 +345,8 @@ def _load_job_source_config(*, db_path: str, job_id: int) -> JobSourceConfig: source_slug=source.slug, source_type=source.source_type, spider_arguments=spider_arguments, + convert_images=bool(job.convert_images), + convert_video=bool(job.convert_video), pangea_domain=pangea.domain, pangea_category=pangea.category_name, content_type=pangea.content_type, @@ -409,7 +417,14 @@ def _resolve_feed( ) -def _build_crawl_settings(*, out_dir: Path, feed: FeedConfig, stats_path: Path): +def _build_crawl_settings( + *, + out_dir: Path, + feed: FeedConfig, + stats_path: Path, + convert_images: bool = True, + convert_video: bool = True, +): base_settings = build_base_settings( RepublisherConfig( config_path=out_dir / "job-runner.toml", @@ -419,7 +434,13 @@ def _build_crawl_settings(*, out_dir: Path, feed: FeedConfig, stats_path: Path): ) ) prepare_output_dirs(out_dir, feed.slug) - settings = build_feed_settings(base_settings, out_dir=out_dir, feed_slug=feed.slug) + settings = build_feed_settings( + base_settings, + out_dir=out_dir, + feed_slug=feed.slug, + convert_images=convert_images, + convert_video=convert_video, + ) settings.set("LOG_FILE", None, priority="cmdline") settings.set( "STATS_CLASS", diff --git a/repub/jobs.py b/repub/jobs.py index de504ff..b5441ac 100644 --- a/repub/jobs.py +++ b/repub/jobs.py @@ -5,6 +5,7 @@ import os import signal import subprocess import sys +import threading import time from dataclasses import dataclass from datetime import UTC, datetime, timedelta @@ -15,7 +16,15 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from repub.config import feed_output_dir, feed_output_path -from repub.model import Job, JobExecution, JobExecutionStatus, Source, database, utc_now +from repub.model import ( + Job, + JobExecution, + JobExecutionStatus, + Source, + database, + load_max_concurrent_jobs, + utc_now, +) SCHEDULER_JOB_PREFIX = "job-" POLL_JOB_ID = "runtime-poll-workers" @@ -105,6 +114,7 @@ class JobRuntime: self.graceful_stop_seconds = graceful_stop_seconds self.scheduler = BackgroundScheduler(timezone=UTC) self._workers: dict[int, RunningWorker] = {} + self._run_lock = threading.Lock() self._started = False def start(self) -> None: @@ -178,28 +188,32 @@ class JobRuntime: def run_job_now(self, job_id: int, *, reason: str) -> int | None: del reason self.start() - with database.connection_context(): - job = Job.get_or_none(id=job_id) - if job is None: - return None + with self._run_lock: + with database.connection_context(): + job = Job.get_or_none(id=job_id) + if job is None: + return None - already_running = ( - JobExecution.select() - .where( - (JobExecution.job == job) - & (JobExecution.running_status == JobExecutionStatus.RUNNING) + if self._max_concurrent_jobs_reached(): + return None + + already_running = ( + JobExecution.select() + .where( + (JobExecution.job == job) + & (JobExecution.running_status == JobExecutionStatus.RUNNING) + ) + .exists() ) - .exists() - ) - if already_running: - return None + if already_running: + return None - execution = JobExecution.create( - job=job, - started_at=utc_now(), - running_status=JobExecutionStatus.RUNNING, - ) - execution_id = _execution_id(execution) + execution = JobExecution.create( + job=job, + started_at=utc_now(), + running_status=JobExecutionStatus.RUNNING, + ) + execution_id = _execution_id(execution) artifacts = JobArtifacts.for_execution( log_dir=self.log_dir, job_id=job_id, execution_id=execution_id @@ -239,6 +253,14 @@ class JobRuntime: self._trigger_refresh() return execution_id + def _max_concurrent_jobs_reached(self) -> bool: + return ( + JobExecution.select() + .where(JobExecution.running_status == JobExecutionStatus.RUNNING) + .count() + >= load_max_concurrent_jobs() + ) + def request_execution_cancel(self, execution_id: int) -> bool: with database.connection_context(): execution = JobExecution.get_or_none(id=execution_id) diff --git a/repub/model.py b/repub/model.py index accdf8e..2126d50 100644 --- a/repub/model.py +++ b/repub/model.py @@ -1,11 +1,13 @@ from __future__ import annotations +import json import os from datetime import UTC, datetime from enum import IntEnum from importlib import resources from importlib.resources.abc import Traversable from pathlib import Path +from typing import Any from peewee import ( BooleanField, @@ -29,6 +31,8 @@ DATABASE_PRAGMAS = { "temp_store": "memory", } SCHEMA_GLOB = "*.sql" +MAX_CONCURRENT_JOBS_SETTING_KEY = "max_concurrent_jobs" +DEFAULT_MAX_CONCURRENT_JOBS = 1 database = SqliteDatabase(None, pragmas=DATABASE_PRAGMAS) @@ -78,17 +82,85 @@ def initialize_database(db_path: str | Path | None = None) -> Path: connection = database.connection() for path in schema_paths(): connection.executescript(path.read_text(encoding="utf-8")) + _ensure_schema(connection) finally: database.close() return resolved_path +def _ensure_schema(connection: Any) -> None: + connection.execute( + """ + CREATE TABLE IF NOT EXISTS app_setting ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ) + """ + ) + + job_columns = { + row[1] for row in connection.execute("PRAGMA table_info('job')").fetchall() + } + if "convert_images" not in job_columns: + connection.execute( + """ + ALTER TABLE job + ADD COLUMN convert_images INTEGER NOT NULL DEFAULT 1 + CHECK (convert_images IN (0, 1)) + """ + ) + if "convert_video" not in job_columns: + connection.execute( + """ + ALTER TABLE job + ADD COLUMN convert_video INTEGER NOT NULL DEFAULT 1 + CHECK (convert_video IN (0, 1)) + """ + ) + + def source_slug_exists(slug: str) -> bool: with database.connection_context(): return Source.select().where(Source.slug == slug).exists() +def save_setting(key: str, value: Any) -> None: + payload = json.dumps(value, sort_keys=True) + with database.connection_context(): + with database.atomic(): + setting = AppSetting.get_or_none(AppSetting.key == key) + if setting is None: + AppSetting.create(key=key, value=payload) + return + setting.value = payload + setting.save() + + +def load_setting(key: str, default: Any) -> Any: + with database.connection_context(): + setting = AppSetting.get_or_none(AppSetting.key == key) + if setting is None: + return default + try: + return json.loads(setting.value) + except json.JSONDecodeError: + return default + + +def load_max_concurrent_jobs() -> int: + value = load_setting(MAX_CONCURRENT_JOBS_SETTING_KEY, DEFAULT_MAX_CONCURRENT_JOBS) + try: + parsed = int(value) + except (TypeError, ValueError): + return DEFAULT_MAX_CONCURRENT_JOBS + return parsed if parsed >= 1 else DEFAULT_MAX_CONCURRENT_JOBS + + +def load_settings_form() -> dict[str, object]: + return {"max_concurrent_jobs": load_max_concurrent_jobs()} + + def load_source_form(slug: str) -> dict[str, object] | None: with database.connection_context(): source = Source.get_or_none(Source.slug == slug) @@ -103,6 +175,8 @@ def load_source_form(slug: str) -> dict[str, object] | None: "notes": source.notes, "spider_arguments": job.spider_arguments, "enabled": job.enabled, + "convert_images": job.convert_images, + "convert_video": job.convert_video, "cron_minute": job.cron_minute, "cron_hour": job.cron_hour, "cron_day_of_month": job.cron_day_of_month, @@ -155,6 +229,8 @@ def create_source( cron_day_of_month: str, cron_day_of_week: str, cron_month: str, + convert_images: bool = True, + convert_video: bool = True, feed_url: str = "", pangea_domain: str = "", pangea_category: str = "", @@ -197,6 +273,8 @@ def create_source( Job.create( source=source, enabled=enabled, + convert_images=convert_images, + convert_video=convert_video, spider_arguments=spider_arguments, cron_minute=cron_minute, cron_hour=cron_hour, @@ -221,6 +299,8 @@ def update_source( cron_day_of_month: str, cron_day_of_week: str, cron_month: str, + convert_images: bool = True, + convert_video: bool = True, feed_url: str = "", pangea_domain: str = "", pangea_category: str = "", @@ -246,6 +326,8 @@ def update_source( job = Job.get(Job.source == source) job.enabled = enabled + job.convert_images = convert_images + job.convert_video = convert_video job.spider_arguments = spider_arguments job.cron_minute = cron_minute job.cron_hour = cron_hour @@ -375,6 +457,14 @@ class BaseModel(Model): database = database +class AppSetting(BaseModel): + key = TextField(primary_key=True) + value = TextField() + + class Meta: + table_name = "app_setting" + + class Source(BaseModel): created_at = DateTimeField(default=utc_now) updated_at = DateTimeField(default=utc_now) @@ -419,6 +509,8 @@ class Job(BaseModel): created_at = DateTimeField(default=utc_now) updated_at = DateTimeField(default=utc_now) enabled = BooleanField() + convert_images = BooleanField(default=True) + convert_video = BooleanField(default=True) spider_arguments = TextField(default="") cron_minute = TextField() cron_hour = TextField() diff --git a/repub/pages/__init__.py b/repub/pages/__init__.py index bc914b7..38a43e1 100644 --- a/repub/pages/__init__.py +++ b/repub/pages/__init__.py @@ -1,5 +1,6 @@ from repub.pages.dashboard import dashboard_page, dashboard_page_with_data from repub.pages.runs import execution_logs_page, runs_page +from repub.pages.settings import settings_page from repub.pages.shim import shim_page from repub.pages.sources import create_source_page, edit_source_page, sources_page @@ -10,6 +11,7 @@ __all__ = [ "edit_source_page", "execution_logs_page", "runs_page", + "settings_page", "shim_page", "sources_page", ] diff --git a/repub/pages/dashboard.py b/repub/pages/dashboard.py index 8f61b53..ef75847 100644 --- a/repub/pages/dashboard.py +++ b/repub/pages/dashboard.py @@ -251,17 +251,23 @@ def dashboard_page_with_data( running_executions: tuple[Mapping[str, object], ...] | None = None, source_feeds: tuple[Mapping[str, object], ...] | None = None, ) -> Renderable: + running_items = running_executions or () + source_items = source_feeds or () return h.main( id="morph", class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]", )[ - admin_sidebar(current_path="/"), + admin_sidebar( + current_path="/", + source_count=len(source_items), + running_count=len(running_items), + ), h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[ h.div(class_="mx-auto max-w-7xl space-y-5")[ dashboard_header(), operational_snapshot(snapshot=snapshot), - running_executions_table(running_executions=running_executions), - published_feeds_table(source_feeds=source_feeds), + running_executions_table(running_executions=running_items), + published_feeds_table(source_feeds=source_items), ] ], ] diff --git a/repub/pages/runs.py b/repub/pages/runs.py index a42c751..058f1bf 100644 --- a/repub/pages/runs.py +++ b/repub/pages/runs.py @@ -194,6 +194,7 @@ def runs_page( running_executions: tuple[Mapping[str, object], ...] | None = None, upcoming_jobs: tuple[Mapping[str, object], ...] | None = None, completed_executions: tuple[Mapping[str, object], ...] | None = None, + source_count: int = 0, ) -> Renderable: running_items = running_executions or () upcoming_items = upcoming_jobs or () @@ -207,6 +208,8 @@ def runs_page( eyebrow="Execution control", title="Runs", actions=muted_action_link(href="/sources", label="Back to sources"), + source_count=source_count, + running_count=len(running_items), content=( table_section( eyebrow="Live work", diff --git a/repub/pages/settings.py b/repub/pages/settings.py new file mode 100644 index 0000000..aa9bdbc --- /dev/null +++ b/repub/pages/settings.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from collections.abc import Mapping + +import htpy as h +from htpy import Renderable + +from repub.components import input_field, muted_action_link, page_shell, section_card + + +def _value(settings: Mapping[str, object] | None, key: str, default: str = "") -> str: + if settings is None: + return default + return str(settings.get(key, default)) + + +def settings_page( + *, + settings: Mapping[str, object] | None = None, + action_path: str = "/actions/settings", + source_count: int = 0, + running_count: int = 0, +) -> Renderable: + return page_shell( + current_path="/settings", + eyebrow="Configuration", + title="Settings", + description="Global runtime controls for the republisher.", + source_count=source_count, + running_count=running_count, + content=section_card( + content=( + h.form( + { + "data-signals": "{_formError: '', _formSuccess: ''}", + "data-signals__ifmissing": ( + "{" + f"maxConcurrentJobs: '{_value(settings, 'max_concurrent_jobs', '1')}'" + "}" + ), + "data-on:submit": f"@post('{action_path}')", + }, + class_="space-y-6 rounded-[1.5rem] bg-white p-6 shadow-sm ring-1 ring-slate-200", + )[ + h.div[ + h.p( + class_="text-xs font-semibold uppercase tracking-[0.22em] text-amber-600" + )["Scheduler"], + h.h2(class_="mt-2 text-xl font-semibold text-slate-950")[ + "Runtime settings" + ], + h.p(class_="mt-2 text-sm text-slate-600")[ + "Limit how many jobs the scheduler and manual runs can execute at the same time." + ], + ], + h.div( + { + "data-show": "$_formError !== ''", + "data-text": "$_formError", + }, + class_="rounded-2xl bg-rose-50 px-4 py-3 text-sm font-medium text-rose-800", + ), + input_field( + label="Max concurrent jobs", + field_id="max-concurrent-jobs", + value=_value(settings, "max_concurrent_jobs", "1"), + help_text="Must be an integer greater than or equal to 1.", + signal_name="maxConcurrentJobs", + ), + h.div( + class_="flex flex-wrap justify-end gap-3 border-t border-slate-200 pt-6" + )[ + muted_action_link(href="/", label="Back to dashboard"), + h.button( + type="submit", + class_="rounded-full bg-slate-950 px-4 py-2.5 text-sm font-semibold text-white transition hover:bg-slate-800", + )["Save settings"], + ], + ], + ) + ), + ) diff --git a/repub/pages/shim.py b/repub/pages/shim.py index d7bf552..1b8723f 100644 --- a/repub/pages/shim.py +++ b/repub/pages/shim.py @@ -37,7 +37,11 @@ def shim_page( id="morph", class_="min-h-screen lg:grid lg:grid-cols-[18rem_minmax(0,1fr)]", )[ - admin_sidebar(current_path=current_path), + admin_sidebar( + current_path=current_path, + source_count=0, + running_count=0, + ), h.div(class_="px-4 py-4 sm:px-5 lg:px-6 lg:py-5")[ h.div(class_="mx-auto max-w-7xl space-y-5")[ h.section[ diff --git a/repub/pages/sources.py b/repub/pages/sources.py index de95f2d..ad0c93a 100644 --- a/repub/pages/sources.py +++ b/repub/pages/sources.py @@ -128,13 +128,18 @@ def sources_table( def sources_page( - *, sources: tuple[Mapping[str, object], ...] | None = None + *, + sources: tuple[Mapping[str, object], ...] | None = None, + running_count: int = 0, ) -> Renderable: + source_items = sources or () return page_shell( current_path="/sources", eyebrow="Source management", title="Sources", - content=sources_table(sources=sources), + source_count=len(source_items), + running_count=running_count, + content=sources_table(sources=source_items), ) @@ -398,6 +403,18 @@ def source_form( signal_name="jobEnabled", checked=_checked(source, "enabled", True), ), + toggle_field( + label="Convert images", + description="Normalize mirrored images through the image conversion pipeline for this source.", + signal_name="convertImages", + checked=_checked(source, "convert_images", True), + ), + toggle_field( + label="Convert video", + description="Run mirrored videos through the video conversion pipeline for this source.", + signal_name="convertVideo", + checked=_checked(source, "convert_video", True), + ), ], ], ], @@ -415,7 +432,12 @@ def source_form( ) -def create_source_page(*, action_path: str = "/actions/sources/create") -> Renderable: +def create_source_page( + *, + action_path: str = "/actions/sources/create", + source_count: int = 0, + running_count: int = 0, +) -> Renderable: actions = ( muted_action_link(href="/sources", label="Back to sources"), header_action_link(href="/runs", label="View runs"), @@ -425,6 +447,8 @@ def create_source_page(*, action_path: str = "/actions/sources/create") -> Rende eyebrow="Source creation", title="Create source", actions=actions, + source_count=source_count, + running_count=running_count, content=source_form(mode="create", action_path=action_path), ) @@ -434,6 +458,8 @@ def edit_source_page( slug: str, source: Mapping[str, object], action_path: str, + source_count: int = 0, + running_count: int = 0, ) -> Renderable: actions = ( muted_action_link(href="/sources", label="Back to sources"), @@ -444,5 +470,7 @@ def edit_source_page( eyebrow="Source editing", title="Edit source", actions=actions, + source_count=source_count, + running_count=running_count, content=source_form(mode="edit", action_path=action_path, source=source), ) diff --git a/repub/web.py b/repub/web.py index d0b2247..ae8b832 100644 --- a/repub/web.py +++ b/repub/web.py @@ -28,8 +28,10 @@ from repub.model import ( delete_job_source, delete_source, initialize_database, + load_settings_form, load_source_form, load_sources, + save_setting, source_slug_exists, update_source, ) @@ -39,6 +41,7 @@ from repub.pages import ( edit_source_page, execution_logs_page, runs_page, + settings_page, shim_page, sources_page, ) @@ -59,6 +62,8 @@ class SourceFormData(TypedDict): notes: str spider_arguments: str enabled: bool + convert_images: bool + convert_video: bool cron_minute: str cron_hour: str cron_day_of_month: str @@ -77,6 +82,10 @@ class SourceFormData(TypedDict): include_content: bool +class SettingsFormData(TypedDict): + max_concurrent_jobs: int + + DEFAULT_PANGEA_CONTENT_FORMAT = "MOBILE_3" DEFAULT_PANGEA_CONTENT_TYPE = "articles" DEFAULT_PANGEA_MAX_ARTICLES = "10" @@ -123,6 +132,7 @@ def create_app(*, dev_mode: bool = False) -> Quart: @app.get("/sources/create") @app.get("/sources//edit") @app.get("/runs") + @app.get("/settings") @app.get("/job//execution//logs") async def page_shim( slug: str | None = None, @@ -158,7 +168,11 @@ def create_app(*, dev_mode: bool = False) -> Quart: @app.post("/sources//edit") async def edit_source_patch(slug: str) -> DatastarResponse: - return _page_patch_response(app, lambda: render_edit_source(slug)) + return _page_patch_response(app, lambda: render_edit_source(slug, app)) + + @app.post("/settings") + async def settings_patch() -> DatastarResponse: + return _page_patch_response(app, lambda: render_settings(app)) @app.post("/actions/sources/create") async def create_source_action() -> DatastarResponse: @@ -217,6 +231,20 @@ def create_app(*, dev_mode: bool = False) -> Quart: trigger_refresh(app) return Response(status=204) + @app.post("/actions/settings") + async def update_settings_action() -> DatastarResponse: + signals = cast(dict[str, object], await read_signals()) + settings, error = validate_settings_form(signals) + if error is not None: + return DatastarResponse( + SSE.patch_signals({"_formError": error, "_formSuccess": ""}) + ) + + assert settings is not None + save_setting("max_concurrent_jobs", settings["max_concurrent_jobs"]) + trigger_refresh(app) + return DatastarResponse(SSE.redirect("/settings")) + @app.post("/runs") async def runs_patch() -> DatastarResponse: return _page_patch_response(app, lambda: render_runs(app)) @@ -300,16 +328,30 @@ async def render_dashboard(app: Quart | None = None) -> Renderable: async def render_sources(app: Quart | None = None) -> Renderable: - sources = None if app is None else load_sources() - return sources_page(sources=sources) + if app is None: + return sources_page() + + sources = load_sources() + return sources_page( + sources=sources, + running_count=len( + load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])["running"] + ), + ) async def render_create_source(app: Quart | None = None) -> Renderable: - del app - return create_source_page() + if app is None: + return create_source_page() + + sidebar_counts = _load_sidebar_counts(app) + return create_source_page( + source_count=sidebar_counts["source_count"], + running_count=sidebar_counts["running_count"], + ) -async def render_edit_source(slug: str) -> Renderable: +async def render_edit_source(slug: str, app: Quart | None = None) -> Renderable: source = load_source_form(slug) if source is None: return sources_page(sources=()) @@ -317,6 +359,7 @@ async def render_edit_source(slug: str) -> Renderable: slug=slug, source=source, action_path=f"/actions/sources/{slug}/edit", + **({} if app is None else _load_sidebar_counts(app)), ) @@ -329,6 +372,18 @@ async def render_runs(app: Quart | None = None) -> Renderable: running_executions=cast(tuple[dict[str, object], ...], view["running"]), upcoming_jobs=cast(tuple[dict[str, object], ...], view["upcoming"]), completed_executions=cast(tuple[dict[str, object], ...], view["completed"]), + source_count=len(load_sources()), + ) + + +async def render_settings(app: Quart | None = None) -> Renderable: + if app is None: + return settings_page(settings=load_settings_form()) + sidebar_counts = _load_sidebar_counts(app) + return settings_page( + settings=load_settings_form(), + source_count=sidebar_counts["source_count"], + running_count=sidebar_counts["running_count"], ) @@ -377,6 +432,15 @@ async def _unsubscribe_on_close( get_refresh_broker(app).unsubscribe(cast(asyncio.Queue[object], queue)) +def _load_sidebar_counts(app: Quart) -> dict[str, int]: + return { + "source_count": len(load_sources()), + "running_count": len( + load_runs_view(log_dir=app.config["REPUB_LOG_DIR"])["running"] + ), + } + + def validate_source_form( signals: dict[str, object] | None, *, @@ -469,6 +533,8 @@ def validate_source_form( "max_articles": _parse_int(max_articles), "oldest_article": _parse_int(oldest_article), "enabled": enabled, + "convert_images": _read_bool(signals, "convertImages", default=True), + "convert_video": _read_bool(signals, "convertVideo", default=True), "only_newest": _read_bool(signals, "onlyNewest", default=True), "include_authors": _read_bool(signals, "includeAuthors", default=True), "exclude_media": _read_bool(signals, "excludeMedia", default=False), @@ -482,6 +548,20 @@ def validate_source_form( return source, None +def validate_settings_form( + signals: dict[str, object] | None, +) -> tuple[SettingsFormData | None, str | None]: + if signals is None: + return None, "Missing form data." + + max_concurrent_jobs = _parse_int(_read_string(signals, "maxConcurrentJobs")) + if max_concurrent_jobs is None: + return None, "Max concurrent jobs must be an integer." + if max_concurrent_jobs < 1: + return None, "Max concurrent jobs must be at least 1." + return {"max_concurrent_jobs": max_concurrent_jobs}, None + + def _read_string(signals: dict[str, object], key: str, *, strip: bool = True) -> str: value = str(signals.get(key, "")) return value.strip() if strip else value diff --git a/tests/test_config.py b/tests/test_config.py index 34da4ea..f7430ba 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -195,3 +195,35 @@ def test_build_feed_settings_uses_runtime_media_dir_overrides(tmp_path: Path) -> assert feed_settings["AUDIO_STORE"] == str( out_dir / "feeds" / "gp-pod" / "audio-custom" ) + + +def test_build_feed_settings_can_disable_image_and_video_conversion( + tmp_path: Path, +) -> None: + out_dir = (tmp_path / "mirror").resolve() + config = RepublisherConfig( + config_path=tmp_path / "repub.toml", + out_dir=out_dir, + feeds=( + FeedConfig( + name="Guardian Project Podcast", + slug="gp-pod", + url="https://guardianproject.info/podcast/podcast.xml", + ), + ), + scrapy_settings={}, + ) + + base_settings = build_base_settings(config) + feed_settings = build_feed_settings( + base_settings, + out_dir=out_dir, + feed_slug="gp-pod", + convert_images=False, + convert_video=False, + ) + + assert "repub.pipelines.ImagePipeline" not in feed_settings["ITEM_PIPELINES"] + assert "repub.pipelines.VideoPipeline" not in feed_settings["ITEM_PIPELINES"] + assert feed_settings["ITEM_PIPELINES"]["repub.pipelines.AudioPipeline"] == 2 + assert feed_settings["ITEM_PIPELINES"]["repub.pipelines.FilePipeline"] == 4 diff --git a/tests/test_model.py b/tests/test_model.py index 2df0b8f..1553354 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -7,11 +7,14 @@ import pytest from peewee import IntegrityError from repub.model import ( + AppSetting, Job, Source, database, initialize_database, + load_max_concurrent_jobs, resolve_database_path, + save_setting, ) @@ -51,6 +54,7 @@ def test_initialize_database_bootstraps_schema_from_sql_files(tmp_path: Path) -> ) } assert table_names == { + "app_setting", "job", "job_execution", "source", @@ -59,17 +63,10 @@ def test_initialize_database_bootstraps_schema_from_sql_files(tmp_path: Path) -> } defaults = { - row[1]: row[4] - for row in connection.execute("PRAGMA table_info('source_pangea')") + row[1]: row[4] for row in connection.execute("PRAGMA table_info('job')") } - assert defaults["content_type"] is None - assert defaults["only_newest"] is None - assert defaults["max_articles"] is None - assert defaults["oldest_article"] is None - assert defaults["include_authors"] is None - assert defaults["exclude_media"] is None - assert defaults["include_content"] is None - assert defaults["content_format"] is None + assert defaults["convert_images"] == "1" + assert defaults["convert_video"] == "1" finally: connection.close() @@ -168,3 +165,20 @@ def test_job_table_allows_exactly_one_job_per_source(tmp_path: Path) -> None: cron_day_of_week="*", cron_month="*", ) + + +def test_load_max_concurrent_jobs_defaults_to_one(tmp_path: Path) -> None: + initialize_database(tmp_path / "settings-defaults.db") + + assert load_max_concurrent_jobs() == 1 + + +def test_save_setting_persists_json_value(tmp_path: Path) -> None: + initialize_database(tmp_path / "settings-roundtrip.db") + + save_setting("max_concurrent_jobs", 4) + + row = AppSetting.get(AppSetting.key == "max_concurrent_jobs") + + assert row.value == "4" + assert load_max_concurrent_jobs() == 4 diff --git a/tests/test_scheduler_runtime.py b/tests/test_scheduler_runtime.py index 9fe81b4..2af4326 100644 --- a/tests/test_scheduler_runtime.py +++ b/tests/test_scheduler_runtime.py @@ -20,6 +20,7 @@ from repub.model import ( Source, create_source, initialize_database, + save_setting, ) from repub.web import create_app, get_job_runtime, render_execution_logs, render_runs @@ -137,6 +138,68 @@ def test_job_runtime_run_now_writes_log_and_stats_and_marks_success( runtime.shutdown() +def test_job_runtime_respects_max_concurrent_jobs_setting(tmp_path: Path) -> None: + db_path = tmp_path / "max-concurrency.db" + log_dir = tmp_path / "out" / "logs" + initialize_database(db_path) + save_setting("max_concurrent_jobs", 1) + + with _slow_feed_server() as feed_url: + first_source = create_source( + name="First source", + slug="first-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=False, + cron_minute="*/5", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url=feed_url, + ) + second_source = create_source( + name="Second source", + slug="second-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=False, + cron_minute="*/5", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url=feed_url, + ) + first_job = Job.get(Job.source == first_source) + second_job = Job.get(Job.source == second_source) + + runtime = JobRuntime(log_dir=log_dir) + try: + runtime.start() + first_execution_id = runtime.run_job_now(first_job.id, reason="manual") + + assert first_execution_id is not None + _wait_for_running_execution(first_execution_id) + + second_execution_id = runtime.run_job_now(second_job.id, reason="manual") + + assert second_execution_id is None + assert ( + JobExecution.select() + .where(JobExecution.running_status == JobExecutionStatus.RUNNING) + .count() + == 1 + ) + runtime.request_execution_cancel(first_execution_id) + finished_execution = _wait_for_terminal_execution(first_execution_id) + assert finished_execution.running_status == JobExecutionStatus.CANCELED + finally: + runtime.shutdown() + + def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None: initialize_database(tmp_path / "cancel.db") with _slow_feed_server() as feed_url: diff --git a/tests/test_web.py b/tests/test_web.py index 0e23ef2..70a5bb5 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio import os +import re from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Any, cast @@ -17,6 +18,8 @@ from repub.model import ( SourceFeed, SourcePangea, create_source, + load_max_concurrent_jobs, + save_setting, ) from repub.pages.runs import runs_page from repub.web import ( @@ -27,6 +30,7 @@ from repub.web import ( render_edit_source, render_execution_logs, render_runs, + render_settings, render_sources, ) @@ -109,6 +113,7 @@ def test_root_get_serves_datastar_shim() -> None: assert '
None: asyncio.run(run()) +def test_render_sources_shows_live_sidebar_badges(monkeypatch, tmp_path: Path) -> None: + db_path = tmp_path / "sources-sidebar.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + app = create_app() + create_source( + name="First source", + slug="first-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=True, + cron_minute="0", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/first.xml", + ) + create_source( + name="Second source", + slug="second-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=True, + cron_minute="0", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/second.xml", + ) + + async def run() -> None: + body = str(await render_sources(app)) + + assert re.search( + r'href="/sources"[^>]*>.*?Sources\s*]*>2', + body, + re.S, + ) + assert re.search( + r'href="/runs"[^>]*>.*?Runs\s*]*>0', + body, + re.S, + ) + + asyncio.run(run()) + + +def test_render_dashboard_shows_live_sidebar_badges( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "dashboard-sidebar.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + app = create_app() + create_source( + name="Dashboard source", + slug="dashboard-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=True, + cron_minute="0", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/dashboard.xml", + ) + + async def run() -> None: + body = str(await render_dashboard(app)) + + assert re.search( + r'href="/sources"[^>]*>.*?Sources\s*]*>1', + body, + re.S, + ) + assert re.search( + r'href="/runs"[^>]*>.*?Runs\s*]*>0', + body, + re.S, + ) + + asyncio.run(run()) + + def test_render_sources_shows_delete_action_for_each_source( monkeypatch, tmp_path: Path ) -> None: @@ -476,6 +569,8 @@ def test_render_create_source_shows_dedicated_form_page() -> None: assert "includeAuthors" in body assert "excludeMedia" in body assert "includeContent" in body + assert "convertImages" in body + assert "convertVideo" in body assert "TEXT_ONLY" in body assert "breakingnews" in body assert "Pangea domain" in body @@ -512,6 +607,8 @@ def test_render_edit_source_shows_existing_values(monkeypatch, tmp_path: Path) - notes="Regional health alerts.", spider_arguments="language=en\ndownload_media=true", enabled=True, + convert_images=False, + convert_video=False, cron_minute="0", cron_hour="*/6", cron_day_of_month="*", @@ -546,6 +643,28 @@ def test_render_edit_source_shows_existing_values(monkeypatch, tmp_path: Path) - assert "example.org" in body assert "Health" in body assert "language=en\ndownload_media=true" in body + assert "convertImages: false" in body + assert "convertVideo: false" in body + + asyncio.run(run()) + + +def test_render_settings_shows_current_max_concurrent_jobs( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "settings-page.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + create_app() + save_setting("max_concurrent_jobs", 3) + + async def run() -> None: + app = create_app() + body = str(await render_settings(app)) + + assert ">Settings<" in body + assert "/actions/settings" in body + assert 'value="3"' in body + assert "Max concurrent jobs" in body asyncio.run(run()) @@ -602,6 +721,8 @@ def test_create_source_action_creates_pangea_source_and_job_in_database( assert pangea.content_type == "breakingnews" assert pangea.include_content is True assert job.enabled is True + assert job.convert_images is True + assert job.convert_video is True assert job.spider_arguments == "language=en\ndownload_media=true" assert job.cron_hour == "*/6" assert "kenya-health" in rendered_sources @@ -713,6 +834,8 @@ def test_edit_source_action_updates_existing_source_and_job_in_database( "cronDayOfWeek": "*", "cronMonth": "*", "jobEnabled": False, + "convertImages": False, + "convertVideo": False, "onlyNewest": False, "includeAuthors": False, "excludeMedia": True, @@ -737,6 +860,8 @@ def test_edit_source_action_updates_existing_source_and_job_in_database( assert pangea.include_authors is False assert pangea.exclude_media is True assert job.enabled is False + assert job.convert_images is False + assert job.convert_video is False assert job.spider_arguments == "language=sw\ninclude_audio=false" assert job.cron_hour == "2" assert "Kenya health desk nightly" in rendered_sources @@ -863,6 +988,55 @@ def test_create_source_action_validates_duplicate_slug_and_pangea_type( asyncio.run(run()) +def test_settings_action_updates_max_concurrent_jobs( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "settings-action.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + async def run() -> None: + app = create_app() + client = app.test_client() + + response = await client.post( + "/actions/settings", + headers={"Datastar-Request": "true"}, + json={"maxConcurrentJobs": "3"}, + ) + body = await response.get_data(as_text=True) + + assert response.status_code == 200 + assert "window.location = '/settings'" in body + assert load_max_concurrent_jobs() == 3 + assert 'value="3"' in str(await render_settings(app)) + + asyncio.run(run()) + + +def test_settings_action_rejects_non_positive_max_concurrent_jobs( + monkeypatch, tmp_path: Path +) -> None: + db_path = tmp_path / "settings-invalid.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + async def run() -> None: + app = create_app() + client = app.test_client() + + response = await client.post( + "/actions/settings", + headers={"Datastar-Request": "true"}, + json={"maxConcurrentJobs": "0"}, + ) + body = await response.get_data(as_text=True) + + assert response.status_code == 200 + assert "Max concurrent jobs must be at least 1." in body + assert load_max_concurrent_jobs() == 1 + + asyncio.run(run()) + + def test_render_runs_shows_running_upcoming_and_completed_tables( monkeypatch, tmp_path: Path ) -> None: