# republisher/tests/test_scheduler_runtime.py
#
# Web-view metadata captured with this copy of the file:
# 399 lines, 12 KiB, Python; revision timestamp 2026-03-30 14:02:39 +02:00
from __future__ import annotations
import asyncio
import json
import time
from pathlib import Path
from repub.jobs import JobArtifacts, JobRuntime
from repub.model import (
Job,
JobExecution,
JobExecutionStatus,
Source,
create_source,
initialize_database,
)
from repub.web import create_app, get_job_runtime, render_execution_logs, render_runs
def test_job_runtime_syncs_enabled_jobs_into_apscheduler(tmp_path: Path) -> None:
    """Verify that ``JobRuntime.sync_jobs`` only schedules enabled jobs.

    An enabled and a disabled source are created. After the first sync the
    scheduler must list ``job-<id>`` for the enabled job only; once that job
    is disabled and synced again, its entry must be gone as well.
    """
    initialize_database(tmp_path / "scheduler.db")

    # Cron fields shared by both sources; only the minute field differs.
    shared_cron = {
        "cron_hour": "*",
        "cron_day_of_month": "*",
        "cron_day_of_week": "*",
        "cron_month": "*",
    }
    enabled_source = create_source(
        name="Enabled source",
        slug="enabled-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=True,
        cron_minute="*/5",
        **shared_cron,
        feed_url="https://example.com/enabled.xml",
    )
    disabled_source = create_source(
        name="Disabled source",
        slug="disabled-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="15",
        **shared_cron,
        feed_url="https://example.com/disabled.xml",
    )
    enabled_job = Job.get(Job.source == enabled_source)
    disabled_job = Job.get(Job.source == disabled_source)

    runtime = JobRuntime(
        log_dir=tmp_path / "out" / "logs",
        worker_duration_seconds=0.4,
        worker_stats_interval_seconds=0.05,
        worker_failure_probability=0.0,
    )

    def scheduled_ids():
        """Return the ids of every job currently registered in the scheduler."""
        return {entry.id for entry in runtime.scheduler.get_jobs()}

    try:
        runtime.start()
        runtime.sync_jobs()
        after_first_sync = scheduled_ids()
        assert f"job-{enabled_job.id}" in after_first_sync
        assert f"job-{disabled_job.id}" not in after_first_sync

        # Disabling the job and syncing again must unschedule it.
        enabled_job.enabled = False
        enabled_job.save()
        runtime.sync_jobs()
        after_second_sync = scheduled_ids()
        assert f"job-{enabled_job.id}" not in after_second_sync
    finally:
        runtime.shutdown()
def test_job_runtime_run_now_writes_log_and_stats_and_marks_success(
    tmp_path: Path,
) -> None:
    """Verify that a manually triggered run succeeds and leaves its artifacts.

    The job is started through ``run_job_now`` and polled until it reaches a
    terminal status. The execution must then be ``SUCCEEDED`` with timestamps
    and positive counters, and both the log file and the JSON-lines stats file
    must exist, with the last stats record matching the stored request count.
    """
    initialize_database(tmp_path / "run-now.db")
    log_dir = tmp_path / "out" / "logs"

    source = create_source(
        name="Manual source",
        slug="manual-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/manual.xml",
    )
    job = Job.get(Job.source == source)
    runtime = JobRuntime(
        log_dir=log_dir,
        worker_duration_seconds=0.35,
        worker_stats_interval_seconds=0.05,
        worker_failure_probability=0.0,
    )

    try:
        runtime.start()
        execution_id = runtime.run_job_now(job.id, reason="manual")
        assert execution_id is not None

        finished = _wait_for_terminal_execution(execution_id)
        artifacts = JobArtifacts.for_execution(
            log_dir=log_dir,
            job_id=job.id,
            execution_id=execution_id,
        )

        # Database state of the finished execution.
        assert finished.running_status == JobExecutionStatus.SUCCEEDED
        assert finished.started_at is not None
        assert finished.ended_at is not None
        assert finished.requests_count > 0
        assert finished.items_count > 0
        assert finished.bytes_count > 0

        # Files written for the execution.
        assert artifacts.log_path.exists()
        assert artifacts.stats_path.exists()
        log_text = artifacts.log_path.read_text(encoding="utf-8")
        assert "starting simulated crawl" in log_text

        # The stats file holds one JSON document per line.
        stats_text = artifacts.stats_path.read_text(encoding="utf-8")
        stats_records = list(map(json.loads, stats_text.splitlines()))
        assert len(stats_records) >= 2
        final_record = stats_records[-1]
        assert final_record["requests_count"] == finished.requests_count
    finally:
        runtime.shutdown()
