From 2ec9cca99c0f5959530db5c21b9d9b8620081c06 Mon Sep 17 00:00:00 2001 From: luxferre Date: Thu, 4 Jun 2026 11:48:50 +0100 Subject: [PATCH] feat: super admin handling The list of super admins now comes from a dependency, allowing it to easily be overrridden during testing. --- src/auth/dependencies.py | 55 ++++++++++++++++++---------------------- test/conftest.py | 4 +-- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/src/auth/dependencies.py b/src/auth/dependencies.py index f1c1d78..754892f 100644 --- a/src/auth/dependencies.py +++ b/src/auth/dependencies.py @@ -18,13 +18,6 @@ from src.organisation.models import Organisation as Org from src.auth.exceptions import UnauthorizedException -def is_super_admin(user_model) -> bool: - super_admin_emails = ["chris@sr2.uk"] - if user_model.email not in super_admin_emails: - raise UnauthorizedException(message="Must be super admin") - return True - - async def org_query_user_claims(org_model: org_model_query_dependency, user_model: user_model_claims_dependency): if user_model in org_model.user_rel: return True @@ -35,51 +28,51 @@ async def org_query_user_claims(org_model: org_model_query_dependency, user_mode org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)] -async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency): +async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency, su_emails: su_list_dependency): if org_model.root_user_id == user_model.id: return org_model - if is_super_admin(user_model): - return org_model + try: + if await user_model_super_admin(user_model, su_emails): + return org_model + except UnauthorizedException: + pass - raise UnauthorizedException() + raise UnauthorizedException(message="Must be the org's root user") org_model_root_claim_query_dependency = Annotated[type[Org], Depends(org_query_root_claims)] -async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency): +async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency, su_emails: su_list_dependency): if org_model.root_user_id == user_model.id: return org_model - if is_super_admin(user_model): - return org_model + try: + if await user_model_super_admin(user_model, su_emails): + return org_model + except UnauthorizedException: + pass - raise UnauthorizedException() + raise UnauthorizedException(message="Must be the org's root user") org_model_root_claim_body_dependency = Annotated[type[Org], Depends(org_body_root_claims)] -async def user_model_super_admin(user_model: user_model_claims_dependency): - if is_super_admin(user_model): +def get_super_admin_list(): + return [] + +def empty_su_list(): + return [] + +su_list_dependency = Annotated[list[User], Depends(get_super_admin_list)] + +async def user_model_super_admin(user_model: user_model_claims_dependency, super_admin_emails: su_list_dependency): + if user_model.email in super_admin_emails: return user_model raise UnauthorizedException(message="Must be super admin") super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)] - - -# Override for testing -async def never_super_admin(user_model: user_model_claims_dependency): - def _is_super_admin(_user_model) -> bool: - super_admin_emails = [] - if _user_model.email not in super_admin_emails: - raise UnauthorizedException(message="Must be super admin") - return True - - if _is_super_admin(user_model): - return user_model - - raise UnauthorizedException(message="Must be super admin") diff --git a/test/conftest.py b/test/conftest.py index 2de749c..58b5dc5 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -10,7 +10,7 @@ from src.organisation.models import Organisation as Org from src.contact.models import Contact from src.iam.models import Group, Permission from src.auth.service import get_current_user, get_dev_user -from src.auth.dependencies import user_model_super_admin, never_super_admin +from src.auth.dependencies import empty_su_list, get_super_admin_list from src.main import app # inited FastAPI app from src.database import engine, Base, get_db @@ -51,7 +51,7 @@ async def no_su_client(db_session) -> AsyncGenerator[AsyncClient, None]: return db_session app.dependency_overrides[get_db] = get_db_override app.dependency_overrides[get_current_user] = get_dev_user - app.dependency_overrides[user_model_super_admin] = never_super_admin + app.dependency_overrides[get_super_admin_list] = empty_su_list transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://localhost:8000/api/v1") as ac: yield ac