matrix-ops-bot/ops_bot/aws.py

57 lines
1.7 KiB
Python
Raw Normal View History

2022-11-30 15:52:04 +00:00
import json
import logging
2022-11-30 15:21:09 +00:00
from typing import Any, Tuple
from ops_bot.common import urgency_color
def handle_subscribe_confirm(payload: Any) -> Tuple[str, str]:
message = payload.get("Message")
url = payload.get("SubscribeURL")
plain = f"{message}\n\n{url}"
return plain, plain
def handle_notification(payload: Any) -> Tuple[str, str]:
message = payload.get("Message")
subject = payload.get("Subject")
plain = f"{subject}\n{message}"
color = urgency_color("high")
formatted = (
f"<strong><font color={color}>{subject}</font></strong>\n<p>{message}</p>"
)
return plain, formatted
2022-11-30 15:52:04 +00:00
def handle_json_notification(payload: Any, body: Any) -> Tuple[str, str]:
if "AlarmName" not in body:
msg = "Received unknown json payload type over AWS SNS"
logging.info(msg)
logging.info(payload.get("Message"))
return msg, msg
description = body.get("AlarmDescription")
subject = payload.get("Subject")
plain = f"{subject}\n{description}"
color = urgency_color("high")
formatted = (
f"<strong><font color={color}>{subject}</font></strong>\n<p>{description}</p>"
)
return plain, formatted
2022-11-30 15:21:09 +00:00
def parse_sns_event(payload: Any) -> Tuple[str, str]:
if payload.get("Type") == "SubscriptionConfirmation":
return handle_subscribe_confirm(payload)
elif payload.get("Type") == "UnsubscribeConfirmation":
return handle_subscribe_confirm(payload)
elif payload.get("Type") == "Notification":
2022-11-30 15:52:04 +00:00
try:
body = json.loads(payload.get("Message"))
return handle_json_notification(payload, body)
except Exception:
return handle_notification(payload)
2022-11-30 15:21:09 +00:00
raise Exception("Unnown SNS payload type")