diff --git a/src/auth/dependencies.py b/src/auth/dependencies.py index 754892f..f1c1d78 100644 --- a/src/auth/dependencies.py +++ b/src/auth/dependencies.py @@ -18,6 +18,13 @@ from src.organisation.models import Organisation as Org from src.auth.exceptions import UnauthorizedException +def is_super_admin(user_model) -> bool: + super_admin_emails = ["chris@sr2.uk"] + if user_model.email not in super_admin_emails: + raise UnauthorizedException(message="Must be super admin") + return True + + async def org_query_user_claims(org_model: org_model_query_dependency, user_model: user_model_claims_dependency): if user_model in org_model.user_rel: return True @@ -28,51 +35,51 @@ async def org_query_user_claims(org_model: org_model_query_dependency, user_mode org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)] -async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency, su_emails: su_list_dependency): +async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency): if org_model.root_user_id == user_model.id: return org_model - try: - if await user_model_super_admin(user_model, su_emails): - return org_model - except UnauthorizedException: - pass + if is_super_admin(user_model): + return org_model - raise UnauthorizedException(message="Must be the org's root user") + raise UnauthorizedException() org_model_root_claim_query_dependency = Annotated[type[Org], Depends(org_query_root_claims)] -async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency, su_emails: su_list_dependency): +async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency): if org_model.root_user_id == user_model.id: return org_model - try: - if await user_model_super_admin(user_model, su_emails): - return org_model - except UnauthorizedException: - pass + if is_super_admin(user_model): + return org_model - raise UnauthorizedException(message="Must be the org's root user") + raise UnauthorizedException() org_model_root_claim_body_dependency = Annotated[type[Org], Depends(org_body_root_claims)] -def get_super_admin_list(): - return [] - -def empty_su_list(): - return [] - -su_list_dependency = Annotated[list[User], Depends(get_super_admin_list)] - -async def user_model_super_admin(user_model: user_model_claims_dependency, super_admin_emails: su_list_dependency): - if user_model.email in super_admin_emails: +async def user_model_super_admin(user_model: user_model_claims_dependency): + if is_super_admin(user_model): return user_model raise UnauthorizedException(message="Must be super admin") super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)] + + +# Override for testing +async def never_super_admin(user_model: user_model_claims_dependency): + def _is_super_admin(_user_model) -> bool: + super_admin_emails = [] + if _user_model.email not in super_admin_emails: + raise UnauthorizedException(message="Must be super admin") + return True + + if _is_super_admin(user_model): + return user_model + + raise UnauthorizedException(message="Must be super admin") diff --git a/test/conftest.py b/test/conftest.py index 416dcc6..2de749c 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -10,7 +10,7 @@ from src.organisation.models import Organisation as Org from src.contact.models import Contact from src.iam.models import Group, Permission from src.auth.service import get_current_user, get_dev_user -from src.auth.dependencies import empty_su_list, get_super_admin_list +from src.auth.dependencies import user_model_super_admin, never_super_admin from src.main import app # inited FastAPI app from src.database import engine, Base, get_db @@ -45,26 +45,13 @@ async def default_client(db_session) -> AsyncGenerator[AsyncClient, None]: app.dependency_overrides.clear() -@pytest.fixture -async def no_user_client(db_session) -> AsyncGenerator[AsyncClient, None]: - def get_db_override(): - return db_session - app.dependency_overrides[get_db] = get_db_override - transport = ASGITransport(app=app) - async with AsyncClient(transport=transport, base_url="http://localhost:8000/api/v1") as ac: - yield ac - - app.dependency_overrides.clear() - - - @pytest.fixture async def no_su_client(db_session) -> AsyncGenerator[AsyncClient, None]: def get_db_override(): return db_session app.dependency_overrides[get_db] = get_db_override app.dependency_overrides[get_current_user] = get_dev_user - app.dependency_overrides[get_super_admin_list] = empty_su_list + app.dependency_overrides[user_model_super_admin] = never_super_admin transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://localhost:8000/api/v1") as ac: yield ac diff --git a/test/test_auth_root.py b/test/test_auth_root.py deleted file mode 100644 index ea7abd9..0000000 --- a/test/test_auth_root.py +++ /dev/null @@ -1,154 +0,0 @@ -""" -This module ensures root user only endpoints do return a correctly formatted 401 when user is not the root user for the org -DELETE endpoints are not tested -""" -import pytest -from httpx import AsyncClient - -from .conftest import no_su_client - -from src.organisation.models import Organisation as Org -from src.user.models import User -from src.iam.models import Group - - -@pytest.fixture(autouse=True) -def add_second_org(db_session): - db_session.add(User(email="admin@test.org", first_name="Admin", last_name="Test", oidc_id="abcd-efgh-ijkl-4321")) - db_session.flush() - db_session.add(Org(name="Test Org Two", root_user_id=2, billing_contact_id=1, owner_contact_id=2, security_contact_id=3, - status="approved", intake_questionnaire={})) - db_session.flush() - - -@pytest.mark.anyio -async def test_get_org_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.get("/org?org_id=2") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_patch_org_questionnaire_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.patch("/org/questionnaire", json={"organisation_id": 2, - "intake_questionnaire": {"question_one": "new answer one", - "question_two": None, - "question_three": None}, - "partial": True}) - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_get_org_users_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.get("/org/users?org_id=2") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_post_org_user_auth_root(no_su_client: AsyncClient, db_session): - db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234")) - db_session.flush() - - resp = await no_su_client.post("/org/user", json={"organisation_id": 2, "user_id": 2}) - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_get_org_groups_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.get("/org/groups?org_id=2") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_get_org_contact_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.get(f"/org/contact?org_id=2&contact_type=billing") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_patch_org_contact_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.patch("/org/contact", - json={"organisation_id": 2, "contact_type": "billing", "email": "user@example.com"}) - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_get_service_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.get("/service/?org_id=2") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_get_iam_group_permissions_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.get("/iam/group/permissions?org_id=2&group_id=1") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_get_iam_group_users_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.get("/iam/group/users?org_id=2&group_id=1") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_post_iam_group_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.post("/iam/group", json={"name": "New Group", "organisation_id": 2}) - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_put_iam_group_permission_auth_root(no_su_client: AsyncClient, db_session): - db_session.add(Group(name="Test Group Two", org_id=2)) - db_session.flush() - resp = await no_su_client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 2}) - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_put_iam_group_user_auth_root(no_su_client: AsyncClient, db_session): - db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234")) - db_session.flush() - - resp = await no_su_client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 2}) - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_get_iam_permissions_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.get("/iam/permissions?org_id=2") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] - - -@pytest.mark.anyio -async def test_post_iam_permissions_search_auth_root(no_su_client: AsyncClient): - resp = await no_su_client.post("/iam/permissions/search", json={"organisation_id": 2, "action": "read"}) - assert resp.status_code != 422 - assert resp.status_code == 401 - assert "Must be the org's root user" in resp.json()["detail"] diff --git a/test/test_auth_user.py b/test/test_auth_user.py deleted file mode 100644 index e5ce189..0000000 --- a/test/test_auth_user.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -This testing module removes the testing user override to verify that endpoints with only the user requirement return a 401 error when not logged in -""" -import pytest -from httpx import AsyncClient - -from .conftest import no_user_client - - -@pytest.mark.anyio -async def test_get_self_db(no_user_client: AsyncClient): - resp = await no_user_client.get("/user/self/db") - assert resp.status_code != 422 - assert resp.status_code == 401 - assert resp.json()["detail"] == "Not authenticated" - - -@pytest.mark.anyio -async def test_post_org_success(no_user_client: AsyncClient): - resp = await no_user_client.post("/org", json={"name": "New Test Org"}) - assert resp.status_code != 422 - assert resp.status_code == 401 - assert resp.json()["detail"] == "Not authenticated"