From 8fdf2d5e5b58bae0eb10f535053bb37da6e722da Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Fri, 27 Feb 2026 16:43:52 +0100 Subject: [PATCH] add slot ttl output with effective timeout config --- agent/nix_builder_autoscaler/api.py | 24 +++++ agent/nix_builder_autoscaler/cli.py | 95 ++++++++++++++++++- agent/nix_builder_autoscaler/models.py | 29 ++++++ .../nix_builder_autoscaler/tests/test_cli.py | 56 ++++++++++- .../tests/test_reservations_api.py | 11 +++ 5 files changed, 211 insertions(+), 4 deletions(-) diff --git a/agent/nix_builder_autoscaler/api.py b/agent/nix_builder_autoscaler/api.py index 3df95f5..55a92e0 100644 --- a/agent/nix_builder_autoscaler/api.py +++ b/agent/nix_builder_autoscaler/api.py @@ -14,12 +14,15 @@ from pydantic import BaseModel from .models import ( CapacityHint, + CapacityPolicy, + EffectiveConfigResponse, ErrorDetail, ErrorResponse, HealthResponse, ReservationPhase, ReservationRequest, ReservationResponse, + SchedulerPolicy, SlotInfo, SlotState, StateSummary, @@ -180,6 +183,27 @@ def create_app( summary = db.get_state_summary() return StateSummary.model_validate(summary) + @app.get("/v1/config/effective", response_model=EffectiveConfigResponse) + def effective_config() -> EffectiveConfigResponse: + return EffectiveConfigResponse( + capacity=CapacityPolicy( + min_slots=config.capacity.min_slots, + max_slots=config.capacity.max_slots, + target_warm_slots=config.capacity.target_warm_slots, + max_leases_per_slot=config.capacity.max_leases_per_slot, + idle_scale_down_seconds=config.capacity.idle_scale_down_seconds, + drain_timeout_seconds=config.capacity.drain_timeout_seconds, + launch_timeout_seconds=config.capacity.launch_timeout_seconds, + boot_timeout_seconds=config.capacity.boot_timeout_seconds, + binding_timeout_seconds=config.capacity.binding_timeout_seconds, + terminating_timeout_seconds=config.capacity.terminating_timeout_seconds, + ), + scheduler=SchedulerPolicy( + tick_seconds=config.scheduler.tick_seconds, + reconcile_seconds=config.scheduler.reconcile_seconds, + ), + ) + @app.post("/v1/hints/capacity") def capacity_hint(hint: CapacityHint) -> dict[str, str]: log.info( diff --git a/agent/nix_builder_autoscaler/cli.py b/agent/nix_builder_autoscaler/cli.py index eb3ff70..a084273 100644 --- a/agent/nix_builder_autoscaler/cli.py +++ b/agent/nix_builder_autoscaler/cli.py @@ -7,6 +7,7 @@ import http.client import json import socket from collections.abc import Sequence +from datetime import UTC, datetime from typing import Any @@ -64,7 +65,81 @@ def _print_table(headers: Sequence[str], rows: Sequence[Sequence[str]]) -> None: print(" ".join(cell.ljust(widths[idx]) for idx, cell in enumerate(row))) -def _print_slots(data: list[dict[str, Any]]) -> None: +def _format_duration(seconds: float) -> str: + total = int(max(0, round(seconds))) + hours, rem = divmod(total, 3600) + minutes, secs = divmod(rem, 60) + if hours > 0: + return f"{hours}h{minutes:02d}m" + if minutes > 0: + return f"{minutes}m{secs:02d}s" + return f"{secs}s" + + +def _format_timeout_ttl(timeout_seconds: float, age_seconds: float) -> str: + remaining = timeout_seconds - age_seconds + if remaining <= 0: + return "due" + return _format_duration(remaining) + + +def _slot_age_seconds(slot: dict[str, Any]) -> float | None: + raw = slot.get("last_state_change") + if not isinstance(raw, str): + return None + try: + dt = datetime.fromisoformat(raw) + except ValueError: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + return (datetime.now(UTC) - dt).total_seconds() + + +def _slot_ttl(slot: dict[str, Any], policy: dict[str, Any] | None, active_slots: int) -> str: + if policy is None: + return "-" + + capacity = policy.get("capacity") + scheduler = policy.get("scheduler") + if not isinstance(capacity, dict) or not isinstance(scheduler, dict): + return "-" + + state = str(slot.get("state", "")) + lease_count = int(slot.get("lease_count", 0)) + age_seconds = _slot_age_seconds(slot) + + if state in {"empty", "error"}: + return "-" + if age_seconds is None: + return "?" + + if state == "launching": + return _format_timeout_ttl(float(capacity.get("launch_timeout_seconds", 0)), age_seconds) + if state == "booting": + return _format_timeout_ttl(float(capacity.get("boot_timeout_seconds", 0)), age_seconds) + if state == "binding": + return _format_timeout_ttl(float(capacity.get("binding_timeout_seconds", 0)), age_seconds) + if state == "terminating": + return _format_timeout_ttl( + float(capacity.get("terminating_timeout_seconds", 0)), age_seconds + ) + if state == "draining": + if lease_count == 0: + return f"<={_format_duration(float(scheduler.get('reconcile_seconds', 0)))}" + return _format_timeout_ttl(float(capacity.get("drain_timeout_seconds", 0)), age_seconds) + if state == "ready": + if lease_count > 0: + return "-" + min_slots = int(capacity.get("min_slots", 0)) + if active_slots <= min_slots: + return "pinned" + return _format_timeout_ttl(float(capacity.get("idle_scale_down_seconds", 0)), age_seconds) + return "-" + + +def _print_slots(data: list[dict[str, Any]], policy: dict[str, Any] | None = None) -> None: + active_slots = sum(1 for slot in data if str(slot.get("state", "")) not in {"empty", "error"}) rows: list[list[str]] = [] for slot in data: rows.append( @@ -74,9 +149,10 @@ def _print_slots(data: list[dict[str, Any]]) -> None: str(slot.get("instance_id") or "-"), str(slot.get("instance_ip") or "-"), str(slot.get("lease_count", 0)), + _slot_ttl(slot, policy, active_slots), ] ) - _print_table(["slot_id", "state", "instance_id", "ip", "leases"], rows) + _print_table(["slot_id", "state", "instance_id", "ip", "leases", "ttl"], rows) def _print_reservations(data: list[dict[str, Any]]) -> None: @@ -117,6 +193,18 @@ def _print_status_summary(data: dict[str, Any]) -> None: _print_table(["metric", "value"], rows) +def _get_effective_config(socket_path: str) -> dict[str, Any] | None: + try: + status, data = _uds_request(socket_path, "GET", "/v1/config/effective") + except OSError: + return None + if status < 200 or status >= 300: + return None + if isinstance(data, dict): + return data + return None + + def _bulk_slot_action(socket_path: str, action: str) -> dict[str, Any]: if action == "drain": eligible_states = {"ready"} @@ -307,7 +395,8 @@ def main() -> None: print(json.dumps(data, indent=2)) elif args.command == "slots": if isinstance(data, list): - _print_slots(data) + policy = _get_effective_config(args.socket) + _print_slots(data, policy) else: _print_error(data) raise SystemExit(1) diff --git a/agent/nix_builder_autoscaler/models.py b/agent/nix_builder_autoscaler/models.py index 8ca77a0..e8bd3e6 100644 --- a/agent/nix_builder_autoscaler/models.py +++ b/agent/nix_builder_autoscaler/models.py @@ -131,6 +131,35 @@ class StateSummary(BaseModel): haproxy: HaproxySummary = Field(default_factory=HaproxySummary) +class CapacityPolicy(BaseModel): + """Effective capacity timeout and sizing policy.""" + + min_slots: int + max_slots: int + target_warm_slots: int + max_leases_per_slot: int + idle_scale_down_seconds: int + drain_timeout_seconds: int + launch_timeout_seconds: int + boot_timeout_seconds: int + binding_timeout_seconds: int + terminating_timeout_seconds: int + + +class SchedulerPolicy(BaseModel): + """Effective scheduler timing policy.""" + + tick_seconds: float + reconcile_seconds: float + + +class EffectiveConfigResponse(BaseModel): + """GET /v1/config/effective response.""" + + capacity: CapacityPolicy + scheduler: SchedulerPolicy + + class ErrorDetail(BaseModel): """Structured error detail.""" diff --git a/agent/nix_builder_autoscaler/tests/test_cli.py b/agent/nix_builder_autoscaler/tests/test_cli.py index 2440b7d..caa11e8 100644 --- a/agent/nix_builder_autoscaler/tests/test_cli.py +++ b/agent/nix_builder_autoscaler/tests/test_cli.py @@ -2,10 +2,12 @@ from __future__ import annotations +from datetime import UTC, datetime, timedelta + import pytest from nix_builder_autoscaler import cli -from nix_builder_autoscaler.cli import _parse_args, _print_status_summary +from nix_builder_autoscaler.cli import _parse_args, _print_slots, _print_status_summary, _slot_ttl def test_parse_args_without_command_prints_help_and_exits_zero( @@ -95,3 +97,55 @@ def test_bulk_unquarantine_only_targets_error_slots(monkeypatch: pytest.MonkeyPa assert summary["succeeded"] == 1 assert summary["failed"] == 0 assert summary["skipped"] == 1 + + +def test_slot_ttl_ready_pinned_at_min_slots() -> None: + now = datetime.now(UTC) + slot = { + "state": "ready", + "lease_count": 0, + "last_state_change": (now - timedelta(seconds=60)).isoformat(), + } + policy = { + "capacity": { + "min_slots": 1, + "idle_scale_down_seconds": 900, + "launch_timeout_seconds": 300, + "boot_timeout_seconds": 300, + "binding_timeout_seconds": 180, + "drain_timeout_seconds": 120, + "terminating_timeout_seconds": 300, + }, + "scheduler": {"reconcile_seconds": 15.0}, + } + assert _slot_ttl(slot, policy, active_slots=1) == "pinned" + + +def test_print_slots_includes_ttl_column(capsys: pytest.CaptureFixture[str]) -> None: + now = datetime.now(UTC) + slots = [ + { + "slot_id": "slot001", + "state": "launching", + "instance_id": "i-123", + "instance_ip": None, + "lease_count": 0, + "last_state_change": (now - timedelta(seconds=20)).isoformat(), + } + ] + policy = { + "capacity": { + "min_slots": 0, + "idle_scale_down_seconds": 900, + "launch_timeout_seconds": 300, + "boot_timeout_seconds": 300, + "binding_timeout_seconds": 180, + "drain_timeout_seconds": 120, + "terminating_timeout_seconds": 300, + }, + "scheduler": {"reconcile_seconds": 15.0}, + } + _print_slots(slots, policy) + out = capsys.readouterr().out + assert "ttl" in out + assert "slot001" in out diff --git a/agent/nix_builder_autoscaler/tests/test_reservations_api.py b/agent/nix_builder_autoscaler/tests/test_reservations_api.py index 2d95282..b4af2a2 100644 --- a/agent/nix_builder_autoscaler/tests/test_reservations_api.py +++ b/agent/nix_builder_autoscaler/tests/test_reservations_api.py @@ -111,6 +111,17 @@ def test_state_summary_returns_counts() -> None: assert body["slots"]["empty"] == 3 +def test_effective_config_returns_capacity_and_scheduler() -> None: + client, _, _, _ = _make_client() + response = client.get("/v1/config/effective") + assert response.status_code == 200 + body = response.json() + assert body["capacity"]["max_slots"] == 8 + assert body["capacity"]["idle_scale_down_seconds"] == 900 + assert body["scheduler"]["tick_seconds"] == 3.0 + assert body["scheduler"]["reconcile_seconds"] == 15.0 + + def test_health_live_returns_ok() -> None: client, _, _, _ = _make_client() response = client.get("/health/live")