diff --git a/repub/job_runner.py b/repub/job_runner.py
index 61abacb..9ad69c7 100644
--- a/repub/job_runner.py
+++ b/repub/job_runner.py
@@ -2,121 +2,464 @@ from __future__ import annotations
 
 import argparse
 import json
-import random
 import signal
 import sys
-import time
+from dataclasses import dataclass
 from datetime import UTC, datetime
 from pathlib import Path
+from typing import Any
+
+from pygea.config import LoggingConfig, PygeaConfig, ResultsConfig, RuntimeConfig
+from scrapy.crawler import CrawlerProcess
+from scrapy.statscollectors import StatsCollector
+from twisted.python.failure import Failure
+
+from repub.config import (
+    FeedConfig,
+    RepublisherConfig,
+    build_base_settings,
+    build_feed_settings,
+)
+from repub.crawl import prepare_output_dirs
+from repub.model import (
+    Job,
+    Source,
+    SourceFeed,
+    SourcePangea,
+    database,
+    initialize_database,
+)
+from repub.spiders.rss_spider import RssFeedSpider
+
+
+def _json_default(value: Any) -> Any:
+    if isinstance(value, datetime):
+        if value.tzinfo is None:
+            return value.replace(tzinfo=UTC).isoformat()
+        return value.astimezone(UTC).isoformat()
+    return str(value)
+
+
+def _normalized_stats(stats: dict[str, Any]) -> dict[str, Any]:
+    cache_store = int(stats.get("httpcache/store", 0))
+    cache_hits = int(stats.get("httpcache/hit", 0))
+    cache_misses = int(stats.get("httpcache/miss", 0))
+    return {
+        **stats,
+        "requests_count": int(stats.get("downloader/request_count", 0)),
+        "items_count": int(stats.get("item_scraped_count", 0)),
+        "warnings_count": int(stats.get("log_count/WARNING", 0)),
+        "errors_count": int(stats.get("log_count/ERROR", 0)),
+        "bytes_count": int(stats.get("downloader/response_bytes", 0)),
+        "retries_count": int(stats.get("retry/count", 0)),
+        "exceptions_count": int(stats.get("spider_exceptions/count", 0)),
+        "cache_size_count": cache_store,
+        "cache_object_count": cache_store + cache_hits + cache_misses,
+    }
+
+
+class ExecutionStatsCollector(StatsCollector):
+    def __init__(self, crawler: Any):
+        super().__init__(crawler)
+        self._stats_path = Path(crawler.settings["REPUB_JOB_STATS_PATH"])
+        self._stats_path.parent.mkdir(parents=True, exist_ok=True)
+
+    def set_value(self, key: str, value: Any, spider: Any | None = None) -> None:
+        super().set_value(key, value, spider)
+        self._write_snapshot()
+
+    def set_stats(self, stats: dict[str, Any], spider: Any | None = None) -> None:
+        super().set_stats(stats, spider)
+        self._write_snapshot()
+
+    def inc_value(
+        self,
+        key: str,
+        count: int = 1,
+        start: int = 0,
+        spider: Any | None = None,
+    ) -> None:
+        super().inc_value(key, count, start, spider)
+        self._write_snapshot()
+
+    def max_value(self, key: str, value: Any, spider: Any | None = None) -> None:
+        super().max_value(key, value, spider)
+        self._write_snapshot()
+
+    def min_value(self, key: str, value: Any, spider: Any | None = None) -> None:
+        super().min_value(key, value, spider)
+        self._write_snapshot()
+
+    def clear_stats(self, spider: Any | None = None) -> None:
+        super().clear_stats(spider)
+        self._write_snapshot()
+
+    def open_spider(self, spider: Any | None = None) -> None:
+        super().open_spider(spider)
+        self._write_snapshot()
+
+    def _persist_stats(self, stats: dict[str, Any]) -> None:
+        self._write_snapshot(stats)
+
+    def _write_snapshot(self, stats: dict[str, Any] | None = None) -> None:
+        payload = {
+            "timestamp": datetime.now(UTC).isoformat(),
+            **_normalized_stats(self._stats if stats is None else stats),
+        }
+        with self._stats_path.open("a", encoding="utf-8") as handle:
+            handle.write(json.dumps(payload, sort_keys=True, default=_json_default))
+            handle.write("\n")
+
+
+def pangea_feed_class():
+    from pygea.pangeafeed import PangeaFeed
+
+    return PangeaFeed
+
+
+def generate_pangea_feed(
+    *,
+    name: str,
+    slug: str,
+    domain: str,
+    category_name: str,
+    content_type: str,
+    only_newest: bool,
+    max_articles: int,
+    oldest_article: int,
+    include_authors: bool,
+    exclude_media: bool,
+    include_content: bool,
+    content_format: str,
+    out_dir: str | Path,
+    log_path: str | Path,
+) -> Path:
+    resolved_out_dir = Path(out_dir).resolve()
+    resolved_log_path = Path(log_path).resolve()
+    config = PygeaConfig(
+        config_path=resolved_out_dir / "pygea-runtime.toml",
+        domain=domain,
+        default_content_type=content_type,
+        feeds=(
+            {
+                "name": category_name,
+                "slug": slug,
+                "only_newest": only_newest,
+                "content_type": content_type,
+            },
+        ),
+        runtime=RuntimeConfig(
+            api_key=None,
+            max_articles=max_articles,
+            oldest_article=oldest_article,
+            authors_p=include_authors,
+            no_media_p=exclude_media,
+            content_inc_p=include_content,
+            content_format=content_format,
+            verbose_p=True,
+        ),
+        results=ResultsConfig(
+            output_to_file_p=True,
+            output_file_name="rss.xml",
+            output_directory=resolved_out_dir,
+        ),
+        logging=LoggingConfig(
+            log_file=resolved_log_path,
+            default_log_level="INFO",
+        ),
+    )
+    feed_class = pangea_feed_class()
+    feed = feed_class(config, list(config.feeds))
+    feed.acquire_content()
+    feed.generate_feed()
+    output_path = feed.disgorge(slug)
+    if output_path is None:
+        raise RuntimeError(f"pygea did not write an output file for {name!r}")
+    return output_path.resolve()
+
+
+@dataclass(frozen=True)
+class JobSourceConfig:
+    source_name: str
+    source_slug: str
+    source_type: str
+    spider_arguments: dict[str, str]
+    feed_url: str | None = None
+    pangea_domain: str | None = None
+    pangea_category: str | None = None
+    content_type: str | None = None
+    only_newest: bool = True
+    max_articles: int = 10
+    oldest_article: int = 3
+    include_authors: bool = True
+    exclude_media: bool = False
+    include_content: bool = True
+    content_format: str = "MOBILE_3"
 
 
 def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
