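# majuna/app/terraform/block_ooni.py
#
# NOTE: the lines that originally stood here ("81 lines", "2.7 KiB", "Python",
# "Raw Normal View History", "2022-04-20 15:34:11 +01:00") are residue from a
# web file viewer, not part of the module.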
from collections import defaultdict
from datetime import datetime
from datetime import timedelta
from typing import Dict
import requests
from app import app
from app.extensions import db
# 2022-04-22 14:01:16 +01:00  (timestamp residue from the web file view, not code)
from app.models.alarms import Alarm, AlarmState
from app.models.mirrors import Proxy
# 2022-04-20 15:34:11 +01:00  (timestamp residue from the web file view, not code)
def check_origin(domain_name: str):
    """Tally recent OONI measurements for *domain_name*, grouped by country.

    Builds a query against the OONI measurements endpoint covering the window
    from two days ago until now (both read from ``datetime.utcnow()``) and
    passes it, together with an empty tally, to ``_check_origin``.

    Args:
        domain_name: Domain to query; interpolated into the URL as-is.

    Returns:
        The value returned by ``_check_origin``: the tally mapping, keyed by
        ``probe_cc``, of ``anomaly`` / ``confirmed`` / ``failure`` / ``ok``
        counters.
    """
    # "%%3A" renders as the literal "%3A", i.e. a pre-URL-encoded ":".
    stamp_format = "%Y-%m-%dT%H%%3A%M"
    window_start = datetime.utcnow() - timedelta(days=2)
    start_date = window_start.strftime(stamp_format)
    end_date = datetime.utcnow().strftime(stamp_format)
    query_url = f"https://api.ooni.io/api/v1/measurements?domain={domain_name}&since={start_date}&until={end_date}"

    def fresh_counters():
        # One zeroed counter set per country, created on first access.
        return {"anomaly": 0, "confirmed": 0, "failure": 0, "ok": 0}

    tally = defaultdict(fresh_counters)
    return _check_origin(query_url, tally)
def _check_origin(api_url: str, result: Dict):
    """Fetch every page of an OONI measurements query and tally the results.

    Starting at *api_url*, each page's ``results`` list is walked and one
    counter per measurement is incremented in ``result[probe_cc]``: the first
    truthy flag among ``anomaly``, ``confirmed`` and ``failure`` wins,
    otherwise ``ok`` is counted. Paging follows ``metadata.next_url`` until it
    is absent/empty or a page comes back with no results.

    Pages are followed in a loop rather than by recursion so that a long
    result set cannot exhaust the interpreter's recursion limit.

    Args:
        api_url: URL of the first page to fetch.
        result: Mapping of country code to counter dict; it is indexed with
            ``result[probe_cc]`` and updated in place, so callers are expected
            to pass something that supplies missing keys (``check_origin``
            passes a ``defaultdict``).

    Returns:
        The same *result* mapping, updated in place.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
    """
    next_url = api_url
    while next_url:
        print(f"Processing {next_url}")
        # A timeout keeps a stalled connection from hanging the run forever.
        response = requests.get(next_url, timeout=60)
        # Fail with a clear HTTP error instead of a KeyError on an error body.
        response.raise_for_status()
        page = response.json()

        measurements = page.get("results")
        if not measurements:
            # An empty page ends the walk, even if a next_url is advertised.
            break

        for measurement in measurements:
            counters = result[measurement["probe_cc"]]
            for status in ("anomaly", "confirmed", "failure"):
                if measurement.get(status):
                    counters[status] += 1
                    break
            else:
                # None of the problem flags was set for this measurement.
                counters["ok"] += 1

        # Tolerate a missing "metadata" or "next_url" key on the last page.
        next_url = (page.get("metadata") or {}).get("next_url")
    return result
def threshold_origin(domain_name):
    """Annotate the per-country OONI tally for *domain_name* with an alarm verdict.

    For every country returned by ``check_origin`` this adds three keys to the
    country's dict:

    * ``block_perc`` - ``anomaly + confirmed`` as a percentage of all four
      counters, rounded to one decimal place;
    * ``state`` - ``AlarmState.CRITICAL`` when ``block_perc`` is above 20,
      otherwise ``AlarmState.OK``;
    * ``message`` - a human-readable summary of the percentage.

    Returns:
        The mapping obtained from ``check_origin``, modified in place.
    """
    per_country = check_origin(domain_name)
    for counts in per_country.values():
        blocked = counts["anomaly"] + counts["confirmed"]
        # "failure" measurements count towards the denominator only.
        measured = blocked + counts["failure"] + counts["ok"]
        block_perc = round((blocked / measured * 100), 1)

        if block_perc > 20:
            verdict = AlarmState.CRITICAL
        else:
            verdict = AlarmState.OK

        counts["block_perc"] = block_perc
        counts["state"] = verdict
        counts["message"] = f"Blocked in {block_perc}% of measurements"
    return per_country
def set_ooni_alarm(origin_id: int, country: str, state: AlarmState, text: str):
    """Record the OONI blocking state of an origin in one country as an alarm.

    Looks up the alarm for *origin_id* whose type is
    ``origin-block-ooni-<country>``. If none exists, a new ``Alarm`` targeting
    ``"origin"`` is created and added to the database session. In either case
    the alarm's ``update_state`` is then called with *state* and *text*.

    Args:
        origin_id: Identifier of the origin the alarm belongs to.
        country: Country code used as the suffix of the alarm type.
        state: New alarm state.
        text: Message passed along with the state.
    """
    alarm_type = f"origin-block-ooni-{country}"
    existing = Alarm.query.filter(
        Alarm.origin_id == origin_id,
        Alarm.alarm_type == alarm_type,
    ).first()

    if existing is not None:
        existing.update_state(state, text)
        return

    # No alarm yet for this origin/country pair: create and register one.
    created = Alarm()
    created.origin_id = origin_id
    created.alarm_type = alarm_type
    created.target = "origin"
    db.session.add(created)
    created.update_state(state, text)
# Module-level driver: inside a Flask application context, evaluate every
# origin that has not been destroyed and store one OONI alarm per country.
with app.app_context():
    # ``Origin`` is used below but the file's top-level imports only bring in
    # ``Proxy``, so the original raised NameError here.
    # NOTE(review): assumes Origin is defined next to Proxy in
    # app.models.mirrors - confirm, and consider moving this to the top-level
    # import block.
    from app.models.mirrors import Origin

    # ``.is_(None)`` is the explicit SQLAlchemy spelling of the original
    # ``== None`` comparison (both render as IS NULL).
    origins = Origin.query.filter(Origin.destroyed.is_(None)).all()
    for origin in origins:
        ooni = threshold_origin(origin.domain_name)
        for country, verdict in ooni.items():
            # Alarm types use the lower-cased country code.
            set_ooni_alarm(origin.id, country.lower(), verdict["state"], verdict["message"])