block: try fnmatch.filter instead

This commit is contained in:
Iain Learmonth 2022-06-18 13:57:58 +01:00
parent f78e4d67ad
commit 7ceca2ace2

View file

@ -1,10 +1,7 @@
import functools
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
from abc import abstractmethod from abc import abstractmethod
from fnmatch import fnmatch import fnmatch
from typing import Tuple, List from typing import Tuple, List
from app.extensions import db from app.extensions import db
@ -29,12 +26,11 @@ class BlockMirrorAutomation(BaseAutomation):
self.parse() self.parse()
logging.debug("Parse complete") logging.debug("Parse complete")
rotated = list() rotated = list()
for proxy in active_proxies(): proxy_urls = active_proxy_urls()
if proxy.url is None: for pattern in self.patterns:
# Not ready yet blocked_urls = fnmatch.filter(proxy_urls, pattern)
continue for blocked_url in blocked_urls:
logging.debug("Testing active proxy %s", proxy.url) proxy = proxy_by_url(blocked_url)
if is_match(proxy.url, self.patterns):
logging.debug("Found %s blocked", proxy.url) logging.debug("Found %s blocked", proxy.url)
if not proxy.origin.auto_rotation: if not proxy.origin.auto_rotation:
logging.debug("Proxy auto-rotation forbidden for origin") logging.debug("Proxy auto-rotation forbidden for origin")
@ -76,17 +72,16 @@ class BlockMirrorAutomation(BaseAutomation):
""" """
def active_proxies() -> List[Proxy]: def active_proxy_urls() -> List[str]:
return Proxy.query.filter( # type: ignore[no-any-return] return [proxy.url for proxy in Proxy.query.filter(
Proxy.deprecated.is_(None), Proxy.deprecated.is_(None),
Proxy.destroyed.is_(None) Proxy.destroyed.is_(None)
).all() ).all()]
def is_match(test_url: str, patterns: List[str]): def proxy_by_url(url: str) -> Proxy:
with ThreadPoolExecutor(os.cpu_count() - 1) as executor: return Proxy.query.filter(
url_fnmatch = functools.partial(fnmatch, test_url) Proxy.deprecated.is_(None),
for result in executor.map(fnmatch, patterns): Proxy.destroyed.is_(None),
if result: Proxy.url == url
return True ).first()
return False