majuna/app/terraform/block_ooni.py

86 lines
3 KiB
Python

from collections import defaultdict
from datetime import datetime
from datetime import timedelta
from typing import Dict, Tuple
import requests
from app.extensions import db
from app.models.alarms import Alarm, AlarmState
from app.models.mirrors import Origin
from app.terraform import BaseAutomation
def check_origin(domain_name: str):
    """Tally the last day of OONI measurements for a domain, per country.

    Builds a query against the OONI measurements API covering the 24 hours
    up to the current UTC time and hands it to ``_check_origin``, which
    performs the requests and fills in the counters.

    Args:
        domain_name: Domain to look up.

    Returns:
        A ``defaultdict`` keyed by each measurement's ``probe_cc`` value.
        Every value is a dict with integer ``anomaly``, ``confirmed``,
        ``failure`` and ``ok`` counters.
    """
    # Function-scope import so this block does not depend on a module-level
    # import being added elsewhere.
    from urllib.parse import urlencode

    # Read the clock once so the window is exactly one day wide.
    until = datetime.utcnow()
    since = until - timedelta(days=1)
    timestamp_format = "%Y-%m-%dT%H:%M"
    # urlencode escapes the ':' in the timestamps as %3A (what the previous
    # hand-built URL hard-coded) and also escapes the domain name.
    query = urlencode({
        "domain": domain_name,
        "since": since.strftime(timestamp_format),
        "until": until.strftime(timestamp_format),
    })
    api_url = f"https://api.ooni.io/api/v1/measurements?{query}"
    result = defaultdict(lambda: {"anomaly": 0, "confirmed": 0, "failure": 0, "ok": 0})
    return _check_origin(api_url, result)
def _check_origin(api_url: str, result: Dict):
    """Fetch every page of an OONI measurements query and tally the outcomes.

    Starting at ``api_url``, each page's ``results`` list is walked and one
    counter per measurement is incremented in ``result[probe_cc]``: the first
    truthy flag among ``anomaly``, ``confirmed`` and ``failure``, or ``ok``
    when none is set. Paging follows ``metadata.next_url`` until a page has no
    results or no next URL.

    Pages are followed in a loop rather than by recursion, so a long result
    set cannot exceed Python's recursion limit.

    Args:
        api_url: URL of the first page to fetch.
        result: Mapping of country code to counter dict. It is updated in
            place and must create the counter dict on first access, as the
            ``defaultdict`` built by ``check_origin`` does.

    Returns:
        The same ``result`` mapping that was passed in.

    Raises:
        requests.RequestException: If a request fails or times out.
        KeyError: If a measurement has no ``probe_cc`` field.
    """
    request_timeout = 60  # seconds; keeps a stalled connection from hanging the run
    next_url = api_url
    while True:
        print(f"Processing {next_url}")
        page = requests.get(next_url, timeout=request_timeout).json()
        if 'results' not in page or not page['results']:
            return result
        for measurement in page['results']:
            counters = result[measurement["probe_cc"]]
            # Flags are checked in priority order; only the first hit counts.
            outcome = next(
                (flag for flag in ("anomaly", "confirmed", "failure") if measurement.get(flag)),
                "ok",
            )
            counters[outcome] += 1
        # A missing metadata block or next_url simply ends the pagination.
        next_url = (page.get('metadata') or {}).get('next_url')
        if not next_url:
            return result
def threshold_origin(domain_name):
    """Annotate per-country OONI tallies for a domain with a block rate and alarm state.

    For every country returned by ``check_origin`` three keys are added to
    that country's dict:

    * ``block_perc``: share of measurements flagged ``anomaly`` or
      ``confirmed``, as a percentage rounded to one decimal place
      (``failure`` measurements count towards the total only).
    * ``state``: ``AlarmState.WARNING`` when ``block_perc`` exceeds 20,
      otherwise ``AlarmState.OK``.
    * ``message``: human-readable summary of the percentage.

    Args:
        domain_name: Domain to evaluate.

    Returns:
        The mapping from ``check_origin`` with the extra keys filled in.
    """
    ooni = check_origin(domain_name)
    for stats in ooni.values():
        blocked = stats["anomaly"] + stats["confirmed"]
        measured = blocked + stats["failure"] + stats["ok"]
        block_perc = round((blocked / measured * 100), 1)
        if block_perc > 20:
            state = AlarmState.WARNING
        else:
            state = AlarmState.OK
        stats["block_perc"] = block_perc
        stats["state"] = state
        stats["message"] = f"Blocked in {block_perc}% of measurements"
    return ooni
def set_ooni_alarm(origin_id: int, country: str, state: AlarmState, text: str):
    """Create or update the OONI block alarm for one origin and country.

    The alarm is identified by ``origin_id`` together with the alarm type
    ``origin-block-ooni-<country>``. If no such alarm exists, a new one is
    built with target ``"origin"`` and added to the database session. In
    either case the alarm's ``update_state`` is then called with ``state``
    and ``text``.

    Args:
        origin_id: ID of the origin the alarm belongs to.
        country: Country code used as the alarm type suffix.
        state: Alarm state to record.
        text: Message passed along with the state.
    """
    alarm_type = f"origin-block-ooni-{country}"
    existing = Alarm.query.filter(
        Alarm.origin_id == origin_id,
        Alarm.alarm_type == alarm_type
    )
    alarm = existing.first()
    if alarm is None:
        # First report for this origin/country pair: register a new alarm.
        alarm = Alarm()
        alarm.origin_id = origin_id
        alarm.alarm_type = alarm_type
        alarm.target = "origin"
        db.session.add(alarm)
    # NOTE(review): no commit happens here; presumably update_state or the
    # caller persists the session. Confirm.
    alarm.update_state(state, text)
class BlockOONIAutomation(BaseAutomation):
    """Automation that turns OONI measurement results into per-country origin alarms."""

    short_name = "block_ooni"
    description = "Import origin and/or proxy reachability results from OONI"
    # NOTE(review): the unit is not visible in this file; presumably minutes
    # between runs. Confirm against BaseAutomation.
    frequency = 240

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """Evaluate every non-destroyed origin against OONI and set its alarms.

        Args:
            full: Accepted for interface compatibility; not used here.

        Returns:
            Always ``(True, "")``.
        """
        # "== None" is intentional: SQLAlchemy overloads it into an IS NULL filter.
        live_origins = Origin.query.filter(Origin.destroyed == None).all()  # noqa: E711
        for origin in live_origins:
            per_country = threshold_origin(origin.domain_name)
            for country_code, stats in per_country.items():
                set_ooni_alarm(origin.id, country_code.lower(), stats["state"], stats["message"])
        return True, ""