from __future__ import annotations import fcntl import html import re import sys from collections.abc import Iterator, Sequence from contextlib import contextmanager from dataclasses import dataclass from datetime import UTC, datetime, timedelta from pathlib import Path from typing import TextIO from urllib.parse import unquote, urlsplit DEFAULT_MEDIA_DIRS = ("images", "audio", "video", "files") MEDIA_RETENTION_LOCK = ".media-retention.lock" @dataclass class CleanupResult: scanned_root: Path cutoff: datetime dry_run: bool matched_files: int = 0 deleted_files: int = 0 bytes_deleted: int = 0 failures: int = 0 def _bool_text(value: bool) -> str: return "true" if value else "false" def _normalize_media_dirs(media_dirs: Sequence[str]) -> tuple[str, ...]: normalized = tuple( dict.fromkeys( normalized_dir for media_dir in media_dirs if (normalized_dir := media_dir.strip("/")) ) ) if not normalized: raise ValueError("media_dirs must include at least one media directory") return normalized def _feed_reference_re(media_dirs: Sequence[str]) -> re.Pattern[str]: media_names = "|".join(re.escape(media_dir) for media_dir in media_dirs) return re.compile( rf"""(?:https?://[^"'<>\s,]+|/?(?:feeds/[^/"'<>\s,]+/)?(?:{media_names})/[^"'<>\s,]+)""" ) @contextmanager def media_retention_lock(*, out_dir: Path, exclusive: bool) -> Iterator[None]: out_dir = out_dir.resolve() out_dir.mkdir(parents=True, exist_ok=True) lock_mode = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH with (out_dir / MEDIA_RETENTION_LOCK).open("a", encoding="utf-8") as lock_file: fcntl.flock(lock_file.fileno(), lock_mode) try: yield finally: fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) def _feed_dirs(feeds_dir: Path) -> list[Path]: if not feeds_dir.exists(): return [] return sorted(path for path in feeds_dir.iterdir() if path.is_dir()) def _referenced_media_paths( feed_dir: Path, feed_body: str, media_dirs: Sequence[str] ) -> set[Path]: protected: set[Path] = set() slug = feed_dir.name feed_prefix = f"/feeds/{slug}/" feed_root = feed_dir.resolve() media_dir_set = set(media_dirs) for match in _feed_reference_re(media_dirs).finditer(feed_body): reference = html.unescape(match.group(0)) path = unquote(urlsplit(reference).path) if path.startswith(feed_prefix): relative_path = path.removeprefix(feed_prefix) else: relative_path = path.lstrip("/") if relative_path.split("/", maxsplit=1)[0] not in media_dir_set: continue candidate = (feed_dir / relative_path).resolve() if candidate.is_relative_to(feed_root): protected.add(candidate) return protected def collect_protected_paths( feeds_dir: Path, media_dirs: Sequence[str] = DEFAULT_MEDIA_DIRS ) -> set[Path]: media_dirs = _normalize_media_dirs(media_dirs) protected: set[Path] = set() for feed_dir in _feed_dirs(feeds_dir): feed_path = feed_dir / "feed.rss" if not feed_path.exists(): continue protected.update( _referenced_media_paths( feed_dir, feed_path.read_text(encoding="utf-8", errors="replace"), media_dirs, ) ) return protected def _media_files(feeds_dir: Path, media_dirs: Sequence[str]) -> list[Path]: files: list[Path] = [] for feed_dir in _feed_dirs(feeds_dir): for media_dir_name in media_dirs: media_dir = feed_dir / media_dir_name if not media_dir.exists(): continue files.extend(path for path in media_dir.rglob("*") if path.is_file()) return sorted(files) def cleanup_media( *, feeds_dir: Path, retention_days: int = 25, now: datetime | None = None, dry_run: bool = False, output: TextIO = sys.stdout, media_dirs: Sequence[str] = DEFAULT_MEDIA_DIRS, ) -> CleanupResult: if now is None: now = datetime.now(UTC) elif now.tzinfo is None: now = now.replace(tzinfo=UTC) cutoff = now - timedelta(days=retention_days) cutoff_timestamp = cutoff.timestamp() feeds_dir = feeds_dir.resolve() media_dirs = _normalize_media_dirs(media_dirs) with media_retention_lock(out_dir=feeds_dir.parent, exclusive=True): protected = collect_protected_paths(feeds_dir, media_dirs=media_dirs) result = CleanupResult(scanned_root=feeds_dir, cutoff=cutoff, dry_run=dry_run) for path in _media_files(feeds_dir, media_dirs): try: stat = path.stat() except OSError as error: result.failures += 1 print( f"media cleanup: stat failed path={path} error={error}", file=output, ) continue if stat.st_mtime >= cutoff_timestamp: continue if path.resolve() in protected: continue result.matched_files += 1 if dry_run: continue try: path.unlink() except OSError as error: result.failures += 1 print( f"media cleanup: delete failed path={path} error={error}", file=output, ) continue result.deleted_files += 1 result.bytes_deleted += stat.st_size print( "media cleanup: " f"dry_run={_bool_text(result.dry_run)} " f"cutoff={result.cutoff.isoformat()} " f"root={result.scanned_root} " f"matched_files={result.matched_files} " f"deleted_files={result.deleted_files} " f"bytes_deleted={result.bytes_deleted} " f"failures={result.failures}", file=output, ) return result