2026-02-27 12:46:32 +01:00
|
|
|
"""FastAPI application for the autoscaler daemon."""
|
2026-02-27 11:59:16 +01:00
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-02-27 12:46:32 +01:00
|
|
|
import logging
|
|
|
|
|
import uuid
|
|
|
|
|
from collections.abc import Callable
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from typing import TYPE_CHECKING, NoReturn
|
2026-02-27 11:59:16 +01:00
|
|
|
|
2026-02-27 12:46:32 +01:00
|
|
|
from fastapi import FastAPI, HTTPException, Request, Response
|
|
|
|
|
from fastapi.responses import JSONResponse
|
2026-02-27 13:48:52 +01:00
|
|
|
from pydantic import BaseModel
|
2026-02-27 11:59:16 +01:00
|
|
|
|
2026-02-27 12:46:32 +01:00
|
|
|
from .models import (
|
|
|
|
|
CapacityHint,
|
2026-02-27 16:43:52 +01:00
|
|
|
CapacityPolicy,
|
|
|
|
|
EffectiveConfigResponse,
|
2026-02-27 12:46:32 +01:00
|
|
|
ErrorDetail,
|
|
|
|
|
ErrorResponse,
|
|
|
|
|
HealthResponse,
|
|
|
|
|
ReservationPhase,
|
|
|
|
|
ReservationRequest,
|
|
|
|
|
ReservationResponse,
|
2026-02-27 16:43:52 +01:00
|
|
|
SchedulerPolicy,
|
2026-02-27 12:46:32 +01:00
|
|
|
SlotInfo,
|
|
|
|
|
SlotState,
|
|
|
|
|
StateSummary,
|
|
|
|
|
)
|
2026-02-27 11:59:16 +01:00
|
|
|
|
2026-02-27 12:46:32 +01:00
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from .config import AppConfig
|
|
|
|
|
from .metrics import MetricsRegistry
|
|
|
|
|
from .providers.clock import Clock
|
|
|
|
|
from .providers.haproxy import HAProxyRuntime
|
|
|
|
|
from .runtime.base import RuntimeAdapter
|
|
|
|
|
from .state_db import StateDB
|
|
|
|
|
|
|
|
|
|
# Module-level logger; used by the capacity-hint and admin reconcile-now handlers.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
2026-02-27 13:48:52 +01:00
|
|
|
class SlotAdminRequest(BaseModel):
    """Request body for admin endpoints that operate on exactly one slot.

    Accepted by ``/v1/admin/drain`` and ``/v1/admin/unquarantine``.
    """

    # Identifier of the slot to act on; looked up with ``db.get_slot``.
    slot_id: str
|
|
|
|
|
|
|
|
|
|
|
2026-02-27 12:46:32 +01:00
|
|
|
def _parse_required_dt(value: str) -> datetime:
    """Parse a mandatory ISO-8601 timestamp string into a ``datetime``.

    Raises:
        ValueError: if *value* is not a valid ISO-8601 string.
    """
    parsed = datetime.fromisoformat(value)
    return parsed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_optional_dt(value: str | None) -> datetime | None:
