republisher/tests/test_cleanup.py
Abel Luck 507074b80e
All checks were successful
buildbot/nix-eval Build done.
buildbot/nix-build Build done.
buildbot/nix-effects Build done.
Add media retention cleanup command
2026-05-27 13:04:47 +02:00

200 lines
6.6 KiB
Python

import fcntl
import io
import os
import subprocess
import sys
import time
from datetime import UTC, datetime, timedelta
from pathlib import Path
from repub.cleanup import cleanup_media
# Fixed, timezone-aware reference instant shared by the tests in this module:
# write_media back-dates file timestamps relative to it, and the tests pass it
# to cleanup_media as ``now``.
NOW = datetime(2026, 5, 27, 12, 0, tzinfo=UTC)
def write_media(path: Path, body: bytes, *, age_days: int) -> None:
    """Create ``path`` holding ``body`` and back-date it by ``age_days`` days.

    Missing parent directories are created first. Both the access time and
    the modification time are set to ``NOW`` minus ``age_days`` days.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_bytes(body)
    backdated = NOW - timedelta(days=age_days)
    epoch_seconds = backdated.timestamp()
    os.utime(path, times=(epoch_seconds, epoch_seconds))
def wait_until(path: Path, *, timeout: float = 5.0) -> None:
    """Block until ``path`` exists, polling roughly every 50 ms.

    The path is checked before the deadline is consulted, so it is always
    examined at least once (even with ``timeout=0``) and one final time after
    the last sleep. A path that appears during the final polling interval is
    therefore not misreported as a timeout.

    Args:
        path: Filesystem path expected to appear.
        timeout: Maximum number of seconds to keep polling.

    Raises:
        AssertionError: If ``path`` still does not exist once ``timeout``
            seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        if path.exists():
            return
        # Only give up after an existence check has failed past the deadline.
        if time.monotonic() >= deadline:
            raise AssertionError(f"timed out waiting for {path}")
        time.sleep(0.05)
def test_cleanup_media_deletes_old_unreferenced_media_and_protects_latest_feed_refs(
    tmp_path: Path,
) -> None:
    """Expired media is deleted unless the current ``feed.rss`` references it.

    The feed references three files (an enclosure, a ``media:thumbnail`` and an
    ``<img>`` inside ``content:encoded``). All three are older than the
    retention window and are expected to survive. Three unreferenced files of
    the same age are expected to be removed. A file younger than the window and
    the staged ``.feed.rss.next`` file are expected to be left in place.
    """
    feeds_dir = tmp_path / "feeds"
    demo_dir = feeds_dir / "demo"
    demo_dir.mkdir(parents=True)
    feed_xml = """
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/">
<channel>
<item>
<enclosure url="https://mirror.example/feeds/demo/audio/current.mp3" />
<media:thumbnail xmlns:media="http://search.yahoo.com/mrss/" url="/images/thumbs/current.jpg" />
<content:encoded><![CDATA[<img src="images/full/current.webp">]]></content:encoded>
</item>
</channel>
</rss>
""".strip()
    (demo_dir / "feed.rss").write_text(feed_xml, encoding="utf-8")

    referenced_audio = demo_dir / "audio" / "current.mp3"
    referenced_image = demo_dir / "images" / "full" / "current.webp"
    referenced_thumb = demo_dir / "images" / "thumbs" / "current.jpg"
    stale_source = demo_dir / "images" / "source" / "current.png"
    stale_video = demo_dir / "video" / "old.mp4"
    fresh_pdf = demo_dir / "files" / "fresh.pdf"
    stale_image = demo_dir / "images" / "full" / "old.webp"
    staged_feed = demo_dir / ".feed.rss.next"

    # (path, contents, age in days); retention below is 25 days.
    fixtures = (
        (referenced_audio, b"audio", 40),
        (referenced_image, b"webp", 40),
        (referenced_thumb, b"jpg", 40),
        (stale_source, b"source", 40),
        (stale_video, b"video", 40),
        (fresh_pdf, b"fresh", 2),
        (stale_image, b"old", 40),
        (staged_feed, b"staged", 40),
    )
    for media_path, payload, age in fixtures:
        write_media(media_path, payload, age_days=age)

    output = io.StringIO()
    result = cleanup_media(
        feeds_dir=feeds_dir,
        retention_days=25,
        now=NOW,
        dry_run=False,
        output=output,
    )

    # Referenced by the current feed: kept despite being past retention.
    for kept in (referenced_audio, referenced_image, referenced_thumb):
        assert kept.exists()
    # Old and unreferenced: removed.
    for removed in (stale_source, stale_video, stale_image):
        assert not removed.exists()
    # Within retention, or the staged feed file: untouched.
    assert fresh_pdf.exists()
    assert staged_feed.exists()

    assert result.matched_files == 3
    assert result.deleted_files == 3
    assert result.bytes_deleted == sum(map(len, (b"source", b"video", b"old")))
    assert result.failures == 0

    report = output.getvalue()
    assert "dry_run=false" in report
    assert "deleted_files=3" in report
def test_cleanup_media_dry_run_reports_matches_without_deleting(tmp_path: Path) -> None:
    """With ``dry_run=True`` an expired file is counted as matched but not deleted."""
    feeds_dir = tmp_path / "feeds"
    expired = feeds_dir.joinpath("demo", "audio", "old.mp3")
    write_media(expired, b"audio", age_days=40)

    summary = cleanup_media(
        feeds_dir=feeds_dir,
        retention_days=25,
        now=NOW,
        dry_run=True,
        output=io.StringIO(),
    )

    # The file is still on disk, and nothing is reported as deleted.
    assert expired.exists()
    observed = (
        summary.matched_files,
        summary.deleted_files,
        summary.bytes_deleted,
        summary.failures,
    )
    assert observed == (1, 0, 0, 0)
def test_cleanup_media_uses_configured_media_dirs(tmp_path: Path) -> None:
    """Cleanup honours the directory names passed as ``media_dirs``.

    Files under the two configured directories are cleaned (an expired,
    unreferenced one is removed; ones referenced by the feed are kept), while
    an expired file under the non-configured ``audio`` directory is expected to
    be left alone.
    """
    feeds_dir = tmp_path / "feeds"
    demo_dir = feeds_dir / "demo"
    demo_dir.mkdir(parents=True)
    feed_xml = """
<rss>
<channel>
<item>
<enclosure url="https://mirror.example/feeds/demo/audio-custom/current.mp3" />
<media:content xmlns:media="http://search.yahoo.com/mrss/" url="/videos-custom/current.mp4" />
</item>
</channel>
</rss>
""".strip()
    (demo_dir / "feed.rss").write_text(feed_xml, encoding="utf-8")

    referenced_audio = demo_dir / "audio-custom" / "current.mp3"
    stale_audio = demo_dir / "audio-custom" / "old.mp3"
    referenced_video = demo_dir / "videos-custom" / "current.mp4"
    legacy_audio = demo_dir / "audio" / "legacy.mp3"

    # Every fixture is 40 days old, i.e. past the 25-day retention used below.
    for media_path, payload in (
        (referenced_audio, b"current"),
        (stale_audio, b"old"),
        (referenced_video, b"video"),
        (legacy_audio, b"legacy"),
    ):
        write_media(media_path, payload, age_days=40)

    summary = cleanup_media(
        feeds_dir=feeds_dir,
        retention_days=25,
        now=NOW,
        media_dirs=("audio-custom", "videos-custom"),
        output=io.StringIO(),
    )

    assert referenced_audio.exists()
    assert not stale_audio.exists()
    assert referenced_video.exists()
    # Lives outside the configured media_dirs, so it must not be touched.
    assert legacy_audio.exists()

    counts = (summary.matched_files, summary.deleted_files, summary.failures)
    assert counts == (1, 1, 0)
def test_cleanup_media_waits_for_active_crawl_media_lock(tmp_path: Path) -> None:
    """Cleanup must not delete anything while the media lock is held.

    The test takes a shared ``flock`` on ``out/.media-retention.lock`` (a
    sibling of the feeds directory) and starts ``cleanup_media`` in a child
    interpreter. While the lock is held, the child must still be running, the
    expired file must still exist and the "done" marker must be absent. After
    the lock is released the child must exit with status 0, having deleted the
    file and written the marker.

    NOTE(review): this presumes cleanup_media takes a conflicting (exclusive)
    lock on that same file; confirm against repub.cleanup.

    The child is run as a context manager and killed on any failure path, so a
    failed assertion or a ``communicate`` timeout cannot leave a stray process
    or open pipes behind.
    """
    out_dir = tmp_path / "out"
    feeds_dir = out_dir / "feeds"
    old_file = feeds_dir / "demo" / "audio" / "old.mp3"
    write_media(old_file, b"audio", age_days=40)
    lock_path = out_dir / ".media-retention.lock"
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    started_path = tmp_path / "cleanup-started"
    done_path = tmp_path / "cleanup-done"
    # Child program: writes the "started" marker, runs cleanup, then writes
    # the "done" marker. argv: feeds dir, started marker, done marker.
    script = """
import io
import sys
from datetime import UTC, datetime
from pathlib import Path
from repub.cleanup import cleanup_media
Path(sys.argv[2]).write_text("started", encoding="utf-8")
cleanup_media(
feeds_dir=Path(sys.argv[1]),
retention_days=25,
now=datetime(2026, 5, 27, 12, 0, tzinfo=UTC),
output=io.StringIO(),
)
Path(sys.argv[3]).write_text("done", encoding="utf-8")
"""
    command = [
        sys.executable,
        "-c",
        script,
        str(feeds_dir),
        str(started_path),
        str(done_path),
    ]
    with lock_path.open("a", encoding="utf-8") as lock_file:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_SH)
        with subprocess.Popen(
            command,
            cwd=Path.cwd(),
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
        ) as process:
            try:
                try:
                    wait_until(started_path)
                    # Give the child time to reach (and block on) the lock.
                    time.sleep(0.5)
                    assert old_file.exists()
                    assert process.poll() is None
                    assert not done_path.exists()
                finally:
                    # Always release so the child can never stay blocked on us.
                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
                stdout, stderr = process.communicate(timeout=5)
            finally:
                # communicate() does not kill the child on timeout, and a
                # failed assertion above skips it entirely; make sure the
                # child is gone before Popen.__exit__ waits on it.
                if process.poll() is None:
                    process.kill()
    assert process.returncode == 0, stdout + stderr
    assert not old_file.exists()
    assert done_path.exists()