"""Flask application setup: blueprints, Prometheus metrics and UI routes.

Creates the Flask ``app``, mounts a Prometheus WSGI endpoint at ``/metrics``
backed by a dedicated registry, registers database-backed custom collectors
and defines the routes that serve the static UI.
"""
import os
import sys
from typing import Iterator

import yaml
from flask import Flask, redirect, url_for, send_from_directory
from flask.typing import ResponseReturnValue
from prometheus_client import make_wsgi_app, Metric, CollectorRegistry
from prometheus_client.metrics_core import GaugeMetricFamily, CounterMetricFamily
from prometheus_client.registry import Collector, REGISTRY
from prometheus_flask_exporter import PrometheusMetrics
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.middleware.dispatcher import DispatcherMiddleware

from app.api import api
from app.extensions import bootstrap
from app.extensions import db
from app.extensions import migrate
from app.models.automation import Automation, AutomationState
from app.portal import portal
from app.portal.report import report
from app.tfstate import tfstate

app = Flask(__name__)
app.config.from_file("../config.yaml", load=yaml.safe_load)

# create new registry to avoid multiple metrics registration in global REGISTRY
registry = CollectorRegistry()
metrics = PrometheusMetrics(app, registry=registry)

app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {  # type: ignore[method-assign]
    '/metrics': make_wsgi_app(registry)
})

# register default collectors to our new registry
# NOTE(review): relies on the private ``_collector_to_names`` attribute of the
# global REGISTRY; iterate over a copy so the source mapping is never touched
# while registering.
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
    registry.register(collector)

db.init_app(app)
migrate.init_app(app, db, render_as_batch=True)
bootstrap.init_app(app)

app.register_blueprint(api, url_prefix="/api")
app.register_blueprint(portal, url_prefix="/portal")
app.register_blueprint(tfstate, url_prefix="/tfstate")
app.register_blueprint(report, url_prefix="/report")


def not_migrating() -> bool:
    """Return True unless the first command-line argument is ``"db"``.

    Used below to skip registering the database-backed collectors.
    NOTE(review): ``db`` is presumably the migration CLI sub-command - confirm.
    """
    return len(sys.argv) < 2 or sys.argv[1] != "db"


def _collector_status_gauge() -> GaugeMetricFamily:
    """Build the shared ``database_collector`` health gauge (0: bad, 1: good)."""
    return GaugeMetricFamily("database_collector",
                             "Status of a database collector (0: bad, 1: good)",
                             labels=["collector"])


class DefinedProxiesCollector(Collector):
    """Export the count of proxies whose ``destroyed`` column is NULL.

    Counts are grouped by group, provider and pool. A ``database_collector``
    sample labelled ``defined_proxies`` reports whether the query succeeded.
    """

    def collect(self) -> Iterator[Metric]:
        """Yield the ``defined_proxies`` gauge followed by the health gauge."""
        with app.app_context():
            ok = _collector_status_gauge()
            try:
                with db.engine.connect() as conn:
                    result = conn.execute(text("""
                    SELECT origin.group_id, "group".group_name, proxy.provider, proxy.pool_id, pool.pool_name,
                           COUNT(proxy.id)
                    FROM proxy, origin, pool, "group"
                    WHERE proxy.origin_id = origin.id
                    AND origin.group_id = "group".id
                    AND proxy.pool_id = pool.id
                    AND proxy.destroyed IS NULL
                    GROUP BY origin.group_id, "group".group_name, proxy.provider, proxy.pool_id, pool.pool_name;
                    """))
                    c = GaugeMetricFamily("defined_proxies",
                                          "Number of proxies currently defined for deployment",
                                          labels=['group_id', 'group_name', 'provider', 'pool_id', 'pool_name'])
                    for row in result:
                        # Label values must be strings; the two id columns are cast explicitly.
                        c.add_metric([str(row[0]), row[1], row[2], str(row[3]), row[4]], row[5])
                    yield c
                    ok.add_metric(["defined_proxies"], 1)
            except SQLAlchemyError:
                ok.add_metric(["defined_proxies"], 0)
            yield ok


class BlockedProxiesCollector(Collector):
    """Export the count of proxies whose ``deprecated`` column is not NULL.

    Counts are grouped by group, provider, pool and deprecation reason. A
    ``database_collector`` sample labelled ``deprecated_proxies`` reports
    whether the query succeeded.
    """

    def collect(self) -> Iterator[Metric]:
        """Yield the ``deprecated_proxies`` counter followed by the health gauge."""
        with app.app_context():
            ok = _collector_status_gauge()
            try:
                with db.engine.connect() as conn:
                    result = conn.execute(text("""
                    SELECT origin.group_id, "group".group_name, proxy.provider, proxy.pool_id, pool.pool_name,
                           proxy.deprecation_reason, COUNT(proxy.id)
                    FROM proxy, origin, pool, "group"
                    WHERE proxy.origin_id = origin.id
                    AND origin.group_id = "group".id
                    AND proxy.pool_id = pool.id
                    AND proxy.deprecated IS NOT NULL
                    GROUP BY origin.group_id, "group".group_name, proxy.provider, proxy.pool_id, pool.pool_name,
                             proxy.deprecation_reason;
                    """))
                    c = CounterMetricFamily("deprecated_proxies",
                                            "Number of proxies deprecated",
                                            labels=['group_id', 'group_name', 'provider', 'pool_id', 'pool_name',
                                                    'deprecation_reason'])
                    for row in result:
                        c.add_metric([str(row[0]), row[1], row[2], str(row[3]), row[4], row[5]], row[6])
                    yield c
                    # Success must report 1 ("good"); 0 is reserved for the
                    # failure path below, matching the other collectors.
                    ok.add_metric(["deprecated_proxies"], 1)
            except SQLAlchemyError:
                ok.add_metric(["deprecated_proxies"], 0)
            yield ok


class AutomationCollector(Collector):
    """Export state, enabled flag and run timestamps for each automation.

    Automations whose ``short_name`` is listed in the ``HIDDEN_AUTOMATIONS``
    config entry are skipped. A ``database_collector`` sample labelled
    ``automation_state`` reports whether the query succeeded.
    """

    def collect(self) -> Iterator[Metric]:
        """Yield the four automation gauges followed by the health gauge."""
        with app.app_context():
            ok = _collector_status_gauge()
            try:
                state = GaugeMetricFamily("automation_state",
                                          "The automation state (0: idle, 1: running, 2: error)",
                                          labels=['automation_name'])
                enabled = GaugeMetricFamily("automation_enabled",
                                            "Whether an automation is enabled (0: disabled, 1: enabled)",
                                            labels=['automation_name'])
                next_run = GaugeMetricFamily("automation_next_run",
                                             "The timestamp of the next run of the automation",
                                             labels=['automation_name'])
                last_run_start = GaugeMetricFamily("automation_last_run_start",
                                                   "The timestamp of the last run of the automation ",
                                                   labels=['automation_name'])

                automations = Automation.query.all()
                for automation in automations:
                    if automation.short_name in app.config['HIDDEN_AUTOMATIONS']:
                        continue
                    name_label = [automation.short_name]
                    # Any state other than IDLE or RUNNING is exported as 2.
                    if automation.state == AutomationState.IDLE:
                        state.add_metric(name_label, 0)
                    elif automation.state == AutomationState.RUNNING:
                        state.add_metric(name_label, 1)
                    else:
                        state.add_metric(name_label, 2)
                    enabled.add_metric(name_label, 1 if automation.enabled else 0)
                    # A missing timestamp is exported as 0.
                    next_run.add_metric(
                        name_label,
                        automation.next_run.timestamp() if automation.next_run else 0)
                    last_run_start.add_metric(
                        name_label,
                        automation.last_run.timestamp() if automation.last_run else 0)
                yield state
                yield enabled
                yield next_run
                yield last_run_start
                ok.add_metric(["automation_state"], 1)
            except SQLAlchemyError:
                ok.add_metric(["automation_state"], 0)
            yield ok


# register all custom collectors to registry
if not_migrating() and 'DISABLE_METRICS' not in os.environ:
    registry.register(DefinedProxiesCollector())
    registry.register(BlockedProxiesCollector())
    registry.register(AutomationCollector())


@app.route('/ui')
def redirect_ui() -> ResponseReturnValue:
    """Redirect ``/ui`` to ``/ui/``."""
    return redirect("/ui/")


@app.route('/ui/', defaults={'path': ''})
@app.route('/ui/<path:path>')
def serve_ui(path: str) -> ResponseReturnValue:
    """Serve a file from ``static/ui``, falling back to ``index.html``.

    ``path`` is the part of the URL after ``/ui/`` (empty for ``/ui/`` itself).
    When it is non-empty and exists under ``app/static/ui/`` that file is sent;
    otherwise ``index.html`` is sent.
    """
    # NOTE(review): the existence check is relative to the process working
    # directory while send_from_directory is given 'static/ui' - confirm both
    # resolve to the same directory in deployment.
    if path != "" and os.path.exists("app/static/ui/" + path):
        return send_from_directory('static/ui', path)
    return send_from_directory('static/ui', 'index.html')


@app.route('/')
def index() -> ResponseReturnValue:
    """Redirect the site root to the portal home page."""
    # TODO: update to point at new UI when ready
    return redirect(url_for("portal.portal_home"))


if __name__ == '__main__':
    app.run()