nix-builder-autoscaler/agent/nix_builder_autoscaler/scheduler.py

266 lines
8.7 KiB
Python

"""Scheduler — stateless scheduling tick for the autoscaler.
Each tick: expire reservations, handle interruptions, assign pending
reservations to ready slots, launch new capacity, maintain warm pool
and min-slots, check idle scale-down, and emit metrics.
"""
from __future__ import annotations
import logging
import time
from datetime import datetime
from typing import TYPE_CHECKING
from .bootstrap.userdata import render_userdata
from .models import SlotState
from .runtime.base import RuntimeError as RuntimeAdapterError
if TYPE_CHECKING:
from .config import AppConfig
from .metrics import MetricsRegistry
from .providers.clock import Clock
from .runtime.base import RuntimeAdapter
from .state_db import StateDB
log = logging.getLogger(__name__)
def scheduling_tick(
    db: StateDB,
    runtime: RuntimeAdapter,
    config: AppConfig,
    clock: Clock,
    metrics: MetricsRegistry,
) -> None:
    """Run a single pass of the scheduler.

    Every collaborator (state DB, runtime adapter, config, clock, metrics
    registry) is supplied by the caller. The phases always run in the same
    order: expire reservations, drain interrupted slots, assign pending
    reservations, launch for unmet demand, top up min/warm capacity, scale
    down idle slots, and finally publish metrics together with the time this
    pass took.
    """
    started_at = time.monotonic()

    # Phase 1: ask the DB to expire reservations as of the current clock time.
    expired_ids = db.expire_reservations(clock.now())
    if expired_ids:
        log.info(
            "expired_reservations",
            extra={"count": len(expired_ids), "ids": expired_ids},
        )

    # Phase 2: READY slots flagged for interruption start draining.
    _handle_interruptions(db)

    # Phase 3: hand pending reservations to READY slots that have room.
    _assign_reservations(db, config)

    # Phase 4: whatever is still pending needs fresh capacity.
    _launch_for_unmet_demand(db, runtime, config, metrics)

    # Phase 5: keep the configured minimum and the warm pool topped up.
    _ensure_min_and_warm(db, runtime, config, metrics)

    # Phase 6: retire READY slots that have been idle for too long.
    _check_idle_scale_down(db, config, clock)

    # Phase 7: publish gauges plus the wall time spent in phases 1-6.
    _update_metrics(db, metrics, time.monotonic() - started_at)
def _handle_interruptions(db: StateDB) -> None:
    """Start draining every READY slot whose interruption flag is set.

    A flagged slot is switched to DRAINING and its ``interruption_pending``
    flag is reset to 0 in the same update. Slots in other states are not
    looked at here.
    """
    for candidate in db.list_slots(SlotState.READY):
        if not candidate["interruption_pending"]:
            continue
        slot_id = candidate["slot_id"]
        db.update_slot_state(slot_id, SlotState.DRAINING, interruption_pending=0)
        log.info("interruption_drain", extra={"slot_id": slot_id})
def _assign_reservations(db: StateDB, config: AppConfig) -> None:
    """Bind pending reservations to READY slots that still have lease room.

    Reservations are processed in the order the DB returns them. A
    reservation with no matching slot is left pending and the loop moves on
    to the next one.
    """
    from .models import ReservationPhase

    waiting = db.list_reservations(ReservationPhase.PENDING)
    if not waiting:
        return

    ready_slots = db.list_slots(SlotState.READY)
    if not ready_slots:
        return

    lease_cap = config.capacity.max_leases_per_slot

    # The slot rows were read once above and are not refreshed, so lease
    # counts are tracked locally; otherwise several reservations handled in
    # this pass could all land on a slot that only had room for one.
    leases_by_slot: dict[str, int] = {}
    for row in ready_slots:
        leases_by_slot[row["slot_id"]] = row["lease_count"]

    for reservation in waiting:
        target = _find_assignable_slot(
            ready_slots, reservation["system"], lease_cap, leases_by_slot
        )
        if target is not None:
            reservation_id = reservation["reservation_id"]
            target_id = target["slot_id"]
            db.assign_reservation(reservation_id, target_id, target["instance_id"])
            leases_by_slot[target_id] += 1
            log.info(
                "reservation_assigned",
                extra={"reservation_id": reservation_id, "slot_id": target_id},
            )
