nix-builder-autoscaler/agent/nix_builder_autoscaler/tests/test_cli.py

152 lines
5.1 KiB
Python
Raw Normal View History

"""Unit tests for autoscalerctl CLI argument and display behavior."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import pytest
from nix_builder_autoscaler import cli
from nix_builder_autoscaler.cli import _parse_args, _print_slots, _print_status_summary, _slot_ttl
def test_parse_args_without_command_prints_help_and_exits_zero(
capsys: pytest.CaptureFixture[str],
) -> None:
with pytest.raises(SystemExit) as exc:
_parse_args([])
assert exc.value.code == 0
captured = capsys.readouterr()
assert "Autoscaler CLI" in captured.out
assert "status" in captured.out
def test_parse_args_json_status() -> None:
args = _parse_args(["--json", "status"])
assert args.command == "status"
assert args.json is True
def test_parse_args_bulk_commands() -> None:
assert _parse_args(["drain-all"]).command == "drain-all"
assert _parse_args(["unquarantine-all"]).command == "unquarantine-all"
def test_print_status_summary_renders_metrics_table(capsys: pytest.CaptureFixture[str]) -> None:
_print_status_summary(
{
"slots": {
"total": 4,
"ready": 1,
"launching": 1,
"booting": 1,
"binding": 0,
"terminating": 0,
"empty": 1,
"error": 0,
},
"reservations": {"pending": 2, "ready": 1, "failed": 0},
"ec2": {"api_ok": True},
"haproxy": {"socket_ok": True},
}
)
out = capsys.readouterr().out
assert "metric" in out
assert "slots.total" in out
assert "reservations.pending" in out
assert "haproxy.socket_ok" in out
def test_bulk_drain_only_targets_ready_slots(monkeypatch: pytest.MonkeyPatch) -> None:
def _fake_request(socket_path: str, method: str, path: str, body=None): # noqa: ANN001
assert socket_path == "/tmp/sock"
if method == "GET" and path == "/v1/slots":
return 200, [
{"slot_id": "slot001", "state": "ready"},
{"slot_id": "slot002", "state": "booting"},
]
if method == "POST" and path == "/v1/admin/drain" and body == {"slot_id": "slot001"}:
return 200, {"state": "draining"}
raise AssertionError(f"unexpected request: {method} {path} {body}")
monkeypatch.setattr(cli, "_uds_request", _fake_request)
summary = cli._bulk_slot_action("/tmp/sock", "drain")
assert summary["matched"] == 1
assert summary["attempted"] == 1
assert summary["succeeded"] == 1
assert summary["failed"] == 0
assert summary["skipped"] == 1
def test_bulk_unquarantine_only_targets_error_slots(monkeypatch: pytest.MonkeyPatch) -> None:
def _fake_request(socket_path: str, method: str, path: str, body=None): # noqa: ANN001
assert socket_path == "/tmp/sock"
if method == "GET" and path == "/v1/slots":
return 200, [
{"slot_id": "slot001", "state": "error"},
{"slot_id": "slot002", "state": "ready"},
]
if method == "POST" and path == "/v1/admin/unquarantine" and body == {"slot_id": "slot001"}:
return 200, {"state": "empty"}
raise AssertionError(f"unexpected request: {method} {path} {body}")
monkeypatch.setattr(cli, "_uds_request", _fake_request)
summary = cli._bulk_slot_action("/tmp/sock", "unquarantine")
assert summary["matched"] == 1
assert summary["attempted"] == 1
assert summary["succeeded"] == 1
assert summary["failed"] == 0
assert summary["skipped"] == 1
def test_slot_ttl_ready_pinned_at_min_slots() -> None:
now = datetime.now(UTC)
slot = {
"state": "ready",
"lease_count": 0,
"last_state_change": (now - timedelta(seconds=60)).isoformat(),
}
policy = {
"capacity": {
"min_slots": 1,
"idle_scale_down_seconds": 900,
"launch_timeout_seconds": 300,
"boot_timeout_seconds": 300,
"binding_timeout_seconds": 180,
"drain_timeout_seconds": 120,
"terminating_timeout_seconds": 300,
},
"scheduler": {"reconcile_seconds": 15.0},
}
assert _slot_ttl(slot, policy, active_slots=1) == "pinned"
def test_print_slots_includes_ttl_column(capsys: pytest.CaptureFixture[str]) -> None:
now = datetime.now(UTC)
slots = [
{
"slot_id": "slot001",
"state": "launching",
"instance_id": "i-123",
"instance_ip": None,
"lease_count": 0,
"last_state_change": (now - timedelta(seconds=20)).isoformat(),
}
]
policy = {
"capacity": {
"min_slots": 0,
"idle_scale_down_seconds": 900,
"launch_timeout_seconds": 300,
"boot_timeout_seconds": 300,
"binding_timeout_seconds": 180,
"drain_timeout_seconds": 120,
"terminating_timeout_seconds": 300,
},
"scheduler": {"reconcile_seconds": 15.0},
}
_print_slots(slots, policy)
out = capsys.readouterr().out
assert "ttl" in out
assert "slot001" in out