-    parser = argparse.ArgumentParser(description="Simulated republisher worker")
+    parser = argparse.ArgumentParser(description="Run a republisher job worker")
     parser.add_argument("--job-id", type=int, required=True)
     parser.add_argument("--execution-id", type=int, required=True)
+    parser.add_argument("--db-path", required=True)
+    parser.add_argument("--out-dir", required=True)
     parser.add_argument("--stats-path", required=True)
-    parser.add_argument("--duration-seconds", type=float, required=True)
-    parser.add_argument("--interval-seconds", type=float, required=True)
-    parser.add_argument("--failure-probability", type=float, required=True)
     return parser.parse_args(argv)
 
 
 def main(argv: list[str] | None = None) -> int:
     args = parse_args(argv)
-    rng = random.Random(f"{args.job_id}:{args.execution_id}")
-    stats_path = Path(args.stats_path)
-    stats_path.parent.mkdir(parents=True, exist_ok=True)
     stop_requested = False
+    process: CrawlerProcess | None = None
 
     def request_stop(signum: int, frame: object | None) -> None:
         del signum, frame
         nonlocal stop_requested
-        if stop_requested:
-            return
         stop_requested = True
         print(
             f"worker[{args.job_id}:{args.execution_id}]: graceful stop requested",
             flush=True,
         )
+        if process is None:
+            return
+        try:
+            from twisted.internet import reactor
+
+            call_from_thread = getattr(reactor, "callFromThread", None)
+            if callable(call_from_thread):
+                call_from_thread(process.stop)
+            else:
+                process.stop()
+        except Exception as error:
+            print(
+                f"worker[{args.job_id}:{args.execution_id}]: failed to stop reactor gracefully: {error}",
+                flush=True,
+            )
 
     signal.signal(signal.SIGTERM, request_stop)
     signal.signal(signal.SIGINT, request_stop)
 
-    counters = {
-        "requests_count": 0,
-        "items_count": 0,
-        "warnings_count": 0,
-        "errors_count": 0,
-        "bytes_count": 0,
-        "retries_count": 0,
-        "exceptions_count": 0,
-        "cache_size_count": 0,
-        "cache_object_count": 0,
-    }
+    try:
+        source_config = _load_job_source_config(
+            db_path=args.db_path, job_id=args.job_id
+        )
+    except Exception as error:
+        print(
+            f"worker[{args.job_id}:{args.execution_id}]: failed to load job config: {error}",
+            flush=True,
+        )
+        return 1
+
+    out_dir = Path(args.out_dir).resolve()
+    stats_path = Path(args.stats_path).resolve()
+    log_path = stats_path.with_suffix(".log")
+
+    try:
+        feed = _resolve_feed(
+            source_config=source_config,
+            out_dir=out_dir,
+            log_path=log_path,
+        )
+        process = CrawlerProcess(
+            _build_crawl_settings(
+                out_dir=out_dir,
+                feed=feed,
+                stats_path=stats_path,
+            )
+        )
+        print(
+            f"worker[{args.job_id}:{args.execution_id}]: starting crawl for {source_config.source_slug}",
+            flush=True,
+        )
+        exit_code = _run_crawl(
+            process=process,
+            feed=feed,
+            spider_arguments=source_config.spider_arguments,
+        )
+    except Exception as error:
+        print(
+            f"worker[{args.job_id}:{args.execution_id}]: crawl failed: {error}",
+            flush=True,
+        )
+        return 1
+
+    if stop_requested:
+        print(
+            f"worker[{args.job_id}:{args.execution_id}]: stopping after graceful request",
+            flush=True,
+        )
+        return 130
+
+    if exit_code == 0:
+        print(
+            f"worker[{args.job_id}:{args.execution_id}]: completed successfully",
+            flush=True,
+        )
+    return exit_code
+
+
+def _load_job_source_config(*, db_path: str, job_id: int) -> JobSourceConfig:
+    initialize_database(db_path)
+    primary_key = getattr(Job, "_meta").primary_key
+    with database.connection_context():
+        job = (
+            Job.select(Job, Source)
+            .join(Source)
+            .where(primary_key == job_id)
+            .get_or_none()
+        )
+        if job is None:
+            raise ValueError(f"job {job_id} does not exist")
+
+        source = job.source
+        spider_arguments = _parse_spider_arguments(job.spider_arguments)
+        if source.source_type == "feed":
+            feed = SourceFeed.get_or_none(SourceFeed.source == source)
+            if feed is None:
+                raise ValueError(
+                    f"feed source {source.slug!r} is missing its feed config"
+                )
+            return JobSourceConfig(
+                source_name=source.name,
+                source_slug=source.slug,
+                source_type=source.source_type,
+                spider_arguments=spider_arguments,
+                feed_url=feed.feed_url,
+            )
+
+        pangea = SourcePangea.get_or_none(SourcePangea.source == source)
+        if pangea is None:
+            raise ValueError(
+                f"pangea source {source.slug!r} is missing its pangea config"
+            )
+        return JobSourceConfig(
+            source_name=source.name,
+            source_slug=source.slug,
+            source_type=source.source_type,
+            spider_arguments=spider_arguments,
+            pangea_domain=pangea.domain,
+            pangea_category=pangea.category_name,
+            content_type=pangea.content_type,
+            only_newest=bool(pangea.only_newest),
+            max_articles=int(pangea.max_articles),
+            oldest_article=int(pangea.oldest_article),
+            include_authors=bool(pangea.include_authors),
+            exclude_media=bool(pangea.exclude_media),
+            include_content=bool(pangea.include_content),
+            content_format=pangea.content_format,
+        )
+
+
+def _parse_spider_arguments(raw_value: str) -> dict[str, str]:
+    arguments: dict[str, str] = {}
+    for raw_line in raw_value.splitlines():
+        line = raw_line.strip()
+        if line == "":
+            continue
+        key, separator, value = line.partition("=")
+        key = key.strip()
+        if separator == "" or key == "":
+            raise ValueError(
+                f"invalid spider argument {raw_line!r}; expected key=value"
+            )
+        arguments[key] = value
+    return arguments
+
+
+def _resolve_feed(
+    *,
+    source_config: JobSourceConfig,
+    out_dir: Path,
+    log_path: Path,
+) -> FeedConfig:
+    if source_config.source_type == "feed":
+        assert source_config.feed_url is not None
+        return FeedConfig(
+            name=source_config.source_name,
+            slug=source_config.source_slug,
+            url=source_config.feed_url,
+        )
+
+    generated_feed_path = generate_pangea_feed(
+        name=source_config.source_name,
+        slug=source_config.source_slug,
+        domain=_require_value(source_config.pangea_domain, "pangea_domain"),
+        category_name=_require_value(source_config.pangea_category, "pangea_category"),
+        content_type=_require_value(source_config.content_type, "content_type"),
+        only_newest=source_config.only_newest,
+        max_articles=source_config.max_articles,
+        oldest_article=source_config.oldest_article,
+        include_authors=source_config.include_authors,
+        exclude_media=source_config.exclude_media,
+        include_content=source_config.include_content,
+        content_format=source_config.content_format,
+        out_dir=out_dir,
+        log_path=log_path.with_suffix(".pygea.log"),
+    )
     print(
-        f"worker[{args.job_id}:{args.execution_id}]: starting simulated crawl",
+        f"pygea: generated intermediate feed at {generated_feed_path}",
         flush=True,
     )
