parent
8abe5d60fa
commit
10b60b0206
4 changed files with 90 additions and 74 deletions
|
@ -1,57 +1,60 @@
|
|||
from typing import Tuple
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
import requests
|
||||
|
||||
from app import app
|
||||
from app.extensions import db
|
||||
from app.models.mirrors import Proxy
|
||||
from app.terraform import BaseAutomation
|
||||
|
||||
|
||||
def check_blocks():
|
||||
user_agent = {'User-agent': 'BypassCensorship/1.0'}
|
||||
page = requests.get(app.config['EXTERNAL_CHECK_URL'], headers=user_agent)
|
||||
soup = BeautifulSoup(page.content, 'html.parser')
|
||||
h2 = soup.find_all('h2')
|
||||
div = soup.find_all('div', class_="overflow-auto mb-5")
|
||||
class BlockExternalAutomation(BaseAutomation):
|
||||
short_name = "block_external"
|
||||
description = "Import proxy reachability results from external source"
|
||||
|
||||
results = {}
|
||||
def automate(self, full: bool = False) -> Tuple[bool, str]:
|
||||
user_agent = {'User-agent': 'BypassCensorship/1.0'}
|
||||
page = requests.get(app.config['EXTERNAL_CHECK_URL'], headers=user_agent)
|
||||
soup = BeautifulSoup(page.content, 'html.parser')
|
||||
h2 = soup.find_all('h2')
|
||||
div = soup.find_all('div', class_="overflow-auto mb-5")
|
||||
|
||||
i = 0
|
||||
while i < len(h2):
|
||||
if not div[i].div:
|
||||
urls = []
|
||||
a = div[i].find_all('a')
|
||||
j = 0
|
||||
while j < len(a):
|
||||
urls.append(a[j].text)
|
||||
j += 1
|
||||
results[h2[i].text] = urls
|
||||
else:
|
||||
results[h2[i].text] = []
|
||||
i += 1
|
||||
results = {}
|
||||
|
||||
for vp in results:
|
||||
if vp not in app.config['EXTERNAL_VANTAGE_POINTS']:
|
||||
continue
|
||||
for url in results[vp]:
|
||||
print(f"Found {url} blocked")
|
||||
proxy = Proxy.query.filter(
|
||||
Proxy.provider == "cloudfront",
|
||||
Proxy.url == f"https://{url}"
|
||||
).first()
|
||||
if not proxy:
|
||||
print("Proxy not found")
|
||||
i = 0
|
||||
while i < len(h2):
|
||||
if not div[i].div:
|
||||
urls = []
|
||||
a = div[i].find_all('a')
|
||||
j = 0
|
||||
while j < len(a):
|
||||
urls.append(a[j].text)
|
||||
j += 1
|
||||
results[h2[i].text] = urls
|
||||
else:
|
||||
results[h2[i].text] = []
|
||||
i += 1
|
||||
|
||||
for vp in results:
|
||||
if vp not in app.config['EXTERNAL_VANTAGE_POINTS']:
|
||||
continue
|
||||
if not proxy.origin.auto_rotation:
|
||||
print("Proxy auto-rotation forbidden for origin")
|
||||
continue
|
||||
if proxy.deprecated:
|
||||
print("Proxy already marked blocked")
|
||||
continue
|
||||
proxy.deprecate(reason="external")
|
||||
for url in results[vp]:
|
||||
print(f"Found {url} blocked")
|
||||
proxy = Proxy.query.filter(
|
||||
Proxy.provider == "cloudfront",
|
||||
Proxy.url == f"https://{url}"
|
||||
).first()
|
||||
if not proxy:
|
||||
print("Proxy not found")
|
||||
continue
|
||||
if not proxy.origin.auto_rotation:
|
||||
print("Proxy auto-rotation forbidden for origin")
|
||||
continue
|
||||
if proxy.deprecated:
|
||||
print("Proxy already marked blocked")
|
||||
continue
|
||||
proxy.deprecate(reason="external")
|
||||
db.session.commit()
|
||||
|
||||
db.session.commit()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
with app.app_context():
|
||||
check_blocks()
|
||||
return True, ""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue