Compare commits
3 commits
3460cd76a5
...
94cf6c5258
| Author | SHA1 | Date | |
|---|---|---|---|
| 94cf6c5258 | |||
| c670d8ff51 | |||
| 2ec9cca99c |
4 changed files with 216 additions and 33 deletions
|
|
@ -18,13 +18,6 @@ from src.organisation.models import Organisation as Org
|
|||
from src.auth.exceptions import UnauthorizedException
|
||||
|
||||
|
||||
def is_super_admin(user_model) -> bool:
|
||||
super_admin_emails = ["chris@sr2.uk"]
|
||||
if user_model.email not in super_admin_emails:
|
||||
raise UnauthorizedException(message="Must be super admin")
|
||||
return True
|
||||
|
||||
|
||||
async def org_query_user_claims(org_model: org_model_query_dependency, user_model: user_model_claims_dependency):
|
||||
if user_model in org_model.user_rel:
|
||||
return True
|
||||
|
|
@ -35,51 +28,51 @@ async def org_query_user_claims(org_model: org_model_query_dependency, user_mode
|
|||
org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)]
|
||||
|
||||
|
||||
async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency):
|
||||
async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency, su_emails: su_list_dependency):
|
||||
if org_model.root_user_id == user_model.id:
|
||||
return org_model
|
||||
|
||||
if is_super_admin(user_model):
|
||||
return org_model
|
||||
try:
|
||||
if await user_model_super_admin(user_model, su_emails):
|
||||
return org_model
|
||||
except UnauthorizedException:
|
||||
pass
|
||||
|
||||
raise UnauthorizedException()
|
||||
raise UnauthorizedException(message="Must be the org's root user")
|
||||
|
||||
|
||||
org_model_root_claim_query_dependency = Annotated[type[Org], Depends(org_query_root_claims)]
|
||||
|
||||
|
||||
async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency):
|
||||
async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency, su_emails: su_list_dependency):
|
||||
if org_model.root_user_id == user_model.id:
|
||||
return org_model
|
||||
|
||||
if is_super_admin(user_model):
|
||||
return org_model
|
||||
try:
|
||||
if await user_model_super_admin(user_model, su_emails):
|
||||
return org_model
|
||||
except UnauthorizedException:
|
||||
pass
|
||||
|
||||
raise UnauthorizedException()
|
||||
raise UnauthorizedException(message="Must be the org's root user")
|
||||
|
||||
|
||||
org_model_root_claim_body_dependency = Annotated[type[Org], Depends(org_body_root_claims)]
|
||||
|
||||
|
||||
async def user_model_super_admin(user_model: user_model_claims_dependency):
|
||||
if is_super_admin(user_model):
|
||||
def get_super_admin_list():
|
||||
return []
|
||||
|
||||
def empty_su_list():
|
||||
return []
|
||||
|
||||
su_list_dependency = Annotated[list[User], Depends(get_super_admin_list)]
|
||||
|
||||
async def user_model_super_admin(user_model: user_model_claims_dependency, super_admin_emails: su_list_dependency):
|
||||
if user_model.email in super_admin_emails:
|
||||
return user_model
|
||||
|
||||
raise UnauthorizedException(message="Must be super admin")
|
||||
|
||||
|
||||
super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)]
|
||||
|
||||
|
||||
# Override for testing
|
||||
async def never_super_admin(user_model: user_model_claims_dependency):
|
||||
def _is_super_admin(_user_model) -> bool:
|
||||
super_admin_emails = []
|
||||
if _user_model.email not in super_admin_emails:
|
||||
raise UnauthorizedException(message="Must be super admin")
|
||||
return True
|
||||
|
||||
if _is_super_admin(user_model):
|
||||
return user_model
|
||||
|
||||
raise UnauthorizedException(message="Must be super admin")
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from src.organisation.models import Organisation as Org
|
|||
from src.contact.models import Contact
|
||||
from src.iam.models import Group, Permission
|
||||
from src.auth.service import get_current_user, get_dev_user
|
||||
from src.auth.dependencies import user_model_super_admin, never_super_admin
|
||||
from src.auth.dependencies import empty_su_list, get_super_admin_list
|
||||
|
||||
from src.main import app # inited FastAPI app
|
||||
from src.database import engine, Base, get_db
|
||||
|
|
@ -45,13 +45,26 @@ async def default_client(db_session) -> AsyncGenerator[AsyncClient, None]:
|
|||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def no_user_client(db_session) -> AsyncGenerator[AsyncClient, None]:
|
||||
def get_db_override():
|
||||
return db_session
|
||||
app.dependency_overrides[get_db] = get_db_override
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://localhost:8000/api/v1") as ac:
|
||||
yield ac
|
||||
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def no_su_client(db_session) -> AsyncGenerator[AsyncClient, None]:
|
||||
def get_db_override():
|
||||
return db_session
|
||||
app.dependency_overrides[get_db] = get_db_override
|
||||
app.dependency_overrides[get_current_user] = get_dev_user
|
||||
app.dependency_overrides[user_model_super_admin] = never_super_admin
|
||||
app.dependency_overrides[get_super_admin_list] = empty_su_list
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://localhost:8000/api/v1") as ac:
|
||||
yield ac
|
||||
|
|
|
|||
154
test/test_auth_root.py
Normal file
154
test/test_auth_root.py
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
"""
|
||||
This module ensures root user only endpoints do return a correctly formatted 401 when user is not the root user for the org
|
||||
DELETE endpoints are not tested
|
||||
"""
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
|
||||
from .conftest import no_su_client
|
||||
|
||||
from src.organisation.models import Organisation as Org
|
||||
from src.user.models import User
|
||||
from src.iam.models import Group
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def add_second_org(db_session):
|
||||
db_session.add(User(email="admin@test.org", first_name="Admin", last_name="Test", oidc_id="abcd-efgh-ijkl-4321"))
|
||||
db_session.flush()
|
||||
db_session.add(Org(name="Test Org Two", root_user_id=2, billing_contact_id=1, owner_contact_id=2, security_contact_id=3,
|
||||
status="approved", intake_questionnaire={}))
|
||||
db_session.flush()
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_org_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.get("/org?org_id=2")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_patch_org_questionnaire_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.patch("/org/questionnaire", json={"organisation_id": 2,
|
||||
"intake_questionnaire": {"question_one": "new answer one",
|
||||
"question_two": None,
|
||||
"question_three": None},
|
||||
"partial": True})
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_org_users_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.get("/org/users?org_id=2")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_post_org_user_auth_root(no_su_client: AsyncClient, db_session):
|
||||
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||
db_session.flush()
|
||||
|
||||
resp = await no_su_client.post("/org/user", json={"organisation_id": 2, "user_id": 2})
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_org_groups_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.get("/org/groups?org_id=2")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_org_contact_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.get(f"/org/contact?org_id=2&contact_type=billing")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_patch_org_contact_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.patch("/org/contact",
|
||||
json={"organisation_id": 2, "contact_type": "billing", "email": "user@example.com"})
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_service_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.get("/service/?org_id=2")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_iam_group_permissions_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.get("/iam/group/permissions?org_id=2&group_id=1")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_iam_group_users_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.get("/iam/group/users?org_id=2&group_id=1")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_post_iam_group_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.post("/iam/group", json={"name": "New Group", "organisation_id": 2})
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_put_iam_group_permission_auth_root(no_su_client: AsyncClient, db_session):
|
||||
db_session.add(Group(name="Test Group Two", org_id=2))
|
||||
db_session.flush()
|
||||
resp = await no_su_client.put("/iam/group/permission", json={"permission_id": 1, "group_id": 2, "organisation_id": 2})
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_put_iam_group_user_auth_root(no_su_client: AsyncClient, db_session):
|
||||
db_session.add(User(email="user@test.org", first_name="User", last_name="Test", oidc_id="abcd-efgh-ijkl-1234"))
|
||||
db_session.flush()
|
||||
|
||||
resp = await no_su_client.put("/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 2})
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_iam_permissions_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.get("/iam/permissions?org_id=2")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_post_iam_permissions_search_auth_root(no_su_client: AsyncClient):
|
||||
resp = await no_su_client.post("/iam/permissions/search", json={"organisation_id": 2, "action": "read"})
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert "Must be the org's root user" in resp.json()["detail"]
|
||||
23
test/test_auth_user.py
Normal file
23
test/test_auth_user.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
"""
|
||||
This testing module removes the testing user override to verify that endpoints with only the user requirement return a 401 error when not logged in
|
||||
"""
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
|
||||
from .conftest import no_user_client
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_self_db(no_user_client: AsyncClient):
|
||||
resp = await no_user_client.get("/user/self/db")
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert resp.json()["detail"] == "Not authenticated"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_post_org_success(no_user_client: AsyncClient):
|
||||
resp = await no_user_client.post("/org", json={"name": "New Test Org"})
|
||||
assert resp.status_code != 422
|
||||
assert resp.status_code == 401
|
||||
assert resp.json()["detail"] == "Not authenticated"
|
||||
Loading…
Add table
Add a link
Reference in a new issue