from __future__ import annotations import asyncio import json import time from pathlib import Path from repub.jobs import JobArtifacts, JobRuntime from repub.model import ( Job, JobExecution, JobExecutionStatus, Source, create_source, initialize_database, ) from repub.web import create_app, get_job_runtime, render_execution_logs, render_runs def test_job_runtime_syncs_enabled_jobs_into_apscheduler(tmp_path: Path) -> None: initialize_database(tmp_path / "scheduler.db") enabled_source = create_source( name="Enabled source", slug="enabled-source", source_type="feed", notes="", spider_arguments="", enabled=True, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/enabled.xml", ) disabled_source = create_source( name="Disabled source", slug="disabled-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="15", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/disabled.xml", ) enabled_job = Job.get(Job.source == enabled_source) disabled_job = Job.get(Job.source == disabled_source) runtime = JobRuntime( log_dir=tmp_path / "out" / "logs", worker_duration_seconds=0.4, worker_stats_interval_seconds=0.05, worker_failure_probability=0.0, ) try: runtime.start() runtime.sync_jobs() scheduled_ids = {job.id for job in runtime.scheduler.get_jobs()} assert f"job-{enabled_job.id}" in scheduled_ids assert f"job-{disabled_job.id}" not in scheduled_ids enabled_job.enabled = False enabled_job.save() runtime.sync_jobs() scheduled_ids = {job.id for job in runtime.scheduler.get_jobs()} assert f"job-{enabled_job.id}" not in scheduled_ids finally: runtime.shutdown() def test_job_runtime_run_now_writes_log_and_stats_and_marks_success( tmp_path: Path, ) -> None: initialize_database(tmp_path / "run-now.db") source = create_source( name="Manual source", slug="manual-source", source_type="feed", notes="", spider_arguments="", 
enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/manual.xml", ) job = Job.get(Job.source == source) runtime = JobRuntime( log_dir=tmp_path / "out" / "logs", worker_duration_seconds=0.35, worker_stats_interval_seconds=0.05, worker_failure_probability=0.0, ) try: runtime.start() execution_id = runtime.run_job_now(job.id, reason="manual") assert execution_id is not None execution = _wait_for_terminal_execution(execution_id) artifacts = JobArtifacts.for_execution( log_dir=tmp_path / "out" / "logs", job_id=job.id, execution_id=execution_id, ) assert execution.running_status == JobExecutionStatus.SUCCEEDED assert execution.started_at is not None assert execution.ended_at is not None assert execution.requests_count > 0 assert execution.items_count > 0 assert execution.bytes_count > 0 assert artifacts.log_path.exists() assert artifacts.stats_path.exists() assert "starting simulated crawl" in artifacts.log_path.read_text( encoding="utf-8" ) stats_lines = [ json.loads(line) for line in artifacts.stats_path.read_text(encoding="utf-8").splitlines() ] assert len(stats_lines) >= 2 assert stats_lines[-1]["requests_count"] == execution.requests_count finally: runtime.shutdown() def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None: initialize_database(tmp_path / "cancel.db") source = create_source( name="Cancelable source", slug="cancelable-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/cancelable.xml", ) job = Job.get(Job.source == source) runtime = JobRuntime( log_dir=tmp_path / "out" / "logs", worker_duration_seconds=2.0, worker_stats_interval_seconds=0.1, worker_failure_probability=0.0, ) try: runtime.start() execution_id = runtime.run_job_now(job.id, reason="manual") assert execution_id is not None 
_wait_for_running_execution(execution_id) runtime.request_execution_cancel(execution_id) execution = _wait_for_terminal_execution(execution_id) artifacts = JobArtifacts.for_execution( log_dir=tmp_path / "out" / "logs", job_id=job.id, execution_id=execution_id, ) assert execution.running_status == JobExecutionStatus.CANCELED assert execution.ended_at is not None assert execution.stop_requested_at is not None assert "graceful stop requested" in artifacts.log_path.read_text( encoding="utf-8" ) finally: runtime.shutdown() def test_render_runs_uses_database_backed_jobs_and_executions( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "runs-page.db" log_dir = tmp_path / "out" / "logs" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) app = create_app() app.config["REPUB_LOG_DIR"] = log_dir app.config["REPUB_JOB_WORKER_DURATION_SECONDS"] = 0.35 app.config["REPUB_JOB_WORKER_STATS_INTERVAL_SECONDS"] = 0.05 app.config["REPUB_JOB_WORKER_FAILURE_PROBABILITY"] = 0.0 source = create_source( name="Runs page source", slug="runs-page-source", source_type="feed", notes="", spider_arguments="", enabled=True, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/runs-page.xml", ) job = Job.get(Job.source == source) runtime = get_job_runtime(app) runtime.start() try: execution_id = runtime.run_job_now(job.id, reason="manual") assert execution_id is not None execution = _wait_for_terminal_execution(execution_id) async def run() -> None: body = str(await render_runs(app)) assert "runs-page-source" in body assert "Running job executions" in body assert "Upcoming jobs" in body assert "Completed job executions" in body assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body assert "Succeeded" in body assert "Run now" in body asyncio.run(run()) finally: runtime.shutdown() def test_render_execution_logs_handles_missing_execution_and_missing_log_file( monkeypatch, tmp_path: Path ) -> None: 
db_path = tmp_path / "log-errors.db" log_dir = tmp_path / "out" / "logs" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) app = create_app() app.config["REPUB_LOG_DIR"] = log_dir source = create_source( name="Log source", slug="log-source", source_type="feed", notes="", spider_arguments="", enabled=False, cron_minute="*/5", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/log-source.xml", ) job = Job.get(Job.source == source) execution = JobExecution.create( job=job, running_status=JobExecutionStatus.FAILED, ) async def run() -> None: missing_execution = str( await render_execution_logs(app, job_id=job.id, execution_id=9999) ) missing_log = str( await render_execution_logs(app, job_id=job.id, execution_id=execution.id) ) assert "Execution log unavailable" in missing_execution assert "Execution does not exist." in missing_execution assert "Execution log unavailable" in missing_log assert "Log file has not been created yet." in missing_log asyncio.run(run()) def test_delete_job_action_removes_source_job_and_execution_history( monkeypatch, tmp_path: Path ) -> None: db_path = tmp_path / "delete-job.db" monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) async def run() -> None: app = create_app() client = app.test_client() source = create_source( name="Delete source", slug="delete-source", source_type="feed", notes="", spider_arguments="", enabled=True, cron_minute="*/30", cron_hour="*", cron_day_of_month="*", cron_day_of_week="*", cron_month="*", feed_url="https://example.com/delete.xml", ) job = Job.get(Job.source == source) execution = JobExecution.create( job=job, running_status=JobExecutionStatus.SUCCEEDED, ) response = await client.post(f"/actions/jobs/{job.id}/delete") assert response.status_code == 204 assert Source.get_or_none(Source.slug == "delete-source") is None assert Job.get_or_none(id=job.id) is None assert JobExecution.get_or_none(id=int(execution.get_id())) is None asyncio.run(run()) 
# Statuses that mean the worker is done with an execution, one way or another.
# Hoisted to a module constant so the polling loop does not rebuild the set
# on every iteration.
_TERMINAL_STATUSES = frozenset(
    {
        JobExecutionStatus.SUCCEEDED,
        JobExecutionStatus.FAILED,
        JobExecutionStatus.CANCELED,
    }
)


def _poll_execution(
    execution_id: int,
    predicate,
    timeout_seconds: float,
    failure_message: str,
) -> JobExecution:
    """Re-fetch the execution row until *predicate* holds or the timeout elapses.

    Shared polling loop for the two wait helpers below. Raises AssertionError
    with *failure_message* if the deadline passes first.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        execution = JobExecution.get_by_id(execution_id)
        if predicate(execution):
            return execution
        time.sleep(0.02)
    raise AssertionError(failure_message)


def _wait_for_running_execution(
    execution_id: int, *, timeout_seconds: float = 2.0
) -> JobExecution:
    """Block until the execution reports RUNNING; fail the test otherwise."""
    return _poll_execution(
        execution_id,
        lambda execution: execution.running_status == JobExecutionStatus.RUNNING,
        timeout_seconds,
        f"execution {execution_id} never entered RUNNING state",
    )


def _wait_for_terminal_execution(
    execution_id: int, *, timeout_seconds: float = 4.0
) -> JobExecution:
    """Block until the execution reaches a terminal status; fail the test otherwise."""
    return _poll_execution(
        execution_id,
        lambda execution: execution.running_status in _TERMINAL_STATUSES,
        timeout_seconds,
        f"execution {execution_id} did not finish in time",
    )