majuna/app/terraform/block/bridge.py

108 lines
3.7 KiB
Python
Raw Normal View History

from datetime import datetime, timedelta
import logging
from abc import abstractmethod
2022-12-07 14:04:33 +00:00
from typing import Tuple, List, Callable, Optional, Any
from app.extensions import db
from app.models.activity import Activity
from app.models.bridges import Bridge
from app.terraform import BaseAutomation
class BlockBridgeAutomation(BaseAutomation):
2022-08-12 12:24:56 +01:00
ips: List[str]
fingerprints: List[str]
hashed_fingerprints: List[str]
patterns: List[str]
2022-12-07 14:04:33 +00:00
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Constructor method.
"""
self.ips = []
self.fingerprints = []
self.hashed_fingerprints = []
super().__init__(*args, **kwargs)
2022-08-12 12:24:56 +01:00
def perform_deprecations(self, ids: List[str], bridge_select_func: Callable[[str], Optional[Bridge]]
) -> List[Tuple[str, str]]:
rotated = []
for id_ in ids:
bridge = bridge_select_func(id_)
if bridge is None:
continue
logging.debug("Found %s blocked", bridge.fingerprint)
if bridge.added > datetime.utcnow() - timedelta(hours=3):
logging.debug("Not rotating a bridge less than 3 hours old")
continue
if bridge.deprecate(reason=self.short_name):
logging.info("Rotated %s", bridge.fingerprint)
rotated.append((bridge.fingerprint, bridge.conf.provider))
else:
logging.debug("Not rotating a bridge that is already deprecated")
return rotated
def automate(self, full: bool = False) -> Tuple[bool, str]:
self.fetch()
logging.debug("Fetch complete")
self.parse()
logging.debug("Parse complete")
rotated = []
rotated.extend(self.perform_deprecations(self.ips, get_bridge_by_ip))
2022-07-12 11:14:17 +01:00
logging.debug("Blocked by IP")
rotated.extend(self.perform_deprecations(self.fingerprints, get_bridge_by_fingerprint))
2022-07-12 11:14:17 +01:00
logging.debug("Blocked by fingerprint")
rotated.extend(self.perform_deprecations(self.hashed_fingerprints, get_bridge_by_hashed_fingerprint))
2022-07-12 11:14:17 +01:00
logging.debug("Blocked by hashed fingerprint")
if rotated:
activity = Activity(
activity_type="block",
text=(f"[{self.short_name}] ♻ Rotated {len(rotated)} bridges: \n"
+ "\n".join([f"* {fingerprint} ({provider})" for fingerprint, provider in rotated]))
)
db.session.add(activity)
activity.notify()
db.session.commit()
return True, ""
@abstractmethod
def fetch(self) -> None:
"""
Fetch the blocklist data. It is the responsibility of the automation task
to persist this within the object for the parse step.
:return: None
"""
@abstractmethod
def parse(self) -> None:
"""
Parse the blocklist data.
:return: None
"""
2022-08-12 12:24:56 +01:00
def get_bridge_by_ip(ip: str) -> Optional[Bridge]:
return Bridge.query.filter( # type: ignore[no-any-return]
Bridge.deprecated.is_(None),
Bridge.destroyed.is_(None),
Bridge.bridgeline.contains(f" {ip} ")
).first()
2022-08-12 12:24:56 +01:00
def get_bridge_by_fingerprint(fingerprint: str) -> Optional[Bridge]:
return Bridge.query.filter( # type: ignore[no-any-return]
Bridge.deprecated.is_(None),
Bridge.destroyed.is_(None),
Bridge.fingerprint == fingerprint
).first()
2022-08-12 12:24:56 +01:00
def get_bridge_by_hashed_fingerprint(hashed_fingerprint: str) -> Optional[Bridge]:
return Bridge.query.filter( # type: ignore[no-any-return]
Bridge.deprecated.is_(None),
Bridge.destroyed.is_(None),
Bridge.hashed_fingerprint == hashed_fingerprint
).first()