1
0
Fork 0
forked from sr2/cloud-api
cloud-api/test/test_iam.py

837 lines
24 KiB
Python

""" """
from datetime import timedelta, datetime, timezone
from typing import Any
import pytest
from httpx import AsyncClient
from src.utils import generate_jwt
from .conftest import generate_query_and_status, generate_body_and_status, standard_test
pytestmark = [
pytest.mark.iam_module,
]
@pytest.mark.anyio
async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient):
body = {
"rn": {
"service": "Test Service",
"organisation_id": 1,
"resource": "test_resource",
"instance": None,
},
"action": "read",
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": "123456789",
}
resp = await default_client.post("/iam/can_act_on_resource", json=body, headers=headers)
data = resp.json()
assert resp.status_code == 200
assert data["allowed"] is True
print(data)
@pytest.mark.parametrize(
"service, api_key",
[("Test Service", "not_the_correct_key"), ("Test Service Two", "123456789")],
)
@pytest.mark.anyio
async def test_act_on_resource_wrong_key(
default_client: AsyncClient, service: str, api_key: str
):
body = {
"rn": {
"service": service,
"organisation": "Test Org",
"resource": "test_resource",
},
"action": "read",
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": api_key,
}
resp = await default_client.post("/iam/can_act_on_resource", json=body, headers=headers)
data = resp.json()
assert resp.status_code == 401
assert data["detail"] == "Invalid API key"
@pytest.mark.anyio
async def test_act_on_resource_missing_key(default_client: AsyncClient):
body = {
"rn": {
"service": "Test Service",
"organisation": "Test Org",
"resource": "test_resource",
},
"action": "read",
}
headers = {"Authorization": "Bearer not_checked_when_auth_is_disabled"}
resp = await default_client.post(
"/iam/can_act_on_resource?action=read", json=body, headers=headers
)
data = resp.json()
assert resp.status_code == 401
assert data["detail"] == "Missing API key"
@pytest.mark.parametrize(
"service, org, resource, action, expected_status",
[
(None, "Test Org", "test_resource", "read", 422),
(42, "Test Org", "test_resource", "read", 422),
("Test Service", None, "test_resource", "read", 422),
("Test Service", 42, "test_resource", "read", 422),
("Test Service", "Test Org", None, "read", 422),
("Test Service", "Test Org", 42, "read", 422),
("Test Service", "Test Org", "test_resource", None, 422),
("Test Service", "Test Org", "test_resource", 42, 422),
],
)
@pytest.mark.anyio
async def test_act_on_resource_endpoint_status_checks(
default_client: AsyncClient, service, org, resource, action, expected_status: int
):
body = {
"rn": {"service": service, "organisation": org, "resource": resource},
"action": action,
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": "123456789",
}
resp = await default_client.post("/iam/can_act_on_resource", json=body, headers=headers)
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"service, org, resource, action, expected_response",
[
("Test Service", 1, "test_resource", "read", True),
("Test Service", 1, "test_resource", "create", False),
("Test Service", 1, "no_access_here", "read", False),
("Test Service", 2, "test_resource", "read", False),
],
)
@pytest.mark.anyio
async def test_act_on_resource_logic(
default_client: AsyncClient,
service,
org,
resource,
action,
expected_response: bool,
):
body = {
"rn": {"service": service, "organisation_id": org, "resource": resource},
"action": action,
}
headers = {
"Authorization": "Bearer not_checked_when_auth_is_disabled",
"X-API-Key": "123456789",
}
resp = await default_client.post("/iam/can_act_on_resource", json=body, headers=headers)
data = resp.json()
assert resp.status_code == 200
assert data["allowed"] == expected_response
@pytest.mark.parametrize(
"query, expected_status", generate_query_and_status(["group_id", "org_id"])
)
@pytest.mark.anyio
async def test_get_group_permissions_status_checks(
default_client: AsyncClient, query: str, expected_status: int
):
resp = await default_client.get(f"/iam/group/permissions?{query}")
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"query",
[
"org_id=1&group_id=2",
"org_id=2&group_id=1",
],
)
@pytest.mark.anyio
async def test_get_group_permissions_mismatch(default_client: AsyncClient, query: str):
resp = await default_client.get(f"/iam/group/permissions?{query}")
assert resp.status_code == 403
assert resp.json()["detail"] == "Group does not belong to this organization"
@pytest.mark.parametrize(
"query, expected_status", generate_query_and_status(["group_id", "org_id"])
)
@pytest.mark.anyio
async def test_get_group_users_status_checks(
default_client: AsyncClient, query: str, expected_status: int
):
resp = await default_client.get(f"/iam/group/users?{query}")
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"query",
[
"org_id=1&group_id=2",
"org_id=2&group_id=1",
],
)
@pytest.mark.anyio
async def test_get_group_users_mismatch(default_client: AsyncClient, query: str):
resp = await default_client.get(f"/iam/group/users?{query}")
assert resp.status_code == 403
assert resp.json()["detail"] == "Group does not belong to this organization"
@pytest.mark.parametrize(
"body, expected_status",
generate_body_and_status({"organisation_id": "int", "name": "str"}),
)
@pytest.mark.anyio
async def test_post_group_status_checks(
default_client: AsyncClient, body: dict[str, str], expected_status: int
):
resp = await default_client.post("/iam/group", json=body)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_post_group_conflict(default_client: AsyncClient):
resp = await default_client.post(
"/iam/group", json={"organisation_id": 1, "name": "Org One Group"}
)
assert resp.status_code == 409
@pytest.mark.anyio
async def test_post_group_non_conflict(default_client: AsyncClient):
resp = await default_client.post(
"/iam/group", json={"organisation_id": 2, "name": "Org One Group"}
)
assert resp.status_code == 201
@pytest.mark.parametrize(
"body, expected_status",
generate_body_and_status(
{"organisation_id": "int", "group_id": "int", "permission_id": "int"}
),
)
@pytest.mark.anyio
async def test_put_group_perm_status_checks(
default_client: AsyncClient, body: dict[str, str], expected_status: int
):
resp = await default_client.put("/iam/group/permission", json=body)
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_put_group_perm_conflict(default_client: AsyncClient):
resp = await default_client.put(
"/iam/group/permission",
json={"organisation_id": 1, "group_id": 1, "permission_id": 1},
)
assert resp.status_code == 409
@pytest.mark.parametrize(
"body",
[
{"organisation_id": 1, "group_id": 2, "permission_id": 1},
{"organisation_id": 2, "group_id": 1, "permission_id": 1},
],
)
@pytest.mark.anyio
async def test_put_group_perm_mismatch(default_client: AsyncClient, body: dict):
resp = await default_client.put("/iam/group/permission", json=body)
assert resp.status_code == 403
assert resp.json()["detail"] == "Group does not belong to this organization"
@pytest.mark.parametrize(
"body, expected_status",
generate_body_and_status(
{"organisation_id": "int", "group_id": "int", "user_id": "int"}
),
)
@pytest.mark.anyio
async def test_put_group_user_status_checks(
default_client: AsyncClient, body: dict[str, str], expected_status: int
):
resp = await default_client.put("/iam/group/user", json=body)
assert resp.status_code == expected_status
@pytest.mark.parametrize("query, expected_status", generate_query_and_status(["org_id"]))
@pytest.mark.anyio
async def test_get_permissions_status_checks(
default_client: AsyncClient, query: str, expected_status: int
):
resp = await default_client.get(f"/iam/permissions?{query}")
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"body, expected_status",
[
(
{"service_id": 1, "resource": "test_resource", "action": "read"},
409,
),
# service_id tests
(
{"service_id": 42, "resource": "test_resource", "action": "read"},
404,
), # Non-existent service
(
{"service_id": "banana", "resource": "test_resource", "action": "read"},
422,
), # Invalid service ID
(
{"service_id": "", "resource": "test_resource", "action": "read"},
422,
), # Blank service ID
(
{"service_id": -1, "resource": "test_resource", "action": "read"},
422,
), # Negative service ID
# resource tests
(
{"service_id": 1, "resource": 42, "action": "read"},
422,
), # Invalid resource type
# action tests
(
{"service_id": 1, "resource": "test_resource", "action": 42},
422,
), # Invalid action type
# missing/partial body tests
({}, 422), # Blank body
({"resource": "test_resource"}, 422), # Only resource
({"action": "read"}, 422), # Only action
({"service_id": 1}, 422), # Only service
({"service_id": 1, "action": "read"}, 422), # Missing resource
({"service_id": 1, "resource": "test_resource"}, 422), # Missing action
({"resource": "test_resource", "action": "read"}, 422), # Missing service
],
)
@pytest.mark.anyio
async def test_post_perm_status_checks(
default_client: AsyncClient, body: dict[str, str], expected_status: int
):
resp = await default_client.post("/iam/permission", json=body)
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"body",
[
{
"organisation_id": 1,
"service_id": 1,
"resource": "test_resource",
"action": "read",
},
{"organisation_id": 1, "service_id": 1},
{"organisation_id": 1, "resource": "test_resource"},
{"organisation_id": 1, "action": "read"},
{"organisation_id": 1, "service_id": 1, "action": "read"},
{"organisation_id": 1, "service_id": 1, "resource": "test_resource"},
{"organisation_id": 1, "resource": "test_resource", "action": "read"},
],
)
@pytest.mark.anyio
async def test_post_perm_search_success(default_client: AsyncClient, body):
resp = await default_client.post("/iam/permissions/search", json=body)
data = resp.json()
assert resp.status_code == 200
assert "permissions" in data
assert isinstance(data["permissions"], list)
permissions_filtered = [
permission for permission in data["permissions"] if permission["id"] == 1
]
assert len(permissions_filtered) == 1
permission = permissions_filtered[0]
assert permission["id"] == 1
assert permission["service_name"] == "Test Service"
assert permission["resource"] == "test_resource"
assert permission["action"] == "read"
@pytest.mark.parametrize(
"body, expected_status",
[
# organisation_id tests
(
{
"organisation_id": 42,
"service_id": 1,
"resource": "test_resource",
"action": "read",
},
404,
), # Non-existent organisation
(
{
"organisation_id": "banana",
"service_id": 1,
"resource": "test_resource",
"action": "read",
},
422,
), # Invalid organisation ID
(
{
"organisation_id": "",
"service_id": 1,
"resource": "test_resource",
"action": "read",
},
422,
), # Blank organisation ID
(
{
"organisation_id": -1,
"service_id": 1,
"resource": "test_resource",
"action": "read",
},
422,
), # Negative organisation ID
# service_id tests
(
{
"organisation_id": 1,
"service_id": "banana",
"resource": "test_resource",
"action": "read",
},
422,
), # Invalid service ID
(
{
"organisation_id": 1,
"service_id": "",
"resource": "test_resource",
"action": "read",
},
422,
), # Blank service ID
(
{
"organisation_id": 1,
"service_id": -1,
"resource": "test_resource",
"action": "read",
},
422,
), # Negative service ID
# resource tests
(
{"organisation_id": 1, "service_id": 1, "resource": 42, "action": "read"},
422,
), # Invalid resource type
# action tests
(
{
"organisation_id": 1,
"service_id": 1,
"resource": "test_resource",
"action": 42,
},
422,
), # Invalid action type
# missing/partial body tests
({}, 422), # Blank body
],
)
@pytest.mark.anyio
async def test_post_perm_search_status_checks(
default_client: AsyncClient, body: dict[str, str], expected_status: int
):
resp = await default_client.post("/iam/permissions/search", json=body)
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"query, expected_status",
generate_query_and_status(["group_id", "org_id", "perm_id"]),
)
@pytest.mark.anyio
async def test_delete_group_permissions_status_checks(
default_client: AsyncClient, query: str, expected_status: int
):
resp = await default_client.delete(f"/iam/group/permission?{query}")
assert resp.status_code == expected_status
@pytest.mark.anyio
async def test_delete_group_permissions_not_in_group(default_client: AsyncClient):
resp = await default_client.delete(
"/iam/group/permission?org_id=1&group_id=1&perm_id=2"
)
assert resp.status_code == 422
@pytest.mark.parametrize(
"body, expected_status",
[
({"organisation_id": 42, "user_email": "admin@test.com", "group_id": 1}, 404),
(
{
"organisation_id": "Test Org",
"user_email": "admin@test.com",
"group_id": 1,
},
422,
),
({"organisation_id": "", "user_email": "admin@test.com", "group_id": 1}, 422),
({}, 422),
({"user_email": 42, "group_id": 1}, 422),
({"organisation_id": 1, "user_email": "Test User", "group_id": 1}, 422),
({"organisation_id": 1, "user_email": "admin@test.com", "group_id": 42}, 404),
({"organisation_id": "Test Org", "user_email": "admin@test.com"}, 422),
({"organisation_id": "", "user_email": "admin@test.com"}, 422),
({"user_email": 42}, 422),
],
)
@pytest.mark.anyio
async def test_put_group_user_invitation_status_checks(
default_client: AsyncClient, body, expected_status
):
resp = await default_client.put("/iam/group/user/invitation", json=body)
assert resp.status_code == expected_status
@pytest.mark.parametrize(
"body, expected_status",
[
({"jwt": "invalid"}, 401),
({"jwt": ""}, 401),
({"jwt": None}, 422),
({"jwt": 42}, 422),
],
)
@pytest.mark.anyio
async def test_put_group_user_invitation_accept_status_checks(
default_client: AsyncClient, body, expected_status
):
resp = await default_client.put("/iam/group/user/invitation/accept", json=body)
assert resp.status_code == expected_status
if resp.status_code == 401:
assert resp.json()["detail"] == "Invalid JWS"
@pytest.mark.anyio
async def test_get_group_permissions_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "GET"
path = "/iam/group/permissions"
auth_level = "Root User"
query = "?org_id=1&group_id=1"
body = {}
expected_data = {
"organisation": {"id": 1, "name": "Org One"},
"group": {"id": 1, "name": "Org One Group"},
"permissions": [
{
"id": 1,
"service_name": "Test Service",
"resource": "test_resource",
"action": "read",
}
],
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_get_group_users_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "GET"
path = "/iam/group/users"
auth_level = "Root User"
query = "?org_id=1&group_id=1"
body = {}
expected_data = {
"organisation": {"id": 1, "name": "Org One"},
"group": {"id": 1, "name": "Org One Group"},
"users": [{"id": 1, "email": "admin@test.com"}],
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_post_group_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "POST"
path = "/iam/group"
auth_level = "Root User"
query = ""
body = {"name": "New Group", "organisation_id": 1}
expected_data = {
"organisation": {"id": 1, "name": "Org One"},
"group": {"id": 4, "name": "New Group"},
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_put_group_permission_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "PUT"
path = "/iam/group/permission"
auth_level = "Root User"
query = ""
body = {"permission_id": 1, "group_id": 3, "organisation_id": 1}
expected_data = {
"organisation": {"id": 1, "name": "Org One"},
"group": {"id": 3, "name": "Org One Group Two"},
"permissions": [
{
"id": 1,
"service_name": "Test Service",
"resource": "test_resource",
"action": "read",
}
],
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_put_group_user_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "PUT"
path = "/iam/group/user"
auth_level = "Root User"
query = ""
body = {"user_id": 2, "group_id": 1, "organisation_id": 1}
expected_data = {
"organisation": {"id": 1, "name": "Org One"},
"group": {"id": 1, "name": "Org One Group"},
"users": [
{
"id": 1,
"first_name": "Admin",
"last_name": "Test",
"email": "admin@test.com",
},
{
"id": 2,
"first_name": "User",
"last_name": "Test",
"email": "user@orgone.com",
},
],
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_get_permissions_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "GET"
path = "/iam/permissions"
auth_level = "Root User"
query = "?org_id=1"
body = {}
expected_data = {
"permissions": [
{
"id": 1,
"service_name": "Test Service",
"resource": "test_resource",
"action": "read",
},
{
"id": 2,
"service_name": "Test Service",
"resource": "test_resource",
"action": "move",
},
]
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_post_permission_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "POST"
path = "/iam/permission"
auth_level = "Super Admin"
query = ""
body = {"service_id": 1, "resource": "test_resource", "action": "create"}
expected_data = {
"permission": {
"id": 4,
"service_name": "Test Service",
"resource": "test_resource",
"action": "create",
}
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_delete_group_permission_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "DELETE"
path = "/iam/group/permission"
auth_level = "Root User"
query = "?org_id=1&group_id=1&perm_id=1"
body = {}
expected_data = {"group": {"id": 1, "name": "Org One Group"}, "permissions": []}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_delete_permission_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "DELETE"
path = "/iam/permission"
auth_level = "Super Admin"
query = "?perm_id=1"
body = {}
expected_data = {}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_delete_group_user_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "DELETE"
path = "/iam/group/user"
auth_level = "Root User"
query = "?org_id=1&group_id=1&user_id=1"
body = {}
expected_data = {"group": {"id": 1, "name": "Org One Group"}, "users": []}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_put_group_user_invitation_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
method = "PUT"
path = "/iam/group/user/invitation"
auth_level = "Root User"
query = ""
body = {"user_email": "admin@test.com", "organisation_id": 1, "group_id": 1}
expected_data = {
"group": {"id": 1, "name": "Org One Group"},
"organisation": {"id": 1, "name": "Org One"},
"invited_email": "admin@test.com",
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)
@pytest.mark.anyio
async def test_put_group_user_invitation_accept_standard(
default_client: AsyncClient, route_data: dict[str, dict[str, Any]]
):
expiry_delta = timedelta(hours=24)
expiry = datetime.now(timezone.utc) + expiry_delta
claims = {
"email": "admin@test.com",
"org_id": 1,
"exp": expiry,
"type": "org_invite",
"group_id": 3,
"group_name": "Org One Group Two",
}
token = await generate_jwt(claims)
method = "PUT"
path = "/iam/group/user/invitation/accept"
auth_level = "User"
query = ""
body = {"jwt": token}
expected_data = {
"organisation": {"name": "Org One", "id": 1},
"user": {"email": "admin@test.com", "id": 1},
"group": {"details": {"id": 3, "name": "Org One Group Two"}, "permissions": []},
}
await standard_test(
default_client, method, path, auth_level, query, body, expected_data, route_data
)