import sys
from pathlib import Path
from types import SimpleNamespace
from typing import Any, cast

import pytest
from scrapy.crawler import Crawler
from scrapy.http import Request, Response

from repub import media
from repub.config import (
    FeedConfig,
    RepublisherConfig,
    build_base_settings,
    build_feed_settings,
)
from repub.items import ElementItem
from repub.pipelines import AudioPipeline, FilePipeline, VideoPipeline
from repub.utils import local_audio_path, local_video_path


def build_test_crawler(tmp_path: Path) -> SimpleNamespace:
    """Return a stand-in crawler carrying settings for the ``nasa`` feed.

    The output directory is ``tmp_path / "mirror"`` (resolved) and the
    published feed URL is forced to ``https://mirror.example``.
    """
    mirror_root = (tmp_path / "mirror").resolve()
    nasa_feed = FeedConfig(
        name="NASA Breaking News",
        slug="nasa",
        url="https://www.nasa.gov/rss/dyn/breaking_news.rss",
    )
    republisher_config = RepublisherConfig(
        config_path=tmp_path / "repub.toml",
        out_dir=mirror_root,
        feeds=(nasa_feed,),
        scrapy_settings={},
    )
    feed_settings = build_feed_settings(
        build_base_settings(republisher_config),
        out_dir=mirror_root,
        feed_slug="nasa",
    )
    feed_settings.set(
        "REPUBLISHER_FEED_URL", "https://mirror.example", priority="cmdline"
    )
    return SimpleNamespace(settings=feed_settings, request_fingerprinter=object())


def spider_info() -> Any:
    """Return a namespace with an empty ``spider`` attribute.

    The tests pass it positionally to the pipeline hooks.
    """
    spider = SimpleNamespace()
    return SimpleNamespace(spider=spider)


def store_dir(pipeline: Any) -> Path:
    """Return the ``basedir`` of the pipeline's store as a ``Path``."""
    store = cast(Any, pipeline.store)
    return Path(store.basedir)


def _make_item(
    *, audio_urls: list[str] | None = None, video_urls: list[str] | None = None
) -> ElementItem:
    """Build a ``nasa`` item whose only populated fields are the URL lists given."""
    return ElementItem(
        feed_name="nasa",
        el=None,
        image_urls=[],
        images=[],
        file_urls=[],
        files=[],
        audio_urls=[] if audio_urls is None else audio_urls,
        audios=[],
        video_urls=[] if video_urls is None else video_urls,
        videos=[],
    )


def _fake_transcode(
    input_file: str, settings: media.MediaSettings, tmp_dir: str
) -> str:
    """Write ``<name>.<extension>`` into *tmp_dir* and return its path.

    The file content is the UTF-8 encoded variant name; *input_file* is
    never read.
    """
    variant_name = settings["name"]
    produced = Path(tmp_dir) / f"{variant_name}.{settings['extension']}"
    produced.write_bytes(variant_name.encode("utf-8"))
    return str(produced)


def _recording_persist_file(pipeline: Any, persisted: list[tuple[str, str]]):
    """Return a ``persist_file`` stand-in for *pipeline*'s store.

    The stand-in writes the buffer beneath the store directory and appends
    ``(path, Content-Type header)`` to *persisted*.
    """

    def fake_persist_file(path, buf, info, meta=None, headers=None):
        del info, meta
        assert headers is not None
        destination = store_dir(pipeline) / path
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(buf.read())
        persisted.append((path, headers["Content-Type"]))

    return fake_persist_file


@pytest.mark.parametrize(
    ("pipeline_cls", "store_setting"),
    [
        (AudioPipeline, "AUDIO_STORE"),
        (VideoPipeline, "VIDEO_STORE"),
        (FilePipeline, "FILES_STORE"),
    ],
)
def test_pipeline_from_crawler_uses_configured_store(
    tmp_path: Path, pipeline_cls, store_setting: str
) -> None:
    """``from_crawler`` keeps the crawler settings and uses the configured store."""
    crawler = build_test_crawler(tmp_path)
    expected_store = Path(crawler.settings[store_setting])

    pipeline = pipeline_cls.from_crawler(cast(Crawler, crawler))

    assert pipeline.settings is crawler.settings
    assert store_dir(pipeline) == expected_store


def test_transcode_audio_captures_ffmpeg_output(monkeypatch, tmp_path: Path) -> None:
    """``transcode_audio`` runs ffmpeg with stdout and stderr captured."""
    source = tmp_path / "input.mp3"
    source.write_bytes(b"12345")
    destination = tmp_path / "audio-out"
    destination.mkdir()
    recorded_kwargs: list[dict[str, object]] = []

    class StubOutput:
        def __init__(self, output_path: Path):
            self.output_path = output_path

        def run(self, **kwargs):
            recorded_kwargs.append(kwargs)
            # The source is 5 bytes; the stub's output is 2 bytes.
            self.output_path.write_bytes(b"12")
            return (b"", b"")

    class StubInput:
        def output(self, output_file: str, **_params):
            return StubOutput(Path(output_file))

    def fake_input(_):
        return StubInput()

    monkeypatch.setattr(media.ffmpeg, "input", fake_input)

    converted = media.transcode_audio(
        str(source),
        str(destination),
        {"extension": "mp3", "acodec": "libmp3lame"},
    )

    assert converted == str(destination / "converted.mp3")
    assert recorded_kwargs == [{"capture_stdout": True, "capture_stderr": True}]


def test_transcode_video_two_pass_does_not_print_ffmpeg_output(
    monkeypatch, tmp_path: Path
) -> None:
    """A successful two-pass video transcode captures output and prints nothing."""
    source = tmp_path / "input.mp4"
    source.write_bytes(b"12345")
    destination = tmp_path / "video-out"
    destination.mkdir()
    recorded_kwargs: list[dict[str, object]] = []
    printed: list[tuple[tuple[object, ...], dict[str, object]]] = []

    class StubOutput:
        def __init__(self, output_path: Path | None):
            self.output_path = output_path

        def global_args(self, *_flags):
            return self

        def run(self, **kwargs):
            recorded_kwargs.append(kwargs)
            if self.output_path is not None:
                self.output_path.write_bytes(b"12")
            return (b"pass-out", b"pass-err")

    class StubInput:
        video = object()
        audio = object()

        def output(self, *args, **_params):
            # Use the first string argument ending in ".mp4" as the output
            # path; when there is none the stub writes no file.
            mp4_target = None
            for candidate in args:
                if isinstance(candidate, str) and candidate.endswith(".mp4"):
                    mp4_target = Path(candidate)
                    break
            return StubOutput(mp4_target)

    def fake_input(_):
        return StubInput()

    def record_print(*args, **kwargs):
        printed.append((args, kwargs))

    monkeypatch.setattr(media.ffmpeg, "input", fake_input)
    monkeypatch.setattr("builtins.print", record_print)

    converted = media.transcode_video(
        str(source),
        str(destination),
        {
            "extension": "mp4",
            "passes": [
                {"f": "null"},
                {"c:v": "libx264"},
            ],
        },
    )

    first_pass_kwargs = {"capture_stdout": True, "capture_stderr": True}
    second_pass_kwargs = {
        "capture_stdout": True,
        "capture_stderr": True,
        "overwrite_output": True,
    }
    assert converted == str(destination / "converted.mp4")
    assert recorded_kwargs == [first_pass_kwargs, second_pass_kwargs]
    assert printed == []


def test_transcode_video_prints_ffmpeg_output_on_error(
    monkeypatch, tmp_path: Path
) -> None:
    """On an ffmpeg error the captured stdout and stderr are printed."""
    source = tmp_path / "input.mp4"
    source.write_bytes(b"12345")
    destination = tmp_path / "video-out"
    destination.mkdir()
    # Each entry is (first printed argument as str, whether file=sys.stderr).
    printed: list[tuple[str, bool]] = []

    class FailingOutput:
        def run(self, **_kwargs):
            raise media.ffmpeg.Error("ffmpeg", b"video-stdout", b"video-stderr")

    class StubInput:
        def output(self, *_args, **_params):
            return FailingOutput()

    def fake_input(_):
        return StubInput()

    def fake_print(*args, **kwargs):
        to_stderr = kwargs.get("file") is sys.stderr
        printed.append((str(args[0]), to_stderr))

    monkeypatch.setattr(media.ffmpeg, "input", fake_input)
    monkeypatch.setattr("builtins.print", fake_print)

    with pytest.raises(RuntimeError):
        media.transcode_video(
            str(source),
            str(destination),
            {"extension": "mp4", "c:v": "libx264"},
        )

    assert ("video-stderr", True) in printed
    assert ("video-stdout", False) in printed


