Compare commits
7 commits
9403e9291f
...
806bbfcbfc
| Author | SHA1 | Date | |
|---|---|---|---|
| 806bbfcbfc | |||
| 65a9514be6 | |||
| 6f174deefc | |||
| b8b5b6dbd3 | |||
| ae0181c3ff | |||
| 5d1606aa9d | |||
| 511480dffe |
7 changed files with 476 additions and 25 deletions
34
.alembic/versions/2026-05-29_drop_user_groups_org_column.py
Normal file
34
.alembic/versions/2026-05-29_drop_user_groups_org_column.py
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
"""Drop user_groups org column
|
||||||
|
|
||||||
|
Revision ID: d9dc6986fe38
|
||||||
|
Revises: 8132c4b88665
|
||||||
|
Create Date: 2026-05-29 16:10:00.320982
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = 'd9dc6986fe38'
|
||||||
|
down_revision: Union[str, Sequence[str], None] = '8132c4b88665'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
"""Upgrade schema."""
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_constraint(op.f('user_groups_org_id_fkey'), 'user_groups', type_='foreignkey')
|
||||||
|
op.drop_column('user_groups', 'org_id')
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
"""Downgrade schema."""
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.add_column('user_groups', sa.Column('org_id', sa.INTEGER(), autoincrement=False, nullable=False))
|
||||||
|
op.create_foreign_key(op.f('user_groups_org_id_fkey'), 'user_groups', 'organisation', ['org_id'], ['id'], ondelete='CASCADE')
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
@ -78,6 +78,5 @@ class GroupPermissions(Base):
|
||||||
|
|
||||||
class UserGroups(Base):
|
class UserGroups(Base):
|
||||||
__tablename__ = "user_groups"
|
__tablename__ = "user_groups"
|
||||||
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True)
|
|
||||||
user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True)
|
user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True)
|
||||||
group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True)
|
group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True)
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ from fastapi import APIRouter, status
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
from psycopg import errors
|
from psycopg import errors
|
||||||
|
|
||||||
|
from service.exceptions import ServiceNotFoundException
|
||||||
from src.exceptions import ConflictException
|
from src.exceptions import ConflictException
|
||||||
from src.database import db_dependency
|
from src.database import db_dependency
|
||||||
from src.schemas import ResourceName
|
from src.schemas import ResourceName
|
||||||
|
|
@ -122,7 +123,7 @@ async def add_group_permission(db: db_dependency, group_model: group_model_body_
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
@router.put("/group/user")
|
@router.put("/group/user", response_model=IAMPutGroupUserResponse)
|
||||||
async def add_group_user(db: db_dependency, group_model: group_model_body_dependency, user_model: user_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupUserRequest):
|
async def add_group_user(db: db_dependency, group_model: group_model_body_dependency, user_model: user_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupUserRequest):
|
||||||
if group_model.org_id != org_model.id:
|
if group_model.org_id != org_model.id:
|
||||||
raise UnauthorizedException()
|
raise UnauthorizedException()
|
||||||
|
|
@ -164,24 +165,27 @@ async def remove_group_user(db: db_dependency, group_model: group_model_body_dep
|
||||||
|
|
||||||
|
|
||||||
@router.get("/permissions", response_model=IAMGetPermissionsResponse)
|
@router.get("/permissions", response_model=IAMGetPermissionsResponse)
|
||||||
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency):
|
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_query_dependency):
|
||||||
permission_models = db.query(Perm).all()
|
permission_models = db.query(Perm).all()
|
||||||
|
|
||||||
return {"permissions": permission_models}
|
return {"permissions": permission_models}
|
||||||
|
|
||||||
|
|
||||||
@router.post("/permission")
|
@router.post("/permission", response_model=IAMPostPermissionResponse)
|
||||||
async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_mode: IAMPostPermissionRequest):
|
async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_model: IAMPostPermissionRequest):
|
||||||
perm_model = Perm(**request_mode.__dict__)
|
service_model = db.get(Service, request_model.service_id)
|
||||||
|
if service_model is None:
|
||||||
|
raise ServiceNotFoundException(service_id=request_model.service_id)
|
||||||
|
perm_model = Perm(**request_model.__dict__)
|
||||||
try:
|
try:
|
||||||
db.add(perm_model)
|
db.add(perm_model)
|
||||||
except IntegrityError as e:
|
except IntegrityError as e:
|
||||||
if isinstance(e.orig, errors.UniqueViolation):
|
if isinstance(e.orig, errors.UniqueViolation):
|
||||||
raise ConflictException(message="Permission already exists")
|
raise ConflictException(message="Permission already exists")
|
||||||
db.flush()
|
db.flush()
|
||||||
response = IAMPostPermissionResponse(permission=PermissionSchema(**perm_model.__dict__))
|
response = {"service_name": perm_model.service_name, "resource": perm_model.resource, "action": perm_model.action}
|
||||||
db.commit()
|
db.commit()
|
||||||
return response
|
return {"permission": response}
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
|
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
|
||||||
|
|
@ -190,18 +194,18 @@ async def delete_permission(db: db_dependency, su: super_admin_dependency, perm_
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
|
|
||||||
@router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
|
@router.post("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
|
||||||
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, search: IAMGetPermissionsSearchRequest):
|
async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMGetPermissionsSearchRequest):
|
||||||
permission_query = db.query(Perm)
|
permission_query = db.query(Perm)
|
||||||
|
|
||||||
if search.service_id is not None:
|
if request_model.service_id is not None:
|
||||||
permission_query = permission_query.filter(Perm.service_id == search.service_id)
|
permission_query = permission_query.filter(Perm.service_id == request_model.service_id)
|
||||||
|
|
||||||
if search.resource is not None:
|
if request_model.resource is not None:
|
||||||
permission_query = permission_query.filter(Perm.resource == search.resource)
|
permission_query = permission_query.filter(Perm.resource == request_model.resource)
|
||||||
|
|
||||||
if search.action is not None:
|
if request_model.action is not None:
|
||||||
permission_query = permission_query.filter(Perm.action == search. action)
|
permission_query = permission_query.filter(Perm.action == request_model. action)
|
||||||
|
|
||||||
permission_models = permission_query.all()
|
permission_models = permission_query.all()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,11 @@ Models follow the nomenclature of:
|
||||||
- Mixins: "<Attribute>Mixin"
|
- Mixins: "<Attribute>Mixin"
|
||||||
- Models: "<Module><Method><Resource><Opt:Resource><Direction>" ie "IAMGetGroupPermissionsResponse"
|
- Models: "<Module><Method><Resource><Opt:Resource><Direction>" ie "IAMGetGroupPermissionsResponse"
|
||||||
"""
|
"""
|
||||||
from typing import Optional
|
from typing import Optional, Annotated
|
||||||
|
|
||||||
from pydantic import EmailStr, ConfigDict, Field
|
from pydantic import EmailStr, ConfigDict, Field
|
||||||
|
|
||||||
|
from src.service.schemas import ServiceIDMixin
|
||||||
from src.organisation.schemas import OrgIDMixin
|
from src.organisation.schemas import OrgIDMixin
|
||||||
from src.schemas import CustomBaseModel
|
from src.schemas import CustomBaseModel
|
||||||
from user.schemas import UserIDMixin
|
from user.schemas import UserIDMixin
|
||||||
|
|
@ -83,8 +84,7 @@ class IAMDeleteGroupUserResponse(CustomBaseModel):
|
||||||
class IAMGetPermissionsResponse(CustomBaseModel):
|
class IAMGetPermissionsResponse(CustomBaseModel):
|
||||||
permissions: list[PermissionSchema]
|
permissions: list[PermissionSchema]
|
||||||
|
|
||||||
class IAMPostPermissionRequest(CustomBaseModel):
|
class IAMPostPermissionRequest(ServiceIDMixin):
|
||||||
service_id: int
|
|
||||||
resource: str
|
resource: str
|
||||||
action: str
|
action: str
|
||||||
|
|
||||||
|
|
@ -94,8 +94,8 @@ class IAMPostPermissionResponse(CustomBaseModel):
|
||||||
class IAMDeletePermissionRequest(PermIDMixin):
|
class IAMDeletePermissionRequest(PermIDMixin):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class IAMGetPermissionsSearchRequest(CustomBaseModel):
|
class IAMGetPermissionsSearchRequest(OrgIDMixin):
|
||||||
service_id: Optional[int] = None
|
service_id: Annotated[int | None, Field(gt=0)] = None
|
||||||
resource: Optional[str] = None
|
resource: Optional[str] = None
|
||||||
action: Optional[str] = None
|
action: Optional[str] = None
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,12 +6,12 @@ Models follow the nomenclature of:
|
||||||
- Mixins: "<Attribute>Mixin"
|
- Mixins: "<Attribute>Mixin"
|
||||||
- Models: "<Module><Method><Resource><Opt:Resource><Direction>" ie "ServiceGetServiceResponse"
|
- Models: "<Module><Method><Resource><Opt:Resource><Direction>" ie "ServiceGetServiceResponse"
|
||||||
"""
|
"""
|
||||||
from pydantic import ConfigDict
|
from pydantic import ConfigDict, Field
|
||||||
|
|
||||||
from src.schemas import CustomBaseModel
|
from src.schemas import CustomBaseModel
|
||||||
|
|
||||||
class ServiceIDMixin(CustomBaseModel):
|
class ServiceIDMixin(CustomBaseModel):
|
||||||
service_id: int
|
service_id: int = Field(gt=0)
|
||||||
|
|
||||||
class ServiceSchema(CustomBaseModel):
|
class ServiceSchema(CustomBaseModel):
|
||||||
model_config = ConfigDict(from_attributes=True, extra="ignore")
|
model_config = ConfigDict(from_attributes=True, extra="ignore")
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ def _seed(db):
|
||||||
status="approved", intake_questionnaire={"question_two": "answer two"}))
|
status="approved", intake_questionnaire={"question_two": "answer two"}))
|
||||||
db.add(Service(name="Test Service", api_key="123456789"))
|
db.add(Service(name="Test Service", api_key="123456789"))
|
||||||
db.add(Permission(service_id=1, resource="test_resource", action="read"))
|
db.add(Permission(service_id=1, resource="test_resource", action="read"))
|
||||||
db.add(Group(name="Test Group"))
|
db.add(Group(name="Test Group", org_id=1))
|
||||||
db.flush()
|
db.flush()
|
||||||
group_model = db.get(Group, 1)
|
group_model = db.get(Group, 1)
|
||||||
perm_model = db.get(Permission, 1)
|
perm_model = db.get(Permission, 1)
|
||||||
|
|
|
||||||
414
test/test_iam.py
Normal file
414
test/test_iam.py
Normal file
|
|
@ -0,0 +1,414 @@
|
||||||
|
"""
|
||||||
|
Act on resource tests only check for pass/fail on input validation. Logic is not tested.
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
from httpx import AsyncClient
|
||||||
|
|
||||||
|
from src.user.models import User
|
||||||
|
from .conftest import client, db_session
|
||||||
|
|
||||||
|
from src.iam.models import Group
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_act_on_resource_endpoint_success(client: AsyncClient):
|
||||||
|
body = {
|
||||||
|
"service": "Test Service",
|
||||||
|
"organisation": "Test Org",
|
||||||
|
"resource": "test_resource"
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
"Authorization": "Bearer not_checked_when_auth_is_disabled",
|
||||||
|
"X-API-Key": "123456789"
|
||||||
|
}
|
||||||
|
resp = await client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers)
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert data == True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"service, org, resource, action, expected_status",
|
||||||
|
[
|
||||||
|
(None, "Test Org", "test_resource", "read", 422),
|
||||||
|
(42, "Test Org", "test_resource", "read", 422),
|
||||||
|
("Test Service", None, "test_resource", "read", 422),
|
||||||
|
("Test Service", 42, "test_resource", "read", 422),
|
||||||
|
("Test Service", "Test Org", None, "read", 422),
|
||||||
|
("Test Service", "Test Org", 42, "read", 422),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_act_on_resource_endpoint_failure(client: AsyncClient, service, org, resource, action,
|
||||||
|
expected_status: int):
|
||||||
|
body = {
|
||||||
|
"service": service,
|
||||||
|
"organisation": org,
|
||||||
|
"resource": resource
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
"Authorization": "Bearer not_checked_when_auth_is_disabled",
|
||||||
|
"X-API-Key": "123456789"
|
||||||
|
}
|
||||||
|
resp = await client.post(f"/iam/can_act_on_resource?action={action}", json=body, headers=headers)
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_group_permissions_success(client: AsyncClient):
|
||||||
|
resp = await client.get("/iam/group/permissions?org_id=1&group_id=1")
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "permissions" in data
|
||||||
|
assert type(data["permissions"]) == list
|
||||||
|
assert data["permissions"][0]["service_name"] == "Test Service"
|
||||||
|
assert data["permissions"][0]["resource"] == "test_resource"
|
||||||
|
assert data["permissions"][0]["action"] == "read"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"query, expected_status",
|
||||||
|
[
|
||||||
|
("org_id=2&group_id=1", 404), # Non-exists org, valid group
|
||||||
|
("org_id=banana&group_id=1", 422), # Invalid org, valid group
|
||||||
|
("org_id=&group_id=1", 422), # Blank org, valid group
|
||||||
|
("org_id=-1&group_id=1", 422), # Negative org, valid group
|
||||||
|
("group_id=1", 422), # Only group
|
||||||
|
("", 422), # Blank query
|
||||||
|
("org_id=&group_id=", 422), # Both blank
|
||||||
|
("org_id=1&group_id=2", 404), # Valid org, non-exists group
|
||||||
|
("org_id=1&group_id=banana", 422), # Valid org, invalid group
|
||||||
|
("org_id=1&group_id=", 422), # Valid org, blank group
|
||||||
|
("org_id=1&group_id=-1", 422), # Valid org, negative group
|
||||||
|
("org_id=1", 422), # Only org
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_group_permissions_failure(client: AsyncClient, query: str, expected_status: int):
|
||||||
|
resp = await client.get(f"/iam/group/permissions?{query}")
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_group_users_success(client: AsyncClient):
|
||||||
|
resp = await client.get("/iam/group/users?org_id=1&group_id=1")
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "users" in data
|
||||||
|
assert type(data["users"]) == list
|
||||||
|
assert data["users"][0]["id"] == 1
|
||||||
|
assert data["users"][0]["first_name"] == "Admin"
|
||||||
|
assert data["users"][0]["last_name"] == "Test"
|
||||||
|
assert data["users"][0]["email"] == "admin@test.com"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"query, expected_status",
|
||||||
|
[
|
||||||
|
("org_id=2&group_id=1", 404), # Non-exists org, valid group
|
||||||
|
("org_id=banana&group_id=1", 422), # Invalid org, valid group
|
||||||
|
("org_id=&group_id=1", 422), # Blank org, valid group
|
||||||
|
("org_id=-1&group_id=1", 422), # Negative org, valid group
|
||||||
|
("group_id=1", 422), # Only group
|
||||||
|
("", 422), # Blank query
|
||||||
|
("org_id=&group_id=", 422), # Both blank
|
||||||
|
("org_id=1&group_id=2", 404), # Valid org, non-exists group
|
||||||
|
("org_id=1&group_id=banana", 422), # Valid org, invalid group
|
||||||
|
("org_id=1&group_id=", 422), # Valid org, blank group
|
||||||
|
("org_id=1&group_id=-1", 422), # Valid org, negative group
|
||||||
|
("org_id=1", 422), # Only org
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_group_users_failure(client: AsyncClient, query: str, expected_status: int):
|
||||||
|
resp = await client.get(f"/iam/group/users?{query}")
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_group_success(client: AsyncClient):
|
||||||
|
resp = await client.post("/iam/group", json={"name": "New Group", "organisation_id": 1})
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "group" in data
|
||||||
|
assert type(data["group"]) == dict
|
||||||
|
assert data["group"]["name"] == "New Group"
|
||||||
|
assert data["group"]["id"] == 2
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"body, expected_status",
|
||||||
|
[
|
||||||
|
({"organisation_id": 2, "name": "new group"}, 404), # Non-existent organisation, valid name
|
||||||
|
({"organisation_id": "banana", "name": "new group"}, 422), # Invalid organisation ID, valid name
|
||||||
|
({"organisation_id": "", "name": "new group"}, 422), # Blank organisation ID, valid name
|
||||||
|
({"organisation_id": -1, "name": "new group"}, 422), # Negative organisation ID, valid name
|
||||||
|
({"name": 1}, 422), # Only name
|
||||||
|
({}, 422), # Blank body
|
||||||
|
({"organisation_id": "", "name": ""}, 422), # Both blank
|
||||||
|
({"organisation_id": 1, "name": 42}, 422), # Valid organisation, invalid name
|
||||||
|
({"organisation_id": 1, "name": ""}, 422), # Valid organisation, blank name
|
||||||
|
({"organisation_id": 1, "name": "hi"}, 422), # Valid organisation, small name
|
||||||
|
({"organisation_id": 1}, 422), # Only organisation ID
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_group_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
|
resp = await client.post("/iam/group", json=body)
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_put_group_perm_success(client: AsyncClient, db_session):
|
||||||
|
db_session.add(Group(name="Test Group Two", org_id=1))
|
||||||
|
db_session.flush()
|
||||||
|
resp = await client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 1})
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "group" in data
|
||||||
|
assert type(data["group"]) == dict
|
||||||
|
assert data["group"]["name"] == "Test Group Two"
|
||||||
|
assert data["group"]["id"] == 2
|
||||||
|
|
||||||
|
assert "permissions" in data
|
||||||
|
assert type(data["permissions"]) == list
|
||||||
|
assert data["permissions"][0]["service_name"] == "Test Service"
|
||||||
|
assert data["permissions"][0]["resource"] == "test_resource"
|
||||||
|
assert data["permissions"][0]["action"] == "read"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"body, expected_status",
|
||||||
|
[
|
||||||
|
({"organisation_id": 42, "group_id": 1, "permission_id": 1}, 404), # Non-existent organisation
|
||||||
|
({"organisation_id": "banana", "group_id": 1, "permission_id": 1}, 422), # Invalid organisation ID
|
||||||
|
({"organisation_id": "", "group_id": 1, "permission_id": 1}, 422), # Blank organisation ID
|
||||||
|
({"organisation_id": -1, "group_id": 1, "permission_id": 1}, 422), # Negative organisation ID
|
||||||
|
|
||||||
|
({"organisation_id": 1, "group_id": 42, "permission_id": 1}, 404), # Non-existent group
|
||||||
|
({"organisation_id": 1, "group_id": "banana", "permission_id": 1}, 422), # Invalid group ID
|
||||||
|
({"organisation_id": 1, "group_id": "", "permission_id": 1}, 422), # Blank group ID
|
||||||
|
({"organisation_id": 1, "group_id": -1, "permission_id": 1}, 422), # Negative group ID
|
||||||
|
|
||||||
|
({"organisation_id": 1, "group_id": 1, "permission_id": 42}, 404), # Non-existent permission
|
||||||
|
({"organisation_id": 1, "group_id": 1, "permission_id": "banana"}, 422), # Invalid permission ID
|
||||||
|
({"organisation_id": 1, "group_id": 1, "permission_id": ""}, 422), # Blank permission ID
|
||||||
|
({"organisation_id": 1, "group_id": 1, "permission_id": -1}, 422), # Negative permission ID
|
||||||
|
|
||||||
|
({}, 422), # Blank body
|
||||||
|
({"permission_id": 1}, 422), # Only permission
|
||||||
|
({"organisation_id": 1}, 422), # Only organisation
|
||||||
|
({"group_id": 1}, 422), # Only group
|
||||||
|
|
||||||
|
({"organisation_id": 1, "permission_id": 1}, 422), # Missing group
|
||||||
|
({"group_id": 1, "permission_id": 1}, 422), # Missing organisation
|
||||||
|
({"organisation_id": 1, "group_id": 1}, 422), # Missing permission
|
||||||
|
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_put_group_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
|
resp = await client.put("/iam/group/permission", json=body)
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_put_group_user_success(client: AsyncClient, db_session):
|
||||||
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
|
db_session.flush()
|
||||||
|
|
||||||
|
resp = await client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1})
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "group" in data
|
||||||
|
assert type(data["group"]) == dict
|
||||||
|
assert data["group"]["name"] == "Test Group"
|
||||||
|
assert data["group"]["id"] == 1
|
||||||
|
|
||||||
|
assert "users" in data
|
||||||
|
assert type(data["users"]) == list
|
||||||
|
assert data["users"][1]["id"] == 2
|
||||||
|
assert data["users"][1]["first_name"] == "User"
|
||||||
|
assert data["users"][1]["last_name"] == "Test"
|
||||||
|
assert data["users"][1]["email"] == "user@test.org"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"body, expected_status",
|
||||||
|
[
|
||||||
|
({"organisation_id": 42, "group_id": 1, "user_id": 1}, 404), # Non-existent organisation
|
||||||
|
({"organisation_id": "banana", "group_id": 1, "user_id": 1}, 422), # Invalid organisation ID
|
||||||
|
({"organisation_id": "", "group_id": 1, "user_id": 1}, 422), # Blank organisation ID
|
||||||
|
({"organisation_id": -1, "group_id": 1, "user_id": 1}, 422), # Negative organisation ID
|
||||||
|
|
||||||
|
({"organisation_id": 1, "group_id": 42, "user_id": 1}, 404), # Non-existent group
|
||||||
|
({"organisation_id": 1, "group_id": "banana", "user_id": 1}, 422), # Invalid group ID
|
||||||
|
({"organisation_id": 1, "group_id": "", "user_id": 1}, 422), # Blank group ID
|
||||||
|
({"organisation_id": 1, "group_id": -1, "user_id": 1}, 422), # Negative group ID
|
||||||
|
|
||||||
|
({"organisation_id": 1, "group_id": 1, "user_id": 42}, 404), # Non-existent user
|
||||||
|
({"organisation_id": 1, "group_id": 1, "user_id": "banana"}, 422), # Invalid user ID
|
||||||
|
({"organisation_id": 1, "group_id": 1, "user_id": ""}, 422), # Blank user ID
|
||||||
|
({"organisation_id": 1, "group_id": 1, "user_id": -1}, 422), # Negative user ID
|
||||||
|
|
||||||
|
({}, 422), # Blank body
|
||||||
|
({"user_id": 1}, 422), # Only user
|
||||||
|
({"organisation_id": 1}, 422), # Only organisation
|
||||||
|
({"group_id": 1}, 422), # Only group
|
||||||
|
|
||||||
|
({"organisation_id": 1, "user_id": 1}, 422), # Missing group
|
||||||
|
({"group_id": 1, "user_id": 1}, 422), # Missing organisation
|
||||||
|
({"organisation_id": 1, "group_id": 1}, 422), # Missing user
|
||||||
|
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_put_group_user_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
|
resp = await client.put("/iam/group/user", json=body)
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_permissions_success(client: AsyncClient):
|
||||||
|
resp = await client.get("/iam/permissions?org_id=1")
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "permissions" in data
|
||||||
|
assert type(data["permissions"]) == list
|
||||||
|
assert data["permissions"][0]["service_name"] == "Test Service"
|
||||||
|
assert data["permissions"][0]["resource"] == "test_resource"
|
||||||
|
assert data["permissions"][0]["action"] == "read"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"query, expected_status",
|
||||||
|
[
|
||||||
|
("org_id=42", 404), # Non-exists org
|
||||||
|
("org_id=banana", 422), # Invalid org
|
||||||
|
("org_id=", 422), # Blank org
|
||||||
|
("org_id=-1", 422), # Negative org
|
||||||
|
("", 422), # Blank query
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_permissions_failure(client: AsyncClient, query: str, expected_status: int):
|
||||||
|
resp = await client.get(f"/iam/permissions?{query}")
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_perm_success(client: AsyncClient, db_session):
|
||||||
|
resp = await client.post("/iam/permission", json={"service_id": 1, "resource": "test_resource", "action": "create"})
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "permission" in data
|
||||||
|
assert data["permission"]["service_name"] == "Test Service"
|
||||||
|
assert data["permission"]["resource"] == "test_resource"
|
||||||
|
assert data["permission"]["action"] == "create"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"body, expected_status",
|
||||||
|
[
|
||||||
|
# service_id tests
|
||||||
|
({"service_id": 42, "resource": "test_resource", "action": "read"}, 404), # Non-existent service
|
||||||
|
({"service_id": "banana", "resource": "test_resource", "action": "read"}, 422), # Invalid service ID
|
||||||
|
({"service_id": "", "resource": "test_resource", "action": "read"}, 422), # Blank service ID
|
||||||
|
({"service_id": -1, "resource": "test_resource", "action": "read"}, 422), # Negative service ID
|
||||||
|
|
||||||
|
# resource tests
|
||||||
|
({"service_id": 1, "resource": 42, "action": "read"}, 422), # Invalid resource type
|
||||||
|
|
||||||
|
# action tests
|
||||||
|
({"service_id": 1, "resource": "test_resource", "action": 42}, 422), # Invalid action type
|
||||||
|
|
||||||
|
# missing/partial body tests
|
||||||
|
({}, 422), # Blank body
|
||||||
|
({"resource": "test_resource"}, 422), # Only resource
|
||||||
|
({"action": "read"}, 422), # Only action
|
||||||
|
({"service_id": 1}, 422), # Only service
|
||||||
|
|
||||||
|
({"service_id": 1, "action": "read"}, 422), # Missing resource
|
||||||
|
({"service_id": 1, "resource": "test_resource"}, 422), # Missing action
|
||||||
|
({"resource": "test_resource", "action": "read"}, 422), # Missing service
|
||||||
|
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
|
resp = await client.post("/iam/permission", json=body)
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"body",
|
||||||
|
[
|
||||||
|
{"organisation_id": 1, "service_id": 1, "resource": "test_resource", "action": "read"},
|
||||||
|
|
||||||
|
{"organisation_id": 1, "service_id": 1},
|
||||||
|
{"organisation_id": 1, "resource": "test_resource"},
|
||||||
|
{"organisation_id": 1, "action": "read"},
|
||||||
|
{"organisation_id": 1, "service_id": 1, "action": "read"},
|
||||||
|
{"organisation_id": 1, "service_id": 1, "resource": "test_resource"},
|
||||||
|
{"organisation_id": 1, "resource": "test_resource", "action": "read"},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_perm_search_success(client: AsyncClient, db_session, body):
|
||||||
|
resp = await client.post("/iam/permissions/search", json=body)
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert "permissions" in data
|
||||||
|
assert type(data["permissions"]) == list
|
||||||
|
assert data["permissions"][0]["service_name"] == "Test Service"
|
||||||
|
assert data["permissions"][0]["resource"] == "test_resource"
|
||||||
|
assert data["permissions"][0]["action"] == "read"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"body, expected_status",
|
||||||
|
[
|
||||||
|
# organisation_id tests
|
||||||
|
({"organisation_id": 42, "service_id": 1, "resource": "test_resource", "action": "read"}, 404), # Non-existent organisation
|
||||||
|
({"organisation_id": "banana", "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Invalid organisation ID
|
||||||
|
({"organisation_id": "", "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Blank organisation ID
|
||||||
|
({"organisation_id": -1, "service_id": 1, "resource": "test_resource", "action": "read"}, 422), # Negative organisation ID
|
||||||
|
|
||||||
|
# service_id tests
|
||||||
|
({"organisation_id": 1, "service_id": "banana", "resource": "test_resource", "action": "read"}, 422), # Invalid service ID
|
||||||
|
({"organisation_id": 1, "service_id": "", "resource": "test_resource", "action": "read"}, 422), # Blank service ID
|
||||||
|
({"organisation_id": 1, "service_id": -1, "resource": "test_resource", "action": "read"}, 422), # Negative service ID
|
||||||
|
|
||||||
|
# resource tests
|
||||||
|
({"organisation_id": 1, "service_id": 1, "resource": 42, "action": "read"}, 422), # Invalid resource type
|
||||||
|
|
||||||
|
# action tests
|
||||||
|
({"organisation_id": 1, "service_id": 1, "resource": "test_resource", "action": 42}, 422), # Invalid action type
|
||||||
|
|
||||||
|
# missing/partial body tests
|
||||||
|
({}, 422), # Blank body
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_perm_search_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
|
resp = await client.post("/iam/permissions/search", json=body)
|
||||||
|
|
||||||
|
assert resp.status_code == expected_status
|
||||||
Loading…
Add table
Add a link
Reference in a new issue