""" Business logic reusable functions related to IAM Exports: - service_key_dependency: bool: verifies request headers contain the correct api key for the service """ from typing import Annotated from datetime import datetime, timedelta, timezone from fastapi import Request, Depends from sqlalchemy.orm import Session from src.database import db_dependency from src.exceptions import UnauthorizedException from src.utils import send_email, generate_jwt from src.iam.models import Group from src.organisation.models import Organisation as Org from src.user.models import User from src.iam.models import Permission as Perm from src.service.models import Service from src.service.schemas import HasServiceName def valid_service_key( db: db_dependency, request: Request, request_model: HasServiceName ) -> bool: rn = request_model.rn api_key = request.headers.get("X-API-Key", None) if not api_key: raise UnauthorizedException("Missing API key") service = rn.service result = ( db.query(Service) .filter(Service.name == service) .filter(Service.api_key == api_key) .first() ) if result is None: raise UnauthorizedException("Invalid API key") return True service_key_dependency = Annotated[bool, Depends(valid_service_key)] async def send_user_group_invitation( user_email: str, org_name: str, org_id: int, group_id: int, group_name: str ): expiry_delta = timedelta(hours=24) expiry = datetime.now(timezone.utc) + expiry_delta claims = { "email": user_email, "org_id": org_id, "group_id": group_id, "group_name": group_name, "exp": expiry, "type": "group_invite", } token = await generate_jwt(claims) subject = f"You have been invited to join a group of {org_name}" body = f"You have been invited to join {group_name}.\nClick the link to accept.\nfrontend.capture/send/to/endpoint/{token}" await send_email( recipient=user_email, subject=subject, body=body, ) async def create_group_and_assign_perms( db: Session, org_model: Org, group_name: str, perm_list: list[int] ): new_group = Group(name=group_name, org_id=org_model.id) db.add(new_group) db.flush() for permission in perm_list: perm_model = db.get(Perm, permission) if perm_model is None: continue new_group.permission_rel.append(perm_model) db.flush() return new_group async def assign_default_group( db: db_dependency, org_model: Org, user_model: User, group_name: str, perm_list: list[int], ): group_model = ( db.query(Group) .filter(Group.org_id == org_model.id) .filter(Group.name == group_name) .first() ) if group_model is None: group_model = await create_group_and_assign_perms( db=db, group_name=group_name, org_model=org_model, perm_list=perm_list ) user_model.group_rel.append(group_model) db.flush()