def test_audio_pipeline_media_downloaded_returns_canonical_file_info_and_variants(
    monkeypatch, tmp_path: Path
) -> None:
    """``media_downloaded`` reports the vbr7 mp3 as canonical and lists both variants."""
    crawler = build_test_crawler(tmp_path)
    pipeline = AudioPipeline.from_crawler(cast(Crawler, crawler))
    monkeypatch.setattr(pipeline, "inc_stats", lambda status: None)
    persisted: list[tuple[str, str]] = []
    source_url = "https://example.com/podcast.mp3"
    item = _make_item(audio_urls=[source_url])

    def fake_probe_media(file_path: str):
        # Probe data is chosen by file suffix: the vbr7 mp3 gets one set of
        # values, any other path gets the aac set.
        if file_path.endswith("vbr7.mp3"):
            size, bit_rate, codec, long_name, sample_rate = (
                "4567",
                "96000",
                "mp3",
                "MP3",
                "44100",
            )
        else:
            size, bit_rate, codec, long_name, sample_rate = (
                "3456",
                "88000",
                "aac",
                "AAC",
                "48000",
            )
        return {
            "format": {
                "duration": "61.2",
                "size": size,
                "bit_rate": bit_rate,
                "format_name": codec,
                "format_long_name": long_name,
            },
            "streams": [
                {
                    "codec_type": "audio",
                    "codec_name": codec,
                    "bit_rate": bit_rate,
                    "duration_ts": "61200",
                    "sample_rate": sample_rate,
                    "channels": 2,
                }
            ],
        }

    monkeypatch.setattr(pipeline, "transcode", _fake_transcode)
    monkeypatch.setattr(media, "probe_media", fake_probe_media)
    monkeypatch.setattr(
        pipeline.store, "persist_file", _recording_persist_file(pipeline, persisted)
    )

    result = pipeline.media_downloaded(
        Response(url=source_url, body=b"source-bytes", status=200),
        Request(source_url),
        spider_info(),
        item=item,
    )

    audio_base_path = local_audio_path(source_url)
    mp3_path = f"{audio_base_path}-vbr7.mp3"
    mp3_url = f"https://mirror.example/feeds/nasa/audio/{audio_base_path}-vbr7.mp3"
    aac_path = f"{audio_base_path}-vbr3.aac"
    aac_url = f"https://mirror.example/feeds/nasa/audio/{audio_base_path}-vbr3.aac"

    assert isinstance(result, dict)
    assert isinstance(result["checksum"], str)
    expected_variants = [
        {
            "url": mp3_url,
            "path": mp3_path,
            "type": "audio/mp3",
            "medium": "audio",
            "isDefault": "true",
            "fileSize": "4567",
            "bitrate": 96000,
            "duration": "61.2",
            "samplingrate": 44100,
            "channels": 2,
        },
        {
            "url": aac_url,
            "path": aac_path,
            "type": "audio/aac",
            "medium": "audio",
            "isDefault": "false",
            "fileSize": "3456",
            "bitrate": 88000,
            "duration": "61.2",
            "samplingrate": 48000,
            "channels": 2,
        },
    ]
    assert result == {
        "url": source_url,
        "path": mp3_path,
        "published_url": mp3_url,
        # The checksum value is not pinned, only its type (checked above).
        "checksum": result["checksum"],
        "status": "downloaded",
        "variants": expected_variants,
    }
    assert persisted == [
        (mp3_path, "audio/mp3"),
        (aac_path, "audio/aac"),
    ]

    completed_item = pipeline.item_completed(
        [(True, result)],
        item,
        spider_info(),
    )

    assert completed_item.audios == [result]


