# nix-builder-autoscaler/buildbot-ext/buildbot_autoscale_ext/tests/test_steps.py
# (338 lines, 10 KiB, Python)

from __future__ import annotations

import json
import os
import socketserver
import tempfile
import threading
from collections.abc import Callable, Iterator
from contextlib import suppress
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
from pathlib import Path
from typing import Any

import pytest
from buildbot.plugins import util
from twisted.internet import defer

from buildbot_autoscale_ext.steps import CapacityGateStep, CapacityReleaseStep
class FakeProperties:
    """Minimal property container exposing ``getProperty`` over a plain dict."""

    def __init__(self, data: dict[str, Any]) -> None:
        # The mapping is stored by reference; no copy is made.
        self._data = data

    def getProperty(self, name: str) -> Any:  # noqa: N802
        """Return the value stored under *name*, or ``None`` when it is absent."""
        value = self._data.get(name)
        return value
class FakeBuild:
    """Test double for a build: a fixed ``buildid`` plus a dict of properties."""

    def __init__(self) -> None:
        self._props: dict[str, Any] = {}
        self.buildid = 42

    def getProperties(self) -> FakeProperties:  # noqa: N802
        """Return a ``FakeProperties`` wrapper around this build's property dict."""
        wrapper = FakeProperties(self._props)
        return wrapper

    def setProperty(self, key: str, value: Any, _source: str) -> None:  # noqa: N802
        """Store *value* under *key*; the source label is accepted but ignored."""
        self._props[key] = value

    def getProperty(self, key: str) -> Any:  # noqa: N802
        """Return the property stored under *key*, or ``None`` if it was never set."""
        found = self._props.get(key)
        return found
class FakeLog:
    """Log stand-in that records every stderr chunk in the ``stderr`` list."""

    def __init__(self) -> None:
        self.stderr: list[str] = []

    def addStderr(self, text: str) -> None:  # noqa: N802
        """Append *text* to the recorded stderr chunks."""
        chunks = self.stderr
        chunks.append(text)
class _Handler(BaseHTTPRequestHandler):
    """JSON request handler that delegates each request to callbacks on its server.

    The owning server supplies ``on_get`` / ``on_post`` callables together with
    ``get_count`` / ``post_count`` counters. Each request increments the matching
    counter first and passes the new value to the callback.
    """

    server: _UnixHTTPServer

    def do_GET(self) -> None:  # noqa: N802
        """Count the request and reply with the ``(status, body)`` from ``on_get``."""
        srv = self.server
        srv.get_count += 1
        code, reply = srv.on_get(self.path, srv.get_count)
        self._send(code, reply)

    def do_POST(self) -> None:  # noqa: N802
        """Count the request, decode its JSON body and reply via ``on_post``."""
        srv = self.server
        srv.post_count += 1
        length = int(self.headers.get("Content-Length", "0"))
        # A missing or zero Content-Length is treated as an empty JSON object.
        if length:
            encoded = self.rfile.read(length)
        else:
            encoded = b"{}"
        request_json = json.loads(encoded.decode("utf-8"))
        code, reply = srv.on_post(self.path, request_json, srv.post_count)
        self._send(code, reply)

    def _send(self, status: int, body: dict[str, Any]) -> None:
        """Write *body* as a JSON response with the given HTTP *status*."""
        encoded = json.dumps(body).encode("utf-8")
        self.send_response(status)
        headers = (
            ("Content-Type", "application/json"),
            ("Content-Length", str(len(encoded))),
        )
        for header_name, header_value in headers:
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(encoded)

    def log_message(self, format: str, *args: object) -> None:
        """Discard the log line instead of emitting it."""
        del format, args
class _UnixHTTPServer(socketserver.UnixStreamServer):
    """Unix-socket stream server that serves ``_Handler`` and carries test callbacks.

    Args:
        socket_path: Address handed to the base server.
        on_get: Callable receiving ``(path, get_count)`` and returning
            ``(status, json_body)``.
        on_post: Callable receiving ``(path, payload, post_count)`` and returning
            ``(status, json_body)``.
    """

    def __init__(
        self,
        socket_path: str,
        *,
        on_get: Callable[[str, int], tuple[int, dict[str, Any]]],
        on_post: Callable[[str, dict[str, Any], int], tuple[int, dict[str, Any]]],
    ) -> None:
        # Request counters start at zero; the handler increments them per request.
        self.get_count = 0
        self.post_count = 0
        # Callbacks are attached before the base initialiser runs.
        self.on_get = on_get
        self.on_post = on_post
        super().__init__(socket_path, _Handler)
