cloud-api/src/auth/dependencies.py
luxferre 2ec9cca99c feat: super admin handling
The list of super admins now comes from a dependency, allowing it to easily be overrridden during testing.
2026-06-04 11:48:50 +01:00

78 lines
2.5 KiB
Python

"""
Auth dependencies
Exports:
- org_query_user_claims_dependency: bool: Verifies user belongs to org
- org_model_root_claim_query_dependency: org_model: verifies org exists and user is either root or su, gets org from query
- org_model_root_claim_body_dependency: org_model: verifies org exists and user is either root or su, gets org from body
- super_admin_dependency: user_model: verifies the user is a super admin
"""
from typing import Annotated
from fastapi import Depends
from src.user.dependencies import user_model_claims_dependency
from src.user.models import User
from src.organisation.dependencies import org_model_query_dependency, org_model_body_dependency
from src.organisation.models import Organisation as Org
from src.auth.exceptions import UnauthorizedException
async def org_query_user_claims(org_model: org_model_query_dependency, user_model: user_model_claims_dependency):
if user_model in org_model.user_rel:
return True
raise UnauthorizedException()
org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)]
async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency, su_emails: su_list_dependency):
if org_model.root_user_id == user_model.id:
return org_model
try:
if await user_model_super_admin(user_model, su_emails):
return org_model
except UnauthorizedException:
pass
raise UnauthorizedException(message="Must be the org's root user")
org_model_root_claim_query_dependency = Annotated[type[Org], Depends(org_query_root_claims)]
async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency, su_emails: su_list_dependency):
if org_model.root_user_id == user_model.id:
return org_model
try:
if await user_model_super_admin(user_model, su_emails):
return org_model
except UnauthorizedException:
pass
raise UnauthorizedException(message="Must be the org's root user")
org_model_root_claim_body_dependency = Annotated[type[Org], Depends(org_body_root_claims)]
def get_super_admin_list():
return []
def empty_su_list():
return []
su_list_dependency = Annotated[list[User], Depends(get_super_admin_list)]
async def user_model_super_admin(user_model: user_model_claims_dependency, super_admin_emails: su_list_dependency):
if user_model.email in super_admin_emails:
return user_model
raise UnauthorizedException(message="Must be super admin")
super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)]