def _find_assignable_slot(
    ready_slots: list[dict],
    system: str,
    max_leases: int,
    capacity_map: dict[str, int],
) -> dict | None:
    """Pick the first slot for *system* that is below the lease limit.

    Slots are scanned in list order. The lease count used for the comparison
    comes from *capacity_map* when the slot id is present there, and from the
    slot's own ``lease_count`` field otherwise. Returns ``None`` when no slot
    qualifies.
    """
    for candidate in ready_slots:
        if candidate["system"] == system:
            leases_in_use: int = capacity_map.get(
                candidate["slot_id"], candidate["lease_count"]
            )
            if leases_in_use < max_leases:
                return candidate
    return None
def _count_active_slots(db: StateDB) -> int:
    """Return how many slots are in any state other than EMPTY or ERROR."""
    rows = db.list_slots()
    # Slot rows carry the state as the enum's value, so compare against values.
    inactive_states = (SlotState.EMPTY.value, SlotState.ERROR.value)
    return len([row for row in rows if row["state"] not in inactive_states])
def _launch_for_unmet_demand(
    db: StateDB,
    runtime: RuntimeAdapter,
    config: AppConfig,
    metrics: MetricsRegistry,
) -> None:
    """Launch new capacity for pending reservations that couldn't be assigned.

    One launch is attempted per pending reservation that is not already
    covered by a slot on its way up, bounded by the number of EMPTY slots and
    by ``config.capacity.max_slots``.

    Slots in LAUNCHING, BOOTING or BINDING are counted as capacity already
    requested for the current backlog. Without that, a reservation that stays
    pending while its slot boots would trigger another launch on every tick
    until ``max_slots`` is reached.

    Args:
        db: State store queried for pending reservations and slot states.
        runtime: Adapter used by ``_launch_slot`` to start instances.
        config: Supplies ``capacity.max_slots``.
        metrics: Registry passed through to ``_launch_slot``.
    """
    from .models import ReservationPhase

    pending = db.list_reservations(ReservationPhase.PENDING)
    if not pending:
        return

    max_slots = config.capacity.max_slots
    active = _count_active_slots(db)
    if active >= max_slots:
        return

    # Same in-flight states that _ensure_min_and_warm counts for the warm pool.
    in_flight = sum(
        len(db.list_slots(state))
        for state in (SlotState.LAUNCHING, SlotState.BOOTING, SlotState.BINDING)
    )
    needed = len(pending) - in_flight
    if needed <= 0:
        return

    empty_slots = db.list_slots(SlotState.EMPTY)
    for attempts, slot in enumerate(empty_slots):
        # Every call to _launch_slot uses up one attempt, including a launch
        # that ends with the slot in ERROR.
        if attempts >= needed or active + attempts >= max_slots:
            break
        _launch_slot(db, runtime, config, metrics, slot)
def _ensure_min_and_warm(
    db: StateDB,
    runtime: RuntimeAdapter,
    config: AppConfig,
    metrics: MetricsRegistry,
) -> None:
    """Top up capacity to the configured minimum and warm-pool size.

    First the number of active slots is raised towards
    ``capacity.min_slots``. Then, if ``capacity.target_warm_slots`` is
    positive, idle READY slots plus slots still coming up (LAUNCHING, BOOTING,
    BINDING) are compared against the warm target and the shortfall is
    launched. Neither step goes past ``capacity.max_slots``.
    """
    limits = config.capacity

    def launch_batch(wanted: int, active_now: int) -> int:
        """Try up to *wanted* launches from EMPTY slots; return the attempts made."""
        attempts = 0
        for slot in db.list_slots(SlotState.EMPTY):
            if attempts >= wanted or active_now + attempts >= limits.max_slots:
                break
            _launch_slot(db, runtime, config, metrics, slot)
            attempts += 1
        return attempts

    active = _count_active_slots(db)

    # Step 1: minimum slot count.
    if active < limits.min_slots:
        # Attempts are added to the running total whether or not they succeeded.
        active += launch_batch(limits.min_slots - active, active)

    # Step 2: warm pool.
    if limits.target_warm_slots <= 0:
        return

    idle_ready = sum(
        1 for row in db.list_slots(SlotState.READY) if row["lease_count"] == 0
    )
    coming_up = sum(
        len(db.list_slots(state))
        for state in (SlotState.LAUNCHING, SlotState.BOOTING, SlotState.BINDING)
    )
    shortfall = limits.target_warm_slots - (idle_ready + coming_up)
    if shortfall > 0:
        launch_batch(shortfall, active)
