diff --git a/agent/nix_builder_autoscaler/__main__.py b/agent/nix_builder_autoscaler/__main__.py index 4b0fce9..0bf3f32 100644 --- a/agent/nix_builder_autoscaler/__main__.py +++ b/agent/nix_builder_autoscaler/__main__.py @@ -1,23 +1,162 @@ -"""Daemon entry point: python -m nix_builder_autoscaler — stub for Plan 04.""" +"""Daemon entry point: python -m nix_builder_autoscaler.""" from __future__ import annotations import argparse -import sys +import logging +import signal +import threading +from pathlib import Path +from types import FrameType + +import uvicorn + +from .api import create_app +from .config import AppConfig, load_config +from .logging import setup_logging +from .metrics import MetricsRegistry +from .providers.clock import SystemClock +from .providers.haproxy import HAProxyRuntime +from .reconciler import Reconciler +from .runtime.ec2 import EC2Runtime +from .scheduler import scheduling_tick +from .state_db import StateDB + +log = logging.getLogger(__name__) -def main() -> None: - """Parse arguments and start the daemon.""" +def _scheduler_loop( + db: StateDB, + runtime: EC2Runtime, + config: AppConfig, + clock: SystemClock, + metrics: MetricsRegistry, + stop_event: threading.Event, +) -> None: + while not stop_event.is_set(): + try: + scheduling_tick(db, runtime, config, clock, metrics) + except Exception: + log.exception("scheduler_tick_failed") + stop_event.wait(config.scheduler.tick_seconds) + + +def _reconciler_loop( + reconciler: Reconciler, + config: AppConfig, + stop_event: threading.Event, +) -> None: + while not stop_event.is_set(): + try: + reconciler.tick() + except Exception: + log.exception("reconciler_tick_failed") + stop_event.wait(config.scheduler.reconcile_seconds) + + +def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( prog="nix-builder-autoscaler", description="Nix builder autoscaler daemon", ) parser.add_argument("--config", required=True, help="Path to TOML config file") - args = parser.parse_args() + return parser.parse_args() - print(f"nix-builder-autoscaler: would start with config {args.config}") - print("Full daemon implementation in Plan 04.") - sys.exit(0) + +def main() -> None: + """Parse config, initialize components, and run the daemon.""" + args = _parse_args() + config = load_config(Path(args.config)) + setup_logging(config.server.log_level) + + clock = SystemClock() + db = StateDB(config.server.db_path, clock=clock) + db.init_schema() + db.init_slots( + config.haproxy.slot_prefix, + config.haproxy.slot_count, + config.capacity.default_system, + config.haproxy.backend, + ) + + runtime = EC2Runtime(config.aws) + haproxy = HAProxyRuntime( + config.haproxy.runtime_socket, + config.haproxy.backend, + config.haproxy.slot_prefix, + ) + metrics = MetricsRegistry() + reconciler = Reconciler(db, runtime, haproxy, config, clock, metrics) + reconciler.tick() + + stop_event = threading.Event() + scheduler_thread: threading.Thread | None = None + reconciler_thread: threading.Thread | None = None + server: uvicorn.Server | None = None + + def scheduler_running() -> bool: + return scheduler_thread is not None and scheduler_thread.is_alive() + + def reconciler_running() -> bool: + return reconciler_thread is not None and reconciler_thread.is_alive() + + app = create_app( + db, + config, + clock, + metrics, + runtime=runtime, + haproxy=haproxy, + scheduler_running=scheduler_running, + reconciler_running=reconciler_running, + ) + + scheduler_thread = threading.Thread( + target=_scheduler_loop, + name="autoscaler-scheduler", + args=(db, runtime, config, clock, metrics, stop_event), + daemon=True, + ) + reconciler_thread = threading.Thread( + target=_reconciler_loop, + name="autoscaler-reconciler", + args=(reconciler, config, stop_event), + daemon=True, + ) + + scheduler_thread.start() + reconciler_thread.start() + + socket_path = Path(config.server.socket_path) + socket_path.parent.mkdir(parents=True, exist_ok=True) + if socket_path.exists(): + socket_path.unlink() + + uvicorn_config = uvicorn.Config( + app=app, + uds=config.server.socket_path, + log_level=config.server.log_level.lower(), + ) + server = uvicorn.Server(uvicorn_config) + + def _handle_signal(signum: int, _: FrameType | None) -> None: + log.info("shutdown_signal", extra={"signal": signum}) + stop_event.set() + if server is not None: + server.should_exit = True + + signal.signal(signal.SIGTERM, _handle_signal) + signal.signal(signal.SIGINT, _handle_signal) + + try: + server.run() + finally: + stop_event.set() + if scheduler_thread is not None: + scheduler_thread.join(timeout=10) + if reconciler_thread is not None: + reconciler_thread.join(timeout=10) + db.close() if __name__ == "__main__": diff --git a/agent/nix_builder_autoscaler/api.py b/agent/nix_builder_autoscaler/api.py index 1085b43..dae8074 100644 --- a/agent/nix_builder_autoscaler/api.py +++ b/agent/nix_builder_autoscaler/api.py @@ -1,19 +1,210 @@ -"""FastAPI application — stub for Plan 04.""" +"""FastAPI application for the autoscaler daemon.""" from __future__ import annotations -from fastapi import FastAPI +import logging +import uuid +from collections.abc import Callable +from datetime import datetime +from typing import TYPE_CHECKING, NoReturn + +from fastapi import FastAPI, HTTPException, Request, Response +from fastapi.responses import JSONResponse + +from .models import ( + CapacityHint, + ErrorDetail, + ErrorResponse, + HealthResponse, + ReservationPhase, + ReservationRequest, + ReservationResponse, + SlotInfo, + SlotState, + StateSummary, +) + +if TYPE_CHECKING: + from .config import AppConfig + from .metrics import MetricsRegistry + from .providers.clock import Clock + from .providers.haproxy import HAProxyRuntime + from .runtime.base import RuntimeAdapter + from .state_db import StateDB + +log = logging.getLogger(__name__) -def create_app() -> FastAPI: - """Create the FastAPI application. +def _parse_required_dt(value: str) -> datetime: + return datetime.fromisoformat(value) - Full implementation in Plan 04. - """ + +def _parse_optional_dt(value: str | None) -> datetime | None: + if value is None: + return None + return datetime.fromisoformat(value) + + +def _resv_to_response(resv: dict) -> ReservationResponse: + return ReservationResponse( + reservation_id=str(resv["reservation_id"]), + phase=ReservationPhase(str(resv["phase"])), + slot=resv.get("slot_id"), + instance_id=resv.get("instance_id"), + system=str(resv["system"]), + created_at=_parse_required_dt(str(resv["created_at"])), + updated_at=_parse_required_dt(str(resv["updated_at"])), + expires_at=_parse_required_dt(str(resv["expires_at"])), + released_at=_parse_optional_dt(resv.get("released_at")), + ) + + +def _slot_to_info(slot: dict) -> SlotInfo: + return SlotInfo( + slot_id=str(slot["slot_id"]), + system=str(slot["system"]), + state=SlotState(str(slot["state"])), + instance_id=slot.get("instance_id"), + instance_ip=slot.get("instance_ip"), + lease_count=int(slot["lease_count"]), + last_state_change=_parse_required_dt(str(slot["last_state_change"])), + ) + + +def _error_response( + request: Request, + status_code: int, + code: str, + message: str, + retryable: bool = False, +) -> NoReturn: + request_id = getattr(request.state, "request_id", str(uuid.uuid4())) + payload = ErrorResponse( + error=ErrorDetail(code=code, message=message, retryable=retryable), + request_id=request_id, + ) + raise HTTPException(status_code=status_code, detail=payload.model_dump(mode="json")) + + +def create_app( + db: StateDB, + config: AppConfig, + clock: Clock, + metrics: MetricsRegistry, + runtime: RuntimeAdapter | None = None, + haproxy: HAProxyRuntime | None = None, + scheduler_running: Callable[[], bool] | None = None, + reconciler_running: Callable[[], bool] | None = None, +) -> FastAPI: + """Create the FastAPI application.""" app = FastAPI(title="nix-builder-autoscaler", version="0.1.0") - @app.get("/health/live") - def health_live() -> dict[str, str]: - return {"status": "ok"} + app.state.db = db + app.state.config = config + app.state.clock = clock + app.state.metrics = metrics + app.state.runtime = runtime + app.state.haproxy = haproxy + + @app.middleware("http") + async def request_id_middleware(request: Request, call_next: Callable) -> Response: + request.state.request_id = str(uuid.uuid4()) + response = await call_next(request) + response.headers["x-request-id"] = request.state.request_id + return response + + @app.exception_handler(HTTPException) + async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse: + detail = exc.detail + if isinstance(detail, dict) and "error" in detail and "request_id" in detail: + return JSONResponse(status_code=exc.status_code, content=detail) + + request_id = getattr(request.state, "request_id", str(uuid.uuid4())) + payload = ErrorResponse( + error=ErrorDetail( + code="http_error", + message=str(detail) if detail else "Request failed", + retryable=False, + ), + request_id=request_id, + ) + return JSONResponse(status_code=exc.status_code, content=payload.model_dump(mode="json")) + + @app.post("/v1/reservations", response_model=ReservationResponse) + def create_reservation(body: ReservationRequest) -> ReservationResponse: + resv = db.create_reservation( + body.system, + body.reason, + body.build_id, + config.capacity.reservation_ttl_seconds, + ) + return _resv_to_response(resv) + + @app.get("/v1/reservations/{reservation_id}", response_model=ReservationResponse) + def get_reservation(reservation_id: str, request: Request) -> ReservationResponse: + resv = db.get_reservation(reservation_id) + if resv is None: + _error_response(request, 404, "not_found", "Reservation not found") + return _resv_to_response(resv) + + @app.post("/v1/reservations/{reservation_id}/release", response_model=ReservationResponse) + def release_reservation(reservation_id: str, request: Request) -> ReservationResponse: + resv = db.release_reservation(reservation_id) + if resv is None: + _error_response(request, 404, "not_found", "Reservation not found") + return _resv_to_response(resv) + + @app.get("/v1/reservations", response_model=list[ReservationResponse]) + def list_reservations( + phase: ReservationPhase | None = None, + ) -> list[ReservationResponse]: + reservations = db.list_reservations(phase) + return [_resv_to_response(resv) for resv in reservations] + + @app.get("/v1/slots", response_model=list[SlotInfo]) + def list_slots() -> list[SlotInfo]: + slots = db.list_slots() + return [_slot_to_info(slot) for slot in slots] + + @app.get("/v1/state/summary", response_model=StateSummary) + def state_summary() -> StateSummary: + summary = db.get_state_summary() + return StateSummary.model_validate(summary) + + @app.post("/v1/hints/capacity") + def capacity_hint(hint: CapacityHint) -> dict[str, str]: + log.info( + "capacity_hint", + extra={ + "builder": hint.builder, + "queued": hint.queued, + "running": hint.running, + "system": hint.system, + "timestamp": hint.timestamp.isoformat(), + }, + ) + return {"status": "accepted"} + + @app.get("/health/live", response_model=HealthResponse) + def health_live() -> HealthResponse: + return HealthResponse(status="ok") + + @app.get("/health/ready", response_model=HealthResponse) + def health_ready() -> HealthResponse: + if scheduler_running is not None and not scheduler_running(): + return JSONResponse( # type: ignore[return-value] + status_code=503, + content=HealthResponse(status="degraded").model_dump(mode="json"), + ) + if reconciler_running is not None and not reconciler_running(): + return JSONResponse( # type: ignore[return-value] + status_code=503, + content=HealthResponse(status="degraded").model_dump(mode="json"), + ) + return HealthResponse(status="ok") + + @app.get("/metrics") + def metrics_endpoint() -> Response: + return Response(content=metrics.render(), media_type="text/plain") return app diff --git a/agent/nix_builder_autoscaler/cli.py b/agent/nix_builder_autoscaler/cli.py index d974f78..71a7a82 100644 --- a/agent/nix_builder_autoscaler/cli.py +++ b/agent/nix_builder_autoscaler/cli.py @@ -1,26 +1,173 @@ -"""autoscalerctl CLI entry point — stub for Plan 04.""" +"""autoscalerctl CLI entry point.""" from __future__ import annotations import argparse -import sys +import http.client +import json +import socket +from collections.abc import Sequence +from typing import Any + + +class UnixHTTPConnection(http.client.HTTPConnection): + """HTTPConnection that dials a Unix domain socket.""" + + def __init__(self, socket_path: str, timeout: float = 5.0) -> None: + super().__init__("localhost", timeout=timeout) + self._socket_path = socket_path + + def connect(self) -> None: + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.connect(self._socket_path) + + +def _uds_request( + socket_path: str, + method: str, + path: str, + body: dict[str, Any] | None = None, +) -> tuple[int, dict[str, Any] | list[dict[str, Any]] | str]: + conn = UnixHTTPConnection(socket_path) + headers = {"Host": "localhost", "Accept": "application/json"} + payload: str | None = None + if body is not None: + payload = json.dumps(body) + headers["Content-Type"] = "application/json" + + try: + conn.request(method, path, body=payload, headers=headers) + resp = conn.getresponse() + raw = resp.read() + text = raw.decode() if raw else "" + content_type = resp.getheader("Content-Type", "") + if text and "application/json" in content_type: + parsed = json.loads(text) + if isinstance(parsed, dict | list): + return resp.status, parsed + return resp.status, text + finally: + conn.close() + + +def _print_table(headers: Sequence[str], rows: Sequence[Sequence[str]]) -> None: + widths = [len(h) for h in headers] + for row in rows: + for idx, cell in enumerate(row): + widths[idx] = max(widths[idx], len(cell)) + + header_line = " ".join(h.ljust(widths[idx]) for idx, h in enumerate(headers)) + separator = " ".join("-" * widths[idx] for idx in range(len(headers))) + print(header_line) + print(separator) + for row in rows: + print(" ".join(cell.ljust(widths[idx]) for idx, cell in enumerate(row))) + + +def _print_slots(data: list[dict[str, Any]]) -> None: + rows: list[list[str]] = [] + for slot in data: + rows.append( + [ + str(slot.get("slot_id", "")), + str(slot.get("state", "")), + str(slot.get("instance_id") or "-"), + str(slot.get("instance_ip") or "-"), + str(slot.get("lease_count", 0)), + ] + ) + _print_table(["slot_id", "state", "instance_id", "ip", "leases"], rows) + + +def _print_reservations(data: list[dict[str, Any]]) -> None: + rows: list[list[str]] = [] + for resv in data: + rows.append( + [ + str(resv.get("reservation_id", "")), + str(resv.get("phase", "")), + str(resv.get("system", "")), + str(resv.get("slot") or "-"), + str(resv.get("instance_id") or "-"), + ] + ) + _print_table(["reservation_id", "phase", "system", "slot", "instance_id"], rows) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(prog="autoscalerctl", description="Autoscaler CLI") + parser.add_argument( + "--socket", + default="/run/nix-builder-autoscaler/daemon.sock", + help="Daemon Unix socket path", + ) + subparsers = parser.add_subparsers(dest="command") + subparsers.add_parser("status", help="Show state summary") + subparsers.add_parser("slots", help="List slots") + subparsers.add_parser("reservations", help="List reservations") + + parser_drain = subparsers.add_parser("drain", help="Drain a slot (not implemented)") + parser_drain.add_argument("slot_id") + parser_unq = subparsers.add_parser( + "unquarantine", + help="Unquarantine a slot (not implemented)", + ) + parser_unq.add_argument("slot_id") + subparsers.add_parser("reconcile-now", help="Run reconciler now (not implemented)") + return parser.parse_args() + + +def _print_error(data: object) -> None: + if isinstance(data, dict | list): + print(json.dumps(data, indent=2)) + else: + print(str(data)) def main() -> None: """Entry point for the autoscalerctl CLI.""" - parser = argparse.ArgumentParser(prog="autoscalerctl", description="Autoscaler CLI") - parser.add_argument( - "--socket", default="/run/nix-builder-autoscaler/daemon.sock", help="Daemon socket path" - ) - subparsers = parser.add_subparsers(dest="command") - subparsers.add_parser("status", help="Show daemon status") - subparsers.add_parser("slots", help="List slots") - subparsers.add_parser("reservations", help="List reservations") - - args = parser.parse_args() + args = _parse_args() if not args.command: - parser.print_help() - sys.exit(1) + raise SystemExit(1) - print(f"autoscalerctl: command '{args.command}' not yet implemented") - sys.exit(1) + if args.command in {"drain", "unquarantine", "reconcile-now"}: + print(f"{args.command}: not yet implemented in API v1") + raise SystemExit(0) + + endpoint_map = { + "status": "/v1/state/summary", + "slots": "/v1/slots", + "reservations": "/v1/reservations", + } + path = endpoint_map[args.command] + + try: + status, data = _uds_request(args.socket, "GET", path) + except OSError as err: + print(f"Error: cannot connect to daemon at {args.socket}") + raise SystemExit(1) from err + + if status < 200 or status >= 300: + _print_error(data) + raise SystemExit(1) + + if args.command == "status": + print(json.dumps(data, indent=2)) + elif args.command == "slots": + if isinstance(data, list): + _print_slots(data) + else: + _print_error(data) + raise SystemExit(1) + elif args.command == "reservations": + if isinstance(data, list): + _print_reservations(data) + else: + _print_error(data) + raise SystemExit(1) + + raise SystemExit(0) + + +if __name__ == "__main__": + main() diff --git a/agent/nix_builder_autoscaler/tests/test_reservations_api.py b/agent/nix_builder_autoscaler/tests/test_reservations_api.py index 9e993fb..8a11db1 100644 --- a/agent/nix_builder_autoscaler/tests/test_reservations_api.py +++ b/agent/nix_builder_autoscaler/tests/test_reservations_api.py @@ -1 +1,152 @@ -"""Reservations API unit tests — Plan 04.""" +"""Reservations API unit tests.""" + +from __future__ import annotations + +from datetime import UTC, datetime + +from fastapi.testclient import TestClient + +from nix_builder_autoscaler.api import create_app +from nix_builder_autoscaler.config import AppConfig, CapacityConfig +from nix_builder_autoscaler.metrics import MetricsRegistry +from nix_builder_autoscaler.providers.clock import FakeClock +from nix_builder_autoscaler.state_db import StateDB + + +def _make_client() -> tuple[TestClient, StateDB, FakeClock, MetricsRegistry]: + clock = FakeClock() + db = StateDB(":memory:", clock=clock) + db.init_schema() + db.init_slots("slot", 3, "x86_64-linux", "all") + config = AppConfig(capacity=CapacityConfig(reservation_ttl_seconds=1200)) + metrics = MetricsRegistry() + app = create_app(db, config, clock, metrics) + return TestClient(app), db, clock, metrics + + +def test_create_reservation_returns_200() -> None: + client, _, _, _ = _make_client() + response = client.post("/v1/reservations", json={"system": "x86_64-linux", "reason": "test"}) + assert response.status_code == 200 + body = response.json() + assert body["reservation_id"].startswith("resv_") + assert body["phase"] == "pending" + assert body["system"] == "x86_64-linux" + assert "created_at" in body + assert "expires_at" in body + + +def test_get_reservation_returns_current_phase() -> None: + client, _, _, _ = _make_client() + created = client.post("/v1/reservations", json={"system": "x86_64-linux", "reason": "test"}) + reservation_id = created.json()["reservation_id"] + response = client.get(f"/v1/reservations/{reservation_id}") + assert response.status_code == 200 + assert response.json()["phase"] == "pending" + + +def test_release_reservation_moves_to_released() -> None: + client, _, _, _ = _make_client() + created = client.post("/v1/reservations", json={"system": "x86_64-linux", "reason": "test"}) + reservation_id = created.json()["reservation_id"] + response = client.post(f"/v1/reservations/{reservation_id}/release") + assert response.status_code == 200 + assert response.json()["phase"] == "released" + + +def test_unknown_reservation_returns_404() -> None: + client, _, _, _ = _make_client() + response = client.get("/v1/reservations/resv_nonexistent") + assert response.status_code == 404 + body = response.json() + assert body["error"]["code"] == "not_found" + assert "request_id" in body + + +def test_malformed_body_returns_422() -> None: + client, _, _, _ = _make_client() + response = client.post("/v1/reservations", json={"invalid": "data"}) + assert response.status_code == 422 + + +def test_list_reservations_returns_all() -> None: + client, _, _, _ = _make_client() + client.post("/v1/reservations", json={"system": "x86_64-linux", "reason": "a"}) + client.post("/v1/reservations", json={"system": "x86_64-linux", "reason": "b"}) + response = client.get("/v1/reservations") + assert response.status_code == 200 + assert len(response.json()) == 2 + + +def test_list_reservations_filters_by_phase() -> None: + client, _, _, _ = _make_client() + created = client.post("/v1/reservations", json={"system": "x86_64-linux", "reason": "test"}) + reservation_id = created.json()["reservation_id"] + client.post(f"/v1/reservations/{reservation_id}/release") + response = client.get("/v1/reservations?phase=released") + assert response.status_code == 200 + body = response.json() + assert len(body) == 1 + assert body[0]["phase"] == "released" + + +def test_list_slots_returns_all_slots() -> None: + client, _, _, _ = _make_client() + response = client.get("/v1/slots") + assert response.status_code == 200 + assert len(response.json()) == 3 + + +def test_state_summary_returns_counts() -> None: + client, _, _, _ = _make_client() + response = client.get("/v1/state/summary") + assert response.status_code == 200 + body = response.json() + assert body["slots"]["total"] == 3 + assert body["slots"]["empty"] == 3 + + +def test_health_live_returns_ok() -> None: + client, _, _, _ = _make_client() + response = client.get("/health/live") + assert response.status_code == 200 + assert response.json()["status"] == "ok" + + +def test_health_ready_returns_ok_when_no_checks() -> None: + client, _, _, _ = _make_client() + response = client.get("/health/ready") + assert response.status_code == 200 + assert response.json()["status"] == "ok" + + +def test_metrics_returns_prometheus_text() -> None: + client, _, _, metrics = _make_client() + metrics.counter("autoscaler_test_counter", {}, 1.0) + response = client.get("/metrics") + assert response.status_code == 200 + assert "text/plain" in response.headers["content-type"] + assert "autoscaler_test_counter" in response.text + + +def test_capacity_hint_accepted() -> None: + client, _, _, _ = _make_client() + response = client.post( + "/v1/hints/capacity", + json={ + "builder": "buildbot", + "queued": 2, + "running": 4, + "system": "x86_64-linux", + "timestamp": datetime(2026, 1, 1, tzinfo=UTC).isoformat(), + }, + ) + assert response.status_code == 200 + assert response.json()["status"] == "accepted" + + +def test_release_nonexistent_returns_404() -> None: + client, _, _, _ = _make_client() + response = client.post("/v1/reservations/resv_nonexistent/release") + assert response.status_code == 404 + assert response.json()["error"]["code"] == "not_found"