forked from sr2/cloud-api
104 lines
2.5 KiB
Python
104 lines
2.5 KiB
Python
"""
|
|
Auth dependencies
|
|
|
|
Exports:
|
|
- org_query_user_claims_dependency: bool: Verifies user belongs to org
|
|
- org_model_root_claim_query_dependency: org_model: verifies org exists and user is either root or su, gets org from query
|
|
- org_model_root_claim_body_dependency: org_model: verifies org exists and user is either root or su, gets org from body
|
|
- super_admin_dependency: user_model: verifies the user is a super admin
|
|
"""
|
|
|
|
from typing import Annotated
|
|
from fastapi import Depends
|
|
|
|
from src.exceptions import ForbiddenException
|
|
from src.user.dependencies import user_model_claims_dependency
|
|
from src.user.models import User
|
|
from src.organisation.dependencies import (
|
|
org_model_query_dependency,
|
|
org_model_body_dependency,
|
|
)
|
|
from src.organisation.models import Organisation as Org
|
|
|
|
|
|
async def org_query_user_claims(
|
|
org_model: org_model_query_dependency, user_model: user_model_claims_dependency
|
|
):
|
|
if user_model in org_model.user_rel:
|
|
return True
|
|
|
|
raise ForbiddenException()
|
|
|
|
|
|
org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)]
|
|
|
|
|
|
async def org_query_root_claims(
|
|
user_model: user_model_claims_dependency,
|
|
org_model: org_model_query_dependency,
|
|
su_emails: su_list_dependency,
|
|
):
|
|
if org_model.root_user_id == user_model.id:
|
|
return org_model
|
|
|
|
try:
|
|
if await user_model_super_admin(user_model, su_emails):
|
|
return org_model
|
|
except ForbiddenException:
|
|
pass
|
|
|
|
raise ForbiddenException(message="Must be the org's root user")
|
|
|
|
|
|
org_model_root_claim_query_dependency = Annotated[
|
|
type[Org], Depends(org_query_root_claims)
|
|
]
|
|
|
|
|
|
async def org_body_root_claims(
|
|
user_model: user_model_claims_dependency,
|
|
org_model: org_model_body_dependency,
|
|
su_emails: su_list_dependency,
|
|
):
|
|
if org_model.root_user_id == user_model.id:
|
|
return org_model
|
|
|
|
try:
|
|
if await user_model_super_admin(user_model, su_emails):
|
|
return org_model
|
|
except ForbiddenException:
|
|
pass
|
|
|
|
raise ForbiddenException(message="Must be the org's root user")
|
|
|
|
|
|
org_model_root_claim_body_dependency = Annotated[
|
|
type[Org], Depends(org_body_root_claims)
|
|
]
|
|
|
|
|
|
def get_super_admin_list():
|
|
return []
|
|
|
|
|
|
def empty_su_list():
|
|
return []
|
|
|
|
|
|
def testing_su_list():
|
|
return ["admin@test.com"]
|
|
|
|
|
|
su_list_dependency = Annotated[list[User], Depends(get_super_admin_list)]
|
|
|
|
|
|
async def user_model_super_admin(
|
|
user_model: user_model_claims_dependency, super_admin_emails: su_list_dependency
|
|
):
|
|
if user_model.email in super_admin_emails:
|
|
return user_model
|
|
|
|
raise ForbiddenException(message="Must be super admin")
|
|
|
|
|
|
super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)]
|