Prune old job executions
All checks were successful
buildbot/nix-eval Build done.
buildbot/nix-build Build done.
buildbot/nix-effects Build done.

This commit is contained in:
Abel Luck 2026-06-02 11:31:39 +02:00
parent 813f19f355
commit 710ac76192
6 changed files with 552 additions and 11 deletions

View file

@ -80,12 +80,16 @@ Operational notes:
Reordering `REPUBLISHER_IMAGE` changes canonical feed image URLs. Reordering `REPUBLISHER_IMAGE` changes canonical feed image URLs.
- Job logs and stats artifacts are written under `out/logs/`. - Job logs and stats artifacts are written under `out/logs/`.
Media cleanup: Maintenance cleanup:
- Published media can outlive the current feed when articles fall out of the - Published media can outlive the current feed when articles fall out of the
feed window. Use `cleanup-media` to delete old media files that are no longer feed window. Use `cleanup-media` to delete old media files that are no longer
referenced by the latest published `feed.rss`. referenced by the latest published `feed.rss`.
- The default retention window is 25 days. Run a dry run first: - The command also prunes completed job executions and their `out/logs/`
artifacts. Successful executions are retained for 7 days; failed or canceled
executions are retained for 90 days. Pending and running executions are not
pruned.
- The default media retention window is 25 days. Run a dry run first:
```sh ```sh
uv run repub cleanup-media --feeds-dir out/feeds --days 25 --dry-run uv run repub cleanup-media --feeds-dir out/feeds --days 25 --dry-run
@ -93,9 +97,10 @@ uv run repub cleanup-media --feeds-dir out/feeds --days 25 --dry-run
- Remove `--dry-run` to delete matching files. The command protects media - Remove `--dry-run` to delete matching files. The command protects media
referenced by the latest published feed, lists each matched file before the referenced by the latest published feed, lists each matched file before the
aggregate summary, and uses a lock to avoid racing with active crawls. aggregate summary, prunes old job execution rows and logs, and uses a lock to
avoid racing with active crawls.
- For config-driven deployments, pass the runtime config so cleanup uses the - For config-driven deployments, pass the runtime config so cleanup uses the
configured `out_dir` and media directory names: configured `out_dir`, media directory names, and job log directory:
```sh ```sh
uv run repub cleanup-media --config repub.toml --dry-run uv run repub cleanup-media --config repub.toml --dry-run

View file

@ -22,6 +22,11 @@ from repub.config import (
build_base_settings, build_base_settings,
load_config, load_config,
) )
from repub.job_retention import (
DEFAULT_SUCCESSFUL_EXECUTION_RETENTION_DAYS,
DEFAULT_UNSUCCESSFUL_EXECUTION_RETENTION_DAYS,
cleanup_job_executions,
)
from repub.web import SHUTDOWN_EVENT_KEY, create_app from repub.web import SHUTDOWN_EVENT_KEY, create_app
FeedNameFilter = crawl_module.FeedNameFilter FeedNameFilter = crawl_module.FeedNameFilter
@ -97,6 +102,11 @@ def parse_args(argv: list[str] | None = None) -> tuple[str, argparse.Namespace]:
default=None, default=None,
help="Published feeds directory to clean (default: config out_dir/feeds or out/feeds)", help="Published feeds directory to clean (default: config out_dir/feeds or out/feeds)",
) )
cleanup_parser.add_argument(
"--log-dir",
default=None,
help="Job execution log directory to clean (default: config out_dir/logs or alongside feeds)",
)
cleanup_parser.add_argument( cleanup_parser.add_argument(
"--days", "--days",
type=int, type=int,
@ -121,11 +131,12 @@ def parse_args(argv: list[str] | None = None) -> tuple[str, argparse.Namespace]:
return command, args return command, args
def _cleanup_config(args: argparse.Namespace) -> tuple[Path, tuple[str, ...]]: def _cleanup_config(args: argparse.Namespace) -> tuple[Path, Path, tuple[str, ...]]:
feeds_dir = Path(args.feeds_dir) if args.feeds_dir else Path("out/feeds") feeds_dir = Path(args.feeds_dir) if args.feeds_dir else Path("out/feeds")
log_dir = Path(args.log_dir) if args.log_dir else feeds_dir.parent / "logs"
media_dirs = DEFAULT_MEDIA_DIRS media_dirs = DEFAULT_MEDIA_DIRS
if args.config is None: if args.config is None:
return feeds_dir, media_dirs return feeds_dir, log_dir, media_dirs
config = load_config(args.config) config = load_config(args.config)
settings = build_base_settings(config) settings = build_base_settings(config)
@ -137,7 +148,9 @@ def _cleanup_config(args: argparse.Namespace) -> tuple[Path, tuple[str, ...]]:
) )
if args.feeds_dir is None: if args.feeds_dir is None:
feeds_dir = config.out_dir / "feeds" feeds_dir = config.out_dir / "feeds"
return feeds_dir, media_dirs if args.log_dir is None:
log_dir = config.out_dir / "logs"
return feeds_dir, log_dir, media_dirs
def _install_signal_handlers(stop_event: asyncio.Event) -> None: def _install_signal_handlers(stop_event: asyncio.Event) -> None:
@ -187,7 +200,7 @@ def entrypoint(argv: list[str] | None = None) -> int:
if command == "cleanup-media": if command == "cleanup-media":
try: try:
feeds_dir, media_dirs = _cleanup_config(args) feeds_dir, log_dir, media_dirs = _cleanup_config(args)
except FileNotFoundError as error: except FileNotFoundError as error:
missing_path = ( missing_path = (
Path(error.filename).expanduser() Path(error.filename).expanduser()
@ -206,7 +219,13 @@ def entrypoint(argv: list[str] | None = None) -> int:
dry_run=bool(args.dry_run), dry_run=bool(args.dry_run),
media_dirs=media_dirs, media_dirs=media_dirs,
) )
return 1 if result.failures else 0 job_result = cleanup_job_executions(
log_dir=log_dir,
successful_days=DEFAULT_SUCCESSFUL_EXECUTION_RETENTION_DAYS,
unsuccessful_days=DEFAULT_UNSUCCESSFUL_EXECUTION_RETENTION_DAYS,
dry_run=bool(args.dry_run),
)
return 1 if result.failures or job_result.failures else 0
try: try:
port = int(args.port) port = int(args.port)

220
repub/job_retention.py Normal file
View file

@ -0,0 +1,220 @@
from __future__ import annotations
import sys
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import TextIO, cast
from repub.db import get_database_connection, initialize_database
from repub.jobs import JobArtifacts
from repub.model import Job, JobExecution, JobExecutionStatus, database
# Days a SUCCEEDED execution is kept before it becomes eligible for pruning.
DEFAULT_SUCCESSFUL_EXECUTION_RETENTION_DAYS = 7
# Days a FAILED or CANCELED execution is kept before it becomes eligible for
# pruning.
DEFAULT_UNSUCCESSFUL_EXECUTION_RETENTION_DAYS = 90
# Completed statuses that are governed by the unsuccessful retention window.
UNSUCCESSFUL_EXECUTION_STATUSES = (
JobExecutionStatus.FAILED,
JobExecutionStatus.CANCELED,
)
@dataclass
class JobExecutionRetentionResult:
log_dir: Path
successful_cutoff: datetime
unsuccessful_cutoff: datetime
dry_run: bool
matched_executions: int = 0
deleted_executions: int = 0
matched_files: int = 0
deleted_files: int = 0
bytes_deleted: int = 0
failures: int = 0
@dataclass(frozen=True)
class _ExecutionRetentionCandidate:
execution_id: int
job_id: int
status: JobExecutionStatus
ended_at: datetime
# Maximum number of execution ids bound into a single DELETE ... IN (...)
# statement. SQLite's default bound-parameter limit was 999 before 3.32, so
# stay comfortably below it.
_EXECUTION_DELETE_BATCH_SIZE = 500


def cleanup_job_executions(
    *,
    log_dir: str | Path,
    successful_days: int = DEFAULT_SUCCESSFUL_EXECUTION_RETENTION_DAYS,
    unsuccessful_days: int = DEFAULT_UNSUCCESSFUL_EXECUTION_RETENTION_DAYS,
    now: datetime | None = None,
    dry_run: bool = False,
    output: TextIO = sys.stdout,
) -> JobExecutionRetentionResult:
    """Prune completed job executions and their artifact files.

    Executions whose status is SUCCEEDED are pruned once they ended more than
    ``successful_days`` ago; FAILED and CANCELED executions once they ended
    more than ``unsuccessful_days`` ago. For every matched execution the
    artifact files are removed first; the row is deleted only if that
    artifact cleanup reported no failure.

    Args:
        log_dir: Directory that holds the per-execution artifact files.
        successful_days: Retention window, in days, for successful executions.
        unsuccessful_days: Retention window, in days, for failed or canceled
            executions.
        now: Reference time for both cutoffs; defaults to the current UTC
            time. Naive values are interpreted as UTC.
        dry_run: When true, report matches but delete neither files nor rows.
        output: Stream that receives one line per match and a final summary.

    Returns:
        The populated ``JobExecutionRetentionResult`` for this pass.
    """
    if get_database_connection() is None:
        initialize_database()

    reference_time = _coerce_datetime(now or datetime.now(UTC))
    successful_cutoff = reference_time - timedelta(days=successful_days)
    unsuccessful_cutoff = reference_time - timedelta(days=unsuccessful_days)
    resolved_log_dir = Path(log_dir).resolve()
    result = JobExecutionRetentionResult(
        log_dir=resolved_log_dir,
        successful_cutoff=successful_cutoff,
        unsuccessful_cutoff=unsuccessful_cutoff,
        dry_run=dry_run,
    )

    candidates = _retention_candidates(
        successful_cutoff=successful_cutoff,
        unsuccessful_cutoff=unsuccessful_cutoff,
    )
    execution_ids_to_delete: list[int] = []
    for candidate in candidates:
        result.matched_executions += 1
        print(
            "job retention: matched "
            f"execution_id={candidate.execution_id} "
            f"job_id={candidate.job_id} "
            f"status={candidate.status.name} "
            f"ended_at={candidate.ended_at.isoformat()}",
            file=output,
        )
        artifacts = JobArtifacts.for_execution(
            log_dir=resolved_log_dir,
            job_id=candidate.job_id,
            execution_id=candidate.execution_id,
        )
        artifact_cleanup_succeeded = _cleanup_artifacts(
            artifacts=artifacts,
            result=result,
            dry_run=dry_run,
            output=output,
        )
        # Keep the row when files could not be removed so a later run can
        # retry; a dry run never deletes rows.
        if dry_run or not artifact_cleanup_succeeded:
            continue
        execution_ids_to_delete.append(candidate.execution_id)

    if execution_ids_to_delete:
        with database.writer():
            execution_primary_key = getattr(JobExecution, "_meta").primary_key
            # Delete in bounded batches: one unbounded IN (...) list binds a
            # parameter per id and can exceed the database's variable limit
            # when a large backlog is pruned in one pass.
            for start in range(
                0, len(execution_ids_to_delete), _EXECUTION_DELETE_BATCH_SIZE
            ):
                batch = tuple(
                    execution_ids_to_delete[
                        start : start + _EXECUTION_DELETE_BATCH_SIZE
                    ]
                )
                result.deleted_executions += (
                    JobExecution.delete()
                    .where(execution_primary_key.in_(batch))
                    .execute()
                )

    print(
        "job retention: "
        f"dry_run={_bool_text(result.dry_run)} "
        f"successful_cutoff={result.successful_cutoff.isoformat()} "
        f"unsuccessful_cutoff={result.unsuccessful_cutoff.isoformat()} "
        f"root={result.log_dir} "
        f"matched_executions={result.matched_executions} "
        f"deleted_executions={result.deleted_executions} "
        f"matched_files={result.matched_files} "
        f"deleted_files={result.deleted_files} "
        f"bytes_deleted={result.bytes_deleted} "
        f"failures={result.failures}",
        file=output,
    )
    return result
