majuna/app/terraform/block_external.py

86 lines
3.2 KiB
Python
Raw Normal View History

2022-05-23 10:55:59 +01:00
import datetime
from typing import Tuple
2022-03-10 14:26:22 +00:00
from bs4 import BeautifulSoup
import requests
from app import app
2022-05-01 16:23:45 +01:00
from app.extensions import db
from app.models.activity import Activity
2022-04-22 14:01:16 +01:00
from app.models.mirrors import Proxy
from app.terraform import BaseAutomation
class BlockExternalAutomation(BaseAutomation):
short_name = "block_external"
description = "Import proxy reachability results from external source"
def automate(self, full: bool = False) -> Tuple[bool, str]:
user_agent = {'User-agent': 'BypassCensorship/1.0'}
page = requests.get(app.config['EXTERNAL_CHECK_URL'], headers=user_agent)
soup = BeautifulSoup(page.content, 'html.parser')
h2 = soup.find_all('h2')
div = soup.find_all('div', class_="overflow-auto mb-5")
results = {}
i = 0
while i < len(h2):
if not div[i].div:
urls = []
a = div[i].find_all('a')
j = 0
while j < len(a):
urls.append(a[j].text)
j += 1
results[h2[i].text] = urls
else:
results[h2[i].text] = []
i += 1
activities = []
2022-05-23 10:55:59 +01:00
blocked_proxies = []
for vp in results:
if vp not in app.config['EXTERNAL_VANTAGE_POINTS']:
continue
for url in results[vp]:
print(f"Found {url} blocked")
proxy = Proxy.query.filter(
Proxy.provider == "cloudfront",
Proxy.url == f"https://{url}"
).first()
if not proxy:
print("Proxy not found")
continue
if not proxy.origin.auto_rotation:
print("Proxy auto-rotation forbidden for origin")
continue
if proxy.deprecated:
print("Proxy already marked blocked")
continue
2022-05-25 09:20:57 +01:00
if proxy.added > datetime.datetime.utcnow() - datetime.timedelta(hours=3):
2022-05-23 10:55:59 +01:00
activities.append(Activity(
activity_type="block_warning",
text=(
f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to "
"external source. REFUSING to rotate because this proxy is less than 3 hours old.")))
continue
blocked_proxies.append(proxy)
if len(blocked_proxies) <= 15:
for proxy in blocked_proxies:
activities.append(Activity(
activity_type="block",
2022-05-23 10:55:59 +01:00
text=(f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to external "
"source. Rotation scheduled.")
))
proxy.deprecate(reason="external")
2022-05-23 10:55:59 +01:00
else:
activities.append(Activity(
activity_type="block_warning",
text=(
"More than 15 proxies were marked blocked according to external source. REFUSING to rotate.")))
for a in activities:
2022-05-23 10:55:59 +01:00
a.notify()
db.session.add(a)
db.session.commit()
return True, ""