"""autoscalerctl CLI entry point.""" from __future__ import annotations import argparse import http.client import json import socket from collections.abc import Sequence from typing import Any class UnixHTTPConnection(http.client.HTTPConnection): """HTTPConnection that dials a Unix domain socket.""" def __init__(self, socket_path: str, timeout: float = 5.0) -> None: super().__init__("localhost", timeout=timeout) self._socket_path = socket_path def connect(self) -> None: self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.connect(self._socket_path) def _uds_request( socket_path: str, method: str, path: str, body: dict[str, Any] | None = None, ) -> tuple[int, dict[str, Any] | list[dict[str, Any]] | str]: conn = UnixHTTPConnection(socket_path) headers = {"Host": "localhost", "Accept": "application/json"} payload: str | None = None if body is not None: payload = json.dumps(body) headers["Content-Type"] = "application/json" try: conn.request(method, path, body=payload, headers=headers) resp = conn.getresponse() raw = resp.read() text = raw.decode() if raw else "" content_type = resp.getheader("Content-Type", "") if text and "application/json" in content_type: parsed = json.loads(text) if isinstance(parsed, dict | list): return resp.status, parsed return resp.status, text finally: conn.close() def _print_table(headers: Sequence[str], rows: Sequence[Sequence[str]]) -> None: widths = [len(h) for h in headers] for row in rows: for idx, cell in enumerate(row): widths[idx] = max(widths[idx], len(cell)) header_line = " ".join(h.ljust(widths[idx]) for idx, h in enumerate(headers)) separator = " ".join("-" * widths[idx] for idx in range(len(headers))) print(header_line) print(separator) for row in rows: print(" ".join(cell.ljust(widths[idx]) for idx, cell in enumerate(row))) def _print_slots(data: list[dict[str, Any]]) -> None: rows: list[list[str]] = [] for slot in data: rows.append( [ str(slot.get("slot_id", "")), str(slot.get("state", "")), str(slot.get("instance_id") or "-"), str(slot.get("instance_ip") or "-"), str(slot.get("lease_count", 0)), ] ) _print_table(["slot_id", "state", "instance_id", "ip", "leases"], rows) def _print_reservations(data: list[dict[str, Any]]) -> None: rows: list[list[str]] = [] for resv in data: rows.append( [ str(resv.get("reservation_id", "")), str(resv.get("phase", "")), str(resv.get("system", "")), str(resv.get("slot") or "-"), str(resv.get("instance_id") or "-"), ] ) _print_table(["reservation_id", "phase", "system", "slot", "instance_id"], rows) def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(prog="autoscalerctl", description="Autoscaler CLI") parser.add_argument( "--socket", default="/run/nix-builder-autoscaler/daemon.sock", help="Daemon Unix socket path", ) subparsers = parser.add_subparsers(dest="command") subparsers.add_parser("status", help="Show state summary") subparsers.add_parser("slots", help="List slots") subparsers.add_parser("reservations", help="List reservations") parser_drain = subparsers.add_parser("drain", help="Drain a slot (not implemented)") parser_drain.add_argument("slot_id") parser_unq = subparsers.add_parser( "unquarantine", help="Unquarantine a slot (not implemented)", ) parser_unq.add_argument("slot_id") subparsers.add_parser("reconcile-now", help="Run reconciler now (not implemented)") return parser.parse_args() def _print_error(data: object) -> None: if isinstance(data, dict | list): print(json.dumps(data, indent=2)) else: print(str(data)) def main() -> None: """Entry point for the autoscalerctl CLI.""" args = _parse_args() if not args.command: raise SystemExit(1) if args.command in {"drain", "unquarantine", "reconcile-now"}: print(f"{args.command}: not yet implemented in API v1") raise SystemExit(0) endpoint_map = { "status": "/v1/state/summary", "slots": "/v1/slots", "reservations": "/v1/reservations", } path = endpoint_map[args.command] try: status, data = _uds_request(args.socket, "GET", path) except OSError as err: print(f"Error: cannot connect to daemon at {args.socket}") raise SystemExit(1) from err if status < 200 or status >= 300: _print_error(data) raise SystemExit(1) if args.command == "status": print(json.dumps(data, indent=2)) elif args.command == "slots": if isinstance(data, list): _print_slots(data) else: _print_error(data) raise SystemExit(1) elif args.command == "reservations": if isinstance(data, list): _print_reservations(data) else: _print_error(data) raise SystemExit(1) raise SystemExit(0) if __name__ == "__main__": main()