def _retention_candidates(
    *, successful_cutoff: datetime, unsuccessful_cutoff: datetime
) -> tuple[_ExecutionRetentionCandidate, ...]:
    """Return the completed executions that ended before their cutoff.

    The query selects executions (joined with their job) that have a
    SUCCEEDED, FAILED or CANCELED status and a non-null ``ended_at``. The age
    comparison is done in Python after ``ended_at`` has been normalised to
    UTC: successful executions are compared against ``successful_cutoff``,
    failed/canceled ones against ``unsuccessful_cutoff``. An execution that
    ended exactly at its cutoff is kept.
    """
    completed_statuses = (
        JobExecutionStatus.SUCCEEDED,
        *UNSUCCESSFUL_EXECUTION_STATUSES,
    )
    is_completed = JobExecution.running_status.in_(completed_statuses)
    has_end_time = JobExecution.ended_at.is_null(False)
    with database.reader():
        rows = tuple(
            JobExecution.select(JobExecution, Job)
            .join(Job)
            .where((is_completed) & (has_end_time))
        )

    expired: list[_ExecutionRetentionCandidate] = []
    for row in rows:
        status = JobExecutionStatus(int(row.running_status))
        ended_at = _coerce_datetime(cast(datetime | str, row.ended_at))

        # Pick the retention boundary that applies to this status.
        if status == JobExecutionStatus.SUCCEEDED:
            cutoff = successful_cutoff
        elif status in UNSUCCESSFUL_EXECUTION_STATUSES:
            cutoff = unsuccessful_cutoff
        else:
            continue
        if ended_at >= cutoff:
            continue

        owning_job = cast(Job, row.job)
        expired.append(
            _ExecutionRetentionCandidate(
                execution_id=int(row.get_id()),
                job_id=int(owning_job.get_id()),
                status=status,
                ended_at=ended_at,
            )
        )
    return tuple(expired)
def _cleanup_artifacts(
*,
artifacts: JobArtifacts,
result: JobExecutionRetentionResult,
dry_run: bool,
output: TextIO,
) -> bool:
succeeded = True
for path in artifacts.existing_paths():
result.matched_files += 1
try:
file_size = path.stat().st_size
except OSError as error:
result.failures += 1
succeeded = False
print(
f"job retention: stat failed path={path} error={error}",
file=output,
)
continue
if dry_run:
continue
try:
path.unlink()
except FileNotFoundError:
continue
except OSError as error:
result.failures += 1
succeeded = False
print(
f"job retention: delete failed path={path} error={error}",
file=output,
)
continue
result.deleted_files += 1
result.bytes_deleted += file_size
return succeeded
def _coerce_datetime(value: datetime | str) -> datetime:
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
parsed = datetime.fromisoformat(value)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)
def _bool_text(value: bool) -> str:
return "true" if value else "false"

View file

@ -50,6 +50,16 @@ class JobArtifacts:
stats_path=log_dir / f"{prefix}.jsonl", stats_path=log_dir / f"{prefix}.jsonl",
) )
def existing_paths(self) -> tuple[Path, ...]:
    """Return, sorted, the regular files that share this execution's prefix.

    The prefix is the log file name without its final suffix; every
    ``<prefix>.*`` entry in the log file's directory that is a file is
    included.
    """
    stem = self.log_path.with_suffix("").name
    matches = [
        candidate
        for candidate in self.log_path.parent.glob(f"{stem}.*")
        if candidate.is_file()
    ]
    matches.sort()
    return tuple(matches)
