from __future__ import annotations from datetime import UTC, datetime from pathlib import Path from repub.jobs import load_runs_view from repub.model import ( Job, JobExecution, JobExecutionStatus, create_source, initialize_database, ) def test_load_runs_view_humanizes_completed_execution_summary_bytes( tmp_path: Path, ) -> None: initialize_database(tmp_path / "jobs-completed.db") source = create_source( name="Completed source", slug="completed-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/completed.xml", ) job = Job.get(Job.source == source) JobExecution.create( job=job, running_status=JobExecutionStatus.SUCCEEDED, ended_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC), requests_count=14, items_count=11, bytes_count=16_410_269, ) view = load_runs_view( log_dir=tmp_path / "out" / "logs", now=datetime(2026, 3, 30, 12, 30, tzinfo=UTC), ) assert view["completed"][0]["stats"] == "14 requests • 11 items • 15.7 MiB" def test_load_runs_view_humanizes_running_execution_summary_bytes( tmp_path: Path, ) -> None: initialize_database(tmp_path / "jobs-running.db") source = create_source( name="Running source", slug="running-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/running.xml", ) job = Job.get(Job.source == source) JobExecution.create( job=job, running_status=JobExecutionStatus.RUNNING, started_at=datetime(2026, 3, 30, 12, 0, tzinfo=UTC), requests_count=14, items_count=11, bytes_count=1_536, ) view = load_runs_view( log_dir=tmp_path / "out" / "logs", now=datetime(2026, 3, 30, 12, 30, tzinfo=UTC), ) assert view["running"][0]["stats"] == "14 requests • 11 items • 1.5 KiB"