92 lines
3 KiB
Python
92 lines
3 KiB
Python
import functools
|
||
import os
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from datetime import datetime, timedelta
|
||
import logging
|
||
from abc import abstractmethod
|
||
from fnmatch import fnmatch
|
||
from typing import Tuple, List
|
||
|
||
from app.extensions import db
|
||
from app.models.activity import Activity
|
||
from app.models.mirrors import Proxy
|
||
from app.terraform import BaseAutomation
|
||
|
||
|
||
class BlockMirrorAutomation(BaseAutomation):
|
||
patterns: List[str]
|
||
|
||
def __init__(self) -> None:
|
||
"""
|
||
Constructor method.
|
||
"""
|
||
self.patterns = list()
|
||
super().__init__()
|
||
|
||
def automate(self, full: bool = False) -> Tuple[bool, str]:
|
||
self.fetch()
|
||
logging.debug("Fetch complete")
|
||
self.parse()
|
||
logging.debug("Parse complete")
|
||
rotated = list()
|
||
for proxy in active_proxies():
|
||
if proxy.url is None:
|
||
# Not ready yet
|
||
continue
|
||
logging.debug("Testing active proxy %s", proxy.url)
|
||
if is_match(proxy.url, self.patterns):
|
||
logging.debug("Found %s blocked", proxy.url)
|
||
if not proxy.origin.auto_rotation:
|
||
logging.debug("Proxy auto-rotation forbidden for origin")
|
||
continue
|
||
if proxy.added > datetime.utcnow() - timedelta(hours=3):
|
||
logging.debug("Not rotating a proxy less than 3 hours old")
|
||
continue
|
||
if proxy.deprecate(reason=self.short_name):
|
||
logging.info("Rotated %s", proxy.url)
|
||
rotated.append((proxy.url, proxy.origin.domain_name))
|
||
else:
|
||
logging.debug("Not rotating a proxy that is already deprecated")
|
||
if rotated:
|
||
activity = Activity(
|
||
activity_type="block",
|
||
text=(f"[{self.short_name}] ♻ Rotated {len(rotated)} proxies️️: \n"
|
||
+ "\n".join([f"* {proxy_domain} ({origin_domain})" for proxy_domain, origin_domain in rotated]))
|
||
)
|
||
db.session.add(activity)
|
||
activity.notify()
|
||
db.session.commit()
|
||
return True, ""
|
||
|
||
@abstractmethod
|
||
def fetch(self) -> None:
|
||
"""
|
||
Fetch the blocklist data. It is the responsibility of the automation task
|
||
to persist this within the object for the parse step.
|
||
|
||
:return: None
|
||
"""
|
||
|
||
@abstractmethod
|
||
def parse(self) -> None:
|
||
"""
|
||
Parse the blocklist data.
|
||
|
||
:return: None
|
||
"""
|
||
|
||
|
||
def active_proxies() -> List[Proxy]:
|
||
return Proxy.query.filter( # type: ignore[no-any-return]
|
||
Proxy.deprecated.is_(None),
|
||
Proxy.destroyed.is_(None)
|
||
).all()
|
||
|
||
|
||
def is_match(test_url: str, patterns: List[str]):
|
||
with ThreadPoolExecutor(os.cpu_count() - 1) as executor:
|
||
url_fnmatch = functools.partial(fnmatch, test_url)
|
||
for result in executor.map(fnmatch, patterns):
|
||
if result:
|
||
return True
|
||
return False
|