lots of typing fixes

This commit is contained in:
Iain Learmonth 2022-05-16 11:44:03 +01:00
parent 51f580a304
commit 3665c34961
43 changed files with 260 additions and 178 deletions

View file

@ -1,5 +1,5 @@
import boto3 as boto3
from flask import Flask, jsonify, Response, redirect, url_for
from flask import Flask, redirect, url_for
from flask.typing import ResponseReturnValue
import yaml
from app.extensions import db
@ -20,37 +20,9 @@ app.register_blueprint(portal, url_prefix="/portal")
@app.route('/')
def index():
def index() -> ResponseReturnValue:
return redirect(url_for("portal.portal_home"))
@app.route('/import/cloudfront')
def import_cloudfront():
a = ""
not_found = []
cloudfront = boto3.client('cloudfront',
aws_access_key_id=app.config['AWS_ACCESS_KEY'],
aws_secret_access_key=app.config['AWS_SECRET_KEY'])
dist_paginator = cloudfront.get_paginator('list_distributions')
page_iterator = dist_paginator.paginate()
for page in page_iterator:
for dist in page['DistributionList']['Items']:
res = Proxy.query.all()
matches = [r for r in res if r.origin.domain_name == dist['Comment'][8:]]
if not matches:
not_found.append(dist['Comment'][8:])
continue
a += f"# {dist['Comment'][8:]}\n"
a += f"terraform import module.cloudfront_{matches[0].id}.aws_cloudfront_distribution.this {dist['Id']}\n"
for n in not_found:
a += f"# Not found: {n}\n"
return Response(a, content_type="text/plain")
@app.route('/mirrorSites.json')
def json_mirror_sites():
return jsonify(mirror_sites)
if __name__ == '__main__':
app.run()

View file

