nix-builder-autoscaler/buildbot-ext/buildbot_autoscale_ext/steps.py

200 lines
6.8 KiB
Python
Raw Normal View History

from __future__ import annotations
import time
from typing import Any, cast
from buildbot.plugins import util
from buildbot.process import buildstep
from twisted.internet import defer, reactor
from twisted.internet.interfaces import IReactorTime
from twisted.internet.task import deferLater
from twisted.internet.threads import deferToThread
from .client import DaemonClient, DaemonError, RetryPolicy
from .telemetry import phase_message
class CapacityGateStep(buildstep.BuildStep):
    """Hold a build until the autoscaler daemon reports capacity is ready.

    The step asks the daemon for a reservation (``POST /v1/reservations``) for
    the build's target system, then polls ``GET /v1/reservations/<id>`` every
    ``poll_interval_seconds``. It finishes with:

    * SUCCESS when the reservation reaches phase ``ready``;
    * FAILURE when the reservation request fails, the reservation reaches a
      terminal phase (``failed``, ``expired``, ``released``), or the total
      wait exceeds ``reserve_timeout_seconds``;
    * CANCELLED when the step is interrupted (e.g. the build is stopped).

    Daemon calls are blocking client calls and run in a worker thread via
    ``deferToThread`` so the reactor is never blocked.

    Build properties written (source ``autoscale``): ``autoscale_reservation_id``
    (as soon as it is known), and on success ``autoscale_slot``,
    ``autoscale_instance_id`` and ``autoscale_wait_seconds``.
    """

    renderables = ()

    def __init__(
        self,
        *,
        daemon_socket: str,
        system_property: str,
        default_system: str,
        reserve_timeout_seconds: int,
        poll_interval_seconds: float,
        retry_max_attempts: int,
        retry_base_seconds: float,
        retry_max_seconds: float = 5.0,
        **kwargs: object,
    ) -> None:
        """Configure the gate.

        Args:
            daemon_socket: Socket path handed to ``DaemonClient`` as
                ``socket_path``.
            system_property: Name of the build property holding the system
                to reserve capacity for.
            default_system: System used when that property is unset or empty.
            reserve_timeout_seconds: Maximum total wait, measured from just
                before the reservation request, before the step fails.
            poll_interval_seconds: Delay between reservation status polls.
            retry_max_attempts: Forwarded to ``RetryPolicy.max_attempts``.
            retry_base_seconds: Forwarded to ``RetryPolicy.base_seconds``.
            retry_max_seconds: Forwarded to ``RetryPolicy.max_seconds``.
            **kwargs: Passed through to ``BuildStep``.
        """
        super().__init__(**kwargs)
        self._system_property = system_property
        self._default_system = default_system
        self._reserve_timeout_seconds = reserve_timeout_seconds
        self._poll_interval_seconds = poll_interval_seconds
        self._client = DaemonClient(
            socket_path=daemon_socket,
            retry_policy=RetryPolicy(
                max_attempts=retry_max_attempts,
                base_seconds=retry_base_seconds,
                max_seconds=retry_max_seconds,
            ),
        )

    def _determine_system(self) -> str:
        """Return the system to reserve capacity for.

        Uses the configured build property when it holds a truthy value,
        otherwise (or when the step has no build) ``default_system``.
        """
        if self.build is None:
            return self._default_system
        props = self.build.getProperties()
        value = props.getProperty(self._system_property)
        if value:
            return str(value)
        return self._default_system

    def run(self) -> defer.Deferred[int]:
        # Bridge the coroutine implementation into Buildbot's Deferred API.
        return defer.ensureDeferred(self._run())

    async def _run(self) -> int:
        """Reserve capacity and poll until ready, terminal, timeout or cancel."""
        system = self._determine_system()
        start = time.monotonic()
        try:
            reserve = await deferToThread(
                self._client.post_json,
                "/v1/reservations",
                {
                    "system": system,
                    "reason": "buildbot-nix-build",
                    "build_id": getattr(self.build, "buildid", None),
                },
                10.0,
                {429, 500, 502, 503, 504},
            )
        except Exception as exc:  # noqa: BLE001
            await self._add_log("autoscale_gate_reserve_error", f"reserve failed: {exc}")
            self.descriptionDone = ["capacity reservation failed after retries"]
            return util.FAILURE

        reservation_id = str(reserve["reservation_id"])
        # Published immediately so CapacityReleaseStep (which reads this
        # property) can free the reservation whatever this step's outcome.
        self._set_property("autoscale_reservation_id", reservation_id)

        last_phase: str = "pending"
        last_reason: str | None = None
        while True:
            # BuildStep.interrupt() sets `stopped`; without this check a
            # stopped build would keep polling until the timeout expired.
            if getattr(self, "stopped", False):
                await self._add_log(
                    "autoscale_gate_cancelled",
                    f"capacity wait cancelled {phase_message(last_phase, last_reason)}",
                )
                self.descriptionDone = ["capacity wait cancelled"]
                return util.CANCELLED

            elapsed = time.monotonic() - start
            self.descriptionSuffix = [f"phase={last_phase} elapsed={int(elapsed)}s"]
            # Push the in-progress summary; assigning descriptionSuffix alone
            # is not propagated while the step is running.
            self.updateSummary()
            if elapsed > self._reserve_timeout_seconds:
                await self._add_log(
                    "autoscale_gate_timeout",
                    f"capacity wait timeout {phase_message(last_phase, last_reason)}",
                )
                self.descriptionDone = ["capacity wait timeout"]
                return util.FAILURE

            try:
                status = await deferToThread(
                    self._client.get_json,
                    f"/v1/reservations/{reservation_id}",
                    10.0,
                    {429, 500, 502, 503, 504},
                )
            except DaemonError as exc:
                # A failed poll is not fatal: remember why and poll again;
                # the overall timeout bounds how long this can go on.
                last_reason = str(exc)
                await self._sleep_poll_interval()
                continue

            phase = str(status.get("phase", "pending"))
            reason = status.get("reason")
            last_phase = phase
            last_reason = str(reason) if reason is not None else None

            if phase == "ready":
                slot = str(status["slot"])
                instance_id = str(status["instance_id"])
                waited = int(time.monotonic() - start)
                self._set_property("autoscale_slot", slot)
                self._set_property("autoscale_instance_id", instance_id)
                self._set_property("autoscale_wait_seconds", waited)
                self.descriptionDone = [f"capacity ready in {waited}s"]
                self.descriptionSuffix = [f"phase={phase}"]
                return util.SUCCESS

            if phase in {"failed", "expired", "released"}:
                await self._add_log(
                    "autoscale_gate_failure",
                    f"capacity gate terminal {phase_message(last_phase, last_reason)}",
                )
                self.descriptionDone = [f"autoscaler reservation {phase}"]
                self.descriptionSuffix = [f"phase={phase}"]
                return util.FAILURE

            await self._sleep_poll_interval()

    async def _sleep_poll_interval(self) -> None:
        """Wait ``poll_interval_seconds`` on the reactor without blocking it."""
        await deferLater(
            cast(IReactorTime, reactor),
            self._poll_interval_seconds,
            lambda: None,
        )

    def _set_property(self, name: str, value: object) -> None:
        """Set build property *name* with source ``autoscale``.

        Does nothing when the step is not attached to a build.
        """
        if self.build is None:
            return
        self.build.setProperty(name, value, "autoscale")

    async def _add_log(self, name: str, message: str) -> None:
        """Create a step log called *name* and write *message* to its stderr."""
        log = cast(Any, await self.addLog(name))
        log.addStderr(f"{message}\n")
