"""Endpoint tests for the ``/iam`` routes.

Every test issues HTTP requests through the ``default_client`` fixture (an
``httpx.AsyncClient``) and asserts on the status code and, where relevant,
the JSON payload. The expected ids, names and e-mail addresses are whatever
the test environment behind ``default_client`` provides.
"""

import pytest
from httpx import AsyncClient

from .conftest import generate_query_and_status, generate_body_and_status

pytestmark = [
    pytest.mark.iam_module,
]

_ACT_ON_RESOURCE_URL = "/iam/can_act_on_resource"
_BEARER_TOKEN = "Bearer not_checked_when_auth_is_disabled"
_VALID_API_KEY = "123456789"
_GROUP_ORG_MISMATCH_DETAIL = "Group does not belong to this organization"


def _act_on_resource_headers(api_key=_VALID_API_KEY) -> dict:
    """Build the request headers used by the ``can_act_on_resource`` tests.

    Args:
        api_key: Value for the ``X-API-Key`` header. Pass ``None`` to leave
            the header out entirely.

    Returns:
        A fresh header dict containing ``Authorization`` and, unless
        ``api_key`` is ``None``, ``X-API-Key``.
    """
    headers = {"Authorization": _BEARER_TOKEN}
    if api_key is not None:
        headers["X-API-Key"] = api_key
    return headers


def _assert_test_service_read_permission(permission: dict) -> None:
    """Assert that ``permission`` is the id-1 read permission on Test Service."""
    assert permission["id"] == 1
    assert permission["service_name"] == "Test Service"
    assert permission["resource"] == "test_resource"
    assert permission["action"] == "read"


@pytest.mark.anyio
async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient):
    """A valid key and resource name yields 200 with ``allowed`` set to True."""
    body = {
        "rn": {
            "service": "Test Service",
            "organisation_id": 1,
            "resource": "test_resource",
            "instance": None,
        },
        "action": "read",
    }
    resp = await default_client.post(
        _ACT_ON_RESOURCE_URL, json=body, headers=_act_on_resource_headers()
    )
    data = resp.json()
    assert resp.status_code == 200
    assert data["allowed"] is True


@pytest.mark.parametrize(
    "service, api_key",
    [("Test Service", "not_the_correct_key"), ("Test Service Two", "123456789")],
)
@pytest.mark.anyio
async def test_act_on_resource_wrong_key(
    default_client: AsyncClient, service: str, api_key: str
):
    """A service/API-key pair that does not match is rejected with 401."""
    body = {
        "rn": {
            "service": service,
            "organisation": "Test Org",
            "resource": "test_resource",
        },
        "action": "read",
    }
    resp = await default_client.post(
        _ACT_ON_RESOURCE_URL, json=body, headers=_act_on_resource_headers(api_key)
    )
    data = resp.json()
    assert resp.status_code == 401
    assert data["detail"] == "Invalid API key"


@pytest.mark.anyio
async def test_act_on_resource_missing_key(default_client: AsyncClient):
    """A request without the ``X-API-Key`` header is rejected with 401."""
    body = {
        "rn": {
            "service": "Test Service",
            "organisation": "Test Org",
            "resource": "test_resource",
        },
        "action": "read",
    }
    # NOTE(review): this is the only request that also passes ``action`` as a
    # query parameter; the body already carries it. Confirm it is intentional.
    resp = await default_client.post(
        "/iam/can_act_on_resource?action=read",
        json=body,
        headers=_act_on_resource_headers(api_key=None),
    )
    data = resp.json()
    assert resp.status_code == 401
    assert data["detail"] == "Missing API key"


@pytest.mark.parametrize(
    "service, org, resource, action, expected_status",
    [
        (None, "Test Org", "test_resource", "read", 422),
        (42, "Test Org", "test_resource", "read", 422),
        ("Test Service", None, "test_resource", "read", 422),
        ("Test Service", 42, "test_resource", "read", 422),
        ("Test Service", "Test Org", None, "read", 422),
        ("Test Service", "Test Org", 42, "read", 422),
        ("Test Service", "Test Org", "test_resource", None, 422),
        ("Test Service", "Test Org", "test_resource", 42, 422),
    ],
)
@pytest.mark.anyio
async def test_act_on_resource_endpoint_status_checks(
    default_client: AsyncClient, service, org, resource, action, expected_status: int
):
    """Missing or wrongly typed body fields are answered with 422."""
    # NOTE(review): the passing tests send ``organisation_id`` while this body
    # sends ``organisation``. If ``organisation_id`` is required, every case
    # may get 422 regardless of the field under test -- confirm the schema.
    body = {
        "rn": {"service": service, "organisation": org, "resource": resource},
        "action": action,
    }
    resp = await default_client.post(
        _ACT_ON_RESOURCE_URL, json=body, headers=_act_on_resource_headers()
    )
    assert resp.status_code == expected_status


@pytest.mark.parametrize(
    "service, org, resource, action, expected_response",
    [
        ("Test Service", 1, "test_resource", "read", True),
        ("Test Service", 1, "test_resource", "create", False),
        ("Test Service", 1, "no_access_here", "read", False),
        ("Test Service", 2, "test_resource", "read", False),
    ],
)
@pytest.mark.anyio
async def test_act_on_resource_logic(
    default_client: AsyncClient,
    service,
    org,
    resource,
    action,
    expected_response: bool,
):
    """``allowed`` matches the expected verdict for each resource/action."""
    body = {
        "rn": {"service": service, "organisation_id": org, "resource": resource},
        "action": action,
    }
    resp = await default_client.post(
        _ACT_ON_RESOURCE_URL, json=body, headers=_act_on_resource_headers()
    )
    data = resp.json()
    assert resp.status_code == 200
    assert data["allowed"] == expected_response


@pytest.mark.anyio
async def test_get_group_permissions_success(default_client: AsyncClient):
    """Listing permissions of group 1 in org 1 returns the read permission first."""
    resp = await default_client.get("/iam/group/permissions?org_id=1&group_id=1")
    assert resp.status_code == 200
    data = resp.json()
    assert "permissions" in data
    assert isinstance(data["permissions"], list)
    _assert_test_service_read_permission(data["permissions"][0])


@pytest.mark.parametrize(
    "query, expected_status", generate_query_and_status(["group_id", "org_id"])
)
@pytest.mark.anyio
async def test_get_group_permissions_status_checks(
    default_client: AsyncClient, query: str, expected_status: int
):
    """Each generated query string yields its paired status code."""
    resp = await default_client.get(f"/iam/group/permissions?{query}")
    assert resp.status_code == expected_status


@pytest.mark.parametrize(
    "query",
    [
        "org_id=1&group_id=2",
        "org_id=2&group_id=1",
    ],
)
@pytest.mark.anyio
async def test_get_group_permissions_mismatch(default_client: AsyncClient, query: str):
    """A group/organisation pair that does not belong together gives 403."""
    resp = await default_client.get(f"/iam/group/permissions?{query}")
    assert resp.status_code == 403
    assert resp.json()["detail"] == _GROUP_ORG_MISMATCH_DETAIL


@pytest.mark.anyio
async def test_get_group_users_success(default_client: AsyncClient):
    """Listing users of group 1 returns users, group and organisation data."""
    resp = await default_client.get("/iam/group/users?org_id=1&group_id=1")
    assert resp.status_code == 200
    data = resp.json()

    assert "users" in data
    assert isinstance(data["users"], list)
    user = data["users"][0]
    assert user["id"] == 1
    assert user["email"] == "admin@test.com"

    assert "group" in data
    assert isinstance(data["group"], dict)
    assert data["group"]["id"] == 1
    assert data["group"]["name"] == "Org One Group"

    assert "organisation" in data
    assert isinstance(data["organisation"], dict)
    assert data["organisation"]["id"] == 1
    assert data["organisation"]["name"] == "Org One"


@pytest.mark.parametrize(
    "query, expected_status", generate_query_and_status(["group_id", "org_id"])
)
@pytest.mark.anyio
async def test_get_group_users_status_checks(
    default_client: AsyncClient, query: str, expected_status: int
):
    """Each generated query string yields its paired status code."""
    resp = await default_client.get(f"/iam/group/users?{query}")
    assert resp.status_code == expected_status


