majuna/app/terraform/block_external.py

85 lines
3.2 KiB
Python

import datetime
from typing import Tuple
from bs4 import BeautifulSoup
import requests
from app import app
from app.extensions import db
from app.models.activity import Activity
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation
class BlockExternalAutomation(BaseAutomation):
    """Deprecate CloudFront proxies that an external source reports as blocked.

    The page at ``app.config['EXTERNAL_CHECK_URL']`` is scraped: every ``<h2>``
    heading is treated as a vantage point name and is paired, by position,
    with the ``<div class="overflow-auto mb-5">`` elements on the page. The
    link texts inside each such div are taken to be the blocked hostnames.
    """

    short_name = "block_external"
    description = "Import proxy reachability results from external source"

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """Fetch the external results and deprecate the proxies reported blocked.

        Only vantage points listed in ``app.config['EXTERNAL_VANTAGE_POINTS']``
        are considered. A reported proxy is skipped when it is not a known
        CloudFront proxy, when its origin has auto-rotation disabled, or when
        it is already deprecated. Proxies added less than 3 hours ago are not
        rotated; a ``block_warning`` activity is recorded instead. If more
        than 15 proxies would be rotated in a single run, none are rotated and
        a ``block_warning`` activity is recorded.

        :param full: not used by this automation.
        :return: ``(True, "")`` on a completed run, or ``(False, message)``
            when the external page could not be fetched or parsed.
        """
        page = requests.get(
            app.config['EXTERNAL_CHECK_URL'],
            headers={'User-agent': 'BypassCensorship/1.0'},
            # Without a timeout a stalled server would hang the run forever.
            timeout=30,
        )
        if not page.ok:
            # An error page has no result sections and would otherwise be
            # indistinguishable from "nothing is blocked".
            return False, f"External source returned HTTP {page.status_code}"

        soup = BeautifulSoup(page.content, 'html.parser')
        headings = soup.find_all('h2')
        sections = soup.find_all('div', class_="overflow-auto mb-5")
        if len(sections) < len(headings):
            # Headings and result sections are matched by position, so a
            # shortfall means the page layout is not what we expect.
            return False, "External source page has fewer result sections than vantage points"

        # A section that contains a nested <div> is treated as having no
        # blocked URLs (presumably a "no results" notice - TODO confirm).
        results = {
            heading.text: [] if section.div else [link.text for link in section.find_all('a')]
            for heading, section in zip(headings, sections)
        }

        # De-duplicate while keeping first-seen order: the same URL reported
        # by several vantage points must only be counted and rotated once.
        vantage_points = app.config['EXTERNAL_VANTAGE_POINTS']
        blocked_urls = dict.fromkeys(
            url
            for vantage_point, urls in results.items()
            if vantage_point in vantage_points
            for url in urls
        )

        # Proxies added after this moment are considered too new to rotate.
        newest_rotatable = datetime.datetime.utcnow() - datetime.timedelta(hours=3)
        activities = []
        blocked_proxies = []
        for url in blocked_urls:
            print(f"Found {url} blocked")
            proxy = Proxy.query.filter(
                Proxy.provider == "cloudfront",
                Proxy.url == f"https://{url}"
            ).first()
            if not proxy:
                print("Proxy not found")
                continue
            if not proxy.origin.auto_rotation:
                print("Proxy auto-rotation forbidden for origin")
                continue
            if proxy.deprecated:
                print("Proxy already marked blocked")
                continue
            if proxy.added > newest_rotatable:
                activities.append(Activity(
                    activity_type="block_warning",
                    text=(
                        f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to "
                        "external source. REFUSING to rotate because this proxy is less than 3 hours old.")))
                continue
            blocked_proxies.append(proxy)

        if len(blocked_proxies) <= 15:
            for proxy in blocked_proxies:
                activities.append(Activity(
                    activity_type="block",
                    text=(f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to external "
                          "source. Rotation scheduled.")
                ))
                proxy.deprecate(reason="external")
        else:
            # Safety valve: a large batch is more likely a faulty report than
            # a real mass block, so nothing is rotated.
            activities.append(Activity(
                activity_type="block_warning",
                text=(
                    "More than 15 proxies were marked blocked according to external source. REFUSING to rotate.")))

        for activity in activities:
            activity.notify()
            db.session.add(activity)
        db.session.commit()
        return True, ""