"""Unit tests for reconciler timeout and failure safeguards."""

from __future__ import annotations

from dataclasses import dataclass

from nix_builder_autoscaler.config import AppConfig, AwsConfig, CapacityConfig
from nix_builder_autoscaler.metrics import MetricsRegistry
from nix_builder_autoscaler.models import SlotState
from nix_builder_autoscaler.providers.clock import FakeClock
from nix_builder_autoscaler.providers.haproxy import SlotHealth
from nix_builder_autoscaler.reconciler import Reconciler
from nix_builder_autoscaler.state_db import StateDB

# The single slot created by ``_make_env`` (prefix "slot", count 1).
_SLOT_ID = "slot001"


@dataclass
class _Instance:
    """In-memory record of a fake instance tracked by ``_RuntimeStub``."""

    state: str
    slot_id: str
    tailscale_ip: str | None = None


class _RuntimeStub:
    """Fake runtime backed by a dict of instances; records terminate calls."""

    def __init__(self) -> None:
        self.instances: dict[str, _Instance] = {}
        self.terminate_calls: list[str] = []

    def list_managed_instances(self) -> list[dict]:
        """Return a summary dict for every instance not marked "terminated"."""
        listing: list[dict] = []
        for instance_id, record in self.instances.items():
            if record.state == "terminated":
                continue
            listing.append(
                {
                    "instance_id": instance_id,
                    "state": record.state,
                    "slot_id": record.slot_id,
                }
            )
        return listing

    def describe_instance(self, instance_id: str) -> dict:
        """Describe one instance; unknown ids are reported as terminated."""
        if instance_id not in self.instances:
            return {"state": "terminated", "tailscale_ip": None, "launch_time": None}
        record = self.instances[instance_id]
        return {
            "state": record.state,
            "tailscale_ip": record.tailscale_ip,
            "launch_time": None,
        }

    def terminate_instance(self, instance_id: str) -> None:
        """Log the call and flip the instance (if known) to "terminated"."""
        self.terminate_calls.append(instance_id)
        record = self.instances.get(instance_id)
        if record is not None:
            record.state = "terminated"


class _HAProxyStub:
    """Fake HAProxy client: mutators are no-ops, health is a canned mapping."""

    def __init__(self, health: dict[str, SlotHealth] | None = None) -> None:
        # A falsy argument (None or an empty mapping) yields a fresh dict.
        self.health = health if health else {}

    def set_slot_addr(self, slot_id: str, ip: str, port: int = 22) -> None:  # noqa: ARG002
        return

    def enable_slot(self, slot_id: str) -> None:  # noqa: ARG002
        return

    def disable_slot(self, slot_id: str) -> None:  # noqa: ARG002
        return

    def read_slot_health(self) -> dict[str, SlotHealth]:
        """Return the health mapping supplied at construction time."""
        return self.health


def _make_env(
    *,
    launch_timeout=300,
    boot_timeout=300,
    binding_timeout=180,
    terminating_timeout=300,
    termination_cooldown=0,
):
    """Assemble a reconciler wired to an in-memory DB, stubs and a fake clock.

    The keyword arguments are forwarded to the matching ``*_seconds`` fields
    of ``CapacityConfig``.

    Returns:
        A ``(db, runtime, reconciler, clock)`` tuple.
    """
    fake_clock = FakeClock()

    state_db = StateDB(":memory:", clock=fake_clock)
    state_db.init_schema()
    state_db.init_slots("slot", 1, "x86_64-linux", "all")

    runtime_stub = _RuntimeStub()
    haproxy_stub = _HAProxyStub()

    capacity = CapacityConfig(
        launch_timeout_seconds=launch_timeout,
        boot_timeout_seconds=boot_timeout,
        binding_timeout_seconds=binding_timeout,
        terminating_timeout_seconds=terminating_timeout,
        termination_cooldown_seconds=termination_cooldown,
    )
    app_config = AppConfig(capacity=capacity, aws=AwsConfig(region="us-east-1"))

    registry = MetricsRegistry()
    rec = Reconciler(
        state_db, runtime_stub, haproxy_stub, app_config, fake_clock, registry
    )
    return state_db, runtime_stub, rec, fake_clock


def _current_slot_state(db: StateDB):
    """Fetch the test slot, assert that it exists, and return its state."""
    record = db.get_slot(_SLOT_ID)
    assert record is not None
    return record["state"]


def test_launching_timeout_moves_slot_to_terminating() -> None:
    """A LAUNCHING slot past its launch timeout is terminated."""
    state_db, rt, rec, fake_clock = _make_env(launch_timeout=10)
    rt.instances["i-1"] = _Instance(state="pending", slot_id=_SLOT_ID)
    state_db.update_slot_state(_SLOT_ID, SlotState.LAUNCHING, instance_id="i-1")

    fake_clock.advance(11)
    rec.tick()

    assert _current_slot_state(state_db) == SlotState.TERMINATING.value
    assert rt.terminate_calls == ["i-1"]


