diff --git a/repub/config.py b/repub/config.py
index d17c7d7..459e6b2 100644
--- a/repub/config.py
+++ b/repub/config.py
@@ -38,6 +38,10 @@ def feed_output_path(*, out_dir: Path, feed_slug: str) -> Path:
return feed_output_dir(out_dir=out_dir, feed_slug=feed_slug) / "feed.rss"
+def staged_feed_output_path(*, out_dir: Path, feed_slug: str) -> Path:
+ return feed_output_dir(out_dir=out_dir, feed_slug=feed_slug) / ".feed.rss.next"
+
+
def _resolve_path(base_path: Path, value: str) -> Path:
path = Path(value).expanduser()
if not path.is_absolute():
@@ -218,7 +222,7 @@ def build_feed_settings(
{
"REPUBLISHER_OUT_DIR": str(out_dir),
"FEEDS": {
- str(feed_output_path(out_dir=out_dir, feed_slug=feed_slug)): {
+ str(staged_feed_output_path(out_dir=out_dir, feed_slug=feed_slug)): {
"format": "rss",
"postprocessing": [],
"feed_name": feed_slug,
diff --git a/repub/crawl.py b/repub/crawl.py
index afa789f..6f0d9f4 100644
--- a/repub/crawl.py
+++ b/repub/crawl.py
@@ -15,6 +15,7 @@ from repub.config import (
load_config,
)
from repub.media import check_runtime
+from repub.postprocessing import publish_staged_feed
from repub.spiders.rss_spider import RssFeedSpider
logger = logging.getLogger(__name__)
@@ -81,6 +82,14 @@ def run_feeds(
deferred = process.crawl(crawler, feed_name=feed.slug, url=feed.url)
def handle_success(_: object) -> None:
+ try:
+ publish_staged_feed(out_dir=out_dir, feed_slug=feed.slug)
+ except Exception:
+ failure = Failure()
+ logger.error("Feed %s (%s) failed to publish", feed.name, feed.slug)
+ logger.critical("%s", failure.getTraceback())
+ results.append((feed.slug, failure))
+ return None
logger.info("Feed %s (%s) completed successfully", feed.name, feed.slug)
results.append((feed.slug, None))
return None
diff --git a/repub/job_runner.py b/repub/job_runner.py
index 68b3be1..008bd15 100644
--- a/repub/job_runner.py
+++ b/repub/job_runner.py
@@ -31,6 +31,7 @@ from repub.model import (
initialize_database,
load_feed_url,
)
+from repub.postprocessing import publish_staged_feed
from repub.spiders.rss_spider import RssFeedSpider
@@ -299,6 +300,14 @@ def main(argv: list[str] | None = None) -> int:
return 130
if exit_code == 0:
+ try:
+ publish_staged_feed(out_dir=out_dir, feed_slug=feed.slug)
+ except Exception as error:
+ print(
+ f"worker[{args.job_id}:{args.execution_id}]: publish failed: {error}",
+ flush=True,
+ )
+ return 1
print(
f"worker[{args.job_id}:{args.execution_id}]: completed successfully",
flush=True,
diff --git a/repub/postprocessing.py b/repub/postprocessing.py
index e69de29..984c92a 100644
--- a/repub/postprocessing.py
+++ b/repub/postprocessing.py
@@ -0,0 +1,47 @@
+from __future__ import annotations
+
+import os
+from contextlib import suppress
+from pathlib import Path
+from xml.etree import ElementTree
+
+from repub.config import feed_output_path, staged_feed_output_path
+
+
+def publish_staged_feed(*, out_dir: Path, feed_slug: str) -> Path:
+ staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug=feed_slug)
+ public_path = feed_output_path(out_dir=out_dir, feed_slug=feed_slug)
+
+ public_path.parent.mkdir(parents=True, exist_ok=True)
+ _validate_staged_feed(staged_path)
+ _fsync_file(staged_path)
+ os.replace(staged_path, public_path)
+ _fsync_directory(public_path.parent)
+ return public_path
+
+
+def _fsync_file(path: Path) -> None:
+ with path.open("rb") as handle:
+ os.fsync(handle.fileno())
+
+
+def _validate_staged_feed(path: Path) -> None:
+ try:
+ root = ElementTree.parse(path).getroot()
+ except ElementTree.ParseError as error:
+ raise ValueError(f"Staged feed is not well-formed XML: {path}") from error
+
+ if root.tag != "rss":
+ raise ValueError(f"Staged feed is not an RSS document: {path}")
+ if root.find("channel") is None:
+ raise ValueError(f"Staged feed is missing an RSS channel: {path}")
+
+
+def _fsync_directory(path: Path) -> None:
+ flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)
+ with suppress(OSError):
+ fd = os.open(path, flags)
+ try:
+ os.fsync(fd)
+ finally:
+ os.close(fd)
diff --git a/tests/test_config.py b/tests/test_config.py
index 1d5816b..517dc91 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -154,7 +154,7 @@ def test_build_feed_settings_derives_output_paths_from_feed_slug(
out_dir / "feeds" / "info-marti" / "files"
)
assert feed_settings["FEEDS"] == {
- str(out_dir / "feeds" / "info-marti" / "feed.rss"): {
+ str(out_dir / "feeds" / "info-marti" / ".feed.rss.next"): {
"format": "rss",
"postprocessing": [],
"feed_name": "info-marti",
diff --git a/tests/test_job_runner.py b/tests/test_job_runner.py
index d7fa936..712a540 100644
--- a/tests/test_job_runner.py
+++ b/tests/test_job_runner.py
@@ -2,8 +2,11 @@ from pathlib import Path
import pytest
-from repub.config import FeedConfig
-from repub.job_runner import _build_crawl_settings
+from repub import job_runner as job_runner_module
+from repub.config import FeedConfig, feed_output_path, staged_feed_output_path
+from repub.job_runner import JobSourceConfig, _build_crawl_settings
+
+VALID_FEED = 'new\n'
def test_build_crawl_settings_passes_feed_url_to_spider(tmp_path: Path) -> None:
@@ -35,3 +38,128 @@ def test_build_crawl_settings_requires_non_empty_feed_url(
stats_path=tmp_path / "stats.jsonl",
feed_url="",
)
+
+
+def test_main_publishes_staged_feed_after_successful_crawl(
+ monkeypatch: pytest.MonkeyPatch, tmp_path: Path
+) -> None:
+ out_dir = tmp_path / "out"
+ public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
+ staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
+ public_path.parent.mkdir(parents=True)
+ public_path.write_text("old\n", encoding="utf-8")
+ staged_path.write_text(VALID_FEED, encoding="utf-8")
+
+ _patch_worker_dependencies(monkeypatch, exit_code=0)
+
+ exit_code = job_runner_module.main(
+ [
+ "--job-id",
+ "1",
+ "--execution-id",
+ "2",
+ "--db-path",
+ str(tmp_path / "republisher.db"),
+ "--out-dir",
+ str(out_dir),
+ "--stats-path",
+ str(tmp_path / "stats.jsonl"),
+ ]
+ )
+
+ assert exit_code == 0
+ assert public_path.read_text(encoding="utf-8") == VALID_FEED
+ assert not staged_path.exists()
+
+
+def test_main_does_not_publish_unusable_staged_feed_after_successful_crawl(
+ monkeypatch: pytest.MonkeyPatch, tmp_path: Path
+) -> None:
+ out_dir = tmp_path / "out"
+ public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
+ staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
+ public_path.parent.mkdir(parents=True)
+ public_path.write_text("old\n", encoding="utf-8")
+ staged_path.write_text('\n', encoding="utf-8")
+
+ _patch_worker_dependencies(monkeypatch, exit_code=0)
+
+ exit_code = job_runner_module.main(
+ [
+ "--job-id",
+ "1",
+ "--execution-id",
+ "2",
+ "--db-path",
+ str(tmp_path / "republisher.db"),
+ "--out-dir",
+ str(out_dir),
+ "--stats-path",
+ str(tmp_path / "stats.jsonl"),
+ ]
+ )
+
+ assert exit_code == 1
+ assert public_path.read_text(encoding="utf-8") == "old\n"
+ assert staged_path.read_text(encoding="utf-8") == '\n'
+
+
+def test_main_does_not_publish_staged_feed_after_failed_crawl(
+ monkeypatch: pytest.MonkeyPatch, tmp_path: Path
+) -> None:
+ out_dir = tmp_path / "out"
+ public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
+ staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
+ public_path.parent.mkdir(parents=True)
+ public_path.write_text("old\n", encoding="utf-8")
+ staged_path.write_text(VALID_FEED, encoding="utf-8")
+
+ _patch_worker_dependencies(monkeypatch, exit_code=1)
+
+ exit_code = job_runner_module.main(
+ [
+ "--job-id",
+ "1",
+ "--execution-id",
+ "2",
+ "--db-path",
+ str(tmp_path / "republisher.db"),
+ "--out-dir",
+ str(out_dir),
+ "--stats-path",
+ str(tmp_path / "stats.jsonl"),
+ ]
+ )
+
+ assert exit_code == 1
+ assert public_path.read_text(encoding="utf-8") == "old\n"
+ assert staged_path.read_text(encoding="utf-8") == VALID_FEED
+
+
+def _patch_worker_dependencies(
+ monkeypatch: pytest.MonkeyPatch, *, exit_code: int
+) -> None:
+ monkeypatch.setattr(
+ job_runner_module,
+ "_load_job_source_config",
+ lambda *, db_path, job_id: JobSourceConfig(
+ source_name="Demo",
+ source_slug="demo",
+ source_type="feed",
+ spider_arguments={},
+ feed_url="https://source.example/feed.rss",
+ ),
+ )
+ monkeypatch.setattr(
+ job_runner_module, "load_feed_url", lambda: "https://mirror.example"
+ )
+ monkeypatch.setattr(
+ job_runner_module,
+ "CrawlerProcess",
+ lambda settings: object(),
+ )
+ monkeypatch.setattr(
+ job_runner_module,
+ "_run_crawl",
+ lambda *, process, feed, spider_arguments: exit_code,
+ )
diff --git a/tests/test_postprocessing.py b/tests/test_postprocessing.py
new file mode 100644
index 0000000..77a221a
--- /dev/null
+++ b/tests/test_postprocessing.py
@@ -0,0 +1,52 @@
+from pathlib import Path
+
+import pytest
+
+from repub.config import feed_output_path, staged_feed_output_path
+from repub.postprocessing import publish_staged_feed
+
+VALID_FEED = 'new\n'
+
+
+def test_publish_staged_feed_replaces_public_feed(tmp_path: Path) -> None:
+ out_dir = tmp_path / "out"
+ public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
+ staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
+ public_path.parent.mkdir(parents=True)
+ public_path.write_text("old\n", encoding="utf-8")
+ staged_path.write_text(VALID_FEED, encoding="utf-8")
+
+ published_path = publish_staged_feed(out_dir=out_dir, feed_slug="demo")
+
+ assert published_path == public_path
+ assert public_path.read_text(encoding="utf-8") == VALID_FEED
+ assert not staged_path.exists()
+
+
+def test_publish_staged_feed_requires_staged_file(tmp_path: Path) -> None:
+ with pytest.raises(FileNotFoundError):
+ publish_staged_feed(out_dir=tmp_path / "out", feed_slug="missing")
+
+
+@pytest.mark.parametrize(
+ "staged_feed",
+ [
+ '\n',
+ '\n',
+ ],
+)
+def test_publish_staged_feed_rejects_unusable_feed(
+ tmp_path: Path, staged_feed: str
+) -> None:
+ out_dir = tmp_path / "out"
+ public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
+ staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
+ public_path.parent.mkdir(parents=True)
+ public_path.write_text("old\n", encoding="utf-8")
+ staged_path.write_text(staged_feed, encoding="utf-8")
+
+ with pytest.raises(ValueError):
+ publish_staged_feed(out_dir=out_dir, feed_slug="demo")
+
+ assert public_path.read_text(encoding="utf-8") == "old\n"
+ assert staged_path.read_text(encoding="utf-8") == staged_feed