cloud-api/src/iam/service.py
luxferre c2e035dede feat: more accurate status codes
403 Forbidden replacing many 401 Unauthorized usages.
2026-06-11 14:58:05 +01:00

66 lines
1.7 KiB
Python

"""
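Reusable business-logic functions related to IAM.
Exports:
- valid_service_key: bool: checks that the X-API-Key request header matches the API key stored for the service named in the request model
- service_key_dependency: bool: dependency alias that verifies request headers contain the correct API key for the service
- send_user_group_invitation: emails a user a group invitation link carrying a token with a 24-hour expiry claim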
"""
from typing import Annotated
from datetime import datetime, timedelta, timezone
from src.iam.schemas import IAMCAoRRequest
from src.service.models import Service
from src.database import db_dependency
from src.exceptions import UnauthorizedException
from src.utils import send_email, generate_jwt
from fastapi import Request, Depends
def valid_service_key(
    db: db_dependency, request: Request, request_model: IAMCAoRRequest
) -> bool:
    """Check that the request carries the API key registered for its service.

    The key is read from the ``X-API-Key`` request header and the service
    name from ``request_model.rn.service``. A ``Service`` row must exist
    whose ``name`` and ``api_key`` match both values.

    Args:
        db: Database session used to query ``Service`` rows.
        request: Incoming request whose headers hold the API key.
        request_model: Parsed request payload exposing ``rn.service``.

    Returns:
        ``True`` when a matching ``Service`` row is found.

    Raises:
        UnauthorizedException: If the ``X-API-Key`` header is missing or
            empty, or if no ``Service`` row matches the name/key pair.
    """
    resource_name = request_model.rn

    supplied_key = request.headers.get("X-API-Key")
    if not supplied_key:
        raise UnauthorizedException("Missing API key")

    service_name = resource_name.service

    # Narrow by service name first, then by key; both conditions must hold.
    by_name = db.query(Service).filter(Service.name == service_name)
    matching_service = by_name.filter(Service.api_key == supplied_key).first()

    if matching_service is not None:
        return True
    raise UnauthorizedException("Invalid API key")
# Annotated alias for use in route signatures: a parameter declared with this
# type has valid_service_key attached via Depends, and its resolved value is
# the bool that function returns.
service_key_dependency = Annotated[bool, Depends(valid_service_key)]
async def send_user_group_invitation(
    user_email: str, org_name: str, org_id: int, group_id: int, group_name: str
):
    """Email a user an invitation link to join a group.

    Builds a claims mapping describing the invitation (email, organisation
    id, group id, group name, an ``exp`` timestamp 24 hours from now in UTC,
    and the type ``"group_invite"``), turns it into a token with
    ``generate_jwt``, and sends the resulting link with ``send_email``.

    Args:
        user_email: Address of the invited user; also the email recipient.
        org_name: Organisation name, used only in the email subject.
        org_id: Organisation identifier stored in the token claims.
        group_id: Group identifier stored in the token claims.
        group_name: Group name stored in the claims and shown in the body.
    """
    # Invitation claims carry an expiry one day from the moment of sending.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=24)

    invite_token = await generate_jwt(
        dict(
            email=user_email,
            org_id=org_id,
            group_id=group_id,
            group_name=group_name,
            exp=expires_at,
            type="group_invite",
        )
    )

    mail_subject = f"You have been invited to join a group of {org_name}"
    mail_body = (
        f"You have been invited to join {group_name}.\n"
        "Click the link to accept.\n"
        f"frontend.capture/send/to/endpoint/{invite_token}"
    )

    await send_email(recipient=user_email, subject=mail_subject, body=mail_body)