matrix-ops-bot/ops_bot/aws.py

214 lines
6.7 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from typing import Any, List, Tuple
from fastapi import Request
from ops_bot.common import COLOR_ALARM, COLOR_OK, COLOR_UNKNOWN, COLOR_WARNING
from ops_bot.config import RoutingKey
def handle_subscribe_confirm(payload: Any) -> List[Tuple[str, str]]:
message = payload.get("Message")
url = payload.get("SubscribeURL")
plain = f"{message}\n\n{url}"
return [(plain, plain)]
def handle_notification(payload: Any) -> List[Tuple[str, str]]:
message = payload.get("Message")
subject = payload.get("Subject")
plain = f"{subject}\n{message}"
formatted = (
f"<strong><font color={COLOR_ALARM}>{subject}</font></strong>\n<p>{message}</p>"
)
return [(plain, formatted)]
def handle_json_notification(payload: Any, body: Any) -> List[Tuple[str, str]]:
if "AlarmName" not in body:
payload_str = json.dumps(body, indent=2)
msg = "Received unknown json payload type over AWS SNS"
msg += f"""\n<br/>
```json
{payload_str}
```"""
logging.info(msg)
logging.info(payload.get("Message"))
return [(msg, msg)]
description = body.get("AlarmDescription")
subject = payload.get("Subject")
state_value = body.get("NewStateValue", "unknown")
if state_value == "ALARM":
color = COLOR_ALARM
elif state_value == "OK":
color = COLOR_OK
else:
color = COLOR_UNKNOWN
plain = f"{subject}"
formatted = f"<strong><font color={color}>{subject}</font></strong>\n"
if state_value == "OK":
plain += "\n(this alarm has been resolved!)"
formatted += "\n<p>(this alarm has been resolved!)</p>"
else:
plain += "\n{description}"
formatted += f"\n<p>{description}</p>"
return [(plain, formatted)]
def handle_cloudtrail_sts(payload: Any) -> List[Tuple[str, str]]:
region = payload["region"]
# event_type = payload["detail"]["eventType"]
event_name = payload["detail"]["eventName"]
event_time = payload["detail"]["eventTime"]
account_id = payload["detail"]["recipientAccountId"]
user_type = payload["detail"]["userIdentity"]["type"]
user = "Unknown user"
if user_type == "SAMLUser":
user = payload["detail"]["userIdentity"]["userName"]
assumed_role = None
if (
"responseElements" in payload["detail"]
and "assumedRoleUser" in payload["detail"]["responseElements"]
):
assumed_role = payload["detail"]["responseElements"]["assumedRoleUser"]["arn"]
color = COLOR_ALARM
if event_name == "AssumeRoleWithSAML":
title = f"AWS SAML Sign detected by user `{user}`."
else:
title = event_name
subject = event_name
formatted = [
x
for x in [
f"<font color={color}>**🚨 ALERT[{subject}]** </font>: {title}",
f"- **Region**: {region}",
(f"- **Assumed Role**: {assumed_role}" if assumed_role else None),
f"- **Event Time**: {event_time}",
f"- **Account ID**: {account_id}",
]
if x is not None
]
plain = title
return [(plain, "<br/>".join(formatted))]
def handle_cloudtrail_signin(payload: Any) -> List[Tuple[str, str]]:
region = payload["region"]
event_type = payload["detail"]["eventType"]
event_time = payload["detail"]["eventTime"]
account_id = None
if "accountId" in payload["detail"]["userIdentity"]:
account_id = payload["detail"]["userIdentity"]["accountId"]
else:
account_id = payload["detail"]["recipientAccountId"]
user_type = payload["detail"]["userIdentity"]["type"]
if user_type == "IAMUser":
user = payload["detail"]["userIdentity"]["userName"]
elif user_type == "Root":
user = "Root User"
elif user_type == "AssumedRole":
user = payload["detail"]["userIdentity"]["principalId"]
else:
user = "Unknown user"
mfa_used = "unknown"
if (
"additionalEventData" in payload["detail"]
and "MFAUsed" in payload["detail"]["additionalEventData"]
):
mfa_used = payload["detail"]["additionalEventData"]["MFAUsed"]
was_failure = False
if (
"responseElements" in payload["detail"]
and "ConsoleLogin" in payload["detail"]["responseElements"]
):
was_failure = payload["detail"]["responseElements"]["ConsoleLogin"] == "Failure"
error_message = None
if "errorMessage" in payload["detail"]:
error_message = payload["detail"]["errorMessage"]
if was_failure:
title = f"Failed AWS Console Sign attempt by user `{user}`."
color = COLOR_WARNING
else:
title = f"AWS Console Sign detected by user `{user}`."
color = COLOR_ALARM
formatted = [
x
for x in [
f"<font color={color}>**🚨 ALERT[{event_type}]** </font>: {title}",
f"- **Region**: {region}",
f"- **MFA Used**: {mfa_used}",
(f"- **Error Message**: {error_message}" if error_message else None),
f"- **Event Time**: {event_time}",
f"- **Account ID**: {account_id}",
]
if x is not None
]
plain = title
return [(plain, "<br/>".join(formatted))]
def handle_cloudtrail_generic(payload: Any) -> List[Tuple[str, str]]:
region = payload["region"]
event_type = payload["detail"]["eventType"]
event_time = payload["detail"]["eventTime"]
account_id = payload["detail"]["recipientAccountId"]
detail = payload["detail-type"]
plain = f"{detail}"
formatted = [
f"<font color={COLOR_UNKNOWN}>**⚠CLOUDTRAIL EVENT[{event_type}]**</font>: {detail}",
f"**Region**: {region}",
f"**Event Time**: {event_time}",
f"**Account ID**: {account_id}",
]
return [(plain, "<br/>".join(formatted))]
async def parse_sns_event(
route: RoutingKey,
payload: Any,
request: Request,
) -> List[Tuple[str, str]]:
if payload.get("Type") == "SubscriptionConfirmation":
return handle_subscribe_confirm(payload)
elif payload.get("Type") == "UnsubscribeConfirmation":
return handle_subscribe_confirm(payload)
elif payload.get("Type") == "Notification":
try:
body = json.loads(payload.get("Message"))
if "source" in body:
source = body["source"]
if source == "aws.signin":
return handle_cloudtrail_signin(body)
if source == "aws.sts":
return handle_cloudtrail_sts(body)
else:
return handle_cloudtrail_generic(body)
return handle_json_notification(payload, body)
except Exception:
return handle_notification(payload)
raise Exception("Unnown SNS payload type")