feat(proxy): rewrite the meta module to support hot spares

This commit is contained in:
Iain Learmonth 2023-05-30 16:38:00 +01:00
parent 9f726731c6
commit 8115966aca
4 changed files with 327 additions and 99 deletions

View file

@ -93,7 +93,8 @@ class ProxyAutomation(TerraformAutomation):
self.template,
groups=groups,
proxies=Proxy.query.filter(
Proxy.provider == self.provider, Proxy.destroyed.is_(None)).all(), subgroups=self.get_subgroups(),
Proxy.provider == self.provider, Proxy.destroyed.is_(None)).all(),
subgroups=self.get_subgroups(),
global_namespace=app.config['GLOBAL_NAMESPACE'], bypass_token=app.config['BYPASS_TOKEN'],
terraform_modules_path=os.path.join(*list(os.path.split(app.root_path))[:-1], 'terraform-modules'),
backend_config=f"""backend "http" {{
@ -157,25 +158,3 @@ class ProxyAutomation(TerraformAutomation):
for row in result:
subgroups[row[0]][row[1]] = row[2]
return subgroups
@classmethod
def next_subgroup(cls, group_id: int) -> Optional[int]:
provider = cls() # Some attributes are set by constructor
conn = db.engine.connect()
stmt = text("""
SELECT proxy.psg, COUNT(proxy.id) FROM proxy, origin
WHERE proxy.origin_id = origin.id
AND proxy.destroyed IS NULL
AND origin.group_id = :group_id
AND proxy.provider = :provider
GROUP BY proxy.psg ORDER BY proxy.psg;
""")
stmt = stmt.bindparams(provider=provider.short_name, group_id=group_id)
result = conn.execute(stmt).all()
subgroups = {
row[0]: row[1] for row in result
}
for subgroup in range(0, provider.subgroup_count_max):
if subgroups.get(subgroup, 0) < provider.subgroup_members_max:
return subgroup
return None

View file

@ -2,7 +2,9 @@ import datetime
import logging
import random
import string
from typing import Tuple, List
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple, Type
from typing import OrderedDict as OrderedDictT
from flask import current_app
from tldextract import tldextract
@ -11,56 +13,177 @@ from app import db
from app.models.base import Pool
from app.models.mirrors import Proxy, Origin
from app.terraform import BaseAutomation
from app.terraform.proxy import ProxyAutomation
from app.terraform.proxy.azure_cdn import ProxyAzureCdnAutomation
from app.terraform.proxy.cloudfront import ProxyCloudfrontAutomation
from app.terraform.proxy.fastly import ProxyFastlyAutomation
PROXY_PROVIDERS = {p.provider: p for p in [ # type: ignore[attr-defined]
PROXY_PROVIDERS: Dict[str, Type[ProxyAutomation]] = {p.provider: p for p in [ # type: ignore[attr-defined]
# In order of preference
ProxyCloudfrontAutomation,
ProxyFastlyAutomation,
ProxyAzureCdnAutomation
] if p.enabled} # type: ignore[attr-defined]
SubgroupCount = OrderedDictT[str, OrderedDictT[int, OrderedDictT[int, int]]]
def create_proxy(pool: Pool, origin: Origin) -> bool:
def all_active_proxies() -> List[Proxy]:
"""
Creates a web proxy resource for the given origin and pool combination.
Retrieve all active proxies from the database.
Initially it will attempt to create smart proxies on providers that support smart proxies,
and "simple" proxies on other providers. If other providers have exhausted their quota
already then a "simple" proxy may be created on a platform that supports smart proxies.
This function returns a list of all `Proxy` instances that are currently active.
An active proxy is defined as a proxy that is not deprecated and not destroyed.
A boolean is returned to indicate whether a proxy resource was created.
:param pool: pool to create the resource for
:param origin: origin to create the resource for
:return: whether a proxy resource was created
:return: A list of all active Proxy instances.
"""
for desperate in [False, True]:
for provider in PROXY_PROVIDERS.values():
if origin.smart and not provider.smart_proxies: # type: ignore[attr-defined]
continue # This origin cannot be supported on this provider
if provider.smart_proxies and not (desperate or origin.smart): # type: ignore[attr-defined]
continue
next_subgroup = provider.next_subgroup(origin.group_id) # type: ignore[attr-defined]
if next_subgroup is None:
continue # Exceeded maximum number of subgroups and last subgroup is full
proxy = Proxy()
proxy.pool_id = pool.id
proxy.origin_id = origin.id
proxy.provider = provider.provider # type: ignore[attr-defined]
proxy.psg = next_subgroup
# The random usage below is good enough for its purpose: to create a slug that
# hasn't been used recently.
proxy.slug = tldextract.extract(origin.domain_name).domain[:5] + ''.join(
result: List[Proxy] = Proxy.query.filter(
Proxy.deprecated.is_(None),
Proxy.destroyed.is_(None),
).all()
return result
def random_slug(origin_domain_name: str) -> str:
"""
Generate a random slug consisting of a prefix extracted from a domain name and a series of random lower case
letters.
The function extracts the domain from the given `origin_domain_name`, trims it to the first 5 characters,
and appends 12 random lower case letters.
:param origin_domain_name: The domain name to extract the prefix from.
:return: The generated random slug.
:Example:
>>> random_slug("example.com")
"exampasdfghjkl"
"""
# The random slug doesn't need to be cryptographically secure, hence the use of `# nosec`
return tldextract.extract(origin_domain_name).domain[:5] + ''.join(
random.choices(string.ascii_lowercase, k=12)) # nosec
proxy.added = datetime.datetime.utcnow()
proxy.updated = datetime.datetime.utcnow()
logging.debug("Creating proxy %s", proxy)
db.session.add(proxy)
return True
def calculate_subgroup_count(proxies: Optional[List[Proxy]] = None) -> SubgroupCount:
"""
Calculate the count of each subgroup within each group for each provider.
The function loops through the list of Proxy objects and creates a nested dictionary structure.
The keys of the outermost dictionary are the providers.
The values are dictionaries where the keys are the group IDs and the values are
dictionaries where the keys are subgroups and the values are their counts.
:param proxies: A list of Proxy objects. If None, the calculation will be performed on all active proxies.
:return: A nested dictionary representing the count of each subgroup within each group for each provider.
"""
if proxies is None:
proxies = all_active_proxies()
subgroup_count: SubgroupCount = OrderedDict()
for proxy in proxies:
if proxy.provider not in subgroup_count:
subgroup_count[proxy.provider] = OrderedDict()
if proxy.origin.group_id not in subgroup_count[proxy.provider]:
subgroup_count[proxy.provider][proxy.origin.group_id] = OrderedDict()
if proxy.psg not in subgroup_count[proxy.provider][proxy.origin.group_id]:
subgroup_count[proxy.provider][proxy.origin.group_id][proxy.psg] = 1
else:
subgroup_count[proxy.provider][proxy.origin.group_id][proxy.psg] += 1
return subgroup_count
def next_subgroup(subgroup_count: SubgroupCount, provider: str, group_id: int, max_subgroup_count: int,
max_subgroup_members: int) -> Optional[int]:
"""
Find the first available subgroup with less than the specified maximum count in the specified provider and group.
If the last subgroup in the group is full, return the next subgroup number as long as it doesn't exceed
`max_subgroup`.
The function traverses the `subgroup_count` dictionary for the given provider and group in the order of subgroup.
It returns the first subgroup found with a count less than `max_count`.
:param subgroup_count: A nested dictionary representing the count of each subgroup within each group for each
provider.
:param provider: The provider to find the next subgroup in.
:param group_id: The group to find the next subgroup in.
:param max_subgroup_count: The maximum allowable subgroup number.
:param max_subgroup_members: The maximum count a subgroup should have to be considered available.
:return: The subgroup of the first available subgroup within the specified provider and group.
If no available subgroup is found and max_subgroup is not exceeded, returns the next subgroup number.
If no subgroup is available and max_subgroup is exceeded, returns None.
"""
if provider in subgroup_count and group_id in subgroup_count[provider]:
subgroups = subgroup_count[provider][group_id]
for subgroup in range(1, max_subgroup_count + 1):
if subgroup not in subgroups or subgroups[subgroup] < max_subgroup_members:
return subgroup
return None
return 1
def auto_deprecate_proxies() -> None:
"""
Automatically deprecate proxies based on certain conditions.
This function deprecates proxies under two conditions:
1. The origin of the proxy has been destroyed.
2. The proxy belongs to a list of origins due for daily replacement and has reached its max age.
.. note::
- The "origin_destroyed" reason means the origin of the proxy has been destroyed.
- The "max_age_reached" reason means the proxy has been in use for longer than the maximum allowed period.
The maximum age cutoff is randomly set to a time between 24 and 48 hours.
"""
proxies: List[Proxy] = all_active_proxies()
for proxy in proxies:
if proxy.origin.destroyed is not None:
proxy.deprecate(reason="origin_destroyed")
if proxy.origin_id in current_app.config.get("DAILY_REPLACEMENT_ORIGINS", []):
max_age_cutoff = datetime.datetime.utcnow() - datetime.timedelta(
days=1, seconds=86400 * random.random()) # nosec: B311
if proxy.added < max_age_cutoff:
proxy.deprecate(reason="max_age_reached")
def destroy_expired_proxies() -> None:
"""
Destroy proxies that have been deprecated for a certain period of time.
This function finds all proxies that are not already destroyed and have been deprecated for more than 4 days.
It then destroys these proxies.
"""
expiry_cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=4)
proxies = Proxy.query.filter(
Proxy.destroyed.is_(None),
Proxy.deprecated < expiry_cutoff
).all()
for proxy in proxies:
logging.debug("Destroying expired proxy")
proxy.destroy()
def promote_hot_spare_proxy(pool_id: int, origin: Origin) -> bool:
"""
Promote a 'hot spare' proxy to a specified pool from the reserve pool.
This function searches for a 'hot spare' proxy (a proxy in reserve pool with pool_id == -1)
for the given origin. If a proxy is found, it is promoted to the specified pool by changing its pool ID.
:param pool_id: The pool to which the 'hot spare' proxy is to be promoted.
:param origin: The origin of the 'hot spare' proxy to be promoted.
:return: True if a 'hot spare' proxy was found and promoted, False otherwise.
.. note:: In the database, the pool ID -1 signifies a reserve pool of 'hot spare' proxies. This pool does not
appear in the pool table and is only used as a sentinel value.
"""
proxy = Proxy.query.filter(
Proxy.pool_id == -1,
Proxy.origin_id == origin.id,
).first()
if not proxy:
return False
proxy.pool_id = pool_id
return True
class ProxyMetaAutomation(BaseAutomation):
@ -68,24 +191,29 @@ class ProxyMetaAutomation(BaseAutomation):
description = "Housekeeping for proxies"
frequency = 1
subgroup_count: SubgroupCount
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.subgroup_count = calculate_subgroup_count()
def automate(self, full: bool = False) -> Tuple[bool, str]:
# Deprecate orphaned proxies, old proxies and mismatched proxies
proxies: List[Proxy] = Proxy.query.filter(
Proxy.deprecated.is_(None),
Proxy.destroyed.is_(None),
).all()
for proxy in proxies:
if proxy.origin.destroyed is not None:
proxy.deprecate(reason="origin_destroyed")
if proxy.origin_id in current_app.config.get("DAILY_REPLACEMENT_ORIGINS", []):
max_age_cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=1, seconds=86400 * random.random()) # nosec: B311
else:
max_age_cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=5, seconds=86400 * random.random()) # nosec: B311
if proxy.added < max_age_cutoff:
proxy.deprecate(reason="max_age_reached")
if proxy.origin.smart and not PROXY_PROVIDERS[proxy.provider].smart_proxies: # type: ignore[attr-defined]
proxy.deprecate(reason="not_smart_enough")
# Create new proxies
auto_deprecate_proxies()
destroy_expired_proxies()
self.handle_missing_proxies()
self.create_hot_spare_proxies()
db.session.commit()
return True, ""
def handle_missing_proxies(self) -> None:
"""
Create new proxies for origins that lack active proxies in a pool.
This function iterates over all pools, groups in each pool, and origins in each group.
If an origin is not destroyed and lacks active (not deprecated and not destroyed) proxies in a pool,
a new proxy for the origin in the pool is created.
"""
pools = Pool.query.all()
for pool in pools:
for group in pool.groups:
@ -98,15 +226,85 @@ class ProxyMetaAutomation(BaseAutomation):
]
if not proxies:
logging.debug("Creating new proxy for %s in pool %s", origin, pool)
create_proxy(pool, origin)
# Destroy expired proxies
expiry_cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=4)
proxies = Proxy.query.filter(
Proxy.destroyed.is_(None),
Proxy.deprecated < expiry_cutoff
if not promote_hot_spare_proxy(pool.id, origin):
# No "hot spare" available
self.create_proxy(pool.id, origin)
def create_proxy(self, pool_id: int, origin: Origin) -> bool:
"""
Creates a web proxy resource for the given origin and pool combination.
Initially it will attempt to create smart proxies on providers that support smart proxies,
and "simple" proxies on other providers. If other providers have exhausted their quota
already then a "simple" proxy may be created on a platform that supports smart proxies.
A boolean is returned to indicate whether a proxy resource was created.
:param pool_id: pool to create the resource for
:param origin: origin to create the resource for
:return: whether a proxy resource was created
"""
for provider in PROXY_PROVIDERS.values():
logging.debug("Looking at provider %s", provider.provider)
subgroup = next_subgroup(self.subgroup_count, provider.provider, origin.group_id,
provider.subgroup_members_max, provider.subgroup_count_max)
if subgroup is None:
continue # Exceeded maximum number of subgroups and last subgroup is full
self.increment_subgroup(provider.provider, origin.group_id, subgroup)
proxy = Proxy()
proxy.pool_id = pool_id
proxy.origin_id = origin.id
proxy.provider = provider.provider
proxy.psg = subgroup
# The random usage below is good enough for its purpose: to create a slug that
# hasn't been used recently.
proxy.slug = random_slug(origin.domain_name)
proxy.added = datetime.datetime.utcnow()
proxy.updated = datetime.datetime.utcnow()
logging.debug("Creating proxy %s", proxy)
db.session.add(proxy)
return True
return False
def increment_subgroup(self, provider: str, group_id: int, psg: int) -> None:
"""
Increment the count of a specific subgroup within a group for a specific provider.
This function mutates the `subgroup_count` dictionary by incrementing the count of the specified subgroup.
If the provider, group, or subgroup does not exist in `subgroup_count`, they are created.
:param provider: The provider to increment the subgroup count for.
:param group_id: The group to increment the subgroup count for.
:param psg: The subgroup to increment the count of.
"""
if provider not in self.subgroup_count:
self.subgroup_count[provider] = OrderedDict()
if group_id not in self.subgroup_count[provider]:
self.subgroup_count[provider][group_id] = OrderedDict()
if psg not in self.subgroup_count[provider][group_id]:
self.subgroup_count[provider][group_id][psg] = 0
self.subgroup_count[provider][group_id][psg] += 1
def create_hot_spare_proxies(self) -> None:
"""
Create 'hot spare' proxies for origins that lack active proxies.
This function iterates over all groups and their origins.
If an origin is not destroyed and lacks active proxies (not deprecated and not destroyed),
a new 'hot spare' proxy for this origin is created in the reserve pool (with pool_id = -1).
"""
origins = Origin.query.filter(
Origin.destroyed.is_(None)
).all()
for proxy in proxies:
logging.debug("Destroying expired proxy")
proxy.destroy()
db.session.commit()
return True, ""
for origin in origins:
if origin.destroyed is not None:
continue
proxies = Proxy.query.filter(
Proxy.pool_id == -1,
Proxy.origin_id == origin.id,
Proxy.deprecated.is_(None),
Proxy.destroyed.is_(None),
).all()
if not proxies:
logging.debug("Creating new hot spare proxy for origin %s", origin)
self.create_proxy(-1, origin) # Creating proxy in reserve pool

View file

@ -0,0 +1,29 @@
"""Add hot spare pool
Revision ID: 278bcfb487d3
Revises: 2d747ffb9928
Create Date: 2023-05-30 16:08:37.770371
"""
from alembic import op
from sqlalchemy.sql import text
# revision identifiers, used by Alembic.
revision = '278bcfb487d3'
down_revision = '2d747ffb9928'
branch_labels = None
depends_on = None
def upgrade():
# Add SQL here
sql = text("""INSERT INTO pool VALUES (
-1, 'Hot spares (reserve)', NOW(), NOW(), NULL, 'hotspare',
md5(to_char(NOW(), 'YYYY-MM-DD HH24:MI:SS.US')), NULL) ON CONFLICT (id) DO NOTHING;""")
op.execute(sql)
def downgrade():
# SQL to undo the changes
sql = text("""DELETE FROM pool WHERE id = -1;""")
op.execute(sql)

View file

@ -1,7 +1,11 @@
import unittest
from unittest.mock import MagicMock, patch
from app.terraform.proxy import ProxyAutomation, update_smart_proxy_instance
import pytest
from app import app
from app.terraform.proxy import update_smart_proxy_instance
from app.terraform.proxy.meta import random_slug, next_subgroup, Proxy
class TestUpdateSmartProxyInstance(unittest.TestCase):
@ -40,17 +44,35 @@ class TestUpdateSmartProxyInstance(unittest.TestCase):
self.assertEqual(mock_instance.instance_id, self.instance_id)
class TestProxyAutomation(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.app = app.test_client()
def test_proxy_automation_abstract_methods(self):
# Test NotImplementedError for import_state
with self.assertRaises(NotImplementedError):
proxy_automation = ProxyAutomation()
proxy_automation.import_state(None)
@pytest.fixture
def active_proxies():
proxies = [MagicMock(spec=Proxy) for _ in range(5)]
for proxy in proxies:
proxy.deprecated = None
proxy.destroyed = None
return proxies
if __name__ == '__main__':
unittest.main()
def test_random_slug():
slug = random_slug("example.com")
assert slug[:5] == "examp"
assert len(slug) == 17
def test_next_subgroup():
subgroup_count = {
'provider1': {
1: {
1: 3
},
2: {
1: 2,
2: 5
},
3: {}
}
}
assert next_subgroup(subgroup_count, 'provider1', 1, 3, 5) == 1
assert next_subgroup(subgroup_count, 'provider1', 1, 5, 3) == 2
assert next_subgroup(subgroup_count, 'provider1', 2, 3, 2) == 3
assert next_subgroup(subgroup_count, 'provider1', 3, 3, 5) == 1