class FakeDaemon:
    """Context manager that serves a ``_UnixHTTPServer`` on a background thread.

    The server is created in ``__init__`` with the given callbacks. The serving
    thread starts on ``__enter__``; ``__exit__`` stops it, joins it, closes the
    server and removes the socket file.

    Args:
        socket_path: Filesystem path used as the server address.
        on_get: Callable receiving ``(path, get_count)`` and returning
            ``(status, json_body)``.
        on_post: Callable receiving ``(path, payload, post_count)`` and returning
            ``(status, json_body)``.
    """

    def __init__(
        self,
        socket_path: str,
        *,
        on_get: Callable[[str, int], tuple[int, dict[str, Any]]],
        on_post: Callable[[str, dict[str, Any], int], tuple[int, dict[str, Any]]],
    ) -> None:
        self._socket_path = socket_path
        self._server = _UnixHTTPServer(socket_path, on_get=on_get, on_post=on_post)
        self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)

    def __enter__(self) -> FakeDaemon:
        """Start serving in the background and return this daemon."""
        self._thread.start()
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        """Stop the server thread and clean up; exceptions are not suppressed."""
        del exc_type, exc, tb
        try:
            self._server.shutdown()
            # shutdown() only ends the serve loop; join so no serving thread
            # outlives the ``with`` block.
            self._thread.join()
        finally:
            # Always release the listening socket and its file, even if stopping failed.
            self._server.server_close()
            with suppress(FileNotFoundError):
                os.unlink(self._socket_path)
@pytest.fixture
def socket_path() -> Iterator[str]:
    """Yield a ``daemon.sock`` path inside a fresh temporary directory.

    The socket file itself is not created here. The temporary directory is removed
    once the generator resumes after the ``yield``.
    """
    # NOTE(review): presumably tempfile is used rather than pytest's tmp_path to
    # keep the path short enough for a Unix socket address; confirm.
    with tempfile.TemporaryDirectory() as tmp:
        yield str(Path(tmp) / "daemon.sock")
def _attach_build(step: Any, build: FakeBuild) -> None:
    """Wire *build* and stub attributes onto *step*.

    Sets ``build.master`` and ``step.master`` to ``None``, points ``step.build`` at
    *build*, and installs an async ``addLog`` on the step that returns a new
    ``FakeLog`` on each call.
    """

    async def _add_log(_name: str) -> FakeLog:
        return FakeLog()

    assignments = (
        (build, "master", None),
        (step, "build", build),
        (step, "master", None),
        (step, "addLog", _add_log),
    )
    for target, attribute, value in assignments:
        # object.__setattr__ skips any __setattr__ override on the target's class.
        object.__setattr__(target, attribute, value)
@pytest.fixture(autouse=True)
def _patch_twisted_waits(monkeypatch: pytest.MonkeyPatch) -> None:
    """Swap the step module's ``deferToThread``/``deferLater`` for immediate stand-ins.

    Both replacements call their target immediately and return an already-fired
    Deferred: a success carrying the return value, or a failure carrying the raised
    exception. The fixture also resets ``buildbot.config.errors._errors`` to ``None``
    and replaces ``config.error`` (as seen from ``buildbot.process.buildstep``)
    with a no-op.
    """
    from buildbot.config import errors as config_errors
    from buildbot.process import buildstep as buildstep_module

    import buildbot_autoscale_ext.steps as steps_mod

    def _call_now(func: Any, *args: object, **kwargs: object) -> defer.Deferred[object]:
        # Shared by both stand-ins so they report exceptions the same way.
        try:
            return defer.succeed(func(*args, **kwargs))
        except Exception as exc:  # noqa: BLE001
            return defer.fail(exc)

    def _defer_to_thread(func: Any, *args: object, **kwargs: object) -> defer.Deferred[object]:
        return _call_now(func, *args, **kwargs)

    def _defer_later(
        _clock: object,
        _seconds: float,
        callback: Any = None,
        *args: object,
        **kwargs: object,
    ) -> defer.Deferred[object]:
        # Real deferLater accepts a missing callable and then fires with None.
        if callback is None:
            return defer.succeed(None)
        # A raising callback becomes a failed Deferred, as with real deferLater,
        # instead of escaping from the call itself.
        return _call_now(callback, *args, **kwargs)

    monkeypatch.setattr(steps_mod, "deferToThread", _defer_to_thread)
    monkeypatch.setattr(steps_mod, "deferLater", _defer_later)
    # NOTE(review): presumably these two patches let steps be constructed outside a
    # master-config load without config errors being raised; confirm against buildbot.
    monkeypatch.setattr(config_errors, "_errors", None, raising=False)
    monkeypatch.setattr(buildstep_module.config, "error", lambda *_a, **_k: None)
