Refactor codebase to by DRY

This commit is contained in:
Abel Luck 2022-12-01 16:31:04 +00:00
parent c925079e8b
commit 83a526c533
13 changed files with 320 additions and 131 deletions

View file

@ -1,18 +1,21 @@
import json
import logging
from typing import Any, Tuple
from typing import Any, List, Tuple
from fastapi import Request
from ops_bot.common import COLOR_ALARM, COLOR_OK, COLOR_UNKNOWN
from ops_bot.config import RoutingKey
def handle_subscribe_confirm(payload: Any) -> Tuple[str, str]:
def handle_subscribe_confirm(payload: Any) -> List[Tuple[str, str]]:
message = payload.get("Message")
url = payload.get("SubscribeURL")
plain = f"{message}\n\n{url}"
return plain, plain
return [(plain, plain)]
def handle_notification(payload: Any) -> Tuple[str, str]:
def handle_notification(payload: Any) -> List[Tuple[str, str]]:
message = payload.get("Message")
subject = payload.get("Subject")
@ -20,15 +23,15 @@ def handle_notification(payload: Any) -> Tuple[str, str]:
formatted = (
f"<strong><font color={COLOR_ALARM}>{subject}</font></strong>\n<p>{message}</p>"
)
return plain, formatted
return [(plain, formatted)]
def handle_json_notification(payload: Any, body: Any) -> Tuple[str, str]:
def handle_json_notification(payload: Any, body: Any) -> List[Tuple[str, str]]:
if "AlarmName" not in body:
msg = "Received unknown json payload type over AWS SNS"
logging.info(msg)
logging.info(payload.get("Message"))
return msg, msg
return [(msg, msg)]
description = body.get("AlarmDescription")
subject = payload.get("Subject")
@ -50,10 +53,14 @@ def handle_json_notification(payload: Any, body: Any) -> Tuple[str, str]:
else:
plain += "\n{description}"
formatted += f"\n<p>{description}</p>"
return plain, formatted
return [(plain, formatted)]
def parse_sns_event(payload: Any) -> Tuple[str, str]:
async def parse_sns_event(
route: RoutingKey,
payload: Any,
request: Request,
) -> List[Tuple[str, str]]:
if payload.get("Type") == "SubscriptionConfirmation":
return handle_subscribe_confirm(payload)
elif payload.get("Type") == "UnsubscribeConfirmation":

56
ops_bot/cli.py Normal file
View file

@ -0,0 +1,56 @@
import secrets
import sys
from typing import Any
import click
from ops_bot.config import RoutingKey, load_config, save_config
@click.group()
@click.option(
"--config-file", help="the path to the config file", default="config.json"
)
@click.pass_context
def cli(ctx: Any, config_file: str) -> None:
ctx.obj = config_file
pass
@cli.command(help="Add a new routing key to the configuration file")
@click.option(
"--name", help="a friendly detailed name for the hook so you can remember it later"
)
@click.option(
"--hook-type",
help="The type of webhook to add",
type=click.Choice(["gitlab", "pagerduty", "aws-sns"], case_sensitive=False),
)
@click.option("--room-id", help="The room ID to send the messages to")
@click.pass_obj
def add_hook(config_file: str, name: str, hook_type: str, room_id: str) -> None:
settings = load_config(config_file)
path_key = secrets.token_urlsafe(30)
secret_token = secrets.token_urlsafe(30)
if name in set([key.name for key in settings.routing_keys]):
print("Error: A hook with that name already exists")
sys.exit(1)
settings.routing_keys.append(
RoutingKey(
name=name,
path_key=path_key,
secret_token=secret_token,
room_id=room_id,
hook_type=hook_type,
)
)
save_config(settings)
url = f"/hook/{path_key}"
print("Hook added successfully")
print()
print("Your webhook URL is:")
print(f"\t{url}")
print("The secret token is:")
print(f"\t{secret_token}")

47
ops_bot/config.py Normal file
View file

@ -0,0 +1,47 @@
import json
import logging
from pathlib import Path
from typing import List, Literal
from pydantic import BaseSettings
from ops_bot.matrix import MatrixClientSettings
class RoutingKey(BaseSettings):
name: str
path_key: str
secret_token: str
room_id: str
hook_type: Literal["gitlab", "pagerduty", "aws-sns"]
class BotSettings(BaseSettings):
routing_keys: List[RoutingKey]
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
matrix: MatrixClientSettings
class Config:
env_prefix = "BOT_"
case_sensitive = False
def get_rooms(self) -> List[str]:
return list(set([key.room_id for key in self.routing_keys]))
def config_file_exists(filename: str) -> bool:
return Path(filename).exists()
def load_config(filename: str = "config.json") -> BotSettings:
if config_file_exists(filename):
bot_settings = BotSettings.parse_file(filename)
else:
bot_settings = BotSettings(_env_file=".env", _env_file_encoding="utf-8")
logging.getLogger().setLevel(bot_settings.log_level)
return bot_settings
def save_config(settings: BotSettings, filename: str = "config.json") -> None:
with open(filename, "w") as f:
f.write(json.dumps(settings.dict(), indent=2))

View file

@ -1,12 +1,15 @@
import logging
import re
from typing import Any, List, Tuple
from typing import Any, List, Optional, Tuple
import attr
from fastapi import Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBasicCredentials
from jinja2 import TemplateNotFound
from mautrix.types import Format, MessageType, TextMessageEventContent
from mautrix.util.formatter import parse_html
from ..config import RoutingKey
from ..util.template import TemplateManager, TemplateUtil
from .types import OTHER_ENUMS, Action, EventParse # type: ignore
@ -18,7 +21,17 @@ messages = TemplateManager("gitlab", "messages")
templates = TemplateManager("gitlab", "mixins")
async def parse_event(x_gitlab_event: str, payload: Any) -> List[Tuple[str, str]]:
async def authorize(
route: RoutingKey,
request: Request,
basic_credentials: Optional[HTTPBasicCredentials],
bearer_credentials: Optional[HTTPAuthorizationCredentials],
) -> bool:
provided: Optional[str] = request.headers.get("x-gitlab-token")
return provided == route.secret_token
async def handle_event(x_gitlab_event: str, payload: Any) -> List[Tuple[str, str]]:
evt = EventParse[x_gitlab_event].deserialize(payload)
try:
tpl = messages[evt.template_name]
@ -68,3 +81,10 @@ async def parse_event(x_gitlab_event: str, payload: Any) -> List[Tuple[str, str]
}
msgs.append((content.body, content.formatted_body))
return msgs
async def parse_event(
route: RoutingKey, payload: Any, request: Request
) -> List[Tuple[str, str]]:
x_gitlab_event = request.headers.get("x-gitlab-event")
return await handle_event(x_gitlab_event, payload)

View file

@ -1,36 +1,28 @@
import asyncio
import json
import logging
from pathlib import Path
from typing import Any, Dict, Literal, Optional, Tuple, cast
from typing import Any, Dict, List, Optional, Protocol, Tuple, cast
import uvicorn
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseSettings
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.security import (
HTTPAuthorizationCredentials,
HTTPBasic,
HTTPBasicCredentials,
HTTPBearer,
)
from ops_bot import aws, pagerduty
from ops_bot.config import BotSettings, RoutingKey, load_config
from ops_bot.gitlab import hook as gitlab_hook
from ops_bot.matrix import MatrixClient, MatrixClientSettings
from ops_bot.matrix import MatrixClient
load_dotenv()
class BotSettings(BaseSettings):
bearer_token: str
routing_keys: Dict[str, str]
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
matrix: MatrixClientSettings
class Config:
env_prefix = "BOT_"
secrets_dir = "/run/secrets"
case_sensitive = False
app = FastAPI()
security = HTTPBearer()
bearer_security = HTTPBearer(auto_error=False)
basic_security = HTTPBasic(auto_error=False)
async def get_matrix_service(request: Request) -> MatrixClient:
@ -46,14 +38,8 @@ async def matrix_main(matrix_client: MatrixClient) -> None:
@app.on_event("startup")
async def startup_event() -> None:
# if "config.json" exists read it
if Path("config.json").exists():
bot_settings = BotSettings.parse_file("config.json")
else:
bot_settings = BotSettings(_env_file=".env", _env_file_encoding="utf-8")
logging.getLogger().setLevel(bot_settings.log_level)
bot_settings.matrix.join_rooms = list(bot_settings.routing_keys.values())
c = MatrixClient(settings=bot_settings.matrix)
bot_settings = load_config()
c = MatrixClient(settings=bot_settings.matrix, join_rooms=bot_settings.get_rooms())
app.state.matrix_client = c
app.state.bot_settings = bot_settings
asyncio.create_task(matrix_main(c))
@ -69,87 +55,111 @@ async def root() -> Dict[str, str]:
return {"message": "Hello World"}
def authorize(
request: Request, credentials: HTTPAuthorizationCredentials = Depends(security)
async def bearer_token_authorizer(
route: RoutingKey,
request: Request,
basic_credentials: Optional[HTTPBasicCredentials],
bearer_credentials: Optional[HTTPAuthorizationCredentials],
) -> bool:
bearer_token: Optional[str] = route.secret_token
return (
bearer_credentials is not None
and bearer_credentials.credentials == bearer_token
)
async def nop_authorizer(
route: RoutingKey,
request: Request,
basic_credentials: Optional[HTTPBasicCredentials],
bearer_credentials: Optional[HTTPAuthorizationCredentials],
) -> bool:
bearer_token = request.app.state.bot_settings.bearer_token
if credentials.credentials != bearer_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect bearer token",
headers={"WWW-Authenticate": "Bearer"},
)
return True
def get_destination(bot_settings: BotSettings, routing_key: str) -> Optional[str]:
return bot_settings.routing_keys.get(routing_key, None)
def get_route(bot_settings: BotSettings, path_key: str) -> Optional[RoutingKey]:
# find path_key in bot_settings.routing_keys
for route in bot_settings.routing_keys:
if route.path_key == path_key:
return route
return None
async def receive_helper(request: Request) -> Tuple[str, Any]:
payload: Any = await request.json()
routing_key = request.path_params["routing_key"]
room_id = get_destination(request.app.state.bot_settings, routing_key)
if room_id is None:
class Authorizer(Protocol):
async def __call__(
self,
route: RoutingKey,
request: Request,
basic_credentials: Optional[HTTPBasicCredentials],
bearer_credentials: Optional[HTTPAuthorizationCredentials],
) -> bool:
...
class ParseHandler(Protocol):
async def __call__(
self,
route: RoutingKey,
payload: Any,
request: Request,
) -> List[Tuple[str, str]]:
...
handlers: Dict[str, Tuple[Authorizer, ParseHandler]] = {
"gitlab": (gitlab_hook.authorize, gitlab_hook.parse_event),
"pagerduty": (bearer_token_authorizer, pagerduty.parse_pagerduty_event),
"aws-sns": (nop_authorizer, aws.parse_sns_event),
}
@app.post("/hook/{routing_key}")
async def webhook_handler(
request: Request,
routing_key: str,
basic_credentials: Optional[HTTPBasicCredentials] = Depends(basic_security),
bearer_credentials: Optional[HTTPAuthorizationCredentials] = Depends(
bearer_security
),
matrix_client: MatrixClient = Depends(get_matrix_service),
) -> Dict[str, str]:
route = get_route(request.app.state.bot_settings, routing_key)
if not route:
logging.error(f"unknown routing key {routing_key}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Unknown routing key"
)
payload_str = json.dumps(payload, sort_keys=True, indent=2)
logging.info(f"received payload: \n {payload_str}")
return room_id, payload
@app.post("/hook/pagerduty/{routing_key}")
async def pagerduty_hook(
request: Request,
matrix_client: MatrixClient = Depends(get_matrix_service),
auth: bool = Depends(authorize),
) -> Dict[str, str]:
room_id, payload = await receive_helper(request)
msg_plain, msg_formatted = pagerduty.parse_pagerduty_event(payload)
await matrix_client.room_send(
room_id,
msg_plain,
message_formatted=msg_formatted,
)
return {"message": msg_plain, "message_formatted": msg_formatted}
@app.post("/hook/aws-sns/{routing_key}")
async def aws_sns_hook(
request: Request, matrix_client: MatrixClient = Depends(get_matrix_service)
) -> Dict[str, str]:
room_id, payload = await receive_helper(request)
msg_plain, msg_formatted = aws.parse_sns_event(payload)
await matrix_client.room_send(
room_id,
msg_plain,
message_formatted=msg_formatted,
)
return {"message": msg_plain, "message_formatted": msg_formatted}
@app.post("/hook/gitlab/{routing_key}")
async def gitlab_webhook(
request: Request,
x_gitlab_token: str = Header(default=""),
x_gitlab_event: str = Header(default=""),
matrix_client: MatrixClient = Depends(get_matrix_service),
) -> Dict[str, str]:
bearer_token = request.app.state.bot_settings.bearer_token
if x_gitlab_token != bearer_token:
handler: Optional[Tuple[Authorizer, ParseHandler]] = handlers.get(route.hook_type)
if not handler:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect X-Gitlab-Token"
status_code=status.HTTP_404_NOT_FOUND, detail="Unknown hook type"
)
room_id, payload = await receive_helper(request)
messages = await gitlab_hook.parse_event(x_gitlab_event, payload)
authorizer, parse_handler = handler
if not await authorizer(
route,
request=request,
bearer_credentials=bearer_credentials,
basic_credentials=basic_credentials,
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
)
payload: Any = await request.json()
messages = await parse_handler(route, payload, request=request)
for msg_plain, msg_formatted in messages:
await matrix_client.room_send(
room_id,
route.room_id,
msg_plain,
message_formatted=msg_formatted,
)
return {"status": "ok"}

View file

@ -46,19 +46,18 @@ class MatrixClientSettings(BaseSettings):
password: str
device_name: str
store_path: str
join_rooms: Optional[List[str]]
verify_ssl: Optional[bool] = True
class Config:
env_prefix = "MATRIX_"
secrets_dir = "/run/secrets"
case_sensitive = False
class MatrixClient:
def __init__(self, settings: MatrixClientSettings):
def __init__(self, settings: MatrixClientSettings, join_rooms: List[str]):
self.settings = settings
self.store_path = pathlib.Path(settings.store_path)
self.join_rooms = join_rooms
self.credential_store = LocalCredentialStore(
self.store_path.joinpath("credentials.json")
)
@ -79,9 +78,8 @@ class MatrixClient:
if self.client.should_upload_keys:
await self.client.keys_upload()
if self.settings.join_rooms:
for room in self.settings.join_rooms:
await self.client.join(room)
for room in self.join_rooms:
await self.client.join(room)
await self.client.joined_rooms()
await self.client.sync_forever(timeout=300000, full_state=True)

View file

@ -1,7 +1,10 @@
import json
from typing import Any, Tuple
from typing import Any, List, Tuple
from fastapi import Request
from ops_bot.common import COLOR_ALARM, COLOR_UNKNOWN
from ops_bot.config import RoutingKey
def urgency_color(urgency: str) -> str:
@ -11,7 +14,11 @@ def urgency_color(urgency: str) -> str:
return COLOR_UNKNOWN
def parse_pagerduty_event(payload: Any) -> Tuple[str, str]:
async def parse_pagerduty_event(
route: RoutingKey,
payload: Any,
request: Request,
) -> List[Tuple[str, str]]:
"""
Parses a pagerduty webhook v3 event into a human readable message.
Returns a tuple where the first item is plain text, and the second item is matrix html formatted text
@ -37,12 +44,14 @@ def parse_pagerduty_event(payload: Any) -> Tuple[str, str]:
else:
color = urgency_color(urgency)
formatted = f"<strong><font color={color}>{header_str}</font></strong> on {service_name}: [{title}]({url})"
return plain, formatted
return [(plain, formatted)]
payload_str = json.dumps(payload, sort_keys=True, indent=2)
return (
"unhandled",
f"""**unhandled pager duty event** (this may or may not be a critical problem, please look carefully)
return [
(
"unhandled",
f"""**unhandled pager duty event** (this may or may not be a critical problem, please look carefully)
<pre><code class="language-json">{payload_str}</code></pre>
""",
)
)
]