@ -1,4 +1,5 @@
import datetime
from typing import Optional
from app.extensions import db
from app.models.alarms import Alarm
@ -7,9 +8,10 @@ from app.models.alarms import Alarm
def _get_alarm(target: str,
alarm_type: str,
*,
proxy_id=None,
origin_id=None,
create_if_missing=True):
proxy_id: Optional[int] = None,
origin_id: Optional[int] = None,
create_if_missing: bool = True) -> Optional[Alarm]:
alarm: Optional[Alarm]
if target == "proxy":
alarm = Alarm.query.filter(
Alarm.target == "proxy",
@ -38,5 +40,7 @@ def _get_alarm(target: str,
return alarm
def get_proxy_alarm(proxy_id: int, alarm_type: str):
return _get_alarm("proxy", alarm_type, proxy_id=proxy_id)
def get_proxy_alarm(proxy_id: int, alarm_type: str) -> Alarm:
alarm = _get_alarm("proxy", alarm_type, proxy_id=proxy_id)
assert(alarm is not None)
return alarm

View file

@ -2,13 +2,14 @@ import argparse
import logging
import sys
from os.path import basename
from typing import List
from app.cli.automate import AutomateCliHandler
from app.cli.db import DbCliHandler
from app.cli.list import ListCliHandler
def parse_args(argv):
def parse_args(argv: List[str]) -> None:
if basename(argv[0]) == "__main__.py":
argv[0] = "bypass"
parser = argparse.ArgumentParser()

View file

@ -3,6 +3,7 @@ import csv
import datetime
import logging
import sys
from typing import TYPE_CHECKING, Any
from app import app
from app.extensions import db
@ -11,6 +12,11 @@ from app.models.bridges import Bridge, BridgeConf
from app.models.mirrors import Mirror, Origin, Proxy
from app.models.alarms import Alarm, AlarmState
if TYPE_CHECKING:
_SubparserType = argparse._SubParsersAction[argparse.ArgumentParser]
else:
_SubparserType = Any
models = {
"bridge": Bridge,
"bridgeconf": BridgeConf,
@ -23,14 +29,14 @@ models = {
}
def export(model: db.Model):
def export(model: db.Model) -> None:
out = csv.writer(sys.stdout)
out.writerow(model.csv_header())
for r in model.query.all():
out.writerow(r.csv_row())
def impot(model: db.Model):
def impot(model: db.Model) -> None:
first = True
header = model.csv_header()
try:
@ -46,15 +52,15 @@ def impot(model: db.Model):
if header[i] in ["added", "updated", "destroyed", "deprecated", "last_updated", "terraform_updated"]:
# datetime fields
if line[i] == "":
line[i] = None
line[i] = None # type: ignore
else:
line[i] = datetime.datetime.strptime(line[i], "%Y-%m-%d %H:%M:%S.%f")
line[i] = datetime.datetime.strptime(line[i], "%Y-%m-%d %H:%M:%S.%f") # type: ignore
elif header[i] in ["eotk"]:
# boolean fields
line[i] = line[i] == "True"
line[i] = line[i] == "True" # type: ignore
elif header[i].endswith("_id") and line[i] == "":
# integer foreign keys
line[i] = None
line[i] = None # type: ignore
elif header[i] in ["alarm_state"]:
# alarm states
line[i] = getattr(AlarmState, line[i][len("AlarmState."):])
@ -69,7 +75,7 @@ def impot(model: db.Model):
class DbCliHandler:
@classmethod
def add_subparser_to(cls, subparsers: argparse._SubParsersAction) -> None:
def add_subparser_to(cls, subparsers: _SubparserType) -> None:
parser = subparsers.add_parser("db", help="database operations")
parser.add_argument("--export", choices=sorted(models.keys()),
help="export data to CSV format")
@ -77,10 +83,10 @@ class DbCliHandler:
help="import data from CSV format", dest="impot")
parser.set_defaults(cls=cls)
def __init__(self, args):
def __init__(self, args: argparse.Namespace) -> None:
self.args = args
def run(self):
def run(self) -> None:
with app.app_context():
if self.args.export:
export(models[self.args.export])

View file

@ -2,35 +2,40 @@ import argparse
import json
import logging
import sys
from typing import Callable
from typing import Callable, TYPE_CHECKING, Any, Dict
from app import app, mirror_sites
from app.lists.bridgelines import bridgelines
from app.lists.mirror_mapping import mirror_mapping
lists = {
if TYPE_CHECKING:
_SubparserType = argparse._SubParsersAction[argparse.ArgumentParser]
else:
_SubparserType = Any
lists: Dict[str, Callable[[], Any]] = {
"mirror_mapping": mirror_mapping,
"bc2": mirror_sites,
"bridgelines": bridgelines,
}
def dump(list_f: Callable):
def dump(list_f: Callable[[], Any]) -> None:
json.dump(list_f(), sys.stdout, indent=2)
class ListCliHandler:
@classmethod
def add_subparser_to(cls, subparsers: argparse._SubParsersAction) -> None:
def add_subparser_to(cls, subparsers: _SubparserType) -> None:
parser = subparsers.add_parser("list", help="list operations")
parser.add_argument("--dump", choices=sorted(lists.keys()),
help="dump a list in JSON format")
parser.set_defaults(cls=cls)
def __init__(self, args):
def __init__(self, args: argparse.Namespace) -> None:
self.args = args
def run(self):
def run(self) -> None:
with app.app_context():
if self.args.dump:
dump(lists[self.args.dump])

View file

@ -1,5 +1,5 @@
import builtins
from typing import List, Iterable
from typing import List, Iterable, Dict, Any, Optional
from pydantic import BaseModel, Field
@ -26,7 +26,7 @@ class Bridgelines(BaseModel):
title = "Bridgelines Version 1"
def bridgelines(*, distribution_method: str = None):
def bridgelines(*, distribution_method: Optional[str] = None) -> Dict[str, Any]:
bridges: Iterable[Bridge] = Bridge.query.filter(
Bridge.destroyed == None,
Bridge.deprecated == None,

View file

@ -1,5 +1,5 @@
import builtins
from typing import Dict, List
from typing import Dict, List, Union
from pydantic import BaseModel, Field
from tldextract import extract
@ -29,7 +29,7 @@ class MirrorMapping(BaseModel):
title = "Mirror Mapping Version 1.1"
def mirror_mapping():
def mirror_mapping() -> Dict[str, Union[str, Dict[str, str]]]:
from app import app
return MirrorMapping(
version="1.1",

View file

@ -4,7 +4,7 @@ from typing import Union, List, Optional, Any
from app.extensions import db
class AbstractConfiguration(db.Model):
class AbstractConfiguration(db.Model): # type: ignore
__abstract__ = True
id = db.Column(db.Integer, primary_key=True)
@ -13,23 +13,23 @@ class AbstractConfiguration(db.Model):
updated = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
destroyed = db.Column(db.DateTime(), nullable=True)
def destroy(self):
def destroy(self) -> None:
self.destroyed = datetime.utcnow()
self.updated = datetime.utcnow()
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return [
"id", "description", "added", "updated", "destroyed"
]
def csv_row(self):
def csv_row(self) -> List[Any]:
return [
getattr(self, x) for x in self.csv_header()
]
class AbstractResource(db.Model):
class AbstractResource(db.Model): # type: ignore
__abstract__ = True
id = db.Column(db.Integer, primary_key=True)

View file

@ -7,7 +7,7 @@ from app.models import AbstractConfiguration
from app.extensions import db
class Activity(db.Model):
class Activity(db.Model): # type: ignore
id = db.Column(db.Integer(), primary_key=True)
group_id = db.Column(db.Integer(), nullable=True)
activity_type = db.Column(db.String(20), nullable=False)
@ -49,7 +49,7 @@ class Webhook(AbstractConfiguration):
format = db.Column(db.String(20))
url = db.Column(db.String(255))
def send(self, text: str):
def send(self, text: str) -> None:
if self.format == "telegram":
data = {"text": text}
else:

View file

@ -1,5 +1,6 @@
import enum
from datetime import datetime
from typing import List, Any
from app import db
@ -11,7 +12,7 @@ class AlarmState(enum.Enum):
CRITICAL = 3
class Alarm(db.Model):
class Alarm(db.Model): # type: ignore
id = db.Column(db.Integer, primary_key=True)
target = db.Column(db.String(60), nullable=False)
group_id = db.Column(db.Integer, db.ForeignKey("group.id"))
@ -30,18 +31,18 @@ class Alarm(db.Model):
bridge = db.relationship("Bridge", back_populates="alarms")
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return [
"id", "target", "group_id", "origin_id", "proxy_id", "bridge_id", "alarm_type",
"alarm_state", "state_changed", "last_updated", "text"
]
def csv_row(self):
def csv_row(self) -> List[Any]:
return [
getattr(self, x) for x in self.csv_header()
]
def update_state(self, state: AlarmState, text: str):
def update_state(self, state: AlarmState, text: str) -> None:
if self.alarm_state != state or self.state_changed is None:
self.state_changed = datetime.utcnow()
self.alarm_state = state

View file

@ -21,7 +21,7 @@ class Automation(AbstractConfiguration):
logs = db.relationship("AutomationLogs", back_populates="automation")
def kick(self):
def kick(self) -> None:
self.enabled = True
self.next_run = datetime.datetime.utcnow()
self.updated = datetime.datetime.utcnow()

View file

@ -1,4 +1,5 @@
from datetime import datetime
from typing import List
from app import db
from app.models import AbstractConfiguration
@ -15,7 +16,7 @@ class Group(AbstractConfiguration):
alarms = db.relationship("Alarm", back_populates="group")
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"group_name", "eotk"
]
@ -43,21 +44,21 @@ class MirrorList(AbstractConfiguration):
"bridgelines": "Tor Bridge Lines"
}
def destroy(self):
def destroy(self) -> None:
self.destroyed = datetime.utcnow()
self.updated = datetime.utcnow()
db.session.commit()
def url(self):
def url(self) -> str:
if self.provider == "gitlab":
return f"https://gitlab.com/{self.container}/-/raw/{self.branch}/{self.filename}"
if self.provider == "github":
return f"https://raw.githubusercontent.com/{self.container}/{self.branch}/{self.filename}"
if self.provider == "s3":
return f"s3://{self.container}/{self.filename}"
return "Unknown provider"
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"provider", "format", "container", "branch", "filename"
]

View file

@ -1,4 +1,5 @@
from datetime import datetime
from typing import List
from app import db
from app.models import AbstractConfiguration, AbstractResource
@ -14,7 +15,7 @@ class BridgeConf(AbstractConfiguration):
group = db.relationship("Group", back_populates="bridgeconfs")
bridges = db.relationship("Bridge", back_populates="conf")
def destroy(self):
def destroy(self) -> None:
self.destroyed = datetime.utcnow()
self.updated = datetime.utcnow()
for bridge in self.bridges:
@ -23,7 +24,7 @@ class BridgeConf(AbstractConfiguration):
bridge.updated = datetime.utcnow()
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"group_id", "provider", "method", "description", "number"
]
@ -41,7 +42,7 @@ class Bridge(AbstractResource):
alarms = db.relationship("Alarm", back_populates="bridge")
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"conf_id", "terraform_updated", "nickname", "fingerprint", "hashed_fingerprint", "bridgeline"
]

