From e64a32d76b6e12fa4d21db13b41315e55ad8cd43 Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Wed, 27 May 2026 10:57:21 +0200 Subject: [PATCH] fix: publish feeds atomically --- repub/config.py | 6 +- repub/crawl.py | 9 +++ repub/job_runner.py | 9 +++ repub/postprocessing.py | 47 +++++++++++++ tests/test_config.py | 2 +- tests/test_job_runner.py | 132 ++++++++++++++++++++++++++++++++++- tests/test_postprocessing.py | 52 ++++++++++++++ 7 files changed, 253 insertions(+), 4 deletions(-) create mode 100644 tests/test_postprocessing.py diff --git a/repub/config.py b/repub/config.py index d17c7d7..459e6b2 100644 --- a/repub/config.py +++ b/repub/config.py @@ -38,6 +38,10 @@ def feed_output_path(*, out_dir: Path, feed_slug: str) -> Path: return feed_output_dir(out_dir=out_dir, feed_slug=feed_slug) / "feed.rss" +def staged_feed_output_path(*, out_dir: Path, feed_slug: str) -> Path: + return feed_output_dir(out_dir=out_dir, feed_slug=feed_slug) / ".feed.rss.next" + + def _resolve_path(base_path: Path, value: str) -> Path: path = Path(value).expanduser() if not path.is_absolute(): @@ -218,7 +222,7 @@ def build_feed_settings( { "REPUBLISHER_OUT_DIR": str(out_dir), "FEEDS": { - str(feed_output_path(out_dir=out_dir, feed_slug=feed_slug)): { + str(staged_feed_output_path(out_dir=out_dir, feed_slug=feed_slug)): { "format": "rss", "postprocessing": [], "feed_name": feed_slug, diff --git a/repub/crawl.py b/repub/crawl.py index afa789f..6f0d9f4 100644 --- a/repub/crawl.py +++ b/repub/crawl.py @@ -15,6 +15,7 @@ from repub.config import ( load_config, ) from repub.media import check_runtime +from repub.postprocessing import publish_staged_feed from repub.spiders.rss_spider import RssFeedSpider logger = logging.getLogger(__name__) @@ -81,6 +82,14 @@ def run_feeds( deferred = process.crawl(crawler, feed_name=feed.slug, url=feed.url) def handle_success(_: object) -> None: + try: + publish_staged_feed(out_dir=out_dir, feed_slug=feed.slug) + except Exception: + failure = Failure() + logger.error("Feed %s (%s) failed to publish", feed.name, feed.slug) + logger.critical("%s", failure.getTraceback()) + results.append((feed.slug, failure)) + return None logger.info("Feed %s (%s) completed successfully", feed.name, feed.slug) results.append((feed.slug, None)) return None diff --git a/repub/job_runner.py b/repub/job_runner.py index 68b3be1..008bd15 100644 --- a/repub/job_runner.py +++ b/repub/job_runner.py @@ -31,6 +31,7 @@ from repub.model import ( initialize_database, load_feed_url, ) +from repub.postprocessing import publish_staged_feed from repub.spiders.rss_spider import RssFeedSpider @@ -299,6 +300,14 @@ def main(argv: list[str] | None = None) -> int: return 130 if exit_code == 0: + try: + publish_staged_feed(out_dir=out_dir, feed_slug=feed.slug) + except Exception as error: + print( + f"worker[{args.job_id}:{args.execution_id}]: publish failed: {error}", + flush=True, + ) + return 1 print( f"worker[{args.job_id}:{args.execution_id}]: completed successfully", flush=True, diff --git a/repub/postprocessing.py b/repub/postprocessing.py index e69de29..984c92a 100644 --- a/repub/postprocessing.py +++ b/repub/postprocessing.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import os +from contextlib import suppress +from pathlib import Path +from xml.etree import ElementTree + +from repub.config import feed_output_path, staged_feed_output_path + + +def publish_staged_feed(*, out_dir: Path, feed_slug: str) -> Path: + staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug=feed_slug) + public_path = feed_output_path(out_dir=out_dir, feed_slug=feed_slug) + + public_path.parent.mkdir(parents=True, exist_ok=True) + _validate_staged_feed(staged_path) + _fsync_file(staged_path) + os.replace(staged_path, public_path) + _fsync_directory(public_path.parent) + return public_path + + +def _fsync_file(path: Path) -> None: + with path.open("rb") as handle: + os.fsync(handle.fileno()) + + +def _validate_staged_feed(path: Path) -> None: + try: + root = ElementTree.parse(path).getroot() + except ElementTree.ParseError as error: + raise ValueError(f"Staged feed is not well-formed XML: {path}") from error + + if root.tag != "rss": + raise ValueError(f"Staged feed is not an RSS document: {path}") + if root.find("channel") is None: + raise ValueError(f"Staged feed is missing an RSS channel: {path}") + + +def _fsync_directory(path: Path) -> None: + flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0) + with suppress(OSError): + fd = os.open(path, flags) + try: + os.fsync(fd) + finally: + os.close(fd) diff --git a/tests/test_config.py b/tests/test_config.py index 1d5816b..517dc91 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -154,7 +154,7 @@ def test_build_feed_settings_derives_output_paths_from_feed_slug( out_dir / "feeds" / "info-marti" / "files" ) assert feed_settings["FEEDS"] == { - str(out_dir / "feeds" / "info-marti" / "feed.rss"): { + str(out_dir / "feeds" / "info-marti" / ".feed.rss.next"): { "format": "rss", "postprocessing": [], "feed_name": "info-marti", diff --git a/tests/test_job_runner.py b/tests/test_job_runner.py index d7fa936..712a540 100644 --- a/tests/test_job_runner.py +++ b/tests/test_job_runner.py @@ -2,8 +2,11 @@ from pathlib import Path import pytest -from repub.config import FeedConfig -from repub.job_runner import _build_crawl_settings +from repub import job_runner as job_runner_module +from repub.config import FeedConfig, feed_output_path, staged_feed_output_path +from repub.job_runner import JobSourceConfig, _build_crawl_settings + +VALID_FEED = 'new\n' def test_build_crawl_settings_passes_feed_url_to_spider(tmp_path: Path) -> None: @@ -35,3 +38,128 @@ def test_build_crawl_settings_requires_non_empty_feed_url( stats_path=tmp_path / "stats.jsonl", feed_url="", ) + + +def test_main_publishes_staged_feed_after_successful_crawl( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + out_dir = tmp_path / "out" + public_path = feed_output_path(out_dir=out_dir, feed_slug="demo") + staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo") + public_path.parent.mkdir(parents=True) + public_path.write_text("old\n", encoding="utf-8") + staged_path.write_text(VALID_FEED, encoding="utf-8") + + _patch_worker_dependencies(monkeypatch, exit_code=0) + + exit_code = job_runner_module.main( + [ + "--job-id", + "1", + "--execution-id", + "2", + "--db-path", + str(tmp_path / "republisher.db"), + "--out-dir", + str(out_dir), + "--stats-path", + str(tmp_path / "stats.jsonl"), + ] + ) + + assert exit_code == 0 + assert public_path.read_text(encoding="utf-8") == VALID_FEED + assert not staged_path.exists() + + +def test_main_does_not_publish_unusable_staged_feed_after_successful_crawl( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + out_dir = tmp_path / "out" + public_path = feed_output_path(out_dir=out_dir, feed_slug="demo") + staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo") + public_path.parent.mkdir(parents=True) + public_path.write_text("old\n", encoding="utf-8") + staged_path.write_text('\n', encoding="utf-8") + + _patch_worker_dependencies(monkeypatch, exit_code=0) + + exit_code = job_runner_module.main( + [ + "--job-id", + "1", + "--execution-id", + "2", + "--db-path", + str(tmp_path / "republisher.db"), + "--out-dir", + str(out_dir), + "--stats-path", + str(tmp_path / "stats.jsonl"), + ] + ) + + assert exit_code == 1 + assert public_path.read_text(encoding="utf-8") == "old\n" + assert staged_path.read_text(encoding="utf-8") == '\n' + + +def test_main_does_not_publish_staged_feed_after_failed_crawl( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + out_dir = tmp_path / "out" + public_path = feed_output_path(out_dir=out_dir, feed_slug="demo") + staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo") + public_path.parent.mkdir(parents=True) + public_path.write_text("old\n", encoding="utf-8") + staged_path.write_text(VALID_FEED, encoding="utf-8") + + _patch_worker_dependencies(monkeypatch, exit_code=1) + + exit_code = job_runner_module.main( + [ + "--job-id", + "1", + "--execution-id", + "2", + "--db-path", + str(tmp_path / "republisher.db"), + "--out-dir", + str(out_dir), + "--stats-path", + str(tmp_path / "stats.jsonl"), + ] + ) + + assert exit_code == 1 + assert public_path.read_text(encoding="utf-8") == "old\n" + assert staged_path.read_text(encoding="utf-8") == VALID_FEED + + +def _patch_worker_dependencies( + monkeypatch: pytest.MonkeyPatch, *, exit_code: int +) -> None: + monkeypatch.setattr( + job_runner_module, + "_load_job_source_config", + lambda *, db_path, job_id: JobSourceConfig( + source_name="Demo", + source_slug="demo", + source_type="feed", + spider_arguments={}, + feed_url="https://source.example/feed.rss", + ), + ) + monkeypatch.setattr( + job_runner_module, "load_feed_url", lambda: "https://mirror.example" + ) + monkeypatch.setattr( + job_runner_module, + "CrawlerProcess", + lambda settings: object(), + ) + monkeypatch.setattr( + job_runner_module, + "_run_crawl", + lambda *, process, feed, spider_arguments: exit_code, + ) diff --git a/tests/test_postprocessing.py b/tests/test_postprocessing.py new file mode 100644 index 0000000..77a221a --- /dev/null +++ b/tests/test_postprocessing.py @@ -0,0 +1,52 @@ +from pathlib import Path + +import pytest + +from repub.config import feed_output_path, staged_feed_output_path +from repub.postprocessing import publish_staged_feed + +VALID_FEED = 'new\n' + + +def test_publish_staged_feed_replaces_public_feed(tmp_path: Path) -> None: + out_dir = tmp_path / "out" + public_path = feed_output_path(out_dir=out_dir, feed_slug="demo") + staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo") + public_path.parent.mkdir(parents=True) + public_path.write_text("old\n", encoding="utf-8") + staged_path.write_text(VALID_FEED, encoding="utf-8") + + published_path = publish_staged_feed(out_dir=out_dir, feed_slug="demo") + + assert published_path == public_path + assert public_path.read_text(encoding="utf-8") == VALID_FEED + assert not staged_path.exists() + + +def test_publish_staged_feed_requires_staged_file(tmp_path: Path) -> None: + with pytest.raises(FileNotFoundError): + publish_staged_feed(out_dir=tmp_path / "out", feed_slug="missing") + + +@pytest.mark.parametrize( + "staged_feed", + [ + '\n', + '\n', + ], +) +def test_publish_staged_feed_rejects_unusable_feed( + tmp_path: Path, staged_feed: str +) -> None: + out_dir = tmp_path / "out" + public_path = feed_output_path(out_dir=out_dir, feed_slug="demo") + staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo") + public_path.parent.mkdir(parents=True) + public_path.write_text("old\n", encoding="utf-8") + staged_path.write_text(staged_feed, encoding="utf-8") + + with pytest.raises(ValueError): + publish_staged_feed(out_dir=out_dir, feed_slug="demo") + + assert public_path.read_text(encoding="utf-8") == "old\n" + assert staged_path.read_text(encoding="utf-8") == staged_feed