"""FastAPI application for the autoscaler daemon.""" from __future__ import annotations import logging import uuid from collections.abc import Callable from datetime import datetime from typing import TYPE_CHECKING, NoReturn from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import JSONResponse from pydantic import BaseModel from .models import ( CapacityHint, CapacityPolicy, EffectiveConfigResponse, ErrorDetail, ErrorResponse, HealthResponse, ReservationPhase, ReservationRequest, ReservationResponse, SchedulerPolicy, SlotInfo, SlotState, StateSummary, ) if TYPE_CHECKING: from .config import AppConfig from .metrics import MetricsRegistry from .providers.clock import Clock from .providers.haproxy import HAProxyRuntime from .runtime.base import RuntimeAdapter from .state_db import StateDB log = logging.getLogger(__name__) class SlotAdminRequest(BaseModel): """Admin action request that targets a slot.""" slot_id: str def _parse_required_dt(value: str) -> datetime: return datetime.fromisoformat(value) def _parse_optional_dt(value: str | None) -> datetime | None: if value is None: return None return datetime.fromisoformat(value) def _resv_to_response(resv: dict) -> ReservationResponse: return ReservationResponse( reservation_id=str(resv["reservation_id"]), phase=ReservationPhase(str(resv["phase"])), slot=resv.get("slot_id"), instance_id=resv.get("instance_id"), system=str(resv["system"]), created_at=_parse_required_dt(str(resv["created_at"])), updated_at=_parse_required_dt(str(resv["updated_at"])), expires_at=_parse_required_dt(str(resv["expires_at"])), released_at=_parse_optional_dt(resv.get("released_at")), ) def _slot_to_info(slot: dict) -> SlotInfo: return SlotInfo( slot_id=str(slot["slot_id"]), system=str(slot["system"]), state=SlotState(str(slot["state"])), instance_id=slot.get("instance_id"), instance_ip=slot.get("instance_ip"), lease_count=int(slot["lease_count"]), last_state_change=_parse_required_dt(str(slot["last_state_change"])), ) def _error_response( request: Request, status_code: int, code: str, message: str, retryable: bool = False, ) -> NoReturn: request_id = getattr(request.state, "request_id", str(uuid.uuid4())) payload = ErrorResponse( error=ErrorDetail(code=code, message=message, retryable=retryable), request_id=request_id, ) raise HTTPException(status_code=status_code, detail=payload.model_dump(mode="json")) def create_app( db: StateDB, config: AppConfig, clock: Clock, metrics: MetricsRegistry, runtime: RuntimeAdapter | None = None, haproxy: HAProxyRuntime | None = None, scheduler_running: Callable[[], bool] | None = None, reconciler_running: Callable[[], bool] | None = None, ready_check: Callable[[], bool] | None = None, reconcile_now: Callable[[], dict[str, object] | None] | None = None, ) -> FastAPI: """Create the FastAPI application.""" app = FastAPI(title="nix-builder-autoscaler", version="0.1.0") app.state.db = db app.state.config = config app.state.clock = clock app.state.metrics = metrics app.state.runtime = runtime app.state.haproxy = haproxy @app.middleware("http") async def request_id_middleware(request: Request, call_next: Callable) -> Response: request.state.request_id = str(uuid.uuid4()) response = await call_next(request) response.headers["x-request-id"] = request.state.request_id return response @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse: detail = exc.detail if isinstance(detail, dict) and "error" in detail and "request_id" in detail: return JSONResponse(status_code=exc.status_code, content=detail) request_id = getattr(request.state, "request_id", str(uuid.uuid4())) payload = ErrorResponse( error=ErrorDetail( code="http_error", message=str(detail) if detail else "Request failed", retryable=False, ), request_id=request_id, ) return JSONResponse(status_code=exc.status_code, content=payload.model_dump(mode="json")) @app.post("/v1/reservations", response_model=ReservationResponse) def create_reservation(body: ReservationRequest) -> ReservationResponse: resv = db.create_reservation( body.system, body.reason, body.build_id, config.capacity.reservation_ttl_seconds, ) return _resv_to_response(resv) @app.get("/v1/reservations/{reservation_id}", response_model=ReservationResponse) def get_reservation(reservation_id: str, request: Request) -> ReservationResponse: resv = db.get_reservation(reservation_id) if resv is None: _error_response(request, 404, "not_found", "Reservation not found") return _resv_to_response(resv) @app.post("/v1/reservations/{reservation_id}/release", response_model=ReservationResponse) def release_reservation(reservation_id: str, request: Request) -> ReservationResponse: resv = db.release_reservation(reservation_id) if resv is None: _error_response(request, 404, "not_found", "Reservation not found") return _resv_to_response(resv) @app.get("/v1/reservations", response_model=list[ReservationResponse]) def list_reservations( phase: ReservationPhase | None = None, ) -> list[ReservationResponse]: reservations = db.list_reservations(phase) return [_resv_to_response(resv) for resv in reservations] @app.get("/v1/slots", response_model=list[SlotInfo]) def list_slots() -> list[SlotInfo]: slots = db.list_slots() return [_slot_to_info(slot) for slot in slots] @app.get("/v1/state/summary", response_model=StateSummary) def state_summary() -> StateSummary: summary = db.get_state_summary() return StateSummary.model_validate(summary) @app.get("/v1/config/effective", response_model=EffectiveConfigResponse) def effective_config() -> EffectiveConfigResponse: return EffectiveConfigResponse( capacity=CapacityPolicy( min_slots=config.capacity.min_slots, max_slots=config.capacity.max_slots, target_warm_slots=config.capacity.target_warm_slots, max_leases_per_slot=config.capacity.max_leases_per_slot, idle_scale_down_seconds=config.capacity.idle_scale_down_seconds, drain_timeout_seconds=config.capacity.drain_timeout_seconds, launch_timeout_seconds=config.capacity.launch_timeout_seconds, boot_timeout_seconds=config.capacity.boot_timeout_seconds, binding_timeout_seconds=config.capacity.binding_timeout_seconds, terminating_timeout_seconds=config.capacity.terminating_timeout_seconds, termination_cooldown_seconds=config.capacity.termination_cooldown_seconds, ), scheduler=SchedulerPolicy( tick_seconds=config.scheduler.tick_seconds, reconcile_seconds=config.scheduler.reconcile_seconds, ), ) @app.post("/v1/hints/capacity") def capacity_hint(hint: CapacityHint) -> dict[str, str]: log.info( "capacity_hint", extra={ "builder": hint.builder, "queued": hint.queued, "running": hint.running, "system": hint.system, "timestamp": hint.timestamp.isoformat(), }, ) return {"status": "accepted"} @app.get("/health/live", response_model=HealthResponse) def health_live() -> HealthResponse: return HealthResponse(status="ok") @app.get("/health/ready", response_model=HealthResponse) def health_ready() -> HealthResponse: if ready_check is not None and not ready_check(): return JSONResponse( # type: ignore[return-value] status_code=503, content=HealthResponse(status="degraded").model_dump(mode="json"), ) if scheduler_running is not None and not scheduler_running(): return JSONResponse( # type: ignore[return-value] status_code=503, content=HealthResponse(status="degraded").model_dump(mode="json"), ) if reconciler_running is not None and not reconciler_running(): return JSONResponse( # type: ignore[return-value] status_code=503, content=HealthResponse(status="degraded").model_dump(mode="json"), ) return HealthResponse(status="ok") @app.get("/metrics") def metrics_endpoint() -> Response: return Response(content=metrics.render(), media_type="text/plain") @app.post("/v1/admin/drain") def admin_drain(body: SlotAdminRequest, request: Request) -> dict[str, str]: slot = db.get_slot(body.slot_id) if slot is None: _error_response(request, 404, "not_found", "Slot not found") state = str(slot["state"]) if state == SlotState.DRAINING.value or state == SlotState.TERMINATING.value: return {"status": "accepted", "slot_id": body.slot_id, "state": state} allowed_states = { SlotState.READY.value, SlotState.BINDING.value, SlotState.BOOTING.value, SlotState.LAUNCHING.value, } if state not in allowed_states: _error_response( request, 409, "invalid_state", f"Cannot drain slot from state {state}", ) db.update_slot_state(body.slot_id, SlotState.DRAINING, interruption_pending=0) return {"status": "accepted", "slot_id": body.slot_id, "state": SlotState.DRAINING.value} @app.post("/v1/admin/unquarantine") def admin_unquarantine(body: SlotAdminRequest, request: Request) -> dict[str, str]: slot = db.get_slot(body.slot_id) if slot is None: _error_response(request, 404, "not_found", "Slot not found") state = str(slot["state"]) if state != SlotState.ERROR.value: _error_response( request, 409, "invalid_state", f"Cannot unquarantine slot from state {state}", ) db.update_slot_state( body.slot_id, SlotState.EMPTY, instance_id=None, instance_ip=None, instance_launch_time=None, lease_count=0, cooldown_until=None, interruption_pending=0, ) return {"status": "accepted", "slot_id": body.slot_id, "state": SlotState.EMPTY.value} @app.post("/v1/admin/reconcile-now") def admin_reconcile_now(request: Request) -> dict[str, object]: if reconcile_now is None: _error_response( request, 503, "not_configured", "Reconcile trigger not configured", retryable=True, ) try: result = reconcile_now() except Exception: log.exception("admin_reconcile_now_failed") _error_response( request, 500, "reconcile_failed", "Reconcile tick failed", retryable=True, ) payload: dict[str, object] = {"status": "accepted"} if isinstance(result, dict): payload.update(result) return payload return app