import datetime
import logging
from typing import Optional, Tuple

from dateutil.parser import isoparse
from github import Github

from app import app
from app.extensions import db
from app.models.bridges import Bridge
from app.terraform import BaseAutomation

logger = logging.getLogger(__name__)

# Results whose timestamp is older than this are ignored.
_MAX_RESULT_AGE = datetime.timedelta(days=3)

# A bridge is deprecated when the integer in the second column of a result
# line is strictly below this value.
# NOTE(review): presumably a reachability score/percentage - confirm against
# the format of the results file.
_DEPRECATION_THRESHOLD = 40


def _parse_result_line(
        line: str) -> Optional[Tuple[str, int, datetime.datetime]]:
    """
    Parse one tab-separated result line.

    The first three fields are read as: hashed fingerprint, integer value,
    ISO-8601 timestamp. Any further fields are ignored.

    :param line: a single line from a results file
    :return: ``(hashed_fingerprint, value, timestamp)`` with a timezone-aware
        timestamp, or ``None`` when the line has fewer than three fields or
        the value/timestamp cannot be parsed
    """
    parts = line.split("\t")
    if len(parts) < 3:
        return None
    try:
        value = int(parts[1])
        measured_at = isoparse(parts[2])
    except ValueError:
        return None
    if measured_at.tzinfo is None:
        # A naive datetime cannot be compared with the timezone-aware cutoff
        # (TypeError).
        # NOTE(review): assumes offset-less timestamps are UTC - confirm.
        measured_at = measured_at.replace(tzinfo=datetime.timezone.utc)
    return parts[0], value, measured_at


class BlockBridgeGitHubAutomation(BaseAutomation):
    """
    Automation task to import bridge reachability results from GitHub.
    """

    short_name = "block_bridge_github"
    description = "Import bridge reachability results from GitHub"
    # Scheduling interval consumed by the automation framework; the unit is
    # not visible in this file.
    frequency = 30

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """
        Deprecate bridges that recent GitHub-hosted results flag as blocked.

        For every vantage point listed in the ``GITHUB_BRIDGE_VANTAGE_POINTS``
        config entry, the file ``recentResult_<vantage_point>`` is fetched
        from the repository named by ``GITHUB_BRIDGE_REPO``. Each line whose
        timestamp is within the last three days and whose value is below the
        threshold causes the first bridge with a matching hashed fingerprint
        to be deprecated with reason ``"github"``. Blank lines are skipped
        silently; malformed lines are skipped with a logged warning.

        Changes are committed once per vantage-point file.

        :param full: unused by this automation
        :return: ``(True, "")`` on success, or ``(False, message)`` when a
            results path resolves to a directory instead of a file
        """
        github = Github(app.config['GITHUB_API_KEY'])
        repo = github.get_repo(app.config['GITHUB_BRIDGE_REPO'])
        # Loop-invariant: compute the age cutoff once for the whole run.
        cutoff = datetime.datetime.now(datetime.timezone.utc) - _MAX_RESULT_AGE

        for vantage_point in app.config['GITHUB_BRIDGE_VANTAGE_POINTS']:
            path = f"recentResult_{vantage_point}"
            contents = repo.get_contents(path)
            if isinstance(contents, list):
                # get_contents returns a list when the path is a directory.
                return (False,
                        f"Expected a file at {path} but got a directory.")

            lines = contents.decoded_content.decode('utf-8').splitlines()
            for line_number, line in enumerate(lines, start=1):
                if not line.strip():
                    continue
                parsed = _parse_result_line(line)
                if parsed is None:
                    logger.warning("Skipping malformed line %d in %s",
                                   line_number, path)
                    continue
                hashed_fingerprint, value, measured_at = parsed
                if measured_at < cutoff:
                    continue
                if value >= _DEPRECATION_THRESHOLD:
                    continue
                bridge: Optional[Bridge] = Bridge.query.filter(
                    Bridge.hashed_fingerprint == hashed_fingerprint
                ).first()
                if bridge is not None:
                    bridge.deprecate(reason="github")

            # Persist this file's deprecations before moving on, so an early
            # return for a later vantage point does not discard them.
            db.session.commit()

        return True, ""