From 710ac7619246ed4f871e37c6d5294d82687aa1e8 Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Tue, 2 Jun 2026 11:31:39 +0200 Subject: [PATCH] Prune old job executions --- README.md | 13 ++- repub/entrypoint.py | 29 ++++- repub/job_retention.py | 220 ++++++++++++++++++++++++++++++++++++ repub/jobs.py | 18 ++- tests/test_entrypoint.py | 94 +++++++++++++++ tests/test_job_retention.py | 189 +++++++++++++++++++++++++++++++ 6 files changed, 552 insertions(+), 11 deletions(-) create mode 100644 repub/job_retention.py create mode 100644 tests/test_job_retention.py diff --git a/README.md b/README.md index 6285155..5c04696 100644 --- a/README.md +++ b/README.md @@ -80,12 +80,16 @@ Operational notes: Reordering `REPUBLISHER_IMAGE` changes canonical feed image URLs. - Job logs and stats artifacts are written under `out/logs/`. -Media cleanup: +Maintenance cleanup: - Published media can outlive the current feed when articles fall out of the feed window. Use `cleanup-media` to delete old media files that are no longer referenced by the latest published `feed.rss`. -- The default retention window is 25 days. Run a dry run first: +- The command also prunes completed job executions and their `out/logs/` + artifacts. Successful executions are retained for 7 days; failed or canceled + executions are retained for 90 days. Pending and running executions are not + pruned. +- The default media retention window is 25 days. Run a dry run first: ```sh uv run repub cleanup-media --feeds-dir out/feeds --days 25 --dry-run @@ -93,9 +97,10 @@ uv run repub cleanup-media --feeds-dir out/feeds --days 25 --dry-run - Remove `--dry-run` to delete matching files. The command protects media referenced by the latest published feed, lists each matched file before the - aggregate summary, and uses a lock to avoid racing with active crawls. + aggregate summary, prunes old job execution rows and logs, and uses a lock to + avoid racing with active crawls. - For config-driven deployments, pass the runtime config so cleanup uses the - configured `out_dir` and media directory names: + configured `out_dir`, media directory names, and job log directory: ```sh uv run repub cleanup-media --config repub.toml --dry-run diff --git a/repub/entrypoint.py b/repub/entrypoint.py index 6bf9749..51e08b5 100644 --- a/repub/entrypoint.py +++ b/repub/entrypoint.py @@ -22,6 +22,11 @@ from repub.config import ( build_base_settings, load_config, ) +from repub.job_retention import ( + DEFAULT_SUCCESSFUL_EXECUTION_RETENTION_DAYS, + DEFAULT_UNSUCCESSFUL_EXECUTION_RETENTION_DAYS, + cleanup_job_executions, +) from repub.web import SHUTDOWN_EVENT_KEY, create_app FeedNameFilter = crawl_module.FeedNameFilter @@ -97,6 +102,11 @@ def parse_args(argv: list[str] | None = None) -> tuple[str, argparse.Namespace]: default=None, help="Published feeds directory to clean (default: config out_dir/feeds or out/feeds)", ) + cleanup_parser.add_argument( + "--log-dir", + default=None, + help="Job execution log directory to clean (default: config out_dir/logs or alongside feeds)", + ) cleanup_parser.add_argument( "--days", type=int, @@ -121,11 +131,12 @@ def parse_args(argv: list[str] | None = None) -> tuple[str, argparse.Namespace]: return command, args -def _cleanup_config(args: argparse.Namespace) -> tuple[Path, tuple[str, ...]]: +def _cleanup_config(args: argparse.Namespace) -> tuple[Path, Path, tuple[str, ...]]: feeds_dir = Path(args.feeds_dir) if args.feeds_dir else Path("out/feeds") + log_dir = Path(args.log_dir) if args.log_dir else feeds_dir.parent / "logs" media_dirs = DEFAULT_MEDIA_DIRS if args.config is None: - return feeds_dir, media_dirs + return feeds_dir, log_dir, media_dirs config = load_config(args.config) settings = build_base_settings(config) @@ -137,7 +148,9 @@ def _cleanup_config(args: argparse.Namespace) -> tuple[Path, tuple[str, ...]]: ) if args.feeds_dir is None: feeds_dir = config.out_dir / "feeds" - return feeds_dir, media_dirs + if args.log_dir is None: + log_dir = config.out_dir / "logs" + return feeds_dir, log_dir, media_dirs def _install_signal_handlers(stop_event: asyncio.Event) -> None: @@ -187,7 +200,7 @@ def entrypoint(argv: list[str] | None = None) -> int: if command == "cleanup-media": try: - feeds_dir, media_dirs = _cleanup_config(args) + feeds_dir, log_dir, media_dirs = _cleanup_config(args) except FileNotFoundError as error: missing_path = ( Path(error.filename).expanduser() @@ -206,7 +219,13 @@ def entrypoint(argv: list[str] | None = None) -> int: dry_run=bool(args.dry_run), media_dirs=media_dirs, ) - return 1 if result.failures else 0 + job_result = cleanup_job_executions( + log_dir=log_dir, + successful_days=DEFAULT_SUCCESSFUL_EXECUTION_RETENTION_DAYS, + unsuccessful_days=DEFAULT_UNSUCCESSFUL_EXECUTION_RETENTION_DAYS, + dry_run=bool(args.dry_run), + ) + return 1 if result.failures or job_result.failures else 0 try: port = int(args.port) diff --git a/repub/job_retention.py b/repub/job_retention.py new file mode 100644 index 0000000..12ac5dc --- /dev/null +++ b/repub/job_retention.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +import sys +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import TextIO, cast + +from repub.db import get_database_connection, initialize_database +from repub.jobs import JobArtifacts +from repub.model import Job, JobExecution, JobExecutionStatus, database + +DEFAULT_SUCCESSFUL_EXECUTION_RETENTION_DAYS = 7 +DEFAULT_UNSUCCESSFUL_EXECUTION_RETENTION_DAYS = 90 +UNSUCCESSFUL_EXECUTION_STATUSES = ( + JobExecutionStatus.FAILED, + JobExecutionStatus.CANCELED, +) + + +@dataclass +class JobExecutionRetentionResult: + log_dir: Path + successful_cutoff: datetime + unsuccessful_cutoff: datetime + dry_run: bool + matched_executions: int = 0 + deleted_executions: int = 0 + matched_files: int = 0 + deleted_files: int = 0 + bytes_deleted: int = 0 + failures: int = 0 + + +@dataclass(frozen=True) +class _ExecutionRetentionCandidate: + execution_id: int + job_id: int + status: JobExecutionStatus + ended_at: datetime + + +def cleanup_job_executions( + *, + log_dir: str | Path, + successful_days: int = DEFAULT_SUCCESSFUL_EXECUTION_RETENTION_DAYS, + unsuccessful_days: int = DEFAULT_UNSUCCESSFUL_EXECUTION_RETENTION_DAYS, + now: datetime | None = None, + dry_run: bool = False, + output: TextIO = sys.stdout, +) -> JobExecutionRetentionResult: + if get_database_connection() is None: + initialize_database() + reference_time = _coerce_datetime(now or datetime.now(UTC)) + successful_cutoff = reference_time - timedelta(days=successful_days) + unsuccessful_cutoff = reference_time - timedelta(days=unsuccessful_days) + resolved_log_dir = Path(log_dir).resolve() + result = JobExecutionRetentionResult( + log_dir=resolved_log_dir, + successful_cutoff=successful_cutoff, + unsuccessful_cutoff=unsuccessful_cutoff, + dry_run=dry_run, + ) + + candidates = _retention_candidates( + successful_cutoff=successful_cutoff, + unsuccessful_cutoff=unsuccessful_cutoff, + ) + execution_ids_to_delete: list[int] = [] + for candidate in candidates: + result.matched_executions += 1 + print( + "job retention: matched " + f"execution_id={candidate.execution_id} " + f"job_id={candidate.job_id} " + f"status={candidate.status.name} " + f"ended_at={candidate.ended_at.isoformat()}", + file=output, + ) + artifacts = JobArtifacts.for_execution( + log_dir=resolved_log_dir, + job_id=candidate.job_id, + execution_id=candidate.execution_id, + ) + artifact_cleanup_succeeded = _cleanup_artifacts( + artifacts=artifacts, + result=result, + dry_run=dry_run, + output=output, + ) + if dry_run or not artifact_cleanup_succeeded: + continue + execution_ids_to_delete.append(candidate.execution_id) + + if execution_ids_to_delete: + with database.writer(): + execution_primary_key = getattr(JobExecution, "_meta").primary_key + result.deleted_executions = ( + JobExecution.delete() + .where(execution_primary_key.in_(tuple(execution_ids_to_delete))) + .execute() + ) + + print( + "job retention: " + f"dry_run={_bool_text(result.dry_run)} " + f"successful_cutoff={result.successful_cutoff.isoformat()} " + f"unsuccessful_cutoff={result.unsuccessful_cutoff.isoformat()} " + f"root={result.log_dir} " + f"matched_executions={result.matched_executions} " + f"deleted_executions={result.deleted_executions} " + f"matched_files={result.matched_files} " + f"deleted_files={result.deleted_files} " + f"bytes_deleted={result.bytes_deleted} " + f"failures={result.failures}", + file=output, + ) + return result + + +def _retention_candidates( + *, successful_cutoff: datetime, unsuccessful_cutoff: datetime +) -> tuple[_ExecutionRetentionCandidate, ...]: + with database.reader(): + executions = tuple( + JobExecution.select(JobExecution, Job) + .join(Job) + .where( + ( + JobExecution.running_status.in_( + ( + JobExecutionStatus.SUCCEEDED, + *UNSUCCESSFUL_EXECUTION_STATUSES, + ) + ) + ) + & (JobExecution.ended_at.is_null(False)) + ) + ) + + candidates: list[_ExecutionRetentionCandidate] = [] + for execution in executions: + status = JobExecutionStatus(int(execution.running_status)) + ended_at = _coerce_datetime(cast(datetime | str, execution.ended_at)) + if status == JobExecutionStatus.SUCCEEDED: + if ended_at >= successful_cutoff: + continue + elif status in UNSUCCESSFUL_EXECUTION_STATUSES: + if ended_at >= unsuccessful_cutoff: + continue + else: + continue + + job = cast(Job, execution.job) + candidates.append( + _ExecutionRetentionCandidate( + execution_id=int(execution.get_id()), + job_id=int(job.get_id()), + status=status, + ended_at=ended_at, + ) + ) + return tuple(candidates) + + +def _cleanup_artifacts( + *, + artifacts: JobArtifacts, + result: JobExecutionRetentionResult, + dry_run: bool, + output: TextIO, +) -> bool: + succeeded = True + for path in artifacts.existing_paths(): + result.matched_files += 1 + try: + file_size = path.stat().st_size + except OSError as error: + result.failures += 1 + succeeded = False + print( + f"job retention: stat failed path={path} error={error}", + file=output, + ) + continue + + if dry_run: + continue + + try: + path.unlink() + except FileNotFoundError: + continue + except OSError as error: + result.failures += 1 + succeeded = False + print( + f"job retention: delete failed path={path} error={error}", + file=output, + ) + continue + result.deleted_files += 1 + result.bytes_deleted += file_size + return succeeded + + +def _coerce_datetime(value: datetime | str) -> datetime: + if isinstance(value, datetime): + if value.tzinfo is None: + return value.replace(tzinfo=UTC) + return value.astimezone(UTC) + + parsed = datetime.fromisoformat(value) + if parsed.tzinfo is None: + return parsed.replace(tzinfo=UTC) + return parsed.astimezone(UTC) + + +def _bool_text(value: bool) -> str: + return "true" if value else "false" diff --git a/repub/jobs.py b/repub/jobs.py index ef24965..842e5c7 100644 --- a/repub/jobs.py +++ b/repub/jobs.py @@ -50,6 +50,16 @@ class JobArtifacts: stats_path=log_dir / f"{prefix}.jsonl", ) + def existing_paths(self) -> tuple[Path, ...]: + prefix = self.log_path.with_suffix("").name + return tuple( + sorted( + path + for path in self.log_path.parent.glob(f"{prefix}.*") + if path.is_file() + ) + ) + @dataclass class RunningWorker: @@ -793,8 +803,12 @@ def clear_completed_executions(*, log_dir: str | Path) -> int: for execution in completed_executions: job = cast(Job, execution.job) - prefix = f"job-{_job_id(job)}-execution-{_execution_id(execution)}" - for artifact_path in resolved_log_dir.glob(f"{prefix}.*"): + artifacts = JobArtifacts.for_execution( + log_dir=resolved_log_dir, + job_id=_job_id(job), + execution_id=_execution_id(execution), + ) + for artifact_path in artifacts.existing_paths(): artifact_path.unlink(missing_ok=True) execution_ids = tuple( diff --git a/tests/test_entrypoint.py b/tests/test_entrypoint.py index 0eb030d..69913b9 100644 --- a/tests/test_entrypoint.py +++ b/tests/test_entrypoint.py @@ -67,6 +67,7 @@ def test_parse_args_supports_cleanup_media_defaults() -> None: assert command == "cleanup-media" assert args.config is None assert args.feeds_dir is None + assert args.log_dir is None assert args.days == 25 assert args.dry_run is False @@ -84,7 +85,24 @@ def test_entrypoint_runs_cleanup_media(monkeypatch, tmp_path) -> None: recorded["media_dirs"] = media_dirs return FakeResult() + def fake_cleanup_job_executions( + *, + log_dir, + successful_days, + unsuccessful_days, + dry_run, + ): + recorded["log_dir"] = log_dir + recorded["successful_days"] = successful_days + recorded["unsuccessful_days"] = unsuccessful_days + recorded["job_dry_run"] = dry_run + return FakeResult() + monkeypatch.setattr("repub.entrypoint.cleanup_media", fake_cleanup_media) + monkeypatch.setattr( + "repub.entrypoint.cleanup_job_executions", + fake_cleanup_job_executions, + ) exit_code = entrypoint( [ @@ -103,6 +121,10 @@ def test_entrypoint_runs_cleanup_media(monkeypatch, tmp_path) -> None: "retention_days": 10, "dry_run": True, "media_dirs": ("images", "audio", "video", "files"), + "log_dir": tmp_path / "logs", + "successful_days": 7, + "unsuccessful_days": 90, + "job_dry_run": True, } @@ -139,7 +161,24 @@ REPUBLISHER_FILE_DIR = "files-custom" recorded["media_dirs"] = media_dirs return FakeResult() + def fake_cleanup_job_executions( + *, + log_dir, + successful_days, + unsuccessful_days, + dry_run, + ): + recorded["log_dir"] = log_dir + recorded["successful_days"] = successful_days + recorded["unsuccessful_days"] = unsuccessful_days + recorded["job_dry_run"] = dry_run + return FakeResult() + monkeypatch.setattr("repub.entrypoint.cleanup_media", fake_cleanup_media) + monkeypatch.setattr( + "repub.entrypoint.cleanup_job_executions", + fake_cleanup_job_executions, + ) exit_code = entrypoint(["cleanup-media", "--config", str(config_path)]) @@ -154,6 +193,61 @@ REPUBLISHER_FILE_DIR = "files-custom" "videos-custom", "files-custom", ), + "log_dir": tmp_path / "mirror" / "logs", + "successful_days": 7, + "unsuccessful_days": 90, + "job_dry_run": False, + } + + +def test_entrypoint_cleanup_media_accepts_log_dir_override( + monkeypatch, tmp_path +) -> None: + recorded: dict[str, object] = {} + + class FakeResult: + failures = 0 + + def fake_cleanup_media(*, feeds_dir, retention_days, dry_run, media_dirs): + recorded["feeds_dir"] = feeds_dir + return FakeResult() + + def fake_cleanup_job_executions( + *, + log_dir, + successful_days, + unsuccessful_days, + dry_run, + ): + recorded["log_dir"] = log_dir + recorded["successful_days"] = successful_days + recorded["unsuccessful_days"] = unsuccessful_days + recorded["dry_run"] = dry_run + return FakeResult() + + monkeypatch.setattr("repub.entrypoint.cleanup_media", fake_cleanup_media) + monkeypatch.setattr( + "repub.entrypoint.cleanup_job_executions", + fake_cleanup_job_executions, + ) + + exit_code = entrypoint( + [ + "cleanup-media", + "--feeds-dir", + str(tmp_path / "feeds"), + "--log-dir", + str(tmp_path / "custom-logs"), + ] + ) + + assert exit_code == 0 + assert recorded == { + "feeds_dir": tmp_path / "feeds", + "log_dir": tmp_path / "custom-logs", + "successful_days": 7, + "unsuccessful_days": 90, + "dry_run": False, } diff --git a/tests/test_job_retention.py b/tests/test_job_retention.py new file mode 100644 index 0000000..0c21250 --- /dev/null +++ b/tests/test_job_retention.py @@ -0,0 +1,189 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from pathlib import Path + +from repub.job_retention import cleanup_job_executions +from repub.jobs import JobArtifacts +from repub.model import ( + Job, + JobExecution, + JobExecutionStatus, + create_source, + database, + initialize_database, +) + +NOW = datetime(2026, 6, 2, 12, 0, tzinfo=UTC) + + +def _create_job(name: str) -> Job: + source = create_source( + name=name, + slug=name.lower().replace(" ", "-"), + source_type="feed", + notes="", + spider_arguments="", + enabled=False, + cron_minute="*/5", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url=f"https://example.com/{name.lower().replace(' ', '-')}.xml", + ) + with database.reader(): + return Job.get(Job.source == source) + + +def _create_execution( + job: Job, + *, + status: JobExecutionStatus, + ended_at: datetime | None = None, +) -> JobExecution: + with database.writer(): + return JobExecution.create( + job=job, + running_status=status, + started_at=ended_at - timedelta(minutes=5) if ended_at else None, + ended_at=ended_at, + ) + + +def _write_artifacts( + log_dir: Path, job: Job, execution: JobExecution +) -> tuple[Path, ...]: + artifacts = JobArtifacts.for_execution( + log_dir=log_dir, + job_id=int(job.get_id()), + execution_id=int(execution.get_id()), + ) + artifacts.log_path.parent.mkdir(parents=True, exist_ok=True) + paths = ( + artifacts.log_path, + artifacts.stats_path, + artifacts.log_path.with_suffix(".pygea.log"), + ) + for path in paths: + path.write_text(f"artifact {path.name}", encoding="utf-8") + return paths + + +def _execution_exists(execution: JobExecution) -> bool: + with database.reader(): + return JobExecution.get_or_none(id=int(execution.get_id())) is not None + + +def test_cleanup_job_executions_prunes_old_completed_rows_and_artifacts( + tmp_path: Path, +) -> None: + initialize_database(tmp_path / "job-retention.db") + log_dir = tmp_path / "out" / "logs" + job = _create_job("Retention source") + old_success = _create_execution( + job, + status=JobExecutionStatus.SUCCEEDED, + ended_at=NOW - timedelta(days=8), + ) + fresh_success = _create_execution( + job, + status=JobExecutionStatus.SUCCEEDED, + ended_at=NOW - timedelta(days=6, hours=23), + ) + old_failed = _create_execution( + job, + status=JobExecutionStatus.FAILED, + ended_at=NOW - timedelta(days=91), + ) + fresh_failed = _create_execution( + job, + status=JobExecutionStatus.FAILED, + ended_at=NOW - timedelta(days=89), + ) + old_canceled = _create_execution( + job, + status=JobExecutionStatus.CANCELED, + ended_at=NOW - timedelta(days=91), + ) + old_running = _create_execution( + job, + status=JobExecutionStatus.RUNNING, + ended_at=None, + ) + old_pending = _create_execution( + job, + status=JobExecutionStatus.PENDING, + ended_at=None, + ) + pruned_paths = tuple( + path + for execution in (old_success, old_failed, old_canceled) + for path in _write_artifacts(log_dir, job, execution) + ) + kept_paths = tuple( + path + for execution in (fresh_success, fresh_failed, old_running, old_pending) + for path in _write_artifacts(log_dir, job, execution) + ) + + result = cleanup_job_executions(log_dir=log_dir, now=NOW) + + assert result.matched_executions == 3 + assert result.deleted_executions == 3 + assert result.matched_files == len(pruned_paths) + assert result.deleted_files == len(pruned_paths) + assert result.failures == 0 + assert not _execution_exists(old_success) + assert _execution_exists(fresh_success) + assert not _execution_exists(old_failed) + assert _execution_exists(fresh_failed) + assert not _execution_exists(old_canceled) + assert _execution_exists(old_running) + assert _execution_exists(old_pending) + assert all(not path.exists() for path in pruned_paths) + assert all(path.exists() for path in kept_paths) + + +def test_cleanup_job_executions_dry_run_leaves_rows_and_artifacts( + tmp_path: Path, +) -> None: + initialize_database(tmp_path / "job-retention-dry-run.db") + log_dir = tmp_path / "out" / "logs" + job = _create_job("Dry run source") + execution = _create_execution( + job, + status=JobExecutionStatus.SUCCEEDED, + ended_at=NOW - timedelta(days=8), + ) + paths = _write_artifacts(log_dir, job, execution) + + result = cleanup_job_executions(log_dir=log_dir, now=NOW, dry_run=True) + + assert result.matched_executions == 1 + assert result.deleted_executions == 0 + assert result.matched_files == len(paths) + assert result.deleted_files == 0 + assert _execution_exists(execution) + assert all(path.exists() for path in paths) + + +def test_cleanup_job_executions_prunes_rows_when_artifacts_are_missing( + tmp_path: Path, +) -> None: + initialize_database(tmp_path / "job-retention-missing-artifacts.db") + job = _create_job("Missing artifacts source") + execution = _create_execution( + job, + status=JobExecutionStatus.FAILED, + ended_at=NOW - timedelta(days=91), + ) + + result = cleanup_job_executions(log_dir=tmp_path / "out" / "logs", now=NOW) + + assert result.matched_executions == 1 + assert result.deleted_executions == 1 + assert result.matched_files == 0 + assert result.deleted_files == 0 + assert result.failures == 0 + assert not _execution_exists(execution)