diff --git a/repub/jobs.py b/repub/jobs.py index 8d7de38..eeb4494 100644 --- a/repub/jobs.py +++ b/repub/jobs.py @@ -80,6 +80,7 @@ class JobRuntime: if self._started: return + self._reconcile_stale_executions() self.scheduler.start() self.scheduler.add_job( self.poll_workers, @@ -311,6 +312,39 @@ class JobRuntime: if self.refresh_callback is not None: self.refresh_callback() + def _reconcile_stale_executions(self) -> None: + with database.connection_context(): + stale_executions = tuple( + JobExecution.select(JobExecution, Job) + .join(Job) + .where(JobExecution.running_status == JobExecutionStatus.RUNNING) + ) + + for execution in stale_executions: + job = cast(Job, execution.job) + execution_id = _execution_id(execution) + artifacts = JobArtifacts.for_execution( + log_dir=self.log_dir, + job_id=_job_id(job), + execution_id=execution_id, + ) + artifacts.log_path.parent.mkdir(parents=True, exist_ok=True) + with artifacts.log_path.open("a", encoding="utf-8") as log_handle: + log_handle.write( + "scheduler: execution marked failed after app restart\n" + ) + + execution.ended_at = utc_now() + execution.running_status = ( + JobExecutionStatus.CANCELED + if execution.stop_requested_at is not None + else JobExecutionStatus.FAILED + ) + execution.save() + + if stale_executions: + self._trigger_refresh() + def load_runs_view( *, log_dir: str | Path, now: datetime | None = None diff --git a/tests/test_scheduler_runtime.py b/tests/test_scheduler_runtime.py index df226dc..a2059d4 100644 --- a/tests/test_scheduler_runtime.py +++ b/tests/test_scheduler_runtime.py @@ -182,6 +182,58 @@ def test_job_runtime_cancel_marks_execution_canceled(tmp_path: Path) -> None: runtime.shutdown() +def test_job_runtime_start_reconciles_stale_running_execution(tmp_path: Path) -> None: + initialize_database(tmp_path / "stale-running.db") + source = create_source( + name="Stale source", + slug="stale-source", + source_type="feed", + notes="", + spider_arguments="", + enabled=False, + cron_minute="*/5", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + feed_url="https://example.com/stale.xml", + ) + job = Job.get(Job.source == source) + execution = JobExecution.create( + job=job, + started_at="2026-03-30 12:30:00+00:00", + running_status=JobExecutionStatus.RUNNING, + ) + artifacts = JobArtifacts.for_execution( + log_dir=tmp_path / "out" / "logs", + job_id=job.id, + execution_id=int(execution.get_id()), + ) + artifacts.log_path.parent.mkdir(parents=True, exist_ok=True) + artifacts.log_path.write_text( + "worker: process lost during app restart\n", + encoding="utf-8", + ) + + runtime = JobRuntime( + log_dir=tmp_path / "out" / "logs", + worker_duration_seconds=0.5, + worker_stats_interval_seconds=0.05, + worker_failure_probability=0.0, + ) + try: + runtime.start() + reconciled_execution = JobExecution.get_by_id(execution.get_id()) + + assert reconciled_execution.running_status == JobExecutionStatus.FAILED + assert reconciled_execution.ended_at is not None + assert "marked failed after app restart" in artifacts.log_path.read_text( + encoding="utf-8" + ) + finally: + runtime.shutdown() + + def test_render_runs_uses_database_backed_jobs_and_executions( monkeypatch, tmp_path: Path ) -> None: