# matrix-ops-bot/ops_bot/main.py
# 109 lines, 3.2 KiB, Python

import asyncio
import secrets
from typing import Any, Dict, Optional, cast

import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseSettings

from ops_bot import pagerduty
from ops_bot.matrix import MatrixClient, MatrixClientSettings
class BotSettings(BaseSettings):
    """Bot configuration, populated by pydantic's ``BaseSettings``.

    The nested ``Config`` selects the ``BOT_`` environment-variable prefix,
    case-insensitive name matching, and ``/run/secrets`` as the directory
    for file-based secrets.
    """

    # Shared secret that webhook callers must present as an HTTP bearer token.
    bearer_token: str
    # Maps a webhook routing key (taken from the request path) to the
    # destination room; the values are also used as the rooms to join.
    routing_keys: Dict[str, str]

    class Config:
        env_prefix = "BOT_"
        secrets_dir = "/run/secrets"
        case_sensitive = False
# The application object; the route handlers and lifecycle hooks in this
# module register themselves on it.
app = FastAPI()
# Bearer-token security scheme, injected into `authorize` via `Depends`.
security = HTTPBearer()
async def get_matrix_service(request: Request) -> MatrixClient:
    """Return the Matrix client kept on the application state.

    Used as a FastAPI dependency so handlers receive the shared client.
    """
    shared_state = request.app.state
    # `state` attributes are untyped, so narrow for the type checker.
    return cast(MatrixClient, shared_state.matrix_client)
async def matrix_main(matrix_client: MatrixClient) -> None:
    """Run the Matrix client's ``start()`` coroutine as a task and await it."""
    client_task = asyncio.create_task(matrix_client.start())
    await asyncio.gather(client_task)
@app.on_event("startup")
async def startup_event() -> None:
    """Load settings, build the Matrix client and start it in the background.

    Both settings objects are read from the environment and the ``.env``
    file. The client and the bot settings are published on ``app.state`` so
    request handlers and the shutdown hook can reach them.
    """
    bot_settings = BotSettings(_env_file=".env", _env_file_encoding="utf-8")
    matrix_settings = MatrixClientSettings(_env_file=".env", _env_file_encoding="utf-8")
    # Join every room that a routing key can deliver to.
    matrix_settings.join_rooms = list(bot_settings.routing_keys.values())

    matrix_client = MatrixClient(settings=matrix_settings)
    app.state.matrix_client = matrix_client
    app.state.bot_settings = bot_settings

    # The event loop only keeps weak references to tasks, so a task whose
    # handle is dropped can be garbage-collected before it finishes. Keep a
    # strong reference for the lifetime of the application.
    app.state.matrix_task = asyncio.create_task(matrix_main(matrix_client))
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Shut down the Matrix client stored on the application state."""
    matrix_client = app.state.matrix_client
    await matrix_client.shutdown()
@app.get("/")
async def root() -> Dict[str, str]:
    """Answer ``GET /`` with a fixed greeting payload."""
    greeting: Dict[str, str] = {"message": "Hello World"}
    return greeting
def authorize(
    request: Request, credentials: HTTPAuthorizationCredentials = Depends(security)
) -> bool:
    """Check the request's bearer token against the configured one.

    Args:
        request: Incoming request; the expected token is read from
            ``request.app.state.bot_settings``.
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        ``True`` when the presented token matches the configured token.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token does not match.
    """
    expected_token: str = request.app.state.bot_settings.bearer_token
    # Constant-time comparison so response timing does not reveal how much
    # of the token matched. Compare bytes, because compare_digest rejects
    # str arguments containing non-ASCII characters.
    tokens_match = secrets.compare_digest(
        credentials.credentials.encode("utf-8"),
        expected_token.encode("utf-8"),
    )
    if not tokens_match:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect bearer token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return True
def get_destination(bot_settings: BotSettings, routing_key: str) -> Optional[str]:
    """Look up the destination configured for *routing_key*.

    Returns ``None`` when the key is not present in
    ``bot_settings.routing_keys``.
    """
    destinations = bot_settings.routing_keys
    return destinations.get(routing_key)
@app.post("/hook/pagerduty/{routing_key}")
async def pagerduty_hook(
    request: Request,
    matrix_client: MatrixClient = Depends(get_matrix_service),
    auth: bool = Depends(authorize),
) -> Dict[str, str]:
    """Relay a PagerDuty webhook event to the room mapped to the routing key.

    Args:
        request: Incoming request; supplies the ``routing_key`` path
            parameter and the JSON body.
        matrix_client: Shared Matrix client used to send the message.
        auth: Unused directly; declaring it makes ``authorize`` run before
            the handler body.

    Returns:
        The plain and formatted message texts that were sent.

    Raises:
        HTTPException: 404 when the routing key has no configured room,
            400 when the request body is not valid JSON.
    """
    # Resolve the destination first: it is a cheap lookup and lets unknown
    # routing keys be rejected without reading the body.
    room_id = get_destination(
        request.app.state.bot_settings, request.path_params["routing_key"]
    )
    if room_id is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Unknown routing key"
        )

    try:
        payload: Any = await request.json()
    except ValueError as err:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report a client error instead of a 500.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body is not valid JSON",
        ) from err

    msg_plain, msg_formatted = pagerduty.parse_pagerduty_event(payload)
    await matrix_client.room_send(
        room_id,
        msg_plain,
        message_formatted=msg_formatted,
    )
    return {"message": msg_plain, "message_formatted": msg_formatted}
def start_dev() -> None:
    """Serve the app on 127.0.0.1:1111 with uvicorn's reload enabled."""
    uvicorn.run(
        "ops_bot.main:app",
        host="127.0.0.1",
        port=1111,
        reload=True,
    )
def start() -> None:
    """Serve the app on port 1111, bound to all interfaces."""
    uvicorn.run(
        "ops_bot.main:app",
        host="0.0.0.0",  # nosec B104
        port=1111,
    )
if __name__ == "__main__":
    # Running this module as a script launches the server via start().
    start()