From 511480dffea8acbe8eb8e3f28cafe3a4e0a55433 Mon Sep 17 00:00:00 2001 From: luxferre Date: Tue, 2 Jun 2026 14:44:30 +0100 Subject: [PATCH 1/7] fix: wrong org dependency in get perms --- src/iam/router.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iam/router.py b/src/iam/router.py index 1cf904c..a0f123b 100644 --- a/src/iam/router.py +++ b/src/iam/router.py @@ -122,7 +122,7 @@ async def add_group_permission(db: db_dependency, group_model: group_model_body_ return response -@router.put("/group/user") +@router.put("/group/user", response_model=IAMPutGroupUserResponse) async def add_group_user(db: db_dependency, group_model: group_model_body_dependency, user_model: user_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupUserRequest): if group_model.org_id != org_model.id: raise UnauthorizedException() @@ -164,7 +164,7 @@ async def remove_group_user(db: db_dependency, group_model: group_model_body_dep @router.get("/permissions", response_model=IAMGetPermissionsResponse) -async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency): +async def get_permissions(db: db_dependency, org_model: org_model_root_claim_query_dependency): permission_models = db.query(Perm).all() return {"permissions": permission_models} From 5d1606aa9d62461957982f7282f5f4403f0381df Mon Sep 17 00:00:00 2001 From: luxferre Date: Tue, 2 Jun 2026 15:13:00 +0100 Subject: [PATCH 2/7] fix: permission search changed to post Get requests cannot have bodies. --- src/iam/router.py | 16 ++++++++-------- src/iam/schemas.py | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/iam/router.py b/src/iam/router.py index a0f123b..caeb459 100644 --- a/src/iam/router.py +++ b/src/iam/router.py @@ -190,18 +190,18 @@ async def delete_permission(db: db_dependency, su: super_admin_dependency, perm_ db.commit() -@router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse) -async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, search: IAMGetPermissionsSearchRequest): +@router.post("/permissions/search", response_model=IAMGetPermissionsSearchResponse) +async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMGetPermissionsSearchRequest): permission_query = db.query(Perm) - if search.service_id is not None: - permission_query = permission_query.filter(Perm.service_id == search.service_id) + if request_model.service_id is not None: + permission_query = permission_query.filter(Perm.service_id == request_model.service_id) - if search.resource is not None: - permission_query = permission_query.filter(Perm.resource == search.resource) + if request_model.resource is not None: + permission_query = permission_query.filter(Perm.resource == request_model.resource) - if search.action is not None: - permission_query = permission_query.filter(Perm.action == search. action) + if request_model.action is not None: + permission_query = permission_query.filter(Perm.action == request_model. action) permission_models = permission_query.all() diff --git a/src/iam/schemas.py b/src/iam/schemas.py index ff6cbfc..58e636d 100644 --- a/src/iam/schemas.py +++ b/src/iam/schemas.py @@ -17,7 +17,7 @@ from user.schemas import UserIDMixin class UserSchema(CustomBaseModel): model_config = ConfigDict(from_attributes=True, extra="ignore") - + id: int first_name: str last_name: str @@ -94,8 +94,8 @@ class IAMPostPermissionResponse(CustomBaseModel): class IAMDeletePermissionRequest(PermIDMixin): pass -class IAMGetPermissionsSearchRequest(CustomBaseModel): - service_id: Optional[int] = None +class IAMGetPermissionsSearchRequest(OrgIDMixin): + service_id: Optional[int] = Field(gt=0) resource: Optional[str] = None action: Optional[str] = None From ae0181c3ffa93483d875720645528561a198f12d Mon Sep 17 00:00:00 2001 From: luxferre Date: Tue, 2 Jun 2026 15:36:05 +0100 Subject: [PATCH 3/7] fix: create permission endpoint Verifies service exists before attaching permission. Response built manually because calculated properties are not handled by .__dict()__ Request model uses Service ID mixin. Service ID mixin verifies ID > 0 --- src/iam/router.py | 14 +++++++++----- src/iam/schemas.py | 4 ++-- src/service/schemas.py | 4 ++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/iam/router.py b/src/iam/router.py index caeb459..316c5fa 100644 --- a/src/iam/router.py +++ b/src/iam/router.py @@ -19,6 +19,7 @@ from fastapi import APIRouter, status from sqlalchemy.exc import IntegrityError from psycopg import errors +from service.exceptions import ServiceNotFoundException from src.exceptions import ConflictException from src.database import db_dependency from src.schemas import ResourceName @@ -170,18 +171,21 @@ async def get_permissions(db: db_dependency, org_model: org_model_root_claim_que return {"permissions": permission_models} -@router.post("/permission") -async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_mode: IAMPostPermissionRequest): - perm_model = Perm(**request_mode.__dict__) +@router.post("/permission", response_model=IAMPostPermissionResponse) +async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_model: IAMPostPermissionRequest): + service_model = db.get(Service, request_model.service_id) + if service_model is None: + raise ServiceNotFoundException(service_id=request_model.service_id) + perm_model = Perm(**request_model.__dict__) try: db.add(perm_model) except IntegrityError as e: if isinstance(e.orig, errors.UniqueViolation): raise ConflictException(message="Permission already exists") db.flush() - response = IAMPostPermissionResponse(permission=PermissionSchema(**perm_model.__dict__)) + response = {"service_name": perm_model.service_name, "resource": perm_model.resource, "action": perm_model.action} db.commit() - return response + return {"permission": response} @router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT) diff --git a/src/iam/schemas.py b/src/iam/schemas.py index 58e636d..bd2f586 100644 --- a/src/iam/schemas.py +++ b/src/iam/schemas.py @@ -10,6 +10,7 @@ from typing import Optional from pydantic import EmailStr, ConfigDict, Field +from src.service.schemas import ServiceIDMixin from src.organisation.schemas import OrgIDMixin from src.schemas import CustomBaseModel from user.schemas import UserIDMixin @@ -83,8 +84,7 @@ class IAMDeleteGroupUserResponse(CustomBaseModel): class IAMGetPermissionsResponse(CustomBaseModel): permissions: list[PermissionSchema] -class IAMPostPermissionRequest(CustomBaseModel): - service_id: int +class IAMPostPermissionRequest(ServiceIDMixin): resource: str action: str diff --git a/src/service/schemas.py b/src/service/schemas.py index 2d89400..50e7c35 100644 --- a/src/service/schemas.py +++ b/src/service/schemas.py @@ -6,12 +6,12 @@ Models follow the nomenclature of: - Mixins: "Mixin" - Models: "" ie "ServiceGetServiceResponse" """ -from pydantic import ConfigDict +from pydantic import ConfigDict, Field from src.schemas import CustomBaseModel class ServiceIDMixin(CustomBaseModel): - service_id: int + service_id: int = Field(gt=0) class ServiceSchema(CustomBaseModel): model_config = ConfigDict(from_attributes=True, extra="ignore") From b8b5b6dbd3abf7012ae5fcf2c1dd0c96f5e238bc Mon Sep 17 00:00:00 2001 From: luxferre Date: Tue, 2 Jun 2026 15:55:50 +0100 Subject: [PATCH 4/7] fix: permission search typing --- src/iam/schemas.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iam/schemas.py b/src/iam/schemas.py index bd2f586..fa6adfc 100644 --- a/src/iam/schemas.py +++ b/src/iam/schemas.py @@ -6,7 +6,7 @@ Models follow the nomenclature of: - Mixins: "Mixin" - Models: "" ie "IAMGetGroupPermissionsResponse" """ -from typing import Optional +from typing import Optional, Annotated from pydantic import EmailStr, ConfigDict, Field @@ -95,7 +95,7 @@ class IAMDeletePermissionRequest(PermIDMixin): pass class IAMGetPermissionsSearchRequest(OrgIDMixin): - service_id: Optional[int] = Field(gt=0) + service_id: Annotated[int | None, Field(gt=0)] = None resource: Optional[str] = None action: Optional[str] = None From 6f174deefc9c9078b315349375c4cc2126dc944a Mon Sep 17 00:00:00 2001 From: luxferre Date: Tue, 2 Jun 2026 16:10:16 +0100 Subject: [PATCH 5/7] tests: db seeding change --- test/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/conftest.py b/test/conftest.py index e29e131..e3ea2fc 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -54,7 +54,7 @@ def _seed(db): status="approved", intake_questionnaire={"question_two": "answer two"})) db.add(Service(name="Test Service", api_key="123456789")) db.add(Permission(service_id=1, resource="test_resource", action="read")) - db.add(Group(name="Test Group")) + db.add(Group(name="Test Group", org_id=1)) db.flush() group_model = db.get(Group, 1) perm_model = db.get(Permission, 1) From 65a9514be6c9e0fcf3c151f54fc757c6e26e537b Mon Sep 17 00:00:00 2001 From: luxferre Date: Tue, 2 Jun 2026 16:11:52 +0100 Subject: [PATCH 6/7] fix: drop superfluous column in usergroups Group org is assigned in the Group table. Also assigning it in the UserGroups table complicated relationship creation and it was never used. --- .../2026-05-29_drop_user_groups_org_column.py | 34 +++++++++++++++++++ src/iam/models.py | 1 - 2 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 .alembic/versions/2026-05-29_drop_user_groups_org_column.py diff --git a/.alembic/versions/2026-05-29_drop_user_groups_org_column.py b/.alembic/versions/2026-05-29_drop_user_groups_org_column.py new file mode 100644 index 0000000..b5c71c9 --- /dev/null +++ b/.alembic/versions/2026-05-29_drop_user_groups_org_column.py @@ -0,0 +1,34 @@ +"""Drop user_groups org column + +Revision ID: d9dc6986fe38 +Revises: 8132c4b88665 +Create Date: 2026-05-29 16:10:00.320982 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'd9dc6986fe38' +down_revision: Union[str, Sequence[str], None] = '8132c4b88665' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint(op.f('user_groups_org_id_fkey'), 'user_groups', type_='foreignkey') + op.drop_column('user_groups', 'org_id') + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('user_groups', sa.Column('org_id', sa.INTEGER(), autoincrement=False, nullable=False)) + op.create_foreign_key(op.f('user_groups_org_id_fkey'), 'user_groups', 'organisation', ['org_id'], ['id'], ondelete='CASCADE') + # ### end Alembic commands ### diff --git a/src/iam/models.py b/src/iam/models.py index 6ae2289..f542b70 100644 --- a/src/iam/models.py +++ b/src/iam/models.py @@ -78,6 +78,5 @@ class GroupPermissions(Base): class UserGroups(Base): __tablename__ = "user_groups" - org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True) user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True) group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True) From 806bbfcbfc7d5df581b22db6d9f3ff97cbe6ad4e Mon Sep 17 00:00:00 2001 From: luxferre Date: Tue, 2 Jun 2026 16:12:17 +0100 Subject: [PATCH 7/7] tests: iam router --- test/test_iam.py | 414 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 414 insertions(+) create mode 100644 test/test_iam.py diff --git a/test/test_iam.py b/test/test_iam.py new file mode 100644 index 0000000..578eade --- /dev/null +++ b/test/test_iam.py @@ -0,0 +1,414 @@ +""" +Act on resource tests only check for pass/fail on input validation. Logic is not tested. +""" +import pytest +from httpx import AsyncClient + +from src.user.models import User +from .conftest import client, db_session + +from src.iam.models import Group + + +@pytest.mark.anyio +async def test_post_act_on_resource_endpoint_success(client: AsyncClient): + body = { + "service": "Test Service", + "organisation": "Test Org", + "resource": "test_resource" + } + headers = { + "Authorization": "Bearer not_checked_when_auth_is_disabled", + "X-API-Key": "123456789" + } + resp = await client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers) + data = resp.json() + + assert resp.status_code == 200 + assert data == True + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "service, org, resource, action, expected_status", + [ + (None, "Test Org", "test_resource", "read", 422), + (42, "Test Org", "test_resource", "read", 422), + ("Test Service", None, "test_resource", "read", 422), + ("Test Service", 42, "test_resource", "read", 422), + ("Test Service", "Test Org", None, "read", 422), + ("Test Service", "Test Org", 42, "read", 422), + ], +) +@pytest.mark.anyio +async def test_post_act_on_resource_endpoint_failure(client: AsyncClient, service, org, resource, action, + expected_status: int): + body = { + "service": service, + "organisation": org, + "resource": resource + } + headers = { + "Authorization": "Bearer not_checked_when_auth_is_disabled", + "X-API-Key": "123456789" + } + resp = await client.post(f"/iam/can_act_on_resource?action={action}", json=body, headers=headers) + + assert resp.status_code == expected_status + + +@pytest.mark.anyio +async def test_get_group_permissions_success(client: AsyncClient): + resp = await client.get("/iam/group/permissions?org_id=1&group_id=1") + data = resp.json() + + assert resp.status_code == 200 + assert "permissions" in data + assert type(data["permissions"]) == list + assert data["permissions"][0]["service_name"] == "Test Service" + assert data["permissions"][0]["resource"] == "test_resource" + assert data["permissions"][0]["action"] == "read" + + +@pytest.mark.parametrize( + "query, expected_status", + [ + ("org_id=2&group_id=1", 404), # Non-exists org, valid group + ("org_id=banana&group_id=1", 422), # Invalid org, valid group + ("org_id=&group_id=1", 422), # Blank org, valid group + ("org_id=-1&group_id=1", 422), # Negative org, valid group + ("group_id=1", 422), # Only group + ("", 422), # Blank query + ("org_id=&group_id=", 422), # Both blank + ("org_id=1&group_id=2", 404), # Valid org, non-exists group + ("org_id=1&group_id=banana", 422), # Valid org, invalid group + ("org_id=1&group_id=", 422), # Valid org, blank group + ("org_id=1&group_id=-1", 422), # Valid org, negative group + ("org_id=1", 422), # Only org + ], +) +@pytest.mark.anyio +async def test_get_group_permissions_failure(client: AsyncClient, query: str, expected_status: int): + resp = await client.get(f"/iam/group/permissions?{query}") + + assert resp.status_code == expected_status + + +@pytest.mark.anyio +async def test_get_group_users_success(client: AsyncClient): + resp = await client.get("/iam/group/users?org_id=1&group_id=1") + data = resp.json() + + assert resp.status_code == 200 + assert "users" in data + assert type(data["users"]) == list + assert data["users"][0]["id"] == 1 + assert data["users"][0]["first_name"] == "Admin" + assert data["users"][0]["last_name"] == "Test" + assert data["users"][0]["email"] == "admin@test.com" + + +@pytest.mark.parametrize( + "query, expected_status", + [ + ("org_id=2&group_id=1", 404), # Non-exists org, valid group + ("org_id=banana&group_id=1", 422), # Invalid org, valid group + ("org_id=&group_id=1", 422), # Blank org, valid group + ("org_id=-1&group_id=1", 422), # Negative org, valid group + ("group_id=1", 422), # Only group + ("", 422), # Blank query + ("org_id=&group_id=", 422), # Both blank + ("org_id=1&group_id=2", 404), # Valid org, non-exists group + ("org_id=1&group_id=banana", 422), # Valid org, invalid group + ("org_id=1&group_id=", 422), # Valid org, blank group + ("org_id=1&group_id=-1", 422), # Valid org, negative group + ("org_id=1", 422), # Only org + ], +) +@pytest.mark.anyio +async def test_get_group_users_failure(client: AsyncClient, query: str, expected_status: int): + resp = await client.get(f"/iam/group/users?{query}") + + assert resp.status_code == expected_status + + +@pytest.mark.anyio +async def test_post_group_success(client: AsyncClient): + resp = await client.post("/iam/group", json={"name": "New Group", "organisation_id": 1}) + data = resp.json() + + assert resp.status_code == 200 + assert "group" in data + assert type(data["group"]) == dict + assert data["group"]["name"] == "New Group" + assert data["group"]["id"] == 2 + + +@pytest.mark.parametrize( + "body, expected_status", + [ + ({"organisation_id": 2, "name": "new group"}, 404), # Non-existent organisation, valid name + ({"organisation_id": "banana", "name": "new group"}, 422), # Invalid organisation ID, valid name + ({"organisation_id": "", "name": "new group"}, 422), # Blank organisation ID, valid name + ({"organisation_id": -1, "name": "new group"}, 422), # Negative organisation ID, valid name + ({"name": 1}, 422), # Only name + ({}, 422), # Blank body + ({"organisation_id": "", "name": ""}, 422), # Both blank + ({"organisation_id": 1, "name": 42}, 422), # Valid organisation, invalid name + ({"organisation_id": 1, "name": ""}, 422), # Valid organisation, blank name + ({"organisation_id": 1, "name": "hi"}, 422), # Valid organisation, small name + ({"organisation_id": 1}, 422), # Only organisation ID + ], +) +@pytest.mark.anyio +async def test_post_group_failure(client: AsyncClient, body: dict[str, str], expected_status: int): + resp = await client.post("/iam/group", json=body) + + assert resp.status_code == expected_status + + +@pytest.mark.anyio +async def test_put_group_perm_success(client: AsyncClient, db_session): + db_session.add(Group(name="Test Group Two", org_id=1)) + db_session.flush() + resp = await client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 1}) + data = resp.json() + + assert resp.status_code == 200 + assert "group" in data + assert type(data["group"]) == dict + assert data["group"]["name"] == "Test Group Two" + assert data["group"]["id"] == 2 + + assert "permissions" in data + assert type(data["permissions"]) == list + assert data["permissions"][0]["service_name"] == "Test Service" + assert data["permissions"][0]["resource"] == "test_resource" + assert data["permissions"][0]["action"] == "read" + + +@pytest.mark.parametrize( + "body, expected_status", + [ + ({"organisation_id": 42, "group_id": 1, "permission_id": 1}, 404), # Non-existent organisation + ({"organisation_id": "banana", "group_id": 1, "permission_id": 1}, 422), # Invalid organisation ID + ({"organisation_id": "", "group_id": 1, "permission_id": 1}, 422), # Blank organisation ID + ({"organisation_id": -1, "group_id": 1, "permission_id": 1}, 422), # Negative organisation ID + + ({"organisation_id": 1, "group_id": 42, "permission_id": 1}, 404), # Non-existent group + ({"organisation_id": 1, "group_id": "banana", "permission_id": 1}, 422), # Invalid group ID + ({"organisation_id": 1, "group_id": "", "permission_id": 1}, 422), # Blank group ID + ({"organisation_id": 1, "group_id": -1, "permission_id": 1}, 422), # Negative group ID + + ({"organisation_id": 1, "group_id": 1, "permission_id": 42}, 404), # Non-existent permission + ({"organisation_id": 1, "group_id": 1, "permission_id": "banana"}, 422), # Invalid permission ID + ({"organisation_id": 1, "group_id": 1, "permission_id": ""}, 422), # Blank permission ID + ({"organisation_id": 1, "group_id": 1, "permission_id": -1}, 422), # Negative permission ID + + ({}, 422), # Blank body + ({"permission_id": 1}, 422), # Only permission + ({"organisation_id": 1}, 422), # Only organisation + ({"group_id": 1}, 422), # Only group + + ({"organisation_id": 1, "permission_id": 1}, 422), # Missing group + ({"group_id": 1, "permission_id": 1}, 422), # Missing organisation + ({"organisation_id": 1, "group_id": 1}, 422), # Missing permission + + ], +) +@pytest.mark.anyio +async def test_put_group_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int): + resp = await client.put("/iam/group/permission", json=body) + + assert resp.status_code == expected_status + + +@pytest.mark.anyio +async def test_put_group_user_success(client: AsyncClient, db_session): + db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234")) + db_session.flush() + + resp = await client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1}) + data = resp.json() + + assert resp.status_code == 200 + assert "group" in data + assert type(data["group"]) == dict + assert data["group"]["name"] == "Test Group" + assert data["group"]["id"] == 1 + + assert "users" in data + assert type(data["users"]) == list + assert data["users"][1]["id"] == 2 + assert data["users"][1]["first_name"] == "User" + assert data["users"][1]["last_name"] == "Test" + assert data["users"][1]["email"] == "user@test.org" + + +@pytest.mark.parametrize( + "body, expected_status", + [ + ({"organisation_id": 42, "group_id": 1, "user_id": 1}, 404), # Non-existent organisation + ({"organisation_id": "banana", "group_id": 1, "user_id": 1}, 422), # Invalid organisation ID + ({"organisation_id": "", "group_id": 1, "user_id": 1}, 422), # Blank organisation ID + ({"organisation_id": -1, "group_id": 1, "user_id": 1}, 422), # Negative organisation ID + + ({"organisation_id": 1, "group_id": 42, "user_id": 1}, 404), # Non-existent group + ({"organisation_id": 1, "group_id": "banana", "user_id": 1}, 422), # Invalid group ID + ({"organisation_id": 1, "group_id": "", "user_id": 1}, 422), # Blank group ID + ({"organisation_id": 1, "group_id": -1, "user_id": 1}, 422), # Negative group ID + + ({"organisation_id": 1, "group_id": 1, "user_id": 42}, 404), # Non-existent user + ({"organisation_id": 1, "group_id": 1, "user_id": "banana"}, 422), # Invalid user ID + ({"organisation_id": 1, "group_id": 1, "user_id": ""}, 422), # Blank user ID + ({"organisation_id": 1, "group_id": 1, "user_id": -1}, 422), # Negative user ID + + ({}, 422), # Blank body + ({"user_id": 1}, 422), # Only user + ({"organisation_id": 1}, 422), # Only organisation + ({"group_id": 1}, 422), # Only group + + ({"organisation_id": 1, "user_id": 1}, 422), # Missing group + ({"group_id": 1, "user_id": 1}, 422), # Missing organisation + ({"organisation_id": 1, "group_id": 1}, 422), # Missing user + + ], +) +@pytest.mark.anyio +async def test_put_group_user_failure(client: AsyncClient, body: dict[str, str], expected_status: int): + resp = await client.put("/iam/group/user", json=body) + + assert resp.status_code == expected_status + + +@pytest.mark.anyio +async def test_get_permissions_success(client: AsyncClient): + resp = await client.get("/iam/permissions?org_id=1") + data = resp.json() + + assert resp.status_code == 200 + assert "permissions" in data + assert type(data["permissions"]) == list + assert data["permissions"][0]["service_name"] == "Test Service" + assert data["permissions"][0]["resource"] == "test_resource" + assert data["permissions"][0]["action"] == "read" + + +@pytest.mark.parametrize( + "query, expected_status", + [ + ("org_id=42", 404), # Non-exists org + ("org_id=banana", 422), # Invalid org + ("org_id=", 422), # Blank org + ("org_id=-1", 422), # Negative org + ("", 422), # Blank query + ], +) +@pytest.mark.anyio +async def test_get_permissions_failure(client: AsyncClient, query: str, expected_status: int): + resp = await client.get(f"/iam/permissions?{query}") + + assert resp.status_code == expected_status + + +@pytest.mark.anyio +async def test_post_perm_success(client: AsyncClient, db_session): + resp = await client.post("/iam/permission", json={"service_id": 1, "resource": "test_resource", "action": "create"}) + data = resp.json() + + assert resp.status_code == 200 + assert "permission" in data + assert data["permission"]["service_name"] == "Test Service" + assert data["permission"]["resource"] == "test_resource" + assert data["permission"]["action"] == "create" + + +@pytest.mark.parametrize( + "body, expected_status", + [ + # service_id tests + ({"service_id": 42, "resource": "test_resource", "action": "read"}, 404), # Non-existent service + ({"service_id": "banana", "resource": "test_resource", "action": "read"}, 422), # Invalid service ID + ({"service_id": "", "resource": "test_resource", "action": "read"}, 422), # Blank service ID + ({"service_id": -1, "resource": "test_resource", "action": "read"}, 422), # Negative service ID + + # resource tests + ({"service_id": 1, "resource": 42, "action": "read"}, 422), # Invalid resource type + + # action tests + ({"service_id": 1, "resource": "test_resource", "action": 42}, 422), # Invalid action type + + # missing/partial body tests + ({}, 422), # Blank body + ({"resource": "test_resource"}, 422), # Only resource + ({"action": "read"}, 422), # Only action + ({"service_id": 1}, 422), # Only service + + ({"service_id": 1, "action": "read"}, 422), # Missing resource + ({"service_id": 1, "resource": "test_resource"}, 422), # Missing action + ({"resource": "test_resource", "action": "read"}, 422), # Missing service + + ], +) +@pytest.mark.anyio +async def test_post_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int): + resp = await client.post("/iam/permission", json=body) + + assert resp.status_code == expected_status + + +@pytest.mark.parametrize( + "body", + [ + {"organisation_id": 1, "service_id": 1, "resource": "test_resource", "action": "read"}, + + {"organisation_id": 1, "service_id": 1}, + {"organisation_id": 1, "resource": "test_resource"}, + {"organisation_id": 1, "action": "read"}, + {"organisation_id": 1, "service_id": 1, "action": "read"}, + {"organisation_id": 1, "service_id": 1, "resource": "test_resource"}, + {"organisation_id": 1, "resource": "test_resource", "action": "read"}, + ], +) +@pytest.mark.anyio +async def test_post_perm_search_success(client: AsyncClient, db_session, body): + resp = await client.post("/iam/permissions/search", json=body) + data = resp.json() + + assert resp.status_code == 200 + assert "permissions" in data + assert type(data["permissions"]) == list + assert data["permissions"][0]["service_name"] == "Test Service" + assert data["permissions"][0]["resource"] == "test_resource" + assert data["permissions"][0]["action"] == "read" + + +@pytest.mark.parametrize( + "body, expected_status", + [ + # organisation_id tests + ({"organisation_id": 42, "service_id": 1, "resource": "test_resource", "action": "read"}, 404), # Non-existent organisation + ({"organisation_id": "banana", "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Invalid organisation ID + ({"organisation_id": "", "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Blank organisation ID + ({"organisation_id": -1, "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Negative organisation ID + + # service_id tests + ({"organisation_id": 1, "service_id": "banana", "resource": "test_resource", "action": "read"}, 422), # Invalid service ID + ({"organisation_id": 1, "service_id": "", "resource": "test_resource", "action": "read"}, 422), # Blank service ID + ({"organisation_id": 1, "service_id": -1, "resource": "test_resource", "action": "read"}, 422), # Negative service ID + + # resource tests + ({"organisation_id": 1, "service_id": 1, "resource": 42, "action": "read"}, 422), # Invalid resource type + + # action tests + ({"organisation_id": 1, "service_id": 1, "resource": "test_resource", "action": 42}, 422), # Invalid action type + + # missing/partial body tests + ({}, 422), # Blank body + ], +) +@pytest.mark.anyio +async def test_post_perm_search_failure(client: AsyncClient, body: dict[str, str], expected_status: int): + resp = await client.post("/iam/permissions/search", json=body) + + assert resp.status_code == expected_status