import logging

import requests

from app import app
from app.terraform.block_mirror import BlockMirrorAutomation


def _trim_prefix(s: str, prefix: str) -> str:
    """
    Return the string with the prefix removed from the start if present.

    :param s: String to modify.
    :param prefix: Prefix to remove (matched case-sensitively).
    :return: Modified string, or the original string if it does not start
        with the prefix.
    """
    if s.startswith(prefix):
        return s[len(prefix):]
    return s


def trim_http_https(s: str) -> str:
    """
    Return the string with "http://" or "https://" removed from the start
    of the string if present.

    "https://" is stripped first and "http://" second, so a string such as
    "https://http://example.com" loses both prefixes.

    :param s: String to modify.
    :return: Modified string.
    """
    return _trim_prefix(
        _trim_prefix(s, "https://"),
        "http://")


class BlockExternalAutomation(BlockMirrorAutomation):
    """
    Automation task to import proxy reachability results from external source.
    """
    short_name = "block_external"
    description = "Import proxy reachability results from external source"

    # NOTE(review): declared here but not referenced by any code visible in
    # this class; fetch() and parse() use self._data instead.
    _content: bytes

    def fetch(self) -> None:
        """
        Download the reachability results from every configured check URL.

        The ``EXTERNAL_CHECK_URL`` configuration value may be a list of URLs
        or a single URL string; a missing value or any other type is treated
        as "no URLs". The JSON payload of each response is appended to
        ``self._data``.

        :raises requests.HTTPError: If an endpoint answers with a 4xx/5xx
            status, so that an error document is never mistaken for results.
        """
        user_agent = {'User-agent': 'BypassCensorship/1.0'}
        # Read the configuration once and normalise it to a list of URLs.
        configured = app.config.get('EXTERNAL_CHECK_URL', [])
        if isinstance(configured, list):
            check_urls = configured
        elif isinstance(configured, str):
            check_urls = [configured]
        else:
            check_urls = []
        for check_url in check_urls:
            # presumably the parent class initialises _data to None; verify
            # against BlockMirrorAutomation.
            if self._data is None:
                self._data = []
            response = requests.get(check_url, headers=user_agent, timeout=30)
            # Fail loudly on HTTP errors instead of parsing the error body.
            response.raise_for_status()
            self._data.extend(response.json())

    def parse(self) -> None:
        """
        Convert the fetched entries into "https://" URL patterns.

        Each entry has any leading "http://" or "https://" removed and is
        then prefixed with "https://" before being added to
        ``self.patterns``.
        """
        # fetch() leaves _data untouched when no check URL is configured;
        # treat a remaining None as "no results" rather than raising
        # TypeError while iterating it.
        entries = [] if self._data is None else self._data
        self.patterns.extend(
            ["https://" + trim_http_https(pattern) for pattern in entries])
        logging.debug("Found URLs: %s", self.patterns)