import datetime
from typing import Tuple, List, Dict

from bs4 import BeautifulSoup
import requests

from app import app
from app.extensions import db
from app.models.activity import Activity
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation

# Upper bound, in seconds, on how long the remote fetch may wait for the
# external source before giving up. Without a timeout, requests waits forever.
FETCH_TIMEOUT_SECONDS = 30


class BlockExternalAutomation(BaseAutomation):
    """
    Automation task to import proxy reachability results from external source.

    The task downloads the page configured as ``EXTERNAL_CHECK_URL``, extracts
    a list of URLs for each ``<h2>`` heading on that page, and deprecates the
    matching CloudFront proxies for the headings listed in
    ``EXTERNAL_VANTAGE_POINTS``.
    """
    short_name = "block_external"
    description = "Import proxy reachability results from external source"

    # Raw body of the fetched page, set by _fetch().
    content: bytes
    # Mapping of <h2> heading text to the anchor texts found in the section
    # paired with that heading, set by _parse().
    results: Dict[str, List[str]]

    def _fetch(self) -> None:
        """
        Download the external results page and store its body in ``self.content``.

        :raises requests.RequestException: if the request fails, times out, or
            the server answers with an HTTP error status.
        """
        user_agent = {'User-agent': 'BypassCensorship/1.0'}
        page = requests.get(
            app.config['EXTERNAL_CHECK_URL'],
            headers=user_agent,
            timeout=FETCH_TIMEOUT_SECONDS,
        )
        # Do not parse an error page as though it were a valid result listing.
        page.raise_for_status()
        self.content = page.content

    def _parse(self) -> None:
        """
        Parse ``self.content`` and store the extracted URLs in ``self.results``.

        Headings (``<h2>``) and result sections (``<div class="overflow-auto mb-5">``)
        are paired by their position in the document.

        :raises ValueError: if there are more headings than result sections, in
            which case the positional pairing cannot be trusted.
        """
        soup = BeautifulSoup(self.content, 'html.parser')
        headings = soup.find_all('h2')
        sections = soup.find_all('div', class_="overflow-auto mb-5")
        if len(headings) > len(sections):
            raise ValueError(
                f"found {len(headings)} headings but only {len(sections)} result sections")
        results: Dict[str, List[str]] = {}
        # zip() stops after the last heading; surplus sections are ignored.
        for heading, section in zip(headings, sections):
            if section.div:
                # A section with a nested <div> yields no URLs.
                # NOTE(review): presumably the nested <div> is a "nothing
                # blocked" notice - confirm against the source page.
                results[heading.text] = []
            else:
                results[heading.text] = [anchor.text for anchor in section.find_all('a')]
        self.results = results

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """
        Fetch the external results and deprecate the proxies reported as blocked.

        A proxy is left alone when it cannot be found, when its origin does not
        allow auto-rotation, or when it was added less than 3 hours ago. If more
        than 15 proxies would be rotated in a single run, none are rotated and a
        warning activity is recorded instead.

        :param full: not used by this automation.
        :return: ``(True, "")`` on success, or ``(False, message)`` when the
            remote content could not be fetched or parsed.
        """
        try:
            self._fetch()
        except requests.RequestException as exc:
            return False, f"Failed to fetch external reachability results: {exc}"
        try:
            self._parse()
        except ValueError as exc:
            return False, f"Failed to parse external reachability results: {exc}"

        activities = []
        blocked_proxies = []
        # Computed once per run rather than once per URL.
        # NOTE(review): assumes Proxy.added is a naive UTC datetime - confirm.
        newest_rotatable = datetime.datetime.utcnow() - datetime.timedelta(hours=3)
        for vantage_point, urls in self.results.items():
            if vantage_point not in app.config['EXTERNAL_VANTAGE_POINTS']:
                continue
            for url in urls:
                print(f"Found {url} blocked")
                proxy = Proxy.query.filter(
                    Proxy.provider == "cloudfront",
                    Proxy.url == f"https://{url}"
                ).first()
                if not proxy:
                    print("Proxy not found")
                    continue
                if not proxy.origin.auto_rotation:
                    print("Proxy auto-rotation forbidden for origin")
                    continue
                if proxy.added > newest_rotatable:
                    activities.append(Activity(
                        activity_type="block_warning",
                        text=(
                            f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to "
                            "external source. REFUSING to rotate because this proxy is less than 3 hours old.")))
                    continue
                blocked_proxies.append(proxy)
        # Safety valve: a large batch of reported blocks is not acted upon.
        if len(blocked_proxies) <= 15:
            for proxy in blocked_proxies:
                activities.append(Activity(
                    activity_type="block",
                    text=(f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to external "
                          "source. Rotation scheduled.")
                ))
                proxy.deprecate(reason="external")
        else:
            activities.append(Activity(
                activity_type="block_warning",
                text=(
                    "More than 15 proxies were marked blocked according to external source. REFUSING to rotate.")))
        for activity in activities:
            activity.notify()
            db.session.add(activity)
        db.session.commit()
        return True, ""