majuna/app/terraform/block_external.py

96 lines
3.6 KiB
Python

import datetime
from typing import Tuple, List, Dict
from bs4 import BeautifulSoup
import requests
from app import app
from app.extensions import db
from app.models.activity import Activity
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation
class BlockExternalAutomation(BaseAutomation):
    """
    Automation task to import proxy reachability results from external source.

    The page at ``app.config['EXTERNAL_CHECK_URL']`` is downloaded and parsed
    as HTML. Each ``<h2>`` heading is paired, in document order, with a
    ``<div class="overflow-auto mb-5">`` section; the heading text is used as
    the vantage point name and the text of the anchors in the section as the
    hostnames reported blocked from it. Only vantage points listed in
    ``app.config['EXTERNAL_VANTAGE_POINTS']`` are acted upon.
    """

    short_name = "block_external"
    description = "Import proxy reachability results from external source"

    # Seconds to wait for the external source before giving up. Without a
    # timeout ``requests`` may block forever on an unresponsive server.
    _FETCH_TIMEOUT = 30

    # Raw body of the last fetched page (set by ``_fetch``).
    content: bytes
    # Vantage point name -> hostnames reported blocked (set by ``_parse``).
    results: Dict[str, List[str]]

    def _fetch(self) -> None:
        """
        Download the external results page into ``self.content``.

        :raises requests.RequestException: on connection failure, timeout or
            a non-success HTTP status.
        """
        user_agent = {'User-agent': 'BypassCensorship/1.0'}
        page = requests.get(
            app.config['EXTERNAL_CHECK_URL'],
            headers=user_agent,
            timeout=self._FETCH_TIMEOUT,
        )
        # An HTTP error page must not be parsed as if it were a valid report.
        page.raise_for_status()
        self.content = page.content

    def _parse(self) -> None:
        """
        Parse ``self.content`` into ``self.results``.

        :raises ValueError: if the page has fewer result sections than
            headings, in which case headings cannot be paired with sections.
        """
        soup = BeautifulSoup(self.content, 'html.parser')
        headings = soup.find_all('h2')
        sections = soup.find_all('div', class_="overflow-auto mb-5")
        if len(sections) < len(headings):
            raise ValueError(
                f"found {len(headings)} headings but only {len(sections)} result sections")
        results: Dict[str, List[str]] = {}
        # zip() stops at the last heading; surplus sections are ignored.
        for heading, section in zip(headings, sections):
            if section.div:
                # A nested <div> is treated as "no URLs listed" for this
                # vantage point.
                # NOTE(review): presumably a placeholder element - confirm
                # against the markup of the external page.
                results[heading.text] = []
            else:
                results[heading.text] = [anchor.text for anchor in section.find_all('a')]
        self.results = results

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """
        Fetch the external results and deprecate the proxies reported blocked.

        Proxies added less than 3 hours ago, and proxies whose origin has
        auto-rotation disabled, are never deprecated. If more than 15 proxies
        would be deprecated in one run, none are and a warning activity is
        recorded instead.

        :param full: not used by this automation.
        :return: ``(True, "")`` on success, or ``(False, message)`` if the
            external results could not be fetched or parsed.
        """
        try:
            self._fetch()
        except requests.RequestException as exc:
            return False, f"Failed to fetch external reachability results: {exc}"
        try:
            self._parse()
        except ValueError as exc:
            return False, f"Failed to parse external reachability results: {exc}"
        activities = []
        blocked_proxies = []
        # A URL may be listed under several vantage points; handle it once so
        # the proxy is not counted, deprecated and notified more than once.
        seen_urls = set()
        # NOTE(review): compared as naive UTC; assumes Proxy.added is stored
        # as a naive UTC datetime - confirm against the model.
        newest_allowed = datetime.datetime.utcnow() - datetime.timedelta(hours=3)
        for vantage_point, urls in self.results.items():
            if vantage_point not in app.config['EXTERNAL_VANTAGE_POINTS']:
                continue
            for url in urls:
                if url in seen_urls:
                    continue
                seen_urls.add(url)
                print(f"Found {url} blocked")
                proxy = Proxy.query.filter(
                    Proxy.provider == "cloudfront",
                    Proxy.url == f"https://{url}"
                ).first()
                if not proxy:
                    print("Proxy not found")
                    continue
                if not proxy.origin.auto_rotation:
                    print("Proxy auto-rotation forbidden for origin")
                    continue
                if proxy.added > newest_allowed:
                    activities.append(Activity(
                        activity_type="block_warning",
                        text=(
                            f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to "
                            "external source. REFUSING to rotate because this proxy is less than 3 hours old.")))
                    continue
                blocked_proxies.append(proxy)
        if len(blocked_proxies) <= 15:
            for proxy in blocked_proxies:
                activities.append(Activity(
                    activity_type="block",
                    text=(f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to external "
                          "source. Rotation scheduled.")
                ))
                proxy.deprecate(reason="external")
        else:
            # A very large batch is treated as suspicious and nothing is
            # rotated automatically.
            activities.append(Activity(
                activity_type="block_warning",
                text=(
                    "More than 15 proxies were marked blocked according to external source. REFUSING to rotate.")))
        for activity in activities:
            activity.notify()
            db.session.add(activity)
        db.session.commit()
        return True, ""