@pytest.mark.parametrize(
    "query",
    [
        "org_id=1&group_id=2",
        "org_id=2&group_id=1",
    ],
)
@pytest.mark.anyio
async def test_get_group_users_mismatch(default_client: AsyncClient, query: str):
    """A group/organisation pair that does not belong together gives 403."""
    resp = await default_client.get(f"/iam/group/users?{query}")
    assert resp.status_code == 403
    assert resp.json()["detail"] == _GROUP_ORG_MISMATCH_DETAIL


@pytest.mark.anyio
async def test_post_group_success(default_client: AsyncClient):
    """Creating a new group returns 201 and the created group."""
    resp = await default_client.post(
        "/iam/group", json={"name": "New Group", "organisation_id": 1}
    )
    assert resp.status_code == 201
    data = resp.json()
    assert "group" in data
    assert isinstance(data["group"], dict)
    assert data["group"]["name"] == "New Group"
    assert data["group"]["id"] == 4


@pytest.mark.parametrize(
    "body, expected_status",
    generate_body_and_status({"organisation_id": "int", "name": "str"}),
)
@pytest.mark.anyio
async def test_post_group_status_checks(
    default_client: AsyncClient, body: dict, expected_status: int
):
    """Each generated request body yields its paired status code."""
    resp = await default_client.post("/iam/group", json=body)
    assert resp.status_code == expected_status


@pytest.mark.anyio
async def test_post_group_conflict(default_client: AsyncClient):
    """Posting the name "Org One Group" for organisation 1 gives 409."""
    resp = await default_client.post(
        "/iam/group", json={"organisation_id": 1, "name": "Org One Group"}
    )
    assert resp.status_code == 409


@pytest.mark.anyio
async def test_post_group_non_conflict(default_client: AsyncClient):
    """The same group name under organisation 2 is accepted with 201."""
    resp = await default_client.post(
        "/iam/group", json={"organisation_id": 2, "name": "Org One Group"}
    )
    assert resp.status_code == 201


@pytest.mark.anyio
async def test_put_group_perm_success(default_client: AsyncClient):
    """Adding permission 1 to group 3 returns the group and its permissions."""
    resp = await default_client.put(
        "/iam/group/permission",
        json={"permission_id": 1, "group_id": 3, "organisation_id": 1},
    )
    assert resp.status_code == 200
    data = resp.json()

    assert "group" in data
    assert isinstance(data["group"], dict)
    assert data["group"]["name"] == "Org One Group Two"
    assert data["group"]["id"] == 3

    assert "permissions" in data
    assert isinstance(data["permissions"], list)
    _assert_test_service_read_permission(data["permissions"][0])


@pytest.mark.parametrize(
    "body, expected_status",
    generate_body_and_status(
        {"organisation_id": "int", "group_id": "int", "permission_id": "int"}
    ),
)
@pytest.mark.anyio
async def test_put_group_perm_status_checks(
    default_client: AsyncClient, body: dict, expected_status: int
):
    """Each generated request body yields its paired status code."""
    resp = await default_client.put("/iam/group/permission", json=body)
    assert resp.status_code == expected_status


@pytest.mark.anyio
async def test_put_group_perm_conflict(default_client: AsyncClient):
    """Putting permission 1 on group 1 in organisation 1 gives 409."""
    resp = await default_client.put(
        "/iam/group/permission",
        json={"organisation_id": 1, "group_id": 1, "permission_id": 1},
    )
    assert resp.status_code == 409


@pytest.mark.parametrize(
    "body",
    [
        {"organisation_id": 1, "group_id": 2, "permission_id": 1},
        {"organisation_id": 2, "group_id": 1, "permission_id": 1},
    ],
)
@pytest.mark.anyio
async def test_put_group_perm_mismatch(default_client: AsyncClient, body: dict):
    """A group/organisation pair that does not belong together gives 403."""
    resp = await default_client.put("/iam/group/permission", json=body)
    assert resp.status_code == 403
    assert resp.json()["detail"] == _GROUP_ORG_MISMATCH_DETAIL


