from collections import defaultdict from datetime import datetime from datetime import timedelta from typing import Dict import requests from app import app from app.extensions import db from app.models.alarms import Alarm, AlarmState from app.models.mirrors import Origin def check_origin(domain_name: str): start_date = (datetime.utcnow() - timedelta(days=2)).strftime("%Y-%m-%dT%H%%3A%M") end_date = datetime.utcnow().strftime("%Y-%m-%dT%H%%3A%M") api_url = f"https://api.ooni.io/api/v1/measurements?domain={domain_name}&since={start_date}&until={end_date}" result = defaultdict(lambda: {"anomaly": 0, "confirmed": 0, "failure": 0, "ok": 0}) return _check_origin(api_url, result) def _check_origin(api_url: str, result: Dict): print(f"Processing {api_url}") req = requests.get(api_url).json() if not req['results']: return result for r in req['results']: not_ok = False for s in ["anomaly", "confirmed", "failure"]: if s in r and r[s]: result[r["probe_cc"]][s] += 1 not_ok = True break if not not_ok: result[r["probe_cc"]]["ok"] += 1 if req['metadata']['next_url']: return _check_origin(req['metadata']['next_url'], result) return result def threshold_origin(domain_name): ooni = check_origin(domain_name) for country in ooni: total = sum([ ooni[country]["anomaly"], ooni[country]["confirmed"], ooni[country]["failure"], ooni[country]["ok"] ]) total_blocks = sum([ ooni[country]["anomaly"], ooni[country]["confirmed"] ]) block_perc = round((total_blocks / total * 100), 1) ooni[country]["block_perc"] = block_perc ooni[country]["state"] = AlarmState.CRITICAL if block_perc > 20 else AlarmState.OK ooni[country]["message"] = f"Blocked in {block_perc}% of measurements" return ooni def set_ooni_alarm(origin_id: int, country: str, state: AlarmState, text: str): alarm = Alarm.query.filter( Alarm.origin_id == origin_id, Alarm.alarm_type == f"origin-block-ooni-{country}" ).first() if alarm is None: alarm = Alarm() alarm.origin_id = origin_id alarm.alarm_type = f"origin-block-ooni-{country}" alarm.target = "origin" db.session.add(alarm) alarm.update_state(state, text) with app.app_context(): origins = Origin.query.filter(Origin.destroyed == None).all() for origin in origins: ooni = threshold_origin(origin.domain_name) for country in ooni: set_ooni_alarm(origin.id, country.lower(), ooni[country]["state"], ooni[country]["message"])