tests: improved coverage

This commit is contained in:
Chris Milne 2026-06-05 09:10:55 +01:00
parent c8024daa97
commit f600664789
5 changed files with 178 additions and 11 deletions

View file

@ -88,7 +88,7 @@ async def get_group_permissions(group_model: group_model_query_dependency, org_m
@router.get("/group/users", response_model=IAMGetGroupUsersResponse) @router.get("/group/users", response_model=IAMGetGroupUsersResponse)
async def get_group_users(group_model: group_model_query_dependency, org_model: org_model_root_claim_query_dependency): async def get_group_users(group_model: group_model_query_dependency, org_model: org_model_root_claim_query_dependency):
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("User does not belong to this organization") raise UnauthorizedException("Group does not belong to this organization")
return {"users": group_model.user_rel} return {"users": group_model.user_rel}

View file

@ -137,8 +137,6 @@ async def update_questionnaire(db: db_dependency, org_model: org_model_root_clai
if hasattr(questionnaire_model, key): if hasattr(questionnaire_model, key):
setattr(questionnaire_model, key, value) setattr(questionnaire_model, key, value)
else: else:
if key == "partial" or key == "organisation_id":
continue
raise UnprocessableContentException("Invalid keys in update request") raise UnprocessableContentException("Invalid keys in update request")
# Allows for partially completed questionnaires to be saved without being submitted for review # Allows for partially completed questionnaires to be saved without being submitted for review
@ -241,7 +239,7 @@ async def update_root_user(db: db_dependency, org_model: org_model_body_dependen
Promotes an existing organisation user to the root user, giving them full control of the org. Promotes an existing organisation user to the root user, giving them full control of the org.
""" """
if user_model not in org_model.user_rel: if user_model not in org_model.user_rel:
raise UnauthorizedException(message="This user does not belong to your organisation.") raise UnprocessableContentException(message="This user does not belong to your organisation.")
org_model.root_user_rel = user_model org_model.root_user_rel = user_model
db.flush() db.flush()
response = OrgPatchRootResponse(name=org_model.name, root_user_email=org_model.root_user_email) response = OrgPatchRootResponse(name=org_model.name, root_user_email=org_model.root_user_email)

26
test/test_auth_general.py Normal file
View file

@ -0,0 +1,26 @@
"""
"""
import pytest
from httpx import AsyncClient
from .conftest import no_su_client
from src.organisation.models import Organisation as Org
from src.user.models import User
from src.iam.models import Group
@pytest.mark.anyio
async def test_get_org_auth_root_su(default_client: AsyncClient, db_session):
# If a super admin can access a resource when not the root user
db_session.add(User(email="admin@test.org", first_name="Admin", last_name="Test", oidc_id="abcd-efgh-ijkl-4321"))
db_session.flush()
db_session.add(
Org(name="Test Org Two", root_user_id=2, billing_contact_id=1, owner_contact_id=2, security_contact_id=3,
status="approved", intake_questionnaire={}))
db_session.flush()
resp = await default_client.get("/org?org_id=2")
assert resp.status_code != 422
assert resp.status_code == 200
assert resp.json()["name"] == "Test Org Two"

View file

@ -5,10 +5,11 @@ import pytest
from httpx import AsyncClient from httpx import AsyncClient
from src.user.models import User from src.user.models import User
from .conftest import default_client, db_session from src.organisation.models import Organisation as Org
from src.iam.models import Group from src.iam.models import Group
from .conftest import default_client, db_session
@pytest.mark.anyio @pytest.mark.anyio
async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient): async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient):
@ -28,7 +29,48 @@ async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient
assert data == True assert data == True
@pytest.mark.parametrize(
"service, api_key",
[
("Test Service", "not_the_correct_key"),
("Test Service Two", "123456789")
],
)
@pytest.mark.anyio @pytest.mark.anyio
async def test_act_on_resource_wrong_key(default_client: AsyncClient, db_session, service: str, api_key: str):
body = {
"service": service,
"organisation": "Test Org",
"resource": "test_resource"
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": api_key
}
resp = await default_client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers)
data = resp.json()
assert resp.status_code == 401
assert data["detail"] == "Invalid API key"
@pytest.mark.anyio
async def test_act_on_resource_missing_key(default_client: AsyncClient):
body = {
"service": "Test Service",
"organisation": "Test Org",
"resource": "test_resource"
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled"
}
resp = await default_client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers)
data = resp.json()
assert resp.status_code == 401
assert data["detail"] == "Missing API key"
@pytest.mark.parametrize( @pytest.mark.parametrize(
"service, org, resource, action, expected_status", "service, org, resource, action, expected_status",
[ [
@ -41,7 +83,7 @@ async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient
], ],
) )
@pytest.mark.anyio @pytest.mark.anyio
async def test_post_act_on_resource_endpoint_failure(default_client: AsyncClient, service, org, resource, action, async def test_act_on_resource_endpoint_failure(default_client: AsyncClient, service, org, resource, action,
expected_status: int): expected_status: int):
body = { body = {
"service": service, "service": service,
@ -57,6 +99,34 @@ async def test_post_act_on_resource_endpoint_failure(default_client: AsyncClient
assert resp.status_code == expected_status assert resp.status_code == expected_status
@pytest.mark.parametrize(
"service, org, resource, action, expected_response",
[
("Test Service", "Test Org", "test_resource", "read", True),
("Test Service", "Test Org", "test_resource", "create", False),
("Test Service", "Test Org", "no_access_here", "read", False),
("Test Service", "Test Org Two", "test_resource", "read", False),
],
)
@pytest.mark.anyio
async def test_act_on_resource_logic(default_client: AsyncClient, db_session, service, org, resource, action,
expected_response: bool):
body = {
"service": service,
"organisation": org,
"resource": resource
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": "123456789"
}
resp = await default_client.post(f"/iam/can_act_on_resource?action={action}", json=body, headers=headers)
data = resp.json()
assert resp.status_code == 200
assert data == expected_response
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_group_permissions_success(default_client: AsyncClient): async def test_get_group_permissions_success(default_client: AsyncClient):
resp = await default_client.get("/iam/group/permissions?org_id=1&group_id=1") resp = await default_client.get("/iam/group/permissions?org_id=1&group_id=1")
@ -73,14 +143,16 @@ async def test_get_group_permissions_success(default_client: AsyncClient):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"query, expected_status", "query, expected_status",
[ [
("org_id=2&group_id=1", 404), # Non-exists org, valid group ("org_id=42&group_id=1", 404), # Non-exists org, valid group
("org_id=banana&group_id=1", 422), # Invalid org, valid group ("org_id=banana&group_id=1", 422), # Invalid org, valid group
("org_id=&group_id=1", 422), # Blank org, valid group ("org_id=&group_id=1", 422), # Blank org, valid group
("org_id=-1&group_id=1", 422), # Negative org, valid group ("org_id=-1&group_id=1", 422), # Negative org, valid group
("group_id=1", 422), # Only group ("group_id=1", 422), # Only group
("org_id=1&group_id=2", 404), # Group/Org mismatch
("org_id=2&group_id=1", 404), # Group/Org mismatch
("", 422), # Blank query ("", 422), # Blank query
("org_id=&group_id=", 422), # Both blank ("org_id=&group_id=", 422), # Both blank
("org_id=1&group_id=2", 404), # Valid org, non-exists group ("org_id=1&group_id=42", 404), # Valid org, non-exists group
("org_id=1&group_id=banana", 422), # Valid org, invalid group ("org_id=1&group_id=banana", 422), # Valid org, invalid group
("org_id=1&group_id=", 422), # Valid org, blank group ("org_id=1&group_id=", 422), # Valid org, blank group
("org_id=1&group_id=-1", 422), # Valid org, negative group ("org_id=1&group_id=-1", 422), # Valid org, negative group
@ -88,12 +160,30 @@ async def test_get_group_permissions_success(default_client: AsyncClient):
], ],
) )
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_group_permissions_failure(default_client: AsyncClient, query: str, expected_status: int): async def test_get_group_permissions_failure(default_client: AsyncClient, db_session, query: str, expected_status: int):
resp = await default_client.get(f"/iam/group/permissions?{query}") resp = await default_client.get(f"/iam/group/permissions?{query}")
assert resp.status_code == expected_status assert resp.status_code == expected_status
@pytest.mark.parametrize(
"query",
[
"org_id=1&group_id=2",
"org_id=2&group_id=1",
],
)
@pytest.mark.anyio
async def test_get_group_permissions_mismatch(default_client: AsyncClient, db_session, query: str):
db_session.add(Org(name="Another Test Org", root_user_id=1, billing_contact_id=1, owner_contact_id=2, security_contact_id=3, status="approved"))
db_session.add(Group(name="Another Test Group", org_id=2))
db_session.flush()
resp = await default_client.get(f"/iam/group/permissions?{query}")
assert resp.status_code == 401
assert resp.json()["detail"] == "Group does not belong to this organization"
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_group_users_success(default_client: AsyncClient): async def test_get_group_users_success(default_client: AsyncClient):
resp = await default_client.get("/iam/group/users?org_id=1&group_id=1") resp = await default_client.get("/iam/group/users?org_id=1&group_id=1")
@ -132,6 +222,24 @@ async def test_get_group_users_failure(default_client: AsyncClient, query: str,
assert resp.status_code == expected_status assert resp.status_code == expected_status
@pytest.mark.parametrize(
"query",
[
"org_id=1&group_id=2",
"org_id=2&group_id=1",
],
)
@pytest.mark.anyio
async def test_get_group_users_mismatch(default_client: AsyncClient, db_session, query: str):
db_session.add(Org(name="Another Test Org", root_user_id=1, billing_contact_id=1, owner_contact_id=2, security_contact_id=3, status="approved"))
db_session.add(Group(name="Another Test Group", org_id=2))
db_session.flush()
resp = await default_client.get(f"/iam/group/users?{query}")
assert resp.status_code == 401
assert resp.json()["detail"] == "Group does not belong to this organization"
@pytest.mark.anyio @pytest.mark.anyio
async def test_post_group_success(default_client: AsyncClient): async def test_post_group_success(default_client: AsyncClient):
resp = await default_client.post("/iam/group", json={"name": "New Group", "organisation_id": 1}) resp = await default_client.post("/iam/group", json={"name": "New Group", "organisation_id": 1})
@ -214,6 +322,8 @@ async def test_put_group_perm_success(default_client: AsyncClient, db_session):
({"group_id": 1, "permission_id": 1}, 422), # Missing organisation ({"group_id": 1, "permission_id": 1}, 422), # Missing organisation
({"organisation_id": 1, "group_id": 1}, 422), # Missing permission ({"organisation_id": 1, "group_id": 1}, 422), # Missing permission
({"organisation_id": 1, "group_id": 1, "permission_id": 1}, 409), # Permission already in group
], ],
) )
@pytest.mark.anyio @pytest.mark.anyio
@ -223,6 +333,24 @@ async def test_put_group_perm_failure(default_client: AsyncClient, body: dict[st
assert resp.status_code == expected_status assert resp.status_code == expected_status
@pytest.mark.parametrize(
"body",
[
{"organisation_id": 1, "group_id": 2, "permission_id": 1},
{"organisation_id": 2, "group_id": 1, "permission_id": 1},
],
)
@pytest.mark.anyio
async def test_put_group_perm_mismatch(default_client: AsyncClient, db_session, body: dict):
db_session.add(Org(name="Another Test Org", root_user_id=1, billing_contact_id=1, owner_contact_id=2, security_contact_id=3, status="approved"))
db_session.add(Group(name="Another Test Group", org_id=2))
db_session.flush()
resp = await default_client.put(f"/iam/group/permission", json=body)
assert resp.status_code == 401
assert resp.json()["detail"] == "Group does not belong to this organization"
@pytest.mark.anyio @pytest.mark.anyio
async def test_put_group_user_success(default_client: AsyncClient, db_session): async def test_put_group_user_success(default_client: AsyncClient, db_session):
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234")) db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))

View file

@ -201,10 +201,11 @@ async def test_post_org_user_success(default_client: AsyncClient, db_session):
({"organisation_id": 1, "user_id": "id"}, 422), ({"organisation_id": 1, "user_id": "id"}, 422),
({"user_id": 2}, 422), ({"user_id": 2}, 422),
({"organisation_id": 1, "user_id": 42}, 404), ({"organisation_id": 1, "user_id": 42}, 404),
({"organisation_id": 1, "user_id": 1}, 409),
], ],
) )
@pytest.mark.anyio @pytest.mark.anyio
async def test_post_org_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int, db_session): async def test_post_org_user_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int, db_session):
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234")) db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
db_session.flush() db_session.flush()
@ -252,6 +253,18 @@ async def test_patch_root_user_failure(default_client: AsyncClient, body: dict[s
assert resp.status_code == expected_status assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_patch_org_root_user_non_member(default_client: AsyncClient, db_session):
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
db_session.flush()
resp = await default_client.patch("/org/root_user", json={"organisation_id": 1, "user_id": 2})
data = resp.json()
assert resp.status_code == 422
assert data["detail"] == "This user does not belong to your organisation."
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_org_groups_success(default_client: AsyncClient): async def test_get_org_groups_success(default_client: AsyncClient):
resp = await default_client.get("/org/groups?org_id=1") resp = await default_client.get("/org/groups?org_id=1")
@ -367,6 +380,8 @@ async def test_patch_org_contact_success(default_client: AsyncClient, key: str,
"body, expected_status", "body, expected_status",
[ [
({"organisation_id": 42, "contact_type": "billing"}, 404), ({"organisation_id": 42, "contact_type": "billing"}, 404),
({"organisation_id": 1, "contact_type": "security"}, 200),
({"organisation_id": 1, "contact_type": "owner"}, 200),
({"organisation_id": "Test Org", "contact_type": "billing"}, 422), ({"organisation_id": "Test Org", "contact_type": "billing"}, 422),
({"organisation_id": "", "contact_type": "billing"}, 422), ({"organisation_id": "", "contact_type": "billing"}, 422),
({}, 422), ({}, 422),