From 8115966acaa3d208b5372b423654236c79e30839 Mon Sep 17 00:00:00 2001
From: Iain Learmonth
Date: Tue, 30 May 2023 16:38:00 +0100
Subject: [PATCH] feat(proxy): rewrite the meta module to support hot spares

---
 app/terraform/proxy/__init__.py              |  25 +-
 app/terraform/proxy/meta.py                  | 324 ++++++++++++++----
 .../278bcfb487d3_add_hot_spare_pool.py       |  29 ++
 tests/proxy/test_proxy_automation.py         |  48 ++-
 4 files changed, 327 insertions(+), 99 deletions(-)
 create mode 100644 migrations/versions/278bcfb487d3_add_hot_spare_pool.py

diff --git a/app/terraform/proxy/__init__.py b/app/terraform/proxy/__init__.py
index 10d6535..9781ee7 100644
--- a/app/terraform/proxy/__init__.py
+++ b/app/terraform/proxy/__init__.py
@@ -93,7 +93,8 @@ class ProxyAutomation(TerraformAutomation):
             self.template,
             groups=groups,
             proxies=Proxy.query.filter(
-                Proxy.provider == self.provider, Proxy.destroyed.is_(None)).all(), subgroups=self.get_subgroups(),
+                Proxy.provider == self.provider, Proxy.destroyed.is_(None)).all(),
+            subgroups=self.get_subgroups(),
             global_namespace=app.config['GLOBAL_NAMESPACE'], bypass_token=app.config['BYPASS_TOKEN'],
             terraform_modules_path=os.path.join(*list(os.path.split(app.root_path))[:-1], 'terraform-modules'),
             backend_config=f"""backend "http" {{
@@ -157,25 +158,3 @@ class ProxyAutomation(TerraformAutomation):
         for row in result:
             subgroups[row[0]][row[1]] = row[2]
         return subgroups
-
-    @classmethod
-    def next_subgroup(cls, group_id: int) -> Optional[int]:
-        provider = cls()  # Some attributes are set by constructor
-        conn = db.engine.connect()
-        stmt = text("""
-            SELECT proxy.psg, COUNT(proxy.id) FROM proxy, origin
-            WHERE proxy.origin_id = origin.id
-            AND proxy.destroyed IS NULL
-            AND origin.group_id = :group_id
-            AND proxy.provider = :provider
-            GROUP BY proxy.psg ORDER BY proxy.psg;
-        """)
-        stmt = stmt.bindparams(provider=provider.short_name, group_id=group_id)
-        result = conn.execute(stmt).all()
-        subgroups = {
-            row[0]: row[1] for row in result
-        }
-        for subgroup in range(0, provider.subgroup_count_max):
-            if subgroups.get(subgroup, 0) < provider.subgroup_members_max:
-                return subgroup
-        return None
diff --git a/app/terraform/proxy/meta.py b/app/terraform/proxy/meta.py
index a9aebf5..c80b24a 100644
--- a/app/terraform/proxy/meta.py
+++ b/app/terraform/proxy/meta.py
@@ -2,7 +2,9 @@ import datetime
 import logging
 import random
 import string
-from typing import Tuple, List
+from collections import OrderedDict
+from typing import Any, Dict, List, Optional, Tuple, Type
+from typing import OrderedDict as OrderedDictT
 
 from flask import current_app
 from tldextract import tldextract
@@ -11,56 +13,177 @@ from app import db
 from app.models.base import Pool
 from app.models.mirrors import Proxy, Origin
 from app.terraform import BaseAutomation
+from app.terraform.proxy import ProxyAutomation
 from app.terraform.proxy.azure_cdn import ProxyAzureCdnAutomation
 from app.terraform.proxy.cloudfront import ProxyCloudfrontAutomation
 from app.terraform.proxy.fastly import ProxyFastlyAutomation
 
-PROXY_PROVIDERS = {p.provider: p for p in [  # type: ignore[attr-defined]
+PROXY_PROVIDERS: Dict[str, Type[ProxyAutomation]] = {p.provider: p for p in [  # type: ignore[attr-defined]
     # In order of preference
     ProxyCloudfrontAutomation,
     ProxyFastlyAutomation,
     ProxyAzureCdnAutomation
 ] if p.enabled}  # type: ignore[attr-defined]
 
+SubgroupCount = OrderedDictT[str, OrderedDictT[int, OrderedDictT[int, int]]]
 
-def create_proxy(pool: Pool, origin: Origin) -> bool:
+
+def all_active_proxies() -> List[Proxy]:
     """
-    Creates a web proxy resource for the given origin and pool combination.
+    Retrieve all active proxies from the database.
 
-    Initially it will attempt to create smart proxies on providers that support smart proxies,
-    and "simple" proxies on other providers. If other providers have exhausted their quota
-    already then a "simple" proxy may be created on a platform that supports smart proxies.
+    This function returns a list of all `Proxy` instances that are currently active.
+    An active proxy is defined as a proxy that is not deprecated and not destroyed.
 
-    A boolean is returned to indicate whether a proxy resource was created.
-
-    :param pool: pool to create the resource for
-    :param origin: origin to create the resource for
-    :return: whether a proxy resource was created
+    :return: A list of all active Proxy instances.
     """
-    for desperate in [False, True]:
-        for provider in PROXY_PROVIDERS.values():
-            if origin.smart and not provider.smart_proxies:  # type: ignore[attr-defined]
-                continue  # This origin cannot be supported on this provider
-            if provider.smart_proxies and not (desperate or origin.smart):  # type: ignore[attr-defined]
-                continue
-            next_subgroup = provider.next_subgroup(origin.group_id)  # type: ignore[attr-defined]
-            if next_subgroup is None:
-                continue  # Exceeded maximum number of subgroups and last subgroup is full
-            proxy = Proxy()
-            proxy.pool_id = pool.id
-            proxy.origin_id = origin.id
-            proxy.provider = provider.provider  # type: ignore[attr-defined]
-            proxy.psg = next_subgroup
-            # The random usage below is good enough for its purpose: to create a slug that
-            # hasn't been used recently.
-            proxy.slug = tldextract.extract(origin.domain_name).domain[:5] + ''.join(
-                random.choices(string.ascii_lowercase, k=12))  # nosec
-            proxy.added = datetime.datetime.utcnow()
-            proxy.updated = datetime.datetime.utcnow()
-            logging.debug("Creating proxy %s", proxy)
-            db.session.add(proxy)
-            return True
-    return False
+    result: List[Proxy] = Proxy.query.filter(
+        Proxy.deprecated.is_(None),
+        Proxy.destroyed.is_(None),
+    ).all()
+    return result
+
+
+def random_slug(origin_domain_name: str) -> str:
+    """
+    Generate a random slug consisting of a prefix extracted from a domain name and a series of random lower case
+    letters.
+
+    The function extracts the domain from the given `origin_domain_name`, trims it to the first 5 characters,
+    and appends 12 random lower case letters.
+
+    :param origin_domain_name: The domain name to extract the prefix from.
+    :return: The generated random slug.
+
+    :Example:
+
+    >>> random_slug("example.com")
+    "exampkcjuqzfwbdle"
+    """
+    # The random slug doesn't need to be cryptographically secure, hence the use of `# nosec`
+    return tldextract.extract(origin_domain_name).domain[:5] + ''.join(
+        random.choices(string.ascii_lowercase, k=12))  # nosec
+
+
+def calculate_subgroup_count(proxies: Optional[List[Proxy]] = None) -> SubgroupCount:
+    """
+    Calculate the count of each subgroup within each group for each provider.
+
+    The function loops through the list of Proxy objects and creates a nested dictionary structure.
+    The keys of the outermost dictionary are the providers.
+    The values are dictionaries where the keys are the group IDs and the values are
+    dictionaries where the keys are subgroups and the values are their counts.
+
+    :param proxies: A list of Proxy objects. If None, the calculation will be performed on all active proxies.
+    :return: A nested dictionary representing the count of each subgroup within each group for each provider.
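+
+    :Example:
+
+    Illustrative only (the provider key "cloudfront" and the counts shown are assumed example values):
+    two active cloudfront proxies for group 1, both in subgroup 1, would produce::
+
+        OrderedDict([('cloudfront', OrderedDict([(1, OrderedDict([(1, 2)]))]))])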
+    """
+    if proxies is None:
+        proxies = all_active_proxies()
+    subgroup_count: SubgroupCount = OrderedDict()
+    for proxy in proxies:
+        if proxy.provider not in subgroup_count:
+            subgroup_count[proxy.provider] = OrderedDict()
+        if proxy.origin.group_id not in subgroup_count[proxy.provider]:
+            subgroup_count[proxy.provider][proxy.origin.group_id] = OrderedDict()
+        if proxy.psg not in subgroup_count[proxy.provider][proxy.origin.group_id]:
+            subgroup_count[proxy.provider][proxy.origin.group_id][proxy.psg] = 1
+        else:
+            subgroup_count[proxy.provider][proxy.origin.group_id][proxy.psg] += 1
+    return subgroup_count
+
+
+def next_subgroup(subgroup_count: SubgroupCount, provider: str, group_id: int, max_subgroup_count: int,
+                  max_subgroup_members: int) -> Optional[int]:
+    """
+    Find the first subgroup with fewer than `max_subgroup_members` members within the specified provider and group.
+    If every existing subgroup is full, the next unused subgroup number is returned, as long as it does not
+    exceed `max_subgroup_count`.
+
+    The function traverses the `subgroup_count` dictionary for the given provider and group in subgroup order.
+    It returns the first subgroup found with a count less than `max_subgroup_members`.
+
+    :param subgroup_count: A nested dictionary representing the count of each subgroup within each group for each
+                           provider.
+    :param provider: The provider to find the next subgroup in.
+    :param group_id: The group to find the next subgroup in.
+    :param max_subgroup_count: The maximum allowable number of subgroups.
+    :param max_subgroup_members: The maximum count a subgroup may reach before it is considered full.
+    :return: The number of the first available subgroup within the specified provider and group, or 1 if no
+             proxies have been counted yet for that provider and group.
+             Returns None if all `max_subgroup_count` subgroups are already full.
+    """
+    if provider in subgroup_count and group_id in subgroup_count[provider]:
+        subgroups = subgroup_count[provider][group_id]
+        for subgroup in range(1, max_subgroup_count + 1):
+            if subgroup not in subgroups or subgroups[subgroup] < max_subgroup_members:
+                return subgroup
+        return None
+    return 1
+
+
+def auto_deprecate_proxies() -> None:
+    """
+    Automatically deprecate proxies based on certain conditions.
+
+    This function deprecates proxies under two conditions:
+
+    1. The origin of the proxy has been destroyed.
+    2. The proxy belongs to a list of origins due for daily replacement and has reached its max age.
+
+    .. note::
+       - The "origin_destroyed" reason means the origin of the proxy has been destroyed.
+       - The "max_age_reached" reason means the proxy has been in use for longer than the maximum allowed period.
+         The maximum age cutoff is randomly set to a time between 24 and 48 hours.
+    """
+    proxies: List[Proxy] = all_active_proxies()
+    for proxy in proxies:
+        if proxy.origin.destroyed is not None:
+            proxy.deprecate(reason="origin_destroyed")
+        if proxy.origin_id in current_app.config.get("DAILY_REPLACEMENT_ORIGINS", []):
+            max_age_cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+                days=1, seconds=86400 * random.random())  # nosec: B311
+            if proxy.added < max_age_cutoff:
+                proxy.deprecate(reason="max_age_reached")
+
+
+def destroy_expired_proxies() -> None:
+    """
+    Destroy proxies that have been deprecated for a certain period of time.
+
+    This function finds all proxies that are not already destroyed and have been deprecated for more than 4 days.
+    It then destroys these proxies.
+    """
+    expiry_cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=4)
+    proxies = Proxy.query.filter(
+        Proxy.destroyed.is_(None),
+        Proxy.deprecated < expiry_cutoff
+    ).all()
+    for proxy in proxies:
+        logging.debug("Destroying expired proxy")
+        proxy.destroy()
+
+
+def promote_hot_spare_proxy(pool_id: int, origin: Origin) -> bool:
+    """
+    Promote a 'hot spare' proxy to a specified pool from the reserve pool.
+
+    This function searches for a 'hot spare' proxy (a proxy in the reserve pool, with pool_id == -1)
+    for the given origin. If a proxy is found, it is promoted to the specified pool by changing its pool ID.
+
+    :param pool_id: The pool to which the 'hot spare' proxy is to be promoted.
+    :param origin: The origin of the 'hot spare' proxy to be promoted.
+    :return: True if a 'hot spare' proxy was found and promoted, False otherwise.
+
+    .. note:: In the database, the pool ID -1 signifies the reserve pool of 'hot spare' proxies. A matching row
+       is created in the pool table by the accompanying migration, but it is only used as a sentinel value and
+       is not treated as a regular pool.
+    """
+    proxy = Proxy.query.filter(
+        Proxy.pool_id == -1,
+        Proxy.origin_id == origin.id,
+    ).first()
+    if not proxy:
+        return False
+    proxy.pool_id = pool_id
+    return True
 
 
 class ProxyMetaAutomation(BaseAutomation):
@@ -68,24 +191,29 @@
     description = "Housekeeping for proxies"
     frequency = 1
 
+    subgroup_count: SubgroupCount
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.subgroup_count = calculate_subgroup_count()
+
     def automate(self, full: bool = False) -> Tuple[bool, str]:
         # Deprecate orphaned proxies, old proxies and mismatched proxies
-        proxies: List[Proxy] = Proxy.query.filter(
-            Proxy.deprecated.is_(None),
-            Proxy.destroyed.is_(None),
-        ).all()
-        for proxy in proxies:
-            if proxy.origin.destroyed is not None:
-                proxy.deprecate(reason="origin_destroyed")
-            if proxy.origin_id in current_app.config.get("DAILY_REPLACEMENT_ORIGINS", []):
-                max_age_cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=1, seconds=86400 * random.random())  # nosec: B311
-            else:
-                max_age_cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=5, seconds=86400 * random.random())  # nosec: B311
-            if proxy.added < max_age_cutoff:
-                proxy.deprecate(reason="max_age_reached")
-            if proxy.origin.smart and not PROXY_PROVIDERS[proxy.provider].smart_proxies:  # type: ignore[attr-defined]
-                proxy.deprecate(reason="not_smart_enough")
-        # Create new proxies
+        auto_deprecate_proxies()
+        destroy_expired_proxies()
+        self.handle_missing_proxies()
+        self.create_hot_spare_proxies()
+        db.session.commit()
+        return True, ""
+
+    def handle_missing_proxies(self) -> None:
+        """
+        Create new proxies for origins that lack active proxies in a pool.
+
+        This function iterates over all pools, groups in each pool, and origins in each group.
+        If an origin is not destroyed and lacks active (not deprecated and not destroyed) proxies in a pool,
+        a new proxy for the origin in the pool is created.
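+        Where a 'hot spare' proxy for the origin is available in the reserve pool it is promoted into the
+        pool; otherwise a brand new proxy is created.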
+        """
         pools = Pool.query.all()
         for pool in pools:
             for group in pool.groups:
@@ -98,15 +226,85 @@ class ProxyMetaAutomation(BaseAutomation):
                     ]
                     if not proxies:
                         logging.debug("Creating new proxy for %s in pool %s", origin, pool)
-                        create_proxy(pool, origin)
-        # Destroy expired proxies
-        expiry_cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=4)
-        proxies = Proxy.query.filter(
-            Proxy.destroyed.is_(None),
-            Proxy.deprecated < expiry_cutoff
+                        if not promote_hot_spare_proxy(pool.id, origin):
+                            # No "hot spare" available
+                            self.create_proxy(pool.id, origin)
+
+    def create_proxy(self, pool_id: int, origin: Origin) -> bool:
+        """
+        Creates a web proxy resource for the given origin and pool combination.
+
+        Providers are tried in the order of preference defined in PROXY_PROVIDERS, and the proxy is created
+        with the first provider that still has subgroup capacity available for the origin's group.
+
+        A boolean is returned to indicate whether a proxy resource was created.
+
+        :param pool_id: pool to create the resource for
+        :param origin: origin to create the resource for
+        :return: whether a proxy resource was created
+        """
+        for provider in PROXY_PROVIDERS.values():
+            logging.debug("Looking at provider %s", provider.provider)
+            subgroup = next_subgroup(self.subgroup_count, provider.provider, origin.group_id,
+                                     provider.subgroup_count_max, provider.subgroup_members_max)
+            if subgroup is None:
+                continue  # Exceeded maximum number of subgroups and last subgroup is full
+            self.increment_subgroup(provider.provider, origin.group_id, subgroup)
+            proxy = Proxy()
+            proxy.pool_id = pool_id
+            proxy.origin_id = origin.id
+            proxy.provider = provider.provider
+            proxy.psg = subgroup
+            # The random usage below is good enough for its purpose: to create a slug that
+            # hasn't been used recently.
+            proxy.slug = random_slug(origin.domain_name)
+            proxy.added = datetime.datetime.utcnow()
+            proxy.updated = datetime.datetime.utcnow()
+            logging.debug("Creating proxy %s", proxy)
+            db.session.add(proxy)
+            return True
+        return False
+
+    def increment_subgroup(self, provider: str, group_id: int, psg: int) -> None:
+        """
+        Increment the count of a specific subgroup within a group for a specific provider.
+
+        This function mutates the `subgroup_count` dictionary by incrementing the count of the specified subgroup.
+        If the provider, group, or subgroup does not exist in `subgroup_count`, they are created.
+
+        :param provider: The provider to increment the subgroup count for.
+        :param group_id: The group to increment the subgroup count for.
+        :param psg: The subgroup to increment the count of.
+        """
+        if provider not in self.subgroup_count:
+            self.subgroup_count[provider] = OrderedDict()
+        if group_id not in self.subgroup_count[provider]:
+            self.subgroup_count[provider][group_id] = OrderedDict()
+        if psg not in self.subgroup_count[provider][group_id]:
+            self.subgroup_count[provider][group_id][psg] = 0
+        self.subgroup_count[provider][group_id][psg] += 1
+
+    def create_hot_spare_proxies(self) -> None:
+        """
+        Create 'hot spare' proxies for origins that lack active proxies.
+
+        This function iterates over all groups and their origins.
+        If an origin is not destroyed and lacks active proxies (not deprecated and not destroyed),
+        a new 'hot spare' proxy for this origin is created in the reserve pool (with pool_id = -1).
+        """
+        origins = Origin.query.filter(
+            Origin.destroyed.is_(None)
         ).all()
-        for proxy in proxies:
-            logging.debug("Destroying expired proxy")
-            proxy.destroy()
-        db.session.commit()
-        return True, ""
+        for origin in origins:
+            if origin.destroyed is not None:
+                continue
+            proxies = Proxy.query.filter(
+                Proxy.pool_id == -1,
+                Proxy.origin_id == origin.id,
+                Proxy.deprecated.is_(None),
+                Proxy.destroyed.is_(None),
+            ).all()
+            if not proxies:
+                logging.debug("Creating new hot spare proxy for origin %s", origin)
+                self.create_proxy(-1, origin)  # Creating proxy in reserve pool
diff --git a/migrations/versions/278bcfb487d3_add_hot_spare_pool.py b/migrations/versions/278bcfb487d3_add_hot_spare_pool.py
new file mode 100644
index 0000000..81d4729
--- /dev/null
+++ b/migrations/versions/278bcfb487d3_add_hot_spare_pool.py
@@ -0,0 +1,29 @@
+"""Add hot spare pool
+
+Revision ID: 278bcfb487d3
+Revises: 2d747ffb9928
+Create Date: 2023-05-30 16:08:37.770371
+
+"""
+from alembic import op
+from sqlalchemy.sql import text
+
+# revision identifiers, used by Alembic.
+revision = '278bcfb487d3'
+down_revision = '2d747ffb9928'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # Create the reserve pool of hot spare proxies using the sentinel id -1
+    sql = text("""INSERT INTO pool VALUES (
+        -1, 'Hot spares (reserve)', NOW(), NOW(), NULL, 'hotspare',
+        md5(to_char(NOW(), 'YYYY-MM-DD HH24:MI:SS.US')), NULL) ON CONFLICT (id) DO NOTHING;""")
+    op.execute(sql)
+
+
+def downgrade():
+    # SQL to undo the changes
+    sql = text("""DELETE FROM pool WHERE id = -1;""")
+    op.execute(sql)
diff --git a/tests/proxy/test_proxy_automation.py b/tests/proxy/test_proxy_automation.py
index 9fa1efc..a1ceadc 100644
--- a/tests/proxy/test_proxy_automation.py
+++ b/tests/proxy/test_proxy_automation.py
@@ -1,7 +1,11 @@
 import unittest
 from unittest.mock import MagicMock, patch
-from app.terraform.proxy import ProxyAutomation, update_smart_proxy_instance
+
+import pytest
+
 from app import app
+from app.terraform.proxy import update_smart_proxy_instance
+from app.terraform.proxy.meta import random_slug, next_subgroup, Proxy
 
 
 class TestUpdateSmartProxyInstance(unittest.TestCase):
@@ -40,17 +44,35 @@ class TestUpdateSmartProxyInstance(unittest.TestCase):
         self.assertEqual(mock_instance.instance_id, self.instance_id)
 
 
-class TestProxyAutomation(unittest.TestCase):
-    def setUp(self):
-        app.config['TESTING'] = True
-        self.app = app.test_client()
-
-    def test_proxy_automation_abstract_methods(self):
-        # Test NotImplementedError for import_state
-        with self.assertRaises(NotImplementedError):
-            proxy_automation = ProxyAutomation()
-            proxy_automation.import_state(None)
+@pytest.fixture
+def active_proxies():
+    proxies = [MagicMock(spec=Proxy) for _ in range(5)]
+    for proxy in proxies:
+        proxy.deprecated = None
+        proxy.destroyed = None
+    return proxies
 
 
-if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file
+def test_random_slug():
+    slug = random_slug("example.com")
+    assert slug[:5] == "examp"
+    assert len(slug) == 17
+
+
+def test_next_subgroup():
+    subgroup_count = {
+        'provider1': {
+            1: {
+                1: 3
+            },
+            2: {
+                1: 2,
+                2: 5
+            },
+            3: {}
+        }
+    }
+    assert next_subgroup(subgroup_count, 'provider1', 1, 3, 5) == 1
+    assert next_subgroup(subgroup_count, 'provider1', 1, 5, 3) == 2
+    assert next_subgroup(subgroup_count, 'provider1', 2, 3, 2) == 3
+    assert next_subgroup(subgroup_count, 'provider1', 3, 3, 5) == 1
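
The subgroup selection helper introduced by this patch can be exercised on its own, since next_subgroup() operates on plain dictionaries rather than database rows. A minimal illustrative sketch follows; the provider key "cloudfront" and the limits of three subgroups with two members each are assumed example values, not taken from configuration.

    from collections import OrderedDict

    from app.terraform.proxy.meta import next_subgroup

    # Assumed starting state: on the "cloudfront" provider, group 1 already has a
    # full subgroup 1 (2 proxies) and a half-full subgroup 2 (1 proxy).
    subgroup_count = OrderedDict({
        "cloudfront": OrderedDict({1: OrderedDict({1: 2, 2: 1})}),
    })

    # Subgroup 1 is full, so the next proxy for group 1 lands in subgroup 2.
    assert next_subgroup(subgroup_count, "cloudfront", 1, 3, 2) == 2

    # A provider/group combination with no recorded proxies starts at subgroup 1.
    assert next_subgroup(subgroup_count, "cloudfront", 99, 3, 2) == 1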