Compare commits
6 commits
af680dbc38
...
3460cd76a5
| Author | SHA1 | Date | |
|---|---|---|---|
| 3460cd76a5 | |||
| 1b54918bb4 | |||
| f44c5b6f36 | |||
| 9d9ca0b907 | |||
| a907506ec1 | |||
| 905fcf8814 |
10 changed files with 418 additions and 147 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -205,3 +205,6 @@ cython_debug/
|
||||||
marimo/_static/
|
marimo/_static/
|
||||||
marimo/_lsp/
|
marimo/_lsp/
|
||||||
__marimo__/
|
__marimo__/
|
||||||
|
|
||||||
|
|
||||||
|
endpoints.txt
|
||||||
|
|
@ -21,7 +21,7 @@ from src.auth.exceptions import UnauthorizedException
|
||||||
def is_super_admin(user_model) -> bool:
|
def is_super_admin(user_model) -> bool:
|
||||||
super_admin_emails = ["chris@sr2.uk"]
|
super_admin_emails = ["chris@sr2.uk"]
|
||||||
if user_model.email not in super_admin_emails:
|
if user_model.email not in super_admin_emails:
|
||||||
raise UnauthorizedException()
|
raise UnauthorizedException(message="Must be super admin")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -65,7 +65,21 @@ async def user_model_super_admin(user_model: user_model_claims_dependency):
|
||||||
if is_super_admin(user_model):
|
if is_super_admin(user_model):
|
||||||
return user_model
|
return user_model
|
||||||
|
|
||||||
raise UnauthorizedException()
|
raise UnauthorizedException(message="Must be super admin")
|
||||||
|
|
||||||
|
|
||||||
super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)]
|
super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)]
|
||||||
|
|
||||||
|
|
||||||
|
# Override for testing
|
||||||
|
async def never_super_admin(user_model: user_model_claims_dependency):
|
||||||
|
def _is_super_admin(_user_model) -> bool:
|
||||||
|
super_admin_emails = []
|
||||||
|
if _user_model.email not in super_admin_emails:
|
||||||
|
raise UnauthorizedException(message="Must be super admin")
|
||||||
|
return True
|
||||||
|
|
||||||
|
if _is_super_admin(user_model):
|
||||||
|
return user_model
|
||||||
|
|
||||||
|
raise UnauthorizedException(message="Must be super admin")
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ from src.organisation.models import Organisation as Org
|
||||||
from src.contact.models import Contact
|
from src.contact.models import Contact
|
||||||
from src.iam.models import Group, Permission
|
from src.iam.models import Group, Permission
|
||||||
from src.auth.service import get_current_user, get_dev_user
|
from src.auth.service import get_current_user, get_dev_user
|
||||||
|
from src.auth.dependencies import user_model_super_admin, never_super_admin
|
||||||
|
|
||||||
from src.main import app # inited FastAPI app
|
from src.main import app # inited FastAPI app
|
||||||
from src.database import engine, Base, get_db
|
from src.database import engine, Base, get_db
|
||||||
|
|
@ -32,7 +33,7 @@ def db_session():
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def client(db_session) -> AsyncGenerator[AsyncClient, None]:
|
async def default_client(db_session) -> AsyncGenerator[AsyncClient, None]:
|
||||||
def get_db_override():
|
def get_db_override():
|
||||||
return db_session
|
return db_session
|
||||||
app.dependency_overrides[get_db] = get_db_override
|
app.dependency_overrides[get_db] = get_db_override
|
||||||
|
|
@ -44,6 +45,20 @@ async def client(db_session) -> AsyncGenerator[AsyncClient, None]:
|
||||||
app.dependency_overrides.clear()
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def no_su_client(db_session) -> AsyncGenerator[AsyncClient, None]:
|
||||||
|
def get_db_override():
|
||||||
|
return db_session
|
||||||
|
app.dependency_overrides[get_db] = get_db_override
|
||||||
|
app.dependency_overrides[get_current_user] = get_dev_user
|
||||||
|
app.dependency_overrides[user_model_super_admin] = never_super_admin
|
||||||
|
transport = ASGITransport(app=app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://localhost:8000/api/v1") as ac:
|
||||||
|
yield ac
|
||||||
|
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
def _seed(db):
|
def _seed(db):
|
||||||
db.add(User(email="admin@test.com", first_name="Admin", last_name="Test", oidc_id="abcd-efgh-ijkl-mnop"))
|
db.add(User(email="admin@test.com", first_name="Admin", last_name="Test", oidc_id="abcd-efgh-ijkl-mnop"))
|
||||||
db.add(Contact(org_id=1, email="billing@test.org", phonenumber="07521539927"))
|
db.add(Contact(org_id=1, email="billing@test.org", phonenumber="07521539927"))
|
||||||
|
|
@ -66,3 +81,26 @@ def _seed(db):
|
||||||
db.flush()
|
db.flush()
|
||||||
group_model.user_rel.append(user_model)
|
group_model.user_rel.append(user_model)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
|
# # Produces a text file with method and path for every endpoint in the API
|
||||||
|
# from fastapi.routing import APIRoute
|
||||||
|
#
|
||||||
|
# def get_testable_routes():
|
||||||
|
# routes = []
|
||||||
|
#
|
||||||
|
# for route in app.routes:
|
||||||
|
# if not isinstance(route, APIRoute):
|
||||||
|
# continue
|
||||||
|
#
|
||||||
|
# for method in route.methods:
|
||||||
|
# if method in {"HEAD", "OPTIONS"}:
|
||||||
|
# continue
|
||||||
|
#
|
||||||
|
# routes.append((route.path, method))
|
||||||
|
#
|
||||||
|
# return routes
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# with open("endpoints.txt", "w") as f:
|
||||||
|
# for ep in get_testable_routes():
|
||||||
|
# f.write(f"{ep[1]} {ep[0]}\n")
|
||||||
|
|
|
||||||
157
test/test_auth_approval.py
Normal file
157
test/test_auth_approval.py
Normal file
|
|
@ -0,0 +1,157 @@
|
||||||
|
"""
|
||||||
|
This test module checks relevant endpoints to ensure only approved orgs get access or, for pre-approval endpoints, that they are not blocked.
|
||||||
|
Endpoints not checked here are endpoints that do not require an org check.
|
||||||
|
Delete endpoints are currently skipped because the testing system cannot use bodies in deletes.
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
from httpx import AsyncClient
|
||||||
|
|
||||||
|
from .conftest import default_client
|
||||||
|
|
||||||
|
from src.organisation.models import Organisation as Org, OrgUsers
|
||||||
|
from src.user.models import User
|
||||||
|
from src.iam.models import Group
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def set_org_partial(db_session):
|
||||||
|
org_model = db_session.get(Org, 1)
|
||||||
|
org_model.status = "partial"
|
||||||
|
db_session.flush()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_org_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.get("/org?org_id=1")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_patch_org_questionnaire_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.patch("/org/questionnaire", json={"organisation_id": 1,
|
||||||
|
"intake_questionnaire": {"question_one": "new answer one",
|
||||||
|
"question_two": None,
|
||||||
|
"question_three": None},
|
||||||
|
"partial": True})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_patch_org_status_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.patch("/org/status", json={"organisation_id": 1, "status": "submitted"})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_org_users_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.get("/org/users?org_id=1")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_org_user_auth_approval(default_client: AsyncClient, db_session):
|
||||||
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
|
db_session.flush()
|
||||||
|
|
||||||
|
resp = await default_client.post("/org/user", json={"organisation_id": 1, "user_id": 2})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_patch_org_root_user_auth_approval(default_client: AsyncClient, db_session):
|
||||||
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
|
db_session.flush()
|
||||||
|
db_session.add(OrgUsers(org_id=1, user_id=2))
|
||||||
|
db_session.flush()
|
||||||
|
|
||||||
|
resp = await default_client.patch("/org/root_user", json={"organisation_id": 1, "user_id": 2})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_org_groups_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.get("/org/groups?org_id=1")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_org_contact_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.get(f"/org/contact?org_id=1&contact_type=billing")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_patch_org_contact_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.patch("/org/contact",
|
||||||
|
json={"organisation_id": 1, "contact_type": "billing", "email": "user@example.com"})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_service_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.get("/service/?org_id=1")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_iam_group_permissions_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.get("/iam/group/permissions?org_id=1&group_id=1")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_iam_group_users_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.get("/iam/group/users?org_id=1&group_id=1")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_iam_group_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.post("/iam/group", json={"name": "New Group", "organisation_id": 1})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_put_iam_group_permission_auth_approval(default_client: AsyncClient, db_session):
|
||||||
|
db_session.add(Group(name="Test Group Two", org_id=1))
|
||||||
|
db_session.flush()
|
||||||
|
resp = await default_client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 1})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_put_iam_group_user_auth_approval(default_client: AsyncClient, db_session):
|
||||||
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
|
db_session.flush()
|
||||||
|
|
||||||
|
resp = await default_client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_iam_permissions_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.get("/iam/permissions?org_id=1")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_iam_permissions_search_auth_approval(default_client: AsyncClient):
|
||||||
|
resp = await default_client.post("/iam/permissions/search", json={"organisation_id": 1, "action": "read"})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert "has not been approved." in resp.json()["detail"]
|
||||||
62
test/test_auth_su.py
Normal file
62
test/test_auth_su.py
Normal file
|
|
@ -0,0 +1,62 @@
|
||||||
|
"""
|
||||||
|
This module ensures super admin only endpoints do return a correctly formatted 401 when user is not a super admin
|
||||||
|
DELETE endpoints are not tested
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
from httpx import AsyncClient
|
||||||
|
|
||||||
|
from src.organisation.models import OrgUsers
|
||||||
|
from src.user.models import User
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_user_auth_su(no_su_client: AsyncClient):
|
||||||
|
resp = await no_su_client.get("/user/?user_id=1")
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 401
|
||||||
|
assert resp.json()["detail"] == "Must be super admin"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_patch_org_status_auth_su(no_su_client: AsyncClient):
|
||||||
|
resp = await no_su_client.patch("/org/status", json={"organisation_id": 1, "status": "submitted"})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 401
|
||||||
|
assert resp.json()["detail"] == "Must be super admin"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_patch_org_root_user_auth_su(no_su_client: AsyncClient, db_session):
|
||||||
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
|
db_session.flush()
|
||||||
|
db_session.add(OrgUsers(org_id=1, user_id=2))
|
||||||
|
db_session.flush()
|
||||||
|
|
||||||
|
resp = await no_su_client.patch("/org/root_user", json={"organisation_id": 1, "user_id": 2})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 401
|
||||||
|
assert resp.json()["detail"] == "Must be super admin"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_patch_service_key_auth_su(no_su_client: AsyncClient):
|
||||||
|
resp = await no_su_client.patch("/service/key", json={"service_id": 1})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 401
|
||||||
|
assert resp.json()["detail"] == "Must be super admin"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_service_auth_su(no_su_client: AsyncClient):
|
||||||
|
resp = await no_su_client.post("/service/", json={"name": "New Test Service"})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 401
|
||||||
|
assert resp.json()["detail"] == "Must be super admin"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_post_perm_success(no_su_client: AsyncClient, db_session):
|
||||||
|
resp = await no_su_client.post("/iam/permission", json={"service_id": 1, "resource": "test_resource", "action": "create"})
|
||||||
|
assert resp.status_code != 422
|
||||||
|
assert resp.status_code == 401
|
||||||
|
assert resp.json()["detail"] == "Must be super admin"
|
||||||
|
|
@ -1,11 +1,11 @@
|
||||||
import pytest
|
import pytest
|
||||||
from httpx import AsyncClient
|
from httpx import AsyncClient
|
||||||
|
|
||||||
from .conftest import client
|
from .conftest import default_client
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_healthcheck(client: AsyncClient):
|
async def test_healthcheck(default_client: AsyncClient):
|
||||||
resp = await client.get("/healthcheck")
|
resp = await default_client.get("/healthcheck")
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
assert resp.json() == {"status": "ok"}
|
assert resp.json() == {"status": "ok"}
|
||||||
|
|
|
||||||
|
|
@ -5,13 +5,13 @@ import pytest
|
||||||
from httpx import AsyncClient
|
from httpx import AsyncClient
|
||||||
|
|
||||||
from src.user.models import User
|
from src.user.models import User
|
||||||
from .conftest import client, db_session
|
from .conftest import default_client, db_session
|
||||||
|
|
||||||
from src.iam.models import Group
|
from src.iam.models import Group
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_act_on_resource_endpoint_success(client: AsyncClient):
|
async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient):
|
||||||
body = {
|
body = {
|
||||||
"service": "Test Service",
|
"service": "Test Service",
|
||||||
"organisation": "Test Org",
|
"organisation": "Test Org",
|
||||||
|
|
@ -21,7 +21,7 @@ async def test_post_act_on_resource_endpoint_success(client: AsyncClient):
|
||||||
"Authorization": "Bearer not_checked_when_auth_is_disabled",
|
"Authorization": "Bearer not_checked_when_auth_is_disabled",
|
||||||
"X-API-Key": "123456789"
|
"X-API-Key": "123456789"
|
||||||
}
|
}
|
||||||
resp = await client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers)
|
resp = await default_client.post("/iam/can_act_on_resource?action=read", json=body, headers=headers)
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -41,7 +41,7 @@ async def test_post_act_on_resource_endpoint_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_act_on_resource_endpoint_failure(client: AsyncClient, service, org, resource, action,
|
async def test_post_act_on_resource_endpoint_failure(default_client: AsyncClient, service, org, resource, action,
|
||||||
expected_status: int):
|
expected_status: int):
|
||||||
body = {
|
body = {
|
||||||
"service": service,
|
"service": service,
|
||||||
|
|
@ -52,14 +52,14 @@ async def test_post_act_on_resource_endpoint_failure(client: AsyncClient, servic
|
||||||
"Authorization": "Bearer not_checked_when_auth_is_disabled",
|
"Authorization": "Bearer not_checked_when_auth_is_disabled",
|
||||||
"X-API-Key": "123456789"
|
"X-API-Key": "123456789"
|
||||||
}
|
}
|
||||||
resp = await client.post(f"/iam/can_act_on_resource?action={action}", json=body, headers=headers)
|
resp = await default_client.post(f"/iam/can_act_on_resource?action={action}", json=body, headers=headers)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_group_permissions_success(client: AsyncClient):
|
async def test_get_group_permissions_success(default_client: AsyncClient):
|
||||||
resp = await client.get("/iam/group/permissions?org_id=1&group_id=1")
|
resp = await default_client.get("/iam/group/permissions?org_id=1&group_id=1")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -88,15 +88,15 @@ async def test_get_group_permissions_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_group_permissions_failure(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_group_permissions_failure(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/iam/group/permissions?{query}")
|
resp = await default_client.get(f"/iam/group/permissions?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_group_users_success(client: AsyncClient):
|
async def test_get_group_users_success(default_client: AsyncClient):
|
||||||
resp = await client.get("/iam/group/users?org_id=1&group_id=1")
|
resp = await default_client.get("/iam/group/users?org_id=1&group_id=1")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -126,15 +126,15 @@ async def test_get_group_users_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_group_users_failure(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_group_users_failure(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/iam/group/users?{query}")
|
resp = await default_client.get(f"/iam/group/users?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_group_success(client: AsyncClient):
|
async def test_post_group_success(default_client: AsyncClient):
|
||||||
resp = await client.post("/iam/group", json={"name": "New Group", "organisation_id": 1})
|
resp = await default_client.post("/iam/group", json={"name": "New Group", "organisation_id": 1})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -161,17 +161,17 @@ async def test_post_group_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_group_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_post_group_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.post("/iam/group", json=body)
|
resp = await default_client.post("/iam/group", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_put_group_perm_success(client: AsyncClient, db_session):
|
async def test_put_group_perm_success(default_client: AsyncClient, db_session):
|
||||||
db_session.add(Group(name="Test Group Two", org_id=1))
|
db_session.add(Group(name="Test Group Two", org_id=1))
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
resp = await client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 1})
|
resp = await default_client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 1})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -217,18 +217,18 @@ async def test_put_group_perm_success(client: AsyncClient, db_session):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_put_group_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_put_group_perm_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.put("/iam/group/permission", json=body)
|
resp = await default_client.put("/iam/group/permission", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_put_group_user_success(client: AsyncClient, db_session):
|
async def test_put_group_user_success(default_client: AsyncClient, db_session):
|
||||||
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
|
|
||||||
resp = await client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1})
|
resp = await default_client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 1})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -275,15 +275,15 @@ async def test_put_group_user_success(client: AsyncClient, db_session):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_put_group_user_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_put_group_user_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.put("/iam/group/user", json=body)
|
resp = await default_client.put("/iam/group/user", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_permissions_success(client: AsyncClient):
|
async def test_get_permissions_success(default_client: AsyncClient):
|
||||||
resp = await client.get("/iam/permissions?org_id=1")
|
resp = await default_client.get("/iam/permissions?org_id=1")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -305,15 +305,15 @@ async def test_get_permissions_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_permissions_failure(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_permissions_failure(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/iam/permissions?{query}")
|
resp = await default_client.get(f"/iam/permissions?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_perm_success(client: AsyncClient, db_session):
|
async def test_post_perm_success(default_client: AsyncClient, db_session):
|
||||||
resp = await client.post("/iam/permission", json={"service_id": 1, "resource": "test_resource", "action": "create"})
|
resp = await default_client.post("/iam/permission", json={"service_id": 1, "resource": "test_resource", "action": "create"})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -351,8 +351,8 @@ async def test_post_perm_success(client: AsyncClient, db_session):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_perm_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_post_perm_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.post("/iam/permission", json=body)
|
resp = await default_client.post("/iam/permission", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
@ -371,8 +371,8 @@ async def test_post_perm_failure(client: AsyncClient, body: dict[str, str], expe
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_perm_search_success(client: AsyncClient, db_session, body):
|
async def test_post_perm_search_success(default_client: AsyncClient, db_session, body):
|
||||||
resp = await client.post("/iam/permissions/search", json=body)
|
resp = await default_client.post("/iam/permissions/search", json=body)
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -408,7 +408,7 @@ async def test_post_perm_search_success(client: AsyncClient, db_session, body):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_perm_search_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_post_perm_search_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.post("/iam/permissions/search", json=body)
|
resp = await default_client.post("/iam/permissions/search", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
|
||||||
|
|
@ -6,12 +6,12 @@ from httpx import AsyncClient
|
||||||
|
|
||||||
from src.organisation.models import Organisation, OrgUsers
|
from src.organisation.models import Organisation, OrgUsers
|
||||||
from src.user.models import User
|
from src.user.models import User
|
||||||
from .conftest import client
|
from .conftest import default_client
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_org_success(client: AsyncClient):
|
async def test_get_org_success(default_client: AsyncClient):
|
||||||
resp = await client.get("/org?org_id=1")
|
resp = await default_client.get("/org?org_id=1")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -32,15 +32,15 @@ async def test_get_org_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_org_failure(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_org_failure(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/org?{query}")
|
resp = await default_client.get(f"/org?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_org_success(client: AsyncClient):
|
async def test_post_org_success(default_client: AsyncClient):
|
||||||
resp = await client.post("/org", json={"name": "New Test Org"})
|
resp = await default_client.post("/org", json={"name": "New Test Org"})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 201
|
assert resp.status_code == 201
|
||||||
|
|
@ -57,18 +57,18 @@ async def test_post_org_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_org_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_post_org_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.post("/org", json=body)
|
resp = await default_client.post("/org", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_org_questionnaire_partial_success(client: AsyncClient, db_session):
|
async def test_patch_org_questionnaire_partial_success(default_client: AsyncClient, db_session):
|
||||||
org_model = db_session.get(Organisation, 1)
|
org_model = db_session.get(Organisation, 1)
|
||||||
org_model.status = "partial"
|
org_model.status = "partial"
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
resp = await client.patch("/org/questionnaire", json={"organisation_id": 1, "intake_questionnaire": {"question_one": "new answer one", "question_two": None, "question_three": None}, "partial": True})
|
resp = await default_client.patch("/org/questionnaire", json={"organisation_id": 1, "intake_questionnaire": {"question_one": "new answer one", "question_two": None, "question_three": None}, "partial": True})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -93,18 +93,18 @@ async def test_patch_org_questionnaire_partial_success(client: AsyncClient, db_s
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_questionnaire_partial_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_patch_questionnaire_partial_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.patch("/org/questionnaire", json=body)
|
resp = await default_client.patch("/org/questionnaire", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_org_questionnaire_submit_success(client: AsyncClient, db_session):
|
async def test_patch_org_questionnaire_submit_success(default_client: AsyncClient, db_session):
|
||||||
org_model = db_session.get(Organisation, 1)
|
org_model = db_session.get(Organisation, 1)
|
||||||
org_model.status = "partial"
|
org_model.status = "partial"
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
resp = await client.patch("/org/questionnaire", json={"organisation_id": 1, "intake_questionnaire": {"question_one": "new answer one", "question_two": None, "question_three": None}, "partial": False})
|
resp = await default_client.patch("/org/questionnaire", json={"organisation_id": 1, "intake_questionnaire": {"question_one": "new answer one", "question_two": None, "question_three": None}, "partial": False})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -121,8 +121,8 @@ async def test_patch_org_questionnaire_submit_success(client: AsyncClient, db_se
|
||||||
["partial", "submitted", "remediation", "approved", "rejected", "removed"]
|
["partial", "submitted", "remediation", "approved", "rejected", "removed"]
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_org_status_success(client: AsyncClient, status: str):
|
async def test_patch_org_status_success(default_client: AsyncClient, status: str):
|
||||||
resp = await client.patch("/org/status", json={"organisation_id": 1, "status": status})
|
resp = await default_client.patch("/org/status", json={"organisation_id": 1, "status": status})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -142,15 +142,15 @@ async def test_patch_org_status_success(client: AsyncClient, status: str):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_org_status_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_patch_org_status_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.patch("/org/status", json=body)
|
resp = await default_client.patch("/org/status", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_org_users_success(client: AsyncClient):
|
async def test_get_org_users_success(default_client: AsyncClient):
|
||||||
resp = await client.get("/org/users?org_id=1")
|
resp = await default_client.get("/org/users?org_id=1")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -173,18 +173,18 @@ async def test_get_org_users_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_org_users_failure(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_org_users_failure(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/org/users?{query}")
|
resp = await default_client.get(f"/org/users?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_org_user_success(client: AsyncClient, db_session):
|
async def test_post_org_user_success(default_client: AsyncClient, db_session):
|
||||||
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
|
|
||||||
resp = await client.post("/org/user", json={"organisation_id": 1, "user_id": 2})
|
resp = await default_client.post("/org/user", json={"organisation_id": 1, "user_id": 2})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -204,23 +204,23 @@ async def test_post_org_user_success(client: AsyncClient, db_session):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_org_failure(client: AsyncClient, body: dict[str, str], expected_status: int, db_session):
|
async def test_post_org_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int, db_session):
|
||||||
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
|
|
||||||
resp = await client.post("/org/user", json=body)
|
resp = await default_client.post("/org/user", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_org_root_user_success(client: AsyncClient, db_session):
|
async def test_patch_org_root_user_success(default_client: AsyncClient, db_session):
|
||||||
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
db_session.add(OrgUsers(org_id=1, user_id=2))
|
db_session.add(OrgUsers(org_id=1, user_id=2))
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
|
|
||||||
resp = await client.patch("/org/root_user", json={"organisation_id": 1, "user_id": 2})
|
resp = await default_client.patch("/org/root_user", json={"organisation_id": 1, "user_id": 2})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -241,20 +241,20 @@ async def test_patch_org_root_user_success(client: AsyncClient, db_session):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_root_user_failure(client: AsyncClient, body: dict[str, str], expected_status: int, db_session):
|
async def test_patch_root_user_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int, db_session):
|
||||||
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
db_session.add(OrgUsers(org_id=1, user_id=2))
|
db_session.add(OrgUsers(org_id=1, user_id=2))
|
||||||
db_session.flush()
|
db_session.flush()
|
||||||
|
|
||||||
resp = await client.patch("/org/root_user", json=body)
|
resp = await default_client.patch("/org/root_user", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_org_groups_success(client: AsyncClient):
|
async def test_get_org_groups_success(default_client: AsyncClient):
|
||||||
resp = await client.get("/org/groups?org_id=1")
|
resp = await default_client.get("/org/groups?org_id=1")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -272,8 +272,8 @@ async def test_get_org_groups_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_org_groups_failure(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_org_groups_failure(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/org/groups?{query}")
|
resp = await default_client.get(f"/org/groups?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
@ -283,8 +283,8 @@ async def test_get_org_groups_failure(client: AsyncClient, query: str, expected_
|
||||||
["billing", "security", "owner"]
|
["billing", "security", "owner"]
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_org_contact_success(client: AsyncClient, contact_type: str):
|
async def test_get_org_contact_success(default_client: AsyncClient, contact_type: str):
|
||||||
resp = await client.get(f"/org/contact?org_id=1&contact_type={contact_type}")
|
resp = await default_client.get(f"/org/contact?org_id=1&contact_type={contact_type}")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -326,8 +326,8 @@ async def test_get_org_contact_success(client: AsyncClient, contact_type: str):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_org_contact_failure(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_org_contact_failure(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/org/contact?{query}")
|
resp = await default_client.get(f"/org/contact?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
@ -350,8 +350,8 @@ async def test_get_org_contact_failure(client: AsyncClient, query: str, expected
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_org_contact_success(client: AsyncClient, key: str, value: str):
|
async def test_patch_org_contact_success(default_client: AsyncClient, key: str, value: str):
|
||||||
resp = await client.patch("/org/contact", json={"organisation_id": 1, "contact_type": "billing", key: value})
|
resp = await default_client.patch("/org/contact", json={"organisation_id": 1, "contact_type": "billing", key: value})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -376,7 +376,7 @@ async def test_patch_org_contact_success(client: AsyncClient, key: str, value: s
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_org_status_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_patch_org_status_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.patch("/org/contact", json=body)
|
resp = await default_client.patch("/org/contact", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
|
||||||
|
|
@ -4,12 +4,12 @@
|
||||||
import pytest
|
import pytest
|
||||||
from httpx import AsyncClient
|
from httpx import AsyncClient
|
||||||
|
|
||||||
from .conftest import client
|
from .conftest import default_client
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_services_success(client: AsyncClient):
|
async def test_get_services_success(default_client: AsyncClient):
|
||||||
resp = await client.get("/service/?org_id=1")
|
resp = await default_client.get("/service/?org_id=1")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -18,7 +18,6 @@ async def test_get_services_success(client: AsyncClient):
|
||||||
assert data["services"][0]["name"] == "Test Service"
|
assert data["services"][0]["name"] == "Test Service"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"query, expected_status",
|
"query, expected_status",
|
||||||
[
|
[
|
||||||
|
|
@ -28,15 +27,15 @@ async def test_get_services_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_services_failure(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_services_failure(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/service/?{query}")
|
resp = await default_client.get(f"/service/?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_service_success(client: AsyncClient):
|
async def test_post_service_success(default_client: AsyncClient):
|
||||||
resp = await client.post("/service/", json={"name": "New Test Service"})
|
resp = await default_client.post("/service/", json={"name": "New Test Service"})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -46,7 +45,6 @@ async def test_post_service_success(client: AsyncClient):
|
||||||
assert type(data["service"]["api_key"]) == str
|
assert type(data["service"]["api_key"]) == str
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"body, expected_status",
|
"body, expected_status",
|
||||||
[
|
[
|
||||||
|
|
@ -55,15 +53,15 @@ async def test_post_service_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_post_services_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_post_services_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.post("/service/", json=body)
|
resp = await default_client.post("/service/", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_service_success(client: AsyncClient):
|
async def test_patch_service_success(default_client: AsyncClient):
|
||||||
resp = await client.patch("/service/key", json={"service_id": 1})
|
resp = await default_client.patch("/service/key", json={"service_id": 1})
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -73,7 +71,6 @@ async def test_patch_service_success(client: AsyncClient):
|
||||||
assert type(data["service"]["api_key"]) == str
|
assert type(data["service"]["api_key"]) == str
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"body, expected_status",
|
"body, expected_status",
|
||||||
[
|
[
|
||||||
|
|
@ -84,7 +81,7 @@ async def test_patch_service_success(client: AsyncClient):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_patch_services_failure(client: AsyncClient, body: dict[str, str], expected_status: int):
|
async def test_patch_services_failure(default_client: AsyncClient, body: dict[str, str], expected_status: int):
|
||||||
resp = await client.patch("/service/key", json=body)
|
resp = await default_client.patch("/service/key", json=body)
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
|
||||||
|
|
@ -6,11 +6,11 @@
|
||||||
import pytest
|
import pytest
|
||||||
from httpx import AsyncClient
|
from httpx import AsyncClient
|
||||||
|
|
||||||
from .conftest import client
|
from .conftest import default_client
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_self_db(client: AsyncClient):
|
async def test_get_self_db(default_client: AsyncClient):
|
||||||
resp = await client.get("/user/self/db")
|
resp = await default_client.get("/user/self/db")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -22,8 +22,8 @@ async def test_get_self_db(client: AsyncClient):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_get_user_success(client: AsyncClient):
|
async def test_get_user_success(default_client: AsyncClient):
|
||||||
resp = await client.get("/user/?user_id=1")
|
resp = await default_client.get("/user/?user_id=1")
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
@ -44,7 +44,7 @@ async def test_get_user_success(client: AsyncClient):
|
||||||
("", 422),
|
("", 422),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_get_user_fail(client: AsyncClient, query: str, expected_status: int):
|
async def test_get_user_fail(default_client: AsyncClient, query: str, expected_status: int):
|
||||||
resp = await client.get(f"/user/?{query}")
|
resp = await default_client.get(f"/user/?{query}")
|
||||||
|
|
||||||
assert resp.status_code == expected_status
|
assert resp.status_code == expected_status
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue