add timeout safeguards for all slot lifecycle stages
This commit is contained in:
parent
48ff711f39
commit
3be933f16b
4 changed files with 356 additions and 23 deletions
|
|
@ -25,6 +25,7 @@ if TYPE_CHECKING:
|
|||
from .state_db import StateDB
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
_TERMINAL_OR_STOPPED_STATES = ("terminated", "shutting-down", "stopping", "stopped")
|
||||
|
||||
|
||||
class Reconciler:
|
||||
|
|
@ -113,11 +114,26 @@ class Reconciler:
|
|||
if ec2_state == "running":
|
||||
self._db.update_slot_state(slot["slot_id"], SlotState.BOOTING)
|
||||
log.info("slot_booting", extra={"slot_id": slot["slot_id"]})
|
||||
elif ec2_state in ("terminated", "shutting-down"):
|
||||
self._db.update_slot_state(slot["slot_id"], SlotState.ERROR)
|
||||
log.warning(
|
||||
"slot_launch_terminated",
|
||||
extra={"slot_id": slot["slot_id"], "ec2_state": ec2_state},
|
||||
return
|
||||
|
||||
if ec2_state in _TERMINAL_OR_STOPPED_STATES:
|
||||
self._begin_termination(
|
||||
slot,
|
||||
reason="slot_launch_lost",
|
||||
extra={"ec2_state": ec2_state},
|
||||
)
|
||||
return
|
||||
|
||||
age_seconds = self._slot_state_age_seconds(slot)
|
||||
if age_seconds >= self._config.capacity.launch_timeout_seconds:
|
||||
self._begin_termination(
|
||||
slot,
|
||||
reason="slot_launch_timeout",
|
||||
extra={
|
||||
"ec2_state": ec2_state,
|
||||
"age_seconds": age_seconds,
|
||||
"timeout_seconds": self._config.capacity.launch_timeout_seconds,
|
||||
},
|
||||
)
|
||||
|
||||
def _handle_booting(self, slot: dict) -> None:
|
||||
|
|
@ -130,11 +146,11 @@ class Reconciler:
|
|||
info = self._runtime.describe_instance(instance_id)
|
||||
ec2_state = info["state"]
|
||||
|
||||
if ec2_state in ("terminated", "shutting-down"):
|
||||
self._db.update_slot_state(slot["slot_id"], SlotState.ERROR)
|
||||
log.warning(
|
||||
"slot_boot_terminated",
|
||||
extra={"slot_id": slot["slot_id"], "ec2_state": ec2_state},
|
||||
if ec2_state in _TERMINAL_OR_STOPPED_STATES:
|
||||
self._begin_termination(
|
||||
slot,
|
||||
reason="slot_boot_lost",
|
||||
extra={"ec2_state": ec2_state},
|
||||
)
|
||||
return
|
||||
|
||||
|
|
@ -150,10 +166,49 @@ class Reconciler:
|
|||
extra={"slot_id": slot["slot_id"]},
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
|
||||
age_seconds = self._slot_state_age_seconds(slot)
|
||||
if age_seconds >= self._config.capacity.boot_timeout_seconds:
|
||||
self._begin_termination(
|
||||
slot,
|
||||
reason="slot_boot_timeout",
|
||||
extra={
|
||||
"ec2_state": ec2_state,
|
||||
"age_seconds": age_seconds,
|
||||
"timeout_seconds": self._config.capacity.boot_timeout_seconds,
|
||||
},
|
||||
)
|
||||
|
||||
def _handle_binding(self, slot: dict, haproxy_health: dict) -> None:
|
||||
"""Check HAProxy health to determine when slot is ready."""
|
||||
slot_id = slot["slot_id"]
|
||||
instance_id = slot.get("instance_id")
|
||||
if instance_id:
|
||||
info = self._runtime.describe_instance(instance_id)
|
||||
ec2_state = info["state"]
|
||||
if ec2_state in _TERMINAL_OR_STOPPED_STATES:
|
||||
self._begin_termination(
|
||||
slot,
|
||||
reason="slot_binding_lost",
|
||||
extra={"ec2_state": ec2_state},
|
||||
)
|
||||
self._binding_up_counts.pop(slot_id, None)
|
||||
return
|
||||
|
||||
age_seconds = self._slot_state_age_seconds(slot)
|
||||
if age_seconds >= self._config.capacity.binding_timeout_seconds:
|
||||
self._begin_termination(
|
||||
slot,
|
||||
reason="slot_binding_timeout",
|
||||
extra={
|
||||
"age_seconds": age_seconds,
|
||||
"timeout_seconds": self._config.capacity.binding_timeout_seconds,
|
||||
},
|
||||
)
|
||||
self._binding_up_counts.pop(slot_id, None)
|
||||
return
|
||||
|
||||
health = haproxy_health.get(slot_id)
|
||||
|
||||
if health is not None and health.status == "UP":
|
||||
|
|
@ -174,18 +229,6 @@ class Reconciler:
|
|||
except HAProxyError:
|
||||
pass
|
||||
|
||||
# Check if instance is still alive
|
||||
instance_id = slot.get("instance_id")
|
||||
if instance_id:
|
||||
info = self._runtime.describe_instance(instance_id)
|
||||
if info["state"] in ("terminated", "shutting-down"):
|
||||
self._db.update_slot_state(slot_id, SlotState.ERROR)
|
||||
self._binding_up_counts.pop(slot_id, None)
|
||||
log.warning(
|
||||
"slot_binding_terminated",
|
||||
extra={"slot_id": slot_id},
|
||||
)
|
||||
|
||||
def _handle_ready(self, slot: dict, ec2_by_slot: dict[str, dict]) -> None:
|
||||
"""Verify EC2 instance is still alive for ready slots."""
|
||||
slot_id = slot["slot_id"]
|
||||
|
|
@ -251,11 +294,72 @@ class Reconciler:
|
|||
return
|
||||
|
||||
info = self._runtime.describe_instance(instance_id)
|
||||
if info["state"] == "terminated":
|
||||
state = info["state"]
|
||||
if state == "terminated":
|
||||
self._db.update_slot_state(
|
||||
slot_id, SlotState.EMPTY, instance_id=None, instance_ip=None, lease_count=0
|
||||
)
|
||||
log.info("slot_emptied", extra={"slot_id": slot_id})
|
||||
return
|
||||
|
||||
age_seconds = self._slot_state_age_seconds(slot)
|
||||
if age_seconds >= self._config.capacity.terminating_timeout_seconds:
|
||||
self._terminate_instance_best_effort(slot_id, instance_id)
|
||||
# Reset last_state_change after a retry so repeated retries are paced.
|
||||
self._db.update_slot_state(slot_id, SlotState.TERMINATING)
|
||||
log.warning(
|
||||
"slot_termination_timeout_retry",
|
||||
extra={
|
||||
"slot_id": slot_id,
|
||||
"instance_id": instance_id,
|
||||
"ec2_state": state,
|
||||
"age_seconds": age_seconds,
|
||||
"timeout_seconds": self._config.capacity.terminating_timeout_seconds,
|
||||
},
|
||||
)
|
||||
|
||||
def _slot_state_age_seconds(self, slot: dict) -> float:
|
||||
now = self._clock.now()
|
||||
last_change = datetime.fromisoformat(slot["last_state_change"])
|
||||
return (now - last_change).total_seconds()
|
||||
|
||||
def _begin_termination(self, slot: dict, reason: str, extra: dict | None = None) -> None:
|
||||
slot_id = slot["slot_id"]
|
||||
instance_id = slot.get("instance_id")
|
||||
if instance_id:
|
||||
self._terminate_instance_best_effort(slot_id, instance_id)
|
||||
self._db.update_slot_state(slot_id, SlotState.TERMINATING)
|
||||
else:
|
||||
self._db.update_slot_state(
|
||||
slot_id, SlotState.EMPTY, instance_id=None, instance_ip=None, lease_count=0
|
||||
)
|
||||
|
||||
payload = {"slot_id": slot_id}
|
||||
if instance_id:
|
||||
payload["instance_id"] = instance_id
|
||||
if extra:
|
||||
payload.update(extra)
|
||||
log.warning(reason, extra=payload)
|
||||
|
||||
def _terminate_instance_best_effort(self, slot_id: str, instance_id: str) -> None:
|
||||
try:
|
||||
self._runtime.terminate_instance(instance_id)
|
||||
self._metrics.counter(
|
||||
"autoscaler_ec2_terminate_total",
|
||||
{"result": "success"},
|
||||
1.0,
|
||||
)
|
||||
except Exception:
|
||||
self._metrics.counter(
|
||||
"autoscaler_ec2_terminate_total",
|
||||
{"result": "error"},
|
||||
1.0,
|
||||
)
|
||||
log.warning(
|
||||
"terminate_failed",
|
||||
extra={"slot_id": slot_id, "instance_id": instance_id},
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
def _update_metrics(self, tick_duration: float) -> None:
|
||||
"""Emit reconciler metrics."""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue