91 lines
3.1 KiB
Python
91 lines
3.1 KiB
Python
|
import json
|
||
|
import logging
|
||
|
import time
|
||
|
from typing import Any, Dict
|
||
|
|
||
|
import requests
|
||
|
|
||
|
from app.extensions import db
|
||
|
from app.models.mirrors import Country, Origin, Proxy, country_origin
|
||
|
from app.terraform.block_mirror import BlockMirrorAutomation
|
||
|
|
||
|
|
||
|
def clean_json_response(raw_response: str) -> Dict[str, Any]:
    """
    Decode the JSON document at the start of an API response body.

    Seems to be a bug in the API where a <script> tag is appended after the JSON.
    Only the first JSON value is decoded and everything after it is ignored, so
    trailing markup cannot break parsing even when it contains "}" itself.

    :param raw_response: response body text that starts with a JSON document
    :return: the decoded JSON document
    :raises json.JSONDecodeError: if the text does not start with valid JSON
    """
    # Unlike json.loads(), raw_decode() does not skip leading whitespace.
    decoded, _end = json.JSONDecoder().raw_decode(raw_response.lstrip())
    response: Dict[str, Any] = decoded
    return response
|
||
|
|
||
|
|
||
|
def request_test_now(test_url: str) -> str:
    """
    Ask the Blocky API to test a URL now and return the ID of that test.

    The request is repeated, one second apart, until a response carries a
    "url_test_id" under its "d" key, for at most 20 attempts.

    :param test_url: URL to submit for testing
    :return: the "url_test_id" value taken from the API response
    :raises RuntimeWarning: if no test ID was returned within 20 attempts
    """
    api_url = "https://blocky.greatfire.org/api/test_now"
    headers = {
        "User-Agent": "bypasscensorship.org",
        "Content-Type": "application/json;charset=utf-8",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache",
    }
    max_attempts = 20
    for attempt in range(1, max_attempts + 1):
        params = {
            "url": test_url,
            "timestamp": str(int(time.time())),  # unix timestamp
        }
        response = requests.post(api_url, params=params, headers=headers, json={}, timeout=30)
        response_data = clean_json_response(response.text)
        logging.debug("Response: %s", response_data)
        result = response_data.get("d")
        # A null or non-object "d" is treated like a missing one, so the
        # request is retried instead of failing on the membership test.
        if isinstance(result, dict) and "url_test_id" in result:
            url_test_id: str = result["url_test_id"]
            logging.debug("Test result for %s has test result ID %s", test_url, url_test_id)
            return url_test_id
        # No point in waiting once the last attempt has been used up.
        if attempt < max_attempts:
            time.sleep(1)
    raise RuntimeWarning("Exceeded timeout requesting result")
|
||
|
|
||
|
|
||
|
def request_test_result(url_test_id: str) -> int:
    """
    Fetch the result of a Blocky URL test and count its non-zero curl exits.

    The entries listed under the "d" key of the JSON response are inspected;
    every entry whose "curl_exit_value" is not the string "0" is counted.

    :param url_test_id: test ID, interpolated into the result URL
    :return: number of entries with a "curl_exit_value" other than "0"
    """
    request_headers = {
        "User-Agent": "bypasscensorship.org",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache"
    }
    result_url = f"https://blocky.greatfire.org/api/url_test_result/{url_test_id}"
    payload = requests.get(result_url, headers=request_headers, timeout=30).json()
    failed_entries = [
        entry for entry in payload.get("d", [])
        if entry.get("curl_exit_value") != "0"
    ]
    non_zero_curl_exit_count: int = len(failed_entries)
    logging.debug("Test result for %s has %s non-zero exit values", url_test_id, non_zero_curl_exit_count)
    return non_zero_curl_exit_count
|
||
|
|
||
|
|
||
|
class BlockBlockyAutomation(BlockMirrorAutomation):
    """
    Automation that submits proxy URLs to the Blocky API and records the ones
    whose test results report failures.
    """

    short_name = "block_blocky"
    description = "Use the Blocky API to test for blocked mirrors in China"
    interval = 300  # NOTE(review): presumably seconds between runs; confirm against the base class

    def fetch(self) -> None:
        """Do nothing; the API requests are made per proxy in parse()."""

    def parse(self) -> None:
        """
        Test every eligible proxy URL and collect the ones that look blocked.

        Eligible proxies are those whose origin is linked to the country with
        code "CN", that are neither deprecated nor destroyed, and whose pool_id
        is not -1. A proxy URL is appended to self.patterns["blocky"] when more
        than one entry of its test result has a non-zero curl exit value.
        """
        cn_proxies = (
            db.session.query(Proxy.url)
            .join(Origin, Proxy.origin_id == Origin.id)
            .join(country_origin, Origin.id == country_origin.c.origin_id)
            .join(Country, Country.id == country_origin.c.country_id)
            .filter(
                Country.country_code == "CN",
                Proxy.deprecated.is_(None),
                Proxy.destroyed.is_(None),
                Proxy.pool_id != -1,
            )
            .all()
        )
        # A single-column query still yields one-element rows, so unpack each
        # row to hand a plain URL value to the API helpers and to the patterns.
        for (proxy_url,) in cn_proxies:
            test_id = request_test_now(proxy_url)
            if request_test_result(test_id) > 1:
                self.patterns["blocky"].append(proxy_url)
|