majuna/app/models/mirrors.py
2024-02-19 12:15:54 +00:00

280 lines
11 KiB
Python

import json
from datetime import datetime, timedelta
from typing import Optional, List, Union, Any, Dict
from tldextract import extract
from werkzeug.datastructures import FileStorage
from app.brm.brn import BRN
from app.brm.utils import thumbnail_uploaded_image, create_data_uri, normalize_color
from app.extensions import db
from app.models import AbstractConfiguration, AbstractResource, Deprecation
from app.models.onions import Onion
country_origin = db.Table(
'country_origin',
db.metadata,
db.Column('country_id', db.ForeignKey('country.id'), primary_key=True),
db.Column('origin_id', db.ForeignKey('origin.id'), primary_key=True),
extend_existing=True,
)
class Origin(AbstractConfiguration):
group_id = db.Column(db.Integer, db.ForeignKey("group.id"), nullable=False)
domain_name = db.Column(db.String(255), unique=True, nullable=False)
auto_rotation = db.Column(db.Boolean, nullable=False)
smart = db.Column(db.Boolean(), nullable=False)
assets = db.Column(db.Boolean(), nullable=False)
risk_level_override = db.Column(db.Integer(), nullable=True)
group = db.relationship("Group", back_populates="origins")
proxies = db.relationship("Proxy", back_populates="origin")
countries = db.relationship("Country", secondary=country_origin, back_populates='origins')
@property
def brn(self) -> BRN:
return BRN(
group_id=self.group_id,
product="mirror",
provider="conf",
resource_type="origin",
resource_id=self.domain_name
)
@classmethod
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"group_id", "domain_name", "auto_rotation", "smart", "assets", "country"
]
def destroy(self) -> None:
super().destroy()
for proxy in self.proxies:
proxy.destroy()
@property
def normalised_domain_name(self):
return self.domain_name.replace("www.", "")
def onion(self) -> Optional[str]:
tld = extract(self.domain_name).registered_domain
onion = Onion.query.filter(Onion.domain_name == tld).first()
if not onion:
return None
domain_name: str = self.domain_name
return f"https://{domain_name.replace(tld, onion.onion_name)}.onion"
@property
def risk_level(self) -> Dict[str, int]:
if self.risk_level_override:
return {country.country_code: self.risk_level_override for country in self.countries}
frequency_factor = 0
recency_factor = 0
recent_deprecations = (
db.session.query(Deprecation) # type: ignore[no-untyped-call]
.join(Proxy,
Deprecation.resource_id == Proxy.id)
.join(Origin, Origin.id == Proxy.origin_id)
.filter(
Origin.id == self.id,
Deprecation.resource_type == 'Proxy',
Deprecation.deprecated_at >= datetime.utcnow() - timedelta(hours=168)
)
.distinct(Proxy.id)
.all()
)
for deprecation in recent_deprecations:
recency_factor += 1 / max((datetime.utcnow() - deprecation.deprecated_at).total_seconds() // 3600, 1)
frequency_factor += 1
risk_levels: Dict[str, int] = {}
for country in self.countries:
risk_levels[country.country_code.upper()] = int(max(1, min(10, frequency_factor * recency_factor))) + country.risk_level
return risk_levels
class Country(AbstractConfiguration):
@property
def brn(self) -> BRN:
return BRN(
group_id=0,
product="country",
provider="iso3166-1",
resource_type="alpha2",
resource_id=self.country_code
)
country_code = db.Column(db.String(2), nullable=False)
risk_level_override = db.Column(db.Integer(), nullable=True)
origins = db.relationship("Origin", secondary=country_origin, back_populates='countries')
@property
def risk_level(self) -> int:
if self.risk_level_override:
return int(self.risk_level_override // 2)
frequency_factor = 0
recency_factor = 0
recent_deprecations = (
db.session.query(Deprecation) # type: ignore[no-untyped-call]
.join(Proxy,
Deprecation.resource_id == Proxy.id)
.join(Origin, Origin.id == Proxy.origin_id)
.join(Origin.countries)
.filter(
Country.id == self.id,
Deprecation.resource_type == 'Proxy',
Deprecation.deprecated_at >= datetime.utcnow() - timedelta(hours=168)
)
.distinct(Proxy.id)
.all()
)
for deprecation in recent_deprecations:
recency_factor += 1 / max((datetime.utcnow() - deprecation.deprecated_at).total_seconds() // 3600, 1)
frequency_factor += 1
return int(max(1, min(10, frequency_factor * recency_factor)))
class StaticOrigin(AbstractConfiguration):
group_id = db.Column(db.Integer, db.ForeignKey("group.id"), nullable=False)
storage_cloud_account_id = db.Column(db.Integer(), db.ForeignKey("cloud_account.id"), nullable=False)
source_cloud_account_id = db.Column(db.Integer(), db.ForeignKey("cloud_account.id"), nullable=False)
source_project = db.Column(db.String(255), nullable=False)
auto_rotate = db.Column(db.Boolean, nullable=False)
matrix_homeserver = db.Column(db.String(255), nullable=True)
keanu_convene_path = db.Column(db.String(255), nullable=True)
keanu_convene_config = db.Column(db.String(), nullable=True)
clean_insights_backend = db.Column(db.String(255), nullable=True)
origin_domain_name = db.Column(db.String(255), nullable=True)
@property
def brn(self) -> BRN:
return BRN(
group_id=self.group_id,
product="mirror",
provider="aws",
resource_type="static",
resource_id=self.domain_name
)
group = db.relationship("Group", back_populates="statics")
storage_cloud_account = db.relationship("CloudAccount", back_populates="statics",
foreign_keys=[storage_cloud_account_id])
source_cloud_account = db.relationship("CloudAccount", back_populates="statics",
foreign_keys=[source_cloud_account_id])
def destroy(self) -> None:
# TODO: The StaticMetaAutomation will clean up for now, but it should probably happen here for consistency
super().destroy()
def update(
self,
source_project: str,
description: str,
auto_rotate: bool,
matrix_homeserver: Optional[str],
keanu_convene_path: Optional[str],
keanu_convene_logo: Optional[FileStorage],
keanu_convene_color: Optional[str],
clean_insights_backend: Optional[Union[str, bool]],
db_session_commit: bool,
) -> None:
if isinstance(source_project, str):
self.source_project = source_project
else:
raise ValueError("source project must be a str")
if isinstance(description, str):
self.description = description
else:
raise ValueError("description must be a str")
if isinstance(auto_rotate, bool):
self.auto_rotate = auto_rotate
else:
raise ValueError("auto_rotate must be a bool")
if isinstance(matrix_homeserver, str):
self.matrix_homeserver = matrix_homeserver
else:
raise ValueError("matrix_homeserver must be a str")
if isinstance(keanu_convene_path, str):
self.keanu_convene_path = keanu_convene_path
else:
raise ValueError("keanu_convene_path must be a str")
if self.keanu_convene_config is None:
self.keanu_convene_config = "{}"
keanu_convene_config: Dict[str, Any] = json.loads(self.keanu_convene_config)
if keanu_convene_logo is None:
pass
elif isinstance(keanu_convene_logo, FileStorage):
if keanu_convene_logo.filename: # if False, no file was uploaded
keanu_convene_config["logo"] = create_data_uri(
thumbnail_uploaded_image(keanu_convene_logo), keanu_convene_logo.filename)
else:
raise ValueError("keanu_convene_logo must be a FileStorage")
try:
if isinstance(keanu_convene_color, str):
keanu_convene_config["color"] = normalize_color(keanu_convene_color) # can raise ValueError
else:
raise ValueError() # re-raised below with message
except ValueError:
raise ValueError("keanu_convene_path must be a str containing an HTML color (CSS name or hex)")
self.keanu_convene_config = json.dumps(keanu_convene_config, separators=(',', ':'))
del keanu_convene_config # done with this temporary variable
if clean_insights_backend is None or (isinstance(clean_insights_backend, bool) and not clean_insights_backend):
self.clean_insights_backend = None
elif isinstance(clean_insights_backend, bool) and clean_insights_backend:
self.clean_insights_backend = "metrics.cleaninsights.org"
elif isinstance(clean_insights_backend, str):
self.clean_insights_backend = clean_insights_backend
else:
raise ValueError("clean_insights_backend must be a str, bool, or None")
if db_session_commit:
db.session.commit()
self.updated = datetime.utcnow()
class Proxy(AbstractResource):
origin_id = db.Column(db.Integer, db.ForeignKey("origin.id"), nullable=False)
pool_id = db.Column(db.Integer, db.ForeignKey("pool.id"))
provider = db.Column(db.String(20), nullable=False)
psg = db.Column(db.Integer, nullable=True)
slug = db.Column(db.String(20), nullable=True)
terraform_updated = db.Column(db.DateTime(), nullable=True)
url = db.Column(db.String(255), nullable=True)
origin = db.relationship("Origin", back_populates="proxies")
pool = db.relationship("Pool", back_populates="proxies")
@property
def brn(self) -> BRN:
return BRN(
group_id=self.origin.group_id,
product="mirror",
provider=self.provider,
resource_type="proxy",
resource_id=str(self.id)
)
@classmethod
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"origin_id", "provider", "psg", "slug", "terraform_updated", "url"
]
class SmartProxy(AbstractResource):
group_id = db.Column(db.Integer(), db.ForeignKey("group.id"), nullable=False)
instance_id = db.Column(db.String(100), nullable=True)
provider = db.Column(db.String(20), nullable=False)
region = db.Column(db.String(20), nullable=False)
group = db.relationship("Group", back_populates="smart_proxies")
@property
def brn(self) -> BRN:
return BRN(
group_id=self.group_id,
product="mirror",
provider=self.provider,
resource_type="smart_proxy",
resource_id=str(1)
)