Support aws.sts sns notifications

This commit is contained in:
Abel Luck 2025-01-31 16:52:25 +01:00
parent 7c06e91ea8
commit c13d5fc536
2 changed files with 140 additions and 9 deletions

View file

@ -28,7 +28,7 @@ def handle_notification(payload: Any) -> List[Tuple[str, str]]:
def handle_json_notification(payload: Any, body: Any) -> List[Tuple[str, str]]:
if "AlarmName" not in body:
payload_str = payload.get("Message")
payload_str = json.dumps(body, indent=2)
msg = "Received unknown json payload type over AWS SNS"
msg += f"""\n<br/>
```json
@ -61,6 +61,49 @@ def handle_json_notification(payload: Any, body: Any) -> List[Tuple[str, str]]:
return [(plain, formatted)]
def handle_cloudtrail_sts(payload: Any) -> List[Tuple[str, str]]:
region = payload["region"]
# event_type = payload["detail"]["eventType"]
event_name = payload["detail"]["eventName"]
event_time = payload["detail"]["eventTime"]
account_id = payload["detail"]["recipientAccountId"]
user_type = payload["detail"]["userIdentity"]["type"]
user = "Unknown user"
if user_type == "SAMLUser":
user = payload["detail"]["userIdentity"]["userName"]
assumed_role = None
if (
"responseElements" in payload["detail"]
and "assumedRoleUser" in payload["detail"]["responseElements"]
):
assumed_role = payload["detail"]["responseElements"]["assumedRoleUser"]["arn"]
color = COLOR_ALARM
if event_name == "AssumeRoleWithSAML":
title = f"AWS SAML Sign detected by user `{user}`."
else:
title = event_name
subject = event_name
formatted = [
x
for x in [
f"<font color={color}>**🚨 ALERT[{subject}]** </font>: {title}",
f"- **Region**: {region}",
f"- **Assumed Role**: {assumed_role}",
f"- **Event Time**: {event_time}",
f"- **Account ID**: {account_id}",
]
if x is not None
]
plain = title
return [(plain, "<br/>".join(formatted))]
def handle_cloudtrail_signin(payload: Any) -> List[Tuple[str, str]]:
region = payload["region"]
event_type = payload["detail"]["eventType"]
@ -156,14 +199,16 @@ async def parse_sns_event(
elif payload.get("Type") == "Notification":
try:
body = json.loads(payload.get("Message"))
if "source" in body:
source = body["source"]
if source == "aws.signin":
return handle_cloudtrail_signin(body)
if source == "aws.sts":
return handle_cloudtrail_sts(body)
else:
return handle_cloudtrail_generic(body)
return handle_json_notification(payload, body)
except Exception:
return handle_notification(payload)
elif "source" in payload:
source = payload["source"]
if source == "aws.signin":
return handle_cloudtrail_signin(payload)
else:
return handle_cloudtrail_generic(payload)
raise Exception("Unnown SNS payload type")