Add remote autoscaler daemon endpoint support
This commit is contained in:
parent
95021a4253
commit
679b5c8d07
11 changed files with 291 additions and 22 deletions
|
|
@ -7,6 +7,7 @@ import socket
|
|||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
|
@ -45,10 +46,39 @@ class UnixSocketHTTPConnection(http.client.HTTPConnection):
|
|||
|
||||
|
||||
class DaemonClient:
|
||||
def __init__(self, socket_path: str, retry_policy: RetryPolicy) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
retry_policy: RetryPolicy,
|
||||
socket_path: str | None = None,
|
||||
base_url: str | None = None,
|
||||
auth_token: str | None = None,
|
||||
) -> None:
|
||||
if (socket_path is None) == (base_url is None):
|
||||
raise ValueError("exactly one of socket_path or base_url must be set")
|
||||
self._socket_path = socket_path
|
||||
self._base_url = base_url
|
||||
self._auth_token = auth_token.strip() if auth_token is not None else None
|
||||
self._retry = retry_policy
|
||||
|
||||
self._base_path = ""
|
||||
self._http_scheme = "http"
|
||||
self._http_host = "localhost"
|
||||
self._http_port = 80
|
||||
if base_url is not None:
|
||||
parsed = urlparse(base_url)
|
||||
if parsed.scheme not in {"http", "https"}:
|
||||
raise ValueError("base_url must use http or https scheme")
|
||||
if parsed.hostname is None:
|
||||
raise ValueError("base_url must include a hostname")
|
||||
self._http_scheme = parsed.scheme
|
||||
self._http_host = parsed.hostname
|
||||
if parsed.port is not None:
|
||||
self._http_port = parsed.port
|
||||
elif parsed.scheme == "https":
|
||||
self._http_port = 443
|
||||
self._base_path = parsed.path.rstrip("/")
|
||||
|
||||
def post_json(
|
||||
self,
|
||||
path: str,
|
||||
|
|
@ -136,12 +166,31 @@ class DaemonClient:
|
|||
timeout_seconds: float,
|
||||
payload: bytes | None,
|
||||
) -> tuple[bytes, int]:
|
||||
conn = UnixSocketHTTPConnection(self._socket_path, timeout=timeout_seconds)
|
||||
request_path = path if path.startswith("/") else f"/{path}"
|
||||
if self._base_path != "":
|
||||
request_path = f"{self._base_path}{request_path}"
|
||||
|
||||
headers = {"Accept": "application/json"}
|
||||
if payload is not None:
|
||||
headers["Content-Type"] = "application/json"
|
||||
if self._auth_token is not None:
|
||||
headers["Authorization"] = f"Bearer {self._auth_token}"
|
||||
|
||||
conn: http.client.HTTPConnection
|
||||
if self._socket_path is not None:
|
||||
conn = UnixSocketHTTPConnection(self._socket_path, timeout=timeout_seconds)
|
||||
else:
|
||||
conn = (
|
||||
http.client.HTTPSConnection(
|
||||
self._http_host, self._http_port, timeout=timeout_seconds
|
||||
)
|
||||
if self._http_scheme == "https"
|
||||
else http.client.HTTPConnection(
|
||||
self._http_host, self._http_port, timeout=timeout_seconds
|
||||
)
|
||||
)
|
||||
try:
|
||||
conn.request(method=method, url=path, body=payload, headers=headers)
|
||||
conn.request(method=method, url=request_path, body=payload, headers=headers)
|
||||
response = conn.getresponse()
|
||||
data = response.read()
|
||||
return data, response.status
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ class AutoscaleConfigurator(ConfiguratorBase):
|
|||
gate = CapacityGateStep(
|
||||
name="Ensure remote builder capacity",
|
||||
daemon_socket=self.settings.daemon_socket,
|
||||
daemon_url=self.settings.daemon_url,
|
||||
daemon_auth_token=self.settings.daemon_auth_token,
|
||||
system_property=self.settings.system_property,
|
||||
default_system=self.settings.default_system,
|
||||
reserve_timeout_seconds=self.settings.reserve_timeout_seconds,
|
||||
|
|
@ -52,6 +54,8 @@ class AutoscaleConfigurator(ConfiguratorBase):
|
|||
CapacityReleaseStep(
|
||||
name="Release autoscaler reservation",
|
||||
daemon_socket=self.settings.daemon_socket,
|
||||
daemon_url=self.settings.daemon_url,
|
||||
daemon_auth_token=self.settings.daemon_auth_token,
|
||||
retry_max_attempts=self.settings.retry_max_attempts,
|
||||
retry_base_seconds=self.settings.retry_base_seconds,
|
||||
retry_max_seconds=self.settings.retry_max_seconds,
|
||||
|
|
|
|||
|
|
@ -3,7 +3,9 @@ from dataclasses import dataclass
|
|||
|
||||
@dataclass(frozen=True)
|
||||
class AutoscaleSettings:
|
||||
daemon_socket: str
|
||||
daemon_socket: str | None = "/run/nix-builder-autoscaler/daemon.sock"
|
||||
daemon_url: str | None = None
|
||||
daemon_auth_token: str | None = None
|
||||
system_property: str = "system"
|
||||
default_system: str = "x86_64-linux"
|
||||
reserve_timeout_seconds: int = 900
|
||||
|
|
|
|||
|
|
@ -20,7 +20,9 @@ class CapacityGateStep(buildstep.BuildStep):
|
|||
def __init__(
|
||||
self,
|
||||
*,
|
||||
daemon_socket: str,
|
||||
daemon_socket: str | None = None,
|
||||
daemon_url: str | None = None,
|
||||
daemon_auth_token: str | None = None,
|
||||
system_property: str,
|
||||
default_system: str,
|
||||
reserve_timeout_seconds: int,
|
||||
|
|
@ -36,12 +38,14 @@ class CapacityGateStep(buildstep.BuildStep):
|
|||
self._reserve_timeout_seconds = reserve_timeout_seconds
|
||||
self._poll_interval_seconds = poll_interval_seconds
|
||||
self._client = DaemonClient(
|
||||
socket_path=daemon_socket,
|
||||
retry_policy=RetryPolicy(
|
||||
max_attempts=retry_max_attempts,
|
||||
base_seconds=retry_base_seconds,
|
||||
max_seconds=retry_max_seconds,
|
||||
),
|
||||
socket_path=daemon_socket,
|
||||
base_url=daemon_url,
|
||||
auth_token=daemon_auth_token,
|
||||
)
|
||||
|
||||
def _determine_system(self) -> str:
|
||||
|
|
@ -155,7 +159,9 @@ class CapacityReleaseStep(buildstep.BuildStep):
|
|||
def __init__(
|
||||
self,
|
||||
*,
|
||||
daemon_socket: str,
|
||||
daemon_socket: str | None = None,
|
||||
daemon_url: str | None = None,
|
||||
daemon_auth_token: str | None = None,
|
||||
retry_max_attempts: int,
|
||||
retry_base_seconds: float,
|
||||
retry_max_seconds: float = 5.0,
|
||||
|
|
@ -163,12 +169,14 @@ class CapacityReleaseStep(buildstep.BuildStep):
|
|||
) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._client = DaemonClient(
|
||||
socket_path=daemon_socket,
|
||||
retry_policy=RetryPolicy(
|
||||
max_attempts=retry_max_attempts,
|
||||
base_seconds=retry_base_seconds,
|
||||
max_seconds=retry_max_seconds,
|
||||
),
|
||||
socket_path=daemon_socket,
|
||||
base_url=daemon_url,
|
||||
auth_token=daemon_auth_token,
|
||||
)
|
||||
|
||||
def run(self) -> defer.Deferred[int]:
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ from collections.abc import Callable
|
|||
from contextlib import suppress
|
||||
from dataclasses import dataclass
|
||||
from http import HTTPStatus
|
||||
from http.server import BaseHTTPRequestHandler
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
|
@ -95,6 +95,87 @@ class FakeDaemon:
|
|||
os.unlink(self._socket_path)
|
||||
|
||||
|
||||
class _TCPHTTPServer(ThreadingHTTPServer):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
on_get: Callable[[str, int], tuple[int, dict[str, Any]]],
|
||||
on_post: Callable[[str, dict[str, Any], int], tuple[int, dict[str, Any]]],
|
||||
expected_auth: str | None = None,
|
||||
) -> None:
|
||||
self.on_get = on_get
|
||||
self.on_post = on_post
|
||||
self.expected_auth = expected_auth
|
||||
self.state = ServerState()
|
||||
super().__init__(("127.0.0.1", 0), _TCPHandler)
|
||||
|
||||
|
||||
class _TCPHandler(BaseHTTPRequestHandler):
|
||||
server: _TCPHTTPServer
|
||||
|
||||
def _authorize(self) -> bool:
|
||||
expected = self.server.expected_auth
|
||||
if expected is None:
|
||||
return True
|
||||
return self.headers.get("Authorization") == f"Bearer {expected}"
|
||||
|
||||
def do_GET(self) -> None: # noqa: N802
|
||||
if not self._authorize():
|
||||
self._send(HTTPStatus.UNAUTHORIZED, {"error": "unauthorized"})
|
||||
return
|
||||
self.server.state.get_count += 1
|
||||
status, body = self.server.on_get(self.path, self.server.state.get_count)
|
||||
self._send(status, body)
|
||||
|
||||
def do_POST(self) -> None: # noqa: N802
|
||||
if not self._authorize():
|
||||
self._send(HTTPStatus.UNAUTHORIZED, {"error": "unauthorized"})
|
||||
return
|
||||
self.server.state.post_count += 1
|
||||
size = int(self.headers.get("Content-Length", "0"))
|
||||
raw = self.rfile.read(size) if size else b"{}"
|
||||
payload = json.loads(raw.decode("utf-8"))
|
||||
status, body = self.server.on_post(self.path, payload, self.server.state.post_count)
|
||||
self._send(status, body)
|
||||
|
||||
def log_message(self, format: str, *args: object) -> None:
|
||||
del format, args
|
||||
|
||||
def _send(self, status: int, body: dict[str, Any]) -> None:
|
||||
encoded = json.dumps(body).encode("utf-8")
|
||||
self.send_response(status)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.send_header("Content-Length", str(len(encoded)))
|
||||
self.end_headers()
|
||||
self.wfile.write(encoded)
|
||||
|
||||
|
||||
class FakeTCPDaemon:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
on_get: Callable[[str, int], tuple[int, dict[str, Any]]],
|
||||
on_post: Callable[[str, dict[str, Any], int], tuple[int, dict[str, Any]]],
|
||||
expected_auth: str | None = None,
|
||||
) -> None:
|
||||
self._server = _TCPHTTPServer(on_get=on_get, on_post=on_post, expected_auth=expected_auth)
|
||||
self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
|
||||
|
||||
@property
|
||||
def base_url(self) -> str:
|
||||
host, port = self._server.server_address
|
||||
return f"http://{host}:{port}"
|
||||
|
||||
def __enter__(self) -> FakeTCPDaemon:
|
||||
self._thread.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
|
||||
del exc_type, exc, tb
|
||||
self._server.shutdown()
|
||||
self._server.server_close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def socket_path() -> str:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
|
|
@ -210,3 +291,23 @@ def test_backoff_attempts_at_least_two(socket_path: str) -> None:
|
|||
)
|
||||
|
||||
assert daemon._server.state.get_count >= 2
|
||||
|
||||
|
||||
def test_get_json_success_over_http_with_auth() -> None:
|
||||
with FakeTCPDaemon(
|
||||
on_get=lambda _p, _a: (HTTPStatus.OK, {"phase": "ready"}),
|
||||
on_post=lambda _p, _payload, _a: (HTTPStatus.OK, {}),
|
||||
expected_auth="test-token",
|
||||
) as daemon:
|
||||
client = DaemonClient(
|
||||
base_url=daemon.base_url,
|
||||
auth_token="test-token",
|
||||
retry_policy=RetryPolicy(max_attempts=2, base_seconds=0.001, max_seconds=0.01),
|
||||
)
|
||||
response = client.get_json(
|
||||
"/v1/reservations/r1",
|
||||
timeout_seconds=1.0,
|
||||
retryable_statuses={429, 500, 503},
|
||||
)
|
||||
|
||||
assert response == {"phase": "ready"}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue