feat: super admin handling

The list of super admins now comes from a dependency, allowing it to easily be overrridden during testing.
This commit is contained in:
Chris Milne 2026-06-04 11:48:50 +01:00
parent 3460cd76a5
commit 2ec9cca99c
2 changed files with 26 additions and 33 deletions

View file

@ -18,13 +18,6 @@ from src.organisation.models import Organisation as Org
from src.auth.exceptions import UnauthorizedException
def is_super_admin(user_model) -> bool:
super_admin_emails = ["chris@sr2.uk"]
if user_model.email not in super_admin_emails:
raise UnauthorizedException(message="Must be super admin")
return True
async def org_query_user_claims(org_model: org_model_query_dependency, user_model: user_model_claims_dependency):
if user_model in org_model.user_rel:
return True
@ -35,51 +28,51 @@ async def org_query_user_claims(org_model: org_model_query_dependency, user_mode
org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)]
async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency):
async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency, su_emails: su_list_dependency):
if org_model.root_user_id == user_model.id:
return org_model
if is_super_admin(user_model):
return org_model
try:
if await user_model_super_admin(user_model, su_emails):
return org_model
except UnauthorizedException:
pass
raise UnauthorizedException()
raise UnauthorizedException(message="Must be the org's root user")
org_model_root_claim_query_dependency = Annotated[type[Org], Depends(org_query_root_claims)]
async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency):
async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency, su_emails: su_list_dependency):
if org_model.root_user_id == user_model.id:
return org_model
if is_super_admin(user_model):
return org_model
try:
if await user_model_super_admin(user_model, su_emails):
return org_model
except UnauthorizedException:
pass
raise UnauthorizedException()
raise UnauthorizedException(message="Must be the org's root user")
org_model_root_claim_body_dependency = Annotated[type[Org], Depends(org_body_root_claims)]
async def user_model_super_admin(user_model: user_model_claims_dependency):
if is_super_admin(user_model):
def get_super_admin_list():
return []
def empty_su_list():
return []
su_list_dependency = Annotated[list[User], Depends(get_super_admin_list)]
async def user_model_super_admin(user_model: user_model_claims_dependency, super_admin_emails: su_list_dependency):
if user_model.email in super_admin_emails:
return user_model
raise UnauthorizedException(message="Must be super admin")
super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)]
# Override for testing
async def never_super_admin(user_model: user_model_claims_dependency):
def _is_super_admin(_user_model) -> bool:
super_admin_emails = []
if _user_model.email not in super_admin_emails:
raise UnauthorizedException(message="Must be super admin")
return True
if _is_super_admin(user_model):
return user_model
raise UnauthorizedException(message="Must be super admin")

View file

@ -10,7 +10,7 @@ from src.organisation.models import Organisation as Org
from src.contact.models import Contact
from src.iam.models import Group, Permission
from src.auth.service import get_current_user, get_dev_user
from src.auth.dependencies import user_model_super_admin, never_super_admin
from src.auth.dependencies import empty_su_list, get_super_admin_list
from src.main import app # inited FastAPI app
from src.database import engine, Base, get_db
@ -51,7 +51,7 @@ async def no_su_client(db_session) -> AsyncGenerator[AsyncClient, None]:
return db_session
app.dependency_overrides[get_db] = get_db_override
app.dependency_overrides[get_current_user] = get_dev_user
app.dependency_overrides[user_model_super_admin] = never_super_admin
app.dependency_overrides[get_super_admin_list] = empty_su_list
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000/api/v1") as ac:
yield ac