Compare commits

..

No commits in common. "806bbfcbfc7d5df581b22db6d9f3ff97cbe6ad4e" and "9403e9291fba2a75212d5155f11235e6e2f2c219" have entirely different histories.

7 changed files with 25 additions and 476 deletions

View file

@ -1,34 +0,0 @@
"""Drop user_groups org column
Revision ID: d9dc6986fe38
Revises: 8132c4b88665
Create Date: 2026-05-29 16:10:00.320982
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'd9dc6986fe38'
down_revision: Union[str, Sequence[str], None] = '8132c4b88665'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(op.f('user_groups_org_id_fkey'), 'user_groups', type_='foreignkey')
op.drop_column('user_groups', 'org_id')
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user_groups', sa.Column('org_id', sa.INTEGER(), autoincrement=False, nullable=False))
op.create_foreign_key(op.f('user_groups_org_id_fkey'), 'user_groups', 'organisation', ['org_id'], ['id'], ondelete='CASCADE')
# ### end Alembic commands ###

View file

@ -78,5 +78,6 @@ class GroupPermissions(Base):
class UserGroups(Base):
__tablename__ = "user_groups"
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True)
user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True)
group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True)

View file

@ -19,7 +19,6 @@ from fastapi import APIRouter, status
from sqlalchemy.exc import IntegrityError
from psycopg import errors
from service.exceptions import ServiceNotFoundException
from src.exceptions import ConflictException
from src.database import db_dependency
from src.schemas import ResourceName
@ -123,7 +122,7 @@ async def add_group_permission(db: db_dependency, group_model: group_model_body_
return response
@router.put("/group/user", response_model=IAMPutGroupUserResponse)
@router.put("/group/user")
async def add_group_user(db: db_dependency, group_model: group_model_body_dependency, user_model: user_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupUserRequest):
if group_model.org_id != org_model.id:
raise UnauthorizedException()
@ -165,27 +164,24 @@ async def remove_group_user(db: db_dependency, group_model: group_model_body_dep
@router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_query_dependency):
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency):
permission_models = db.query(Perm).all()
return {"permissions": permission_models}
@router.post("/permission", response_model=IAMPostPermissionResponse)
async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_model: IAMPostPermissionRequest):
service_model = db.get(Service, request_model.service_id)
if service_model is None:
raise ServiceNotFoundException(service_id=request_model.service_id)
perm_model = Perm(**request_model.__dict__)
@router.post("/permission")
async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_mode: IAMPostPermissionRequest):
perm_model = Perm(**request_mode.__dict__)
try:
db.add(perm_model)
except IntegrityError as e:
if isinstance(e.orig, errors.UniqueViolation):
raise ConflictException(message="Permission already exists")
db.flush()
response = {"service_name": perm_model.service_name, "resource": perm_model.resource, "action": perm_model.action}
response = IAMPostPermissionResponse(permission=PermissionSchema(**perm_model.__dict__))
db.commit()
return {"permission": response}
return response
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
@ -194,18 +190,18 @@ async def delete_permission(db: db_dependency, su: super_admin_dependency, perm_
db.commit()
@router.post("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMGetPermissionsSearchRequest):
@router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, search: IAMGetPermissionsSearchRequest):
permission_query = db.query(Perm)
if request_model.service_id is not None:
permission_query = permission_query.filter(Perm.service_id == request_model.service_id)
if search.service_id is not None:
permission_query = permission_query.filter(Perm.service_id == search.service_id)
if request_model.resource is not None:
permission_query = permission_query.filter(Perm.resource == request_model.resource)
if search.resource is not None:
permission_query = permission_query.filter(Perm.resource == search.resource)
if request_model.action is not None:
permission_query = permission_query.filter(Perm.action == request_model. action)
if search.action is not None:
permission_query = permission_query.filter(Perm.action == search. action)
permission_models = permission_query.all()

View file

@ -6,11 +6,10 @@ Models follow the nomenclature of:
- Mixins: "<Attribute>Mixin"
- Models: "<Module><Method><Resource><Opt:Resource><Direction>" ie "IAMGetGroupPermissionsResponse"
"""
from typing import Optional, Annotated
from typing import Optional
from pydantic import EmailStr, ConfigDict, Field
from src.service.schemas import ServiceIDMixin
from src.organisation.schemas import OrgIDMixin
from src.schemas import CustomBaseModel
from user.schemas import UserIDMixin
@ -84,7 +83,8 @@ class IAMDeleteGroupUserResponse(CustomBaseModel):
class IAMGetPermissionsResponse(CustomBaseModel):
permissions: list[PermissionSchema]
class IAMPostPermissionRequest(ServiceIDMixin):
class IAMPostPermissionRequest(CustomBaseModel):
service_id: int
resource: str
action: str
@ -94,8 +94,8 @@ class IAMPostPermissionResponse(CustomBaseModel):
class IAMDeletePermissionRequest(PermIDMixin):
pass
class IAMGetPermissionsSearchRequest(OrgIDMixin):
service_id: Annotated[int | None, Field(gt=0)] = None
class IAMGetPermissionsSearchRequest(CustomBaseModel):
service_id: Optional[int] = None
resource: Optional[str] = None
action: Optional[str] = None

