diff --git a/test/test_iam.py b/test/test_iam.py index c890925..314403c 100644 --- a/test/test_iam.py +++ b/test/test_iam.py @@ -1,9 +1,13 @@ """ """ +from datetime import timedelta, datetime, timezone +from typing import Any + import pytest from httpx import AsyncClient -from .conftest import generate_query_and_status, generate_body_and_status +from src.utils import generate_jwt +from .conftest import generate_query_and_status, generate_body_and_status, standard_test pytestmark = [ pytest.mark.iam_module, @@ -144,23 +148,6 @@ async def test_act_on_resource_logic( assert data["allowed"] == expected_response -@pytest.mark.anyio -async def test_get_group_permissions_success(default_client: AsyncClient): - resp = await default_client.get("/iam/group/permissions?org_id=1&group_id=1") - assert resp.status_code == 200 - - data = resp.json() - - assert "permissions" in data - assert isinstance(data["permissions"], list) - - permission = data["permissions"][0] - assert permission["id"] == 1 - assert permission["service_name"] == "Test Service" - assert permission["resource"] == "test_resource" - assert permission["action"] == "read" - - @pytest.mark.parametrize( "query, expected_status", generate_query_and_status(["group_id", "org_id"]) ) @@ -188,31 +175,6 @@ async def test_get_group_permissions_mismatch(default_client: AsyncClient, query assert resp.json()["detail"] == "Group does not belong to this organization" -@pytest.mark.anyio -async def test_get_group_users_success(default_client: AsyncClient): - resp = await default_client.get("/iam/group/users?org_id=1&group_id=1") - assert resp.status_code == 200 - - data = resp.json() - - assert "users" in data - assert isinstance(data["users"], list) - - user = data["users"][0] - assert user["id"] == 1 - assert user["email"] == "admin@test.com" - - assert "group" in data - assert isinstance(data["group"], dict) - assert data["group"]["id"] == 1 - assert data["group"]["name"] == "Org One Group" - - assert "organisation" in data - assert isinstance(data["organisation"], dict) - assert data["organisation"]["id"] == 1 - assert data["organisation"]["name"] == "Org One" - - @pytest.mark.parametrize( "query, expected_status", generate_query_and_status(["group_id", "org_id"]) ) @@ -240,21 +202,6 @@ async def test_get_group_users_mismatch(default_client: AsyncClient, query: str) assert resp.json()["detail"] == "Group does not belong to this organization" -@pytest.mark.anyio -async def test_post_group_success(default_client: AsyncClient): - resp = await default_client.post( - "/iam/group", json={"name": "New Group", "organisation_id": 1} - ) - assert resp.status_code == 201 - - data = resp.json() - - assert "group" in data - assert isinstance(data["group"], dict) - assert data["group"]["name"] == "New Group" - assert data["group"]["id"] == 4 - - @pytest.mark.parametrize( "body, expected_status", generate_body_and_status({"organisation_id": "int", "name": "str"}), @@ -286,31 +233,6 @@ async def test_post_group_non_conflict(default_client: AsyncClient): assert resp.status_code == 201 -@pytest.mark.anyio -async def test_put_group_perm_success(default_client: AsyncClient): - resp = await default_client.put( - "/iam/group/permission", - json={"permission_id": 1, "group_id": 3, "organisation_id": 1}, - ) - assert resp.status_code == 200 - - data = resp.json() - - assert "group" in data - assert isinstance(data["group"], dict) - assert data["group"]["name"] == "Org One Group Two" - assert data["group"]["id"] == 3 - - assert "permissions" in data - assert isinstance(data["permissions"], list) - - permission = data["permissions"][0] - assert permission["id"] == 1 - assert permission["service_name"] == "Test Service" - assert permission["resource"] == "test_resource" - assert permission["action"] == "read" - - @pytest.mark.parametrize( "body, expected_status", generate_body_and_status( @@ -351,30 +273,6 @@ async def test_put_group_perm_mismatch(default_client: AsyncClient, body: dict): assert resp.json()["detail"] == "Group does not belong to this organization" -@pytest.mark.anyio -async def test_put_group_user_success(default_client: AsyncClient): - resp = await default_client.put( - "/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1} - ) - assert resp.status_code == 200 - - data = resp.json() - - assert "group" in data - assert isinstance(data["group"], dict) - assert data["group"]["name"] == "Org One Group" - assert data["group"]["id"] == 1 - - assert "users" in data - assert isinstance(data["users"], list) - - user = data["users"][1] - assert user["id"] == 2 - assert user["first_name"] == "User" - assert user["last_name"] == "Test" - assert user["email"] == "user@orgone.com" - - @pytest.mark.parametrize( "body, expected_status", generate_body_and_status( @@ -390,22 +288,6 @@ async def test_put_group_user_status_checks( assert resp.status_code == expected_status -@pytest.mark.anyio -async def test_get_permissions_success(default_client: AsyncClient): - resp = await default_client.get("/iam/permissions?org_id=1") - data = resp.json() - - assert resp.status_code == 200 - assert "permissions" in data - assert isinstance(data["permissions"], list) - - permission = data["permissions"][0] - assert permission["id"] == 1 - assert permission["service_name"] == "Test Service" - assert permission["resource"] == "test_resource" - assert permission["action"] == "read" - - @pytest.mark.parametrize("query, expected_status", generate_query_and_status(["org_id"])) @pytest.mark.anyio async def test_get_permissions_status_checks( @@ -416,25 +298,6 @@ async def test_get_permissions_status_checks( assert resp.status_code == expected_status -@pytest.mark.anyio -async def test_post_perm_success(default_client: AsyncClient): - resp = await default_client.post( - "/iam/permission", - json={"service_id": 1, "resource": "test_resource", "action": "create"}, - ) - assert resp.status_code == 201 - - data = resp.json() - - assert "permission" in data - assert isinstance(data["permission"], dict) - - assert data["permission"]["id"] == 4 - assert data["permission"]["service_name"] == "Test Service" - assert data["permission"]["resource"] == "test_resource" - assert data["permission"]["action"] == "create" - - @pytest.mark.parametrize( "body, expected_status", [ @@ -621,22 +484,6 @@ async def test_post_perm_search_status_checks( assert resp.status_code == expected_status -@pytest.mark.anyio -async def test_delete_group_permissions_success(default_client: AsyncClient): - resp = await default_client.delete( - "/iam/group/permission?org_id=1&group_id=1&perm_id=1" - ) - data = resp.json() - - assert resp.status_code == 200 - assert "permissions" in data - assert isinstance(data["permissions"], list) - assert len(data["permissions"]) == 0 - assert "group" in data - assert data["group"]["id"] == 1 - assert data["group"]["name"] == "Org One Group" - - @pytest.mark.parametrize( "query, expected_status", generate_query_and_status(["group_id", "org_id", "perm_id"]), @@ -657,49 +504,6 @@ async def test_delete_group_permissions_not_in_group(default_client: AsyncClient assert resp.status_code == 422 -@pytest.mark.anyio -async def test_delete_permissions_success(default_client: AsyncClient): - resp = await default_client.delete("/iam/permission?perm_id=1") - - assert resp.status_code == 204 - - -@pytest.mark.anyio -async def test_delete_group_users_success(default_client: AsyncClient): - resp = await default_client.delete("/iam/group/user?org_id=1&group_id=1&user_id=1") - data = resp.json() - - assert resp.status_code == 200 - assert "users" in data - assert isinstance(data["users"], list) - assert len(data["users"]) == 0 - assert "group" in data - assert data["group"]["id"] == 1 - assert data["group"]["name"] == "Org One Group" - - -@pytest.mark.anyio -async def test_put_group_user_invitation_success(default_client: AsyncClient): - body = {"user_email": "admin@test.com", "organisation_id": 1, "group_id": 1} - resp = await default_client.put("/iam/group/user/invitation", json=body) - - assert resp.status_code == 200 - data = resp.json() - assert "organisation" in data - assert isinstance(data["organisation"], dict) - assert data["organisation"]["id"] == 1 - assert data["organisation"]["name"] == "Org One" - - assert "invited_email" in data - assert isinstance(data["invited_email"], str) - assert data["invited_email"] == "admin@test.com" - - assert "group" in data - assert isinstance(data["group"], dict) - assert data["group"]["name"] == "Org One Group" - assert data["group"]["id"] == 1 - - @pytest.mark.parametrize( "body, expected_status", [ @@ -750,3 +554,284 @@ async def test_put_group_user_invitation_accept_status_checks( if resp.status_code == 401: assert resp.json()["detail"] == "Invalid JWS" + + +@pytest.mark.anyio +async def test_get_group_permissions_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "GET" + path = "/iam/group/permissions" + auth_level = "Root User" + query = "?org_id=1&group_id=1" + body = {} + expected_data = { + "organisation": {"id": 1, "name": "Org One"}, + "group": {"id": 1, "name": "Org One Group"}, + "permissions": [ + { + "id": 1, + "service_name": "Test Service", + "resource": "test_resource", + "action": "read", + } + ], + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_get_group_users_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "GET" + path = "/iam/group/users" + auth_level = "Root User" + query = "?org_id=1&group_id=1" + body = {} + expected_data = { + "organisation": {"id": 1, "name": "Org One"}, + "group": {"id": 1, "name": "Org One Group"}, + "users": [{"id": 1, "email": "admin@test.com"}], + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_post_group_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "POST" + path = "/iam/group" + auth_level = "Root User" + query = "" + body = {"name": "New Group", "organisation_id": 1} + expected_data = { + "organisation": {"id": 1, "name": "Org One"}, + "group": {"id": 4, "name": "New Group"}, + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_put_group_permission_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "PUT" + path = "/iam/group/permission" + auth_level = "Root User" + query = "" + body = {"permission_id": 1, "group_id": 3, "organisation_id": 1} + expected_data = { + "organisation": {"id": 1, "name": "Org One"}, + "group": {"id": 3, "name": "Org One Group Two"}, + "permissions": [ + { + "id": 1, + "service_name": "Test Service", + "resource": "test_resource", + "action": "read", + } + ], + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_put_group_user_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "PUT" + path = "/iam/group/user" + auth_level = "Root User" + query = "" + body = {"user_id": 2, "group_id": 1, "organisation_id": 1} + expected_data = { + "organisation": {"id": 1, "name": "Org One"}, + "group": {"id": 1, "name": "Org One Group"}, + "users": [ + { + "id": 1, + "first_name": "Admin", + "last_name": "Test", + "email": "admin@test.com", + }, + { + "id": 2, + "first_name": "User", + "last_name": "Test", + "email": "user@orgone.com", + }, + ], + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_get_permissions_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "GET" + path = "/iam/permissions" + auth_level = "Root User" + query = "?org_id=1" + body = {} + expected_data = { + "permissions": [ + { + "id": 1, + "service_name": "Test Service", + "resource": "test_resource", + "action": "read", + }, + { + "id": 2, + "service_name": "Test Service", + "resource": "test_resource", + "action": "move", + }, + ] + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_post_permission_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "POST" + path = "/iam/permission" + auth_level = "Super Admin" + query = "" + body = {"service_id": 1, "resource": "test_resource", "action": "create"} + expected_data = { + "permission": { + "id": 4, + "service_name": "Test Service", + "resource": "test_resource", + "action": "create", + } + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_delete_group_permission_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "DELETE" + path = "/iam/group/permission" + auth_level = "Root User" + query = "?org_id=1&group_id=1&perm_id=1" + body = {} + expected_data = {"group": {"id": 1, "name": "Org One Group"}, "permissions": []} + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_delete_permission_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "DELETE" + path = "/iam/permission" + auth_level = "Super Admin" + query = "?perm_id=1" + body = {} + expected_data = {} + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_delete_group_user_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "DELETE" + path = "/iam/group/user" + auth_level = "Root User" + query = "?org_id=1&group_id=1&user_id=1" + body = {} + expected_data = {"group": {"id": 1, "name": "Org One Group"}, "users": []} + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_put_group_user_invitation_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + method = "PUT" + path = "/iam/group/user/invitation" + auth_level = "Root User" + query = "" + body = {"user_email": "admin@test.com", "organisation_id": 1, "group_id": 1} + expected_data = { + "group": {"id": 1, "name": "Org One Group"}, + "organisation": {"id": 1, "name": "Org One"}, + "invited_email": "admin@test.com", + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + ) + + +@pytest.mark.anyio +async def test_put_group_user_invitation_accept_standard( + default_client: AsyncClient, route_data: dict[str, dict[str, Any]] +): + expiry_delta = timedelta(hours=24) + expiry = datetime.now(timezone.utc) + expiry_delta + claims = { + "email": "admin@test.com", + "org_id": 1, + "exp": expiry, + "type": "org_invite", + "group_id": 3, + "group_name": "Org One Group Two", + } + + token = await generate_jwt(claims) + + method = "PUT" + path = "/iam/group/user/invitation/accept" + auth_level = "User" + query = "" + body = {"jwt": token} + expected_data = { + "organisation": {"name": "Org One", "id": 1}, + "user": {"email": "admin@test.com", "id": 1}, + "group": {"details": {"id": 3, "name": "Org One Group Two"}, "permissions": []}, + } + + await standard_test( + default_client, method, path, auth_level, query, body, expected_data, route_data + )