# pylint: disable=too-few-public-methods
import builtins
from datetime import datetime, timedelta
from typing import Dict, List, Union, Optional

from flask import current_app
from pydantic import BaseModel, Field
from sqlalchemy import or_
from tldextract import extract

from app.extensions import db
from app.models.base import Group, Pool
from app.models.mirrors import Proxy

# URL schemes removed from a proxy URL to obtain the bare mirror host name.
# "http://" is included because the previous character-set based strip also
# removed it; keeping it avoids changing the keys for any such URLs.
_URL_SCHEMES = ("https://", "http://")

# Placeholder country code and risk score reported for an origin that has no
# usable per-country risk data.
_UNKNOWN_COUNTRY = "ZZ"
_UNKNOWN_RISK = 0


# One entry of the mirror mapping: describes the origin served by a mirror.
# NOTE: documented with comments rather than a docstring so that nothing is
# added to the schema produced by ``schema_json()``.
class MMMirror(BaseModel):
    origin_domain: str = Field(description="The full origin domain name")
    origin_domain_normalized: str = Field(description="The origin_domain with \"www.\" removed, if present")
    origin_domain_root: str = Field(description="The registered domain name of the origin, excluding subdomains")
    valid_from: str = Field(description="The date on which the mirror was added to the system")
    valid_to: Optional[str] = Field(description="The date on which the mirror was decommissioned")
    countries: Dict[str, int] = Field(description="A list mapping of risk levels to country")
    country: Optional[str] = Field(
        description="The country code of the country with the highest risk level where the origin is targeted")
    risk: int = Field(description="The risk score for the highest risk country")


# Top-level document: schema version, mirror host -> MMMirror, and the names
# of the CloudFront log buckets.
class MirrorMapping(BaseModel):
    version: str = Field(
        description="Version number of the mirror mapping schema in use"
    )
    mappings: Dict[str, MMMirror] = Field(
        description="The domain name for the mirror"
    )
    s3_buckets: List[str] = Field(
        description="The names of all S3 buckets used for CloudFront logs"
    )

    class Config:
        title = "Mirror Mapping Version 1.2"


def _strip_url_scheme(url: str) -> str:
    """Return *url* without a leading ``https://`` or ``http://``.

    ``str.lstrip("https://")`` must not be used for this: it removes any
    leading run of the characters ``h t p s : /`` and therefore also eats the
    start of host names such as ``static.example.com``.
    """
    for scheme in _URL_SCHEMES:
        if url.startswith(scheme):
            return url[len(scheme):]
    return url


def mirror_mapping(_: Optional[Pool]) -> Dict[str, Union[str, Dict[str, str]]]:
    """Build the mirror mapping document as a plain dictionary.

    Selects every proxy that has a URL and is either not destroyed or was
    destroyed within the last seven days, and maps its host name to an
    ``MMMirror`` describing the origin. The result also lists one CloudFront
    log bucket name per group that has not been destroyed.

    The pool argument is accepted but not used.

    Returns:
        The ``MirrorMapping`` model converted with ``.dict()``.
    """
    one_week_ago = datetime.utcnow() - timedelta(days=7)
    proxies = (
        db.session.query(Proxy)  # type: ignore[no-untyped-call]
        .filter(or_(Proxy.destroyed.is_(None), Proxy.destroyed > one_week_ago))
        .filter(Proxy.url.is_not(None))
        .all()
    )

    result: Dict[str, MMMirror] = {}
    for proxy in proxies:
        origin = proxy.origin
        # Read once per proxy; the same mapping feeds both the "highest risk"
        # selection and the ``countries`` field.
        risk_levels = origin.risk_level
        if origin.countries and risk_levels:
            highest_risk_country_code, highest_risk_level = max(
                risk_levels.items(), key=lambda item: item[1]
            )
        else:
            # No associated countries (or no risk data for them): report the
            # placeholder instead of letting max() fail on an empty mapping.
            highest_risk_country_code = _UNKNOWN_COUNTRY
            highest_risk_level = _UNKNOWN_RISK

        result[_strip_url_scheme(proxy.url)] = MMMirror(
            origin_domain=origin.domain_name,
            # NOTE(review): this removes "www." anywhere in the name, not only
            # as a leading label - confirm that is intended before changing it.
            origin_domain_normalized=origin.domain_name.replace("www.", ""),
            origin_domain_root=extract(origin.domain_name).registered_domain,
            valid_from=proxy.added.isoformat(),
            valid_to=proxy.destroyed.isoformat() if proxy.destroyed is not None else None,
            countries=risk_levels,
            country=highest_risk_country_code,
            risk=highest_risk_level
        )

    return MirrorMapping(
        version="1.2",
        mappings=result,
        s3_buckets=[
            f"{current_app.config['GLOBAL_NAMESPACE']}-{g.group_name.lower()}-logs-cloudfront"
            for g in Group.query.filter(Group.destroyed.is_(None)).all()
        ]
    ).dict()


# Only generate the JSON schema when the ``__sphinx_build__`` flag has been
# set on ``builtins``.
if getattr(builtins, "__sphinx_build__", False):
    schema = MirrorMapping.schema_json()