majuna/app/terraform/block/bridge.py

104 lines
3.5 KiB
Python

from datetime import datetime, timedelta
import logging
from abc import abstractmethod
from typing import Tuple, List, Callable
from app.extensions import db
from app.models.activity import Activity
from app.models.bridges import Bridge
from app.terraform import BaseAutomation
class BlockBridgeAutomation(BaseAutomation):
patterns: List[str]
def __init__(self) -> None:
"""
Constructor method.
"""
self.ips = []
self.fingerprints = []
self.hashed_fingerprints = []
super().__init__()
def perform_deprecations(self, ids: List[str], bridge_select_func: Callable[[str], Bridge]
) -> List[Tuple[str, str]]:
rotated = []
for id_ in ids:
bridge = bridge_select_func(id_)
if bridge is None:
continue
logging.debug("Found %s blocked", bridge.fingerprint)
if bridge.added > datetime.utcnow() - timedelta(hours=3):
logging.debug("Not rotating a bridge less than 3 hours old")
continue
if bridge.deprecate(reason=self.short_name):
logging.info("Rotated %s", bridge.fingerprint)
rotated.append((bridge.fingerprint, bridge.conf.provider))
else:
logging.debug("Not rotating a bridge that is already deprecated")
return rotated
def automate(self, full: bool = False) -> Tuple[bool, str]:
self.fetch()
logging.debug("Fetch complete")
self.parse()
logging.debug("Parse complete")
rotated = []
rotated.extend(self.perform_deprecations(self.ips, get_bridge_by_ip))
logging.debug("Blocked by IP")
rotated.extend(self.perform_deprecations(self.fingerprints, get_bridge_by_fingerprint))
logging.debug("Blocked by fingerprint")
rotated.extend(self.perform_deprecations(self.fingerprints, get_bridge_by_hashed_fingerprint))
logging.debug("Blocked by hashed fingerprint")
if rotated:
activity = Activity(
activity_type="block",
text=(f"[{self.short_name}] ♻ Rotated {len(rotated)} bridges: \n"
+ "\n".join([f"* {fingerprint} ({provider})" for fingerprint, provider in rotated]))
)
db.session.add(activity)
activity.notify()
db.session.commit()
return True, ""
@abstractmethod
def fetch(self) -> None:
"""
Fetch the blocklist data. It is the responsibility of the automation task
to persist this within the object for the parse step.
:return: None
"""
@abstractmethod
def parse(self) -> None:
"""
Parse the blocklist data.
:return: None
"""
def get_bridge_by_ip(ip: str) -> Bridge:
return Bridge.query.filter( # type: ignore[no-any-return]
Bridge.deprecated.is_(None),
Bridge.destroyed.is_(None),
Bridge.bridgeline.contains(f" {ip} ")
).first()
def get_bridge_by_fingerprint(fingerprint: str) -> Bridge:
return Bridge.query.filter( # type: ignore[no-any-return]
Bridge.deprecated.is_(None),
Bridge.destroyed.is_(None),
Bridge.fingerprint == fingerprint
).first()
def get_bridge_by_hashed_fingerprint(hashed_fingerprint: str) -> Bridge:
return Bridge.query.filter( # type: ignore[no-any-return]
Bridge.deprecated.is_(None),
Bridge.destroyed.is_(None),
Bridge.hashed_fingerprint == hashed_fingerprint
).first()