def test_launching_stopped_state_begins_termination() -> None:
    """A LAUNCHING slot whose instance reports "stopped" is terminated."""
    state_db, rt, rec, _ = _make_env()
    rt.instances["i-1"] = _Instance(state="stopped", slot_id=_SLOT_ID)
    state_db.update_slot_state(_SLOT_ID, SlotState.LAUNCHING, instance_id="i-1")

    rec.tick()

    assert _current_slot_state(state_db) == SlotState.TERMINATING.value
    assert rt.terminate_calls == ["i-1"]


def test_booting_timeout_moves_slot_to_terminating() -> None:
    """A BOOTING slot past its boot timeout is terminated."""
    state_db, rt, rec, fake_clock = _make_env(boot_timeout=15)
    rt.instances["i-2"] = _Instance(
        state="running", slot_id=_SLOT_ID, tailscale_ip=None
    )
    state_db.update_slot_state(_SLOT_ID, SlotState.BOOTING, instance_id="i-2")

    fake_clock.advance(16)
    rec.tick()

    assert _current_slot_state(state_db) == SlotState.TERMINATING.value
    assert rt.terminate_calls == ["i-2"]


def test_binding_timeout_moves_slot_to_terminating() -> None:
    """A BINDING slot past its binding timeout is terminated."""
    state_db, rt, rec, fake_clock = _make_env(binding_timeout=8)
    rt.instances["i-3"] = _Instance(
        state="running",
        slot_id=_SLOT_ID,
        tailscale_ip="100.64.0.3",
    )
    state_db.update_slot_state(
        _SLOT_ID,
        SlotState.BINDING,
        instance_id="i-3",
        instance_ip="100.64.0.3",
    )

    fake_clock.advance(9)
    rec.tick()

    assert _current_slot_state(state_db) == SlotState.TERMINATING.value
    assert rt.terminate_calls == ["i-3"]


def test_binding_stopped_state_begins_termination() -> None:
    """A BINDING slot whose instance reports "stopping" is terminated."""
    state_db, rt, rec, _ = _make_env()
    rt.instances["i-4"] = _Instance(
        state="stopping",
        slot_id=_SLOT_ID,
        tailscale_ip="100.64.0.4",
    )
    state_db.update_slot_state(
        _SLOT_ID,
        SlotState.BINDING,
        instance_id="i-4",
        instance_ip="100.64.0.4",
    )

    rec.tick()

    assert _current_slot_state(state_db) == SlotState.TERMINATING.value
    assert rt.terminate_calls == ["i-4"]


def test_terminating_timeout_reissues_terminate_with_pacing() -> None:
    """A TERMINATING slot past its timeout gets one terminate call per window."""
    state_db, rt, rec, fake_clock = _make_env(terminating_timeout=5)
    rt.instances["i-5"] = _Instance(state="shutting-down", slot_id=_SLOT_ID)
    state_db.update_slot_state(_SLOT_ID, SlotState.TERMINATING, instance_id="i-5")

    fake_clock.advance(6)
    rec.tick()

    assert _current_slot_state(state_db) == SlotState.TERMINATING.value
    assert rt.terminate_calls == ["i-5"]

    # A tick right away must not retry: last_state_change was just refreshed.
    rec.tick()
    assert rt.terminate_calls == ["i-5"]


def test_termination_cooldown_spaces_terminations() -> None:
    """Consecutive terminations are separated by the configured cooldown."""
    state_db, rt, rec, fake_clock = _make_env(termination_cooldown=30)
    rt.instances["i-6"] = _Instance(state="running", slot_id=_SLOT_ID)
    state_db.update_slot_state(
        _SLOT_ID, SlotState.DRAINING, instance_id="i-6", lease_count=0
    )

    rec.tick()

    assert _current_slot_state(state_db) == SlotState.TERMINATING.value
    assert rt.terminate_calls == ["i-6"]

    # A second draining cycle inside the cooldown window must be held back.
    rt.instances["i-7"] = _Instance(state="running", slot_id=_SLOT_ID)
    state_db.update_slot_state(
        _SLOT_ID, SlotState.DRAINING, instance_id="i-7", lease_count=0
    )
    fake_clock.advance(10)
    rec.tick()

    assert _current_slot_state(state_db) == SlotState.DRAINING.value
    assert rt.terminate_calls == ["i-6"]

    # Once the cooldown has elapsed the pending termination goes through.
    fake_clock.advance(21)
    rec.tick()

    assert _current_slot_state(state_db) == SlotState.TERMINATING.value
    assert rt.terminate_calls == ["i-6", "i-7"]