lint: tidy up code in cli module

This commit is contained in:
Iain Learmonth 2022-06-17 13:21:35 +01:00
parent 5f7733d064
commit 98895e47de
6 changed files with 61 additions and 58 deletions

View file

@ -1,5 +1,5 @@
[MASTER] [MASTER]
disable=missing-module-docstring disable=missing-module-docstring,missing-class-docstring,missing-function-docstring
ignored-classes=Column ignored-classes=Column
load-plugins=pylint_flask,pylint_flask_sqlalchemy load-plugins=pylint_flask,pylint_flask_sqlalchemy
max-line-length=120 max-line-length=120

View file

@ -0,0 +1,22 @@
import argparse
from abc import abstractmethod
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
_SubparserType = argparse._SubParsersAction[argparse.ArgumentParser] # pylint: disable=protected-access
else:
_SubparserType = Any
class BaseCliHandler:
def __init__(self, args: argparse.Namespace) -> None:
self.args = args
@classmethod
@abstractmethod
def add_subparser_to(cls, subparsers: _SubparserType) -> None:
raise NotImplementedError
@abstractmethod
def run(self) -> None:
raise NotImplementedError

View file

@ -27,8 +27,8 @@ def parse_args(argv: List[str]) -> None:
if __name__ == "__main__": if __name__ == "__main__":
verbose = "-v" in sys.argv or "--verbose" in sys.argv VERBOSE = "-v" in sys.argv or "--verbose" in sys.argv
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG if verbose else logging.INFO) level=logging.DEBUG if VERBOSE else logging.INFO)
logging.debug("Arguments: %s", sys.argv) logging.debug("Arguments: %s", sys.argv)
parse_args(sys.argv) parse_args(sys.argv)

View file

