agent: complete plan05 closeout

This commit is contained in:
Abel Luck 2026-02-27 13:48:52 +01:00
parent 33ba248c49
commit d8c925b817
12 changed files with 1347 additions and 313 deletions

View file

@ -1 +1,407 @@
"""End-to-end integration tests with FakeRuntime — Plan 05."""
"""End-to-end integration tests with FakeRuntime and a fake HAProxy socket."""
from __future__ import annotations
import socket
import threading
import time
from pathlib import Path
from fastapi.testclient import TestClient
from nix_builder_autoscaler.api import create_app
from nix_builder_autoscaler.config import (
AppConfig,
AwsConfig,
CapacityConfig,
HaproxyConfig,
SchedulerConfig,
)
from nix_builder_autoscaler.metrics import MetricsRegistry
from nix_builder_autoscaler.models import SlotState
from nix_builder_autoscaler.providers.clock import FakeClock
from nix_builder_autoscaler.providers.haproxy import HAProxyRuntime
from nix_builder_autoscaler.reconciler import Reconciler
from nix_builder_autoscaler.runtime.fake import FakeRuntime
from nix_builder_autoscaler.scheduler import scheduling_tick
from nix_builder_autoscaler.state_db import StateDB
class FakeHAProxySocketServer:
"""Tiny fake HAProxy runtime socket server for integration tests."""
def __init__(self, socket_path: Path, backend: str, slot_ids: list[str]) -> None:
self._socket_path = socket_path
self._backend = backend
self._slot_ids = slot_ids
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._lock = threading.Lock()
self._state: dict[str, dict[str, object]] = {
slot_id: {
"enabled": False,
"addr": "0.0.0.0",
"port": 22,
"status": "MAINT",
"scur": 0,
"qcur": 0,
}
for slot_id in slot_ids
}
def start(self) -> None:
self._thread = threading.Thread(target=self._serve, name="fake-haproxy", daemon=True)
self._thread.start()
deadline = time.time() + 2.0
while time.time() < deadline:
if self._socket_path.exists():
return
time.sleep(0.01)
msg = f"fake haproxy socket not created: {self._socket_path}"
raise RuntimeError(msg)
def stop(self) -> None:
self._stop_event.set()
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(str(self._socket_path))
sock.sendall(b"\n")
except OSError:
pass
if self._thread is not None:
self._thread.join(timeout=2.0)
if self._socket_path.exists():
self._socket_path.unlink()
def _serve(self) -> None:
if self._socket_path.exists():
self._socket_path.unlink()
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
server.bind(str(self._socket_path))
server.listen(16)
server.settimeout(0.2)
while not self._stop_event.is_set():
try:
conn, _ = server.accept()
except TimeoutError:
continue
except OSError:
if self._stop_event.is_set():
break
continue
with conn:
payload = b""
while True:
chunk = conn.recv(4096)
if not chunk:
break
payload += chunk
command = payload.decode().strip()
response = self._handle_command(command)
try:
conn.sendall(response.encode())
except BrokenPipeError:
continue
def _handle_command(self, command: str) -> str:
if command == "show stat":
return self._render_show_stat()
parts = command.split()
if not parts:
return "\n"
if parts[0:2] == ["set", "server"] and len(parts) >= 7:
slot_id = self._parse_slot(parts[2])
if slot_id is None:
return "No such server.\n"
with self._lock:
slot_state = self._state[slot_id]
slot_state["addr"] = parts[4]
slot_state["port"] = int(parts[6])
slot_state["status"] = "UP" if slot_state["enabled"] else "DOWN"
return "\n"
if parts[0:2] == ["enable", "server"] and len(parts) >= 3:
slot_id = self._parse_slot(parts[2])
if slot_id is None:
return "No such server.\n"
with self._lock:
slot_state = self._state[slot_id]
slot_state["enabled"] = True
slot_state["status"] = "UP"
return "\n"
if parts[0:2] == ["disable", "server"] and len(parts) >= 3:
slot_id = self._parse_slot(parts[2])
if slot_id is None:
return "No such server.\n"
with self._lock:
slot_state = self._state[slot_id]
slot_state["enabled"] = False
slot_state["status"] = "MAINT"
return "\n"
return "Unknown command.\n"
def _parse_slot(self, backend_slot: str) -> str | None:
backend, _, slot_id = backend_slot.partition("/")
if backend != self._backend or slot_id not in self._state:
return None
return slot_id
def _render_show_stat(self) -> str:
header = "# pxname,svname,qcur,qmax,scur,smax,slim,stot,status\n"
rows = [f"{self._backend},BACKEND,0,0,0,0,0,0,UP\n"]
with self._lock:
for slot_id in self._slot_ids:
slot_state = self._state[slot_id]
rows.append(
f"{self._backend},{slot_id},{slot_state['qcur']},0,"
f"{slot_state['scur']},0,50,0,{slot_state['status']}\n"
)
return header + "".join(rows)
class DaemonHarness:
"""In-process threaded harness for scheduler/reconciler/API integration."""
def __init__(
self,
root: Path,
*,
db_path: Path | None = None,
runtime: FakeRuntime | None = None,
max_slots: int = 3,
min_slots: int = 0,
idle_scale_down_seconds: int = 1,
drain_timeout_seconds: int = 120,
) -> None:
root.mkdir(parents=True, exist_ok=True)
self.clock = FakeClock()
self.metrics = MetricsRegistry()
self.runtime = runtime or FakeRuntime(launch_latency_ticks=2, ip_delay_ticks=1)
self._stop_event = threading.Event()
self._threads: list[threading.Thread] = []
self._reconcile_lock = threading.Lock()
self._db_path = db_path or (root / "state.db")
self._socket_path = root / "haproxy.sock"
self._slot_ids = [f"slot{i:03d}" for i in range(1, 4)]
self.config = AppConfig(
aws=AwsConfig(region="us-east-1"),
haproxy=HaproxyConfig(
runtime_socket=str(self._socket_path),
backend="all",
slot_prefix="slot",
slot_count=3,
check_ready_up_count=1,
),
capacity=CapacityConfig(
default_system="x86_64-linux",
max_slots=max_slots,
min_slots=min_slots,
max_leases_per_slot=1,
target_warm_slots=0,
reservation_ttl_seconds=1200,
idle_scale_down_seconds=idle_scale_down_seconds,
drain_timeout_seconds=drain_timeout_seconds,
),
scheduler=SchedulerConfig(tick_seconds=0.05, reconcile_seconds=0.05),
)
self.db = StateDB(str(self._db_path), clock=self.clock)
self.db.init_schema()
self.db.init_slots("slot", 3, "x86_64-linux", "all")
self.haproxy_server = FakeHAProxySocketServer(self._socket_path, "all", self._slot_ids)
self.haproxy = HAProxyRuntime(str(self._socket_path), "all", "slot")
self.reconciler = Reconciler(
self.db,
self.runtime,
self.haproxy,
self.config,
self.clock,
self.metrics,
)
app = create_app(
self.db,
self.config,
self.clock,
self.metrics,
reconcile_now=self.reconcile_now,
)
self.client = TestClient(app)
def start(self) -> None:
self.haproxy_server.start()
with self._reconcile_lock:
self.runtime.tick()
self.reconciler.tick()
self._threads = [
threading.Thread(target=self._scheduler_loop, name="sched", daemon=True),
threading.Thread(target=self._reconciler_loop, name="recon", daemon=True),
]
for thread in self._threads:
thread.start()
def stop(self) -> None:
self._stop_event.set()
for thread in self._threads:
thread.join(timeout=2.0)
self.client.close()
self.haproxy_server.stop()
self.db.close()
def create_reservation(self, reason: str) -> str:
response = self.client.post(
"/v1/reservations",
json={"system": "x86_64-linux", "reason": reason},
)
assert response.status_code == 200
return str(response.json()["reservation_id"])
def release_reservation(self, reservation_id: str) -> None:
response = self.client.post(f"/v1/reservations/{reservation_id}/release")
assert response.status_code == 200
def reservation(self, reservation_id: str) -> dict:
response = self.client.get(f"/v1/reservations/{reservation_id}")
assert response.status_code == 200
return response.json()
def wait_for(self, predicate, timeout: float = 6.0) -> None: # noqa: ANN001
deadline = time.time() + timeout
while time.time() < deadline:
if predicate():
return
time.sleep(0.02)
raise AssertionError("condition not met before timeout")
def reconcile_now(self) -> dict[str, bool]:
with self._reconcile_lock:
self.runtime.tick()
self.reconciler.tick()
return {"triggered": True}
def _scheduler_loop(self) -> None:
while not self._stop_event.is_set():
scheduling_tick(self.db, self.runtime, self.config, self.clock, self.metrics)
self._stop_event.wait(self.config.scheduler.tick_seconds)
def _reconciler_loop(self) -> None:
while not self._stop_event.is_set():
with self._reconcile_lock:
self.runtime.tick()
self.reconciler.tick()
self._stop_event.wait(self.config.scheduler.reconcile_seconds)
def test_cold_start_reservation_launch_bind_ready(tmp_path: Path) -> None:
harness = DaemonHarness(tmp_path)
harness.start()
try:
reservation_id = harness.create_reservation("cold-start")
harness.wait_for(lambda: harness.reservation(reservation_id)["phase"] == "ready")
reservation = harness.reservation(reservation_id)
assert reservation["slot"] is not None
slot = harness.db.get_slot(reservation["slot"])
assert slot is not None
assert slot["state"] == SlotState.READY.value
assert slot["instance_ip"] is not None
finally:
harness.stop()
def test_burst_three_concurrent_reservations(tmp_path: Path) -> None:
harness = DaemonHarness(tmp_path, max_slots=3)
harness.start()
try:
reservation_ids = [harness.create_reservation(f"burst-{i}") for i in range(3)]
harness.wait_for(
lambda: all(harness.reservation(rid)["phase"] == "ready" for rid in reservation_ids),
timeout=8.0,
)
slots = [harness.reservation(rid)["slot"] for rid in reservation_ids]
assert len(set(slots)) == 3
finally:
harness.stop()
def test_scale_down_after_release_and_idle_timeout(tmp_path: Path) -> None:
harness = DaemonHarness(tmp_path, idle_scale_down_seconds=1, drain_timeout_seconds=0)
harness.start()
try:
reservation_id = harness.create_reservation("scale-down")
harness.wait_for(lambda: harness.reservation(reservation_id)["phase"] == "ready")
slot_id = str(harness.reservation(reservation_id)["slot"])
harness.release_reservation(reservation_id)
harness.clock.advance(2)
harness.wait_for(
lambda: (
harness.db.get_slot(slot_id) is not None
and harness.db.get_slot(slot_id)["state"] == SlotState.EMPTY.value
)
)
finally:
harness.stop()
def test_restart_recovery_midflight(tmp_path: Path) -> None:
db_path = tmp_path / "state.db"
runtime = FakeRuntime(launch_latency_ticks=6, ip_delay_ticks=2)
first = DaemonHarness(tmp_path / "run1", db_path=db_path, runtime=runtime)
first.start()
reservation_id = first.create_reservation("restart-midflight")
first.wait_for(
lambda: len(first.db.list_slots(SlotState.LAUNCHING)) > 0,
timeout=4.0,
)
first.stop()
second = DaemonHarness(tmp_path / "run2", db_path=db_path, runtime=runtime)
second.start()
try:
second.wait_for(lambda: second.reservation(reservation_id)["phase"] == "ready", timeout=8.0)
finally:
second.stop()
def test_interruption_recovery_pending_reservation_resolves(tmp_path: Path) -> None:
harness = DaemonHarness(tmp_path, max_slots=2, idle_scale_down_seconds=60)
harness.start()
try:
first_reservation = harness.create_reservation("baseline")
harness.wait_for(lambda: harness.reservation(first_reservation)["phase"] == "ready")
slot_id = str(harness.reservation(first_reservation)["slot"])
instance_id = str(harness.reservation(first_reservation)["instance_id"])
second_reservation = harness.create_reservation("post-interruption")
harness.release_reservation(first_reservation)
harness.runtime.inject_interruption(instance_id)
harness.runtime._instances[instance_id].state = "shutting-down"
harness.wait_for(
lambda: (
harness.db.get_slot(slot_id) is not None
and harness.db.get_slot(slot_id)["state"]
in {
SlotState.DRAINING.value,
SlotState.TERMINATING.value,
SlotState.EMPTY.value,
}
),
timeout=6.0,
)
harness.wait_for(
lambda: harness.reservation(second_reservation)["phase"] == "ready",
timeout=10.0,
)
finally:
harness.stop()