@dataclass @dataclass
class RunningWorker: class RunningWorker:
@ -793,8 +803,12 @@ def clear_completed_executions(*, log_dir: str | Path) -> int:
for execution in completed_executions: for execution in completed_executions:
job = cast(Job, execution.job) job = cast(Job, execution.job)
prefix = f"job-{_job_id(job)}-execution-{_execution_id(execution)}" artifacts = JobArtifacts.for_execution(
for artifact_path in resolved_log_dir.glob(f"{prefix}.*"): log_dir=resolved_log_dir,
job_id=_job_id(job),
execution_id=_execution_id(execution),
)
for artifact_path in artifacts.existing_paths():
artifact_path.unlink(missing_ok=True) artifact_path.unlink(missing_ok=True)
execution_ids = tuple( execution_ids = tuple(

View file

@ -67,6 +67,7 @@ def test_parse_args_supports_cleanup_media_defaults() -> None:
assert command == "cleanup-media" assert command == "cleanup-media"
assert args.config is None assert args.config is None
assert args.feeds_dir is None assert args.feeds_dir is None
assert args.log_dir is None
assert args.days == 25 assert args.days == 25
assert args.dry_run is False assert args.dry_run is False
@ -84,7 +85,24 @@ def test_entrypoint_runs_cleanup_media(monkeypatch, tmp_path) -> None:
recorded["media_dirs"] = media_dirs recorded["media_dirs"] = media_dirs
return FakeResult() return FakeResult()
def fake_cleanup_job_executions(
*,
log_dir,
successful_days,
unsuccessful_days,
dry_run,
):
recorded["log_dir"] = log_dir
recorded["successful_days"] = successful_days
recorded["unsuccessful_days"] = unsuccessful_days
recorded["job_dry_run"] = dry_run
return FakeResult()
monkeypatch.setattr("repub.entrypoint.cleanup_media", fake_cleanup_media) monkeypatch.setattr("repub.entrypoint.cleanup_media", fake_cleanup_media)
monkeypatch.setattr(
"repub.entrypoint.cleanup_job_executions",
fake_cleanup_job_executions,
)
exit_code = entrypoint( exit_code = entrypoint(
[ [
@ -103,6 +121,10 @@ def test_entrypoint_runs_cleanup_media(monkeypatch, tmp_path) -> None:
"retention_days": 10, "retention_days": 10,
"dry_run": True, "dry_run": True,
"media_dirs": ("images", "audio", "video", "files"), "media_dirs": ("images", "audio", "video", "files"),
"log_dir": tmp_path / "logs",
"successful_days": 7,
"unsuccessful_days": 90,
"job_dry_run": True,
} }
@ -139,7 +161,24 @@ REPUBLISHER_FILE_DIR = "files-custom"
recorded["media_dirs"] = media_dirs recorded["media_dirs"] = media_dirs
return FakeResult() return FakeResult()
def fake_cleanup_job_executions(
*,
log_dir,
successful_days,
unsuccessful_days,
dry_run,
):
recorded["log_dir"] = log_dir
recorded["successful_days"] = successful_days
recorded["unsuccessful_days"] = unsuccessful_days
recorded["job_dry_run"] = dry_run
return FakeResult()
monkeypatch.setattr("repub.entrypoint.cleanup_media", fake_cleanup_media) monkeypatch.setattr("repub.entrypoint.cleanup_media", fake_cleanup_media)
monkeypatch.setattr(
"repub.entrypoint.cleanup_job_executions",
fake_cleanup_job_executions,
)
exit_code = entrypoint(["cleanup-media", "--config", str(config_path)]) exit_code = entrypoint(["cleanup-media", "--config", str(config_path)])
@ -154,6 +193,61 @@ REPUBLISHER_FILE_DIR = "files-custom"
"videos-custom", "videos-custom",
"files-custom", "files-custom",
), ),
"log_dir": tmp_path / "mirror" / "logs",
"successful_days": 7,
"unsuccessful_days": 90,
"job_dry_run": False,
}
def test_entrypoint_cleanup_media_accepts_log_dir_override(
    monkeypatch, tmp_path
) -> None:
    """An explicit ``--log-dir`` is handed to job retention as given."""
    recorded: dict[str, object] = {}
    feeds_override = tmp_path / "feeds"
    logs_override = tmp_path / "custom-logs"

    class FakeResult:
        failures = 0

    def fake_cleanup_media(*, feeds_dir, retention_days, dry_run, media_dirs):
        recorded["feeds_dir"] = feeds_dir
        return FakeResult()

    def fake_cleanup_job_executions(
        *,
        log_dir,
        successful_days,
        unsuccessful_days,
        dry_run,
    ):
        recorded.update(
            log_dir=log_dir,
            successful_days=successful_days,
            unsuccessful_days=unsuccessful_days,
            dry_run=dry_run,
        )
        return FakeResult()

    # Replace both cleanup steps so the test only observes argument wiring.
    replacements = (
        ("repub.entrypoint.cleanup_media", fake_cleanup_media),
        ("repub.entrypoint.cleanup_job_executions", fake_cleanup_job_executions),
    )
    for target, fake in replacements:
        monkeypatch.setattr(target, fake)

    argv = [
        "cleanup-media",
        "--feeds-dir",
        str(feeds_override),
        "--log-dir",
        str(logs_override),
    ]
    exit_code = entrypoint(argv)

    assert exit_code == 0
    assert recorded == {
        "feeds_dir": feeds_override,
        "log_dir": logs_override,
        "successful_days": 7,
        "unsuccessful_days": 90,
        "dry_run": False,
    }

189
tests/test_job_retention.py Normal file
View file

@ -0,0 +1,189 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from pathlib import Path
from repub.job_retention import cleanup_job_executions
from repub.jobs import JobArtifacts
from repub.model import (
Job,
JobExecution,
JobExecutionStatus,
create_source,
database,
initialize_database,
)
# Fixed reference time passed as `now=` so the retention cutoffs in these
# tests do not depend on the wall clock.
NOW = datetime(2026, 6, 2, 12, 0, tzinfo=UTC)
def _create_job(name: str) -> Job:
    """Create a disabled feed source called ``name`` and return its job."""
    slug = name.lower().replace(" ", "-")
    source = create_source(
        name=name,
        slug=slug,
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url=f"https://example.com/{slug}.xml",
    )
    with database.reader():
        job = Job.get(Job.source == source)
    return job
def _create_execution(
    job: Job,
    *,
    status: JobExecutionStatus,
    ended_at: datetime | None = None,
) -> JobExecution:
    """Insert an execution row for ``job`` with the given status and end time.

    When ``ended_at`` is given the execution is recorded as having started
    five minutes earlier; otherwise both timestamps are left unset.
    """
    if ended_at:
        started_at = ended_at - timedelta(minutes=5)
    else:
        started_at = None
    with database.writer():
        execution = JobExecution.create(
            job=job,
            running_status=status,
            started_at=started_at,
            ended_at=ended_at,
        )
    return execution
def _write_artifacts(
    log_dir: Path, job: Job, execution: JobExecution
) -> tuple[Path, ...]:
    """Write the log, stats and pygea log files for ``execution``.

    Returns the three paths that were written, creating ``log_dir`` first if
    needed.
    """
    artifacts = JobArtifacts.for_execution(
        log_dir=log_dir,
        job_id=int(job.get_id()),
        execution_id=int(execution.get_id()),
    )
    log_path = artifacts.log_path
    log_path.parent.mkdir(parents=True, exist_ok=True)
    written = (
        log_path,
        artifacts.stats_path,
        log_path.with_suffix(".pygea.log"),
    )
    for artifact_path in written:
        artifact_path.write_text(f"artifact {artifact_path.name}", encoding="utf-8")
    return written
def _execution_exists(execution: JobExecution) -> bool:
    """Report whether the row for ``execution`` is still in the database."""
    execution_id = int(execution.get_id())
    with database.reader():
        row = JobExecution.get_or_none(id=execution_id)
    return row is not None
def test_cleanup_job_executions_prunes_old_completed_rows_and_artifacts(
    tmp_path: Path,
) -> None:
    """Aged-out completed executions lose rows and files; the rest survive."""
    initialize_database(tmp_path / "job-retention.db")
    log_dir = tmp_path / "out" / "logs"
    job = _create_job("Retention source")

    # label -> (status, ended_at); rows are created in the order listed.
    scenarios = {
        "old_success": (JobExecutionStatus.SUCCEEDED, NOW - timedelta(days=8)),
        "fresh_success": (
            JobExecutionStatus.SUCCEEDED,
            NOW - timedelta(days=6, hours=23),
        ),
        "old_failed": (JobExecutionStatus.FAILED, NOW - timedelta(days=91)),
        "fresh_failed": (JobExecutionStatus.FAILED, NOW - timedelta(days=89)),
        "old_canceled": (JobExecutionStatus.CANCELED, NOW - timedelta(days=91)),
        "old_running": (JobExecutionStatus.RUNNING, None),
        "old_pending": (JobExecutionStatus.PENDING, None),
    }
    executions = {
        label: _create_execution(job, status=status, ended_at=ended_at)
        for label, (status, ended_at) in scenarios.items()
    }
    expired = [
        executions[label] for label in ("old_success", "old_failed", "old_canceled")
    ]
    retained = [
        executions[label]
        for label in ("fresh_success", "fresh_failed", "old_running", "old_pending")
    ]

    pruned_paths = [
        path
        for execution in expired
        for path in _write_artifacts(log_dir, job, execution)
    ]
    kept_paths = [
        path
        for execution in retained
        for path in _write_artifacts(log_dir, job, execution)
    ]

    result = cleanup_job_executions(log_dir=log_dir, now=NOW)

    assert result.matched_executions == 3
    assert result.deleted_executions == 3
    assert result.matched_files == len(pruned_paths)
    assert result.deleted_files == len(pruned_paths)
    assert result.failures == 0
    for execution in expired:
        assert not _execution_exists(execution)
    for execution in retained:
        assert _execution_exists(execution)
    assert not any(path.exists() for path in pruned_paths)
    assert all(path.exists() for path in kept_paths)
def test_cleanup_job_executions_dry_run_leaves_rows_and_artifacts(
    tmp_path: Path,
) -> None:
    """A dry run reports the match but deletes neither the row nor files."""
    initialize_database(tmp_path / "job-retention-dry-run.db")
    log_dir = tmp_path / "out" / "logs"
    job = _create_job("Dry run source")
    stale_execution = _create_execution(
        job,
        status=JobExecutionStatus.SUCCEEDED,
        ended_at=NOW - timedelta(days=8),
    )
    artifact_paths = _write_artifacts(log_dir, job, stale_execution)

    result = cleanup_job_executions(log_dir=log_dir, now=NOW, dry_run=True)

    assert (result.matched_executions, result.deleted_executions) == (1, 0)
    assert (result.matched_files, result.deleted_files) == (len(artifact_paths), 0)
    assert _execution_exists(stale_execution)
    for artifact_path in artifact_paths:
        assert artifact_path.exists()
def test_cleanup_job_executions_prunes_rows_when_artifacts_are_missing(
    tmp_path: Path,
) -> None:
    """An aged-out row is still deleted when it has no artifact files."""
    initialize_database(tmp_path / "job-retention-missing-artifacts.db")
    job = _create_job("Missing artifacts source")
    orphan_execution = _create_execution(
        job,
        status=JobExecutionStatus.FAILED,
        ended_at=NOW - timedelta(days=91),
    )
    # The log directory is never created in this test.
    absent_log_dir = tmp_path / "out" / "logs"

    result = cleanup_job_executions(log_dir=absent_log_dir, now=NOW)

    assert (result.matched_executions, result.deleted_executions) == (1, 1)
    assert (result.matched_files, result.deleted_files) == (0, 0)
    assert result.failures == 0
    assert not _execution_exists(orphan_execution)