# nix-builder-autoscaler/agent/nix_builder_autoscaler/providers/haproxy.py
# (repository-viewer listing: 115 lines, 4 KiB, Python)

"""HAProxy runtime socket adapter for managing builder slots."""
# (repository-viewer timestamp: 2026-02-27 11:59:16 +01:00)
from __future__ import annotations
import csv
import io
import socket
# (repository-viewer timestamp: 2026-02-27 11:59:16 +01:00)
from dataclasses import dataclass
class HAProxyError(Exception):
    """Error communicating with HAProxy runtime socket.

    Raised by the runtime adapter when the admin socket cannot be reached,
    when HAProxy answers a command with an error message, or when a
    requested slot is absent from the stats output.
    """
@dataclass
class SlotHealth:
    """Health status for a single HAProxy server slot.

    Populated from one server row of the HAProxy ``show stat`` CSV output.
    """

    # Value of the CSV "status" column (e.g. "UP").
    status: str
    # Value of the CSV "scur" column: current active session count.
    scur: int
    # Value of the CSV "qcur" column.
    # NOTE(review): presumably the current queue length -- confirm against
    # the HAProxy CSV documentation.
    qcur: int
class HAProxyRuntime:
    """HAProxy runtime CLI adapter via Unix socket.

    Communicates with HAProxy using the admin socket text protocol. Every
    command opens a fresh connection, sends a single line, half-closes the
    socket and reads until HAProxy closes its side.

    Args:
        socket_path: Path to the HAProxy admin Unix socket.
        backend: HAProxy backend name (e.g. "all").
        slot_prefix: Server name prefix used for builder slots.
    """

    # Response prefixes that mark a command as failed. "No such" and
    # "Unknown" cover missing backends/servers and unrecognised commands;
    # "Permission denied" and "Require" cover an under-privileged socket and
    # a malformed "backend/server" argument.
    # NOTE(review): the last two prefixes follow the messages documented for
    # the HAProxy CLI -- confirm against the deployed HAProxy version.
    _ERROR_PREFIXES = ("No such", "Unknown", "Permission denied", "Require")

    def __init__(self, socket_path: str, backend: str, slot_prefix: str) -> None:
        self._socket_path = socket_path
        self._backend = backend
        self._slot_prefix = slot_prefix

    def set_slot_addr(self, slot_id: str, ip: str, port: int = 22) -> None:
        """Update server address for a slot.

        Args:
            slot_id: Server name of the slot inside the backend.
            ip: New address for the server.
            port: New port for the server.

        Raises:
            HAProxyError: If the socket is unreachable or HAProxy reports an
                error for the command.
        """
        cmd = f"set server {self._backend}/{slot_id} addr {ip} port {port}"
        self._check_response(self._run(cmd), slot_id)

    def enable_slot(self, slot_id: str) -> None:
        """Enable a server slot.

        Raises:
            HAProxyError: If the socket is unreachable or HAProxy reports an
                error for the command.
        """
        cmd = f"enable server {self._backend}/{slot_id}"
        self._check_response(self._run(cmd), slot_id)

    def disable_slot(self, slot_id: str) -> None:
        """Disable a server slot.

        Raises:
            HAProxyError: If the socket is unreachable or HAProxy reports an
                error for the command.
        """
        cmd = f"disable server {self._backend}/{slot_id}"
        self._check_response(self._run(cmd), slot_id)

    def slot_is_up(self, slot_id: str) -> bool:
        """Return True when HAProxy health status is UP for slot.

        Only the exact status string "UP" counts; a slot that is missing
        from the stats output is reported as not up.
        """
        entry = self.read_slot_health().get(slot_id)
        return entry is not None and entry.status == "UP"

    def slot_session_count(self, slot_id: str) -> int:
        """Return current active session count for slot.

        Raises:
            HAProxyError: If the slot does not appear in the stats output.
        """
        entry = self.read_slot_health().get(slot_id)
        if entry is None:
            raise HAProxyError(f"Slot not found in HAProxy stats: {slot_id}")
        return entry.scur

    def read_slot_health(self) -> dict[str, SlotHealth]:
        """Return full stats snapshot for all slots in the backend.

        Runs ``show stat`` and keeps the rows whose proxy name equals the
        configured backend and whose server name starts with the slot
        prefix. Empty or missing columns are tolerated: a missing status
        becomes "" and missing counters become 0.

        Returns:
            Mapping of server name to its SlotHealth.

        Raises:
            HAProxyError: If the socket is unreachable.
            ValueError: If a counter column holds non-numeric text.
        """
        raw = self._run("show stat")
        result: dict[str, SlotHealth] = {}
        for row in csv.DictReader(io.StringIO(raw)):
            # DictReader yields None for columns missing from a short row,
            # so every lookup is normalised with "or ''" before stripping.
            pxname = (row.get("# pxname") or "").strip()
            svname = (row.get("svname") or "").strip()
            if pxname != self._backend or not svname.startswith(self._slot_prefix):
                continue
            result[svname] = SlotHealth(
                status=(row.get("status") or "").strip(),
                scur=self._int_field(row, "scur"),
                qcur=self._int_field(row, "qcur"),
            )
        return result

    @staticmethod
    def _int_field(row: dict, name: str) -> int:
        """Parse an integer CSV column, treating empty or missing as 0."""
        text = (row.get(name) or "").strip()
        return int(text) if text else 0

    def _run(self, command: str) -> str:
        """Send a command to the HAProxy admin socket and return the response.

        Raises:
            HAProxyError: If the socket path does not exist or the
                connection is refused. Other OSError subclasses propagate
                unchanged.
        """
        try:
            # The context manager closes the socket on every exit path.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(self._socket_path)
                sock.sendall((command + "\n").encode())
                # Half-close so the peer sees end of input; the reply is
                # then read until the peer closes its side.
                sock.shutdown(socket.SHUT_WR)
                chunks: list[bytes] = []
                while True:
                    chunk = sock.recv(4096)
                    if not chunk:
                        break
                    chunks.append(chunk)
                return b"".join(chunks).decode()
        except FileNotFoundError as e:
            raise HAProxyError(
                f"HAProxy socket not found: {self._socket_path}"
            ) from e
        except ConnectionRefusedError as e:
            raise HAProxyError(
                f"Connection refused to HAProxy socket: {self._socket_path}"
            ) from e

    @staticmethod
    def _check_response(response: str, slot_id: str) -> None:
        """Raise HAProxyError if the response indicates an error.

        Args:
            response: Raw text returned by the admin socket.
            slot_id: Slot the command targeted; used in the error message.

        Raises:
            HAProxyError: If the stripped response starts with one of the
                known error prefixes.
        """
        stripped = response.strip()
        if stripped.startswith(HAProxyRuntime._ERROR_PREFIXES):
            raise HAProxyError(f"HAProxy error for {slot_id}: {stripped}")