def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None:
    """Verify that cancelling a running execution ends it as ``CANCELED``.

    A two-second run is started, cancellation is requested once the execution
    is observed in ``RUNNING``, and the finished row must carry the canceled
    status, an end timestamp and a stop-request timestamp. The log must
    mention the graceful stop request.
    """
    initialize_database(tmp_path / "cancel.db")
    log_dir = tmp_path / "out" / "logs"

    source = create_source(
        name="Cancelable source",
        slug="cancelable-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/cancelable.xml",
    )
    job = Job.get(Job.source == source)
    runtime = JobRuntime(
        log_dir=log_dir,
        worker_duration_seconds=2.0,
        worker_stats_interval_seconds=0.1,
        worker_failure_probability=0.0,
    )

    try:
        runtime.start()
        execution_id = runtime.run_job_now(job.id, reason="manual")
        assert execution_id is not None

        # Only cancel once the execution has been seen running.
        _wait_for_running_execution(execution_id)
        runtime.request_execution_cancel(execution_id)
        finished = _wait_for_terminal_execution(execution_id)

        artifacts = JobArtifacts.for_execution(
            log_dir=log_dir,
            job_id=job.id,
            execution_id=execution_id,
        )
        assert finished.running_status == JobExecutionStatus.CANCELED
        assert finished.ended_at is not None
        assert finished.stop_requested_at is not None
        log_text = artifacts.log_path.read_text(encoding="utf-8")
        assert "graceful stop requested" in log_text
    finally:
        runtime.shutdown()
# 2026-03-30 14:18:55 +02:00 (revision timestamp carried over from the web view)
def test_job_runtime_start_reconciles_stale_running_execution(tmp_path: Path) -> None:
    """Verify that ``JobRuntime.start`` fails executions left in ``RUNNING``.

    A ``RUNNING`` execution row and a matching log file are created by hand
    before any runtime exists. After ``start()`` the row must be ``FAILED``
    with an end timestamp, and the log must contain the restart notice.
    """
    initialize_database(tmp_path / "stale-running.db")
    log_dir = tmp_path / "out" / "logs"

    source = create_source(
        name="Stale source",
        slug="stale-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/stale.xml",
    )
    job = Job.get(Job.source == source)

    # Hand-made execution row that is still marked as running.
    stale_execution = JobExecution.create(
        job=job,
        started_at="2026-03-30 12:30:00+00:00",
        running_status=JobExecutionStatus.RUNNING,
    )
    artifacts = JobArtifacts.for_execution(
        log_dir=log_dir,
        job_id=job.id,
        execution_id=int(stale_execution.get_id()),
    )
    log_file = artifacts.log_path
    log_file.parent.mkdir(parents=True, exist_ok=True)
    log_file.write_text(
        "worker: process lost during app restart\n",
        encoding="utf-8",
    )

    runtime = JobRuntime(
        log_dir=log_dir,
        worker_duration_seconds=0.5,
        worker_stats_interval_seconds=0.05,
        worker_failure_probability=0.0,
    )
    try:
        runtime.start()
        reloaded = JobExecution.get_by_id(stale_execution.get_id())
        assert reloaded.running_status == JobExecutionStatus.FAILED
        assert reloaded.ended_at is not None
        log_text = artifacts.log_path.read_text(encoding="utf-8")
        assert "marked failed after app restart" in log_text
    finally:
        runtime.shutdown()
# 2026-03-30 14:02:39 +02:00 (revision timestamp carried over from the web view)
def test_render_runs_uses_database_backed_jobs_and_executions(
    monkeypatch, tmp_path: Path
) -> None:
    """Verify that the runs page is rendered from stored jobs and executions.

    The app is created against a temporary database (selected through the
    ``REPUBLISHER_DB_PATH`` environment variable), one job is run to
    completion through the app's job runtime, and the rendered page must
    contain the source slug, the three section headings, the link to the
    execution's log page, the ``Succeeded`` label and the ``Run now`` action.

    Args:
        monkeypatch: pytest fixture used to set ``REPUBLISHER_DB_PATH``.
        tmp_path: pytest fixture providing a per-test temporary directory.
    """
    db_path = tmp_path / "runs-page.db"
    log_dir = tmp_path / "out" / "logs"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))

    app = create_app()
    app.config["REPUB_LOG_DIR"] = log_dir
    app.config["REPUB_JOB_WORKER_DURATION_SECONDS"] = 0.35
    app.config["REPUB_JOB_WORKER_STATS_INTERVAL_SECONDS"] = 0.05
    app.config["REPUB_JOB_WORKER_FAILURE_PROBABILITY"] = 0.0

    source = create_source(
        name="Runs page source",
        slug="runs-page-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=True,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/runs-page.xml",
    )
    job = Job.get(Job.source == source)
    runtime = get_job_runtime(app)

    try:
        # Start inside the ``try`` (as the other runtime tests in this module
        # do) so ``shutdown()`` still runs when start-up fails part-way.
        runtime.start()
        execution_id = runtime.run_job_now(job.id, reason="manual")
        assert execution_id is not None
        execution = _wait_for_terminal_execution(execution_id)

        async def run() -> None:
            # ``render_runs`` is a coroutine; its result is stringified
            # before the substring checks below.
            body = str(await render_runs(app))
            assert "runs-page-source" in body
            assert "Running job executions" in body
            assert "Upcoming jobs" in body
            assert "Completed job executions" in body
            assert f"/job/{job.id}/execution/{execution.get_id()}/logs" in body
            assert "Succeeded" in body
            assert "Run now" in body

        asyncio.run(run())
    finally:
        runtime.shutdown()
def test_render_execution_logs_handles_missing_execution_and_missing_log_file(
    monkeypatch, tmp_path: Path
) -> None:
    """Verify the log page's messages for the two "nothing to show" cases.

    Rendering is requested once for an execution id that does not exist
    (9999) and once for a stored ``FAILED`` execution for which no log file
    was written. Both pages must show the "unavailable" heading plus the
    message specific to their case.
    """
    db_path = tmp_path / "log-errors.db"
    log_dir = tmp_path / "out" / "logs"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))

    app = create_app()
    app.config["REPUB_LOG_DIR"] = log_dir

    source = create_source(
        name="Log source",
        slug="log-source",
        source_type="feed",
        notes="",
        spider_arguments="",
        enabled=False,
        cron_minute="*/5",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
        feed_url="https://example.com/log-source.xml",
    )
    job = Job.get(Job.source == source)
    # Execution row without any log file on disk.
    execution = JobExecution.create(
        job=job,
        running_status=JobExecutionStatus.FAILED,
    )

    async def check_pages() -> None:
        # Unknown execution id first, then the execution lacking a log file.
        page_for_unknown_id = await render_execution_logs(
            app, job_id=job.id, execution_id=9999
        )
        missing_execution = str(page_for_unknown_id)
        page_without_log = await render_execution_logs(
            app, job_id=job.id, execution_id=execution.id
        )
        missing_log = str(page_without_log)

        assert "Execution log unavailable" in missing_execution
        assert "Execution does not exist." in missing_execution
        assert "Execution log unavailable" in missing_log
        assert "Log file has not been created yet." in missing_log

    asyncio.run(check_pages())
def test_delete_job_action_removes_source_job_and_execution_history(
    monkeypatch, tmp_path: Path
) -> None:
    """Verify that the delete action removes a job together with its data.

    A source, its job and one ``SUCCEEDED`` execution are stored, then
    ``/actions/jobs/<id>/delete`` is posted through the app's test client.
    The response must be ``204`` and the source, job and execution rows must
    no longer be found.
    """
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(tmp_path / "delete-job.db"))

    async def exercise_delete() -> None:
        app = create_app()
        client = app.test_client()

        source = create_source(
            name="Delete source",
            slug="delete-source",
            source_type="feed",
            notes="",
            spider_arguments="",
            enabled=True,
            cron_minute="*/30",
            cron_hour="*",
            cron_day_of_month="*",
            cron_day_of_week="*",
            cron_month="*",
            feed_url="https://example.com/delete.xml",
        )
        job = Job.get(Job.source == source)
        execution = JobExecution.create(
            job=job,
            running_status=JobExecutionStatus.SUCCEEDED,
        )

        delete_url = f"/actions/jobs/{job.id}/delete"
        response = await client.post(delete_url)
        assert response.status_code == 204

        # None of the three rows may be left behind.
        remaining_source = Source.get_or_none(Source.slug == "delete-source")
        assert remaining_source is None
        remaining_job = Job.get_or_none(id=job.id)
        assert remaining_job is None
        remaining_execution = JobExecution.get_or_none(id=int(execution.get_id()))
        assert remaining_execution is None

    asyncio.run(exercise_delete())
def _wait_for_running_execution(
    execution_id: int, *, timeout_seconds: float = 2.0
) -> JobExecution:
    """Poll the database until the execution is observed in ``RUNNING``.

    The row is reloaded every 20 ms. If it is found in a terminal status
    (``SUCCEEDED``, ``FAILED`` or ``CANCELED``) before ``RUNNING`` was ever
    observed, the wait is abandoned immediately instead of polling until the
    timeout and then reporting a misleading "never entered RUNNING" failure.

    Args:
        execution_id: Primary key of the ``JobExecution`` row to watch.
        timeout_seconds: Maximum time to keep polling.

    Returns:
        The freshly loaded ``JobExecution`` whose status is ``RUNNING``.

    Raises:
        AssertionError: If the execution already finished, or if it did not
            report ``RUNNING`` before the timeout elapsed.
    """
    finished_statuses = {
        JobExecutionStatus.SUCCEEDED,
        JobExecutionStatus.FAILED,
        JobExecutionStatus.CANCELED,
    }
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        execution = JobExecution.get_by_id(execution_id)
        status = execution.running_status
        if status == JobExecutionStatus.RUNNING:
            return execution
        if status in finished_statuses:
            # Further polling cannot succeed once the execution is over.
            raise AssertionError(
                f"execution {execution_id} already finished with status "
                f"{status!r} before RUNNING was observed"
            )
        time.sleep(0.02)
    raise AssertionError(f"execution {execution_id} never entered RUNNING state")
def _wait_for_terminal_execution(
    execution_id: int, *, timeout_seconds: float = 4.0
) -> JobExecution:
    """Poll the database until the execution reaches a terminal status.

    Terminal statuses are ``SUCCEEDED``, ``FAILED`` and ``CANCELED``. The row
    is reloaded every 20 ms until one of them is seen.

    Args:
        execution_id: Primary key of the ``JobExecution`` row to watch.
        timeout_seconds: Maximum time to keep polling.

    Returns:
        The freshly loaded ``JobExecution`` in a terminal status.

    Raises:
        AssertionError: If no terminal status was seen before the timeout.
    """
    terminal_statuses = {
        JobExecutionStatus.SUCCEEDED,
        JobExecutionStatus.FAILED,
        JobExecutionStatus.CANCELED,
    }
    give_up_at = time.monotonic() + timeout_seconds
    while time.monotonic() < give_up_at:
        current = JobExecution.get_by_id(execution_id)
        if current.running_status not in terminal_statuses:
            # Still in progress: pause briefly before reloading the row.
            time.sleep(0.02)
            continue
        return current
    raise AssertionError(f"execution {execution_id} did not finish in time")