nix-builder-autoscaler/buildbot-ext/buildbot_autoscale_ext/configurator.py

67 lines
2.5 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
import logging
from typing import Any
from buildbot.configurators import ConfiguratorBase
from .settings import AutoscaleSettings
from .steps import CapacityGateStep, CapacityReleaseStep
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class AutoscaleConfigurator(ConfiguratorBase):
    """Configurator that wraps ``*/nix-build`` builders with capacity steps.

    For every builder in the master configuration whose name ends with
    ``/nix-build``, a :class:`CapacityGateStep` is inserted as the first
    step of the builder's factory and, when ``settings.release_on_finish``
    is truthy, a :class:`CapacityReleaseStep` is appended as the last step.
    """

    def __init__(self, settings: AutoscaleSettings) -> None:
        """Keep ``settings``; its fields parameterize the injected steps."""
        super().__init__()
        self.settings = settings

    def configure(self, config_dict: dict[str, Any]) -> None:
        """Patch the step lists of all matching builders in ``config_dict``.

        Args:
            config_dict: The master configuration mapping. Only its
                ``"builders"`` entry is read; a missing or ``None`` entry is
                treated as "no builders".

        Builders are skipped silently when their ``name`` is not a string
        ending in ``/nix-build``, and skipped with a warning when they have
        no ``factory`` or the factory's ``steps`` attribute is not a list.
        The step lists are modified in place; nothing is returned.
        """
        builders = config_dict.get("builders") or []
        patched: list[str] = []
        # Identity of every steps list already modified during this call.
        # Several builders may share one factory (and thus one steps list);
        # patching it once per builder would stack duplicate gate/release
        # steps onto the same build.
        patched_step_lists: set[int] = set()

        for builder in builders:
            name = getattr(builder, "name", "")
            if not isinstance(name, str) or not name.endswith("/nix-build"):
                continue

            factory = getattr(builder, "factory", None)
            steps = getattr(factory, "steps", None)
            if factory is None or not isinstance(steps, list):
                log.warning("Skipping builder with unrecognized factory shape: %s", name)
                continue

            if id(steps) not in patched_step_lists:
                patched_step_lists.add(id(steps))
                # The gate must be the very first step of the build.
                steps.insert(0, self._make_gate_step())
                if self.settings.release_on_finish:
                    steps.append(self._make_release_step())

            # A builder that shares an already patched steps list is
            # effectively patched too, so it is reported either way.
            patched.append(name)

        log.info("AutoscaleConfigurator patched builders: %s", patched)

    def _make_gate_step(self) -> CapacityGateStep:
        """Build a fresh capacity gate step from the current settings."""
        cfg = self.settings
        return CapacityGateStep(
            name="Ensure remote builder capacity",
            daemon_socket=cfg.daemon_socket,
            system_property=cfg.system_property,
            default_system=cfg.default_system,
            reserve_timeout_seconds=cfg.reserve_timeout_seconds,
            poll_interval_seconds=cfg.poll_interval_seconds,
            retry_max_attempts=cfg.retry_max_attempts,
            retry_base_seconds=cfg.retry_base_seconds,
            retry_max_seconds=cfg.retry_max_seconds,
            haltOnFailure=True,
            flunkOnFailure=True,
            warnOnFailure=False,
        )

    def _make_release_step(self) -> CapacityReleaseStep:
        """Build a fresh reservation release step from the current settings."""
        cfg = self.settings
        return CapacityReleaseStep(
            name="Release autoscaler reservation",
            daemon_socket=cfg.daemon_socket,
            retry_max_attempts=cfg.retry_max_attempts,
            retry_base_seconds=cfg.retry_base_seconds,
            retry_max_seconds=cfg.retry_max_seconds,
            alwaysRun=True,
            flunkOnFailure=False,
            warnOnFailure=True,
        )