def test_video_pipeline_media_downloaded_returns_canonical_file_info_and_variants(
    monkeypatch, tmp_path: Path
) -> None:
    """``media_downloaded`` reports the 720 mp4 as the canonical and only variant."""
    crawler = build_test_crawler(tmp_path)
    pipeline = VideoPipeline.from_crawler(cast(Crawler, crawler))
    monkeypatch.setattr(pipeline, "inc_stats", lambda status: None)
    persisted: list[tuple[str, str]] = []
    source_url = "https://example.com/video.mp4"
    item = _make_item(video_urls=[source_url])

    def fake_probe_media(_):
        video_stream = {
            "codec_type": "video",
            "codec_name": "h264",
            "bit_rate": "123456",
            "duration_ts": "60000",
            "width": 1280,
            "height": 720,
            "avg_frame_rate": "30/1",
        }
        audio_stream = {
            "codec_type": "audio",
            "codec_name": "mp3",
            "bit_rate": "96000",
            "duration_ts": "60000",
        }
        return {
            "format": {
                "duration": "60.0",
                "size": "9876",
                "bit_rate": "123456",
                "format_name": "mp4",
                "format_long_name": "MP4",
            },
            "streams": [video_stream, audio_stream],
        }

    monkeypatch.setattr(pipeline, "transcode", _fake_transcode)
    monkeypatch.setattr(media, "probe_media", fake_probe_media)
    monkeypatch.setattr(
        pipeline.store, "persist_file", _recording_persist_file(pipeline, persisted)
    )

    result = pipeline.media_downloaded(
        Response(url=source_url, body=b"video-bytes", status=200),
        Request(source_url),
        spider_info(),
        item=item,
    )

    video_base_path = local_video_path(source_url)
    mp4_path = f"{video_base_path}-720.mp4"
    mp4_url = f"https://mirror.example/feeds/nasa/video/{video_base_path}-720.mp4"

    assert isinstance(result, dict)
    assert isinstance(result["checksum"], str)
    assert result == {
        "url": source_url,
        "path": mp4_path,
        "published_url": mp4_url,
        # The checksum value is not pinned, only its type (checked above).
        "checksum": result["checksum"],
        "status": "downloaded",
        "variants": [
            {
                "url": mp4_url,
                "path": mp4_path,
                "type": "video/mp4",
                "medium": "video",
                "isDefault": "true",
                "fileSize": "9876",
                "bitrate": 123456,
                "duration": "60.0",
                "width": 1280,
                "height": 720,
                "framerate": "30/1",
            }
        ],
    }
    assert persisted == [(mp4_path, "video/mp4")]


def test_audio_pipeline_media_to_download_checks_canonical_path(
    monkeypatch, tmp_path: Path
) -> None:
    """``media_to_download`` stats the vbr7 mp3 path first and reports it up to date."""
    crawler = build_test_crawler(tmp_path)
    pipeline = AudioPipeline.from_crawler(cast(Crawler, crawler))
    monkeypatch.setattr(pipeline, "inc_stats", lambda status: None)
    source_url = "https://example.com/podcast.mp3"
    audio_base_path = local_audio_path(source_url)
    mp3_relative = f"{audio_base_path}-vbr7.mp3"

    # Put both variant files in the store before the pipeline looks for them.
    canonical_path = store_dir(pipeline) / f"{audio_base_path}-vbr7.mp3"
    secondary_path = store_dir(pipeline) / f"{audio_base_path}-vbr3.aac"
    canonical_path.parent.mkdir(parents=True, exist_ok=True)
    canonical_path.write_bytes(b"default")
    secondary_path.write_bytes(b"alt")

    stat_paths: list[str] = []
    original_stat_file = pipeline.store.stat_file
    item = _make_item(audio_urls=[source_url])

    def wrapped_stat_file(path, info):
        # Record each stat'ed path, then delegate to the real implementation.
        stat_paths.append(path)
        return original_stat_file(path, info)

    def fake_probe_media(file_path):
        is_mp3 = file_path.endswith("vbr7.mp3")
        codec = "mp3" if is_mp3 else "aac"
        bit_rate = "96000" if is_mp3 else "88000"
        return {
            "format": {
                "duration": "61.2",
                "size": "4567" if is_mp3 else "3456",
                "bit_rate": bit_rate,
                "format_name": codec,
                "format_long_name": "Audio",
            },
            "streams": [
                {
                    "codec_type": "audio",
                    "codec_name": codec,
                    "bit_rate": bit_rate,
                    "duration_ts": "61200",
                    "sample_rate": "44100" if is_mp3 else "48000",
                    "channels": 2,
                }
            ],
        }

    monkeypatch.setattr(pipeline.store, "stat_file", wrapped_stat_file)
    monkeypatch.setattr(media, "probe_media", fake_probe_media)

    result = pipeline.media_to_download(
        Request(source_url),
        spider_info(),
        item=item,
    )

    assert result is not None
    assert result["path"] == mp3_relative
    assert result["status"] == "uptodate"
    assert f"{audio_base_path}.mp3" not in stat_paths
    assert stat_paths[0] == mp3_relative