-    started = time.monotonic()
-    iteration = 0
-    with stats_path.open("a", encoding="utf-8") as stats_file:
-        while time.monotonic() - started < args.duration_seconds:
-            time.sleep(args.interval_seconds)
-            iteration += 1
-            counters["requests_count"] += rng.randint(1, 5)
-            counters["items_count"] += rng.randint(0, 2)
-            counters["bytes_count"] += rng.randint(500, 3000)
-            counters["cache_size_count"] += rng.randint(0, 1)
-            counters["cache_object_count"] += rng.randint(0, 2)
-            if rng.random() < 0.1:
-                counters["warnings_count"] += 1
-            if rng.random() < 0.05:
-                counters["retries_count"] += 1
-
-            snapshot = {
-                "timestamp": datetime.now(UTC).isoformat(),
-                "iteration": iteration,
-                **counters,
-            }
-            stats_file.write(json.dumps(snapshot, sort_keys=True) + "\n")
-            stats_file.flush()
-            print(
-                "stats: "
-                f"requests={counters['requests_count']} "
-                f"items={counters['items_count']} "
-                f"bytes={counters['bytes_count']}",
-                flush=True,
-            )
-            if stop_requested:
-                print(
-                    f"worker[{args.job_id}:{args.execution_id}]: stopping after graceful request",
-                    flush=True,
-                )
-                return 130
-
-        if rng.random() < args.failure_probability:
-            counters["errors_count"] += 1
-            counters["exceptions_count"] += 1
-            stats_file.write(
-                json.dumps(
-                    {"timestamp": datetime.now(UTC).isoformat(), **counters},
-                    sort_keys=True,
-                )
-                + "\n"
-            )
-            stats_file.flush()
-            print(
-                f"worker[{args.job_id}:{args.execution_id}]: simulated failure",
-                flush=True,
-            )
-            return 1
-
-    print(
-        f"worker[{args.job_id}:{args.execution_id}]: completed successfully",
-        flush=True,
+    return FeedConfig(
+        name=source_config.source_name,
+        slug=source_config.source_slug,
+        url=generated_feed_path.as_uri(),
     )
-    return 0
+
+
+def _build_crawl_settings(*, out_dir: Path, feed: FeedConfig, stats_path: Path):
+    base_settings = build_base_settings(
+        RepublisherConfig(
+            config_path=out_dir / "job-runner.toml",
+            out_dir=out_dir,
+            feeds=(feed,),
+            scrapy_settings={},
+        )
+    )
+    prepare_output_dirs(out_dir, feed.slug)
+    settings = build_feed_settings(base_settings, out_dir=out_dir, feed_slug=feed.slug)
+    settings.set("LOG_FILE", None, priority="cmdline")
+    settings.set(
+        "STATS_CLASS",
+        "repub.job_runner.ExecutionStatsCollector",
+        priority="cmdline",
+    )
+    settings.set("REPUB_JOB_STATS_PATH", str(stats_path), priority="cmdline")
+    return settings
+
+
+def _run_crawl(
+    *,
+    process: CrawlerProcess,
+    feed: FeedConfig,
+    spider_arguments: dict[str, str],
+) -> int:
+    results: list[Failure | None] = []
+    deferred = process.crawl(
+        RssFeedSpider,
+        feed_name=feed.slug,
+        url=feed.url,
+        **spider_arguments,
+    )
+
+    def handle_success(_: object) -> None:
+        results.append(None)
+        return None
+
+    def handle_error(failure: Failure) -> None:
+        print(failure.getTraceback(), flush=True)
+        results.append(failure)
+        return None
+
+    deferred.addCallbacks(handle_success, handle_error)
+    process.start()
+    return 1 if any(result is not None for result in results) else 0
+
+
+def _require_value(value: str | None, field_name: str) -> str:
+    if value is None or value == "":
+        raise ValueError(f"missing {field_name}")
+    return value
 
 
 if __name__ == "__main__":
diff --git a/repub/jobs.py b/repub/jobs.py
index eeb4494..3ccec78 100644
--- a/repub/jobs.py
+++ b/repub/jobs.py
@@ -188,14 +188,12 @@
                 str(job_id),
                 "--execution-id",
                 str(execution_id),
+                "--db-path",
+                str(database.database),
+                "--out-dir",
+                str(self.log_dir.parent),
                 "--stats-path",
                 str(artifacts.stats_path),
-                "--duration-seconds",
-                str(self.worker_duration_seconds),
-                "--interval-seconds",
-                str(self.worker_stats_interval_seconds),
-                "--failure-probability",
-                str(self.worker_failure_probability),
             ],
             stdout=log_handle,
             stderr=subprocess.STDOUT,