@pytest.mark.anyio
async def test_put_group_user_success(default_client: AsyncClient):
    """Adding user 2 to group 1 returns the group and its user list."""
    resp = await default_client.put(
        "/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1}
    )
    assert resp.status_code == 200
    data = resp.json()

    assert "group" in data
    assert isinstance(data["group"], dict)
    assert data["group"]["name"] == "Org One Group"
    assert data["group"]["id"] == 1

    assert "users" in data
    assert isinstance(data["users"], list)
    # The added user is expected at index 1 of the returned list.
    user = data["users"][1]
    assert user["id"] == 2
    assert user["first_name"] == "User"
    assert user["last_name"] == "Test"
    assert user["email"] == "user@orgone.com"


@pytest.mark.parametrize(
    "body, expected_status",
    generate_body_and_status(
        {"organisation_id": "int", "group_id": "int", "user_id": "int"}
    ),
)
@pytest.mark.anyio
async def test_put_group_user_status_checks(
    default_client: AsyncClient, body: dict, expected_status: int
):
    """Each generated request body yields its paired status code."""
    resp = await default_client.put("/iam/group/user", json=body)
    assert resp.status_code == expected_status


@pytest.mark.anyio
async def test_get_permissions_success(default_client: AsyncClient):
    """Listing permissions for org 1 returns the read permission first."""
    resp = await default_client.get("/iam/permissions?org_id=1")
    data = resp.json()
    assert resp.status_code == 200
    assert "permissions" in data
    assert isinstance(data["permissions"], list)
    _assert_test_service_read_permission(data["permissions"][0])


@pytest.mark.parametrize(
    "query, expected_status", generate_query_and_status(["org_id"])
)
@pytest.mark.anyio
async def test_get_permissions_status_checks(
    default_client: AsyncClient, query: str, expected_status: int
):
    """Each generated query string yields its paired status code."""
    resp = await default_client.get(f"/iam/permissions?{query}")
    assert resp.status_code == expected_status


@pytest.mark.anyio
async def test_post_perm_success(default_client: AsyncClient):
    """Creating a ``create`` permission returns 201 and the new permission."""
    resp = await default_client.post(
        "/iam/permission",
        json={"service_id": 1, "resource": "test_resource", "action": "create"},
    )
    assert resp.status_code == 201
    data = resp.json()
    assert "permission" in data
    assert isinstance(data["permission"], dict)
    assert data["permission"]["id"] == 4
    assert data["permission"]["service_name"] == "Test Service"
    assert data["permission"]["resource"] == "test_resource"
    assert data["permission"]["action"] == "create"


@pytest.mark.parametrize(
    "body, expected_status",
    [
        (
            {"service_id": 1, "resource": "test_resource", "action": "read"},
            409,
        ),
        # service_id tests
        (
            {"service_id": 42, "resource": "test_resource", "action": "read"},
            404,
        ),  # Non-existent service
        (
            {"service_id": "banana", "resource": "test_resource", "action": "read"},
            422,
        ),  # Invalid service ID
        (
            {"service_id": "", "resource": "test_resource", "action": "read"},
            422,
        ),  # Blank service ID
        (
            {"service_id": -1, "resource": "test_resource", "action": "read"},
            422,
        ),  # Negative service ID
        # resource tests
        (
            {"service_id": 1, "resource": 42, "action": "read"},
            422,
        ),  # Invalid resource type
        # action tests
        (
            {"service_id": 1, "resource": "test_resource", "action": 42},
            422,
        ),  # Invalid action type
        # missing/partial body tests
        ({}, 422),  # Blank body
        ({"resource": "test_resource"}, 422),  # Only resource
        ({"action": "read"}, 422),  # Only action
        ({"service_id": 1}, 422),  # Only service
        ({"service_id": 1, "action": "read"}, 422),  # Missing resource
        ({"service_id": 1, "resource": "test_resource"}, 422),  # Missing action
        ({"resource": "test_resource", "action": "read"}, 422),  # Missing service
    ],
)
@pytest.mark.anyio
async def test_post_perm_status_checks(
    default_client: AsyncClient, body: dict, expected_status: int
):
    """Conflicting, unknown, invalid and partial bodies give the listed status."""
    resp = await default_client.post("/iam/permission", json=body)
    assert resp.status_code == expected_status


