1
0
Fork 0
forked from sr2/cloud-api
cloud-api/src/iam/service.py

115 lines
2.8 KiB
Python
Raw Normal View History

"""
Reusable business-logic functions related to IAM.

Exports:
- service_key_dependency: bool: verifies that the request headers contain the correct API key for the service
"""
from typing import Annotated
from datetime import datetime, timedelta, timezone
from src.service.models import Service
from src.database import db_dependency
from src.exceptions import UnauthorizedException
from src.utils import send_email, generate_jwt
from src.iam.schemas import IAMCAoRRequest
from src.iam.models import Group
2026-06-08 10:45:38 +01:00
from fastapi import Request, Depends
def valid_service_key(
    db: db_dependency, request: Request, request_model: IAMCAoRRequest
) -> bool:
    """Check the request's ``X-API-Key`` header against the targeted service.

    The service name is taken from ``request_model.rn.service`` and looked up
    together with the supplied key in the ``Service`` table.

    Args:
        db: Database session used to query ``Service`` rows.
        request: Incoming request whose headers carry the API key.
        request_model: Request payload; its ``rn.service`` names the service.

    Returns:
        ``True`` when a ``Service`` row matches both the name and the key.

    Raises:
        UnauthorizedException: If the header is missing or empty, or if no
            ``Service`` row matches the name/key pair.
    """
    resource_name = request_model.rn
    supplied_key = request.headers.get("X-API-Key")
    if not supplied_key:
        raise UnauthorizedException("Missing API key")

    service_name = resource_name.service
    # Both conditions must hold on the same row for the key to be accepted.
    candidates = db.query(Service)
    candidates = candidates.filter(Service.name == service_name)
    candidates = candidates.filter(Service.api_key == supplied_key)
    matching_service = candidates.first()

    if matching_service is not None:
        return True
    raise UnauthorizedException("Invalid API key")
# Dependency alias: annotating an endpoint parameter with this type asks
# FastAPI to resolve it by calling valid_service_key (which returns a bool or
# raises UnauthorizedException).
service_key_dependency = Annotated[bool, Depends(valid_service_key)]
async def send_user_group_invitation(
    user_email: str, org_name: str, org_id: int, group_id: int, group_name: str
):
    """Email a user an invitation to join a group.

    Builds a claims payload of type ``group_invite`` that expires 24 hours
    from now (UTC), passes it to ``generate_jwt`` and emails the resulting
    token to the user inside an acceptance link.

    Args:
        user_email: Recipient address; also stored in the ``email`` claim.
        org_name: Organisation name shown in the email subject.
        org_id: Organisation id stored in the ``org_id`` claim.
        group_id: Group id stored in the ``group_id`` claim.
        group_name: Group name stored in the claims and shown in the body.
    """
    expires_at = datetime.now(timezone.utc) + timedelta(hours=24)
    invite_token = await generate_jwt(
        {
            "email": user_email,
            "org_id": org_id,
            "group_id": group_id,
            "group_name": group_name,
            "exp": expires_at,
            "type": "group_invite",
        }
    )
    # NOTE(review): the link target looks like a placeholder URL - confirm the
    # real frontend endpoint before relying on it.
    await send_email(
        recipient=user_email,
        subject=f"You have been invited to join a group of {org_name}",
        body=f"You have been invited to join {group_name}.\nClick the link to accept.\nfrontend.capture/send/to/endpoint/{invite_token}",
    )
async def create_default_user_group(db: db_dependency, org_model):
    """Create the "Default Users" group for an organisation.

    Args:
        db: Database session the new group is added to and flushed through.
        org_model: Organisation whose ``id`` becomes the group's ``org_id``.

    Returns:
        The newly added ``Group`` instance (flushed, not committed here).
    """
    owning_org_id = org_model.id
    default_group = Group(name="Default Users", org_id=owning_org_id)
    db.add(default_group)
    # Flush so the group is sent to the database before permissions are added.
    db.flush()
    # Grant default permissions here
    db.flush()
    return default_group
async def assign_default_user_group(db: db_dependency, org_model, user_model):
    """Put a user into the organisation's "Default Users" group.

    The organisation's groups (``org_model.group_rel``) are searched for one
    named "Default Users"; if none exists it is created through
    ``create_default_user_group``. The call is idempotent: a user who is
    already in the group does not get a second, duplicate entry.

    Args:
        db: Database session used to create the group if needed and to flush.
        org_model: Organisation whose ``group_rel`` is searched.
        user_model: User whose ``group_rel`` receives the group.
    """
    group_model = next(
        (group for group in org_model.group_rel if group.name == "Default Users"),
        None,
    )
    if group_model is None:
        group_model = await create_default_user_group(db=db, org_model=org_model)

    # Guard against repeated assignment so the membership list never holds the
    # same group twice.
    if group_model not in user_model.group_rel:
        user_model.group_rel.append(group_model)
    db.flush()
async def create_default_root_group(db: db_dependency, org_model):
    """Create the "Root User" group for an organisation.

    Args:
        db: Database session the new group is added to and flushed through.
        org_model: Organisation whose ``id`` becomes the group's ``org_id``.

    Returns:
        The newly added ``Group`` instance (flushed, not committed here).
    """
    owning_org_id = org_model.id
    root_group = Group(name="Root User", org_id=owning_org_id)
    db.add(root_group)
    # Flush so the group is sent to the database before permissions are added.
    db.flush()
    # Grant default permissions here
    db.flush()
    return root_group
async def assign_default_root_group(db: db_dependency, org_model, user_model):
    """Put a user into the organisation's "Root User" group.

    The organisation's groups (``org_model.group_rel``) are searched for one
    named "Root User"; if none exists it is created through
    ``create_default_root_group``. The call is idempotent: a user who is
    already in the group does not get a second, duplicate entry.

    Args:
        db: Database session used to create the group if needed and to flush.
        org_model: Organisation whose ``group_rel`` is searched.
        user_model: User whose ``group_rel`` receives the group.
    """
    group_model = next(
        (group for group in org_model.group_rel if group.name == "Root User"),
        None,
    )
    if group_model is None:
        group_model = await create_default_root_group(db=db, org_model=org_model)

    # Guard against repeated assignment so the membership list never holds the
    # same group twice.
    if group_model not in user_model.group_rel:
        user_model.group_rel.append(group_model)
    db.flush()