from __future__ import annotations import json import os import socketserver import tempfile import threading from collections.abc import Callable from contextlib import suppress from http import HTTPStatus from http.server import BaseHTTPRequestHandler from pathlib import Path from typing import Any import pytest from buildbot.plugins import util from twisted.internet import defer from buildbot_autoscale_ext.steps import CapacityGateStep, CapacityReleaseStep class FakeProperties: def __init__(self, data: dict[str, Any]) -> None: self._data = data def getProperty(self, name: str) -> Any: return self._data.get(name) class FakeBuild: def __init__(self) -> None: self.buildid = 42 self._props: dict[str, Any] = {} def getProperties(self) -> FakeProperties: return FakeProperties(self._props) def setProperty(self, key: str, value: Any, _source: str) -> None: self._props[key] = value def getProperty(self, key: str) -> Any: return self._props.get(key) class FakeLog: def __init__(self) -> None: self.stderr: list[str] = [] def addStderr(self, text: str) -> None: self.stderr.append(text) class _Handler(BaseHTTPRequestHandler): server: _UnixHTTPServer def do_GET(self) -> None: # noqa: N802 self.server.get_count += 1 status, body = self.server.on_get(self.path, self.server.get_count) self._send(status, body) def do_POST(self) -> None: # noqa: N802 self.server.post_count += 1 size = int(self.headers.get("Content-Length", "0")) raw = self.rfile.read(size) if size else b"{}" payload = json.loads(raw.decode("utf-8")) status, body = self.server.on_post(self.path, payload, self.server.post_count) self._send(status, body) def _send(self, status: int, body: dict[str, Any]) -> None: data = json.dumps(body).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def log_message(self, format: str, *args: object) -> None: del format, args class _UnixHTTPServer(socketserver.UnixStreamServer): def __init__( self, socket_path: str, *, on_get: Callable[[str, int], tuple[int, dict[str, Any]]], on_post: Callable[[str, dict[str, Any], int], tuple[int, dict[str, Any]]], ) -> None: self.on_get = on_get self.on_post = on_post self.get_count = 0 self.post_count = 0 super().__init__(socket_path, _Handler) class FakeDaemon: def __init__( self, socket_path: str, *, on_get: Callable[[str, int], tuple[int, dict[str, Any]]], on_post: Callable[[str, dict[str, Any], int], tuple[int, dict[str, Any]]], ) -> None: self._socket_path = socket_path self._server = _UnixHTTPServer(socket_path, on_get=on_get, on_post=on_post) self._thread = threading.Thread(target=self._server.serve_forever, daemon=True) def __enter__(self) -> FakeDaemon: self._thread.start() return self def __exit__(self, exc_type: object, exc: object, tb: object) -> None: del exc_type, exc, tb self._server.shutdown() self._server.server_close() with suppress(FileNotFoundError): os.unlink(self._socket_path) @pytest.fixture def socket_path() -> str: with tempfile.TemporaryDirectory() as tmp: yield str(Path(tmp) / "daemon.sock") def _attach_build(step: Any, build: FakeBuild) -> None: object.__setattr__(build, "master", None) object.__setattr__(step, "build", build) object.__setattr__(step, "master", None) async def _add_log(_name: str) -> FakeLog: return FakeLog() object.__setattr__(step, "addLog", _add_log) @pytest.fixture(autouse=True) def _patch_twisted_waits(monkeypatch: pytest.MonkeyPatch) -> None: from buildbot.config import errors as config_errors from buildbot.process import buildstep as buildstep_module import buildbot_autoscale_ext.steps as steps_mod def _defer_to_thread(func: Any, *args: object, **kwargs: object) -> defer.Deferred[object]: try: return defer.succeed(func(*args, **kwargs)) except Exception as exc: # noqa: BLE001 return defer.fail(exc) def _defer_later( _clock: object, _seconds: float, callback: Any, *args: object, **kwargs: object, ) -> defer.Deferred[object]: return defer.succeed(callback(*args, **kwargs)) monkeypatch.setattr(steps_mod, "deferToThread", _defer_to_thread) monkeypatch.setattr(steps_mod, "deferLater", _defer_later) monkeypatch.setattr(config_errors, "_errors", None, raising=False) monkeypatch.setattr(buildstep_module.config, "error", lambda *_a, **_k: None) def _run_step(step: Any) -> int: deferred = step.run() out: list[int] = [] failures: list[defer.Failure] = [] deferred.addCallbacks(lambda value: out.append(value), lambda err: failures.append(err)) if failures: failures[0].raiseException() return out[0] def test_gate_success_pending_then_ready(socket_path: str) -> None: with FakeDaemon( socket_path, on_post=lambda _p, _payload, _n: ( HTTPStatus.OK, { "reservation_id": "r1", "phase": "pending", "created_at": "now", "expires_at": "soon", }, ), on_get=lambda _p, n: ( HTTPStatus.OK, {"reservation_id": "r1", "phase": "pending"} if n == 1 else { "reservation_id": "r1", "phase": "ready", "slot": "slot-1", "instance_id": "i-123", "system": "x86_64-linux", "updated_at": "now", }, ), ): step = CapacityGateStep( name="gate", daemon_socket=socket_path, system_property="system", default_system="x86_64-linux", reserve_timeout_seconds=2, poll_interval_seconds=0.01, retry_max_attempts=2, retry_base_seconds=0.001, ) build = FakeBuild() _attach_build(step, build) result = _run_step(step) assert result == util.SUCCESS assert build.getProperty("autoscale_reservation_id") == "r1" assert build.getProperty("autoscale_slot") == "slot-1" assert build.getProperty("autoscale_instance_id") == "i-123" assert isinstance(build.getProperty("autoscale_wait_seconds"), int) def test_gate_failure_on_failed_phase(socket_path: str) -> None: with FakeDaemon( socket_path, on_post=lambda _p, _payload, _n: ( HTTPStatus.OK, {"reservation_id": "r1", "phase": "pending"}, ), on_get=lambda _p, _n: ( HTTPStatus.OK, {"reservation_id": "r1", "phase": "failed", "reason": "no"}, ), ): step = CapacityGateStep( name="gate", daemon_socket=socket_path, system_property="system", default_system="x86_64-linux", reserve_timeout_seconds=2, poll_interval_seconds=0.01, retry_max_attempts=2, retry_base_seconds=0.001, ) build = FakeBuild() _attach_build(step, build) result = _run_step(step) assert result == util.FAILURE def test_gate_failure_on_timeout(socket_path: str) -> None: with FakeDaemon( socket_path, on_post=lambda _p, _payload, _n: ( HTTPStatus.OK, {"reservation_id": "r1", "phase": "pending"}, ), on_get=lambda _p, _n: (HTTPStatus.OK, {"reservation_id": "r1", "phase": "pending"}), ): step = CapacityGateStep( name="gate", daemon_socket=socket_path, system_property="system", default_system="x86_64-linux", reserve_timeout_seconds=0, poll_interval_seconds=0.01, retry_max_attempts=2, retry_base_seconds=0.001, ) build = FakeBuild() _attach_build(step, build) result = _run_step(step) assert result == util.FAILURE def test_release_skipped_without_reservation(socket_path: str) -> None: step = CapacityReleaseStep( name="release", daemon_socket=socket_path, retry_max_attempts=2, retry_base_seconds=0.001, ) build = FakeBuild() _attach_build(step, build) result = _run_step(step) assert result == util.SKIPPED def test_release_warnings_on_retry_exhausted_500(socket_path: str) -> None: with FakeDaemon( socket_path, on_post=lambda _p, _payload, _n: (HTTPStatus.INTERNAL_SERVER_ERROR, {"error": "boom"}), on_get=lambda _p, _n: (HTTPStatus.OK, {}), ): step = CapacityReleaseStep( name="release", daemon_socket=socket_path, retry_max_attempts=2, retry_base_seconds=0.001, ) build = FakeBuild() build.setProperty("autoscale_reservation_id", "r1", "test") _attach_build(step, build) result = _run_step(step) assert result == util.WARNINGS def test_release_success(socket_path: str) -> None: with FakeDaemon( socket_path, on_post=lambda _p, _payload, _n: ( HTTPStatus.OK, {"reservation_id": "r1", "phase": "released"}, ), on_get=lambda _p, _n: (HTTPStatus.OK, {}), ): step = CapacityReleaseStep( name="release", daemon_socket=socket_path, retry_max_attempts=2, retry_base_seconds=0.001, ) build = FakeBuild() build.setProperty("autoscale_reservation_id", "r1", "test") _attach_build(step, build) result = _run_step(step) assert result == util.SUCCESS