from typing import Tuple

from azure.identity import ClientSecretCredential
from azure.mgmt.alertsmanagement import AlertsManagementClient

from app import app
from app.alarms import get_or_create_alarm
from app.models.alarms import AlarmState
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation


class AlarmProxyAzureCdnAutomation(BaseAutomation):
    """Mirror Azure Monitor bandwidth alerts onto Azure CDN proxy alarms.

    Every non-destroyed proxy whose provider is ``"azure_cdn"`` gets its
    ``"bandwidth-out-high"`` alarm set to ``CRITICAL`` when a matching Azure
    alert is in the ``"Fired"`` monitor condition, and to ``OK`` otherwise.
    """

    short_name = "monitor_proxy_azure_cdn"
    description = "Import alarms for Azure CDN proxies"

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """Fetch fired Azure alerts and update the alarm state of each proxy.

        Args:
            full: Not used by this automation; accepted so the signature
                matches the other automations.

        Returns:
            Always ``(True, "")`` when the run completes.

        Raises:
            Nothing is caught here, so any error raised by the Azure client
            or by the database query propagates to the caller.
        """
        # Single source of truth for the alert name prefix: it is used both to
        # select the relevant alerts and to strip the prefix off their names.
        alert_prefix = "bandwidth-out-high-bc-"

        credential = ClientSecretCredential(
            tenant_id=app.config["AZURE_TENANT_ID"],
            client_id=app.config["AZURE_CLIENT_ID"],
            client_secret=app.config["AZURE_CLIENT_SECRET"],
        )
        client = AlertsManagementClient(
            credential, app.config["AZURE_SUBSCRIPTION_ID"]
        )

        # A set gives O(1) membership tests in the proxy loop below; the
        # values are the alert names with the prefix removed.
        # NOTE(review): suffixes are compared as-is against the lower-cased
        # group name, so this presumably relies on the alerts being
        # provisioned with lower-case names - confirm.
        firing = {
            alert.name[len(alert_prefix):]
            for alert in client.alerts.get_all()
            if alert.name.startswith(alert_prefix)
            and alert.properties.essentials.monitor_condition == "Fired"
        }

        active_proxies = Proxy.query.filter(
            Proxy.provider == "azure_cdn",
            Proxy.destroyed.is_(None),
        )
        for proxy in active_proxies:
            alarm = get_or_create_alarm(proxy.brn, "bandwidth-out-high")
            # Alerts are matched to proxies through the origin's group name,
            # so every proxy in a group shares the same alert state.
            if proxy.origin.group.group_name.lower() in firing:
                alarm.update_state(
                    AlarmState.CRITICAL, "Azure monitor alert firing"
                )
            else:
                alarm.update_state(
                    AlarmState.OK, "Azure monitor alert not firing"
                )
        return True, ""