View file

@ -6,12 +6,12 @@ Models follow the nomenclature of:
- Mixins: "<Attribute>Mixin"
- Models: "<Module><Method><Resource><Opt:Resource><Direction>" ie "ServiceGetServiceResponse"
"""
from pydantic import ConfigDict, Field
from pydantic import ConfigDict
from src.schemas import CustomBaseModel
class ServiceIDMixin(CustomBaseModel):
service_id: int = Field(gt=0)
service_id: int
class ServiceSchema(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")

View file

@ -54,7 +54,7 @@ def _seed(db):
status="approved", intake_questionnaire={"question_two": "answer two"}))
db.add(Service(name="Test Service", api_key="123456789"))
db.add(Permission(service_id=1, resource="test_resource", action="read"))
db.add(Group(name="Test Group", org_id=1))
db.add(Group(name="Test Group"))
db.flush()
group_model = db.get(Group, 1)
perm_model = db.get(Permission, 1)

View file

@ -1,414 +0,0 @@
"""
Act on resource tests only check for pass/fail on input validation. Logic is not tested.
"""
import pytest
from httpx import AsyncClient
from src.user.models import User
from .conftest import client, db_session
from src.iam.models import Group
@pytest.mark.anyio
async def test_post_act_on_resource_endpoint_success(client: AsyncClient):
body = {
"service": "Test Service",
"organisation": "Test Org",
"resource": "test_resource"
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": "123456789"
}
resp = await client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers)
data = resp.json()
assert resp.status_code == 200
assert data == True
@pytest.mark.anyio
@pytest.mark.parametrize(
"service, org, resource, action, expected_status",
[
(None, "Test Org", "test_resource", "read", 422),
(42, "Test Org", "test_resource", "read", 422),
("Test Service", None, "test_resource", "read", 422),
("Test Service", 42, "test_resource", "read", 422),
("Test Service", "Test Org", None, "read", 422),
("Test Service", "Test Org", 42, "read", 422),
],
)
@pytest.mark.anyio
async def test_post_act_on_resource_endpoint_failure(client: AsyncClient, service, org, resource, action,
expected_status: int):
body = {
"service": service,
"organisation": org,
"resource": resource
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": "123456789"
}
resp = await client.post(f"/iam/can_act_on_resource?action={action}", json=body, headers=headers)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_get_group_permissions_success(client: AsyncClient):
resp = await client.get("/iam/group/permissions?org_id=1&group_id=1")
data = resp.json()
assert resp.status_code == 200
assert "permissions" in data
assert type(data["permissions"]) == list
assert data["permissions"][0]["service_name"] == "Test Service"
assert data["permissions"][0]["resource"] == "test_resource"
assert data["permissions"][0]["action"] == "read"
@pytest.mark.parametrize(
"query, expected_status",
[
("org_id=2&group_id=1", 404), # Non-exists org, valid group
("org_id=banana&group_id=1", 422), # Invalid org, valid group
("org_id=&group_id=1", 422), # Blank org, valid group
("org_id=-1&group_id=1", 422), # Negative org, valid group
("group_id=1", 422), # Only group
("", 422), # Blank query
("org_id=&group_id=", 422), # Both blank
("org_id=1&group_id=2", 404), # Valid org, non-exists group
("org_id=1&group_id=banana", 422), # Valid org, invalid group
("org_id=1&group_id=", 422), # Valid org, blank group
("org_id=1&group_id=-1", 422), # Valid org, negative group
("org_id=1", 422), # Only org
],
)
@pytest.mark.anyio
async def test_get_group_permissions_failure(client: AsyncClient, query: str, expected_status: int):
resp = await client.get(f"/iam/group/permissions?{query}")
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_get_group_users_success(client: AsyncClient):
resp = await client.get("/iam/group/users?org_id=1&group_id=1")
data = resp.json()
assert resp.status_code == 200
assert "users" in data
assert type(data["users"]) == list
assert data["users"][0]["id"] == 1
assert data["users"][0]["first_name"] == "Admin"
assert data["users"][0]["last_name"] == "Test"
assert data["users"][0]["email"] == "admin@test.com"
@pytest.mark.parametrize(
"query, expected_status",
[
("org_id=2&group_id=1", 404), # Non-exists org, valid group
("org_id=banana&group_id=1", 422), # Invalid org, valid group
("org_id=&group_id=1", 422), # Blank org, valid group
("org_id=-1&group_id=1", 422), # Negative org, valid group
("group_id=1", 422), # Only group
("", 422), # Blank query
("org_id=&group_id=", 422), # Both blank
("org_id=1&group_id=2", 404), # Valid org, non-exists group
("org_id=1&group_id=banana", 422), # Valid org, invalid group
("org_id=1&group_id=", 422), # Valid org, blank group
("org_id=1&group_id=-1", 422), # Valid org, negative group
("org_id=1", 422), # Only org
],
)
@pytest.mark.anyio
async def test_get_group_users_failure(client: AsyncClient, query: str, expected_status: int):
resp = await client.get(f"/iam/group/users?{query}")
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_post_group_success(client: AsyncClient):
resp = await client.post("/iam/group", json={"name": "New Group", "organisation_id": 1})
data = resp.json()
assert resp.status_code == 200
assert "group" in data
assert type(data["group"]) == dict
assert data["group"]["name"] == "New Group"
assert data["group"]["id"] == 2
@pytest.mark.parametrize(
"body, expected_status",
[
({"organisation_id": 2, "name": "new group"}, 404), # Non-existent organisation, valid name
({"organisation_id": "banana", "name": "new group"}, 422), # Invalid organisation ID, valid name
({"organisation_id": "", "name": "new group"}, 422), # Blank organisation ID, valid name
({"organisation_id": -1, "name": "new group"}, 422), # Negative organisation ID, valid name
({"name": 1}, 422), # Only name
({}, 422), # Blank body
({"organisation_id": "", "name": ""}, 422), # Both blank
({"organisation_id": 1, "name": 42}, 422), # Valid organisation, invalid name
({"organisation_id": 1, "name": ""}, 422), # Valid organisation, blank name
({"organisation_id": 1, "name": "hi"}, 422), # Valid organisation, small name
({"organisation_id": 1}, 422), # Only organisation ID
],
)
@pytest.mark.anyio
async def test_post_group_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.post("/iam/group", json=body)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_put_group_perm_success(client: AsyncClient, db_session):
db_session.add(Group(name="Test Group Two", org_id=1))
db_session.flush()
resp = await client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 1})
data = resp.json()
assert resp.status_code == 200
assert "group" in data
assert type(data["group"]) == dict
assert data["group"]["name"] == "Test Group Two"
assert data["group"]["id"] == 2
assert "permissions" in data
assert type(data["permissions"]) == list
assert data["permissions"][0]["service_name"] == "Test Service"
assert data["permissions"][0]["resource"] == "test_resource"
assert data["permissions"][0]["action"] == "read"
@pytest.mark.parametrize(
"body, expected_status",
[
({"organisation_id": 42, "group_id": 1, "permission_id": 1}, 404), # Non-existent organisation
({"organisation_id": "banana", "group_id": 1, "permission_id": 1}, 422), # Invalid organisation ID
({"organisation_id": "", "group_id": 1, "permission_id": 1}, 422), # Blank organisation ID
({"organisation_id": -1, "group_id": 1, "permission_id": 1}, 422), # Negative organisation ID
({"organisation_id": 1, "group_id": 42, "permission_id": 1}, 404), # Non-existent group
({"organisation_id": 1, "group_id": "banana", "permission_id": 1}, 422), # Invalid group ID
({"organisation_id": 1, "group_id": "", "permission_id": 1}, 422), # Blank group ID
({"organisation_id": 1, "group_id": -1, "permission_id": 1}, 422), # Negative group ID
({"organisation_id": 1, "group_id": 1, "permission_id": 42}, 404), # Non-existent permission
({"organisation_id": 1, "group_id": 1, "permission_id": "banana"}, 422), # Invalid permission ID
({"organisation_id": 1, "group_id": 1, "permission_id": ""}, 422), # Blank permission ID
({"organisation_id": 1, "group_id": 1, "permission_id": -1}, 422), # Negative permission ID
({}, 422), # Blank body
({"permission_id": 1}, 422), # Only permission
({"organisation_id": 1}, 422), # Only organisation
({"group_id": 1}, 422), # Only group
({"organisation_id": 1, "permission_id": 1}, 422), # Missing group
({"group_id": 1, "permission_id": 1}, 422), # Missing organisation
({"organisation_id": 1, "group_id": 1}, 422), # Missing permission
],
)
@pytest.mark.anyio
async def test_put_group_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.put("/iam/group/permission", json=body)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_put_group_user_success(client: AsyncClient, db_session):
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
db_session.flush()
resp = await client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1})
data = resp.json()
assert resp.status_code == 200
assert "group" in data
assert type(data["group"]) == dict
assert data["group"]["name"] == "Test Group"
assert data["group"]["id"] == 1
assert "users" in data
assert type(data["users"]) == list
assert data["users"][1]["id"] == 2
assert data["users"][1]["first_name"] == "User"
assert data["users"][1]["last_name"] == "Test"
assert data["users"][1]["email"] == "user@test.org"
@pytest.mark.parametrize(
"body, expected_status",
[
({"organisation_id": 42, "group_id": 1, "user_id": 1}, 404), # Non-existent organisation
({"organisation_id": "banana", "group_id": 1, "user_id": 1}, 422), # Invalid organisation ID
({"organisation_id": "", "group_id": 1, "user_id": 1}, 422), # Blank organisation ID
({"organisation_id": -1, "group_id": 1, "user_id": 1}, 422), # Negative organisation ID
({"organisation_id": 1, "group_id": 42, "user_id": 1}, 404), # Non-existent group
({"organisation_id": 1, "group_id": "banana", "user_id": 1}, 422), # Invalid group ID
({"organisation_id": 1, "group_id": "", "user_id": 1}, 422), # Blank group ID
({"organisation_id": 1, "group_id": -1, "user_id": 1}, 422), # Negative group ID
({"organisation_id": 1, "group_id": 1, "user_id": 42}, 404), # Non-existent user
({"organisation_id": 1, "group_id": 1, "user_id": "banana"}, 422), # Invalid user ID
({"organisation_id": 1, "group_id": 1, "user_id": ""}, 422), # Blank user ID
({"organisation_id": 1, "group_id": 1, "user_id": -1}, 422), # Negative user ID
({}, 422), # Blank body
({"user_id": 1}, 422), # Only user
({"organisation_id": 1}, 422), # Only organisation
({"group_id": 1}, 422), # Only group
({"organisation_id": 1, "user_id": 1}, 422), # Missing group
({"group_id": 1, "user_id": 1}, 422), # Missing organisation
({"organisation_id": 1, "group_id": 1}, 422), # Missing user
],
)
@pytest.mark.anyio
async def test_put_group_user_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.put("/iam/group/user", json=body)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_get_permissions_success(client: AsyncClient):
resp = await client.get("/iam/permissions?org_id=1")
data = resp.json()
assert resp.status_code == 200
assert "permissions" in data
assert type(data["permissions"]) == list
assert data["permissions"][0]["service_name"] == "Test Service"
assert data["permissions"][0]["resource"] == "test_resource"
assert data["permissions"][0]["action"] == "read"
@pytest.mark.parametrize(
"query, expected_status",
[
("org_id=42", 404), # Non-exists org
("org_id=banana", 422), # Invalid org
("org_id=", 422), # Blank org
("org_id=-1", 422), # Negative org
("", 422), # Blank query
],
)
@pytest.mark.anyio
async def test_get_permissions_failure(client: AsyncClient, query: str, expected_status: int):
resp = await client.get(f"/iam/permissions?{query}")
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_post_perm_success(client: AsyncClient, db_session):
resp = await client.post("/iam/permission", json={"service_id": 1, "resource": "test_resource", "action": "create"})
data = resp.json()
assert resp.status_code == 200
assert "permission" in data
assert data["permission"]["service_name"] == "Test Service"
assert data["permission"]["resource"] == "test_resource"
assert data["permission"]["action"] == "create"
@pytest.mark.parametrize(
"body, expected_status",
[
# service_id tests
({"service_id": 42, "resource": "test_resource", "action": "read"}, 404), # Non-existent service
({"service_id": "banana", "resource": "test_resource", "action": "read"}, 422), # Invalid service ID
({"service_id": "", "resource": "test_resource", "action": "read"}, 422), # Blank service ID
({"service_id": -1, "resource": "test_resource", "action": "read"}, 422), # Negative service ID
# resource tests
({"service_id": 1, "resource": 42, "action": "read"}, 422), # Invalid resource type
# action tests
({"service_id": 1, "resource": "test_resource", "action": 42}, 422), # Invalid action type
# missing/partial body tests
({}, 422), # Blank body
({"resource": "test_resource"}, 422), # Only resource
({"action": "read"}, 422), # Only action
({"service_id": 1}, 422), # Only service
({"service_id": 1, "action": "read"}, 422), # Missing resource
({"service_id": 1, "resource": "test_resource"}, 422), # Missing action
({"resource": "test_resource", "action": "read"}, 422), # Missing service
],
)
@pytest.mark.anyio
async def test_post_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.post("/iam/permission", json=body)
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"body",
[
{"organisation_id": 1, "service_id": 1, "resource": "test_resource", "action": "read"},
{"organisation_id": 1, "service_id": 1},
{"organisation_id": 1, "resource": "test_resource"},
{"organisation_id": 1, "action": "read"},
{"organisation_id": 1, "service_id": 1, "action": "read"},
{"organisation_id": 1, "service_id": 1, "resource": "test_resource"},
{"organisation_id": 1, "resource": "test_resource", "action": "read"},
],
)
@pytest.mark.anyio
async def test_post_perm_search_success(client: AsyncClient, db_session, body):
resp = await client.post("/iam/permissions/search", json=body)
data = resp.json()
assert resp.status_code == 200
assert "permissions" in data
assert type(data["permissions"]) == list
assert data["permissions"][0]["service_name"] == "Test Service"
assert data["permissions"][0]["resource"] == "test_resource"
assert data["permissions"][0]["action"] == "read"
@pytest.mark.parametrize(
"body, expected_status",
[
# organisation_id tests
({"organisation_id": 42, "service_id": 1, "resource": "test_resource", "action": "read"}, 404), # Non-existent organisation
({"organisation_id": "banana", "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Invalid organisation ID
({"organisation_id": "", "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Blank organisation ID
({"organisation_id": -1, "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Negative organisation ID
# service_id tests
({"organisation_id": 1, "service_id": "banana", "resource": "test_resource", "action": "read"}, 422), # Invalid service ID
({"organisation_id": 1, "service_id": "", "resource": "test_resource", "action": "read"}, 422), # Blank service ID
({"organisation_id": 1, "service_id": -1, "resource": "test_resource", "action": "read"}, 422), # Negative service ID
# resource tests
({"organisation_id": 1, "service_id": 1, "resource": 42, "action": "read"}, 422), # Invalid resource type
# action tests
({"organisation_id": 1, "service_id": 1, "resource": "test_resource", "action": 42}, 422), # Invalid action type
# missing/partial body tests
({}, 422), # Blank body
],
)
@pytest.mark.anyio
async def test_post_perm_search_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
resp = await client.post("/iam/permissions/search", json=body)
assert resp.status_code == expected_status