from typing import Tuple

import requests

from app.extensions import db
from app.models.alarms import Alarm, AlarmState
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation


def set_http_alarm(proxy_id: int, state: AlarmState, text: str) -> None:
    """Record the latest HTTP status check result for a proxy.

    Looks up the ``"http-status"`` alarm belonging to ``proxy_id``; if none
    exists yet, a new one targeting ``"proxy"`` is created and added to the
    database session. The alarm's state is then updated via
    ``Alarm.update_state``. This function does not commit the session itself.

    :param proxy_id: primary key of the proxy the alarm belongs to
    :param state: alarm state to record
    :param text: descriptive text to store alongside the state
    """
    alarm = Alarm.query.filter(
        Alarm.proxy_id == proxy_id,
        Alarm.alarm_type == "http-status"
    ).first()
    if alarm is None:
        alarm = Alarm()
        alarm.proxy_id = proxy_id
        alarm.alarm_type = "http-status"
        alarm.target = "proxy"
        db.session.add(alarm)
    alarm.update_state(state, text)


def _probe_proxy(url: str) -> Tuple[AlarmState, str]:
    """Issue a single GET to ``url`` and translate the outcome to an alarm.

    :param url: URL of the proxy to probe
    :return: ``(state, text)`` where ``state`` is ``AlarmState.OK`` only for a
        non-error, non-redirect response; ``text`` is ``"<code> <reason>"``
        when a response was received, or ``repr()`` of the exception when the
        request itself failed.
    """
    try:
        # Redirects are not followed: a redirecting proxy is itself treated
        # as a failure below.
        response = requests.get(url, allow_redirects=False, timeout=5)
    except (requests.RequestException, ConnectionError) as exc:
        # RequestException is the base class of requests' connection and
        # timeout errors and also covers malformed URLs and decoding
        # failures, so one bad proxy cannot abort the whole sweep.
        return AlarmState.CRITICAL, repr(exc)

    status_text = f"{response.status_code} {response.reason}"
    # ``ok`` is False for 4xx/5xx responses (it wraps raise_for_status()).
    if not response.ok or response.is_redirect:
        return AlarmState.CRITICAL, status_text
    return AlarmState.OK, status_text


class AlarmProxyHTTPStatusAutomation(BaseAutomation):
    """Automation that probes every non-destroyed proxy over HTTP and records
    the result as an ``"http-status"`` alarm."""

    short_name = "alarm_http_status"
    description = "Check all deployed proxies for HTTP status code"
    # Scheduling interval read by the automation framework; the unit is not
    # visible in this module (presumably minutes -- confirm against
    # BaseAutomation).
    frequency = 45

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """Check each proxy that has not been destroyed and update its alarm.

        Proxies without a URL are skipped. A proxy is marked
        ``AlarmState.CRITICAL`` when the request fails, the response is an
        HTTP error (4xx/5xx), or the response is a redirect; otherwise it is
        marked ``AlarmState.OK``.

        :param full: accepted for interface compatibility; not used here
        :return: always ``(True, "")``
        """
        proxies = Proxy.query.filter(
            Proxy.destroyed.is_(None)
        )
        for proxy in proxies:
            if proxy.url is None:
                continue
            state, text = _probe_proxy(proxy.url)
            set_http_alarm(proxy.id, state, text)
        return True, ""