import json import logging from typing import Any, List, Tuple from fastapi import Request from ops_bot.common import COLOR_ALARM, COLOR_OK, COLOR_UNKNOWN, COLOR_WARNING from ops_bot.config import RoutingKey def handle_subscribe_confirm(payload: Any) -> List[Tuple[str, str]]: message = payload.get("Message") url = payload.get("SubscribeURL") plain = f"{message}\n\n{url}" return [(plain, plain)] def handle_notification(payload: Any) -> List[Tuple[str, str]]: message = payload.get("Message") subject = payload.get("Subject") plain = f"{subject}\n{message}" formatted = ( f"{subject}\n

{message}

" ) return [(plain, formatted)] def handle_json_notification(payload: Any, body: Any) -> List[Tuple[str, str]]: if "AlarmName" not in body: payload_str = payload.get("Message") msg = "Received unknown json payload type over AWS SNS" msg += f"""\n
```json {payload_str} ```""" logging.info(msg) logging.info(payload.get("Message")) return [(msg, msg)] description = body.get("AlarmDescription") subject = payload.get("Subject") state_value = body.get("NewStateValue", "unknown") if state_value == "ALARM": color = COLOR_ALARM elif state_value == "OK": color = COLOR_OK else: color = COLOR_UNKNOWN plain = f"{subject}" formatted = f"{subject}\n" if state_value == "OK": plain += "\n(this alarm has been resolved!)" formatted += "\n

(this alarm has been resolved!)

" else: plain += "\n{description}" formatted += f"\n

{description}

" return [(plain, formatted)] def handle_cloudtrail_signin(payload: Any) -> List[Tuple[str, str]]: region = payload["region"] event_type = payload["detail"]["eventType"] event_time = payload["detail"]["eventTime"] account_id = None if "accountId" in payload["detail"]["userIdentity"]: account_id = payload["detail"]["userIdentity"]["accountId"] else: account_id = payload["detail"]["recipientAccountId"] user_type = payload["detail"]["userIdentity"]["type"] if user_type == "IAMUser": user = payload["detail"]["userIdentity"]["userName"] elif user_type == "Root": user = "Root User" elif user_type == "AssumedRole": user = payload["detail"]["userIdentity"]["principalId"] else: user = "Unknown user" mfa_used = "unknown" if ( "additionalEventData" in payload["detail"] and "MFAUsed" in payload["detail"]["additionalEventData"] ): mfa_used = payload["detail"]["additionalEventData"]["MFAUsed"] was_failure = False if ( "responseElements" in payload["detail"] and "ConsoleLogin" in payload["detail"]["responseElements"] ): was_failure = payload["detail"]["responseElements"]["ConsoleLogin"] == "Failure" error_message = None if "errorMessage" in payload["detail"]: error_message = payload["detail"]["errorMessage"] if was_failure: title = f"Failed AWS Console Sign attempt by user `{user}`." color = COLOR_WARNING else: title = f"AWS Console Sign detected by user `{user}`." color = COLOR_ALARM formatted = [ x for x in [ f"**🚨 ALERT[{event_type}]** : {title}", f"- **Region**: {region}", f"- **MFA Used**: {mfa_used}", (f"- **Error Message**: {error_message}" if error_message else None), f"- **Event Time**: {event_time}", f"- **Account ID**: {account_id}", ] if x is not None ] plain = title return [(plain, "
".join(formatted))] def handle_cloudtrail_generic(payload: Any) -> List[Tuple[str, str]]: region = payload["region"] event_type = payload["detail"]["eventType"] event_time = payload["detail"]["eventTime"] account_id = payload["detail"]["recipientAccountId"] detail = payload["detail-type"] plain = f"{detail}" formatted = [ f"**⚠️CLOUDTRAIL EVENT[{event_type}]**: {detail}", f"**Region**: {region}", f"**Event Time**: {event_time}", f"**Account ID**: {account_id}", ] return [(plain, "
".join(formatted))] async def parse_sns_event( route: RoutingKey, payload: Any, request: Request, ) -> List[Tuple[str, str]]: if payload.get("Type") == "SubscriptionConfirmation": return handle_subscribe_confirm(payload) elif payload.get("Type") == "UnsubscribeConfirmation": return handle_subscribe_confirm(payload) elif payload.get("Type") == "Notification": try: body = json.loads(payload.get("Message")) return handle_json_notification(payload, body) except Exception: return handle_notification(payload) elif "source" in payload: source = payload["source"] if source == "aws.signin": return handle_cloudtrail_signin(payload) else: return handle_cloudtrail_generic(payload) raise Exception("Unnown SNS payload type")