@pytest.mark.parametrize(
    "body",
    [
        {
            "organisation_id": 1,
            "service_id": 1,
            "resource": "test_resource",
            "action": "read",
        },
        {"organisation_id": 1, "service_id": 1},
        {"organisation_id": 1, "resource": "test_resource"},
        {"organisation_id": 1, "action": "read"},
        {"organisation_id": 1, "service_id": 1, "action": "read"},
        {"organisation_id": 1, "service_id": 1, "resource": "test_resource"},
        {"organisation_id": 1, "resource": "test_resource", "action": "read"},
    ],
)
@pytest.mark.anyio
async def test_post_perm_search_success(default_client: AsyncClient, body):
    """Every filter combination returns permission 1 exactly once."""
    resp = await default_client.post("/iam/permissions/search", json=body)
    data = resp.json()
    assert resp.status_code == 200
    assert "permissions" in data
    assert isinstance(data["permissions"], list)

    # Other permissions may be returned too; only id 1 is checked here.
    permissions_filtered = [
        permission for permission in data["permissions"] if permission["id"] == 1
    ]
    assert len(permissions_filtered) == 1
    _assert_test_service_read_permission(permissions_filtered[0])


@pytest.mark.parametrize(
    "body, expected_status",
    [
        # organisation_id tests
        (
            {
                "organisation_id": 42,
                "service_id": 1,
                "resource": "test_resource",
                "action": "read",
            },
            404,
        ),  # Non-existent organisation
        (
            {
                "organisation_id": "banana",
                "service_id": 1,
                "resource": "test_resource",
                "action": "read",
            },
            422,
        ),  # Invalid organisation ID
        (
            {
                "organisation_id": "",
                "service_id": 1,
                "resource": "test_resource",
                "action": "read",
            },
            422,
        ),  # Blank organisation ID
        (
            {
                "organisation_id": -1,
                "service_id": 1,
                "resource": "test_resource",
                "action": "read",
            },
            422,
        ),  # Negative organisation ID
        # service_id tests
        (
            {
                "organisation_id": 1,
                "service_id": "banana",
                "resource": "test_resource",
                "action": "read",
            },
            422,
        ),  # Invalid service ID
        (
            {
                "organisation_id": 1,
                "service_id": "",
                "resource": "test_resource",
                "action": "read",
            },
            422,
        ),  # Blank service ID
        (
            {
                "organisation_id": 1,
                "service_id": -1,
                "resource": "test_resource",
                "action": "read",
            },
            422,
        ),  # Negative service ID
        # resource tests
        (
            {"organisation_id": 1, "service_id": 1, "resource": 42, "action": "read"},
            422,
        ),  # Invalid resource type
        # action tests
        (
            {
                "organisation_id": 1,
                "service_id": 1,
                "resource": "test_resource",
                "action": 42,
            },
            422,
        ),  # Invalid action type
        # missing/partial body tests
        ({}, 422),  # Blank body
    ],
)
@pytest.mark.anyio
async def test_post_perm_search_status_checks(
    default_client: AsyncClient, body: dict, expected_status: int
):
    """Unknown, invalid and blank search bodies give the listed status."""
    resp = await default_client.post("/iam/permissions/search", json=body)
    assert resp.status_code == expected_status


@pytest.mark.anyio
async def test_delete_group_permissions_success(default_client: AsyncClient):
    """Removing permission 1 from group 1 leaves an empty permission list."""
    resp = await default_client.delete(
        "/iam/group/permission?org_id=1&group_id=1&perm_id=1"
    )
    data = resp.json()
    assert resp.status_code == 200
    assert "permissions" in data
    assert isinstance(data["permissions"], list)
    assert len(data["permissions"]) == 0
    assert "group" in data
    assert data["group"]["id"] == 1
    assert data["group"]["name"] == "Org One Group"


@pytest.mark.parametrize(
    "query, expected_status",
    generate_query_and_status(["group_id", "org_id", "perm_id"]),
)
@pytest.mark.anyio
async def test_delete_group_permissions_status_checks(
    default_client: AsyncClient, query: str, expected_status: int
):
    """Each generated query string yields its paired status code."""
    resp = await default_client.delete(f"/iam/group/permission?{query}")
    assert resp.status_code == expected_status


@pytest.mark.anyio
async def test_delete_group_permissions_not_in_group(default_client: AsyncClient):
    """Removing permission 2 from group 1 is answered with 422."""
    resp = await default_client.delete(
        "/iam/group/permission?org_id=1&group_id=1&perm_id=2"
    )
    assert resp.status_code == 422


@pytest.mark.anyio
async def test_delete_permissions_success(default_client: AsyncClient):
    """Deleting permission 1 returns 204."""
    resp = await default_client.delete("/iam/permission?perm_id=1")
    assert resp.status_code == 204


@pytest.mark.anyio
async def test_delete_group_users_success(default_client: AsyncClient):
    """Removing user 1 from group 1 leaves an empty user list."""
    resp = await default_client.delete("/iam/group/user?org_id=1&group_id=1&user_id=1")
    data = resp.json()
    assert resp.status_code == 200
    assert "users" in data
    assert isinstance(data["users"], list)
    assert len(data["users"]) == 0
    assert "group" in data
    assert data["group"]["id"] == 1
    assert data["group"]["name"] == "Org One Group"


@pytest.mark.anyio
async def test_put_group_user_invitation_success(default_client: AsyncClient):
    """Inviting a user returns the organisation, invited e-mail and group."""
    body = {"user_email": "admin@test.com", "organisation_id": 1, "group_id": 1}
    resp = await default_client.put("/iam/group/user/invitation", json=body)
    assert resp.status_code == 200
    data = resp.json()

    assert "organisation" in data
    assert isinstance(data["organisation"], dict)
    assert data["organisation"]["id"] == 1
    assert data["organisation"]["name"] == "Org One"

    assert "invited_email" in data
    assert isinstance(data["invited_email"], str)
    assert data["invited_email"] == "admin@test.com"

    assert "group" in data
    assert isinstance(data["group"], dict)
    assert data["group"]["name"] == "Org One Group"
    assert data["group"]["id"] == 1


@pytest.mark.parametrize(
    "body, expected_status",
    [
        ({"organisation_id": 42, "user_email": "admin@test.com", "group_id": 1}, 404),
        (
            {
                "organisation_id": "Test Org",
                "user_email": "admin@test.com",
                "group_id": 1,
            },
            422,
        ),
        ({"organisation_id": "", "user_email": "admin@test.com", "group_id": 1}, 422),
        ({}, 422),
        ({"user_email": 42, "group_id": 1}, 422),
        ({"organisation_id": 1, "user_email": "Test User", "group_id": 1}, 422),
        ({"organisation_id": 1, "user_email": "admin@test.com", "group_id": 42}, 404),
        ({"organisation_id": "Test Org", "user_email": "admin@test.com"}, 422),
        ({"organisation_id": "", "user_email": "admin@test.com"}, 422),
        ({"user_email": 42}, 422),
    ],
)
@pytest.mark.anyio
async def test_put_group_user_invitation_status_checks(
    default_client: AsyncClient, body: dict, expected_status: int
):
    """Unknown ids give 404; malformed or partial bodies give 422."""
    resp = await default_client.put("/iam/group/user/invitation", json=body)
    assert resp.status_code == expected_status


@pytest.mark.parametrize(
    "body, expected_status",
    [
        ({"jwt": "invalid"}, 401),
        ({"jwt": ""}, 401),
        ({"jwt": None}, 422),
        ({"jwt": 42}, 422),
    ],
)
@pytest.mark.anyio
async def test_put_group_user_invitation_accept_status_checks(
    default_client: AsyncClient, body: dict, expected_status: int
):
    """String tokens that are not valid give 401; non-string tokens give 422."""
    resp = await default_client.put("/iam/group/user/invitation/accept", json=body)
    assert resp.status_code == expected_status
    # Only the 401 responses are checked for a specific error detail.
    if resp.status_code == 401:
        assert resp.json()["detail"] == "Invalid JWS"