matrix-ops-bot/ops_bot/aws.py

56 lines
1.7 KiB
Python

import html
import json
import logging
from typing import Any, Tuple

from ops_bot.common import urgency_color
def handle_subscribe_confirm(payload: Any) -> Tuple[str, str]:
    """Render an SNS subscription confirmation payload as a chat message.

    ``parse_sns_event`` routes both ``SubscriptionConfirmation`` and
    ``UnsubscribeConfirmation`` payloads here.

    Args:
        payload: The decoded SNS envelope; only its ``Message`` and
            ``SubscribeURL`` keys are read.

    Returns:
        A two-element tuple holding the same text twice: the message, a
        blank line, then the URL. No separate rich rendering is produced.
    """
    text = "{}\n\n{}".format(payload.get("Message"), payload.get("SubscribeURL"))
    return text, text
def handle_notification(payload: Any) -> Tuple[str, str]:
    """Render a free-text SNS notification as a chat message.

    Args:
        payload: The decoded SNS envelope; only its ``Message`` and
            ``Subject`` keys are read.

    Returns:
        A ``(plain, formatted)`` tuple. ``plain`` is the subject and the
        message separated by a newline. ``formatted`` is HTML markup with
        the subject wrapped in ``<strong><font>`` (coloured with the value
        of ``urgency_color("high")``) followed by the message in a ``<p>``.
        When the payload has no subject, the heading line is left out of
        both renderings.
    """
    message = str(payload.get("Message"))
    # Publisher-supplied text goes into HTML markup, so escape it; otherwise
    # a "<" or "&" in the message would be interpreted as markup.
    message_html = f"<p>{html.escape(message)}</p>"

    # SNS omits "Subject" when the publisher did not set one. Skip the
    # heading in that case instead of rendering the literal text "None".
    subject = payload.get("Subject")
    if not subject:
        return message, message_html

    plain = f"{subject}\n{message}"
    color = urgency_color("high")
    subject_html = html.escape(str(subject))
    formatted = (
        f"<strong><font color={color}>{subject_html}</font></strong>\n{message_html}"
    )
    return plain, formatted
def handle_json_notification(payload: Any, body: Any) -> Tuple[str, str]:
    """Render an SNS notification whose ``Message`` was decoded as JSON.

    Only bodies that contain an ``AlarmName`` key are rendered (presumably
    CloudWatch alarm notifications -- confirm against the publishers in use).
    Any other body is logged and answered with a generic notice.

    Args:
        payload: The decoded SNS envelope; its ``Subject`` key is rendered
            and its raw ``Message`` is logged for unrecognised bodies.
        body: The value obtained by JSON-decoding ``payload["Message"]``.

    Returns:
        A ``(plain, formatted)`` tuple. For alarm bodies, ``plain`` is the
        subject and ``AlarmDescription`` separated by a newline, and
        ``formatted`` is the HTML rendering with the subject coloured using
        ``urgency_color("high")``. For unrecognised bodies both elements are
        the same generic notice.
    """
    if "AlarmName" not in body:
        msg = "Received unknown json payload type over AWS SNS"
        logging.info(msg)
        # Keep the raw message in the log so the unknown shape can be inspected.
        logging.info(payload.get("Message"))
        return msg, msg

    description = body.get("AlarmDescription")
    subject = payload.get("Subject")
    plain = f"{subject}\n{description}"
    color = urgency_color("high")
    # Subject and description are free text supplied by the alarm's author;
    # escape them so "<", ">" and "&" are shown literally in the HTML body.
    subject_html = html.escape(str(subject))
    description_html = html.escape(str(description))
    formatted = (
        f"<strong><font color={color}>{subject_html}</font></strong>\n"
        f"<p>{description_html}</p>"
    )
    return plain, formatted
def parse_sns_event(payload: Any) -> Tuple[str, str]:
    """Dispatch a decoded SNS envelope to the matching renderer.

    Args:
        payload: The decoded SNS envelope. Its ``Type`` key selects the
            renderer; for notifications its ``Message`` key is inspected.

    Returns:
        The ``(plain, formatted)`` tuple produced by the selected handler.

    Raises:
        ValueError: If ``Type`` is not ``SubscriptionConfirmation``,
            ``UnsubscribeConfirmation`` or ``Notification``.
    """
    payload_type = payload.get("Type")

    if payload_type in ("SubscriptionConfirmation", "UnsubscribeConfirmation"):
        return handle_subscribe_confirm(payload)

    if payload_type == "Notification":
        # Only the decode step is guarded, so an error raised while
        # rendering is not silently turned into a plain-text fallback.
        try:
            body = json.loads(payload.get("Message"))
        except (TypeError, ValueError, RecursionError):
            # Missing, non-string, malformed or too deeply nested JSON:
            # treat the message as free text.
            return handle_notification(payload)
        # Text such as `42`, `"quoted"` or `[1, 2]` is valid JSON but not a
        # structured event; show it as free text rather than dropping it.
        if not isinstance(body, dict):
            return handle_notification(payload)
        return handle_json_notification(payload, body)

    raise ValueError("Unknown SNS payload type")