class CapacityReleaseStep(buildstep.BuildStep):
    """Release the autoscaler reservation recorded on the build.

    Reads the ``autoscale_reservation_id`` build property and posts to
    ``/v1/reservations/<id>/release`` on the daemon. The step finishes with:

    * SKIPPED when there is no build or no reservation id;
    * WARNINGS when the release call raises (the error goes to the
      ``autoscale_release_error`` log);
    * SUCCESS otherwise.
    """

    def __init__(
        self,
        *,
        daemon_socket: str,
        retry_max_attempts: int,
        retry_base_seconds: float,
        retry_max_seconds: float = 5.0,
        **kwargs: object,
    ) -> None:
        """Build the daemon client.

        The ``retry_*`` arguments populate the client's ``RetryPolicy``;
        ``**kwargs`` are passed through to ``BuildStep``.
        """
        super().__init__(**kwargs)
        policy = RetryPolicy(
            max_attempts=retry_max_attempts,
            base_seconds=retry_base_seconds,
            max_seconds=retry_max_seconds,
        )
        self._client = DaemonClient(socket_path=daemon_socket, retry_policy=policy)

    def run(self) -> defer.Deferred[int]:
        # Buildbot expects a Deferred; wrap the coroutine that does the work.
        return defer.ensureDeferred(self._run())

    async def _run(self) -> int:
        """Post the release request for the recorded reservation, if any."""
        build = self.build
        reservation = None if build is None else build.getProperty("autoscale_reservation_id")
        if not reservation:
            return util.SKIPPED

        try:
            await deferToThread(
                self._client.post_json,
                f"/v1/reservations/{reservation}/release",
                {},
                10.0,
                {429, 500, 502, 503, 504},
            )
        except Exception as err:  # noqa: BLE001
            # Best effort: a failed release is surfaced but does not fail the build.
            error_log = cast(Any, await self.addLog("autoscale_release_error"))
            error_log.addStderr(f"release failed: {err}\n")
            return util.WARNINGS
        else:
            self.descriptionDone = ["autoscaler reservation released"]
            return util.SUCCESS