"""Buildbot configurator that adds autoscaler capacity steps to nix-build builders."""

from __future__ import annotations

import logging
from typing import Any

from buildbot.configurators import ConfiguratorBase

from .settings import AutoscaleSettings
from .steps import CapacityGateStep, CapacityReleaseStep

log = logging.getLogger(__name__)

# Only builders whose name ends with this suffix are patched.
_NIX_BUILD_SUFFIX = "/nix-build"


class AutoscaleConfigurator(ConfiguratorBase):
    """Wrap every ``*/nix-build`` builder's step list with capacity steps.

    A ``CapacityGateStep`` is placed at the front of the step list and, when
    ``settings.release_on_finish`` is truthy, a ``CapacityReleaseStep`` is
    appended at the end.
    """

    def __init__(self, settings: AutoscaleSettings) -> None:
        """Store *settings*; they are read each time ``configure`` runs."""
        super().__init__()
        self.settings = settings

    def configure(self, config_dict: dict[str, Any]) -> None:
        """Patch the matching builders found in ``config_dict["builders"]``.

        Builders are mutated in place through ``builder.factory.steps``.
        Entries without a string ``name`` ending in ``/nix-build`` are
        ignored; entries whose factory does not expose a ``steps`` list are
        skipped with a warning. A step list that already contains a
        ``CapacityGateStep`` is left untouched, so builders sharing one
        factory (or a repeated call on the same objects) do not accumulate
        duplicate gate/release steps.

        Args:
            config_dict: The Buildbot configuration dictionary. A missing or
                ``None`` ``"builders"`` entry is treated as empty.
        """
        patched: list[str] = []

        # ``or []`` also covers a key that is present but set to None.
        for builder in config_dict.get("builders") or []:
            name = getattr(builder, "name", "")
            if not isinstance(name, str) or not name.endswith(_NIX_BUILD_SUFFIX):
                continue

            factory = getattr(builder, "factory", None)
            steps = getattr(factory, "steps", None)
            if factory is None or not isinstance(steps, list):
                log.warning("Skipping builder with unrecognized factory shape: %s", name)
                continue

            if any(isinstance(step, CapacityGateStep) for step in steps):
                # Same step list was patched earlier (shared factory or a
                # previous configure() pass); inserting again would reserve
                # and release capacity more than once per build.
                log.debug("Builder %s already has a capacity gate; not re-adding", name)
            else:
                steps.insert(0, self._make_gate_step())
                if self.settings.release_on_finish:
                    steps.append(self._make_release_step())

            # The builder carries the gate either way, so report it.
            patched.append(name)

        log.info("AutoscaleConfigurator patched builders: %s", patched)

    def _make_gate_step(self) -> CapacityGateStep:
        """Build the gate step from the current settings."""
        cfg = self.settings
        return CapacityGateStep(
            name="Ensure remote builder capacity",
            daemon_socket=cfg.daemon_socket,
            system_property=cfg.system_property,
            default_system=cfg.default_system,
            reserve_timeout_seconds=cfg.reserve_timeout_seconds,
            poll_interval_seconds=cfg.poll_interval_seconds,
            retry_max_attempts=cfg.retry_max_attempts,
            retry_base_seconds=cfg.retry_base_seconds,
            retry_max_seconds=cfg.retry_max_seconds,
            # A failed gate is configured to fail and halt the build.
            haltOnFailure=True,
            flunkOnFailure=True,
            warnOnFailure=False,
        )

    def _make_release_step(self) -> CapacityReleaseStep:
        """Build the release step from the current settings."""
        cfg = self.settings
        return CapacityReleaseStep(
            name="Release autoscaler reservation",
            daemon_socket=cfg.daemon_socket,
            retry_max_attempts=cfg.retry_max_attempts,
            retry_base_seconds=cfg.retry_base_seconds,
            retry_max_seconds=cfg.retry_max_seconds,
            # Configured to always run; a failure here only warns.
            alwaysRun=True,
            flunkOnFailure=False,
            warnOnFailure=True,
        )