From 0056f819b6c734556c50e11de43da2d4c21ff74d Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Fri, 6 Mar 2026 11:57:48 +0100 Subject: [PATCH] Add operational metrics for webhook and Matrix health --- ops_bot/main.py | 73 +++++++++++++++++++++++++++++----- ops_bot/matrix.py | 51 ++++++++++++++++-------- ops_bot/metrics.py | 97 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 195 insertions(+), 26 deletions(-) create mode 100644 ops_bot/metrics.py diff --git a/ops_bot/main.py b/ops_bot/main.py index fbbaaf4..80889e2 100644 --- a/ops_bot/main.py +++ b/ops_bot/main.py @@ -2,6 +2,7 @@ import asyncio import logging import os import sys +import time from contextlib import asynccontextmanager from typing import Any, AsyncIterator, Dict, List, Optional, Protocol, Tuple, cast @@ -20,6 +21,16 @@ from ops_bot import alertmanager, aws, pagerduty from ops_bot.config import BotSettings, RoutingKey, load_config from ops_bot.gitlab import hook as gitlab_hook from ops_bot.matrix import MatrixClient +from ops_bot.metrics import ( + CONFIG_LOADED_TOTAL, + EVENT_TO_SEND_SECONDS, + MESSAGES_SENT_TOTAL, + MESSAGE_SEND_FAILURES_TOTAL, + WEBHOOK_EVENTS_TOTAL, + classify_payload_error, + classify_send_failure, + source_label, +) async def get_matrix_service(request: Request) -> MatrixClient: @@ -36,8 +47,15 @@ async def matrix_main(matrix_client: MatrixClient) -> None: @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: config_fname = os.environ.get("BOT_CONFIG_FILE", "config.json") - bot_settings = load_config(config_fname) - c = MatrixClient(settings=bot_settings.matrix, join_rooms=bot_settings.get_rooms()) + try: + bot_settings = load_config(config_fname) + c = MatrixClient( + settings=bot_settings.matrix, join_rooms=bot_settings.get_rooms() + ) + except Exception: + CONFIG_LOADED_TOTAL.labels(result="failure").inc() + raise + CONFIG_LOADED_TOTAL.labels(result="success").inc() app.state.matrix_client = c app.state.bot_settings = bot_settings asyncio.create_task(matrix_main(c)) @@ -131,16 +149,20 @@ async def webhook_handler( ), matrix_client: MatrixClient = Depends(get_matrix_service), ) -> Dict[str, str]: + request_start = time.perf_counter() route = get_route(request.app.state.bot_settings, routing_key) if not route: logging.error(f"unknown routing key {routing_key}") + WEBHOOK_EVENTS_TOTAL.labels(source="unknown", result="unknown_route").inc() raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Unknown routing key" ) + source = source_label(route.hook_type) handler: Optional[Tuple[Authorizer, ParseHandler]] = handlers.get(route.hook_type) if not handler: + WEBHOOK_EVENTS_TOTAL.labels(source=source, result="handler_error").inc() raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Unknown hook type" ) @@ -153,20 +175,51 @@ async def webhook_handler( bearer_credentials=bearer_credentials, basic_credentials=basic_credentials, ): + WEBHOOK_EVENTS_TOTAL.labels(source=source, result="auth_failed").inc() raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials" ) - payload: Any = await request.json() - - messages = await parse_handler(route, payload, request=request) - for msg_plain, msg_formatted in messages: - await matrix_client.room_send( - route.room_id, - msg_plain, - message_formatted=msg_formatted, + try: + payload: Any = await request.json() + except Exception: + WEBHOOK_EVENTS_TOTAL.labels(source=source, result="invalid_payload").inc() + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid payload" ) + try: + messages = await parse_handler(route, payload, request=request) + except Exception as exc: + WEBHOOK_EVENTS_TOTAL.labels( + source=source, result=classify_payload_error(exc) + ).inc() + raise + + first_send_attempt_observed = False + for msg_plain, msg_formatted in messages: + if not first_send_attempt_observed: + EVENT_TO_SEND_SECONDS.labels(source=source).observe( + time.perf_counter() - request_start + ) + first_send_attempt_observed = True + try: + await matrix_client.room_send( + route.room_id, + msg_plain, + message_formatted=msg_formatted, + ) + except Exception as exc: + MESSAGE_SEND_FAILURES_TOTAL.labels( + source=source, reason=classify_send_failure(exc) + ).inc() + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail="Failed to send message to Matrix", + ) + MESSAGES_SENT_TOTAL.labels(source=source).inc() + + WEBHOOK_EVENTS_TOTAL.labels(source=source, result="accepted").inc() return {"status": "ok"} diff --git a/ops_bot/matrix.py b/ops_bot/matrix.py index da1a56c..9662f2d 100644 --- a/ops_bot/matrix.py +++ b/ops_bot/matrix.py @@ -10,6 +10,12 @@ from nio import AsyncClient, AsyncClientConfig, LoginResponse from pydantic import BaseModel from pydantic_settings import BaseSettings +from ops_bot.metrics import ( + MATRIX_AUTH_TOTAL, + MATRIX_SYNC_ERRORS_TOTAL, + classify_sync_error, +) + class ClientCredentials(BaseModel): homeserver: str @@ -95,7 +101,11 @@ class MatrixClient: await client.join(room) await client.joined_rooms() - await client.sync_forever(timeout=300000, full_state=True) + try: + await client.sync_forever(timeout=300000, full_state=True) + except Exception as exc: + MATRIX_SYNC_ERRORS_TOTAL.labels(reason=classify_sync_error(exc)).inc() + raise def save_credentials(self, resp: LoginResponse, homeserver: str) -> None: credentials = ClientCredentials( @@ -120,8 +130,10 @@ class MatrixClient: ) if isinstance(response, LoginResponse): + MATRIX_AUTH_TOTAL.labels(mode="fresh_login", result="success").inc() self.save_credentials(response, self.settings.homeserver) else: + MATRIX_AUTH_TOTAL.labels(mode="fresh_login", result="failure").inc() logging.error( f'Login for "{self.settings.user_id}" via homeserver="{self.settings.homeserver}"' ) @@ -129,22 +141,27 @@ class MatrixClient: sys.exit(1) async def login_with_credentials(self) -> None: - credentials = self.credential_store.read() + try: + credentials = self.credential_store.read() - self.client = AsyncClient( - homeserver=credentials.homeserver, - user=credentials.user_id, - device_id=credentials.device_id, - store_path=str(self.store_path), - config=self.client_config, - ssl=True, - ) + self.client = AsyncClient( + homeserver=credentials.homeserver, + user=credentials.user_id, + device_id=credentials.device_id, + store_path=str(self.store_path), + config=self.client_config, + ssl=True, + ) - self.client.restore_login( - user_id=credentials.user_id, - device_id=credentials.device_id, - access_token=credentials.access_token, - ) + self.client.restore_login( + user_id=credentials.user_id, + device_id=credentials.device_id, + access_token=credentials.access_token, + ) + except Exception: + MATRIX_AUTH_TOTAL.labels(mode="credential_restore", result="failure").inc() + raise + MATRIX_AUTH_TOTAL.labels(mode="credential_restore", result="success").inc() async def login(self) -> None: if self.credential_store.exists(): @@ -171,12 +188,14 @@ class MatrixClient: message_formatted, extensions=["extra"] ) - await self.client.room_send( + response = await self.client.room_send( room_id=room, message_type="m.room.message", content=content, ignore_unverified_devices=True, ) + if response.__class__.__name__.endswith("Error"): + raise RuntimeError(f"Matrix room_send failed: {response}") async def shutdown(self) -> None: if self.client is not None: diff --git a/ops_bot/metrics.py b/ops_bot/metrics.py new file mode 100644 index 0000000..4514aa1 --- /dev/null +++ b/ops_bot/metrics.py @@ -0,0 +1,97 @@ +import json +from typing import Optional + +from prometheus_client import Counter, Histogram + +WEBHOOK_EVENTS_TOTAL = Counter( + "matrix_ops_bot_webhook_events_total", + "Incoming webhook events by source and processing result.", + ["source", "result"], +) + +MESSAGES_SENT_TOTAL = Counter( + "matrix_ops_bot_messages_sent_total", + "Messages successfully sent to Matrix by source.", + ["source"], +) + +MESSAGE_SEND_FAILURES_TOTAL = Counter( + "matrix_ops_bot_message_send_failures_total", + "Failures while sending messages to Matrix by source and reason.", + ["source", "reason"], +) + +MATRIX_AUTH_TOTAL = Counter( + "matrix_ops_bot_matrix_auth_total", + "Matrix authentication and credential-restore attempts by mode and result.", + ["mode", "result"], +) + +MATRIX_SYNC_ERRORS_TOTAL = Counter( + "matrix_ops_bot_matrix_sync_errors_total", + "Matrix sync loop errors by coarse reason.", + ["reason"], +) + +EVENT_TO_SEND_SECONDS = Histogram( + "matrix_ops_bot_event_to_send_seconds", + "Time from webhook receipt to first Matrix send attempt by source.", + ["source"], +) + +CONFIG_LOADED_TOTAL = Counter( + "matrix_ops_bot_config_loaded_total", + "Bot config load outcomes at startup.", + ["result"], +) + + +def source_label(source: Optional[str]) -> str: + if source is None: + return "unknown" + normalized = source.strip().lower().replace("-", "_") + if normalized in {"gitlab", "pagerduty", "aws_sns", "alertmanager"}: + return normalized + return "unknown" + + +def classify_send_failure(exc: Exception) -> str: + msg = f"{type(exc).__name__} {exc}".lower() + if "forbidden" in msg or "403" in msg: + return "forbidden" + if "rate limit" in msg or "too many requests" in msg or "429" in msg: + return "ratelimit" + if "unknown room" in msg or "room not found" in msg: + return "unknown_room" + if ( + "timeout" in msg + or "connection" in msg + or "network" in msg + or "dns" in msg + or "refused" in msg + ): + return "network" + return "exception" + + +def classify_sync_error(exc: Exception) -> str: + msg = f"{type(exc).__name__} {exc}".lower() + if "401" in msg or "403" in msg or "unauthorized" in msg or "forbidden" in msg: + return "auth" + if ( + "timeout" in msg + or "connection" in msg + or "network" in msg + or "dns" in msg + or "refused" in msg + ): + return "network" + if "megolm" in msg or "olm" in msg or "decrypt" in msg: + return "crypto" + return "unknown" + + +def classify_payload_error(exc: Exception) -> str: + if isinstance(exc, (ValueError, TypeError, KeyError, json.JSONDecodeError)): + return "invalid_payload" + return "handler_error"