@ -1,11 +1,11 @@
import argparse
import datetime import datetime
import json import json
import logging import logging
from traceback import TracebackException from traceback import TracebackException
from typing import Type, TYPE_CHECKING, Any from typing import Type
from app import app from app import app
from app.cli import _SubparserType, BaseCliHandler
from app.extensions import db from app.extensions import db
from app.models.activity import Activity from app.models.activity import Activity
from app.models.automation import Automation, AutomationState, AutomationLogs from app.models.automation import Automation, AutomationState, AutomationLogs
@ -30,12 +30,6 @@ from app.terraform.list.s3 import ListS3Automation
from app.terraform.proxy.azure_cdn import ProxyAzureCdnAutomation from app.terraform.proxy.azure_cdn import ProxyAzureCdnAutomation
from app.terraform.proxy.cloudfront import ProxyCloudfrontAutomation from app.terraform.proxy.cloudfront import ProxyCloudfrontAutomation
if TYPE_CHECKING:
_SubparserType = argparse._SubParsersAction[argparse.ArgumentParser]
else:
_SubparserType = Any
jobs = { jobs = {
x.short_name: x x.short_name: x
for x in [ for x in [
@ -63,6 +57,13 @@ jobs = {
def run_all(**kwargs: bool) -> None: def run_all(**kwargs: bool) -> None:
"""
Run all automation tasks.
:param kwargs: this function takes the same arguments as :func:`run_job` and will pass the same arguments
to every task
:return: None
"""
for job in jobs.values(): for job in jobs.values():
run_job(job, **kwargs) # type: ignore run_job(job, **kwargs) # type: ignore
@ -100,10 +101,10 @@ def run_job(job_cls: Type[BaseAutomation], *,
# the error handling process isn't really handling the error, but rather causing it # the error handling process isn't really handling the error, but rather causing it
# to be logged for investigation. Catching more specific exceptions would just mean that # to be logged for investigation. Catching more specific exceptions would just mean that
# others go unrecorded and are difficult to debug. # others go unrecorded and are difficult to debug.
except Exception as e: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
tb = TracebackException.from_exception(e) trace = TracebackException.from_exception(exc)
success = False success = False
logs = "\n".join(tb.format()) logs = "\n".join(trace.format())
if success: if success:
automation.state = AutomationState.IDLE automation.state = AutomationState.IDLE
automation.next_run = datetime.datetime.utcnow() + datetime.timedelta( automation.next_run = datetime.datetime.utcnow() + datetime.timedelta(
@ -128,7 +129,7 @@ def run_job(job_cls: Type[BaseAutomation], *,
db.session.commit() db.session.commit()
class AutomateCliHandler: class AutomateCliHandler(BaseCliHandler):
@classmethod @classmethod
def add_subparser_to(cls, subparsers: _SubparserType) -> None: def add_subparser_to(cls, subparsers: _SubparserType) -> None:
parser = subparsers.add_parser("automate", help="automation operations") parser = subparsers.add_parser("automate", help="automation operations")
@ -139,9 +140,6 @@ class AutomateCliHandler:
parser.add_argument("--ignore-schedule", help="run job even if it's not time yet", action="store_true") parser.add_argument("--ignore-schedule", help="run job even if it's not time yet", action="store_true")
parser.set_defaults(cls=cls) parser.set_defaults(cls=cls)
def __init__(self, args: argparse.Namespace) -> None:
self.args = args
def run(self) -> None: def run(self) -> None:
with app.app_context(): with app.app_context():
if self.args.job: if self.args.job:

View file

@ -1,22 +1,16 @@
import argparse
import csv import csv
import datetime import datetime
import logging import logging
import sys import sys
from typing import TYPE_CHECKING, Any
from app import app from app import app
from app.cli import _SubparserType, BaseCliHandler
from app.extensions import db from app.extensions import db
from app.models.base import Group, MirrorList from app.models.base import Group, MirrorList
from app.models.bridges import Bridge, BridgeConf from app.models.bridges import Bridge, BridgeConf
from app.models.mirrors import Origin, Proxy from app.models.mirrors import Origin, Proxy
from app.models.alarms import Alarm, AlarmState from app.models.alarms import Alarm, AlarmState
if TYPE_CHECKING:
_SubparserType = argparse._SubParsersAction[argparse.ArgumentParser]
else:
_SubparserType = Any
models = { models = {
"bridge": Bridge, "bridge": Bridge,
"bridgeconf": BridgeConf, "bridgeconf": BridgeConf,
@ -31,8 +25,8 @@ models = {
def export(model: db.Model) -> None: def export(model: db.Model) -> None:
out = csv.writer(sys.stdout) out = csv.writer(sys.stdout)
out.writerow(model.csv_header()) out.writerow(model.csv_header())
for r in model.query.all(): for row in model.query.all():
out.writerow(r.csv_row()) out.writerow(row.csv_row())
def impot(model: db.Model) -> None: def impot(model: db.Model) -> None:
@ -46,35 +40,35 @@ def impot(model: db.Model) -> None:
sys.exit(1) sys.exit(1)
first = False first = False
continue continue
x = model() new_entity = model()
for i in range(len(header)): for idx, field_name in header:
if header[i] in ["added", "updated", "destroyed", "deprecated", "last_updated", "terraform_updated"]: if field_name in ["added", "updated", "destroyed", "deprecated", "last_updated", "terraform_updated"]:
# datetime fields # datetime fields
if line[i] == "": if line[idx] == "":
line[i] = None # type: ignore line[idx] = None # type: ignore
else: else:
line[i] = datetime.datetime.strptime(line[i], "%Y-%m-%d %H:%M:%S.%f") # type: ignore line[idx] = datetime.datetime.strptime(line[idx], "%Y-%m-%d %H:%M:%S.%f") # type: ignore
elif header[i] in ["eotk", "auto_rotation", "smart"]: elif field_name in ["eotk", "auto_rotation", "smart"]:
# boolean fields # boolean fields
line[i] = line[i] == "True" # type: ignore line[idx] = line[idx] == "True" # type: ignore
elif header[i].endswith("_id") and line[i] == "": elif field_name.endswith("_id") and line[idx] == "":
# integer foreign keys # integer foreign keys
line[i] = None # type: ignore line[idx] = None # type: ignore
elif header[i] in ["alarm_state"]: elif field_name in ["alarm_state"]:
# alarm states # alarm states
line[i] = getattr(AlarmState, line[i][len("AlarmState."):]) line[idx] = getattr(AlarmState, line[idx][len("AlarmState."):])
setattr(x, header[i], line[i]) setattr(new_entity, field_name, line[idx])
db.session.add(x) db.session.add(new_entity)
db.session.commit() db.session.commit()
logging.info("Import completed successfully") logging.info("Import completed successfully")
# Many things can go wrong in the above, like IO, format or database errors. # Many things can go wrong in the above, like IO, format or database errors.
# We catch all the errors and ensure the database transaction is rolled back, and log it. # We catch all the errors and ensure the database transaction is rolled back, and log it.
except Exception as e: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
logging.exception(e) logging.exception(exc)
db.session.rollback() db.session.rollback()
class DbCliHandler: class DbCliHandler(BaseCliHandler):
@classmethod @classmethod
def add_subparser_to(cls, subparsers: _SubparserType) -> None: def add_subparser_to(cls, subparsers: _SubparserType) -> None:
parser = subparsers.add_parser("db", help="database operations") parser = subparsers.add_parser("db", help="database operations")
@ -84,9 +78,6 @@ class DbCliHandler:
help="import data from CSV format", dest="impot") help="import data from CSV format", dest="impot")
parser.set_defaults(cls=cls) parser.set_defaults(cls=cls)
def __init__(self, args: argparse.Namespace) -> None:
self.args = args
def run(self) -> None: def run(self) -> None:
with app.app_context(): with app.app_context():
if self.args.export: if self.args.export:

View file

@ -1,23 +1,18 @@
import argparse
import json import json
import logging import logging
import sys import sys
from typing import Callable, TYPE_CHECKING, Any from typing import Callable, Any
from app import app from app import app
from app.cli import _SubparserType, BaseCliHandler
from app.lists import lists from app.lists import lists
if TYPE_CHECKING:
_SubparserType = argparse._SubParsersAction[argparse.ArgumentParser]
else:
_SubparserType = Any
def dump(list_f: Callable[[], Any]) -> None: def dump(list_f: Callable[[], Any]) -> None:
json.dump(list_f(), sys.stdout, indent=2) json.dump(list_f(), sys.stdout, indent=2)
class ListCliHandler: class ListCliHandler(BaseCliHandler):
@classmethod @classmethod
def add_subparser_to(cls, subparsers: _SubparserType) -> None: def add_subparser_to(cls, subparsers: _SubparserType) -> None:
parser = subparsers.add_parser("list", help="list operations") parser = subparsers.add_parser("list", help="list operations")
@ -25,9 +20,6 @@ class ListCliHandler:
help="dump a list in JSON format") help="dump a list in JSON format")
parser.set_defaults(cls=cls) parser.set_defaults(cls=cls)
def __init__(self, args: argparse.Namespace) -> None:
self.args = args
def run(self) -> None: def run(self) -> None:
with app.app_context(): with app.app_context():
if self.args.dump: if self.args.dump: