"""Scheduler — stateless scheduling tick for the autoscaler. Each tick: expire reservations, handle interruptions, assign pending reservations to ready slots, launch new capacity, maintain warm pool and min-slots, check idle scale-down, and emit metrics. """ from __future__ import annotations import logging import time from datetime import datetime from typing import TYPE_CHECKING from .bootstrap.userdata import render_userdata from .models import SlotState from .runtime.base import RuntimeError as RuntimeAdapterError if TYPE_CHECKING: from .config import AppConfig from .metrics import MetricsRegistry from .providers.clock import Clock from .runtime.base import RuntimeAdapter from .state_db import StateDB log = logging.getLogger(__name__) def scheduling_tick( db: StateDB, runtime: RuntimeAdapter, config: AppConfig, clock: Clock, metrics: MetricsRegistry, ) -> None: """Execute one scheduling tick. All dependencies are passed as arguments — no global state. """ t0 = time.monotonic() # 1. Expire old reservations expired = db.expire_reservations(clock.now()) if expired: log.info("expired_reservations", extra={"count": len(expired), "ids": expired}) # 2. Handle interruption-pending slots _handle_interruptions(db) # 3. Assign pending reservations to ready slots _assign_reservations(db, config) # 4. Launch new capacity for unmet demand _launch_for_unmet_demand(db, runtime, config, metrics) # 5. Ensure minimum slots and warm pool _ensure_min_and_warm(db, runtime, config, metrics) # 6. Check scale-down for idle slots _check_idle_scale_down(db, config, clock) # 7. Emit metrics tick_duration = time.monotonic() - t0 _update_metrics(db, metrics, tick_duration) def _handle_interruptions(db: StateDB) -> None: """Move ready slots with interruption_pending to draining.""" ready_slots = db.list_slots(SlotState.READY) for slot in ready_slots: if slot["interruption_pending"]: db.update_slot_state(slot["slot_id"], SlotState.DRAINING, interruption_pending=0) log.info( "interruption_drain", extra={"slot_id": slot["slot_id"]}, ) def _assign_reservations(db: StateDB, config: AppConfig) -> None: """Assign pending reservations to ready slots with capacity.""" from .models import ReservationPhase pending = db.list_reservations(ReservationPhase.PENDING) if not pending: return ready_slots = db.list_slots(SlotState.READY) if not ready_slots: return max_leases = config.capacity.max_leases_per_slot # Track in-memory capacity to prevent double-assignment within the same tick capacity_map: dict[str, int] = {s["slot_id"]: s["lease_count"] for s in ready_slots} for resv in pending: system = resv["system"] slot = _find_assignable_slot(ready_slots, system, max_leases, capacity_map) if slot is None: continue db.assign_reservation(resv["reservation_id"], slot["slot_id"], slot["instance_id"]) capacity_map[slot["slot_id"]] += 1 log.info( "reservation_assigned", extra={ "reservation_id": resv["reservation_id"], "slot_id": slot["slot_id"], }, ) def _find_assignable_slot( ready_slots: list[dict], system: str, max_leases: int, capacity_map: dict[str, int], ) -> dict | None: """Return first ready slot for system with remaining capacity, or None.""" for slot in ready_slots: if slot["system"] != system: continue sid = slot["slot_id"] current: int = capacity_map[sid] if sid in capacity_map else slot["lease_count"] if current < max_leases: return slot return None def _count_active_slots(db: StateDB) -> int: """Count slots NOT in empty or error states.""" all_slots = db.list_slots() return sum( 1 for s in all_slots if s["state"] not in (SlotState.EMPTY.value, SlotState.ERROR.value) ) def _launch_for_unmet_demand( db: StateDB, runtime: RuntimeAdapter, config: AppConfig, metrics: MetricsRegistry, ) -> None: """Launch new capacity for pending reservations that couldn't be assigned.""" from .models import ReservationPhase pending = db.list_reservations(ReservationPhase.PENDING) if not pending: return active = _count_active_slots(db) if active >= config.capacity.max_slots: return empty_slots = db.list_slots(SlotState.EMPTY) if not empty_slots: return for launched, slot in enumerate(empty_slots): if launched >= len(pending): break if active + launched >= config.capacity.max_slots: break _launch_slot(db, runtime, config, metrics, slot) def _ensure_min_and_warm( db: StateDB, runtime: RuntimeAdapter, config: AppConfig, metrics: MetricsRegistry, ) -> None: """Ensure minimum slots and warm pool targets are met.""" active = _count_active_slots(db) # Ensure min_slots if active < config.capacity.min_slots: needed = config.capacity.min_slots - active empty_slots = db.list_slots(SlotState.EMPTY) launched = 0 for slot in empty_slots: if launched >= needed: break if active + launched >= config.capacity.max_slots: break _launch_slot(db, runtime, config, metrics, slot) launched += 1 active += launched # Ensure warm pool if config.capacity.target_warm_slots > 0: ready_idle = sum(1 for s in db.list_slots(SlotState.READY) if s["lease_count"] == 0) pending_warm = ( len(db.list_slots(SlotState.LAUNCHING)) + len(db.list_slots(SlotState.BOOTING)) + len(db.list_slots(SlotState.BINDING)) ) warm_total = ready_idle + pending_warm if warm_total < config.capacity.target_warm_slots: needed = config.capacity.target_warm_slots - warm_total empty_slots = db.list_slots(SlotState.EMPTY) launched = 0 for slot in empty_slots: if launched >= needed: break if active + launched >= config.capacity.max_slots: break _launch_slot(db, runtime, config, metrics, slot) launched += 1 def _launch_slot( db: StateDB, runtime: RuntimeAdapter, config: AppConfig, metrics: MetricsRegistry, slot: dict, ) -> None: """Launch a single slot. Transition to LAUNCHING on success, ERROR on failure.""" slot_id = slot["slot_id"] user_data = render_userdata(slot_id, config.aws.region) metrics.counter("autoscaler_ec2_launch_total", {}, 1.0) try: instance_id = runtime.launch_spot(slot_id, user_data) db.update_slot_state(slot_id, SlotState.LAUNCHING, instance_id=instance_id) log.info("slot_launched", extra={"slot_id": slot_id, "instance_id": instance_id}) except RuntimeAdapterError as exc: db.update_slot_state(slot_id, SlotState.ERROR) log.warning( "slot_launch_failed", extra={"slot_id": slot_id, "error": str(exc), "category": exc.category}, ) def _check_idle_scale_down(db: StateDB, config: AppConfig, clock: Clock) -> None: """Move idle ready slots to draining when idle threshold exceeded.""" ready_slots = db.list_slots(SlotState.READY) now = clock.now() active = _count_active_slots(db) for slot in ready_slots: if slot["lease_count"] != 0: continue last_change = datetime.fromisoformat(slot["last_state_change"]) idle_seconds = (now - last_change).total_seconds() if idle_seconds > config.capacity.idle_scale_down_seconds: if active <= config.capacity.min_slots: continue db.update_slot_state(slot["slot_id"], SlotState.DRAINING) active -= 1 log.info( "idle_scale_down", extra={"slot_id": slot["slot_id"], "idle_seconds": idle_seconds}, ) def _update_metrics(db: StateDB, metrics: MetricsRegistry, tick_duration: float) -> None: """Refresh all gauge/counter/histogram values.""" summary = db.get_state_summary() for state, count in summary["slots"].items(): if state == "total": continue metrics.gauge("autoscaler_slots", {"state": state}, float(count)) for phase, count in summary["reservations"].items(): metrics.gauge("autoscaler_reservations", {"phase": phase}, float(count)) metrics.histogram_observe("autoscaler_scheduler_tick_seconds", {}, tick_duration)