80 lines
3 KiB
Python
80 lines
3 KiB
Python
from collections import defaultdict
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
from typing import Dict, Tuple, Any
|
|
|
|
import requests
|
|
|
|
from app.alarms import get_or_create_alarm
|
|
from app.extensions import db
|
|
from app.models.alarms import AlarmState
|
|
from app.models.mirrors import Origin
|
|
from app.terraform import BaseAutomation
|
|
|
|
|
|
def check_origin(domain_name: str) -> Dict[str, Any]:
|
|
start_date = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H%%3A%M")
|
|
end_date = datetime.utcnow().strftime("%Y-%m-%dT%H%%3A%M")
|
|
api_url = f"https://api.ooni.io/api/v1/measurements?domain={domain_name}&since={start_date}&until={end_date}"
|
|
result: Dict[str, Dict[str, int]] = defaultdict(lambda: {"anomaly": 0, "confirmed": 0, "failure": 0, "ok": 0})
|
|
return _check_origin(api_url, result)
|
|
|
|
|
|
def _check_origin(api_url: str, result: Dict[str, Any]) -> Dict[str, Any]:
|
|
print(f"Processing {api_url}")
|
|
req = requests.get(api_url, timeout=30).json()
|
|
if 'results' not in req or not req['results']:
|
|
return result
|
|
for r in req['results']:
|
|
not_ok = False
|
|
for status in ["anomaly", "confirmed", "failure"]:
|
|
if status in r and r[status]:
|
|
result[r["probe_cc"]][status] += 1
|
|
not_ok = True
|
|
break
|
|
if not not_ok:
|
|
result[r["probe_cc"]]["ok"] += 1
|
|
if req['metadata']['next_url']:
|
|
return _check_origin(req['metadata']['next_url'], result)
|
|
return result
|
|
|
|
|
|
def threshold_origin(domain_name: str) -> Dict[str, Any]:
|
|
ooni = check_origin(domain_name)
|
|
for country in ooni:
|
|
total = sum([
|
|
ooni[country]["anomaly"],
|
|
ooni[country]["confirmed"],
|
|
ooni[country]["failure"],
|
|
ooni[country]["ok"]
|
|
])
|
|
total_blocks = sum([
|
|
ooni[country]["anomaly"],
|
|
ooni[country]["confirmed"]
|
|
])
|
|
block_perc = round((total_blocks / total * 100), 1)
|
|
ooni[country]["block_perc"] = block_perc
|
|
ooni[country]["state"] = AlarmState.WARNING if block_perc > 20 else AlarmState.OK
|
|
ooni[country]["message"] = f"Blocked in {block_perc}% of measurements"
|
|
return ooni
|
|
|
|
|
|
class BlockOONIAutomation(BaseAutomation):
|
|
"""
|
|
Automation task to import origin and/or proxy reachability results from OONI.
|
|
"""
|
|
|
|
short_name = "block_ooni"
|
|
description = "Import origin and/or proxy reachability results from OONI"
|
|
frequency = 240
|
|
|
|
def automate(self, full: bool = False) -> Tuple[bool, str]:
|
|
origins = Origin.query.filter(Origin.destroyed.is_(None)).all()
|
|
for origin in origins:
|
|
ooni = threshold_origin(origin.domain_name)
|
|
for country in ooni:
|
|
alarm = get_or_create_alarm(origin.brn,
|
|
f"origin-block-ooni-{country.lower()}")
|
|
alarm.update_state(ooni[country]["state"], ooni[country]["message"])
|
|
db.session.commit()
|
|
return True, ""
|