implement scrapy + pygea job runner

Abel Luck 2026-03-30 15:04:41 +02:00
parent 916968c579
commit 8af28c2f68
8 changed files with 888 additions and 163 deletions
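For orientation: the runner added here is spawned once per execution with job, execution, database, output-directory, and stats-path flags. A minimal sketch of invoking the entry point directly, with illustrative IDs and paths, and assuming the simulated-run flags removed in this commit are no longer required:

# Hypothetical direct invocation of the worker entry point; all values are examples only.
from repub.job_runner import main

exit_code = main([
    "--job-id", "7",
    "--execution-id", "42",
    "--db-path", "out/republisher.db",
    "--out-dir", "out",
    "--stats-path", "out/logs/7-42.stats.jsonl",
])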

View file

@@ -2,121 +2,464 @@ from __future__ import annotations
import argparse
import json
import random
import signal
import sys
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from pygea.config import LoggingConfig, PygeaConfig, ResultsConfig, RuntimeConfig
from scrapy.crawler import CrawlerProcess
from scrapy.statscollectors import StatsCollector
from twisted.python.failure import Failure
from repub.config import (
FeedConfig,
RepublisherConfig,
build_base_settings,
build_feed_settings,
)
from repub.crawl import prepare_output_dirs
from repub.model import (
Job,
Source,
SourceFeed,
SourcePangea,
database,
initialize_database,
)
from repub.spiders.rss_spider import RssFeedSpider
def _json_default(value: Any) -> Any:
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=UTC).isoformat()
return value.astimezone(UTC).isoformat()
return str(value)
def _normalized_stats(stats: dict[str, Any]) -> dict[str, Any]:
cache_store = int(stats.get("httpcache/store", 0))
cache_hits = int(stats.get("httpcache/hit", 0))
cache_misses = int(stats.get("httpcache/miss", 0))
return {
**stats,
"requests_count": int(stats.get("downloader/request_count", 0)),
"items_count": int(stats.get("item_scraped_count", 0)),
"warnings_count": int(stats.get("log_count/WARNING", 0)),
"errors_count": int(stats.get("log_count/ERROR", 0)),
"bytes_count": int(stats.get("downloader/response_bytes", 0)),
"retries_count": int(stats.get("retry/count", 0)),
"exceptions_count": int(stats.get("spider_exceptions/count", 0)),
"cache_size_count": cache_store,
"cache_object_count": cache_store + cache_hits + cache_misses,
}
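As a quick illustration of the key mapping (numbers are made up): the normalizer folds the raw Scrapy downloader, item, log, retry, and httpcache counters into the flat *_count fields the rest of the app reads, while keeping the raw keys alongside them.

# Illustrative only; raw keys are preserved next to the derived counters.
raw = {
    "downloader/request_count": 12,
    "item_scraped_count": 4,
    "downloader/response_bytes": 51234,
    "httpcache/store": 3,
    "httpcache/hit": 2,
    "httpcache/miss": 1,
}
normalized = _normalized_stats(raw)
assert normalized["requests_count"] == 12
assert normalized["cache_size_count"] == 3
assert normalized["cache_object_count"] == 6  # store + hits + misses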
class ExecutionStatsCollector(StatsCollector):
def __init__(self, crawler: Any):
super().__init__(crawler)
self._stats_path = Path(crawler.settings["REPUB_JOB_STATS_PATH"])
self._stats_path.parent.mkdir(parents=True, exist_ok=True)
def set_value(self, key: str, value: Any, spider: Any | None = None) -> None:
super().set_value(key, value, spider)
self._write_snapshot()
def set_stats(self, stats: dict[str, Any], spider: Any | None = None) -> None:
super().set_stats(stats, spider)
self._write_snapshot()
def inc_value(
self,
key: str,
count: int = 1,
start: int = 0,
spider: Any | None = None,
) -> None:
super().inc_value(key, count, start, spider)
self._write_snapshot()
def max_value(self, key: str, value: Any, spider: Any | None = None) -> None:
super().max_value(key, value, spider)
self._write_snapshot()
def min_value(self, key: str, value: Any, spider: Any | None = None) -> None:
super().min_value(key, value, spider)
self._write_snapshot()
def clear_stats(self, spider: Any | None = None) -> None:
super().clear_stats(spider)
self._write_snapshot()
def open_spider(self, spider: Any | None = None) -> None:
super().open_spider(spider)
self._write_snapshot()
def _persist_stats(self, stats: dict[str, Any], spider: Any | None = None) -> None:
self._write_snapshot(stats)
def _write_snapshot(self, stats: dict[str, Any] | None = None) -> None:
payload = {
"timestamp": datetime.now(UTC).isoformat(),
**_normalized_stats(self._stats if stats is None else stats),
}
with self._stats_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload, sort_keys=True, default=_json_default))
handle.write("\n")
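Each mutation appends one JSON object per line to the file named by REPUB_JOB_STATS_PATH, so the last line is always the latest snapshot. A representative line (illustrative values; the raw Scrapy keys carried along by _normalized_stats are omitted for brevity):

{"bytes_count": 51234, "cache_object_count": 6, "cache_size_count": 3, "errors_count": 0, "exceptions_count": 0, "items_count": 4, "requests_count": 12, "retries_count": 0, "timestamp": "2026-03-30T13:04:41+00:00", "warnings_count": 0}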
def pangea_feed_class():
from pygea.pangeafeed import PangeaFeed
return PangeaFeed
def generate_pangea_feed(
*,
name: str,
slug: str,
domain: str,
category_name: str,
content_type: str,
only_newest: bool,
max_articles: int,
oldest_article: int,
include_authors: bool,
exclude_media: bool,
include_content: bool,
content_format: str,
out_dir: str | Path,
log_path: str | Path,
) -> Path:
resolved_out_dir = Path(out_dir).resolve()
resolved_log_path = Path(log_path).resolve()
config = PygeaConfig(
config_path=resolved_out_dir / "pygea-runtime.toml",
domain=domain,
default_content_type=content_type,
feeds=(
{
"name": category_name,
"slug": slug,
"only_newest": only_newest,
"content_type": content_type,
},
),
runtime=RuntimeConfig(
api_key=None,
max_articles=max_articles,
oldest_article=oldest_article,
authors_p=include_authors,
no_media_p=exclude_media,
content_inc_p=include_content,
content_format=content_format,
verbose_p=True,
),
results=ResultsConfig(
output_to_file_p=True,
output_file_name="rss.xml",
output_directory=resolved_out_dir,
),
logging=LoggingConfig(
log_file=resolved_log_path,
default_log_level="INFO",
),
)
feed_class = pangea_feed_class()
feed = feed_class(config, list(config.feeds))
feed.acquire_content()
feed.generate_feed()
output_path = feed.disgorge(slug)
if output_path is None:
raise RuntimeError(f"pygea did not write an output file for {name!r}")
return output_path.resolve()
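A minimal sketch of driving this helper directly, mirroring the stubbed test added further down (all argument values are illustrative):

rss_path = generate_pangea_feed(
    name="Pangea source",
    slug="pangea-source",
    domain="example.org",
    category_name="News",
    content_type="articles",
    only_newest=True,
    max_articles=10,
    oldest_article=3,
    include_authors=True,
    exclude_media=False,
    include_content=True,
    content_format="MOBILE_3",
    out_dir="out",
    log_path="out/logs/pangea.log",
)
# The returned rss.xml path is later handed to the RSS spider as a file:// URL via as_uri().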
@dataclass(frozen=True)
class JobSourceConfig:
source_name: str
source_slug: str
source_type: str
spider_arguments: dict[str, str]
feed_url: str | None = None
pangea_domain: str | None = None
pangea_category: str | None = None
content_type: str | None = None
only_newest: bool = True
max_articles: int = 10
oldest_article: int = 3
include_authors: bool = True
exclude_media: bool = False
include_content: bool = True
content_format: str = "MOBILE_3"
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Simulated republisher worker")
parser = argparse.ArgumentParser(description="Run a republisher job worker")
parser.add_argument("--job-id", type=int, required=True)
parser.add_argument("--execution-id", type=int, required=True)
parser.add_argument("--db-path", required=True)
parser.add_argument("--out-dir", required=True)
parser.add_argument("--stats-path", required=True)
parser.add_argument("--duration-seconds", type=float, required=True)
parser.add_argument("--interval-seconds", type=float, required=True)
parser.add_argument("--failure-probability", type=float, required=True)
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
rng = random.Random(f"{args.job_id}:{args.execution_id}")
stats_path = Path(args.stats_path)
stats_path.parent.mkdir(parents=True, exist_ok=True)
stop_requested = False
process: CrawlerProcess | None = None
def request_stop(signum: int, frame: object | None) -> None:
del signum, frame
nonlocal stop_requested
if stop_requested:
return
stop_requested = True
print(
f"worker[{args.job_id}:{args.execution_id}]: graceful stop requested",
flush=True,
)
if process is None:
return
try:
from twisted.internet import reactor
call_from_thread = getattr(reactor, "callFromThread", None)
if callable(call_from_thread):
call_from_thread(process.stop)
else:
process.stop()
except Exception as error:
print(
f"worker[{args.job_id}:{args.execution_id}]: failed to stop reactor gracefully: {error}",
flush=True,
)
signal.signal(signal.SIGTERM, request_stop)
signal.signal(signal.SIGINT, request_stop)
counters = {
"requests_count": 0,
"items_count": 0,
"warnings_count": 0,
"errors_count": 0,
"bytes_count": 0,
"retries_count": 0,
"exceptions_count": 0,
"cache_size_count": 0,
"cache_object_count": 0,
}
try:
source_config = _load_job_source_config(
db_path=args.db_path, job_id=args.job_id
)
except Exception as error:
print(
f"worker[{args.job_id}:{args.execution_id}]: failed to load job config: {error}",
flush=True,
)
return 1
out_dir = Path(args.out_dir).resolve()
stats_path = Path(args.stats_path).resolve()
log_path = stats_path.with_suffix(".log")
try:
feed = _resolve_feed(
source_config=source_config,
out_dir=out_dir,
log_path=log_path,
)
process = CrawlerProcess(
_build_crawl_settings(
out_dir=out_dir,
feed=feed,
stats_path=stats_path,
)
)
print(
f"worker[{args.job_id}:{args.execution_id}]: starting crawl for {source_config.source_slug}",
flush=True,
)
exit_code = _run_crawl(
process=process,
feed=feed,
spider_arguments=source_config.spider_arguments,
)
except Exception as error:
print(
f"worker[{args.job_id}:{args.execution_id}]: crawl failed: {error}",
flush=True,
)
return 1
if stop_requested:
print(
f"worker[{args.job_id}:{args.execution_id}]: stopping after graceful request",
flush=True,
)
return 130
if exit_code == 0:
print(
f"worker[{args.job_id}:{args.execution_id}]: completed successfully",
flush=True,
)
return exit_code
def _load_job_source_config(*, db_path: str, job_id: int) -> JobSourceConfig:
initialize_database(db_path)
primary_key = getattr(Job, "_meta").primary_key
with database.connection_context():
job = (
Job.select(Job, Source)
.join(Source)
.where(primary_key == job_id)
.get_or_none()
)
if job is None:
raise ValueError(f"job {job_id} does not exist")
source = job.source
spider_arguments = _parse_spider_arguments(job.spider_arguments)
if source.source_type == "feed":
feed = SourceFeed.get_or_none(SourceFeed.source == source)
if feed is None:
raise ValueError(
f"feed source {source.slug!r} is missing its feed config"
)
return JobSourceConfig(
source_name=source.name,
source_slug=source.slug,
source_type=source.source_type,
spider_arguments=spider_arguments,
feed_url=feed.feed_url,
)
pangea = SourcePangea.get_or_none(SourcePangea.source == source)
if pangea is None:
raise ValueError(
f"pangea source {source.slug!r} is missing its pangea config"
)
return JobSourceConfig(
source_name=source.name,
source_slug=source.slug,
source_type=source.source_type,
spider_arguments=spider_arguments,
pangea_domain=pangea.domain,
pangea_category=pangea.category_name,
content_type=pangea.content_type,
only_newest=bool(pangea.only_newest),
max_articles=int(pangea.max_articles),
oldest_article=int(pangea.oldest_article),
include_authors=bool(pangea.include_authors),
exclude_media=bool(pangea.exclude_media),
include_content=bool(pangea.include_content),
content_format=pangea.content_format,
)
def _parse_spider_arguments(raw_value: str) -> dict[str, str]:
arguments: dict[str, str] = {}
for raw_line in raw_value.splitlines():
line = raw_line.strip()
if line == "":
continue
key, separator, value = line.partition("=")
key = key.strip()
if separator == "" or key == "":
raise ValueError(
f"invalid spider argument {raw_line!r}; expected key=value"
)
arguments[key] = value
return arguments
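The stored spider_arguments text is parsed as one key=value pair per line, with blank lines skipped and values kept as strings. An illustrative round trip (the argument names here are hypothetical spider kwargs):

assert _parse_spider_arguments("max_items=5\nrender_js=false") == {
    "max_items": "5",
    "render_js": "false",
}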
def _resolve_feed(
*,
source_config: JobSourceConfig,
out_dir: Path,
log_path: Path,
) -> FeedConfig:
if source_config.source_type == "feed":
assert source_config.feed_url is not None
return FeedConfig(
name=source_config.source_name,
slug=source_config.source_slug,
url=source_config.feed_url,
)
generated_feed_path = generate_pangea_feed(
name=source_config.source_name,
slug=source_config.source_slug,
domain=_require_value(source_config.pangea_domain, "pangea_domain"),
category_name=_require_value(source_config.pangea_category, "pangea_category"),
content_type=_require_value(source_config.content_type, "content_type"),
only_newest=source_config.only_newest,
max_articles=source_config.max_articles,
oldest_article=source_config.oldest_article,
include_authors=source_config.include_authors,
exclude_media=source_config.exclude_media,
include_content=source_config.include_content,
content_format=source_config.content_format,
out_dir=out_dir,
log_path=log_path.with_suffix(".pygea.log"),
)
print(
f"worker[{args.job_id}:{args.execution_id}]: starting simulated crawl",
f"pygea: generated intermediate feed at {generated_feed_path}",
flush=True,
)
started = time.monotonic()
iteration = 0
with stats_path.open("a", encoding="utf-8") as stats_file:
while time.monotonic() - started < args.duration_seconds:
time.sleep(args.interval_seconds)
iteration += 1
counters["requests_count"] += rng.randint(1, 5)
counters["items_count"] += rng.randint(0, 2)
counters["bytes_count"] += rng.randint(500, 3000)
counters["cache_size_count"] += rng.randint(0, 1)
counters["cache_object_count"] += rng.randint(0, 2)
if rng.random() < 0.1:
counters["warnings_count"] += 1
if rng.random() < 0.05:
counters["retries_count"] += 1
snapshot = {
"timestamp": datetime.now(UTC).isoformat(),
"iteration": iteration,
**counters,
}
stats_file.write(json.dumps(snapshot, sort_keys=True) + "\n")
stats_file.flush()
print(
"stats: "
f"requests={counters['requests_count']} "
f"items={counters['items_count']} "
f"bytes={counters['bytes_count']}",
flush=True,
)
if stop_requested:
print(
f"worker[{args.job_id}:{args.execution_id}]: stopping after graceful request",
flush=True,
)
return 130
if rng.random() < args.failure_probability:
counters["errors_count"] += 1
counters["exceptions_count"] += 1
stats_file.write(
json.dumps(
{"timestamp": datetime.now(UTC).isoformat(), **counters},
sort_keys=True,
)
+ "\n"
)
stats_file.flush()
print(
f"worker[{args.job_id}:{args.execution_id}]: simulated failure",
flush=True,
)
return 1
print(
f"worker[{args.job_id}:{args.execution_id}]: completed successfully",
flush=True,
return FeedConfig(
name=source_config.source_name,
slug=source_config.source_slug,
url=generated_feed_path.as_uri(),
)
return 0
def _build_crawl_settings(*, out_dir: Path, feed: FeedConfig, stats_path: Path):
base_settings = build_base_settings(
RepublisherConfig(
config_path=out_dir / "job-runner.toml",
out_dir=out_dir,
feeds=(feed,),
scrapy_settings={},
)
)
prepare_output_dirs(out_dir, feed.slug)
settings = build_feed_settings(base_settings, out_dir=out_dir, feed_slug=feed.slug)
settings.set("LOG_FILE", None, priority="cmdline")
settings.set(
"STATS_CLASS",
"repub.job_runner.ExecutionStatsCollector",
priority="cmdline",
)
settings.set("REPUB_JOB_STATS_PATH", str(stats_path), priority="cmdline")
return settings
def _run_crawl(
*,
process: CrawlerProcess,
feed: FeedConfig,
spider_arguments: dict[str, str],
) -> int:
results: list[Failure | None] = []
deferred = process.crawl(
RssFeedSpider,
feed_name=feed.slug,
url=feed.url,
**spider_arguments,
)
def handle_success(_: object) -> None:
results.append(None)
return None
def handle_error(failure: Failure) -> None:
print(failure.getTraceback(), flush=True)
results.append(failure)
return None
deferred.addCallbacks(handle_success, handle_error)
process.start()
return 1 if any(result is not None for result in results) else 0
def _require_value(value: str | None, field_name: str) -> str:
if value is None or value == "":
raise ValueError(f"missing {field_name}")
return value
if __name__ == "__main__":

View file

@@ -188,14 +188,12 @@ class JobRuntime:
str(job_id),
"--execution-id",
str(execution_id),
"--db-path",
str(database.database),
"--out-dir",
str(self.log_dir.parent),
"--stats-path",
str(artifacts.stats_path),
"--duration-seconds",
str(self.worker_duration_seconds),
"--interval-seconds",
str(self.worker_stats_interval_seconds),
"--failure-probability",
str(self.worker_failure_probability),
],
stdout=log_handle,
stderr=subprocess.STDOUT,
@@ -390,7 +388,7 @@ def load_runs_view(
for job in jobs
),
"completed": tuple(
_project_completed_execution(execution, resolved_log_dir)
_project_completed_execution(execution, resolved_log_dir, reference_time)
for execution in completed_executions
),
}
@@ -401,6 +399,7 @@ def load_dashboard_view(
) -> dict[str, object]:
reference_time = now or datetime.now(UTC)
runs_view = load_runs_view(log_dir=log_dir, now=reference_time)
output_dir = Path(log_dir).parent
with database.connection_context():
failed_last_day = (
JobExecution.select()
@@ -414,7 +413,7 @@ def load_dashboard_view(
upcoming_ready = sum(
1 for job in runs_view["upcoming"] if str(job["run_reason"]) == "Ready"
)
footprint_bytes = _directory_size(Path(log_dir))
footprint_bytes = _directory_size(output_dir)
return {
"running": runs_view["running"],
"snapshot": {
@@ -538,7 +537,7 @@ def _project_upcoming_job(
"slug": job.source.slug,
"job_id": job_id,
"next_run": (
_humanize_future_time(reference_time, next_run)
_humanize_relative_time(reference_time, next_run)
if next_run is not None
else ("Running now" if running_execution is not None else "Not scheduled")
),
@@ -565,7 +564,7 @@ def _project_upcoming_job(
def _project_completed_execution(
execution: JobExecution, log_dir: Path
execution: JobExecution, log_dir: Path, reference_time: datetime
) -> dict[str, object]:
job = cast(Job, execution.job)
job_id = _job_id(job)
@@ -573,18 +572,22 @@ def _project_completed_execution(
artifacts = JobArtifacts.for_execution(
log_dir=log_dir, job_id=job_id, execution_id=execution_id
)
ended_at = (
_coerce_datetime(cast(datetime | str, execution.ended_at))
if execution.ended_at is not None
else None
)
return {
"source": job.source.name,
"slug": job.source.slug,
"job_id": job_id,
"execution_id": execution_id,
"ended_at": (
_coerce_datetime(cast(datetime | str, execution.ended_at)).strftime(
"%Y-%m-%d %H:%M UTC"
)
if execution.ended_at is not None
_humanize_relative_time(reference_time, ended_at)
if ended_at is not None
else "Pending"
),
"ended_at_iso": ended_at.isoformat() if ended_at is not None else None,
"status": _execution_status_label(execution),
"status_tone": _execution_status_tone(execution),
"stats": _stats_summary(execution),
@@ -678,20 +681,25 @@ def _format_bytes(value: int) -> str:
return f"{value / (1024 * 1024 * 1024):.1f} GB"
def _humanize_future_time(reference_time: datetime, target_time: datetime) -> str:
def _humanize_relative_time(reference_time: datetime, target_time: datetime) -> str:
delta_seconds = int(round((target_time - reference_time).total_seconds()))
if delta_seconds <= 0:
if delta_seconds == 0:
return "now"
absolute_delta_seconds = abs(delta_seconds)
units = (
("day", 24 * 60 * 60),
("hour", 60 * 60),
("minute", 60),
)
for label, size in units:
if delta_seconds >= size:
count = max(1, round(delta_seconds / size))
if absolute_delta_seconds >= size:
count = max(1, round(absolute_delta_seconds / size))
suffix = "" if count == 1 else "s"
return f"in {count} {label}{suffix}"
if delta_seconds > 0:
return f"in {count} {label}{suffix}"
return f"{count} {label}{suffix} ago"
return f"in {delta_seconds} seconds"
if delta_seconds > 0:
return f"in {absolute_delta_seconds} seconds"
return f"{absolute_delta_seconds} seconds ago"

View file

@@ -54,12 +54,25 @@ class VideoMeta(TypedDict):
bit_rate: float
def _decode_ffmpeg_output(output: Any) -> str:
if isinstance(output, bytes):
return output.decode("utf-8", errors="replace")
return str(output)
def _print_ffmpeg_error_output(error: ffmpeg.Error) -> None:
if error.stderr:
print(_decode_ffmpeg_output(error.stderr), file=sys.stderr)
if error.stdout:
print(_decode_ffmpeg_output(error.stdout))
def probe_media(file_path) -> Dict[str, Any]:
"""Probes `file_path` using ffmpeg's ffprobe and returns the data."""
try:
return ffmpeg.probe(file_path)
except ffmpeg.Error as e:
print(e.stderr, file=sys.stderr)
_print_ffmpeg_error_output(e)
logger.error(f"Failed to probe io {file_path}")
logger.error(e)
raise RuntimeError(f"Failed to probe io {file_path}") from e
@@ -217,7 +230,7 @@ def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) ->
**params,
loglevel="quiet",
)
.run()
.run(capture_stdout=True, capture_stderr=True)
)
before = os.path.getsize(input_file) / 1024
after = os.path.getsize(output_file) / 1024
@@ -229,8 +242,7 @@ def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) ->
)
return output_file
except ffmpeg.Error as e:
print(e.stderr, file=sys.stderr)
print(e.stdout)
_print_ffmpeg_error_output(e)
logger.error(e)
raise RuntimeError(f"Failed to compress audio {input_file}") from e
@@ -310,7 +322,7 @@ def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) ->
**params,
# loglevel="quiet",
)
.run()
.run(capture_stdout=True, capture_stderr=True)
)
else:
passes = params["passes"]
@@ -323,16 +335,18 @@ def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) ->
"-stats"
)
logger.info("Running pass #1")
std_out, std_err = ffoutput.run(capture_stdout=True)
print(std_out)
print(std_err)
ffoutput.run(capture_stdout=True, capture_stderr=True)
logger.info("Running pass #2")
ffoutput = ffinput.output(video, audio, output_file, **passes[1])
ffoutput = ffoutput.global_args(
# "-loglevel", "quiet",
"-stats"
)
ffoutput.run(overwrite_output=True)
ffoutput.run(
capture_stdout=True,
capture_stderr=True,
overwrite_output=True,
)
before = os.path.getsize(input_file) / 1024
after = os.path.getsize(output_file) / 1024
@@ -344,7 +358,7 @@ def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) ->
)
return output_file
except ffmpeg.Error as e:
print(e.stderr, file=sys.stderr)
_print_ffmpeg_error_output(e)
logger.error("Failed to transcode")
logger.error(e)
raise RuntimeError(f"Failed to transcode video: {e.stderr.decode()}") from e

View file

@@ -118,7 +118,7 @@ def operational_snapshot(*, snapshot: Mapping[str, str] | None = None) -> Render
stat_card(
label="Artifact footprint",
value=values["artifact_footprint"],
detail="Current log and stats artifact size under out/logs.",
detail="Current artifact size under the output path.",
),
],
]

View file

@@ -144,6 +144,20 @@ def _upcoming_row(job: Mapping[str, object]) -> tuple[Node, ...]:
def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
ended_at = _maybe_text(execution, "ended_at_iso")
ended_at_label: Node = h.p(class_="font-medium text-slate-900")[
_text(execution, "ended_at")
]
if ended_at is not None:
ended_at_label = h.time(
{
"data-ended-at": ended_at,
"title": ended_at,
},
datetime=ended_at,
class_="font-medium text-slate-900",
)[_text(execution, "ended_at")]
return (
h.div[
h.div(class_="font-semibold text-slate-950")[_text(execution, "source")],
@@ -157,7 +171,7 @@ def _completed_row(execution: Mapping[str, object]) -> tuple[Node, ...]:
],
],
h.div[
h.p(class_="font-medium text-slate-900")[_text(execution, "ended_at")],
ended_at_label,
h.p(class_="mt-1 text-xs text-slate-500")[_text(execution, "summary")],
],
status_badge(
@@ -262,10 +276,12 @@ window.repubFormatNextRuns = window.repubFormatNextRuns || (() => {
return relativeFormatter.format(0, 'second');
};
const format = () => {
document.querySelectorAll('time[data-next-run-at]').forEach((element) => {
const nextRunAt = element.getAttribute('data-next-run-at');
if (!nextRunAt) return;
const targetDate = new Date(nextRunAt);
document.querySelectorAll('time[data-next-run-at], time[data-ended-at]').forEach((element) => {
const relativeAt =
element.getAttribute('data-next-run-at') ??
element.getAttribute('data-ended-at');
if (!relativeAt) return;
const targetDate = new Date(relativeAt);
if (Number.isNaN(targetDate.getTime())) return;
element.textContent = formatRelative(targetDate);
element.title = absoluteFormatter.format(targetDate);

View file

@@ -1,8 +1,10 @@
import sys
from pathlib import Path
from types import SimpleNamespace
import pytest
from repub import media
from repub.config import (
FeedConfig,
RepublisherConfig,
@@ -48,3 +50,141 @@ def test_pipeline_from_crawler_uses_configured_store(
assert pipeline.settings is crawler.settings
assert pipeline.store.basedir == crawler.settings[store_setting]
def test_transcode_audio_captures_ffmpeg_output(monkeypatch, tmp_path: Path) -> None:
input_file = tmp_path / "input.mp3"
input_file.write_bytes(b"12345")
output_dir = tmp_path / "audio-out"
output_dir.mkdir()
run_calls: list[dict[str, object]] = []
class FakeOutput:
def __init__(self, output_path: Path):
self.output_path = output_path
def run(self, **kwargs):
run_calls.append(kwargs)
self.output_path.write_bytes(b"12")
return b"", b""
class FakeInput:
def output(self, output_file: str, **params):
del params
return FakeOutput(Path(output_file))
monkeypatch.setattr(media.ffmpeg, "input", lambda _: FakeInput())
result = media.transcode_audio(
str(input_file),
str(output_dir),
{"extension": "mp3", "acodec": "libmp3lame"},
)
assert result == str(output_dir / "converted.mp3")
assert run_calls == [{"capture_stdout": True, "capture_stderr": True}]
def test_transcode_video_two_pass_does_not_print_ffmpeg_output(
monkeypatch, tmp_path: Path
) -> None:
input_file = tmp_path / "input.mp4"
input_file.write_bytes(b"12345")
output_dir = tmp_path / "video-out"
output_dir.mkdir()
run_calls: list[dict[str, object]] = []
printed: list[tuple[tuple[object, ...], dict[str, object]]] = []
class FakeOutput:
def __init__(self, output_path: Path | None):
self.output_path = output_path
def global_args(self, *args):
del args
return self
def run(self, **kwargs):
run_calls.append(kwargs)
if self.output_path is not None:
self.output_path.write_bytes(b"12")
return b"pass-out", b"pass-err"
class FakeInput:
video = object()
audio = object()
def output(self, *args, **params):
del params
output_path = next(
(
Path(arg)
for arg in args
if isinstance(arg, str) and arg.endswith(".mp4")
),
None,
)
return FakeOutput(output_path)
monkeypatch.setattr(media.ffmpeg, "input", lambda _: FakeInput())
monkeypatch.setattr(
"builtins.print", lambda *args, **kwargs: printed.append((args, kwargs))
)
result = media.transcode_video(
str(input_file),
str(output_dir),
{
"extension": "mp4",
"passes": [
{"f": "null"},
{"c:v": "libx264"},
],
},
)
assert result == str(output_dir / "converted.mp4")
assert run_calls == [
{"capture_stdout": True, "capture_stderr": True},
{
"capture_stdout": True,
"capture_stderr": True,
"overwrite_output": True,
},
]
assert printed == []
def test_transcode_video_prints_ffmpeg_output_on_error(
monkeypatch, tmp_path: Path
) -> None:
input_file = tmp_path / "input.mp4"
input_file.write_bytes(b"12345")
output_dir = tmp_path / "video-out"
output_dir.mkdir()
printed: list[tuple[str, bool]] = []
class FakeOutput:
def run(self, **kwargs):
del kwargs
raise media.ffmpeg.Error("ffmpeg", b"video-stdout", b"video-stderr")
class FakeInput:
def output(self, *args, **params):
del args, params
return FakeOutput()
def fake_print(*args, **kwargs):
printed.append((str(args[0]), kwargs.get("file") is sys.stderr))
monkeypatch.setattr(media.ffmpeg, "input", lambda _: FakeInput())
monkeypatch.setattr("builtins.print", fake_print)
with pytest.raises(RuntimeError):
media.transcode_video(
str(input_file),
str(output_dir),
{"extension": "mp4", "c:v": "libx264"},
)
assert ("video-stderr", True) in printed
assert ("video-stdout", False) in printed

View file

@@ -2,10 +2,15 @@ from __future__ import annotations
import asyncio
import json
import socketserver
import threading
import time
from datetime import UTC, datetime, timedelta
from http.server import BaseHTTPRequestHandler
from pathlib import Path
from repub.jobs import JobArtifacts, JobRuntime
from repub.job_runner import generate_pangea_feed
from repub.jobs import JobArtifacts, JobRuntime, load_runs_view
from repub.model import (
Job,
JobExecution,
@@ -16,6 +21,10 @@ from repub.model import (
)
from repub.web import create_app, get_job_runtime, render_execution_logs, render_runs
FIXTURE_FEED_PATH = (
Path(__file__).resolve().parents[1] / "demo" / "fixtures" / "local-feed.rss"
).resolve()
def test_job_runtime_syncs_enabled_jobs_into_apscheduler(tmp_path: Path) -> None:
initialize_database(tmp_path / "scheduler.db")
@@ -91,7 +100,7 @@ def test_job_runtime_run_now_writes_log_and_stats_and_marks_success(
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/manual.xml",
feed_url=FIXTURE_FEED_PATH.as_uri(),
)
job = Job.get(Job.source == source)
@@ -120,9 +129,11 @@ def test_job_runtime_run_now_writes_log_and_stats_and_marks_success(
assert execution.bytes_count > 0
assert artifacts.log_path.exists()
assert artifacts.stats_path.exists()
assert "starting simulated crawl" in artifacts.log_path.read_text(
encoding="utf-8"
)
output_path = tmp_path / "out" / "manual-source.rss"
assert output_path.exists()
output_text = output_path.read_text(encoding="utf-8")
assert "<title>Local Demo Feed</title>" in output_text
assert "<title>Local Demo Entry</title>" in output_text
stats_lines = [
json.loads(line)
@@ -136,50 +147,51 @@
def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None:
initialize_database(tmp_path / "cancel.db")
source = create_source(
name="Cancelable source",
slug="cancelable-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/cancelable.xml",
)
job = Job.get(Job.source == source)
with _slow_feed_server() as feed_url:
source = create_source(
name="Cancelable source",
slug="cancelable-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url=feed_url,
)
job = Job.get(Job.source == source)
runtime = JobRuntime(
log_dir=tmp_path / "out" / "logs",
worker_duration_seconds=2.0,
worker_stats_interval_seconds=0.1,
worker_failure_probability=0.0,
)
try:
runtime.start()
execution_id = runtime.run_job_now(job.id, reason="manual")
assert execution_id is not None
_wait_for_running_execution(execution_id)
runtime.request_execution_cancel(execution_id)
execution = _wait_for_terminal_execution(execution_id)
artifacts = JobArtifacts.for_execution(
runtime = JobRuntime(
log_dir=tmp_path / "out" / "logs",
job_id=job.id,
execution_id=execution_id,
worker_duration_seconds=2.0,
worker_stats_interval_seconds=0.1,
worker_failure_probability=0.0,
)
try:
runtime.start()
execution_id = runtime.run_job_now(job.id, reason="manual")
assert execution_id is not None
_wait_for_running_execution(execution_id)
assert execution.running_status == JobExecutionStatus.CANCELED
assert execution.ended_at is not None
assert execution.stop_requested_at is not None
assert "graceful stop requested" in artifacts.log_path.read_text(
encoding="utf-8"
)
finally:
runtime.shutdown()
runtime.request_execution_cancel(execution_id)
execution = _wait_for_terminal_execution(execution_id)
artifacts = JobArtifacts.for_execution(
log_dir=tmp_path / "out" / "logs",
job_id=job.id,
execution_id=execution_id,
)
assert execution.running_status == JobExecutionStatus.CANCELED
assert execution.ended_at is not None
assert execution.stop_requested_at is not None
assert "graceful stop requested" in artifacts.log_path.read_text(
encoding="utf-8"
)
finally:
runtime.shutdown()
def test_job_runtime_start_reconciles_stale_running_execution(tmp_path: Path) -> None:
@@ -234,6 +246,93 @@ def test_job_runtime_start_reconciles_stale_running_execution(tmp_path: Path) ->
runtime.shutdown()
def test_generate_pangea_feed_writes_rss_file(monkeypatch, tmp_path: Path) -> None:
class StubPangeaFeed:
def __init__(self, config, feeds):
self.config = config
self.feed = feeds[0]
def acquire_content(self) -> None:
return None
def generate_feed(self) -> None:
return None
def disgorge(self, slug: str):
output_path = self.config.results.output_directory / slug / "rss.xml"
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(
"<rss><channel><title>Pangea Fixture</title></channel></rss>\n",
encoding="utf-8",
)
return output_path
monkeypatch.setattr(
"repub.job_runner.pangea_feed_class",
lambda: StubPangeaFeed,
)
output_path = generate_pangea_feed(
name="Pangea source",
slug="pangea-source",
domain="example.org",
category_name="News",
content_type="articles",
only_newest=True,
max_articles=10,
oldest_article=3,
include_authors=True,
exclude_media=False,
include_content=True,
content_format="MOBILE_3",
out_dir=tmp_path / "out",
log_path=tmp_path / "out" / "logs" / "pangea.log",
)
assert output_path == (tmp_path / "out" / "pangea-source" / "rss.xml")
assert output_path.exists()
assert "Pangea Fixture" in output_path.read_text(encoding="utf-8")
def test_load_runs_view_humanizes_completed_execution_end_time(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "runs-view.db"
log_dir = tmp_path / "out" / "logs"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
app = create_app()
app.config["REPUB_LOG_DIR"] = log_dir
source = create_source(
name="Completed source",
slug="completed-source",
source_type="feed",
notes="",
spider_arguments="",
enabled=False,
cron_minute="*/5",
cron_hour="*",
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/completed.xml",
)
job = Job.get(Job.source == source)
reference_time = datetime(2026, 1, 15, 12, 0, tzinfo=UTC)
ended_at = reference_time - timedelta(hours=2)
JobExecution.create(
job=job,
running_status=JobExecutionStatus.SUCCEEDED,
ended_at=ended_at,
)
view = load_runs_view(log_dir=app.config["REPUB_LOG_DIR"], now=reference_time)
completed = view["completed"][0]
assert completed["ended_at"] == "2 hours ago"
assert completed["ended_at_iso"] == ended_at.isoformat()
def test_render_runs_uses_database_backed_jobs_and_executions(
monkeypatch, tmp_path: Path
) -> None:
@@ -259,7 +358,7 @@ def test_render_runs_uses_database_backed_jobs_and_executions(
cron_day_of_month="*",
cron_day_of_week="*",
cron_month="*",
feed_url="https://example.com/runs-page.xml",
feed_url=FIXTURE_FEED_PATH.as_uri(),
)
job = Job.get(Job.source == source)
runtime = get_job_runtime(app)
@@ -396,3 +495,41 @@ def _wait_for_terminal_execution(
return execution
time.sleep(0.02)
raise AssertionError(f"execution {execution_id} did not finish in time")
class _SlowFeedRequestHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None: # noqa: N802
time.sleep(2.0)
payload = FIXTURE_FEED_PATH.read_bytes()
self.send_response(200)
self.send_header("Content-Type", "application/rss+xml; charset=utf-8")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
def log_message(self, format: str, *args: object) -> None:
del format, args
class _ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
class _slow_feed_server:
def __enter__(self) -> str:
self._server = _ThreadedTCPServer(("127.0.0.1", 0), _SlowFeedRequestHandler)
self._thread = threading.Thread(
target=self._server.serve_forever,
kwargs={"poll_interval": 0.01},
daemon=True,
)
self._thread.start()
host = str(self._server.server_address[0])
port = int(self._server.server_address[1])
return f"http://{host}:{port}/slow-feed.rss"
def __exit__(self, exc_type, exc, tb) -> None:
del exc_type, exc, tb
self._server.shutdown()
self._server.server_close()
self._thread.join(timeout=1)

View file

@@ -6,6 +6,7 @@ from typing import Any, cast
from repub.components import status_badge
from repub.datastar import RefreshBroker, render_sse_event, render_stream
from repub.jobs import load_dashboard_view
from repub.model import (
Job,
JobExecution,
@@ -15,6 +16,7 @@ from repub.model import (
SourcePangea,
create_source,
)
from repub.pages.runs import runs_page
from repub.web import (
create_app,
get_refresh_broker,
@@ -34,6 +36,37 @@ def test_status_badge_uses_green_done_tone() -> None:
assert "Succeeded" in badge
def test_runs_page_renders_completed_execution_end_time_as_relative_hoverable_time() -> (
None
):
ended_at = "2026-01-15T10:00:00+00:00"
body = str(
runs_page(
completed_executions=(
{
"source": "Completed source",
"slug": "completed-source",
"job_id": 7,
"execution_id": 42,
"ended_at": "2 hours ago",
"ended_at_iso": ended_at,
"status": "Succeeded",
"status_tone": "done",
"stats": "1 requests • 1 items • 1 bytes",
"summary": "Worker exited successfully",
"log_href": "/job/7/execution/42/logs",
},
)
)
)
assert "data-ended-at" in body
assert f'data-ended-at="{ended_at}"' in body
assert f'datetime="{ended_at}"' in body
assert f'title="{ended_at}"' in body
assert ">2 hours ago<" in body
def test_root_get_serves_datastar_shim() -> None:
async def run() -> None:
client = create_app().test_client()
@@ -179,6 +212,40 @@ def test_render_dashboard_shows_dashboard_information_architecture(
asyncio.run(run())
def test_load_dashboard_view_measures_log_artifact_path(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "dashboard-footprint.db"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
create_app()
out_dir = tmp_path / "out"
log_dir = out_dir / "logs"
cache_dir = out_dir / "httpcache"
log_dir.mkdir(parents=True)
cache_dir.mkdir(parents=True)
(log_dir / "run.log").write_bytes(b"x" * 1024)
(cache_dir / "cache.bin").write_bytes(b"y" * 2048)
snapshot = load_dashboard_view(log_dir=log_dir)["snapshot"]
assert cast(dict[str, str], snapshot)["artifact_footprint"] == "3.0 KB"
def test_render_dashboard_describes_log_artifact_footprint(
monkeypatch, tmp_path: Path
) -> None:
db_path = tmp_path / "dashboard-footprint-copy.db"
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))
async def run() -> None:
app = create_app()
body = str(await render_dashboard(app))
assert "Current artifact size under the output path." in body
asyncio.run(run())
def test_render_sources_shows_table_and_create_link() -> None:
async def run() -> None:
body = str(await render_sources())