View file

@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, List
from tldextract import extract
@ -18,12 +18,12 @@ class Origin(AbstractConfiguration):
alarms = db.relationship("Alarm", back_populates="origin")
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"group_id", "domain_name"
]
def destroy(self):
def destroy(self) -> None:
super().destroy()
for proxy in self.proxies:
proxy.destroy()
@ -33,7 +33,8 @@ class Origin(AbstractConfiguration):
onion = Onion.query.filter(Onion.domain_name == tld).first()
if not onion:
return None
return self.domain_name.replace(tld, f"{onion.onion_name}")
domain_name: str = self.domain_name
return domain_name.replace(tld, f"{onion.onion_name}")
class Proxy(AbstractResource):
@ -48,7 +49,7 @@ class Proxy(AbstractResource):
alarms = db.relationship("Alarm", back_populates="proxy")
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"origin_id", "provider", "psg", "slug", "terraform_updated", "url"
]
@ -61,7 +62,7 @@ class Mirror(AbstractResource):
origin = db.relationship("Origin", back_populates="mirrors")
@classmethod
def csv_header(cls):
def csv_header(cls) -> List[str]:
return super().csv_header() + [
"origin_id", "url"
]

View file

@ -1,6 +1,8 @@
from datetime import datetime, timedelta, timezone
from typing import Optional
from flask import Blueprint, render_template, request
from flask.typing import ResponseReturnValue
from sqlalchemy import desc, or_
from app.models.activity import Activity
@ -43,13 +45,13 @@ def calculate_mirror_expiry(s: datetime) -> str:
@portal.app_template_filter("format_datetime")
def format_datetime(s: datetime) -> str:
def format_datetime(s: Optional[datetime]) -> str:
if s is None:
return "Unknown"
return s.strftime("%a, %d %b %Y %H:%M:%S")
def total_origins_blocked():
def total_origins_blocked() -> int:
count = 0
for o in Origin.query.filter(Origin.destroyed == None).all():
for a in o.alarms:
@ -59,8 +61,9 @@ def total_origins_blocked():
break
return count
@portal.route("/")
def portal_home():
def portal_home() -> ResponseReturnValue:
groups = Group.query.order_by(Group.group_name).all()
now = datetime.now(timezone.utc)
proxies = Proxy.query.filter(Proxy.destroyed == None).all()
@ -86,7 +89,7 @@ def portal_home():
@portal.route("/search")
def search():
def search() -> ResponseReturnValue:
query = request.args.get("query")
proxies = Proxy.query.filter(or_(Proxy.url.contains(query)), Proxy.destroyed == None).all()
origins = Origin.query.filter(or_(Origin.description.contains(query), Origin.domain_name.contains(query))).all()
@ -94,7 +97,7 @@ def search():
@portal.route('/alarms')
def view_alarms():
def view_alarms() -> ResponseReturnValue:
one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)
alarms = Alarm.query.filter(Alarm.last_updated >= one_day_ago).order_by(
desc(Alarm.alarm_state), desc(Alarm.state_changed)).all()

View file