View file

@ -3,24 +3,29 @@
from __future__ import annotations
from datetime import UTC, datetime
from typing import Any
from fastapi.testclient import TestClient
from nix_builder_autoscaler.api import create_app
from nix_builder_autoscaler.config import AppConfig, CapacityConfig
from nix_builder_autoscaler.metrics import MetricsRegistry
from nix_builder_autoscaler.models import SlotState
from nix_builder_autoscaler.providers.clock import FakeClock
from nix_builder_autoscaler.state_db import StateDB
def _make_client() -> tuple[TestClient, StateDB, FakeClock, MetricsRegistry]:
def _make_client(
*,
reconcile_now: Any = None, # noqa: ANN401
) -> tuple[TestClient, StateDB, FakeClock, MetricsRegistry]:
clock = FakeClock()
db = StateDB(":memory:", clock=clock)
db.init_schema()
db.init_slots("slot", 3, "x86_64-linux", "all")
config = AppConfig(capacity=CapacityConfig(reservation_ttl_seconds=1200))
metrics = MetricsRegistry()
app = create_app(db, config, clock, metrics)
app = create_app(db, config, clock, metrics, reconcile_now=reconcile_now)
return TestClient(app), db, clock, metrics
@ -120,6 +125,20 @@ def test_health_ready_returns_ok_when_no_checks() -> None:
assert response.json()["status"] == "ok"
def test_health_ready_degraded_when_ready_check_fails() -> None:
clock = FakeClock()
db = StateDB(":memory:", clock=clock)
db.init_schema()
db.init_slots("slot", 3, "x86_64-linux", "all")
config = AppConfig(capacity=CapacityConfig(reservation_ttl_seconds=1200))
metrics = MetricsRegistry()
app = create_app(db, config, clock, metrics, ready_check=lambda: False)
client = TestClient(app)
response = client.get("/health/ready")
assert response.status_code == 503
assert response.json()["status"] == "degraded"
def test_metrics_returns_prometheus_text() -> None:
client, _, _, metrics = _make_client()
metrics.counter("autoscaler_test_counter", {}, 1.0)
@ -150,3 +169,67 @@ def test_release_nonexistent_returns_404() -> None:
response = client.post("/v1/reservations/resv_nonexistent/release")
assert response.status_code == 404
assert response.json()["error"]["code"] == "not_found"
def test_admin_drain_success() -> None:
client, db, _, _ = _make_client()
db.update_slot_state("slot001", SlotState.LAUNCHING, instance_id="i-test")
db.update_slot_state("slot001", SlotState.BOOTING)
db.update_slot_state("slot001", SlotState.BINDING, instance_ip="100.64.0.1")
db.update_slot_state("slot001", SlotState.READY)
response = client.post("/v1/admin/drain", json={"slot_id": "slot001"})
assert response.status_code == 200
assert response.json()["state"] == "draining"
slot = db.get_slot("slot001")
assert slot is not None
assert slot["state"] == SlotState.DRAINING.value
def test_admin_drain_invalid_state_returns_409() -> None:
client, _, _, _ = _make_client()
response = client.post("/v1/admin/drain", json={"slot_id": "slot001"})
assert response.status_code == 409
assert response.json()["error"]["code"] == "invalid_state"
def test_admin_unquarantine_success() -> None:
client, db, _, _ = _make_client()
db.update_slot_state("slot001", SlotState.ERROR, instance_id="i-bad")
response = client.post("/v1/admin/unquarantine", json={"slot_id": "slot001"})
assert response.status_code == 200
assert response.json()["state"] == "empty"
slot = db.get_slot("slot001")
assert slot is not None
assert slot["state"] == SlotState.EMPTY.value
assert slot["instance_id"] is None
def test_admin_unquarantine_invalid_state_returns_409() -> None:
client, _, _, _ = _make_client()
response = client.post("/v1/admin/unquarantine", json={"slot_id": "slot001"})
assert response.status_code == 409
assert response.json()["error"]["code"] == "invalid_state"
def test_admin_reconcile_now_not_configured_returns_503() -> None:
client, _, _, _ = _make_client()
response = client.post("/v1/admin/reconcile-now")
assert response.status_code == 503
assert response.json()["error"]["code"] == "not_configured"
def test_admin_reconcile_now_success() -> None:
called = {"value": False}
def _reconcile_now() -> dict[str, object]:
called["value"] = True
return {"triggered": True}
client, _, _, _ = _make_client(reconcile_now=_reconcile_now)
response = client.post("/v1/admin/reconcile-now")
assert response.status_code == 200
assert response.json()["status"] == "accepted"
assert response.json()["triggered"] is True
assert called["value"] is True

View file

@ -130,6 +130,135 @@ class TestDescribeInstance:
assert info["tailscale_ip"] is None
assert info["launch_time"] == launch_time.isoformat()
@patch.object(
EC2Runtime,
"_read_tailscale_status",
return_value={
"Peer": {
"peer1": {
"HostName": "nix-builder-slot001-i-running1",
"Online": True,
"TailscaleIPs": ["100.64.0.10"],
}
}
},
)
def test_discovers_tailscale_ip_from_localapi(self, _mock_status):
ec2_client = boto3.client("ec2", region_name="us-east-1")
stubber = Stubber(ec2_client)
launch_time = datetime(2026, 1, 15, 12, 30, 0, tzinfo=UTC)
response = {
"Reservations": [
{
"Instances": [
{
"InstanceId": "i-running1",
"State": {"Code": 16, "Name": "running"},
"LaunchTime": launch_time,
"Tags": [{"Key": "AutoscalerSlot", "Value": "slot001"}],
}
],
}
],
}
stubber.add_response(
"describe_instances",
response,
{"InstanceIds": ["i-running1"]},
)
runtime = _make_runtime(stubber, ec2_client)
info = runtime.describe_instance("i-running1")
assert info["tailscale_ip"] == "100.64.0.10"
@patch.object(EC2Runtime, "_read_tailscale_status", return_value={"Peer": {}})
def test_discovery_unavailable_returns_none(self, _mock_status):
ec2_client = boto3.client("ec2", region_name="us-east-1")
stubber = Stubber(ec2_client)
launch_time = datetime(2026, 1, 15, 12, 30, 0, tzinfo=UTC)
response = {
"Reservations": [
{
"Instances": [
{
"InstanceId": "i-running1",
"State": {"Code": 16, "Name": "running"},
"LaunchTime": launch_time,
"Tags": [{"Key": "AutoscalerSlot", "Value": "slot001"}],
}
],
}
],
}
stubber.add_response(
"describe_instances",
response,
{"InstanceIds": ["i-running1"]},
)
runtime = _make_runtime(stubber, ec2_client)
info = runtime.describe_instance("i-running1")
assert info["tailscale_ip"] is None
@patch.object(
EC2Runtime,
"_read_tailscale_status",
return_value={
"Peer": {
"peer1": {
"HostName": "nix-builder-slot001-old",
"Online": True,
"TailscaleIPs": ["100.64.0.10"],
},
"peer2": {
"HostName": "nix-builder-slot001-new",
"Online": True,
"TailscaleIPs": ["100.64.0.11"],
},
}
},
)
def test_ambiguous_slot_match_returns_none(self, _mock_status):
ec2_client = boto3.client("ec2", region_name="us-east-1")
stubber = Stubber(ec2_client)
launch_time = datetime(2026, 1, 15, 12, 30, 0, tzinfo=UTC)
response = {
"Reservations": [
{
"Instances": [
{
"InstanceId": "i-running1",
"State": {"Code": 16, "Name": "running"},
"LaunchTime": launch_time,
"Tags": [{"Key": "AutoscalerSlot", "Value": "slot001"}],
}
],
}
],
}
stubber.add_response(
"describe_instances",
response,
{"InstanceIds": ["i-running1"]},
)
runtime = _make_runtime(stubber, ec2_client)
info = runtime.describe_instance("i-running1")
assert info["tailscale_ip"] is None
def test_localapi_permission_error_returns_none(self):
ec2_client = boto3.client("ec2", region_name="us-east-1")
runtime = EC2Runtime(_make_config(), _client=ec2_client)
with patch(
"nix_builder_autoscaler.runtime.ec2._UnixSocketHTTPConnection.connect",
side_effect=PermissionError,
):
assert runtime._read_tailscale_status() is None
def test_missing_instance_returns_terminated(self):
ec2_client = boto3.client("ec2", region_name="us-east-1")
stubber = Stubber(ec2_client)