2022-11-30 15:52:04 +00:00
|
|
|
|
import json
|
|
|
|
|
|
import logging
|
2022-12-01 16:31:04 +00:00
|
|
|
|
from typing import Any, List, Tuple
|
|
|
|
|
|
|
|
|
|
|
|
from fastapi import Request
|
2022-11-30 15:21:09 +00:00
|
|
|
|
|
2025-01-31 16:25:25 +01:00
|
|
|
|
from ops_bot.common import COLOR_ALARM, COLOR_OK, COLOR_UNKNOWN, COLOR_WARNING
|
2022-12-01 16:31:04 +00:00
|
|
|
|
from ops_bot.config import RoutingKey
|
2022-11-30 15:21:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
2022-12-01 16:31:04 +00:00
|
|
|
|
def handle_subscribe_confirm(payload: Any) -> List[Tuple[str, str]]:
|
2022-11-30 15:21:09 +00:00
|
|
|
|
message = payload.get("Message")
|
|
|
|
|
|
url = payload.get("SubscribeURL")
|
|
|
|
|
|
plain = f"{message}\n\n{url}"
|
2022-12-01 16:31:04 +00:00
|
|
|
|
return [(plain, plain)]
|
2022-11-30 15:21:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
2022-12-01 16:31:04 +00:00
|
|
|
|
def handle_notification(payload: Any) -> List[Tuple[str, str]]:
|
2022-11-30 15:21:09 +00:00
|
|
|
|
message = payload.get("Message")
|
|
|
|
|
|
subject = payload.get("Subject")
|
|
|
|
|
|
|
|
|
|
|
|
plain = f"{subject}\n{message}"
|
|
|
|
|
|
formatted = (
|
2022-11-30 16:05:39 +00:00
|
|
|
|
f"<strong><font color={COLOR_ALARM}>{subject}</font></strong>\n<p>{message}</p>"
|
2022-11-30 15:21:09 +00:00
|
|
|
|
)
|
2022-12-01 16:31:04 +00:00
|
|
|
|
return [(plain, formatted)]
|
2022-11-30 15:21:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
2022-12-01 16:31:04 +00:00
|
|
|
|
def handle_json_notification(payload: Any, body: Any) -> List[Tuple[str, str]]:
|
2022-11-30 15:52:04 +00:00
|
|
|
|
if "AlarmName" not in body:
|
2025-01-31 16:52:25 +01:00
|
|
|
|
payload_str = json.dumps(body, indent=2)
|
2022-11-30 15:52:04 +00:00
|
|
|
|
msg = "Received unknown json payload type over AWS SNS"
|
2025-01-31 16:25:25 +01:00
|
|
|
|
msg += f"""\n<br/>
|
|
|
|
|
|
```json
|
|
|
|
|
|
{payload_str}
|
|
|
|
|
|
```"""
|
2022-11-30 15:52:04 +00:00
|
|
|
|
logging.info(msg)
|
|
|
|
|
|
logging.info(payload.get("Message"))
|
2022-12-01 16:31:04 +00:00
|
|
|
|
return [(msg, msg)]
|
2022-11-30 15:52:04 +00:00
|
|
|
|
|
|
|
|
|
|
description = body.get("AlarmDescription")
|
|
|
|
|
|
subject = payload.get("Subject")
|
2022-11-30 16:30:29 +00:00
|
|
|
|
state_value = body.get("NewStateValue", "unknown")
|
2022-11-30 15:52:04 +00:00
|
|
|
|
|
2022-11-30 16:05:39 +00:00
|
|
|
|
if state_value == "ALARM":
|
|
|
|
|
|
color = COLOR_ALARM
|
|
|
|
|
|
elif state_value == "OK":
|
|
|
|
|
|
color = COLOR_OK
|
|
|
|
|
|
else:
|
|
|
|
|
|
color = COLOR_UNKNOWN
|
2022-11-30 16:11:09 +00:00
|
|
|
|
|
|
|
|
|
|
plain = f"{subject}"
|
|
|
|
|
|
formatted = f"<strong><font color={color}>{subject}</font></strong>\n"
|
|
|
|
|
|
|
|
|
|
|
|
if state_value == "OK":
|
|
|
|
|
|
plain += "\n(this alarm has been resolved!)"
|
|
|
|
|
|
formatted += "\n<p>(this alarm has been resolved!)</p>"
|
|
|
|
|
|
else:
|
|
|
|
|
|
plain += "\n{description}"
|
|
|
|
|
|
formatted += f"\n<p>{description}</p>"
|
2022-12-01 16:31:04 +00:00
|
|
|
|
return [(plain, formatted)]
|
2022-11-30 15:52:04 +00:00
|
|
|
|
|
|
|
|
|
|
|
2025-01-31 16:52:25 +01:00
|
|
|
|
def handle_cloudtrail_sts(payload: Any) -> List[Tuple[str, str]]:
|
|
|
|
|
|
region = payload["region"]
|
|
|
|
|
|
# event_type = payload["detail"]["eventType"]
|
|
|
|
|
|
event_name = payload["detail"]["eventName"]
|
|
|
|
|
|
event_time = payload["detail"]["eventTime"]
|
|
|
|
|
|
account_id = payload["detail"]["recipientAccountId"]
|
|
|
|
|
|
user_type = payload["detail"]["userIdentity"]["type"]
|
|
|
|
|
|
user = "Unknown user"
|
|
|
|
|
|
if user_type == "SAMLUser":
|
|
|
|
|
|
user = payload["detail"]["userIdentity"]["userName"]
|
|
|
|
|
|
|
|
|
|
|
|
assumed_role = None
|
|
|
|
|
|
if (
|
|
|
|
|
|
"responseElements" in payload["detail"]
|
|
|
|
|
|
and "assumedRoleUser" in payload["detail"]["responseElements"]
|
|
|
|
|
|
):
|
|
|
|
|
|
assumed_role = payload["detail"]["responseElements"]["assumedRoleUser"]["arn"]
|
|
|
|
|
|
|
|
|
|
|
|
color = COLOR_ALARM
|
|
|
|
|
|
if event_name == "AssumeRoleWithSAML":
|
|
|
|
|
|
title = f"AWS SAML Sign detected by user `{user}`."
|
|
|
|
|
|
else:
|
|
|
|
|
|
title = event_name
|
|
|
|
|
|
|
|
|
|
|
|
subject = event_name
|
|
|
|
|
|
|
|
|
|
|
|
formatted = [
|
|
|
|
|
|
x
|
|
|
|
|
|
for x in [
|
|
|
|
|
|
f"<font color={color}>**🚨 ALERT[{subject}]** </font>: {title}",
|
|
|
|
|
|
f"- **Region**: {region}",
|
2026-03-05 16:03:34 +01:00
|
|
|
|
(f"- **Assumed Role**: {assumed_role}" if assumed_role else None),
|
2025-01-31 16:52:25 +01:00
|
|
|
|
f"- **Event Time**: {event_time}",
|
|
|
|
|
|
f"- **Account ID**: {account_id}",
|
|
|
|
|
|
]
|
|
|
|
|
|
if x is not None
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
plain = title
|
|
|
|
|
|
|
|
|
|
|
|
return [(plain, "<br/>".join(formatted))]
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-01-31 16:25:25 +01:00
|
|
|
|
def handle_cloudtrail_signin(payload: Any) -> List[Tuple[str, str]]:
|
|
|
|
|
|
region = payload["region"]
|
|
|
|
|
|
event_type = payload["detail"]["eventType"]
|
|
|
|
|
|
event_time = payload["detail"]["eventTime"]
|
|
|
|
|
|
|
|
|
|
|
|
account_id = None
|
|
|
|
|
|
if "accountId" in payload["detail"]["userIdentity"]:
|
|
|
|
|
|
account_id = payload["detail"]["userIdentity"]["accountId"]
|
|
|
|
|
|
else:
|
|
|
|
|
|
account_id = payload["detail"]["recipientAccountId"]
|
|
|
|
|
|
|
|
|
|
|
|
user_type = payload["detail"]["userIdentity"]["type"]
|
|
|
|
|
|
if user_type == "IAMUser":
|
|
|
|
|
|
user = payload["detail"]["userIdentity"]["userName"]
|
|
|
|
|
|
elif user_type == "Root":
|
|
|
|
|
|
user = "Root User"
|
|
|
|
|
|
elif user_type == "AssumedRole":
|
|
|
|
|
|
user = payload["detail"]["userIdentity"]["principalId"]
|
|
|
|
|
|
else:
|
|
|
|
|
|
user = "Unknown user"
|
|
|
|
|
|
|
|
|
|
|
|
mfa_used = "unknown"
|
|
|
|
|
|
if (
|
|
|
|
|
|
"additionalEventData" in payload["detail"]
|
|
|
|
|
|
and "MFAUsed" in payload["detail"]["additionalEventData"]
|
|
|
|
|
|
):
|
|
|
|
|
|
mfa_used = payload["detail"]["additionalEventData"]["MFAUsed"]
|
|
|
|
|
|
|
|
|
|
|
|
was_failure = False
|
|
|
|
|
|
if (
|
|
|
|
|
|
"responseElements" in payload["detail"]
|
|
|
|
|
|
and "ConsoleLogin" in payload["detail"]["responseElements"]
|
|
|
|
|
|
):
|
|
|
|
|
|
was_failure = payload["detail"]["responseElements"]["ConsoleLogin"] == "Failure"
|
|
|
|
|
|
|
|
|
|
|
|
error_message = None
|
|
|
|
|
|
if "errorMessage" in payload["detail"]:
|
|
|
|
|
|
error_message = payload["detail"]["errorMessage"]
|
|
|
|
|
|
|
|
|
|
|
|
if was_failure:
|
|
|
|
|
|
title = f"Failed AWS Console Sign attempt by user `{user}`."
|
|
|
|
|
|
color = COLOR_WARNING
|
|
|
|
|
|
else:
|
|
|
|
|
|
title = f"AWS Console Sign detected by user `{user}`."
|
|
|
|
|
|
color = COLOR_ALARM
|
|
|
|
|
|
|
|
|
|
|
|
formatted = [
|
|
|
|
|
|
x
|
|
|
|
|
|
for x in [
|
|
|
|
|
|
f"<font color={color}>**🚨 ALERT[{event_type}]** </font>: {title}",
|
|
|
|
|
|
f"- **Region**: {region}",
|
|
|
|
|
|
f"- **MFA Used**: {mfa_used}",
|
|
|
|
|
|
(f"- **Error Message**: {error_message}" if error_message else None),
|
|
|
|
|
|
f"- **Event Time**: {event_time}",
|
|
|
|
|
|
f"- **Account ID**: {account_id}",
|
|
|
|
|
|
]
|
|
|
|
|
|
if x is not None
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
plain = title
|
|
|
|
|
|
|
|
|
|
|
|
return [(plain, "<br/>".join(formatted))]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_cloudtrail_generic(payload: Any) -> List[Tuple[str, str]]:
|
|
|
|
|
|
region = payload["region"]
|
|
|
|
|
|
event_type = payload["detail"]["eventType"]
|
|
|
|
|
|
event_time = payload["detail"]["eventTime"]
|
|
|
|
|
|
account_id = payload["detail"]["recipientAccountId"]
|
|
|
|
|
|
detail = payload["detail-type"]
|
|
|
|
|
|
|
|
|
|
|
|
plain = f"{detail}"
|
|
|
|
|
|
|
|
|
|
|
|
formatted = [
|
|
|
|
|
|
f"<font color={COLOR_UNKNOWN}>**⚠️CLOUDTRAIL EVENT[{event_type}]**</font>: {detail}",
|
|
|
|
|
|
f"**Region**: {region}",
|
|
|
|
|
|
f"**Event Time**: {event_time}",
|
|
|
|
|
|
f"**Account ID**: {account_id}",
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
return [(plain, "<br/>".join(formatted))]
|
|
|
|
|
|
|
|
|
|
|
|
|
2022-12-01 16:31:04 +00:00
|
|
|
|
async def parse_sns_event(
|
|
|
|
|
|
route: RoutingKey,
|
|
|
|
|
|
payload: Any,
|
|
|
|
|
|
request: Request,
|
|
|
|
|
|
) -> List[Tuple[str, str]]:
|
2022-11-30 15:21:09 +00:00
|
|
|
|
if payload.get("Type") == "SubscriptionConfirmation":
|
|
|
|
|
|
return handle_subscribe_confirm(payload)
|
|
|
|
|
|
elif payload.get("Type") == "UnsubscribeConfirmation":
|
|
|
|
|
|
return handle_subscribe_confirm(payload)
|
|
|
|
|
|
elif payload.get("Type") == "Notification":
|
2022-11-30 15:52:04 +00:00
|
|
|
|
try:
|
|
|
|
|
|
body = json.loads(payload.get("Message"))
|
2025-01-31 16:52:25 +01:00
|
|
|
|
if "source" in body:
|
|
|
|
|
|
source = body["source"]
|
|
|
|
|
|
if source == "aws.signin":
|
|
|
|
|
|
return handle_cloudtrail_signin(body)
|
|
|
|
|
|
if source == "aws.sts":
|
|
|
|
|
|
return handle_cloudtrail_sts(body)
|
|
|
|
|
|
else:
|
|
|
|
|
|
return handle_cloudtrail_generic(body)
|
2022-11-30 15:52:04 +00:00
|
|
|
|
return handle_json_notification(payload, body)
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
return handle_notification(payload)
|
2025-01-31 16:25:25 +01:00
|
|
|
|
|
2022-11-30 15:21:09 +00:00
|
|
|
|
raise Exception("Unnown SNS payload type")
|