diff --git a/ops_bot/aws.py b/ops_bot/aws.py
index 37376e0..5f2d7e7 100644
--- a/ops_bot/aws.py
+++ b/ops_bot/aws.py
@@ -28,7 +28,7 @@ def handle_notification(payload: Any) -> List[Tuple[str, str]]:
def handle_json_notification(payload: Any, body: Any) -> List[Tuple[str, str]]:
if "AlarmName" not in body:
- payload_str = payload.get("Message")
+ payload_str = json.dumps(body, indent=2)
msg = "Received unknown json payload type over AWS SNS"
msg += f"""\n
```json
@@ -61,6 +61,49 @@ def handle_json_notification(payload: Any, body: Any) -> List[Tuple[str, str]]:
return [(plain, formatted)]
+def handle_cloudtrail_sts(payload: Any) -> List[Tuple[str, str]]:
+ region = payload["region"]
+ # event_type = payload["detail"]["eventType"]
+ event_name = payload["detail"]["eventName"]
+ event_time = payload["detail"]["eventTime"]
+ account_id = payload["detail"]["recipientAccountId"]
+ user_type = payload["detail"]["userIdentity"]["type"]
+ user = "Unknown user"
+ if user_type == "SAMLUser":
+ user = payload["detail"]["userIdentity"]["userName"]
+
+ assumed_role = None
+ if (
+ "responseElements" in payload["detail"]
+ and "assumedRoleUser" in payload["detail"]["responseElements"]
+ ):
+ assumed_role = payload["detail"]["responseElements"]["assumedRoleUser"]["arn"]
+
+ color = COLOR_ALARM
+ if event_name == "AssumeRoleWithSAML":
+ title = f"AWS SAML Sign detected by user `{user}`."
+ else:
+ title = event_name
+
+ subject = event_name
+
+ formatted = [
+ x
+ for x in [
+ f"**🚨 ALERT[{subject}]** : {title}",
+ f"- **Region**: {region}",
+ f"- **Assumed Role**: {assumed_role}",
+ f"- **Event Time**: {event_time}",
+ f"- **Account ID**: {account_id}",
+ ]
+ if x is not None
+ ]
+
+ plain = title
+
+ return [(plain, "
".join(formatted))]
+
+
def handle_cloudtrail_signin(payload: Any) -> List[Tuple[str, str]]:
region = payload["region"]
event_type = payload["detail"]["eventType"]
@@ -156,14 +199,16 @@ async def parse_sns_event(
elif payload.get("Type") == "Notification":
try:
body = json.loads(payload.get("Message"))
+ if "source" in body:
+ source = body["source"]
+ if source == "aws.signin":
+ return handle_cloudtrail_signin(body)
+ if source == "aws.sts":
+ return handle_cloudtrail_sts(body)
+ else:
+ return handle_cloudtrail_generic(body)
return handle_json_notification(payload, body)
except Exception:
return handle_notification(payload)
- elif "source" in payload:
- source = payload["source"]
- if source == "aws.signin":
- return handle_cloudtrail_signin(payload)
- else:
- return handle_cloudtrail_generic(payload)
raise Exception("Unnown SNS payload type")
diff --git a/tests/test_ops_bot.py b/tests/test_ops_bot.py
index 4cd1c58..3e568e8 100644
--- a/tests/test_ops_bot.py
+++ b/tests/test_ops_bot.py
@@ -145,6 +145,85 @@ sns_signin_failure = """
}
"""
+sns_sts_saml = """
+{
+ "version": "0",
+ "id": "f7ea4d10-ee27-ee26-efa3-7fe107e12ba4",
+ "detail-type": "AWS API Call via CloudTrail",
+ "source": "aws.sts",
+ "account": "1234567890",
+ "time": "2025-01-31T15:34:20Z",
+ "region": "eu-west-1",
+ "resources": [],
+ "detail": {
+ "eventVersion": "1.08",
+ "userIdentity": {
+ "type": "SAMLUser",
+ "principalId": "redacted:user@example.com",
+ "userName": "user@example.com",
+ "identityProvider": "redacted"
+ },
+ "eventTime": "2025-01-31T15:34:20Z",
+ "eventSource": "sts.amazonaws.com",
+ "eventName": "AssumeRoleWithSAML",
+ "awsRegion": "eu-west-1",
+ "sourceIPAddress": "54.0.0.0",
+ "userAgent": "aws-internal/3 aws-sdk-java/1.12.779 Linux/4.14.355-275.570.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/17.0.13+11-LTS java/17.0.13 vendor/Amazon.com_Inc. cfg/retry-mode/standard cfg/auth-source#imds",
+ "requestParameters": {
+ "sAMLAssertionID": "_d1e7a65e-2298-4c0f-88b7-4e62f5a7a00c",
+ "roleSessionName": "user@example.com",
+ "roleArn": "arn:aws:iam::1234567890:role/aws-reserved/sso.amazonaws.com/eu-west-1/AWSReservedSSO_AWSAdministratorAccess_abcd1234",
+ "principalArn": "arn:aws:iam::1234567890:saml-provider/AWSSSO_aslksdafkj212_DO_NOT_DELETE",
+ "durationSeconds": 3600
+ },
+ "responseElements": {
+ "credentials": {
+ "accessKeyId": "ASredacted",
+ "sessionToken": "redacted",
+ "expiration": "Jan 31, 2025, 4:34:19 PM"
+ },
+ "assumedRoleUser": {
+ "assumedRoleId": "redacted:user@example.com",
+ "arn": "arn:aws:sts::1234567890:assumed-role/AWSReservedSSO_AWSAdministratorAccess_abcd1234/user@example.com"
+ },
+ "subject": "user@example.com",
+ "subjectType": "persistent",
+ "issuer": "https://portal.sso.eu-west-1.amazonaws.com/saml/assertion/NjQ2NzQ5Mjg5MzMxX2lucy1hY2MwZGYyZTYzOGZkYjNm",
+ "audience": "https://signin.aws.amazon.com/saml",
+ "nameQualifier": "12346"
+ },
+ "requestID": "42ba0a94-8f19-4a1b-9bac-9e8eb6ac1c15",
+ "eventID": "0f652a57-36ad-4a85-8651-252f256660e8",
+ "readOnly": true,
+ "resources": [
+ {
+ "accountId": "1234567890",
+ "type": "AWS::IAM::Role",
+ "ARN": "arn:aws:iam::1234567890:role/aws-reserved/sso.amazonaws.com/eu-west-1/AWSReservedSSO_AWSAdministratorAccess_abcd1234"
+ },
+ {
+ "accountId": "1234567890",
+ "type": "AWS::IAM::SAMLProvider",
+ "ARN": "arn:aws:iam::1234567890:saml-provider/AWSSSO_aslksdafkj212_DO_NOT_DELETE"
+ }
+ ],
+ "eventType": "AwsApiCall",
+ "managementEvent": true,
+ "recipientAccountId": "1234567890",
+ "eventCategory": "Management",
+ "tlsDetails": {
+ "tlsVersion": "TLSv1.3",
+ "cipherSuite": "TLS_AES_128_GCM_SHA256",
+ "clientProvidedHostHeader": "sts.eu-west-1.amazonaws.com"
+ }
+ }
+}
+"""
+
+
+def wrap_sns_msg(msg: str) -> dict:
+ return {"Type": "Notification", "Message": msg}
+
async def test_aws_sns_notification() -> None:
r = await aws.parse_sns_event(None, json.loads(sns_notification), None)
@@ -170,14 +249,21 @@ async def test_aws_sns_unsubscribe() -> None:
async def test_aws_sns_signin() -> None:
- r = await aws.parse_sns_event(None, json.loads(sns_signin), None)
+ r = await aws.parse_sns_event(None, wrap_sns_msg(sns_signin), None)
print(r)
expected = "**🚨 ALERT[AwsConsoleSignIn]** : AWS Console Sign detected by user `user@example.com`.
- **Region**: eu-north-1
- **MFA Used**: Yes
- **Event Time**: 2025-01-31T14:03:15Z
- **Account ID**: 1234567890"
assert r[0][1] == expected
async def test_aws_sns_signin_failure() -> None:
- r = await aws.parse_sns_event(None, json.loads(sns_signin_failure), None)
+ r = await aws.parse_sns_event(None, wrap_sns_msg(sns_signin_failure), None)
print(r)
expected = "**🚨 ALERT[AwsConsoleSignIn]** : Failed AWS Console Sign attempt by user `user@example.com`.
- **Region**: eu-north-1
- **MFA Used**: Yes
- **Error Message**: Failed authentication
- **Event Time**: 2025-01-31T14:01:49Z
- **Account ID**: 1234567890"
assert r[0][1] == expected
+
+
+async def test_aws_sns_sts_saml() -> None:
+ r = await aws.parse_sns_event(None, wrap_sns_msg(sns_sts_saml), None)
+ print(r)
+ expected = "**🚨 ALERT[AssumeRoleWithSAML]** : AWS SAML Sign detected by user `user@example.com`.
- **Region**: eu-west-1
- **Assumed Role**: arn:aws:sts::1234567890:assumed-role/AWSReservedSSO_AWSAdministratorAccess_abcd1234/user@example.com
- **Event Time**: 2025-01-31T15:34:20Z
- **Account ID**: 1234567890"
+ assert r[0][1] == expected