From f600664789aabc4bef05fd9f51570d847991c9e7 Mon Sep 17 00:00:00 2001 From: luxferre Date: Fri, 5 Jun 2026 09:10:55 +0100 Subject: [PATCH] tests: improved coverage --- src/iam/router.py | 2 +- src/organisation/router.py | 4 +- test/test_auth_general.py | 26 +++++++ test/test_iam.py | 140 +++++++++++++++++++++++++++++++++++-- test/test_organisation.py | 17 ++++- 5 files changed, 178 insertions(+), 11 deletions(-) create mode 100644 test/test_auth_general.py diff --git a/src/iam/router.py b/src/iam/router.py index e360615..ce0074e 100644 --- a/src/iam/router.py +++ b/src/iam/router.py @@ -88,7 +88,7 @@ async def get_group_permissions(group_model: group_model_query_dependency, org_m @router.get("/group/users", response_model=IAMGetGroupUsersResponse) async def get_group_users(group_model: group_model_query_dependency, org_model: org_model_root_claim_query_dependency): if group_model.org_id != org_model.id: - raise UnauthorizedException("User does not belong to this organization") + raise UnauthorizedException("Group does not belong to this organization") return {"users": group_model.user_rel} diff --git a/src/organisation/router.py b/src/organisation/router.py index a9672b1..8e8bd36 100644 --- a/src/organisation/router.py +++ b/src/organisation/router.py @@ -137,8 +137,6 @@ async def update_questionnaire(db: db_dependency, org_model: org_model_root_clai if hasattr(questionnaire_model, key): setattr(questionnaire_model, key, value) else: - if key == "partial" or key == "organisation_id": - continue raise UnprocessableContentException("Invalid keys in update request") # Allows for partially completed questionnaires to be saved without being submitted for review @@ -241,7 +239,7 @@ async def update_root_user(db: db_dependency, org_model: org_model_body_dependen Promotes an existing organisation user to the root user, giving them full control of the org. """ if user_model not in org_model.user_rel: - raise UnauthorizedException(message="This user does not belong to your organisation.") + raise UnprocessableContentException(message="This user does not belong to your organisation.") org_model.root_user_rel = user_model db.flush() response = OrgPatchRootResponse(name=org_model.name, root_user_email=org_model.root_user_email) diff --git a/test/test_auth_general.py b/test/test_auth_general.py new file mode 100644 index 0000000..533307b --- /dev/null +++ b/test/test_auth_general.py @@ -0,0 +1,26 @@ +""" +""" +import pytest +from httpx import AsyncClient + +from .conftest import no_su_client + +from src.organisation.models import Organisation as Org +from src.user.models import User +from src.iam.models import Group + + +@pytest.mark.anyio +async def test_get_org_auth_root_su(default_client: AsyncClient, db_session): + # If a super admin can access a resource when not the root user + db_session.add(User(email="admin@test.org", first_name="Admin", last_name="Test", oidc_id="abcd-efgh-ijkl-4321")) + db_session.flush() + db_session.add( + Org(name="Test Org Two", root_user_id=2, billing_contact_id=1, owner_contact_id=2, security_contact_id=3, + status="approved", intake_questionnaire={})) + db_session.flush() + + resp = await default_client.get("/org?org_id=2") + assert resp.status_code != 422 + assert resp.status_code == 200 + assert resp.json()["name"] == "Test Org Two" diff --git a/test/test_iam.py b/test/test_iam.py index 2d85925..fa52a0b 100644 --- a/test/test_iam.py +++ b/test/test_iam.py @@ -5,10 +5,11 @@ import pytest from httpx import AsyncClient from src.user.models import User -from .conftest import default_client, db_session - +from src.organisation.models import Organisation as Org from src.iam.models import Group +from .conftest import default_client, db_session + @pytest.mark.anyio async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient): @@ -28,7 +29,48 @@ async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient assert data == True +@pytest.mark.parametrize( + "service, api_key", + [ + ("Test Service", "not_the_correct_key"), + ("Test Service Two", "123456789") + ], +) @pytest.mark.anyio +async def test_act_on_resource_wrong_key(default_client: AsyncClient, db_session, service: str, api_key: str): + body = { + "service": service, + "organisation": "Test Org", + "resource": "test_resource" + } + headers = { + "Authorization": "Bearer not_checked_when_auth_is_disabled", + "X-API-Key": api_key + } + resp = await default_client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers) + data = resp.json() + + assert resp.status_code == 401 + assert data["detail"] == "Invalid API key" + + +@pytest.mark.anyio +async def test_act_on_resource_missing_key(default_client: AsyncClient): + body = { + "service": "Test Service", + "organisation": "Test Org", + "resource": "test_resource" + } + headers = { + "Authorization": "Bearer not_checked_when_auth_is_disabled" + } + resp = await default_client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers) + data = resp.json() + + assert resp.status_code == 401 + assert data["detail"] == "Missing API key" + + @pytest.mark.parametrize( "service, org, resource, action, expected_status", [ @@ -41,7 +83,7 @@ async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient ], ) @pytest.mark.anyio -async def test_post_act_on_resource_endpoint_failure(default_client: AsyncClient, service, org, resource, action, +async def test_act_on_resource_endpoint_failure(default_client: AsyncClient, service, org, resource, action, expected_status: int): body = { "service": service, @@ -57,6 +99,34 @@ async def test_post_act_on_resource_endpoint_failure(default_client: AsyncClient assert resp.status_code == expected_status +@pytest.mark.parametrize( + "service, org, resource, action, expected_response", + [ + ("Test Service", "Test Org", "test_resource", "read", True), + ("Test Service", "Test Org", "test_resource", "create", False), + ("Test Service", "Test Org", "no_access_here", "read", False), + ("Test Service", "Test Org Two", "test_resource", "read", False), + ], +) +@pytest.mark.anyio +async def test_act_on_resource_logic(default_client: AsyncClient, db_session, service, org, resource, action, + expected_response: bool): + body = { + "service": service, + "organisation": org, + "resource": resource + } + headers = { + "Authorization": "Bearer not_checked_when_auth_is_disabled", + "X-API-Key": "123456789" + } + resp = await default_client.post(f"/iam/can_act_on_resource?action={action}", json=body, headers=headers) + data = resp.json() + + assert resp.status_code == 200 + assert data == expected_response + + @pytest.mark.anyio async def test_get_group_permissions_success(default_client: AsyncClient): resp = await default_client.get("/iam/group/permissions?org_id=1&group_id=1") @@ -73,14 +143,16 @@ async def test_get_group_permissions_success(default_client: AsyncClient): @pytest.mark.parametrize( "query, expected_status", [ - ("org_id=2&group_id=1", 404), # Non-exists org, valid group + ("org_id=42&group_id=1", 404), # Non-exists org, valid group ("org_id=banana&group_id=1", 422), # Invalid org, valid group ("org_id=&group_id=1", 422), # Blank org, valid group ("org_id=-1&group_id=1", 422), # Negative org, valid group ("group_id=1", 422), # Only group + ("org_id=1&group_id=2", 404), # Group/Org mismatch + ("org_id=2&group_id=1", 404), # Group/Org mismatch ("", 422), # Blank query ("org_id=&group_id=", 422), # Both blank - ("org_id=1&group_id=2", 404), # Valid org, non-exists group + ("org_id=1&group_id=42", 404), # Valid org, non-exists group ("org_id=1&group_id=banana", 422), # Valid org, invalid group ("org_id=1&group_id=", 422), # Valid org, blank group ("org_id=1&group_id=-1", 422), # Valid org, negative group @@ -88,12 +160,30 @@ async def test_get_group_permissions_success(default_client: AsyncClient): ], ) @pytest.mark.anyio -async def test_get_group_permissions_failure(default_client: AsyncClient, query: str, expected_status: int): +async def test_get_group_permissions_failure(default_client: AsyncClient, db_session, query: str, expected_status: int): resp = await default_client.get(f"/iam/group/permissions?{query}") assert resp.status_code == expected_status +@pytest.mark.parametrize( + "query", + [ + "org_id=1&group_id=2", + "org_id=2&group_id=1", + ], +) +@pytest.mark.anyio +async def test_get_group_permissions_mismatch(default_client: AsyncClient, db_session, query: str): + db_session.add(Org(name="Another Test Org", root_user_id=1, billing_contact_id=1, owner_contact_id=2, security_contact_id=3, status="approved")) + db_session.add(Group(name="Another Test Group", org_id=2)) + db_session.flush() + resp = await default_client.get(f"/iam/group/permissions?{query}") + + assert resp.status_code == 401 + assert resp.json()["detail"] == "Group does not belong to this organization" + + @pytest.mark.anyio async def test_get_group_users_success(default_client: AsyncClient): resp = await default_client.get("/iam/group/users?org_id=1&group_id=1") @@ -132,6 +222,24 @@ async def test_get_group_users_failure(default_client: AsyncClient, query: str, assert resp.status_code == expected_status +@pytest.mark.parametrize( + "query", + [ + "org_id=1&group_id=2", + "org_id=2&group_id=1", + ], +) +@pytest.mark.anyio +async def test_get_group_users_mismatch(default_client: AsyncClient, db_session, query: str): + db_session.add(Org(name="Another Test Org", root_user_id=1, billing_contact_id=1, owner_contact_id=2, security_contact_id=3, status="approved")) + db_session.add(Group(name="Another Test Group", org_id=2)) + db_session.flush() + resp = await default_client.get(f"/iam/group/users?{query}") + + assert resp.status_code == 401 + assert resp.json()["detail"] == "Group does not belong to this organization" + + @pytest.mark.anyio async def test_post_group_success(default_client: AsyncClient): resp = await default_client.post("/iam/group", json={"name": "New Group", "organisation_id": 1}) @@ -214,6 +322,8 @@ async def test_put_group_perm_success(default_client: AsyncClient, db_session): ({"group_id": 1, "permission_id": 1}, 422), # Missing organisation ({"organisation_id": 1, "group_id": 1}, 422), # Missing permission + ({"organisation_id": 1, "group_id": 1, "permission_id": 1}, 409), # Permission already in group + ], ) @pytest.mark.anyio @@ -223,6 +333,24 @@ async def test_put_group_perm_failure(default_client: AsyncClient, body: dict[st assert resp.status_code == expected_status +@pytest.mark.parametrize( + "body", + [ + {"organisation_id": 1, "group_id": 2, "permission_id": 1}, + {"organisation_id": 2, "group_id": 1, "permission_id": 1}, + ], +) +@pytest.mark.anyio +async def test_put_group_perm_mismatch(default_client: AsyncClient, db_session, body: dict): + db_session.add(Org(name="Another Test Org", root_user_id=1, billing_contact_id=1, owner_contact_id=2, security_contact_id=3, status="approved")) + db_session.add(Group(name="Another Test Group", org_id=2)) + db_session.flush() + resp = await default_client.put(f"/iam/group/permission", json=body) + + assert resp.status_code == 401 + assert resp.json()["detail"] == "Group does not belong to this organization" + + @pytest.mark.anyio async def test_put_group_user_success(default_client: AsyncClient, db_session): db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234")) diff --git a/test/test_organisation.py b/test/test_organisation.py index 83f5074..bde5df0 100644 --- a/test/test_organisation.py +++ b/test/test_organisation.py @@ -201,10 +201,11 @@ async def test_post_org_user_success(default_client: AsyncClient, db_session): ({"organisation_id": 1, "user_id": "id"}, 422), ({"user_id": 2}, 422), ({"organisation_id": 1, "user_id": 42}, 404), + ({"organisation_id": 1, "user_id": 1}, 409), ], ) @pytest.mark.anyio -async def test_post_org_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int, db_session): +async def test_post_org_user_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int, db_session): db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234")) db_session.flush() @@ -252,6 +253,18 @@ async def test_patch_root_user_failure(default_client: AsyncClient, body: dict[s assert resp.status_code == expected_status +@pytest.mark.anyio +async def test_patch_org_root_user_non_member(default_client: AsyncClient, db_session): + db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234")) + db_session.flush() + + resp = await default_client.patch("/org/root_user", json={"organisation_id": 1, "user_id": 2}) + data = resp.json() + + assert resp.status_code == 422 + assert data["detail"] == "This user does not belong to your organisation." + + @pytest.mark.anyio async def test_get_org_groups_success(default_client: AsyncClient): resp = await default_client.get("/org/groups?org_id=1") @@ -367,6 +380,8 @@ async def test_patch_org_contact_success(default_client: AsyncClient, key: str, "body, expected_status", [ ({"organisation_id": 42, "contact_type": "billing"}, 404), + ({"organisation_id": 1, "contact_type": "security"}, 200), + ({"organisation_id": 1, "contact_type": "owner"}, 200), ({"organisation_id": "Test Org", "contact_type": "billing"}, 422), ({"organisation_id": "", "contact_type": "billing"}, 422), ({}, 422),