# majuna/app/terraform/block_mirror.py

from datetime import datetime, timedelta
import logging
from abc import abstractmethod
2022-06-18 13:57:58 +01:00
import fnmatch
from typing import Tuple, List, Any, Optional
from app.extensions import db
from app.models.activity import Activity
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation
class BlockMirrorAutomation(BaseAutomation):
    """
    Automation base that deprecates ("rotates") active proxies whose URL
    matches one of a set of blocklist patterns.

    Subclasses provide the blocklist by implementing :meth:`fetch` and
    :meth:`parse`; :meth:`automate` drives both and then acts on
    ``self.patterns``.
    """

    # Glob-style patterns (fnmatch syntax) tested against active proxy URLs.
    patterns: List[str]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Start with an empty pattern list, then forward every argument to the
        parent constructor.
        """
        self.patterns = []
        super().__init__(*args, **kwargs)

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """
        Fetch and parse the blocklist, then deprecate every matching proxy
        that is eligible for rotation.

        A matched proxy is left alone when its origin has auto-rotation
        switched off, when it was added less than three hours ago, or when
        ``deprecate()`` reports that nothing was done. If at least one proxy
        was rotated, a "block" activity summarising the rotations is added to
        the session and notified. The session is committed before returning.

        :param full: not used by this implementation
        :return: always ``(True, "")``
        """
        self.fetch()
        logging.debug("Fetch complete")
        self.parse()
        logging.debug("Parse complete")

        minimum_age = timedelta(hours=3)
        rotations = []
        # The caller-side None check is kept: URL entries may be missing.
        candidate_urls = [url for url in active_proxy_urls() if url is not None]

        for glob in self.patterns:
            for matched_url in fnmatch.filter(candidate_urls, glob):
                # Re-query so that a proxy deprecated earlier in this run is
                # no longer returned.
                proxy = proxy_by_url(matched_url)
                if not proxy:
                    continue
                logging.debug("Found %s blocked", proxy.url)
                if not proxy.origin.auto_rotation:
                    logging.debug("Proxy auto-rotation forbidden for origin")
                elif proxy.added > datetime.utcnow() - minimum_age:
                    logging.debug("Not rotating a proxy less than 3 hours old")
                elif proxy.deprecate(reason=self.short_name):
                    logging.info("Rotated %s", proxy.url)
                    rotations.append((proxy.url, proxy.origin.domain_name))
                else:
                    logging.debug("Not rotating a proxy that is already deprecated")

        if rotations:
            heading = f"[{self.short_name}] ♻ Rotated {len(rotations)} proxies: \n"
            bullets = "\n".join(
                f"* {proxy_domain} ({origin_domain})"
                for proxy_domain, origin_domain in rotations
            )
            activity = Activity(activity_type="block", text=heading + bullets)
            db.session.add(activity)
            activity.notify()

        db.session.commit()
        return True, ""

    @abstractmethod
    def fetch(self) -> None:
        """
        Retrieve the raw blocklist data.

        The implementation must keep whatever it retrieved on the instance so
        that :meth:`parse`, which runs immediately afterwards, can use it.

        :return: None
        """

    @abstractmethod
    def parse(self) -> None:
        """
        Turn the previously fetched blocklist data into usable form.

        NOTE(review): :meth:`automate` reads ``self.patterns`` right after this
        step, so implementations are presumably expected to fill it here --
        confirm against the concrete subclasses.

        :return: None
        """
2022-06-18 13:57:58 +01:00
def active_proxy_urls() -> List[str]:
    """
    Collect the ``url`` of every proxy that is neither deprecated nor destroyed.

    :return: one entry per matching proxy row, in query order
    """
    live_proxies = Proxy.query.filter(
        Proxy.deprecated.is_(None),
        Proxy.destroyed.is_(None),
    ).all()
    return [live.url for live in live_proxies]
def proxy_by_url(url: str) -> Optional[Proxy]:
    """
    Look up a proxy by URL, considering only proxies that are neither
    deprecated nor destroyed.

    :param url: exact URL to match against ``Proxy.url``
    :return: the first matching proxy, or None when there is none
    """
    still_active = (Proxy.deprecated.is_(None), Proxy.destroyed.is_(None))
    lookup = Proxy.query.filter(*still_active, Proxy.url == url)
    return lookup.first()  # type: ignore[no-any-return]