import argparse import datetime import json import logging from traceback import TracebackException from typing import Type, TYPE_CHECKING, Any from app import app from app.extensions import db from app.models.activity import Activity from app.models.automation import Automation, AutomationState, AutomationLogs from app.terraform import BaseAutomation from app.terraform.block_bridge_github import BlockBridgeGitHubAutomation from app.terraform.block_external import BlockExternalAutomation from app.terraform.block_ooni import BlockOONIAutomation from app.terraform.block_roskomsvoboda import BlockRoskomsvobodaAutomation from app.terraform.eotk.aws import EotkAWSAutomation from app.terraform.alarms.proxy_azure_cdn import AlarmProxyAzureCdnAutomation from app.terraform.alarms.proxy_cloudfront import AlarmProxyCloudfrontAutomation from app.terraform.alarms.proxy_http_status import AlarmProxyHTTPStatusAutomation from app.terraform.bridge.aws import BridgeAWSAutomation from app.terraform.bridge.gandi import BridgeGandiAutomation from app.terraform.bridge.hcloud import BridgeHcloudAutomation from app.terraform.bridge.ovh import BridgeOvhAutomation from app.terraform.list.github import ListGithubAutomation from app.terraform.list.gitlab import ListGitlabAutomation from app.terraform.list.s3 import ListS3Automation from app.terraform.proxy.azure_cdn import ProxyAzureCdnAutomation from app.terraform.proxy.cloudfront import ProxyCloudfrontAutomation if TYPE_CHECKING: _SubparserType = argparse._SubParsersAction[argparse.ArgumentParser] else: _SubparserType = Any jobs = { x.short_name: x for x in [ AlarmProxyAzureCdnAutomation, AlarmProxyCloudfrontAutomation, AlarmProxyHTTPStatusAutomation, BlockBridgeGitHubAutomation, BlockExternalAutomation, BlockOONIAutomation, BlockRoskomsvobodaAutomation, BridgeAWSAutomation, BridgeGandiAutomation, BridgeHcloudAutomation, BridgeOvhAutomation, EotkAWSAutomation, ListGithubAutomation, ListGitlabAutomation, ListS3Automation, ProxyAzureCdnAutomation, ProxyCloudfrontAutomation ] } def run_all(**kwargs: bool) -> None: for job in jobs.values(): run_job(job, **kwargs) # type: ignore def run_job(job_cls: Type[BaseAutomation], *, force: bool = False, ignore_schedule: bool = False) -> None: automation = Automation.query.filter(Automation.short_name == job_cls.short_name).first() if automation is None: automation = Automation() automation.short_name = job_cls.short_name automation.description = job_cls.description automation.enabled = False automation.next_is_full = False automation.added = datetime.datetime.utcnow() automation.updated = automation.added db.session.add(automation) db.session.commit() else: if automation.state == AutomationState.RUNNING and not force: logging.warning("Not running an already running automation") return if not ignore_schedule and not force: if automation.next_run is not None and automation.next_run > datetime.datetime.utcnow(): logging.warning("Not time to run this job yet") return if not automation.enabled and not force: logging.warning(f"job {job_cls.short_name} is disabled and --force not specified") return automation.state = AutomationState.RUNNING db.session.commit() job: BaseAutomation = job_cls() try: success, logs = job.automate() except Exception as e: tb = TracebackException.from_exception(e) success = False logs = "".join(tb.format()) if success: automation.state = AutomationState.IDLE automation.next_run = datetime.datetime.utcnow() + datetime.timedelta( minutes=getattr(job, "frequency", 7)) else: automation.state = AutomationState.ERROR automation.enabled = False automation.next_run = None log = AutomationLogs() log.automation_id = automation.id log.added = datetime.datetime.utcnow() log.updated = datetime.datetime.utcnow() log.logs = json.dumps(logs) db.session.add(log) activity = Activity( activity_type="automation", text=f"FLASH! Automation Failure: {automation.short_name}. See logs for details." ) activity.notify() # Notify before commit because the failure occurred even if we can't commit. automation.last_run = datetime.datetime.utcnow() db.session.commit() class AutomateCliHandler: @classmethod def add_subparser_to(cls, subparsers: _SubparserType) -> None: parser = subparsers.add_parser("automate", help="automation operations") parser.add_argument("-a", "--all", dest="all", help="run all automation jobs", action="store_true") parser.add_argument("-j", "--job", dest="job", choices=sorted(jobs.keys()), help="run a specific automation job") parser.add_argument("--force", help="run job even if disabled and it's not time yet", action="store_true") parser.add_argument("--ignore-schedule", help="run job even if it's not time yet", action="store_true") parser.set_defaults(cls=cls) def __init__(self, args: argparse.Namespace) -> None: self.args = args def run(self) -> None: with app.app_context(): if self.args.job: run_job(jobs[self.args.job], # type: ignore force=self.args.force, ignore_schedule=self.args.ignore_schedule) elif self.args.all: run_all(force=self.args.force, ignore_schedule=self.args.ignore_schedule) else: logging.error("No action requested")