import asyncio import json import logging from typing import Any, Dict, Literal, Optional, cast import uvicorn from fastapi import Depends, FastAPI, HTTPException, Request, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from pydantic import BaseSettings from ops_bot import aws, pagerduty from ops_bot.matrix import MatrixClient, MatrixClientSettings class BotSettings(BaseSettings): bearer_token: str routing_keys: Dict[str, str] log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO" class Config: env_prefix = "BOT_" secrets_dir = "/run/secrets" case_sensitive = False app = FastAPI() security = HTTPBearer() async def get_matrix_service(request: Request) -> MatrixClient: """A helper to fetch the matrix client from the app state""" return cast(MatrixClient, request.app.state.matrix_client) async def matrix_main(matrix_client: MatrixClient) -> None: """Execs the matrix client asyncio task""" workers = [asyncio.create_task(matrix_client.start())] await asyncio.gather(*workers) @app.on_event("startup") async def startup_event() -> None: bot_settings = BotSettings(_env_file=".env", _env_file_encoding="utf-8") logging.getLogger().setLevel(bot_settings.log_level) matrix_settings = MatrixClientSettings(_env_file=".env", _env_file_encoding="utf-8") matrix_settings.join_rooms = list(bot_settings.routing_keys.values()) c = MatrixClient(settings=matrix_settings) app.state.matrix_client = c app.state.bot_settings = bot_settings asyncio.create_task(matrix_main(c)) @app.on_event("shutdown") async def shutdown_event() -> None: await app.state.matrix_client.shutdown() @app.get("/") async def root() -> Dict[str, str]: return {"message": "Hello World"} def authorize( request: Request, credentials: HTTPAuthorizationCredentials = Depends(security) ) -> bool: bearer_token = request.app.state.bot_settings.bearer_token if credentials.credentials != bearer_token: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect bearer token", headers={"WWW-Authenticate": "Bearer"}, ) return True def get_destination(bot_settings: BotSettings, routing_key: str) -> Optional[str]: return bot_settings.routing_keys.get(routing_key, None) async def receive_helper(request: Request): payload: Any = await request.json() routing_key = request.path_params["routing_key"] room_id = get_destination(request.app.state.bot_settings, routing_key) if room_id is None: logging.error(f"unknown routing key {routing_key}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Unknown routing key" ) payload_str = json.dumps(payload, sort_keys=True, indent=2) logging.info(f"received payload: \n {payload_str}") return payload @app.post("/hook/pagerduty/{routing_key}") async def pagerduty_hook( request: Request, matrix_client: MatrixClient = Depends(get_matrix_service), auth: bool = Depends(authorize), ) -> Dict[str, str]: room_id, payload = await receive_helper(request) msg_plain, msg_formatted = pagerduty.parse_pagerduty_event(payload) await matrix_client.room_send( room_id, msg_plain, message_formatted=msg_formatted, ) return {"message": msg_plain, "message_formatted": msg_formatted} @app.post("/hook/aws-sns/{routing_key}") async def aws_sns_hook( request: Request, matrix_client: MatrixClient = Depends(get_matrix_service), auth: bool = Depends(authorize), ) -> Dict[str, str]: room_id, payload = await receive_helper(request) msg_plain, msg_formatted = aws.parse_sns_event(payload) await matrix_client.room_send( room_id, msg_plain, message_formatted=msg_formatted, ) return {"message": msg_plain, "message_formatted": msg_formatted} def start_dev() -> None: uvicorn.run("ops_bot.main:app", port=1111, host="127.0.0.1", reload=True) def start() -> None: uvicorn.run("ops_bot.main:app", port=1111, host="0.0.0.0") # nosec B104 if __name__ == "__main__": start()