@@ -390,7 +388,7 @@
             for job in jobs
         ),
         "completed": tuple(
-            _project_completed_execution(execution, resolved_log_dir)
+            _project_completed_execution(execution, resolved_log_dir, reference_time)
            for execution in completed_executions
        ),
    }
@@ -401,6 +399,7 @@ def load_dashboard_view(
 ) -> dict[str, object]:
     reference_time = now or datetime.now(UTC)
     runs_view = load_runs_view(log_dir=log_dir, now=reference_time)
+    output_dir = Path(log_dir).parent
     with database.connection_context():
         failed_last_day = (
             JobExecution.select()
@@ -414,7 +413,7 @@
     upcoming_ready = sum(
         1 for job in runs_view["upcoming"] if str(job["run_reason"]) == "Ready"
     )
-    footprint_bytes = _directory_size(Path(log_dir))
+    footprint_bytes = _directory_size(output_dir)
     return {
         "running": runs_view["running"],
         "snapshot": {
@@ -538,7 +537,7 @@ def _project_upcoming_job(
         "slug": job.source.slug,
         "job_id": job_id,
         "next_run": (
-            _humanize_future_time(reference_time, next_run)
+            _humanize_relative_time(reference_time, next_run)
             if next_run is not None
             else ("Running now" if running_execution is not None else "Not scheduled")
         ),
@@ -565,7 +564,7 @@
 
 
 def _project_completed_execution(
-    execution: JobExecution, log_dir: Path
+    execution: JobExecution, log_dir: Path, reference_time: datetime
 ) -> dict[str, object]:
     job = cast(Job, execution.job)
     job_id = _job_id(job)
@@ -573,18 +572,22 @@
     artifacts = JobArtifacts.for_execution(
         log_dir=log_dir, job_id=job_id, execution_id=execution_id
     )
+    ended_at = (
+        _coerce_datetime(cast(datetime | str, execution.ended_at))
+        if execution.ended_at is not None
+        else None
+    )
     return {
         "source": job.source.name,
         "slug": job.source.slug,
         "job_id": job_id,
         "execution_id": execution_id,
         "ended_at": (
-            _coerce_datetime(cast(datetime | str, execution.ended_at)).strftime(
-                "%Y-%m-%d %H:%M UTC"
-            )
-            if execution.ended_at is not None
+            _humanize_relative_time(reference_time, ended_at)
+            if ended_at is not None
             else "Pending"
         ),
+        "ended_at_iso": ended_at.isoformat() if ended_at is not None else None,
         "status": _execution_status_label(execution),
         "status_tone": _execution_status_tone(execution),
         "stats": _stats_summary(execution),
@@ -678,20 +681,25 @@ def _format_bytes(value: int) -> str:
     return f"{value / (1024 * 1024 * 1024):.1f} GB"
 
 
-def _humanize_future_time(reference_time: datetime, target_time: datetime) -> str:
+def _humanize_relative_time(reference_time: datetime, target_time: datetime) -> str:
     delta_seconds = int(round((target_time - reference_time).total_seconds()))
-    if delta_seconds <= 0:
+    if delta_seconds == 0:
         return "now"
+    absolute_delta_seconds = abs(delta_seconds)
     units = (
         ("day", 24 * 60 * 60),
         ("hour", 60 * 60),
         ("minute", 60),
     )
     for label, size in units:
-        if delta_seconds >= size:
-            count = max(1, round(delta_seconds / size))
+        if absolute_delta_seconds >= size:
+            count = max(1, round(absolute_delta_seconds / size))
             suffix = "" if count == 1 else "s"
-            return f"in {count} {label}{suffix}"
+            if delta_seconds > 0:
+                return f"in {count} {label}{suffix}"
+            return f"{count} {label}{suffix} ago"
 
-    return f"in {delta_seconds} seconds"
+    if delta_seconds > 0:
+        return f"in {absolute_delta_seconds} seconds"
+    return f"{absolute_delta_seconds} seconds ago"
diff --git a/repub/media.py b/repub/media.py
index b964d0a..53499cc 100644
--- a/repub/media.py
+++ b/repub/media.py
@@ -54,12 +54,25 @@ class VideoMeta(TypedDict):
     bit_rate: float
 
 
+def _decode_ffmpeg_output(output: Any) -> str:
+    if isinstance(output, bytes):
+        return output.decode("utf-8", errors="replace")
+    return str(output)
+
+
+def _print_ffmpeg_error_output(error: ffmpeg.Error) -> None:
+    if error.stderr:
+        print(_decode_ffmpeg_output(error.stderr), file=sys.stderr)
+    if error.stdout:
+        print(_decode_ffmpeg_output(error.stdout))
+
+
 def probe_media(file_path) -> Dict[str, Any]:
     """Probes `file_path` using ffmpeg's ffprobe and returns the data."""
     try:
         return ffmpeg.probe(file_path)
     except ffmpeg.Error as e:
-        print(e.stderr, file=sys.stderr)
+        _print_ffmpeg_error_output(e)
         logger.error(f"Failed to probe io {file_path}")
         logger.error(e)
         raise RuntimeError(f"Failed to probe io {file_path}") from e
@@ -217,7 +230,7 @@ def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) ->
                 **params,
                 loglevel="quiet",
             )
-            .run()
+            .run(capture_stdout=True, capture_stderr=True)
         )
         before = os.path.getsize(input_file) / 1024
         after = os.path.getsize(output_file) / 1024
@@ -229,8 +242,7 @@
         )
         return output_file
     except ffmpeg.Error as e:
-        print(e.stderr, file=sys.stderr)
-        print(e.stdout)
+        _print_ffmpeg_error_output(e)
         logger.error(e)
         raise RuntimeError(f"Failed to compress audio {input_file}") from e
@@ -310,7 +322,7 @@ def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) ->
                 **params,
                 # loglevel="quiet",
             )
