Compare commits

...

7 commits

Author SHA1 Message Date
806bbfcbfc tests: iam router 2026-06-02 16:12:17 +01:00
65a9514be6 fix: drop superfluous column in usergroups
Group org is assigned in the Group table. Also assigning it in the UserGroups table complicated relationship creation and it was never used.
2026-06-02 16:11:52 +01:00
6f174deefc tests: db seeding change 2026-06-02 16:10:16 +01:00
b8b5b6dbd3 fix: permission search typing 2026-06-02 15:55:50 +01:00
ae0181c3ff fix: create permission endpoint
Verifies service exists before attaching permission.

Response built manually because calculated properties are not handled by .__dict()__

Request model uses Service ID mixin.

Service ID mixin verifies ID > 0
2026-06-02 15:36:05 +01:00
5d1606aa9d fix: permission search changed to post
Get requests cannot have bodies.
2026-06-02 15:13:00 +01:00
511480dffe fix: wrong org dependency in get perms 2026-06-02 14:44:30 +01:00
7 changed files with 476 additions and 25 deletions

View file

@ -0,0 +1,34 @@
"""Drop user_groups org column
Revision ID: d9dc6986fe38
Revises: 8132c4b88665
Create Date: 2026-05-29 16:10:00.320982
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'd9dc6986fe38'
down_revision: Union[str, Sequence[str], None] = '8132c4b88665'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(op.f('user_groups_org_id_fkey'), 'user_groups', type_='foreignkey')
op.drop_column('user_groups', 'org_id')
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user_groups', sa.Column('org_id', sa.INTEGER(), autoincrement=False, nullable=False))
op.create_foreign_key(op.f('user_groups_org_id_fkey'), 'user_groups', 'organisation', ['org_id'], ['id'], ondelete='CASCADE')
# ### end Alembic commands ###

View file

@ -78,6 +78,5 @@ class GroupPermissions(Base):
class UserGroups(Base):
__tablename__ = "user_groups"
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True)
user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True)
group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True)

View file

@ -19,6 +19,7 @@ from fastapi import APIRouter, status
from sqlalchemy.exc import IntegrityError
from psycopg import errors
from service.exceptions import ServiceNotFoundException
from src.exceptions import ConflictException
from src.database import db_dependency
from src.schemas import ResourceName
@ -122,7 +123,7 @@ async def add_group_permission(db: db_dependency, group_model: group_model_body_
return response
@router.put("/group/user")
@router.put("/group/user", response_model=IAMPutGroupUserResponse)
async def add_group_user(db: db_dependency, group_model: group_model_body_dependency, user_model: user_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupUserRequest):
if group_model.org_id != org_model.id:
raise UnauthorizedException()
@ -164,24 +165,27 @@ async def remove_group_user(db: db_dependency, group_model: group_model_body_dep
@router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency):
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_query_dependency):
permission_models = db.query(Perm).all()
return {"permissions": permission_models}
@router.post("/permission")
async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_mode: IAMPostPermissionRequest):
perm_model = Perm(**request_mode.__dict__)
@router.post("/permission", response_model=IAMPostPermissionResponse)
async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_model: IAMPostPermissionRequest):
service_model = db.get(Service, request_model.service_id)
if service_model is None:
raise ServiceNotFoundException(service_id=request_model.service_id)
perm_model = Perm(**request_model.__dict__)
try:
db.add(perm_model)
except IntegrityError as e:
if isinstance(e.orig, errors.UniqueViolation):
raise ConflictException(message="Permission already exists")
db.flush()
response = IAMPostPermissionResponse(permission=PermissionSchema(**perm_model.__dict__))
response = {"service_name": perm_model.service_name, "resource": perm_model.resource, "action": perm_model.action}
db.commit()
return response
return {"permission": response}
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
@ -190,18 +194,18 @@ async def delete_permission(db: db_dependency, su: super_admin_dependency, perm_
db.commit()
@router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, search: IAMGetPermissionsSearchRequest):
@router.post("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMGetPermissionsSearchRequest):
permission_query = db.query(Perm)
if search.service_id is not None:
permission_query = permission_query.filter(Perm.service_id == search.service_id)
if request_model.service_id is not None:
permission_query = permission_query.filter(Perm.service_id == request_model.service_id)
if search.resource is not None:
permission_query = permission_query.filter(Perm.resource == search.resource)
if request_model.resource is not None:
permission_query = permission_query.filter(Perm.resource == request_model.resource)
if search.action is not None:
permission_query = permission_query.filter(Perm.action == search. action)
if request_model.action is not None:
permission_query = permission_query.filter(Perm.action == request_model. action)
permission_models = permission_query.all()

View file

@ -6,10 +6,11 @@ Models follow the nomenclature of:
- Mixins: "<Attribute>Mixin"
- Models: "<Module><Method><Resource><Opt:Resource><Direction>" ie "IAMGetGroupPermissionsResponse"
"""
from typing import Optional
from typing import Optional, Annotated
from pydantic import EmailStr, ConfigDict, Field
from src.service.schemas import ServiceIDMixin
from src.organisation.schemas import OrgIDMixin
from src.schemas import CustomBaseModel
from user.schemas import UserIDMixin
@ -17,7 +18,7 @@ from user.schemas import UserIDMixin
class UserSchema(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
id: int
first_name: str
last_name: str
@ -83,8 +84,7 @@ class IAMDeleteGroupUserResponse(CustomBaseModel):
class IAMGetPermissionsResponse(CustomBaseModel):
permissions: list[PermissionSchema]
class IAMPostPermissionRequest(CustomBaseModel):
service_id: int
class IAMPostPermissionRequest(ServiceIDMixin):
resource: str
action: str
@ -94,8 +94,8 @@ class IAMPostPermissionResponse(CustomBaseModel):
class IAMDeletePermissionRequest(PermIDMixin):
pass
class IAMGetPermissionsSearchRequest(CustomBaseModel):
service_id: Optional[int] = None
class IAMGetPermissionsSearchRequest(OrgIDMixin):
service_id: Annotated[int | None, Field(gt=0)] = None
resource: Optional[str] = None
action: Optional[str] = None

View file

@ -6,12 +6,12 @@ Models follow the nomenclature of:
- Mixins: "<Attribute>Mixin"
- Models: "<Module><Method><Resource><Opt:Resource><Direction>" ie "ServiceGetServiceResponse"
"""
from pydantic import ConfigDict
from pydantic import ConfigDict, Field
from src.schemas import CustomBaseModel
class ServiceIDMixin(CustomBaseModel):
service_id: int
service_id: int = Field(gt=0)
class ServiceSchema(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")

View file

@ -54,7 +54,7 @@ def _seed(db):
status="approved", intake_questionnaire={"question_two": "answer two"}))
db.add(Service(name="Test Service", api_key="123456789"))
db.add(Permission(service_id=1, resource="test_resource", action="read"))
db.add(Group(name="Test Group"))
db.add(Group(name="Test Group", org_id=1))
db.flush()
group_model = db.get(Group, 1)
perm_model = db.get(Permission, 1)

414
test/test_iam.py Normal file
View file

@ -0,0 +1,414 @@
"""
Act on resource tests only check for pass/fail on input validation. Logic is not tested.
"""
import pytest
from httpx import AsyncClient
from src.user.models import User
from .conftest import client, db_session
from src.iam.models import Group
@pytest.mark.anyio
async def test_post_act_on_resource_endpoint_success(client: AsyncClient):
body = {
"service": "Test Service",
"organisation": "Test Org",
"resource": "test_resource"
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": "123456789"
}
resp = await client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers)
data = resp.json()
assert resp.status_code == 200
assert data == True
@pytest.mark.anyio
@pytest.mark.parametrize(
"service, org, resource, action, expected_status",
[
(None, "Test Org", "test_resource", "read", 422),
(42, "Test Org", "test_resource", "read", 422),
("Test Service", None, "test_resource", "read", 422),
("Test Service", 42, "test_resource", "read", 422),
("Test Service", "Test Org", None, "read", 422),
("Test Service", "Test Org", 42, "read", 422),
],
)
@pytest.mark.anyio
async def test_post_act_on_resource_endpoint_failure(client: AsyncClient, service, org, resource, action,
expected_status: int):
body = {
"service": service,
"organisation": org,
"resource": resource
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": "123456789"
}
resp = await client.post(f"/iam/can_act_on_resource?action={action}", json=body, headers=headers)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_get_group_permissions_success(client: AsyncClient):
resp = await client.get("/iam/group/permissions?org_id=1&group_id=1")
data = resp.json()
assert resp.status_code == 200
assert "permissions" in data
assert type(data["permissions"]) == list
assert data["permissions"][0]["service_name"] == "Test Service"
assert data["permissions"][0]["resource"] == "test_resource"
assert data["permissions"][0]["action"] == "read"
@pytest.mark.parametrize(
"query, expected_status",
[
("org_id=2&group_id=1", 404), # Non-exists org, valid group
("org_id=banana&group_id=1", 422), # Invalid org, valid group
("org_id=&group_id=1", 422), # Blank org, valid group
("org_id=-1&group_id=1", 422), # Negative org, valid group
("group_id=1", 422), # Only group
("", 422), # Blank query
("org_id=&group_id=", 422), # Both blank
("org_id=1&group_id=2", 404), # Valid org, non-exists group
("org_id=1&group_id=banana", 422), # Valid org, invalid group
("org_id=1&group_id=", 422), # Valid org, blank group
("org_id=1&group_id=-1", 422), # Valid org, negative group
("org_id=1", 422), # Only org
],
)
@pytest.mark.anyio
async def test_get_group_permissions_failure(client: AsyncClient, query: str, expected_status: int):
resp = await client.get(f"/iam/group/permissions?{query}")
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_get_group_users_success(client: AsyncClient):
resp = await client.get("/iam/group/users?org_id=1&group_id=1")
data = resp.json()
assert resp.status_code == 200
assert "users" in data
assert type(data["users"]) == list
assert data["users"][0]["id"] == 1
assert data["users"][0]["first_name"] == "Admin"
assert data["users"][0]["last_name"] == "Test"
assert data["users"][0]["email"] == "admin@test.com"
@pytest.mark.parametrize(
"query, expected_status",
[
("org_id=2&group_id=1", 404), # Non-exists org, valid group
("org_id=banana&group_id=1", 422), # Invalid org, valid group
("org_id=&group_id=1", 422), # Blank org, valid group
("org_id=-1&group_id=1", 422), # Negative org, valid group
("group_id=1", 422), # Only group
("", 422), # Blank query
("org_id=&group_id=", 422), # Both blank
("org_id=1&group_id=2", 404), # Valid org, non-exists group
("org_id=1&group_id=banana", 422), # Valid org, invalid group
("org_id=1&group_id=", 422), # Valid org, blank group
("org_id=1&group_id=-1", 422), # Valid org, negative group
("org_id=1", 422), # Only org
],
)
@pytest.mark.anyio
async def test_get_group_users_failure(client: AsyncClient, query: str, expected_status: int):
resp = await client.get(f"/iam/group/users?{query}")
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_post_group_success(client: AsyncClient):
resp = await client.post("/iam/group", json={"name": "New Group", "organisation_id": 1})
data = resp.json()
assert resp.status_code == 200
assert "group" in data
assert type(data["group"]) == dict
assert data["group"]["name"] == "New Group"
assert data["group"]["id"] == 2
@pytest.mark.parametrize(
"body, expected_status",
[
({"organisation_id": 2, "name": "new group"}, 404), # Non-existent organisation, valid name
({"organisation_id": "banana", "name": "new group"}, 422), # Invalid organisation ID, valid name
({"organisation_id": "", "name": "new group"}, 422), # Blank organisation ID, valid name
({"organisation_id": -1, "name": "new group"}, 422), # Negative organisation ID, valid name
({"name": 1}, 422), # Only name
({}, 422), # Blank body
({"organisation_id": "", "name": ""}, 422), # Both blank
({"organisation_id": 1, "name": 42}, 422), # Valid organisation, invalid name
({"organisation_id": 1, "name": ""}, 422), # Valid organisation, blank name
({"organisation_id": 1, "name": "hi"}, 422), # Valid organisation, small name
({"organisation_id": 1}, 422), # Only organisation ID
],
)
@pytest.mark.anyio
async def test_post_group_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.post("/iam/group", json=body)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_put_group_perm_success(client: AsyncClient, db_session):
db_session.add(Group(name="Test Group Two", org_id=1))
db_session.flush()
resp = await client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 1})
data = resp.json()
assert resp.status_code == 200
assert "group" in data
assert type(data["group"]) == dict
assert data["group"]["name"] == "Test Group Two"
assert data["group"]["id"] == 2
assert "permissions" in data
assert type(data["permissions"]) == list
assert data["permissions"][0]["service_name"] == "Test Service"
assert data["permissions"][0]["resource"] == "test_resource"
assert data["permissions"][0]["action"] == "read"
@pytest.mark.parametrize(
"body, expected_status",
[
({"organisation_id": 42, "group_id": 1, "permission_id": 1}, 404), # Non-existent organisation
({"organisation_id": "banana", "group_id": 1, "permission_id": 1}, 422), # Invalid organisation ID
({"organisation_id": "", "group_id": 1, "permission_id": 1}, 422), # Blank organisation ID
({"organisation_id": -1, "group_id": 1, "permission_id": 1}, 422), # Negative organisation ID
({"organisation_id": 1, "group_id": 42, "permission_id": 1}, 404), # Non-existent group
({"organisation_id": 1, "group_id": "banana", "permission_id": 1}, 422), # Invalid group ID
({"organisation_id": 1, "group_id": "", "permission_id": 1}, 422), # Blank group ID
({"organisation_id": 1, "group_id": -1, "permission_id": 1}, 422), # Negative group ID
({"organisation_id": 1, "group_id": 1, "permission_id": 42}, 404), # Non-existent permission
({"organisation_id": 1, "group_id": 1, "permission_id": "banana"}, 422), # Invalid permission ID
({"organisation_id": 1, "group_id": 1, "permission_id": ""}, 422), # Blank permission ID
({"organisation_id": 1, "group_id": 1, "permission_id": -1}, 422), # Negative permission ID
({}, 422), # Blank body
({"permission_id": 1}, 422), # Only permission
({"organisation_id": 1}, 422), # Only organisation
({"group_id": 1}, 422), # Only group
({"organisation_id": 1, "permission_id": 1}, 422), # Missing group
({"group_id": 1, "permission_id": 1}, 422), # Missing organisation
({"organisation_id": 1, "group_id": 1}, 422), # Missing permission
],
)
@pytest.mark.anyio
async def test_put_group_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.put("/iam/group/permission", json=body)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_put_group_user_success(client: AsyncClient, db_session):
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
db_session.flush()
resp = await client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1})
data = resp.json()
assert resp.status_code == 200
assert "group" in data
assert type(data["group"]) == dict
assert data["group"]["name"] == "Test Group"
assert data["group"]["id"] == 1
assert "users" in data
assert type(data["users"]) == list
assert data["users"][1]["id"] == 2
assert data["users"][1]["first_name"] == "User"
assert data["users"][1]["last_name"] == "Test"
assert data["users"][1]["email"] == "user@test.org"
@pytest.mark.parametrize(
"body, expected_status",
[
({"organisation_id": 42, "group_id": 1, "user_id": 1}, 404), # Non-existent organisation
({"organisation_id": "banana", "group_id": 1, "user_id": 1}, 422), # Invalid organisation ID
({"organisation_id": "", "group_id": 1, "user_id": 1}, 422), # Blank organisation ID
({"organisation_id": -1, "group_id": 1, "user_id": 1}, 422), # Negative organisation ID
({"organisation_id": 1, "group_id": 42, "user_id": 1}, 404), # Non-existent group
({"organisation_id": 1, "group_id": "banana", "user_id": 1}, 422), # Invalid group ID
({"organisation_id": 1, "group_id": "", "user_id": 1}, 422), # Blank group ID
({"organisation_id": 1, "group_id": -1, "user_id": 1}, 422), # Negative group ID
({"organisation_id": 1, "group_id": 1, "user_id": 42}, 404), # Non-existent user
({"organisation_id": 1, "group_id": 1, "user_id": "banana"}, 422), # Invalid user ID
({"organisation_id": 1, "group_id": 1, "user_id": ""}, 422), # Blank user ID
({"organisation_id": 1, "group_id": 1, "user_id": -1}, 422), # Negative user ID
({}, 422), # Blank body
({"user_id": 1}, 422), # Only user
({"organisation_id": 1}, 422), # Only organisation
({"group_id": 1}, 422), # Only group
({"organisation_id": 1, "user_id": 1}, 422), # Missing group
({"group_id": 1, "user_id": 1}, 422), # Missing organisation
({"organisation_id": 1, "group_id": 1}, 422), # Missing user
],
)
@pytest.mark.anyio
async def test_put_group_user_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.put("/iam/group/user", json=body)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_get_permissions_success(client: AsyncClient):
resp = await client.get("/iam/permissions?org_id=1")
data = resp.json()
assert resp.status_code == 200
assert "permissions" in data
assert type(data["permissions"]) == list
assert data["permissions"][0]["service_name"] == "Test Service"
assert data["permissions"][0]["resource"] == "test_resource"
assert data["permissions"][0]["action"] == "read"
@pytest.mark.parametrize(
"query, expected_status",
[
("org_id=42", 404), # Non-exists org
("org_id=banana", 422), # Invalid org
("org_id=", 422), # Blank org
("org_id=-1", 422), # Negative org
("", 422), # Blank query
],
)
@pytest.mark.anyio
async def test_get_permissions_failure(client: AsyncClient, query: str, expected_status: int):
resp = await client.get(f"/iam/permissions?{query}")
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_post_perm_success(client: AsyncClient, db_session):
resp = await client.post("/iam/permission", json={"service_id": 1, "resource": "test_resource", "action": "create"})
data = resp.json()
assert resp.status_code == 200
assert "permission" in data
assert data["permission"]["service_name"] == "Test Service"
assert data["permission"]["resource"] == "test_resource"
assert data["permission"]["action"] == "create"
@pytest.mark.parametrize(
"body, expected_status",
[
# service_id tests
({"service_id": 42, "resource": "test_resource", "action": "read"}, 404), # Non-existent service
({"service_id": "banana", "resource": "test_resource", "action": "read"}, 422), # Invalid service ID
({"service_id": "", "resource": "test_resource", "action": "read"}, 422), # Blank service ID
({"service_id": -1, "resource": "test_resource", "action": "read"}, 422), # Negative service ID
# resource tests
({"service_id": 1, "resource": 42, "action": "read"}, 422), # Invalid resource type
# action tests
({"service_id": 1, "resource": "test_resource", "action": 42}, 422), # Invalid action type
# missing/partial body tests
({}, 422), # Blank body
({"resource": "test_resource"}, 422), # Only resource
({"action": "read"}, 422), # Only action
({"service_id": 1}, 422), # Only service
({"service_id": 1, "action": "read"}, 422), # Missing resource
({"service_id": 1, "resource": "test_resource"}, 422), # Missing action
({"resource": "test_resource", "action": "read"}, 422), # Missing service
],
)
@pytest.mark.anyio
async def test_post_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.post("/iam/permission", json=body)
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"body",
[
{"organisation_id": 1, "service_id": 1, "resource": "test_resource", "action": "read"},
{"organisation_id": 1, "service_id": 1},
{"organisation_id": 1, "resource": "test_resource"},
{"organisation_id": 1, "action": "read"},
{"organisation_id": 1, "service_id": 1, "action": "read"},
{"organisation_id": 1, "service_id": 1, "resource": "test_resource"},
{"organisation_id": 1, "resource": "test_resource", "action": "read"},
],
)
@pytest.mark.anyio
async def test_post_perm_search_success(client: AsyncClient, db_session, body):
resp = await client.post("/iam/permissions/search", json=body)
data = resp.json()
assert resp.status_code == 200
assert "permissions" in data
assert type(data["permissions"]) == list
assert data["permissions"][0]["service_name"] == "Test Service"
assert data["permissions"][0]["resource"] == "test_resource"
assert data["permissions"][0]["action"] == "read"
@pytest.mark.parametrize(
"body, expected_status",
[
# organisation_id tests
({"organisation_id": 42, "service_id": 1, "resource": "test_resource", "action": "read"}, 404), # Non-existent organisation
({"organisation_id": "banana", "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Invalid organisation ID
({"organisation_id": "", "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Blank organisation ID
({"organisation_id": -1, "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Negative organisation ID
# service_id tests
({"organisation_id": 1, "service_id": "banana", "resource": "test_resource", "action": "read"}, 422), # Invalid service ID
({"organisation_id": 1, "service_id": "", "resource": "test_resource", "action": "read"}, 422), # Blank service ID
({"organisation_id": 1, "service_id": -1, "resource": "test_resource", "action": "read"}, 422), # Negative service ID
# resource tests
({"organisation_id": 1, "service_id": 1, "resource": 42, "action": "read"}, 422), # Invalid resource type
# action tests
({"organisation_id": 1, "service_id": 1, "resource": "test_resource", "action": 42}, 422), # Invalid action type
# missing/partial body tests
({}, 422), # Blank body
],
)
@pytest.mark.anyio
async def test_post_perm_search_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.post("/iam/permissions/search", json=body)
assert resp.status_code == expected_status