from __future__ import annotations import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Literal, Optional, TypedDict, Union import tldextract from sqlalchemy.orm import Mapped, mapped_column, relationship from tldextract import extract from werkzeug.datastructures import FileStorage from app.brm.brn import BRN from app.brm.utils import create_data_uri, normalize_color, thumbnail_uploaded_image from app.extensions import db from app.models import AbstractConfiguration, AbstractResource, Deprecation from app.models.base import Group, Pool from app.models.onions import Onion from app.models.types import AwareDateTime country_origin = db.Table( "country_origin", db.metadata, db.Column("country_id", db.ForeignKey("country.id"), primary_key=True), db.Column("origin_id", db.ForeignKey("origin.id"), primary_key=True), extend_existing=True, ) class OriginDict(TypedDict): Id: int Description: str DomainName: str RiskLevel: Dict[str, int] RiskLevelOverride: Optional[int] class Origin(AbstractConfiguration): group_id: Mapped[int] = mapped_column(db.Integer, db.ForeignKey("group.id")) domain_name: Mapped[str] = mapped_column(unique=True) auto_rotation: Mapped[bool] smart: Mapped[bool] assets: Mapped[bool] risk_level_override: Mapped[Optional[int]] group: Mapped[Group] = relationship("Group", back_populates="origins") proxies: Mapped[List[Proxy]] = relationship("Proxy", back_populates="origin") countries: Mapped[List[Country]] = relationship( "Country", secondary=country_origin, back_populates="origins" ) @property def brn(self) -> BRN: return BRN( group_id=self.group_id, product="mirror", provider="conf", resource_type="origin", resource_id=self.domain_name, ) @classmethod def csv_header(cls) -> List[str]: return super().csv_header() + [ "group_id", "domain_name", "auto_rotation", "smart", "assets", "country", ] def destroy(self) -> None: super().destroy() for proxy in self.proxies: proxy.destroy() @property def normalised_domain_name(self) -> str: extracted_domain = tldextract.extract(self.domain_name) return extracted_domain.registered_domain def onion(self) -> Optional[str]: tld = extract(self.domain_name).registered_domain onion = Onion.query.filter(Onion.domain_name == tld).first() if not onion: return None domain_name: str = self.domain_name return f"https://{domain_name.replace(tld, onion.onion_name)}.onion" @property def risk_level(self) -> Dict[str, int]: if self.risk_level_override: return { country.country_code: self.risk_level_override for country in self.countries } frequency_factor = 0.0 recency_factor = 0.0 recent_deprecations = ( db.session.query(Deprecation) .join(Proxy, Deprecation.resource_id == Proxy.id) .join(Origin, Origin.id == Proxy.origin_id) .filter( Origin.id == self.id, Deprecation.resource_type == "Proxy", Deprecation.deprecated_at >= datetime.now(tz=timezone.utc) - timedelta(hours=168), Deprecation.reason != "destroyed", ) .distinct(Proxy.id) .all() ) for deprecation in recent_deprecations: recency_factor += 1 / max( ( datetime.now(tz=timezone.utc) - deprecation.deprecated_at ).total_seconds() // 3600, 1, ) frequency_factor += 1 risk_levels: Dict[str, int] = {} for country in self.countries: risk_levels[country.country_code.upper()] = ( int(max(1, min(10, frequency_factor * recency_factor))) + country.risk_level ) return risk_levels def to_dict(self) -> OriginDict: return { "Id": self.id, "Description": self.description, "DomainName": self.domain_name, "RiskLevel": self.risk_level, "RiskLevelOverride": self.risk_level_override, } class Country(AbstractConfiguration): @property def brn(self) -> BRN: return BRN( group_id=0, product="country", provider="iso3166-1", resource_type="alpha2", resource_id=self.country_code, ) country_code: Mapped[str] risk_level_override: Mapped[Optional[int]] origins = db.relationship( "Origin", secondary=country_origin, back_populates="countries" ) @property def risk_level(self) -> int: if self.risk_level_override: return int(self.risk_level_override // 2) frequency_factor = 0.0 recency_factor = 0.0 recent_deprecations = ( db.session.query(Deprecation) .join(Proxy, Deprecation.resource_id == Proxy.id) .join(Origin, Origin.id == Proxy.origin_id) .join(Origin.countries) .filter( Country.id == self.id, Deprecation.resource_type == "Proxy", Deprecation.deprecated_at >= datetime.now(tz=timezone.utc) - timedelta(hours=168), Deprecation.reason != "destroyed", ) .distinct(Proxy.id) .all() ) for deprecation in recent_deprecations: recency_factor += 1 / max( ( datetime.now(tz=timezone.utc) - deprecation.deprecated_at ).total_seconds() // 3600, 1, ) frequency_factor += 1 return int(max(1, min(10, frequency_factor * recency_factor))) class StaticOrigin(AbstractConfiguration): group_id = mapped_column(db.Integer, db.ForeignKey("group.id"), nullable=False) storage_cloud_account_id = mapped_column( db.Integer(), db.ForeignKey("cloud_account.id"), nullable=False ) source_cloud_account_id = mapped_column( db.Integer(), db.ForeignKey("cloud_account.id"), nullable=False ) source_project = mapped_column(db.String(255), nullable=False) auto_rotate = mapped_column(db.Boolean, nullable=False) matrix_homeserver = mapped_column(db.String(255), nullable=True) keanu_convene_path = mapped_column(db.String(255), nullable=True) keanu_convene_config = mapped_column(db.String(), nullable=True) clean_insights_backend = mapped_column(db.String(255), nullable=True) origin_domain_name = mapped_column(db.String(255), nullable=True) @property def brn(self) -> BRN: return BRN( group_id=self.group_id, product="mirror", provider="aws", resource_type="static", resource_id=self.domain_name, ) group = db.relationship("Group", back_populates="statics") storage_cloud_account = db.relationship( "CloudAccount", back_populates="statics", foreign_keys=[storage_cloud_account_id], ) source_cloud_account = db.relationship( "CloudAccount", back_populates="statics", foreign_keys=[source_cloud_account_id] ) def destroy(self) -> None: # TODO: The StaticMetaAutomation will clean up for now, but it should probably happen here for consistency super().destroy() def update( self, source_project: str, description: str, auto_rotate: bool, matrix_homeserver: Optional[str], keanu_convene_path: Optional[str], keanu_convene_logo: Optional[FileStorage], keanu_convene_color: Optional[str], clean_insights_backend: Optional[Union[str, bool]], db_session_commit: bool, ) -> None: if isinstance(source_project, str): self.source_project = source_project else: raise ValueError("source project must be a str") if isinstance(description, str): self.description = description else: raise ValueError("description must be a str") if isinstance(auto_rotate, bool): self.auto_rotate = auto_rotate else: raise ValueError("auto_rotate must be a bool") if isinstance(matrix_homeserver, str): self.matrix_homeserver = matrix_homeserver else: raise ValueError("matrix_homeserver must be a str") if isinstance(keanu_convene_path, str): self.keanu_convene_path = keanu_convene_path else: raise ValueError("keanu_convene_path must be a str") if self.keanu_convene_config is None: self.keanu_convene_config = "{}" keanu_convene_config: Dict[str, Any] = json.loads(self.keanu_convene_config) if keanu_convene_logo is None: pass elif isinstance(keanu_convene_logo, FileStorage): if keanu_convene_logo.filename: # if False, no file was uploaded keanu_convene_config["logo"] = create_data_uri( thumbnail_uploaded_image(keanu_convene_logo), keanu_convene_logo.filename, ) else: raise ValueError("keanu_convene_logo must be a FileStorage") try: if isinstance(keanu_convene_color, str): keanu_convene_config["color"] = normalize_color( keanu_convene_color ) # can raise ValueError else: raise ValueError() # re-raised below with message except ValueError: raise ValueError( "keanu_convene_path must be a str containing an HTML color (CSS name or hex)" ) self.keanu_convene_config = json.dumps( keanu_convene_config, separators=(",", ":") ) del keanu_convene_config # done with this temporary variable if clean_insights_backend is None or ( isinstance(clean_insights_backend, bool) and not clean_insights_backend ): self.clean_insights_backend = None elif isinstance(clean_insights_backend, bool) and clean_insights_backend: self.clean_insights_backend = "metrics.cleaninsights.org" elif isinstance(clean_insights_backend, str): self.clean_insights_backend = clean_insights_backend else: raise ValueError("clean_insights_backend must be a str, bool, or None") if db_session_commit: db.session.commit() self.updated = datetime.now(tz=timezone.utc) ResourceStatus = Union[ Literal["active"], Literal["pending"], Literal["expiring"], Literal["destroyed"] ] class ProxyDict(TypedDict): Id: int OriginDomain: str MirrorDomain: Optional[str] Status: ResourceStatus class Proxy(AbstractResource): origin_id: Mapped[int] = mapped_column( db.Integer, db.ForeignKey("origin.id"), nullable=False ) pool_id: Mapped[Optional[int]] = mapped_column(db.Integer, db.ForeignKey("pool.id")) provider: Mapped[str] = mapped_column(db.String(20), nullable=False) psg: Mapped[Optional[int]] = mapped_column(db.Integer, nullable=True) slug: Mapped[Optional[str]] = mapped_column(db.String(20), nullable=True) terraform_updated: Mapped[Optional[datetime]] = mapped_column( AwareDateTime(), nullable=True ) url: Mapped[Optional[str]] = mapped_column(db.String(255), nullable=True) origin: Mapped[Origin] = relationship("Origin", back_populates="proxies") pool: Mapped[Pool] = relationship("Pool", back_populates="proxies") @property def brn(self) -> BRN: return BRN( group_id=self.origin.group_id, product="mirror", provider=self.provider, resource_type="proxy", resource_id=str(self.id), ) @classmethod def csv_header(cls) -> List[str]: return super().csv_header() + [ "origin_id", "provider", "psg", "slug", "terraform_updated", "url", ] def to_dict(self) -> ProxyDict: status: ResourceStatus = "active" if self.url is None: status = "pending" if self.deprecated is not None: status = "expiring" if self.destroyed is not None: status = "destroyed" return { "Id": self.id, "OriginDomain": self.origin.domain_name, "MirrorDomain": self.url.replace("https://", "") if self.url else None, "Status": status, } class SmartProxy(AbstractResource): group_id = mapped_column(db.Integer(), db.ForeignKey("group.id"), nullable=False) instance_id = mapped_column(db.String(100), nullable=True) provider = mapped_column(db.String(20), nullable=False) region = mapped_column(db.String(20), nullable=False) group = db.relationship("Group", back_populates="smart_proxies") @property def brn(self) -> BRN: return BRN( group_id=self.group_id, product="mirror", provider=self.provider, resource_type="smart_proxy", resource_id=str(1), )