1
0
Fork 0
forked from sr2/cloud-api
cloud-api/src/iam/service.py

113 lines
2.8 KiB
Python
Raw Normal View History

"""
2026-05-28 13:22:24 +01:00
Business logic reusable functions related to IAM
2026-05-28 13:22:24 +01:00
Exports:
- service_key_dependency: bool: verifies request headers contain the correct api key for the service
"""
from typing import Annotated
from datetime import datetime, timedelta, timezone
from fastapi import Request, Depends
from src.database import db_dependency
from src.exceptions import UnauthorizedException
from src.utils import send_email, generate_jwt
from src.iam.models import Group
from src.service.models import Service
from src.service.schemas import HasServiceName
def valid_service_key(
db: db_dependency, request: Request, request_model: HasServiceName
) -> bool:
rn = request_model.rn
api_key = request.headers.get("X-API-Key", None)
if not api_key:
2026-06-04 14:53:35 +01:00
raise UnauthorizedException("Missing API key")
service = rn.service
result = (
db.query(Service)
.filter(Service.name == service)
.filter(Service.api_key == api_key)
.first()
)
if result is None:
2026-06-04 14:53:35 +01:00
raise UnauthorizedException("Invalid API key")
return True
service_key_dependency = Annotated[bool, Depends(valid_service_key)]
async def send_user_group_invitation(
user_email: str, org_name: str, org_id: int, group_id: int, group_name: str
):
expiry_delta = timedelta(hours=24)
expiry = datetime.now(timezone.utc) + expiry_delta
claims = {
"email": user_email,
"org_id": org_id,
"group_id": group_id,
"group_name": group_name,
"exp": expiry,
"type": "group_invite",
}
token = await generate_jwt(claims)
subject = f"You have been invited to join a group of {org_name}"
body = f"You have been invited to join {group_name}.\nClick the link to accept.\nfrontend.capture/send/to/endpoint/{token}"
await send_email(
recipient=user_email,
subject=subject,
body=body,
)
async def create_default_user_group(db: db_dependency, org_model):
new_group = Group(name="Default Users", org_id=org_model.id)
db.add(new_group)
db.flush()
# Grant default permissions here
db.flush()
return new_group
async def assign_default_user_group(db: db_dependency, org_model, user_model):
group_model = None
for group in org_model.group_rel:
if group.name == "Default Users":
group_model = group
break
if group_model is None:
group_model = await create_default_user_group(db=db, org_model=org_model)
user_model.group_rel.append(group_model)
db.flush()
async def create_default_root_group(db: db_dependency, org_model):
new_group = Group(name="Root User", org_id=org_model.id)
db.add(new_group)
db.flush()
# Grant default permissions here
db.flush()
return new_group
async def assign_default_root_group(db: db_dependency, org_model, user_model):
group_model = None
for group in org_model.group_rel:
if group.name == "Root User":
group_model = group
break
if group_model is None:
group_model = await create_default_root_group(db=db, org_model=org_model)
user_model.group_rel.append(group_model)
db.flush()