|
|
|
|
|
if value is None:
|
|
|
|
|
return None
|
|
|
|
|
return datetime.fromisoformat(value)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _resv_to_response(resv: dict) -> ReservationResponse:
    """Convert a reservation row (dict returned by ``db``) into the API model.

    Keys ``slot_id``, ``instance_id`` and ``released_at`` are optional (read
    with ``.get``); every other key is required and a missing one raises
    ``KeyError``. Required timestamps are stringified before ISO parsing.
    """
    # Fields are computed in the same order the model kwargs list them, so
    # the first failing lookup/conversion is the same as before.
    reservation_id = str(resv["reservation_id"])
    phase = ReservationPhase(str(resv["phase"]))
    slot_id = resv.get("slot_id")
    instance_id = resv.get("instance_id")
    system = str(resv["system"])
    created_at = _parse_required_dt(str(resv["created_at"]))
    updated_at = _parse_required_dt(str(resv["updated_at"]))
    expires_at = _parse_required_dt(str(resv["expires_at"]))
    released_at = _parse_optional_dt(resv.get("released_at"))
    return ReservationResponse(
        reservation_id=reservation_id,
        phase=phase,
        slot=slot_id,
        instance_id=instance_id,
        system=system,
        created_at=created_at,
        updated_at=updated_at,
        expires_at=expires_at,
        released_at=released_at,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _slot_to_info(slot: dict) -> SlotInfo:
    """Convert a slot row (dict returned by ``db.list_slots``) into ``SlotInfo``.

    ``instance_id`` and ``instance_ip`` may be absent (yielding ``None``);
    all other keys are required and a missing one raises ``KeyError``.
    """
    slot_id = str(slot["slot_id"])
    system = str(slot["system"])
    state = SlotState(str(slot["state"]))
    instance_id = slot.get("instance_id")
    instance_ip = slot.get("instance_ip")
    lease_count = int(slot["lease_count"])
    changed_at = _parse_required_dt(str(slot["last_state_change"]))
    return SlotInfo(
        slot_id=slot_id,
        system=system,
        state=state,
        instance_id=instance_id,
        instance_ip=instance_ip,
        lease_count=lease_count,
        last_state_change=changed_at,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _error_response(
    request: Request,
    status_code: int,
    code: str,
    message: str,
    retryable: bool = False,
) -> NoReturn:
    """Abort the current request with the standard error envelope.

    Never returns: always raises ``HTTPException`` whose ``detail`` is an
    ``ErrorResponse`` dumped to JSON-compatible data. The request id is taken
    from ``request.state`` (set by the request-id middleware) and falls back
    to a fresh UUID if absent.

    Raises:
        HTTPException: always, with *status_code* and the envelope as detail.
    """
    req_id = getattr(request.state, "request_id", str(uuid.uuid4()))
    envelope = ErrorResponse(
        error=ErrorDetail(code=code, message=message, retryable=retryable),
        request_id=req_id,
    )
    body = envelope.model_dump(mode="json")
    raise HTTPException(status_code=status_code, detail=body)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_app(
    db: StateDB,
    config: AppConfig,
    clock: Clock,
    metrics: MetricsRegistry,
    runtime: RuntimeAdapter | None = None,
    haproxy: HAProxyRuntime | None = None,
    scheduler_running: Callable[[], bool] | None = None,
    reconciler_running: Callable[[], bool] | None = None,
    ready_check: Callable[[], bool] | None = None,
    reconcile_now: Callable[[], dict[str, object] | None] | None = None,
) -> FastAPI:
    """Create the FastAPI application.

    Args:
        db: state store used for reservations and slots.
        config: application config; its ``capacity`` and ``scheduler``
            sections are read by the handlers.
        clock: stored on ``app.state``; not used by the handlers below.
        metrics: registry rendered by ``GET /metrics``.
        runtime: optional runtime adapter, stored on ``app.state`` only.
        haproxy: optional HAProxy runtime client, stored on ``app.state`` only.
        scheduler_running: optional probe; ``False`` makes readiness 503.
        reconciler_running: optional probe; ``False`` makes readiness 503.
        ready_check: optional probe; ``False`` makes readiness 503.
        reconcile_now: optional trigger for ``POST /v1/admin/reconcile-now``;
            a dict result is merged into that endpoint's response.

    Returns:
        The configured ``FastAPI`` application.
    """
    app = FastAPI(title="nix-builder-autoscaler", version="0.1.0")

    # Dependencies are also stashed on app.state; the handlers below use the
    # closure variables directly.
    app.state.db = db
    app.state.config = config
    app.state.clock = clock
    app.state.metrics = metrics
    app.state.runtime = runtime
    app.state.haproxy = haproxy

    # Readiness probes, evaluated in this order; the first failing one
    # short-circuits the rest.
    readiness_probes = (ready_check, scheduler_running, reconciler_running)

    # Slot states from which an admin drain may start. DRAINING and
    # TERMINATING are handled separately as idempotent no-ops.
    drainable_states = frozenset(
        {
            SlotState.READY.value,
            SlotState.BINDING.value,
            SlotState.BOOTING.value,
            SlotState.LAUNCHING.value,
        }
    )

    @app.middleware("http")
    async def request_id_middleware(request: Request, call_next: Callable) -> Response:
        # Give every request a fresh id and echo it back in the
        # ``x-request-id`` header; error envelopes reuse the same id.
        request.state.request_id = str(uuid.uuid4())
        response = await call_next(request)
        response.headers["x-request-id"] = request.state.request_id
        return response

    # NOTE(review): only fastapi.HTTPException is routed here. Starlette's own
    # HTTPException (e.g. unknown-route 404 / 405) still uses the default
    # handler and its {"detail": ...} shape. Consider registering for
    # starlette.exceptions.HTTPException if the envelope must be universal.
    @app.exception_handler(HTTPException)
    async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
        # Forward headers attached to the exception (FastAPI's default
        # handler does this too); they were previously dropped.
        headers = getattr(exc, "headers", None)
        detail = exc.detail
        # Details built by _error_response are already a complete envelope.
        if isinstance(detail, dict) and "error" in detail and "request_id" in detail:
            return JSONResponse(status_code=exc.status_code, content=detail, headers=headers)

        request_id = getattr(request.state, "request_id", str(uuid.uuid4()))
        payload = ErrorResponse(
            error=ErrorDetail(
                code="http_error",
                message=str(detail) if detail else "Request failed",
                retryable=False,
            ),
            request_id=request_id,
        )
        return JSONResponse(
            status_code=exc.status_code,
            content=payload.model_dump(mode="json"),
            headers=headers,
        )

    @app.post("/v1/reservations", response_model=ReservationResponse)
    def create_reservation(body: ReservationRequest) -> ReservationResponse:
        resv = db.create_reservation(
            body.system,
            body.reason,
            body.build_id,
            config.capacity.reservation_ttl_seconds,
        )
        return _resv_to_response(resv)

    @app.get("/v1/reservations/{reservation_id}", response_model=ReservationResponse)
    def get_reservation(reservation_id: str, request: Request) -> ReservationResponse:
        resv = db.get_reservation(reservation_id)
        if resv is None:
            _error_response(request, 404, "not_found", "Reservation not found")
        return _resv_to_response(resv)

    @app.post("/v1/reservations/{reservation_id}/release", response_model=ReservationResponse)
    def release_reservation(reservation_id: str, request: Request) -> ReservationResponse:
        resv = db.release_reservation(reservation_id)
        if resv is None:
            _error_response(request, 404, "not_found", "Reservation not found")
        return _resv_to_response(resv)

    @app.get("/v1/reservations", response_model=list[ReservationResponse])
    def list_reservations(
        phase: ReservationPhase | None = None,
    ) -> list[ReservationResponse]:
        reservations = db.list_reservations(phase)
        return [_resv_to_response(resv) for resv in reservations]

    @app.get("/v1/slots", response_model=list[SlotInfo])
    def list_slots() -> list[SlotInfo]:
        slots = db.list_slots()
        return [_slot_to_info(slot) for slot in slots]

    @app.get("/v1/state/summary", response_model=StateSummary)
    def state_summary() -> StateSummary:
        summary = db.get_state_summary()
        return StateSummary.model_validate(summary)

    @app.get("/v1/config/effective", response_model=EffectiveConfigResponse)
    def effective_config() -> EffectiveConfigResponse:
        return EffectiveConfigResponse(
            capacity=CapacityPolicy(
                min_slots=config.capacity.min_slots,
                max_slots=config.capacity.max_slots,
                target_warm_slots=config.capacity.target_warm_slots,
                max_leases_per_slot=config.capacity.max_leases_per_slot,
                idle_scale_down_seconds=config.capacity.idle_scale_down_seconds,
                drain_timeout_seconds=config.capacity.drain_timeout_seconds,
                launch_timeout_seconds=config.capacity.launch_timeout_seconds,
                boot_timeout_seconds=config.capacity.boot_timeout_seconds,
                binding_timeout_seconds=config.capacity.binding_timeout_seconds,
                terminating_timeout_seconds=config.capacity.terminating_timeout_seconds,
                termination_cooldown_seconds=config.capacity.termination_cooldown_seconds,
            ),
            scheduler=SchedulerPolicy(
                tick_seconds=config.scheduler.tick_seconds,
                reconcile_seconds=config.scheduler.reconcile_seconds,
            ),
        )

    @app.post("/v1/hints/capacity")
    def capacity_hint(hint: CapacityHint) -> dict[str, str]:
        # Hints are only logged here; nothing in this handler acts on them.
        log.info(
            "capacity_hint",
            extra={
                "builder": hint.builder,
                "queued": hint.queued,
                "running": hint.running,
                "system": hint.system,
                "timestamp": hint.timestamp.isoformat(),
            },
        )
        return {"status": "accepted"}

    @app.get("/health/live", response_model=HealthResponse)
    def health_live() -> HealthResponse:
        return HealthResponse(status="ok")

    @app.get("/health/ready", response_model=HealthResponse)
    def health_ready() -> HealthResponse:
        # Any configured probe returning False marks the service degraded (503).
        for probe in readiness_probes:
            if probe is not None and not probe():
                return JSONResponse(  # type: ignore[return-value]
                    status_code=503,
                    content=HealthResponse(status="degraded").model_dump(mode="json"),
                )
        return HealthResponse(status="ok")

    @app.get("/metrics")
    def metrics_endpoint() -> Response:
        return Response(content=metrics.render(), media_type="text/plain")

    @app.post("/v1/admin/drain")
    def admin_drain(body: SlotAdminRequest, request: Request) -> dict[str, str]:
        slot = db.get_slot(body.slot_id)
        if slot is None:
            _error_response(request, 404, "not_found", "Slot not found")
        state = str(slot["state"])
        # Re-draining a slot that is already draining or terminating is accepted
        # without touching the database.
        if state in (SlotState.DRAINING.value, SlotState.TERMINATING.value):
            return {"status": "accepted", "slot_id": body.slot_id, "state": state}
        if state not in drainable_states:
            _error_response(
                request,
                409,
                "invalid_state",
                f"Cannot drain slot from state {state}",
            )
        # NOTE(review): get_slot + update_slot_state is not atomic; a concurrent
        # state change between the two calls is not detected here.
        db.update_slot_state(body.slot_id, SlotState.DRAINING, interruption_pending=0)
        return {"status": "accepted", "slot_id": body.slot_id, "state": SlotState.DRAINING.value}

    @app.post("/v1/admin/unquarantine")
    def admin_unquarantine(body: SlotAdminRequest, request: Request) -> dict[str, str]:
        slot = db.get_slot(body.slot_id)
        if slot is None:
            _error_response(request, 404, "not_found", "Slot not found")
        state = str(slot["state"])
        if state != SlotState.ERROR.value:
            _error_response(
                request,
                409,
                "invalid_state",
                f"Cannot unquarantine slot from state {state}",
            )
        # Reset the slot to EMPTY and clear all instance/lease bookkeeping.
        db.update_slot_state(
            body.slot_id,
            SlotState.EMPTY,
            instance_id=None,
            instance_ip=None,
            instance_launch_time=None,
            lease_count=0,
            cooldown_until=None,
            interruption_pending=0,
        )
        return {"status": "accepted", "slot_id": body.slot_id, "state": SlotState.EMPTY.value}

    @app.post("/v1/admin/reconcile-now")
    def admin_reconcile_now(request: Request) -> dict[str, object]:
        if reconcile_now is None:
            _error_response(
                request,
                503,
                "not_configured",
                "Reconcile trigger not configured",
                retryable=True,
            )
        try:
            result = reconcile_now()
        except Exception:
            # Boundary handler: log the traceback and surface a retryable 500.
            log.exception("admin_reconcile_now_failed")
            _error_response(
                request,
                500,
                "reconcile_failed",
                "Reconcile tick failed",
                retryable=True,
            )
        payload: dict[str, object] = {"status": "accepted"}
        # A dict result is merged in (and may override "status").
        if isinstance(result, dict):
            payload.update(result)
        return payload

    return app
|