199 lines
6.8 KiB
Python
199 lines
6.8 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Any, cast
|
|
|
|
from buildbot.plugins import util
|
|
from buildbot.process import buildstep
|
|
from twisted.internet import defer, reactor
|
|
from twisted.internet.interfaces import IReactorTime
|
|
from twisted.internet.task import deferLater
|
|
from twisted.internet.threads import deferToThread
|
|
|
|
from .client import DaemonClient, DaemonError, RetryPolicy
|
|
from .telemetry import phase_message
|
|
|
|
|
|
class CapacityGateStep(buildstep.BuildStep):
    """Build step that reserves capacity from the daemon and waits for it.

    The step posts a reservation to ``/v1/reservations``, records the returned
    id in the ``autoscale_reservation_id`` build property and then polls
    ``/v1/reservations/<id>`` until one of the following happens:

    * the reservation reports phase ``ready`` -> ``SUCCESS``;
    * the reservation reports ``failed``, ``expired`` or ``released``
      -> ``FAILURE``;
    * more than ``reserve_timeout_seconds`` have elapsed -> ``FAILURE``;
    * the step has been interrupted (``self.stopped``) -> ``CANCELLED``.
    """

    renderables = ()

    def __init__(
        self,
        *,
        daemon_socket: str,
        system_property: str,
        default_system: str,
        reserve_timeout_seconds: int,
        poll_interval_seconds: float,
        retry_max_attempts: int,
        retry_base_seconds: float,
        retry_max_seconds: float = 5.0,
        **kwargs: object,
    ) -> None:
        """Store the gate configuration and build the daemon client.

        Args:
            daemon_socket: Passed to ``DaemonClient`` as ``socket_path``.
            system_property: Name of the build property holding the system.
            default_system: System used when the property is unset or empty,
                or when the step has no build.
            reserve_timeout_seconds: Upper bound on the total wait, measured
                from the start of the reservation request.
            poll_interval_seconds: Delay between two status polls.
            retry_max_attempts: Forwarded to ``RetryPolicy.max_attempts``.
            retry_base_seconds: Forwarded to ``RetryPolicy.base_seconds``.
            retry_max_seconds: Forwarded to ``RetryPolicy.max_seconds``.
            **kwargs: Forwarded unchanged to ``BuildStep.__init__``.
        """
        super().__init__(**kwargs)
        self._system_property = system_property
        self._default_system = default_system
        self._reserve_timeout_seconds = reserve_timeout_seconds
        self._poll_interval_seconds = poll_interval_seconds
        self._client = DaemonClient(
            socket_path=daemon_socket,
            retry_policy=RetryPolicy(
                max_attempts=retry_max_attempts,
                base_seconds=retry_base_seconds,
                max_seconds=retry_max_seconds,
            ),
        )

    def _determine_system(self) -> str:
        """Return the system to reserve capacity for.

        Uses the build property named by ``system_property`` when it is
        truthy (converted with ``str``), otherwise the configured default.
        """
        if self.build is None:
            return self._default_system
        props = self.build.getProperties()
        value = props.getProperty(self._system_property)
        if value:
            return str(value)
        return self._default_system

    def run(self) -> defer.Deferred[int]:
        """Run the step; wraps the ``_run`` coroutine in a Deferred."""
        return defer.ensureDeferred(self._run())

    def _poll_delay(self) -> defer.Deferred[None]:
        """Return a Deferred that fires after one poll interval."""
        return deferLater(
            cast(IReactorTime, reactor),
            self._poll_interval_seconds,
            lambda: None,
        )

    async def _run(self) -> int:
        """Reserve capacity, then poll the reservation until it resolves.

        Returns:
            A Buildbot result code: ``SUCCESS`` once the reservation is ready,
            ``CANCELLED`` when the step was interrupted while waiting, and
            ``FAILURE`` for a failed request, a malformed daemon response, a
            terminal reservation phase, or a timeout.
        """
        system = self._determine_system()
        start = time.monotonic()

        # NOTE(review): the trailing positional arguments (10.0 and the status
        # set) are presumably a request timeout and the retryable HTTP status
        # codes -- confirm against DaemonClient.post_json / get_json.
        try:
            reserve = await deferToThread(
                self._client.post_json,
                "/v1/reservations",
                {
                    "system": system,
                    "reason": "buildbot-nix-build",
                    "build_id": getattr(self.build, "buildid", None),
                },
                10.0,
                {429, 500, 502, 503, 504},
            )
        except Exception as exc:  # noqa: BLE001
            await self._add_log("autoscale_gate_reserve_error", f"reserve failed: {exc}")
            self.descriptionDone = ["capacity reservation failed after retries"]
            return util.FAILURE

        # A response without a reservation id cannot be polled; report it as
        # a logged failure instead of letting KeyError/TypeError escape.
        try:
            reservation_id = str(reserve["reservation_id"])
        except (KeyError, TypeError):
            await self._add_log(
                "autoscale_gate_reserve_error",
                f"reserve response has no reservation_id: {reserve!r}",
            )
            self.descriptionDone = ["capacity reservation response malformed"]
            return util.FAILURE
        self._set_property("autoscale_reservation_id", reservation_id)

        last_phase: str | None = "pending"
        last_reason: str | None = None

        while True:
            # BuildStep.interrupt() sets ``stopped``; without this check a
            # cancelled build would keep polling until the timeout expires.
            if getattr(self, "stopped", False):
                self.descriptionDone = ["capacity wait cancelled"]
                return util.CANCELLED

            elapsed = time.monotonic() - start
            self.descriptionSuffix = [f"phase={last_phase} elapsed={int(elapsed)}s"]
            if elapsed > self._reserve_timeout_seconds:
                await self._add_log(
                    "autoscale_gate_timeout",
                    f"capacity wait timeout {phase_message(last_phase, last_reason)}",
                )
                self.descriptionDone = ["capacity wait timeout"]
                return util.FAILURE

            try:
                status = await deferToThread(
                    self._client.get_json,
                    f"/v1/reservations/{reservation_id}",
                    10.0,
                    {429, 500, 502, 503, 504},
                )
            except DaemonError as exc:
                # Remember why the poll failed and try again after the usual
                # delay; the timeout check above bounds the number of retries.
                last_reason = str(exc)
                await self._poll_delay()
                continue

            phase = str(status.get("phase", "pending"))
            reason = status.get("reason")
            last_phase = phase
            last_reason = str(reason) if reason is not None else None

            if phase == "ready":
                try:
                    slot = str(status["slot"])
                    instance_id = str(status["instance_id"])
                except KeyError as exc:
                    # A ready reservation without its slot/instance cannot be
                    # used by later steps, so fail with a diagnostic log.
                    await self._add_log(
                        "autoscale_gate_failure",
                        f"ready reservation is missing field {exc}",
                    )
                    self.descriptionDone = ["autoscaler reservation malformed"]
                    self.descriptionSuffix = [f"phase={phase}"]
                    return util.FAILURE
                waited = int(time.monotonic() - start)
                self._set_property("autoscale_slot", slot)
                self._set_property("autoscale_instance_id", instance_id)
                self._set_property("autoscale_wait_seconds", waited)
                self.descriptionDone = [f"capacity ready in {waited}s"]
                self.descriptionSuffix = [f"phase={phase}"]
                return util.SUCCESS

            if phase in {"failed", "expired", "released"}:
                await self._add_log(
                    "autoscale_gate_failure",
                    f"capacity gate terminal {phase_message(last_phase, last_reason)}",
                )
                self.descriptionDone = [f"autoscaler reservation {phase}"]
                self.descriptionSuffix = [f"phase={phase}"]
                return util.FAILURE

            await self._poll_delay()

    def _set_property(self, name: str, value: object) -> None:
        """Set a build property with source ``"autoscale"``; no-op without a build."""
        if self.build is None:
            return
        self.build.setProperty(name, value, "autoscale")

    async def _add_log(self, name: str, message: str) -> None:
        """Create a step log called ``name`` and write ``message`` to its stderr."""
        log = cast(Any, await self.addLog(name))
        log.addStderr(f"{message}\n")
|
|
|
|
|
|
class CapacityReleaseStep(buildstep.BuildStep):
    """Build step that asks the daemon to release a capacity reservation.

    The reservation id is read from the ``autoscale_reservation_id`` build
    property. The step is skipped when it has no build or the property is
    empty, and a failed release request is logged and reported as
    ``WARNINGS``.
    """

    def __init__(
        self,
        *,
        daemon_socket: str,
        retry_max_attempts: int,
        retry_base_seconds: float,
        retry_max_seconds: float = 5.0,
        **kwargs: object,
    ) -> None:
        """Build the daemon client used for the release request.

        Args:
            daemon_socket: Passed to ``DaemonClient`` as ``socket_path``.
            retry_max_attempts: Forwarded to ``RetryPolicy.max_attempts``.
            retry_base_seconds: Forwarded to ``RetryPolicy.base_seconds``.
            retry_max_seconds: Forwarded to ``RetryPolicy.max_seconds``.
            **kwargs: Forwarded unchanged to ``BuildStep.__init__``.
        """
        super().__init__(**kwargs)
        policy = RetryPolicy(
            max_attempts=retry_max_attempts,
            base_seconds=retry_base_seconds,
            max_seconds=retry_max_seconds,
        )
        self._client = DaemonClient(socket_path=daemon_socket, retry_policy=policy)

    def run(self) -> defer.Deferred[int]:
        """Run the step; wraps the ``_run`` coroutine in a Deferred."""
        return defer.ensureDeferred(self._run())

    async def _run(self) -> int:
        """Release the reservation recorded on the build, if there is one.

        Returns:
            ``SKIPPED`` when there is no build or no reservation id,
            ``WARNINGS`` when the release request raised, ``SUCCESS`` otherwise.
        """
        build = self.build
        if build is None:
            return util.SKIPPED

        reservation_id = build.getProperty("autoscale_reservation_id")
        if not reservation_id:
            return util.SKIPPED

        # NOTE(review): 10.0 and the status set are passed positionally;
        # presumably a request timeout and the retryable HTTP status codes --
        # confirm against DaemonClient.post_json.
        try:
            release_path = f"/v1/reservations/{reservation_id}/release"
            await deferToThread(
                self._client.post_json,
                release_path,
                {},
                10.0,
                {429, 500, 502, 503, 504},
            )
        except Exception as exc:  # noqa: BLE001
            # Releasing is best-effort: record the error and only warn.
            error_log = cast(Any, await self.addLog("autoscale_release_error"))
            error_log.addStderr(f"release failed: {exc}\n")
            return util.WARNINGS
        else:
            self.descriptionDone = ["autoscaler reservation released"]
            return util.SUCCESS
|