import datetime
from typing import Dict, List, Tuple

from bs4 import BeautifulSoup
import requests

from app import app
from app.extensions import db
from app.models.activity import Activity
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation

# Seconds to wait for the external source before giving up; without a timeout
# requests blocks indefinitely on a stalled server.
REQUEST_TIMEOUT_SECONDS = 30

# Proxies younger than this many hours are never rotated on external evidence.
MIN_PROXY_AGE_HOURS = 3

# Safety valve: if more proxies than this are reported blocked in one run,
# nothing is rotated and a warning is raised instead.
MAX_ROTATIONS_PER_RUN = 15


def _parse_blocked_urls(html: bytes) -> Dict[str, List[str]]:
    """Extract the blocked URLs listed under each heading of the results page.

    Every ``<h2>`` is paired, in document order, with the corresponding
    ``<div class="overflow-auto mb-5">``. A container that holds a nested
    ``<div>`` contributes no URLs; otherwise the text of each ``<a>`` inside
    it is collected.

    Headings and containers are paired with ``zip``, so a page with fewer
    containers than headings yields only the complete pairs instead of
    raising ``IndexError``.
    """
    soup = BeautifulSoup(html, 'html.parser')
    headings = soup.find_all('h2')
    containers = soup.find_all('div', class_="overflow-auto mb-5")
    return {
        heading.text: (
            [] if container.div
            else [link.text for link in container.find_all('a')]
        )
        for heading, container in zip(headings, containers)
    }


class BlockExternalAutomation(BaseAutomation):
    """Deprecate CloudFront proxies that an external source reports as blocked."""

    short_name = "block_external"
    description = "Import proxy reachability results from external source"

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """Fetch external reachability results and schedule proxy rotations.

        The page at ``app.config['EXTERNAL_CHECK_URL']`` is downloaded and
        parsed. For every heading listed in
        ``app.config['EXTERNAL_VANTAGE_POINTS']``, each reported URL is
        matched against CloudFront proxies. A matching proxy is deprecated
        unless its origin forbids auto-rotation, it is already deprecated, it
        is younger than ``MIN_PROXY_AGE_HOURS``, or more than
        ``MAX_ROTATIONS_PER_RUN`` proxies would be rotated in this run.

        :param full: accepted for interface compatibility; not used here.
        :return: ``(True, "")`` when the run completes, or ``(False, message)``
            when the external source could not be fetched.
        """
        try:
            page = requests.get(
                app.config['EXTERNAL_CHECK_URL'],
                headers={'User-agent': 'BypassCensorship/1.0'},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            # An error page would otherwise parse to "nothing is blocked".
            page.raise_for_status()
        except requests.RequestException as exc:
            return False, f"Could not fetch external reachability results: {exc}"

        results = _parse_blocked_urls(page.content)

        activities = []
        blocked_proxies = []
        # The same URL may be reported by several vantage points; handle it
        # once so it is neither double-counted against the rotation limit nor
        # deprecated twice.
        seen_urls = set()
        # NOTE(review): naive UTC is kept because it is compared with
        # proxy.added, which is presumably stored as naive UTC - confirm.
        newest_rotatable = (
            datetime.datetime.utcnow()
            - datetime.timedelta(hours=MIN_PROXY_AGE_HOURS)
        )

        for vantage_point, urls in results.items():
            if vantage_point not in app.config['EXTERNAL_VANTAGE_POINTS']:
                continue
            for url in urls:
                if url in seen_urls:
                    continue
                seen_urls.add(url)
                print(f"Found {url} blocked")
                proxy = Proxy.query.filter(
                    Proxy.provider == "cloudfront",
                    Proxy.url == f"https://{url}"
                ).first()
                if not proxy:
                    print("Proxy not found")
                    continue
                if not proxy.origin.auto_rotation:
                    print("Proxy auto-rotation forbidden for origin")
                    continue
                if proxy.deprecated:
                    print("Proxy already marked blocked")
                    continue
                if proxy.added > newest_rotatable:
                    activities.append(Activity(
                        activity_type="block_warning",
                        text=(
                            f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to "
                            f"external source. REFUSING to rotate because this proxy is less than "
                            f"{MIN_PROXY_AGE_HOURS} hours old.")))
                    continue
                blocked_proxies.append(proxy)

        if len(blocked_proxies) > MAX_ROTATIONS_PER_RUN:
            # A very large batch is refused outright rather than rotated.
            activities.append(Activity(
                activity_type="block_warning",
                text=(
                    f"More than {MAX_ROTATIONS_PER_RUN} proxies were marked blocked according to external "
                    "source. REFUSING to rotate.")))
        else:
            for proxy in blocked_proxies:
                activities.append(Activity(
                    activity_type="block",
                    text=(
                        f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to external "
                        "source. Rotation scheduled.")))
                proxy.deprecate(reason="external")

        for activity in activities:
            activity.notify()
            db.session.add(activity)
        db.session.commit()
        return True, ""