def _launch_slot(
    db: StateDB,
    runtime: RuntimeAdapter,
    config: AppConfig,
    metrics: MetricsRegistry,
    slot: dict,
) -> None:
    """Start an instance for one slot and record the outcome.

    On success the slot moves to LAUNCHING with the new instance id attached.
    If the runtime adapter raises its ``RuntimeError`` type, the slot moves to
    ERROR instead and the exception is not re-raised. Either way the
    ``autoscaler_ec2_launch_total`` counter is incremented, labelled with
    ``success`` or the error's category. Other exception types propagate.

    ``config`` is accepted but not read by this function.
    """
    launch_counter = "autoscaler_ec2_launch_total"
    sid = slot["slot_id"]
    payload = render_userdata(sid)
    try:
        new_instance = runtime.launch_spot(sid, payload)
        metrics.counter(launch_counter, {"result": "success"}, 1.0)
        db.update_slot_state(sid, SlotState.LAUNCHING, instance_id=new_instance)
        log.info(
            "slot_launched",
            extra={"slot_id": sid, "instance_id": new_instance},
        )
    except RuntimeAdapterError as err:
        metrics.counter(launch_counter, {"result": err.category}, 1.0)
        db.update_slot_state(sid, SlotState.ERROR)
        failure_details = {
            "slot_id": sid,
            "error": str(err),
            "category": err.category,
        }
        log.warning("slot_launch_failed", extra=failure_details)
def _check_idle_scale_down(db: StateDB, config: AppConfig, clock: Clock) -> None:
    """Drain READY slots that have sat without leases past the idle limit.

    Idle time is measured from the slot's ``last_state_change`` timestamp
    (parsed as ISO 8601) to ``clock.now()``. A slot is only drained while the
    number of active slots is above ``capacity.min_slots``; each drain lowers
    the running count by one.
    """
    candidates = db.list_slots(SlotState.READY)
    now = clock.now()
    active = _count_active_slots(db)

    for candidate in candidates:
        if candidate["lease_count"] == 0:
            changed_at = datetime.fromisoformat(candidate["last_state_change"])
            idle_for = (now - changed_at).total_seconds()
            past_limit = idle_for > config.capacity.idle_scale_down_seconds
            # Never drain below the configured minimum.
            if past_limit and active > config.capacity.min_slots:
                db.update_slot_state(candidate["slot_id"], SlotState.DRAINING)
                active -= 1
                log.info(
                    "idle_scale_down",
                    extra={"slot_id": candidate["slot_id"], "idle_seconds": idle_for},
                )
def _update_metrics(db: StateDB, metrics: MetricsRegistry, tick_duration: float) -> None:
    """Publish per-state slot gauges, per-phase reservation gauges and tick time.

    Gauge values come from ``db.get_state_summary()``; *tick_duration* is
    recorded as one observation of the scheduler tick-duration histogram.
    """
    summary = db.get_state_summary()

    # (summary section, gauge name, label key), emitted in this order.
    gauge_specs = (
        ("slots", "autoscaler_slots_total", "state"),
        ("reservations", "autoscaler_reservations_total", "phase"),
    )
    for section, gauge_name, label_key in gauge_specs:
        for label_value, count in summary[section].items():
            metrics.gauge(gauge_name, {label_key: label_value}, float(count))

    metrics.histogram_observe(
        "autoscaler_scheduler_tick_duration_seconds", {}, tick_duration
    )