import asyncio import logging import random import string from datetime import datetime, timedelta, timezone from functools import wraps from traceback import format_exception from typing import Callable, Coroutine, Any from starlette.concurrency import run_in_threadpool logger = logging.getLogger(__name__) ALPHA_NUM = string.ascii_letters + string.digits LOWER_NUM = string.ascii_lowercase + string.digits TOP_SUBDOMAINS = [ "admin", "api", "app", "apps", "autodiscover", "baidu", "bbs", "beta", "blog", "cdn", "citrix", "cloud", "demo", "dev", "email", "en", "exchange", "forum", "ftp", "gateway", "gov", "gw", "home", "host", "images", "img", "info", "intranet", "login", "m", "mail", "mail1", "mail2", "mail3", "media", "mobile", "news", "office", "owa", "portal", "remote", "secure", "server", "server1", "shop", "ssl", "stage", "staging", "start", "static", "store", "support", "test", "web", "webmail", "wiki", "www1", "www2", ] NoArgsNoReturnFuncT = Callable[[], None] NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]] ExcArgNoReturnFuncT = Callable[[Exception], None] ExcArgNoReturnAsyncFuncT = Callable[[Exception], Coroutine[Any, Any, None]] NoArgsNoReturnAnyFuncT = NoArgsNoReturnFuncT | NoArgsNoReturnAsyncFuncT ExcArgNoReturnAnyFuncT = ExcArgNoReturnFuncT | ExcArgNoReturnAsyncFuncT NoArgsNoReturnDecorator = Callable[[NoArgsNoReturnAnyFuncT], NoArgsNoReturnAsyncFuncT] async def _handle_repeat_func(func: NoArgsNoReturnAnyFuncT) -> None: if asyncio.iscoroutinefunction(func): await func() else: await run_in_threadpool(func) async def _handle_repeat_exc( exc: Exception, on_exception: ExcArgNoReturnAnyFuncT | None ) -> None: if on_exception: if asyncio.iscoroutinefunction(on_exception): await on_exception(exc) else: await run_in_threadpool(on_exception, exc) def repeat_every( *, seconds: float, wait_first: float | None = None, max_repetitions: int | None = None, on_complete: NoArgsNoReturnAnyFuncT | None = None, on_exception: ExcArgNoReturnAnyFuncT | None = None, ) -> NoArgsNoReturnDecorator: """ This function returns a decorator that modifies a function so it is periodically re-executed after its first call. The function it decorates should accept no arguments and return nothing. If necessary, this can be accomplished by using `functools.partial` or otherwise wrapping the target function prior to decoration. Parameters ---------- seconds: float The number of seconds to wait between repeated calls wait_first: float (default None) If not None, the function will wait for the given duration before the first call max_repetitions: Optional[int] (default None) The maximum number of times to call the repeated function. If `None`, the function is repeated forever. on_complete: Optional[Callable[[], None]] (default None) A function to call after the final repetition of the decorated function. on_exception: Optional[Callable[[Exception], None]] (default None) A function to call when an exception is raised by the decorated function. """ def decorator(func: NoArgsNoReturnAnyFuncT) -> NoArgsNoReturnAsyncFuncT: """ Converts the decorated function into a repeated, periodically-called version of itself. """ @wraps(func) async def wrapped() -> None: async def loop() -> None: if wait_first is not None: await asyncio.sleep(wait_first) repetitions = 0 while max_repetitions is None or repetitions < max_repetitions: try: await _handle_repeat_func(func) except Exception as exc: formatted_exception = "".join( format_exception(type(exc), exc, exc.__traceback__) ) logger.error(formatted_exception) await _handle_repeat_exc(exc, on_exception) repetitions += 1 await asyncio.sleep(seconds) if on_complete: await _handle_repeat_func(on_complete) asyncio.ensure_future(loop()) return wrapped return decorator def perishable_cache(expires: int, minimum: int): def decorator(func): cache_data = {} @wraps(func) async def wrapper(*args, retry: bool = False, **kwargs): timeout = minimum if retry else expires if "cached_time" in cache_data: if datetime.now(tz=timezone.utc) - cache_data["cached_time"] < timedelta( seconds=timeout ): return cache_data["cached_result"] result = await func(*args, retry=retry, **kwargs) cache_data["cached_result"] = result cache_data["cached_time"] = datetime.now(tz=timezone.utc) return result return wrapper return decorator def generate_random_alphanum(length: int = 20, prefix: str = "") -> str: return prefix + "".join(random.choices(ALPHA_NUM, k=length - len(prefix))) def generate_random_lowernum(length: int = 20, prefix: str = "") -> str: return prefix + "".join(random.choices(LOWER_NUM, k=length - len(prefix))) def generate_random_subdomain() -> str: return random.choice(TOP_SUBDOMAINS)