55 lines
1.5 KiB
Python
55 lines
1.5 KiB
Python
|
|
"""HAProxy runtime socket adapter — stub for Plan 02."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import dataclass
|
||
|
|
|
||
|
|
|
||
|
|
class HAProxyError(Exception):
    """Raised when talking to the HAProxy runtime socket fails.

    Carries no extra state; it is a plain ``Exception`` subclass that gives
    callers one type to catch for HAProxy communication problems.
    """
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class SlotHealth:
    """Snapshot of the health counters for one HAProxy server slot.

    A plain mutable dataclass: equality, ``repr`` and the positional
    ``(status, scur, qcur)`` constructor are generated by ``@dataclass``.
    """

    # Health status string for the slot (e.g. "UP", the value that
    # HAProxyRuntime.slot_is_up is documented to look for).
    status: str
    # Named after HAProxy's ``scur`` stats column — presumably the current
    # session count; confirm once the Plan 02 parser exists.
    scur: int
    # Named after HAProxy's ``qcur`` stats column — presumably the current
    # queue length; confirm once the Plan 02 parser exists.
    qcur: int
|
||
|
|
|
||
|
|
|
||
|
|
class HAProxyRuntime:
    """Adapter for the HAProxy runtime CLI reached over a Unix socket.

    This is a placeholder: the constructor only records its configuration
    and every operation raises ``NotImplementedError``.

    Full implementation in Plan 02.
    """

    def __init__(self, socket_path: str, backend: str, slot_prefix: str) -> None:
        """Record the connection settings; no socket is opened here.

        Args:
            socket_path: Path of the HAProxy runtime socket.
            backend: Name of the HAProxy backend the slots belong to.
            slot_prefix: Prefix used for slot (server) names.
        """
        self._socket_path = socket_path
        self._backend = backend
        self._slot_prefix = slot_prefix

    def set_slot_addr(self, slot_id: str, ip: str, port: int = 22) -> None:
        """Point a slot at a new server address (``port`` defaults to 22).

        Raises:
            NotImplementedError: Always; not implemented in this stub.
        """
        raise NotImplementedError()

    def enable_slot(self, slot_id: str) -> None:
        """Enable the given server slot.

        Raises:
            NotImplementedError: Always; not implemented in this stub.
        """
        raise NotImplementedError()

    def disable_slot(self, slot_id: str) -> None:
        """Disable the given server slot.

        Raises:
            NotImplementedError: Always; not implemented in this stub.
        """
        raise NotImplementedError()

    def slot_is_up(self, slot_id: str) -> bool:
        """Report whether HAProxy's health status for the slot is UP.

        Raises:
            NotImplementedError: Always; not implemented in this stub.
        """
        raise NotImplementedError()

    def slot_session_count(self, slot_id: str) -> int:
        """Report the number of currently active sessions on the slot.

        Raises:
            NotImplementedError: Always; not implemented in this stub.
        """
        raise NotImplementedError()

    def read_slot_health(self) -> dict[str, SlotHealth]:
        """Fetch a complete stats snapshot covering all slots.

        Raises:
            NotImplementedError: Always; not implemented in this stub.
        """
        raise NotImplementedError()
|