def _run_step(step: Any) -> int:
    """Run *step* and return the result its Deferred has already fired with.

    Args:
        step: Object whose ``run()`` returns a Deferred-like object exposing
            ``addCallbacks(callback, errback)``.

    Returns:
        The value passed to the success callback.

    Raises:
        Exception: Whatever the captured failure re-raises via ``raiseException()``.
        AssertionError: If neither callback ran by the time ``addCallbacks``
            returned, i.e. the Deferred had not fired yet.
    """
    deferred = step.run()
    out: list[int] = []
    failures: list[defer.Failure] = []
    deferred.addCallbacks(out.append, failures.append)
    if failures:
        failures[0].raiseException()
    if not out:
        # Without this guard the helper would die with a bare IndexError below.
        raise AssertionError(
            "step.run() returned a Deferred that has not fired; "
            "_run_step only supports steps that complete synchronously"
        )
    return out[0]
def test_gate_success_pending_then_ready(socket_path: str) -> None:
    """Gate step succeeds once the daemon reports ``ready`` after one ``pending`` poll.

    The fake daemon answers every POST with a pending reservation ``r1``, the first
    GET with ``pending`` and every later GET with ``ready`` plus slot and instance
    details. The step must return SUCCESS and record those details as build properties.
    """

    def post_reply(
        _path: str, _payload: dict[str, Any], _count: int
    ) -> tuple[int, dict[str, Any]]:
        return (
            HTTPStatus.OK,
            {
                "reservation_id": "r1",
                "phase": "pending",
                "created_at": "now",
                "expires_at": "soon",
            },
        )

    def get_reply(_path: str, count: int) -> tuple[int, dict[str, Any]]:
        # Only the very first poll still sees the reservation as pending.
        if count == 1:
            return HTTPStatus.OK, {"reservation_id": "r1", "phase": "pending"}
        return (
            HTTPStatus.OK,
            {
                "reservation_id": "r1",
                "phase": "ready",
                "slot": "slot-1",
                "instance_id": "i-123",
                "system": "x86_64-linux",
                "updated_at": "now",
            },
        )

    with FakeDaemon(socket_path, on_post=post_reply, on_get=get_reply):
        build = FakeBuild()
        step = CapacityGateStep(
            name="gate",
            daemon_socket=socket_path,
            system_property="system",
            default_system="x86_64-linux",
            reserve_timeout_seconds=2,
            poll_interval_seconds=0.01,
            retry_max_attempts=2,
            retry_base_seconds=0.001,
        )
        _attach_build(step, build)
        result = _run_step(step)

    assert result == util.SUCCESS
    assert build.getProperty("autoscale_reservation_id") == "r1"
    assert build.getProperty("autoscale_slot") == "slot-1"
    assert build.getProperty("autoscale_instance_id") == "i-123"
    assert isinstance(build.getProperty("autoscale_wait_seconds"), int)
def test_gate_failure_on_failed_phase(socket_path: str) -> None:
    """Gate step returns FAILURE when every GET reports phase ``failed``."""

    def post_reply(
        _path: str, _payload: dict[str, Any], _count: int
    ) -> tuple[int, dict[str, Any]]:
        return HTTPStatus.OK, {"reservation_id": "r1", "phase": "pending"}

    def get_reply(_path: str, _count: int) -> tuple[int, dict[str, Any]]:
        return HTTPStatus.OK, {"reservation_id": "r1", "phase": "failed", "reason": "no"}

    with FakeDaemon(socket_path, on_post=post_reply, on_get=get_reply):
        build = FakeBuild()
        step = CapacityGateStep(
            name="gate",
            daemon_socket=socket_path,
            system_property="system",
            default_system="x86_64-linux",
            reserve_timeout_seconds=2,
            poll_interval_seconds=0.01,
            retry_max_attempts=2,
            retry_base_seconds=0.001,
        )
        _attach_build(step, build)
        result = _run_step(step)

    assert result == util.FAILURE
def test_gate_failure_on_timeout(socket_path: str) -> None:
    """Gate step returns FAILURE when the reservation stays ``pending``.

    The step is configured with ``reserve_timeout_seconds=0`` and the fake daemon
    never reports anything but ``pending``.
    """

    def pending_reply() -> tuple[int, dict[str, Any]]:
        return HTTPStatus.OK, {"reservation_id": "r1", "phase": "pending"}

    def post_reply(
        _path: str, _payload: dict[str, Any], _count: int
    ) -> tuple[int, dict[str, Any]]:
        return pending_reply()

    def get_reply(_path: str, _count: int) -> tuple[int, dict[str, Any]]:
        return pending_reply()

    with FakeDaemon(socket_path, on_post=post_reply, on_get=get_reply):
        build = FakeBuild()
        step = CapacityGateStep(
            name="gate",
            daemon_socket=socket_path,
            system_property="system",
            default_system="x86_64-linux",
            reserve_timeout_seconds=0,
            poll_interval_seconds=0.01,
            retry_max_attempts=2,
            retry_base_seconds=0.001,
        )
        _attach_build(step, build)
        result = _run_step(step)

    assert result == util.FAILURE
def test_release_skipped_without_reservation(socket_path: str) -> None:
    """Release step returns SKIPPED when the build carries no reservation property.

    No fake daemon is started for this test.
    """
    build = FakeBuild()
    step = CapacityReleaseStep(
        name="release",
        daemon_socket=socket_path,
        retry_max_attempts=2,
        retry_base_seconds=0.001,
    )
    _attach_build(step, build)

    outcome = _run_step(step)

    assert outcome == util.SKIPPED
def test_release_warnings_on_retry_exhausted_500(socket_path: str) -> None:
    """Release step returns WARNINGS when every POST is answered with HTTP 500."""

    def post_reply(
        _path: str, _payload: dict[str, Any], _count: int
    ) -> tuple[int, dict[str, Any]]:
        return HTTPStatus.INTERNAL_SERVER_ERROR, {"error": "boom"}

    def get_reply(_path: str, _count: int) -> tuple[int, dict[str, Any]]:
        return HTTPStatus.OK, {}

    with FakeDaemon(socket_path, on_post=post_reply, on_get=get_reply):
        step = CapacityReleaseStep(
            name="release",
            daemon_socket=socket_path,
            retry_max_attempts=2,
            retry_base_seconds=0.001,
        )
        build = FakeBuild()
        # The step only has something to release when this property is present.
        build.setProperty("autoscale_reservation_id", "r1", "test")
        _attach_build(step, build)
        result = _run_step(step)

    assert result == util.WARNINGS
def test_release_success(socket_path: str) -> None:
    """Release step returns SUCCESS when the POST is answered with phase ``released``."""

    def post_reply(
        _path: str, _payload: dict[str, Any], _count: int
    ) -> tuple[int, dict[str, Any]]:
        return HTTPStatus.OK, {"reservation_id": "r1", "phase": "released"}

    def get_reply(_path: str, _count: int) -> tuple[int, dict[str, Any]]:
        return HTTPStatus.OK, {}

    with FakeDaemon(socket_path, on_post=post_reply, on_get=get_reply):
        step = CapacityReleaseStep(
            name="release",
            daemon_socket=socket_path,
            retry_max_attempts=2,
            retry_base_seconds=0.001,
        )
        build = FakeBuild()
        build.setProperty("autoscale_reservation_id", "r1", "test")
        _attach_build(step, build)
        result = _run_step(step)

    assert result == util.SUCCESS