from datetime import datetime, timedelta
import logging
from abc import abstractmethod
import fnmatch
from typing import List, Optional, Tuple

from app.extensions import db
from app.models.activity import Activity
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation


class BlockMirrorAutomation(BaseAutomation):
    """
    Base class for automations that deprecate proxies matching a blocklist.

    Subclasses implement :meth:`fetch` and :meth:`parse`; after both have run,
    ``self.patterns`` is matched against the URLs of the active proxies using
    :func:`fnmatch.filter`.
    """

    # fnmatch-style patterns that :meth:`automate` matches proxy URLs against.
    patterns: List[str]

    def __init__(self) -> None:
        """
        Constructor method.
        """
        self.patterns = []
        super().__init__()

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """
        Fetch and parse the blocklist, then deprecate each matching proxy.

        A matching proxy is skipped when its origin does not allow
        auto-rotation, when it was added less than 3 hours ago, or when it can
        no longer be found as an active proxy. If at least one proxy was
        deprecated, a single "block" activity listing them is recorded and
        notified.

        :param full: not used by this implementation
        :return: always ``(True, "")``
        """
        self.fetch()
        logging.debug("Fetch complete")
        self.parse()
        logging.debug("Parse complete")
        rotated = []
        proxy_urls = list(filter(lambda u: u is not None, active_proxy_urls()))
        for pattern in self.patterns:
            blocked_urls = fnmatch.filter(proxy_urls, pattern)
            for blocked_url in blocked_urls:
                proxy = proxy_by_url(blocked_url)
                if proxy is None:
                    # The URL list was captured before the loop started, so the
                    # proxy may have stopped being active since then (it may
                    # even have been rotated earlier in this same run when
                    # several patterns match one URL).
                    logging.debug("Proxy %s is no longer active, skipping", blocked_url)
                    continue
                logging.debug("Found %s blocked", proxy.url)
                if not proxy.origin.auto_rotation:
                    logging.debug("Proxy auto-rotation forbidden for origin")
                    continue
                if proxy.added > datetime.utcnow() - timedelta(hours=3):
                    logging.debug("Not rotating a proxy less than 3 hours old")
                    continue
                if proxy.deprecate(reason=self.short_name):
                    logging.info("Rotated %s", proxy.url)
                    rotated.append((proxy.url, proxy.origin.domain_name))
                else:
                    logging.debug("Not rotating a proxy that is already deprecated")
        if rotated:
            activity = Activity(
                activity_type="block",
                text=(f"[{self.short_name}] ♻ Rotated {len(rotated)} proxies️️: \n"
                      + "\n".join([f"* {proxy_domain} ({origin_domain})"
                                   for proxy_domain, origin_domain in rotated]))
            )
            db.session.add(activity)
            activity.notify()
        db.session.commit()
        return True, ""

    @abstractmethod
    def fetch(self) -> None:
        """
        Fetch the blocklist data. It is the responsibility of the automation task
        to persist this within the object for the parse step.

        :return: None
        """

    @abstractmethod
    def parse(self) -> None:
        """
        Parse the blocklist data.

        :return: None
        """


def active_proxy_urls() -> List[str]:
    """
    List the URLs of all proxies that are neither deprecated nor destroyed.

    :return: one URL per active proxy, in query order
    """
    return [proxy.url for proxy in Proxy.query.filter(
        Proxy.deprecated.is_(None),
        Proxy.destroyed.is_(None)
    ).all()]


def proxy_by_url(url: str) -> Optional[Proxy]:
    """
    Look up the active (not deprecated, not destroyed) proxy with the given URL.

    :param url: the proxy URL to search for
    :return: the first matching proxy, or None when no active proxy has that URL
    """
    return Proxy.query.filter(  # type: ignore[no-any-return]
        Proxy.deprecated.is_(None),
        Proxy.destroyed.is_(None),
        Proxy.url == url
    ).first()