majuna/app/terraform/block_bridge_github.py

44 lines
1.7 KiB
Python

import datetime
from typing import Tuple
from dateutil.parser import isoparse
from github import Github
from app import app
from app.extensions import db
from app.models.bridges import Bridge
from app.terraform import BaseAutomation
class BlockBridgeGitHubAutomation(BaseAutomation):
"""
Automation task to import bridge reachability results from GitHub.
"""
short_name = "block_bridge_github"
description = "Import bridge reachability results from GitHub"
frequency = 30
def automate(self, full: bool = False) -> Tuple[bool, str]:
github = Github(app.config['GITHUB_API_KEY'])
repo = github.get_repo(app.config['GITHUB_BRIDGE_REPO'])
for vantage_point in app.config['GITHUB_BRIDGE_VANTAGE_POINTS']:
contents = repo.get_contents(f"recentResult_{vantage_point}")
if isinstance(contents, list):
return (False,
f"Expected a file at recentResult_{vantage_point}"
" but got a directory.")
results = contents.decoded_content.decode('utf-8').splitlines()
for result in results:
parts = result.split("\t")
if isoparse(parts[2]) < (datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(days=3)):
continue
if int(parts[1]) < 40:
bridge: Bridge = Bridge.query.filter(
Bridge.hashed_fingerprint == parts[0]
).first()
if bridge is not None:
bridge.deprecate(reason="github")
db.session.commit()
return True, ""