-            .run()
+            .run(capture_stdout=True, capture_stderr=True)
         )
     else:
         passes = params["passes"]
@@ -323,16 +335,18 @@
                 "-stats"
             )
             logger.info("Running pass #1")
-            std_out, std_err = ffoutput.run(capture_stdout=True)
-            print(std_out)
-            print(std_err)
+            ffoutput.run(capture_stdout=True, capture_stderr=True)
             logger.info("Running pass #2")
             ffoutput = ffinput.output(video, audio, output_file, **passes[1])
             ffoutput = ffoutput.global_args(
                 # "-loglevel", "quiet",
                 "-stats"
             )
-            ffoutput.run(overwrite_output=True)
+            ffoutput.run(
+                capture_stdout=True,
+                capture_stderr=True,
+                overwrite_output=True,
+            )
 
         before = os.path.getsize(input_file) / 1024
         after = os.path.getsize(output_file) / 1024
@@ -344,7 +358,7 @@
         )
         return output_file
     except ffmpeg.Error as e:
-        print(e.stderr, file=sys.stderr)
+        _print_ffmpeg_error_output(e)
         logger.error("Failed to transcode")
         logger.error(e)
         raise RuntimeError(f"Failed to transcode video: {e.stderr.decode()}") from e
diff --git a/repub/pages/dashboard.py b/repub/pages/dashboard.py
index 67b93a8..e58ffd1 100644
--- a/repub/pages/dashboard.py
+++ b/repub/pages/dashboard.py
@@ -118,7 +118,7 @@ def operational_snapshot(*, snapshot: Mapping[str, str] | None = None) -> Render
             stat_card(
                 label="Artifact footprint",
                 value=values["artifact_footprint"],
-                detail="Current log and stats artifact size under out/logs.",
+                detail="Current artifact size under the output path.",
             ),
         ],
     ]
diff --git a/repub/pages/runs.py b/repub/pages/runs.py
index 94acee7..c4d2eda 100644
--- a/repub/pages/runs.py
+++ b/repub/pages/runs.py
@@ -144,6 +144,20 @@ def _upcoming_row(job: Mapping[str, object]) -> tuple[Node, ...]:
 
 
 def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
+    ended_at = _maybe_text(execution, "ended_at_iso")
+    ended_at_label: Node = h.p(class_="font-medium text-slate-900")[
+        _text(execution, "ended_at")
+    ]
+    if ended_at is not None:
+        ended_at_label = h.time(
+            {
+                "data-ended-at": ended_at,
+                "title": ended_at,
+            },
+            datetime=ended_at,
+            class_="font-medium text-slate-900",
+        )[_text(execution, "ended_at")]
+
     return (
         h.div[
             h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
@@ -157,7 +171,7 @@
             ],
         ],
         h.div[
-            h.p(class_="font-medium text-slate-900")[_text(execution, "ended_at")],
+            ended_at_label,
             h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "summary")],
         ],
         status_badge(
@@ -262,10 +276,12 @@ window.repubFormatNextRuns = window.repubFormatNextRuns || (() => {
     return relativeFormatter.format(0, 'second');
   };
   const format = () => {
-    document.querySelectorAll('time[data-next-run-at]').forEach((element) => {
-      const nextRunAt = element.getAttribute('data-next-run-at');
-      if (!nextRunAt) return;
-      const targetDate = new Date(nextRunAt);
+    document.querySelectorAll('time[data-next-run-at], time[data-ended-at]').forEach((element) => {
+      const relativeAt =
+        element.getAttribute('data-next-run-at') ??
+        element.getAttribute('data-ended-at');
+      if (!relativeAt) return;
+      const targetDate = new Date(relativeAt);
       if (Number.isNaN(targetDate.getTime())) return;
       element.textContent = formatRelative(targetDate);
       element.title = absoluteFormatter.format(targetDate);
diff --git a/tests/test_pipelines.py b/tests/test_pipelines.py
index 60485c5..e6904a6 100644
--- a/tests/test_pipelines.py
+++ b/tests/test_pipelines.py
@@ -1,8 +1,10 @@
+import sys
 from pathlib import Path
 from types import SimpleNamespace
 
 import pytest
 
+from repub import media
 from repub.config import (
     FeedConfig,
     RepublisherConfig,
@@ -48,3 +50,141 @@ def test_pipeline_from_crawler_uses_configured_store(
     assert pipeline.settings is crawler.settings
     assert pipeline.store.basedir == crawler.settings[store_setting]
+
+
+def test_transcode_audio_captures_ffmpeg_output(monkeypatch, tmp_path: Path) -> None:
+    input_file = tmp_path / "input.mp3"
+    input_file.write_bytes(b"12345")
+    output_dir = tmp_path / "audio-out"
+    output_dir.mkdir()
+    run_calls: list[dict[str, object]] = []
+
+    class FakeOutput:
+        def __init__(self, output_path: Path):
+            self.output_path = output_path
+
+        def run(self, **kwargs):
+            run_calls.append(kwargs)
+            self.output_path.write_bytes(b"12")
+            return b"", b""
+
+    class FakeInput:
+        def output(self, output_file: str, **params):
+            del params
+            return FakeOutput(Path(output_file))
+
+    monkeypatch.setattr(media.ffmpeg, "input", lambda _: FakeInput())
+
+    result = media.transcode_audio(
+        str(input_file),
+        str(output_dir),
+        {"extension": "mp3", "acodec": "libmp3lame"},
+    )
+
+    assert result == str(output_dir / "converted.mp3")
+    assert run_calls == [{"capture_stdout": True, "capture_stderr": True}]
+
+
+def test_transcode_video_two_pass_does_not_print_ffmpeg_output(
+    monkeypatch, tmp_path: Path
+) -> None:
+    input_file = tmp_path / "input.mp4"
+    input_file.write_bytes(b"12345")
+    output_dir = tmp_path / "video-out"
+    output_dir.mkdir()
+    run_calls: list[dict[str, object]] = []
+    printed: list[tuple[tuple[object, ...], dict[str, object]]] = []
+
+    class FakeOutput:
+        def __init__(self, output_path: Path | None):
+            self.output_path = output_path
+
+        def global_args(self, *args):
+            del args
+            return self
+
+        def run(self, **kwargs):
+            run_calls.append(kwargs)
+            if self.output_path is not None:
+                self.output_path.write_bytes(b"12")
+            return b"pass-out", b"pass-err"
+
+    class FakeInput:
+        video = object()
+        audio = object()
+
+        def output(self, *args, **params):
+            del params
+            output_path = next(
+                (
+                    Path(arg)
+                    for arg in args
+                    if isinstance(arg, str) and arg.endswith(".mp4")
+                ),
+                None,
+            )
+            return FakeOutput(output_path)
+
+    monkeypatch.setattr(media.ffmpeg, "input", lambda _: FakeInput())
+    monkeypatch.setattr(
+        "builtins.print", lambda *args, **kwargs: printed.append((args, kwargs))
+    )
+
+    result = media.transcode_video(
+        str(input_file),
+        str(output_dir),
+        {
+            "extension": "mp4",
+            "passes": [
+                {"f": "null"},
+                {"c:v": "libx264"},
+            ],
+        },
+    )
+
+    assert result == str(output_dir / "converted.mp4")
+    assert run_calls == [
+        {"capture_stdout": True, "capture_stderr": True},
+        {
+            "capture_stdout": True,
+            "capture_stderr": True,
+            "overwrite_output": True,
+        },
+    ]
+    assert printed == []
+
+
+def test_transcode_video_prints_ffmpeg_output_on_error(
+    monkeypatch, tmp_path: Path
+) -> None:
+    input_file = tmp_path / "input.mp4"
+    input_file.write_bytes(b"12345")
+    output_dir = tmp_path / "video-out"
+    output_dir.mkdir()
+    printed: list[tuple[str, bool]] = []
+
+    class FakeOutput:
+        def run(self, **kwargs):
+            del kwargs
+            raise media.ffmpeg.Error("ffmpeg", b"video-stdout", b"video-stderr")
+
+    class FakeInput:
+        def output(self, *args, **params):
+            del args, params
+            return FakeOutput()
+
+    def fake_print(*args, **kwargs):
+        printed.append((str(args[0]), kwargs.get("file") is sys.stderr))
+
+    monkeypatch.setattr(media.ffmpeg, "input", lambda _: FakeInput())
+    monkeypatch.setattr("builtins.print", fake_print)
+
+    with pytest.raises(RuntimeError):
+        media.transcode_video(
+            str(input_file),
+            str(output_dir),
+            {"extension": "mp4", "c:v": "libx264"},
+        )
+
+    assert ("video-stderr", True) in printed
+    assert ("video-stdout", False) in printed
diff --git a/tests/test_scheduler_runtime.py b/tests/test_scheduler_runtime.py
index a2059d4..30385e5 100644
--- a/tests/test_scheduler_runtime.py
+++ b/tests/test_scheduler_runtime.py
@@ -2,10 +2,15 @@ from __future__ import annotations
 
 import asyncio
 import json
+import socketserver
+import threading
 import time
+from datetime import UTC, datetime, timedelta
+from http.server import BaseHTTPRequestHandler
 from pathlib import Path
 
-from repub.jobs import JobArtifacts, JobRuntime
+from repub.job_runner import generate_pangea_feed
+from repub.jobs import JobArtifacts, JobRuntime, load_runs_view
 from repub.model import (
     Job,
     JobExecution,
@@ -16,6 +21,10 @@
 )
 from repub.web import create_app, get_job_runtime, render_execution_logs, render_runs
 
+FIXTURE_FEED_PATH = (
+    Path(__file__).resolve().parents[1] / "demo" / "fixtures" / "local-feed.rss"
+).resolve()
+
 
 def test_job_runtime_syncs_enabled_jobs_into_apscheduler(tmp_path: Path) -> None:
     initialize_database(tmp_path / "scheduler.db")
@@ -91,7 +100,7 @@
         cron_day_of_month="*",
         cron_day_of_week="*",
         cron_month="*",
-        feed_url="https://example.com/manual.xml",
+        feed_url=FIXTURE_FEED_PATH.as_uri(),
     )
     job = Job.get(Job.source == source)
@@ -120,9 +129,11 @@
     assert execution.bytes_count > 0
     assert artifacts.log_path.exists()
     assert artifacts.stats_path.exists()
-    assert "starting simulated crawl" in artifacts.log_path.read_text(
-        encoding="utf-8"
-    )
+    output_path = tmp_path / "out" / "manual-source.rss"
+    assert output_path.exists()
+    output_text = output_path.read_text(encoding="utf-8")
+    assert "Local Demo Feed" in output_text
+    assert "Local Demo Entry" in output_text
 
     stats_lines = [
         json.loads(line)
@@ -136,50 +147,51 @@
 def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None:
     initialize_database(tmp_path / "cancel.db")
-    source = create_source(
-        name="Cancelable source",
-        slug="cancelable-source",
-        source_type="feed",
-        notes="",
-        spider_arguments="",
-        enabled=False,
-        cron_minute="*/5",
-        cron_hour="*",
-        cron_day_of_month="*",
-        cron_day_of_week="*",
-        cron_month="*",
-        feed_url="https://example.com/cancelable.xml",
-    )
-    job = Job.get(Job.source == source)
+    with _slow_feed_server() as feed_url:
+        source = create_source(
+            name="Cancelable source",
+            slug="cancelable-source",
+            source_type="feed",
+            notes="",
+            spider_arguments="",
+            enabled=False,
+            cron_minute="*/5",
+            cron_hour="*",
+            cron_day_of_month="*",
+            cron_day_of_week="*",
+            cron_month="*",
+            feed_url=feed_url,
+        )
+        job = Job.get(Job.source == source)
 
-    runtime = JobRuntime(
-        log_dir=tmp_path / "out" / "logs",
-        worker_duration_seconds=2.0,
-        worker_stats_interval_seconds=0.1,
-        worker_failure_probability=0.0,
-    )
-    try:
-        runtime.start()
-        execution_id = runtime.run_job_now(job.id, reason="manual")
-        assert execution_id is not None
-        _wait_for_running_execution(execution_id)
-
-        runtime.request_execution_cancel(execution_id)
-        execution = _wait_for_terminal_execution(execution_id)
-        artifacts = JobArtifacts.for_execution(
+        runtime = JobRuntime(
             log_dir=tmp_path / "out" / "logs",
-            job_id=job.id,
-            execution_id=execution_id,
+            worker_duration_seconds=2.0,
+            worker_stats_interval_seconds=0.1,
+            worker_failure_probability=0.0,
         )
+        try:
+            runtime.start()
+            execution_id = runtime.run_job_now(job.id, reason="manual")
+            assert execution_id is not None
+            _wait_for_running_execution(execution_id)
 
-        assert execution.running_status == JobExecutionStatus.CANCELED
-        assert execution.ended_at is not None
-        assert execution.stop_requested_at is not None
-        assert "graceful stop requested" in artifacts.log_path.read_text(
-            encoding="utf-8"
-        )
-    finally:
-        runtime.shutdown()
+            runtime.request_execution_cancel(execution_id)
+            execution = _wait_for_terminal_execution(execution_id)
+            artifacts = JobArtifacts.for_execution(
+                log_dir=tmp_path / "out" / "logs",
+                job_id=job.id,
+                execution_id=execution_id,
+            )
+
+            assert execution.running_status == JobExecutionStatus.CANCELED
+            assert execution.ended_at is not None
+            assert execution.stop_requested_at is not None
+            assert "graceful stop requested" in artifacts.log_path.read_text(
+                encoding="utf-8"
+            )
+        finally:
+            runtime.shutdown()
 
 
 def test_job_runtime_start_reconciles_stale_running_execution(tmp_path: Path) -> None:
@@ -234,6 +246,93 @@
         runtime.shutdown()
 
 
+def test_generate_pangea_feed_writes_rss_file(monkeypatch, tmp_path: Path) -> None:
+    class StubPangeaFeed:
+        def __init__(self, config, feeds):
+            self.config = config
+            self.feed = feeds[0]
+
+        def acquire_content(self) -> None:
+            return None
+
+        def generate_feed(self) -> None:
+            return None
+
+        def disgorge(self, slug: str):
+            output_path = self.config.results.output_directory / slug / "rss.xml"
+            output_path.parent.mkdir(parents=True, exist_ok=True)
+            output_path.write_text(
+                "Pangea Fixture\n",
+                encoding="utf-8",
+            )
+            return output_path
+
+    monkeypatch.setattr(
+        "repub.job_runner.pangea_feed_class",
+        lambda: StubPangeaFeed,
+    )
+
+    output_path = generate_pangea_feed(
+        name="Pangea source",
+        slug="pangea-source",
+        domain="example.org",
+        category_name="News",
+        content_type="articles",
+        only_newest=True,
+        max_articles=10,
+        oldest_article=3,
+        include_authors=True,
+        exclude_media=False,
+        include_content=True,
+        content_format="MOBILE_3",
+        out_dir=tmp_path / "out",
+        log_path=tmp_path / "out" / "logs" / "pangea.log",
+    )
+
+    assert output_path == (tmp_path / "out" / "pangea-source" / "rss.xml")
+    assert output_path.exists()
+    assert "Pangea Fixture" in output_path.read_text(encoding="utf-8")
+
+
+def test_load_runs_view_humanizes_completed_execution_end_time(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "runs-view.db"
+    log_dir = tmp_path / "out" / "logs"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+
+    app = create_app()
+    app.config["REPUB_LOG_DIR"] = log_dir
+    source = create_source(
+        name="Completed source",
+        slug="completed-source",
+        source_type="feed",
+        notes="",
+        spider_arguments="",
+        enabled=False,
+        cron_minute="*/5",
+        cron_hour="*",
+        cron_day_of_month="*",
+        cron_day_of_week="*",
+        cron_month="*",
+        feed_url="https://example.com/completed.xml",
+    )
+    job = Job.get(Job.source == source)
+    reference_time = datetime(2026, 1, 15, 12, 0, tzinfo=UTC)
+    ended_at = reference_time - timedelta(hours=2)
+    JobExecution.create(
+        job=job,
+        running_status=JobExecutionStatus.SUCCEEDED,
+        ended_at=ended_at,
+    )
+
+    view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"], now=reference_time)
+    completed = view["completed"][0]
+
+    assert completed["ended_at"] == "2 hours ago"
+    assert completed["ended_at_iso"] == ended_at.isoformat()
+
+
 def test_render_runs_uses_database_backed_jobs_and_executions(
     monkeypatch, tmp_path: Path
 ) -> None:
@@ -259,7 +358,7 @@
         cron_day_of_month="*",
         cron_day_of_week="*",
         cron_month="*",
-        feed_url="https://example.com/runs-page.xml",
+        feed_url=FIXTURE_FEED_PATH.as_uri(),
     )
     job = Job.get(Job.source == source)
     runtime = get_job_runtime(app)
@@ -396,3 +495,41 @@
             return execution
         time.sleep(0.02)
     raise AssertionError(f"execution {execution_id} did not finish in time")
+
+
+class _SlowFeedRequestHandler(BaseHTTPRequestHandler):
+    def do_GET(self) -> None:  # noqa: N802
+        time.sleep(2.0)
+        payload = FIXTURE_FEED_PATH.read_bytes()
+        self.send_response(200)
+        self.send_header("Content-Type", "application/rss+xml; charset=utf-8")
+        self.send_header("Content-Length", str(len(payload)))
+        self.end_headers()
+        self.wfile.write(payload)
+
+    def log_message(self, format: str, *args: object) -> None:
+        del format, args
+
+
+class _ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
+    allow_reuse_address = True
+
+
+class _slow_feed_server:
+    def __enter__(self) -> str:
+        self._server = _ThreadedTCPServer(("127.0.0.1", 0), _SlowFeedRequestHandler)
+        self._thread = threading.Thread(
+            target=self._server.serve_forever,
+            kwargs={"poll_interval": 0.01},
+            daemon=True,
+        )
+        self._thread.start()
+        host = str(self._server.server_address[0])
+        port = int(self._server.server_address[1])
+        return f"http://{host}:{port}/slow-feed.rss"
+
+    def __exit__(self, exc_type, exc, tb) -> None:
+        del exc_type, exc, tb
+        self._server.shutdown()
+        self._server.server_close()
+        self._thread.join(timeout=1)
diff --git a/tests/test_web.py b/tests/test_web.py
index cc7deb8..1486367 100644
--- a/tests/test_web.py
+++ b/tests/test_web.py
@@ -6,6 +6,7 @@ from typing import Any, cast
 
 from repub.components import status_badge
 from repub.datastar import RefreshBroker, render_sse_event, render_stream
+from repub.jobs import load_dashboard_view
 from repub.model import (
     Job,
     JobExecution,
@@ -15,6 +16,7 @@
     SourcePangea,
     create_source,
 )
+from repub.pages.runs import runs_page
 from repub.web import (
     create_app,
     get_refresh_broker,
@@ -34,6 +36,37 @@ def test_status_badge_uses_green_done_tone() -> None:
     assert "Succeeded" in badge
 
 
+def test_runs_page_renders_completed_execution_end_time_as_relative_hoverable_time() -> (
+    None
+):
+    ended_at = "2026-01-15T10:00:00+00:00"
+    body = str(
+        runs_page(
+            completed_executions=(
+                {
+                    "source": "Completed source",
+                    "slug": "completed-source",
+                    "job_id": 7,
+                    "execution_id": 42,
+                    "ended_at": "2 hours ago",
+                    "ended_at_iso": ended_at,
+                    "status": "Succeeded",
+                    "status_tone": "done",
+                    "stats": "1 requests • 1 items • 1 bytes",
+                    "summary": "Worker exited successfully",
+                    "log_href": "/job/7/execution/42/logs",
+                },
+            )
+        )
+    )
+
+    assert "data-ended-at" in body
+    assert f'data-ended-at="{ended_at}"' in body
+    assert f'datetime="{ended_at}"' in body
+    assert f'title="{ended_at}"' in body
+    assert ">2 hours ago<" in body
+
+
 def test_root_get_serves_datastar_shim() -> None:
     async def run() -> None:
         client = create_app().test_client()
@@ -179,6 +212,40 @@ def test_render_dashboard_shows_dashboard_information_architecture(
     asyncio.run(run())
 
 
+def test_load_dashboard_view_measures_log_artifact_path(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "dashboard-footprint.db"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+    create_app()
+    out_dir = tmp_path / "out"
+    log_dir = out_dir / "logs"
+    cache_dir = out_dir / "httpcache"
+    log_dir.mkdir(parents=True)
+    cache_dir.mkdir(parents=True)
+    (log_dir / "run.log").write_bytes(b"x" * 1024)
+    (cache_dir / "cache.bin").write_bytes(b"y" * 2048)
+
+    snapshot = load_dashboard_view(log_dir=log_dir)["snapshot"]
+
+    assert cast(dict[str, str], snapshot)["artifact_footprint"] == "3.0 KB"
+
+
+def test_render_dashboard_describes_log_artifact_footprint(
+    monkeypatch, tmp_path: Path
+) -> None:
+    db_path = tmp_path / "dashboard-footprint-copy.db"
+    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
+
+    async def run() -> None:
+        app = create_app()
+        body = str(await render_dashboard(app))
+
+        assert "Current artifact size under the output path." in body
+
+    asyncio.run(run())
+
+
 def test_render_sources_shows_table_and_create_link() -> None:
     async def run() -> None:
         body = str(await render_sources())