fix: publish feeds atomically

This commit is contained in:
Abel Luck 2026-05-27 10:57:21 +02:00
parent cbb427b89d
commit e64a32d76b
7 changed files with 253 additions and 4 deletions

View file

@ -38,6 +38,10 @@ def feed_output_path(*, out_dir: Path, feed_slug: str) -> Path:
return feed_output_dir(out_dir=out_dir, feed_slug=feed_slug) / "feed.rss"
def staged_feed_output_path(*, out_dir: Path, feed_slug: str) -> Path:
return feed_output_dir(out_dir=out_dir, feed_slug=feed_slug) / ".feed.rss.next"
def _resolve_path(base_path: Path, value: str) -> Path:
path = Path(value).expanduser()
if not path.is_absolute():
@ -218,7 +222,7 @@ def build_feed_settings(
{
"REPUBLISHER_OUT_DIR": str(out_dir),
"FEEDS": {
str(feed_output_path(out_dir=out_dir, feed_slug=feed_slug)): {
str(staged_feed_output_path(out_dir=out_dir, feed_slug=feed_slug)): {
"format": "rss",
"postprocessing": [],
"feed_name": feed_slug,

View file

@ -15,6 +15,7 @@ from repub.config import (
load_config,
)
from repub.media import check_runtime
from repub.postprocessing import publish_staged_feed
from repub.spiders.rss_spider import RssFeedSpider
logger = logging.getLogger(__name__)
@ -81,6 +82,14 @@ def run_feeds(
deferred = process.crawl(crawler, feed_name=feed.slug, url=feed.url)
def handle_success(_: object) -> None:
try:
publish_staged_feed(out_dir=out_dir, feed_slug=feed.slug)
except Exception:
failure = Failure()
logger.error("Feed %s (%s) failed to publish", feed.name, feed.slug)
logger.critical("%s", failure.getTraceback())
results.append((feed.slug, failure))
return None
logger.info("Feed %s (%s) completed successfully", feed.name, feed.slug)
results.append((feed.slug, None))
return None

View file

@ -31,6 +31,7 @@ from repub.model import (
initialize_database,
load_feed_url,
)
from repub.postprocessing import publish_staged_feed
from repub.spiders.rss_spider import RssFeedSpider
@ -299,6 +300,14 @@ def main(argv: list[str] | None = None) -> int:
return 130
if exit_code == 0:
try:
publish_staged_feed(out_dir=out_dir, feed_slug=feed.slug)
except Exception as error:
print(
f"worker[{args.job_id}:{args.execution_id}]: publish failed: {error}",
flush=True,
)
return 1
print(
f"worker[{args.job_id}:{args.execution_id}]: completed successfully",
flush=True,

View file

@ -0,0 +1,47 @@
from __future__ import annotations
import os
from contextlib import suppress
from pathlib import Path
from xml.etree import ElementTree
from repub.config import feed_output_path, staged_feed_output_path
def publish_staged_feed(*, out_dir: Path, feed_slug: str) -> Path:
staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug=feed_slug)
public_path = feed_output_path(out_dir=out_dir, feed_slug=feed_slug)
public_path.parent.mkdir(parents=True, exist_ok=True)
_validate_staged_feed(staged_path)
_fsync_file(staged_path)
os.replace(staged_path, public_path)
_fsync_directory(public_path.parent)
return public_path
def _fsync_file(path: Path) -> None:
with path.open("rb") as handle:
os.fsync(handle.fileno())
def _validate_staged_feed(path: Path) -> None:
try:
root = ElementTree.parse(path).getroot()
except ElementTree.ParseError as error:
raise ValueError(f"Staged feed is not well-formed XML: {path}") from error
if root.tag != "rss":
raise ValueError(f"Staged feed is not an RSS document: {path}")
if root.find("channel") is None:
raise ValueError(f"Staged feed is missing an RSS channel: {path}")
def _fsync_directory(path: Path) -> None:
flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)
with suppress(OSError):
fd = os.open(path, flags)
try:
os.fsync(fd)
finally:
os.close(fd)

View file

@ -154,7 +154,7 @@ def test_build_feed_settings_derives_output_paths_from_feed_slug(
out_dir / "feeds" / "info-marti" / "files"
)
assert feed_settings["FEEDS"] == {
str(out_dir / "feeds" / "info-marti" / "feed.rss"): {
str(out_dir / "feeds" / "info-marti" / ".feed.rss.next"): {
"format": "rss",
"postprocessing": [],
"feed_name": "info-marti",

View file

@ -2,8 +2,11 @@ from pathlib import Path
import pytest
from repub.config import FeedConfig
from repub.job_runner import _build_crawl_settings
from repub import job_runner as job_runner_module
from repub.config import FeedConfig, feed_output_path, staged_feed_output_path
from repub.job_runner import JobSourceConfig, _build_crawl_settings
VALID_FEED = '<rss version="2.0"><channel><title>new</title></channel></rss>\n'
def test_build_crawl_settings_passes_feed_url_to_spider(tmp_path: Path) -> None:
@ -35,3 +38,128 @@ def test_build_crawl_settings_requires_non_empty_feed_url(
stats_path=tmp_path / "stats.jsonl",
feed_url="",
)
def test_main_publishes_staged_feed_after_successful_crawl(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
out_dir = tmp_path / "out"
public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
public_path.parent.mkdir(parents=True)
public_path.write_text("<rss>old</rss>\n", encoding="utf-8")
staged_path.write_text(VALID_FEED, encoding="utf-8")
_patch_worker_dependencies(monkeypatch, exit_code=0)
exit_code = job_runner_module.main(
[
"--job-id",
"1",
"--execution-id",
"2",
"--db-path",
str(tmp_path / "republisher.db"),
"--out-dir",
str(out_dir),
"--stats-path",
str(tmp_path / "stats.jsonl"),
]
)
assert exit_code == 0
assert public_path.read_text(encoding="utf-8") == VALID_FEED
assert not staged_path.exists()
def test_main_does_not_publish_unusable_staged_feed_after_successful_crawl(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
out_dir = tmp_path / "out"
public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
public_path.parent.mkdir(parents=True)
public_path.write_text("<rss>old</rss>\n", encoding="utf-8")
staged_path.write_text('<rss version="2.0"/>\n', encoding="utf-8")
_patch_worker_dependencies(monkeypatch, exit_code=0)
exit_code = job_runner_module.main(
[
"--job-id",
"1",
"--execution-id",
"2",
"--db-path",
str(tmp_path / "republisher.db"),
"--out-dir",
str(out_dir),
"--stats-path",
str(tmp_path / "stats.jsonl"),
]
)
assert exit_code == 1
assert public_path.read_text(encoding="utf-8") == "<rss>old</rss>\n"
assert staged_path.read_text(encoding="utf-8") == '<rss version="2.0"/>\n'
def test_main_does_not_publish_staged_feed_after_failed_crawl(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
out_dir = tmp_path / "out"
public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
public_path.parent.mkdir(parents=True)
public_path.write_text("<rss>old</rss>\n", encoding="utf-8")
staged_path.write_text(VALID_FEED, encoding="utf-8")
_patch_worker_dependencies(monkeypatch, exit_code=1)
exit_code = job_runner_module.main(
[
"--job-id",
"1",
"--execution-id",
"2",
"--db-path",
str(tmp_path / "republisher.db"),
"--out-dir",
str(out_dir),
"--stats-path",
str(tmp_path / "stats.jsonl"),
]
)
assert exit_code == 1
assert public_path.read_text(encoding="utf-8") == "<rss>old</rss>\n"
assert staged_path.read_text(encoding="utf-8") == VALID_FEED
def _patch_worker_dependencies(
monkeypatch: pytest.MonkeyPatch, *, exit_code: int
) -> None:
monkeypatch.setattr(
job_runner_module,
"_load_job_source_config",
lambda *, db_path, job_id: JobSourceConfig(
source_name="Demo",
source_slug="demo",
source_type="feed",
spider_arguments={},
feed_url="https://source.example/feed.rss",
),
)
monkeypatch.setattr(
job_runner_module, "load_feed_url", lambda: "https://mirror.example"
)
monkeypatch.setattr(
job_runner_module,
"CrawlerProcess",
lambda settings: object(),
)
monkeypatch.setattr(
job_runner_module,
"_run_crawl",
lambda *, process, feed, spider_arguments: exit_code,
)

View file

@ -0,0 +1,52 @@
from pathlib import Path
import pytest
from repub.config import feed_output_path, staged_feed_output_path
from repub.postprocessing import publish_staged_feed
VALID_FEED = '<rss version="2.0"><channel><title>new</title></channel></rss>\n'
def test_publish_staged_feed_replaces_public_feed(tmp_path: Path) -> None:
out_dir = tmp_path / "out"
public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
public_path.parent.mkdir(parents=True)
public_path.write_text("<rss>old</rss>\n", encoding="utf-8")
staged_path.write_text(VALID_FEED, encoding="utf-8")
published_path = publish_staged_feed(out_dir=out_dir, feed_slug="demo")
assert published_path == public_path
assert public_path.read_text(encoding="utf-8") == VALID_FEED
assert not staged_path.exists()
def test_publish_staged_feed_requires_staged_file(tmp_path: Path) -> None:
with pytest.raises(FileNotFoundError):
publish_staged_feed(out_dir=tmp_path / "out", feed_slug="missing")
@pytest.mark.parametrize(
"staged_feed",
[
'<rss version="2.0"/>\n',
'<rss version="2.0"><channel></rss>\n',
],
)
def test_publish_staged_feed_rejects_unusable_feed(
tmp_path: Path, staged_feed: str
) -> None:
out_dir = tmp_path / "out"
public_path = feed_output_path(out_dir=out_dir, feed_slug="demo")
staged_path = staged_feed_output_path(out_dir=out_dir, feed_slug="demo")
public_path.parent.mkdir(parents=True)
public_path.write_text("<rss>old</rss>\n", encoding="utf-8")
staged_path.write_text(staged_feed, encoding="utf-8")
with pytest.raises(ValueError):
publish_staged_feed(out_dir=out_dir, feed_slug="demo")
assert public_path.read_text(encoding="utf-8") == "<rss>old</rss>\n"
assert staged_path.read_text(encoding="utf-8") == staged_feed