from collections import defaultdict from datetime import datetime from datetime import timedelta from typing import Dict, Tuple, Any import requests from app.alarms import get_or_create_alarm from app.extensions import db from app.models.alarms import AlarmState from app.models.mirrors import Origin from app.terraform import BaseAutomation def check_origin(domain_name: str) -> Dict[str, Any]: start_date = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H%%3A%M") end_date = datetime.utcnow().strftime("%Y-%m-%dT%H%%3A%M") api_url = f"https://api.ooni.io/api/v1/measurements?domain={domain_name}&since={start_date}&until={end_date}" result: Dict[str, Dict[str, int]] = defaultdict(lambda: {"anomaly": 0, "confirmed": 0, "failure": 0, "ok": 0}) return _check_origin(api_url, result) def _check_origin(api_url: str, result: Dict[str, Any]) -> Dict[str, Any]: print(f"Processing {api_url}") req = requests.get(api_url, timeout=30).json() if 'results' not in req or not req['results']: return result for r in req['results']: not_ok = False for status in ["anomaly", "confirmed", "failure"]: if status in r and r[status]: result[r["probe_cc"]][status] += 1 not_ok = True break if not not_ok: result[r["probe_cc"]]["ok"] += 1 if req['metadata']['next_url']: return _check_origin(req['metadata']['next_url'], result) return result def threshold_origin(domain_name: str) -> Dict[str, Any]: ooni = check_origin(domain_name) for country in ooni: total = sum([ ooni[country]["anomaly"], ooni[country]["confirmed"], ooni[country]["failure"], ooni[country]["ok"] ]) total_blocks = sum([ ooni[country]["anomaly"], ooni[country]["confirmed"] ]) block_perc = round((total_blocks / total * 100), 1) ooni[country]["block_perc"] = block_perc ooni[country]["state"] = AlarmState.WARNING if block_perc > 20 else AlarmState.OK ooni[country]["message"] = f"Blocked in {block_perc}% of measurements" return ooni class BlockOONIAutomation(BaseAutomation): """ Automation task to import origin and/or proxy reachability results from OONI. """ short_name = "block_ooni" description = "Import origin and/or proxy reachability results from OONI" frequency = 240 def automate(self, full: bool = False) -> Tuple[bool, str]: origins = Origin.query.filter(Origin.destroyed.is_(None)).all() for origin in origins: ooni = threshold_origin(origin.domain_name) for country in ooni: alarm = get_or_create_alarm(origin.brn, f"origin-block-ooni-{country.lower()}") alarm.update_state(ooni[country]["state"], ooni[country]["message"]) db.session.commit() return True, ""