@ -1,6 +1,8 @@
from datetime import datetime
from typing import Optional
from flask import render_template, flash, Response, Blueprint
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import SubmitField, BooleanField
@ -12,13 +14,13 @@ from app.portal.util import view_lifecycle, response_404
bp = Blueprint("automation", __name__)
class EditAutomationForm(FlaskForm):
class EditAutomationForm(FlaskForm): # type: ignore
enabled = BooleanField('Enabled')
submit = SubmitField('Save Changes')
@bp.route("/list")
def automation_list():
def automation_list() -> ResponseReturnValue:
automations = Automation.query.filter(
Automation.destroyed == None).order_by(Automation.description).all()
return render_template("list.html.j2",
@ -29,8 +31,8 @@ def automation_list():
@bp.route('/edit/<automation_id>', methods=['GET', 'POST'])
def automation_edit(automation_id):
automation = Automation.query.filter(Automation.id == automation_id).first()
def automation_edit(automation_id: int) -> ResponseReturnValue:
automation: Optional[Automation] = Automation.query.filter(Automation.id == automation_id).first()
if automation is None:
return Response(render_template("error.html.j2",
section="automation",
@ -52,7 +54,7 @@ def automation_edit(automation_id):
@bp.route("/kick/<automation_id>", methods=['GET', 'POST'])
def automation_kick(automation_id: int):
def automation_kick(automation_id: int) -> ResponseReturnValue:
automation = Automation.query.filter(
Automation.id == automation_id,
Automation.destroyed == None).first()

View file

@ -1,4 +1,7 @@
from typing import Optional
from flask import render_template, Response, flash, redirect, url_for, Blueprint
from flask.typing import ResponseReturnValue
from app.extensions import db
from app.models.bridges import Bridge
@ -8,7 +11,7 @@ bp = Blueprint("bridge", __name__)
@bp.route("/list")
def bridge_list():
def bridge_list() -> ResponseReturnValue:
bridges = Bridge.query.filter(Bridge.destroyed == None).all()
return render_template("list.html.j2",
section="bridge",
@ -18,8 +21,8 @@ def bridge_list():
@bp.route("/block/<bridge_id>", methods=['GET', 'POST'])
def bridge_blocked(bridge_id):
bridge: Bridge = Bridge.query.filter(Bridge.id == bridge_id, Bridge.destroyed == None).first()
def bridge_blocked(bridge_id: int) -> ResponseReturnValue:
bridge: Optional[Bridge] = Bridge.query.filter(Bridge.id == bridge_id, Bridge.destroyed == None).first()
if bridge is None:
return Response(render_template("error.html.j2",
header="404 Proxy Not Found",

View file

@ -1,6 +1,8 @@
from datetime import datetime
from typing import Optional, List
from flask import render_template, url_for, flash, redirect, Response, Blueprint
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import SelectField, StringField, IntegerField, SubmitField
@ -14,7 +16,7 @@ from app.portal.util import response_404, view_lifecycle
bp = Blueprint("bridgeconf", __name__)
class NewBridgeConfForm(FlaskForm):
class NewBridgeConfForm(FlaskForm): # type: ignore
provider = SelectField('Provider', validators=[DataRequired()])
method = SelectField('Distribution Method', validators=[DataRequired()])
description = StringField('Description')
@ -23,15 +25,15 @@ class NewBridgeConfForm(FlaskForm):
submit = SubmitField('Save Changes')
class EditBridgeConfForm(FlaskForm):
class EditBridgeConfForm(FlaskForm): # type: ignore
description = StringField('Description')
number = IntegerField('Number', validators=[NumberRange(1, message="One or more bridges must be created")])
submit = SubmitField('Save Changes')
@bp.route("/list")
def bridgeconf_list():
bridgeconfs = BridgeConf.query.filter(BridgeConf.destroyed == None).all()
def bridgeconf_list() -> ResponseReturnValue:
bridgeconfs: List[BridgeConf] = BridgeConf.query.filter(BridgeConf.destroyed == None).all()
return render_template("list.html.j2",
section="bridgeconf",
title="Tor Bridge Configurations",
@ -42,7 +44,7 @@ def bridgeconf_list():
@bp.route("/new", methods=['GET', 'POST'])
@bp.route("/new/<group_id>", methods=['GET', 'POST'])
def bridgeconf_new(group_id=None):
def bridgeconf_new(group_id: Optional[int] = None) -> ResponseReturnValue:
form = NewBridgeConfForm()
form.group.choices = [(x.id, x.group_name) for x in Group.query.all()]
form.provider.choices = [
@ -82,7 +84,7 @@ def bridgeconf_new(group_id=None):
@bp.route('/edit/<bridgeconf_id>', methods=['GET', 'POST'])
def bridgeconf_edit(bridgeconf_id):
def bridgeconf_edit(bridgeconf_id: int) -> ResponseReturnValue:
bridgeconf = BridgeConf.query.filter(BridgeConf.id == bridgeconf_id).first()
if bridgeconf is None:
return Response(render_template("error.html.j2",
@ -107,7 +109,7 @@ def bridgeconf_edit(bridgeconf_id):
@bp.route("/destroy/<bridgeconf_id>", methods=['GET', 'POST'])
def bridgeconf_destroy(bridgeconf_id: int):
def bridgeconf_destroy(bridgeconf_id: int) -> ResponseReturnValue:
bridgeconf = BridgeConf.query.filter(BridgeConf.id == bridgeconf_id, BridgeConf.destroyed == None).first()
if bridgeconf is None:
return response_404("The requested bridge configuration could not be found.")

View file

@ -1,4 +1,5 @@
from flask import render_template, Blueprint, Response
from flask.typing import ResponseReturnValue
from sqlalchemy import desc
from app.models.base import Group
@ -8,7 +9,7 @@ bp = Blueprint("eotk", __name__)
@bp.route("/list")
def eotk_list():
def eotk_list() -> ResponseReturnValue:
instances = Eotk.query.filter(Eotk.destroyed == None).order_by(desc(Eotk.added)).all()
return render_template("list.html.j2",
section="eotk",
@ -18,7 +19,7 @@ def eotk_list():
@bp.route("/conf/<group_id>")
def eotk_conf(group_id: int):
def eotk_conf(group_id: int) -> ResponseReturnValue:
from app import app
group = Group.query.filter(Group.id == group_id).first()
return Response(render_template("sites.conf.j2",

View file

@ -2,12 +2,12 @@ from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField, SelectField
class EditMirrorForm(FlaskForm):
class EditMirrorForm(FlaskForm): # type: ignore
origin = SelectField('Origin')
url = StringField('URL')
submit = SubmitField('Save Changes')
class EditProxyForm(FlaskForm):
class EditProxyForm(FlaskForm): # type: ignore
origin = SelectField('Origin')
submit = SubmitField('Save Changes')

View file

@ -1,6 +1,7 @@
from datetime import datetime
from flask import render_template, url_for, flash, redirect, Response, Blueprint
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import StringField, BooleanField, SubmitField
@ -12,21 +13,21 @@ from app.models.base import Group
bp = Blueprint("group", __name__)
class NewGroupForm(FlaskForm):
class NewGroupForm(FlaskForm): # type: ignore
group_name = StringField("Short Name", validators=[DataRequired()])
description = StringField("Description", validators=[DataRequired()])
eotk = BooleanField("Deploy EOTK instances?")
submit = SubmitField('Save Changes', render_kw={"class": "btn btn-success"})
class EditGroupForm(FlaskForm):
class EditGroupForm(FlaskForm): # type: ignore
description = StringField('Description', validators=[DataRequired()])
eotk = BooleanField("Deploy EOTK instances?")
submit = SubmitField('Save Changes', render_kw={"class": "btn btn-success"})
@bp.route("/list")
def group_list():
def group_list() -> ResponseReturnValue:
groups = Group.query.order_by(Group.group_name).all()
return render_template("list.html.j2",
section="group",
@ -37,7 +38,7 @@ def group_list():
@bp.route("/new", methods=['GET', 'POST'])
def group_new():
def group_new() -> ResponseReturnValue:
form = NewGroupForm()
if form.validate_on_submit():
group = Group()
@ -59,7 +60,7 @@ def group_new():
@bp.route('/edit/<group_id>', methods=['GET', 'POST'])
def group_edit(group_id):
def group_edit(group_id: int) -> ResponseReturnValue:
group = Group.query.filter(Group.id == group_id).first()
if group is None:
return Response(render_template("error.html.j2",

View file

@ -1,7 +1,9 @@
import json
from datetime import datetime
from typing import Optional
from flask import render_template, url_for, flash, redirect, Blueprint, Response
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import SelectField, StringField, SubmitField
@ -28,7 +30,7 @@ def list_format_name(s: str) -> str:
@bp.route('/list')
def list_list():
def list_list() -> ResponseReturnValue:
lists = MirrorList.query.filter(MirrorList.destroyed == None).all()
return render_template("list.html.j2",
section="list",
@ -48,7 +50,7 @@ def list_list():
@bp.route('/preview/<format_>')
def list_preview(format_: str):
def list_preview(format_: str) -> ResponseReturnValue:
if format_ == "bca":
return Response(json.dumps(mirror_mapping()), content_type="application/json")
if format_ == "bc2":
@ -59,7 +61,7 @@ def list_preview(format_: str):
@bp.route("/destroy/<list_id>", methods=['GET', 'POST'])
def list_destroy(list_id: int):
def list_destroy(list_id: int) -> ResponseReturnValue:
list_ = MirrorList.query.filter(MirrorList.id == list_id, MirrorList.destroyed == None).first()
if list_ is None:
return response_404("The requested bridge configuration could not be found.")
@ -76,10 +78,10 @@ def list_destroy(list_id: int):
@bp.route("/new", methods=['GET', 'POST'])
@bp.route("/new/<group_id>", methods=['GET', 'POST'])
def list_new(group_id=None):
def list_new(group_id: Optional[int] = None) -> ResponseReturnValue:
form = NewMirrorListForm()
form.provider.choices = [(k, v) for k, v in MirrorList.providers_supported]
form.format.choices = [(k, v) for k, v in MirrorList.formats_supported]
form.provider.choices = [(k, v) for k, v in MirrorList.providers_supported] # type: ignore
form.format.choices = [(k, v) for k, v in MirrorList.formats_supported] # type: ignore
if form.validate_on_submit():
list_ = MirrorList()
list_.provider = form.provider.data
@ -105,7 +107,7 @@ def list_new(group_id=None):
return render_template("new.html.j2", section="list", form=form)
class NewMirrorListForm(FlaskForm):
class NewMirrorListForm(FlaskForm): # type: ignore
provider = SelectField('Provider', validators=[DataRequired()])
format = SelectField('Distribution Method', validators=[DataRequired()])
description = StringField('Description', validators=[DataRequired()])

View file

@ -1,6 +1,8 @@
from datetime import datetime
from typing import Optional
from flask import flash, redirect, url_for, render_template, Response, Blueprint
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import StringField, SelectField, SubmitField
@ -14,7 +16,7 @@ from app.portal.util import response_404, view_lifecycle
bp = Blueprint("onion", __name__)
class NewOnionForm(FlaskForm):
class NewOnionForm(FlaskForm): # type: ignore
domain_name = StringField('Domain Name', validators=[DataRequired()])
onion_name = StringField('Onion Name', validators=[DataRequired(), Length(min=56, max=56)],
description="Onion service hostname, excluding the .onion suffix")
@ -23,7 +25,7 @@ class NewOnionForm(FlaskForm):
submit = SubmitField('Save Changes')
class EditOnionForm(FlaskForm):
class EditOnionForm(FlaskForm): # type: ignore
description = StringField('Description', validators=[DataRequired()])
group = SelectField('Group', validators=[DataRequired()])
submit = SubmitField('Save Changes')
@ -31,7 +33,7 @@ class EditOnionForm(FlaskForm):
@bp.route("/new", methods=['GET', 'POST'])
@bp.route("/new/<group_id>", methods=['GET', 'POST'])
def onion_new(group_id=None):
def onion_new(group_id: Optional[int] = None) -> ResponseReturnValue:
form = NewOnionForm()
form.group.choices = [(x.id, x.group_name) for x in Group.query.all()]
if form.validate_on_submit():
@ -57,8 +59,8 @@ def onion_new(group_id=None):
@bp.route('/edit/<onion_id>', methods=['GET', 'POST'])
def onion_edit(onion_id):
onion = Onion.query.filter(Onion.id == onion_id).first()
def onion_edit(onion_id: int) -> ResponseReturnValue:
onion: Optional[Onion] = Onion.query.filter(Onion.id == onion_id).first()
if onion is None:
return Response(render_template("error.html.j2",
section="onion",
@ -83,7 +85,7 @@ def onion_edit(onion_id):
@bp.route("/list")
def onion_list():
def onion_list() -> ResponseReturnValue:
onions = Onion.query.order_by(Onion.domain_name).all()
return render_template("list.html.j2",
section="onion",
@ -94,7 +96,7 @@ def onion_list():
@bp.route("/destroy/<onion_id>", methods=['GET', 'POST'])
def onion_destroy(onion_id: int):
def onion_destroy(onion_id: int) -> ResponseReturnValue:
onion = Onion.query.filter(Onion.id == onion_id, Onion.destroyed == None).first()
if onion is None:
return response_404("The requested onion service could not be found.")

View file

@ -1,6 +1,8 @@
from datetime import datetime
from typing import Optional, List
from flask import flash, redirect, url_for, render_template, Response, Blueprint
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import StringField, SelectField, SubmitField, BooleanField
@ -14,7 +16,7 @@ from app.portal.util import response_404, view_lifecycle
bp = Blueprint("origin", __name__)
class NewOriginForm(FlaskForm):
class NewOriginForm(FlaskForm): # type: ignore
domain_name = StringField('Domain Name', validators=[DataRequired()])
description = StringField('Description', validators=[DataRequired()])
group = SelectField('Group', validators=[DataRequired()])
@ -22,7 +24,7 @@ class NewOriginForm(FlaskForm):
submit = SubmitField('Save Changes')
class EditOriginForm(FlaskForm):
class EditOriginForm(FlaskForm): # type: ignore
description = StringField('Description', validators=[DataRequired()])
group = SelectField('Group', validators=[DataRequired()])
auto_rotate = BooleanField("Enable auto-rotation?")
@ -31,7 +33,7 @@ class EditOriginForm(FlaskForm):
@bp.route("/new", methods=['GET', 'POST'])
@bp.route("/new/<group_id>", methods=['GET', 'POST'])
def origin_new(group_id=None):
def origin_new(group_id: Optional[int] = None) -> ResponseReturnValue:
form = NewOriginForm()
form.group.choices = [(x.id, x.group_name) for x in Group.query.all()]
if form.validate_on_submit():
@ -57,8 +59,8 @@ def origin_new(group_id=None):
@bp.route('/edit/<origin_id>', methods=['GET', 'POST'])
def origin_edit(origin_id):
origin = Origin.query.filter(Origin.id == origin_id).first()
def origin_edit(origin_id: int) -> ResponseReturnValue:
origin: Optional[Origin] = Origin.query.filter(Origin.id == origin_id).first()
if origin is None:
return Response(render_template("error.html.j2",
section="origin",
@ -85,8 +87,8 @@ def origin_edit(origin_id):
@bp.route("/list")
def origin_list():
origins = Origin.query.order_by(Origin.domain_name).all()
def origin_list() -> ResponseReturnValue:
origins: List[Origin] = Origin.query.order_by(Origin.domain_name).all()
return render_template("list.html.j2",
section="origin",
title="Web Origins",
@ -101,7 +103,7 @@ def origin_list():
@bp.route("/onion")
def origin_onion():
def origin_onion() -> ResponseReturnValue:
origins = Origin.query.order_by(Origin.domain_name).all()
return render_template("list.html.j2",
section="origin",
@ -112,7 +114,7 @@ def origin_onion():
@bp.route("/destroy/<origin_id>", methods=['GET', 'POST'])
def origin_destroy(origin_id: int):
def origin_destroy(origin_id: int) -> ResponseReturnValue:
origin = Origin.query.filter(Origin.id == origin_id, Origin.destroyed == None).first()
if origin is None:
return response_404("The requested origin could not be found.")

View file

@ -1,4 +1,5 @@
from flask import render_template, Response, flash, redirect, url_for, Blueprint
from flask.typing import ResponseReturnValue
from sqlalchemy import desc
from app.extensions import db
@ -9,7 +10,7 @@ bp = Blueprint("proxy", __name__)
@bp.route("/list")
def proxy_list():
def proxy_list() -> ResponseReturnValue:
proxies = Proxy.query.filter(Proxy.destroyed == None).order_by(desc(Proxy.added)).all()
return render_template("list.html.j2",
section="proxy",
@ -19,7 +20,7 @@ def proxy_list():
@bp.route("/block/<proxy_id>", methods=['GET', 'POST'])
def proxy_block(proxy_id):
def proxy_block(proxy_id: int) -> ResponseReturnValue:
proxy = Proxy.query.filter(Proxy.id == proxy_id, Proxy.destroyed == None).first()
if proxy is None:
return Response(render_template("error.html.j2",

View file

@ -1,4 +1,5 @@
from flask import Response, render_template, flash, redirect, url_for
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from wtforms import SubmitField
@ -7,7 +8,7 @@ from app.models import AbstractResource
from app.models.activity import Activity
def response_404(message: str):
def response_404(message: str) -> ResponseReturnValue:
return Response(render_template("error.html.j2",
header="404 Not Found",
message=message))
@ -20,7 +21,7 @@ def view_lifecycle(*,
success_view: str,
section: str,
resource: AbstractResource,
action: str):
action: str) -> ResponseReturnValue:
form = LifecycleForm()
if action == "destroy":
form.submit.render_kw = {"class": "btn btn-danger"}
@ -54,5 +55,5 @@ def view_lifecycle(*,
form=form)
class LifecycleForm(FlaskForm):
class LifecycleForm(FlaskForm): # type: ignore
submit = SubmitField('Confirm')

View file

@ -1,6 +1,8 @@
from datetime import datetime
from typing import Optional
from flask import Blueprint, flash, Response, render_template, redirect, url_for
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import StringField, SelectField, SubmitField
@ -22,7 +24,7 @@ def webhook_format_name(s: str) -> str:
return "Unknown"
class NewWebhookForm(FlaskForm):
class NewWebhookForm(FlaskForm): # type: ignore
description = StringField('Description', validators=[DataRequired()])
format = SelectField('Format', choices=[
("telegram", "Telegram"),
@ -33,7 +35,7 @@ class NewWebhookForm(FlaskForm):
@bp.route("/new", methods=['GET', 'POST'])
def webhook_new():
def webhook_new() -> ResponseReturnValue:
form = NewWebhookForm()
if form.validate_on_submit():
webhook = Webhook(
@ -53,7 +55,7 @@ def webhook_new():
@bp.route('/edit/<webhook_id>', methods=['GET', 'POST'])
def webhook_edit(webhook_id):
def webhook_edit(webhook_id: int) -> ResponseReturnValue:
webhook = Webhook.query.filter(Webhook.id == webhook_id).first()
if webhook is None:
return Response(render_template("error.html.j2",
@ -81,7 +83,7 @@ def webhook_edit(webhook_id):
@bp.route("/list")
def webhook_list():
def webhook_list() -> ResponseReturnValue:
webhooks = Webhook.query.all()
return render_template("list.html.j2",
section="webhook",
@ -92,8 +94,8 @@ def webhook_list():
@bp.route("/destroy/<webhook_id>", methods=['GET', 'POST'])
def webhook_destroy(webhook_id: int):
webhook = Webhook.query.filter(Webhook.id == webhook_id, Webhook.destroyed == None).first()
def webhook_destroy(webhook_id: int) -> ResponseReturnValue:
webhook: Optional[Webhook] = Webhook.query.filter(Webhook.id == webhook_id, Webhook.destroyed == None).first()
if webhook is None:
return response_404("The requested webhook could not be found.")
return view_lifecycle(

View file

@ -1,6 +1,6 @@
from abc import ABCMeta, abstractmethod
import os
from typing import Tuple
from typing import Tuple, Optional
from app import app
@ -19,7 +19,7 @@ class BaseAutomation(metaclass=ABCMeta):
def automate(self, full: bool = False) -> Tuple[bool, str]:
raise NotImplementedError()
def working_directory(self, filename=None) -> str:
def working_directory(self, filename: Optional[str] = None) -> str:
"""
Provides a filesystem path that can be used during the automation run.
This is currently a persistent path, but this should not be relied upon

View file

@ -1,3 +1,5 @@
from typing import Tuple
from azure.identity import ClientSecretCredential
from azure.mgmt.alertsmanagement import AlertsManagementClient
@ -12,7 +14,7 @@ class AlarmProxyAzureCdnAutomation(BaseAutomation):
short_name = "monitor_proxy_azure_cdn"
description = "Import alarms for Azure CDN proxies"
def automate(self):
def automate(self, full: bool = False) -> Tuple[bool, str]:
credential = ClientSecretCredential(
tenant_id=app.config['AZURE_TENANT_ID'],
client_id=app.config['AZURE_CLIENT_ID'],
@ -33,4 +35,4 @@ class AlarmProxyAzureCdnAutomation(BaseAutomation):
alarm.update_state(AlarmState.OK, "Azure monitor alert not firing")
else:
alarm.update_state(AlarmState.CRITICAL, "Azure monitor alert firing")
return True, []
return True, ""

View file

@ -1,4 +1,5 @@
import datetime
from typing import Tuple
import boto3
@ -14,7 +15,7 @@ class AlarmProxyCloudfrontAutomation(BaseAutomation):
short_name = "monitor_proxy_cloudfront"
description = "Import alarms for AWS CloudFront proxies"
def automate(self):
def automate(self, full: bool = False) -> Tuple[bool, str]:
cloudwatch = boto3.client('cloudwatch',
aws_access_key_id=app.config['AWS_ACCESS_KEY'],
aws_secret_access_key=app.config['AWS_SECRET_KEY'],
@ -39,7 +40,7 @@ class AlarmProxyCloudfrontAutomation(BaseAutomation):
Alarm.alarm_type == "cloudfront-quota"
).first()
if alarm is None:
alarm = Alarm()
alarm = Alarm() # type: ignore
alarm.target = "service/cloudfront"
alarm.alarm_type = "cloudfront-quota"
alarm.state_changed = datetime.datetime.utcnow()
@ -57,4 +58,4 @@ class AlarmProxyCloudfrontAutomation(BaseAutomation):
if alarm.alarm_state != old_state:
alarm.state_changed = datetime.datetime.utcnow()
db.session.commit()
return True, []
return True, ""

View file

@ -8,7 +8,7 @@ from app.models.mirrors import Proxy
from app.terraform import BaseAutomation
def set_http_alarm(proxy_id: int, state: AlarmState, text: str):
def set_http_alarm(proxy_id: int, state: AlarmState, text: str) -> None:
alarm = Alarm.query.filter(
Alarm.proxy_id == proxy_id,
Alarm.alarm_type == "http-status"

View file

@ -19,7 +19,10 @@ class BlockBridgeGitHubAutomation(BaseAutomation):
g = Github(app.config['GITHUB_API_KEY'])
repo = g.get_repo(app.config['GITHUB_BRIDGE_REPO'])
for vp in app.config['GITHUB_BRIDGE_VANTAGE_POINTS']:
results = repo.get_contents(f"recentResult_{vp}").decoded_content.decode('utf-8').splitlines()
contents = repo.get_contents(f"recentResult_{vp}")
if isinstance(contents, list):
return False, f"Expected a file at recentResult_{vp} but got a directory."
results = contents.decoded_content.decode('utf-8').splitlines()
for result in results:
parts = result.split("\t")
if isoparse(parts[2]) < (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=3)):

View file

@ -58,7 +58,7 @@ class BlockExternalAutomation(BaseAutomation):
continue
activities.append(Activity(
activity_type="block",
text=(f"Proxy {p.url} for {p.origin.domain_name} detected blocked according to external source. "
text=(f"Proxy {proxy.url} for {proxy.origin.domain_name} detected blocked according to external source. "
"Rotation scheduled.")
))
proxy.deprecate(reason="external")

View file

@ -1,7 +1,7 @@
from collections import defaultdict
from datetime import datetime
from datetime import timedelta
from typing import Dict, Tuple
from typing import Dict, Tuple, Union, Any
import requests
@ -11,7 +11,7 @@ from app.models.mirrors import Origin
from app.terraform import BaseAutomation
def check_origin(domain_name: str):
def check_origin(domain_name: str) -> Dict[str, Any]:
start_date = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H%%3A%M")
end_date = datetime.utcnow().strftime("%Y-%m-%dT%H%%3A%M")
api_url = f"https://api.ooni.io/api/v1/measurements?domain={domain_name}&since={start_date}&until={end_date}"
@ -19,7 +19,7 @@ def check_origin(domain_name: str):
return _check_origin(api_url, result)
def _check_origin(api_url: str, result: Dict):
def _check_origin(api_url: str, result: Dict[str, Any]) -> Dict[str, Any]:
print(f"Processing {api_url}")
req = requests.get(api_url).json()
if 'results' not in req or not req['results']:
@ -38,7 +38,7 @@ def _check_origin(api_url: str, result: Dict):
return result
def threshold_origin(domain_name):
def threshold_origin(domain_name: str) -> Dict[str, Any]:
ooni = check_origin(domain_name)
for country in ooni:
total = sum([
@ -58,7 +58,7 @@ def threshold_origin(domain_name):
return ooni
def set_ooni_alarm(origin_id: int, country: str, state: AlarmState, text: str):
def set_ooni_alarm(origin_id: int, country: str, state: AlarmState, text: str) -> None:
alarm = Alarm.query.filter(
Alarm.origin_id == origin_id,
Alarm.alarm_type == f"origin-block-ooni-{country}"

View file

@ -1,5 +1,5 @@
import datetime
from typing import Iterable, Optional, Any
from typing import Iterable, Optional, Any, List
from app import app
from app.extensions import db
@ -9,7 +9,18 @@ from app.terraform.terraform import TerraformAutomation
class BridgeAutomation(TerraformAutomation):
def create_missing(self):
template: str
"""
Terraform configuration template using Jinja 2.
"""
template_parameters: List[str]
"""
List of parameters to be read from the application configuration for use
in the templating of the Terraform configuration.
"""
def create_missing(self) -> None:
bridgeconfs: Iterable[BridgeConf] = BridgeConf.query.filter(
BridgeConf.provider == self.provider,
BridgeConf.destroyed == None
@ -35,7 +46,7 @@ class BridgeAutomation(TerraformAutomation):
break
db.session.commit()
def destroy_expired(self):
def destroy_expired(self) -> None:
cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=0)
bridges = [b for b in Bridge.query.filter(
Bridge.destroyed == None,
@ -48,8 +59,9 @@ class BridgeAutomation(TerraformAutomation):
def tf_prehook(self) -> Optional[Any]:
self.create_missing()
self.destroy_expired()
return None
def tf_generate(self):
def tf_generate(self) -> None:
self.tf_write(
self.template,
groups=Group.query.all(),

View file

@ -8,7 +8,9 @@ from app.models.onions import Eotk
from app.terraform.terraform import TerraformAutomation
def update_eotk_instance(group_id: int, region: str, instance_id: str):
def update_eotk_instance(group_id: int,
region: str,
instance_id: str) -> None:
instance = Eotk.query.filter(
Eotk.group_id == group_id,
Eotk.region == region,
@ -74,7 +76,7 @@ class EotkAWSAutomation(TerraformAutomation):
{% endfor %}
"""
def tf_generate(self):
def tf_generate(self) -> None:
self.tf_write(
self.template,
groups=Group.query.filter(

View file

@ -1,4 +1,5 @@
import json
from typing import List
from app import app
from app.lists.mirror_mapping import mirror_mapping
@ -9,7 +10,18 @@ from app.terraform.terraform import TerraformAutomation
class ListAutomation(TerraformAutomation):
def tf_generate(self):
template: str
"""
Terraform configuration template using Jinja 2.
"""
template_parameters: List[str]
"""
List of parameters to be read from the application configuration for use
in the templating of the Terraform configuration.
"""
def tf_generate(self) -> None:
self.tf_write(
self.template,
lists=MirrorList.query.filter(

View file

@ -1,9 +1,10 @@
from abc import abstractmethod
from collections import defaultdict
import datetime
import math
import string
import random
from typing import Dict
from typing import Dict, Optional, Any, List
from sqlalchemy import text
from tldextract import tldextract
@ -17,6 +18,22 @@ from app.terraform.terraform import TerraformAutomation
class ProxyAutomation(TerraformAutomation):
subgroup_max = math.inf
"""
Maximum number of proxies to deploy per sub-group. This is required for some providers
where the number origins per group may exceed the number of proxies that can be configured
in a single "configuration block", e.g. Azure CDN's profiles.
"""
template: str
"""
Terraform configuration template using Jinja 2.
"""
template_parameters: List[str]
"""
List of parameters to be read from the application configuration for use
in the templating of the Terraform configuration.
"""
def get_subgroups(self) -> Dict[int, Dict[int, int]]:
conn = db.engine.connect()
@ -27,12 +44,12 @@ class ProxyAutomation(TerraformAutomation):
AND proxy.provider = :provider
GROUP BY origin.group_id, proxy.psg;
"""), provider=self.provider)
subgroups = defaultdict(lambda: defaultdict(lambda: 0))
subgroups: Dict[int, Dict[int, int]] = defaultdict(lambda: defaultdict(lambda: 0))
for row in result:
subgroups[row[0]][row[1]] = row[2]
return subgroups
def create_missing_proxies(self):
def create_missing_proxies(self) -> None:
groups = Group.query.all()
subgroups = self.get_subgroups()
for group in groups:
@ -62,7 +79,7 @@ class ProxyAutomation(TerraformAutomation):
db.session.add(proxy)
db.session.commit()
def deprecate_orphaned_proxies(self):
def deprecate_orphaned_proxies(self) -> None:
proxies = Proxy.query.filter(
Proxy.deprecated == None,
Proxy.destroyed == None,
@ -73,7 +90,7 @@ class ProxyAutomation(TerraformAutomation):
proxy.deprecate(reason="origin_destroyed")
db.session.commit()
def destroy_expired_proxies(self):
def destroy_expired_proxies(self) -> None:
cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=3)
proxies = Proxy.query.filter(
Proxy.destroyed == None,
@ -85,15 +102,20 @@ class ProxyAutomation(TerraformAutomation):
proxy.updated = datetime.datetime.utcnow()
db.session.commit()
def tf_prehook(self):
@abstractmethod
def import_state(self, state: Any) -> None:
raise NotImplementedError()
def tf_prehook(self) -> Optional[Any]:
self.create_missing_proxies()
self.deprecate_orphaned_proxies()
self.destroy_expired_proxies()
return None
def tf_posthook(self, *, prehook_result):
def tf_posthook(self, *, prehook_result: Any = None) -> None:
self.import_state(self.tf_show())
def tf_generate(self):
def tf_generate(self) -> None:
self.tf_write(
self.template,
groups=Group.query.all(),

View file

@ -1,3 +1,5 @@
from typing import Optional, Any
from app.extensions import db
from app.models.mirrors import Proxy
from app.terraform.proxy import ProxyAutomation
@ -157,7 +159,7 @@ class ProxyAzureCdnAutomation(ProxyAutomation):
{% endfor %}
"""
def import_state(self, state):
def import_state(self, state: Optional[Any]) -> None:
proxies = Proxy.query.filter(
Proxy.provider == self.provider,
Proxy.destroyed == None

View file

@ -1,4 +1,5 @@
import datetime
from typing import Any
from app.extensions import db
from app.models.mirrors import Proxy
@ -72,7 +73,11 @@ class ProxyCloudfrontAutomation(ProxyAutomation):
{% endfor %}
"""
def import_state(self, state):
def import_state(self, state: Any) -> None:
assert(isinstance(state, dict))
if "child_modules" not in state['values']['root_module']:
# There are no CloudFront proxies deployed to import state for
return
for mod in state['values']['root_module']['child_modules']:
if mod['address'].startswith('module.cloudfront_'):
for res in mod['resources']:

View file

@ -1,3 +1,6 @@
# type: ignore
# TODO: This module doesn't work at all
import datetime
import os
import string

View file

@ -19,6 +19,11 @@ class TerraformAutomation(BaseAutomation):
Default parallelism for remote API calls.
"""
provider: str
"""
Short name for the provider used by this module.
"""
def automate(self, full: bool = False) -> Tuple[bool, str]:
"""
Runs the Terraform automation module. The run will follow these steps:

View file

@ -24,4 +24,3 @@ no_implicit_optional = True
; Some libraries don't have hints yet
ignore_missing_imports = True
follow_imports = silent