From c2e035dede5d008f5174b9db731f3fefdc7ffc3f Mon Sep 17 00:00:00 2001 From: luxferre Date: Thu, 11 Jun 2026 14:58:05 +0100 Subject: [PATCH] feat: more accurate status codes 403 Forbidden replacing many 401 Unauthorized usages. --- src/auth/dependencies.py | 15 ++++++------- src/auth/exceptions.py | 13 ----------- src/auth/service.py | 2 +- src/exceptions.py | 9 ++++++++ src/iam/router.py | 16 +++++++------- src/iam/service.py | 2 +- src/organisation/router.py | 45 ++++++++++++++++++++++++-------------- src/utils.py | 5 ++--- test/test_auth_root.py | 28 ++++++++++++------------ test/test_auth_su.py | 14 ++++++------ test/test_iam.py | 6 ++--- 11 files changed, 81 insertions(+), 74 deletions(-) diff --git a/src/auth/dependencies.py b/src/auth/dependencies.py index e29b641..ddba545 100644 --- a/src/auth/dependencies.py +++ b/src/auth/dependencies.py @@ -11,6 +11,7 @@ Exports: from typing import Annotated from fastapi import Depends +from src.exceptions import ForbiddenException from src.user.dependencies import user_model_claims_dependency from src.user.models import User from src.organisation.dependencies import ( @@ -19,8 +20,6 @@ from src.organisation.dependencies import ( ) from src.organisation.models import Organisation as Org -from src.auth.exceptions import UnauthorizedException - async def org_query_user_claims( org_model: org_model_query_dependency, user_model: user_model_claims_dependency @@ -28,7 +27,7 @@ async def org_query_user_claims( if user_model in org_model.user_rel: return True - raise UnauthorizedException() + raise ForbiddenException() org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)] @@ -45,10 +44,10 @@ async def org_query_root_claims( try: if await user_model_super_admin(user_model, su_emails): return org_model - except UnauthorizedException: + except ForbiddenException: pass - raise UnauthorizedException(message="Must be the org's root user") + raise ForbiddenException(message="Must be the org's root user") org_model_root_claim_query_dependency = Annotated[ @@ -67,10 +66,10 @@ async def org_body_root_claims( try: if await user_model_super_admin(user_model, su_emails): return org_model - except UnauthorizedException: + except ForbiddenException: pass - raise UnauthorizedException(message="Must be the org's root user") + raise ForbiddenException(message="Must be the org's root user") org_model_root_claim_body_dependency = Annotated[ @@ -99,7 +98,7 @@ async def user_model_super_admin( if user_model.email in super_admin_emails: return user_model - raise UnauthorizedException(message="Must be super admin") + raise ForbiddenException(message="Must be super admin") super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)] diff --git a/src/auth/exceptions.py b/src/auth/exceptions.py index f4a2cba..30ec2ed 100644 --- a/src/auth/exceptions.py +++ b/src/auth/exceptions.py @@ -4,16 +4,3 @@ Module specific exceptions for the auth module Exceptions: - UnauthorizedException: Takes an optional message string """ - -from typing import Optional - -from fastapi import HTTPException, status - - -class UnauthorizedException(HTTPException): - def __init__(self, message: Optional[str] = None) -> None: - detail = "Not authorized" if not message else message - super().__init__( - status_code=status.HTTP_401_UNAUTHORIZED, - detail=detail, - ) diff --git a/src/auth/service.py b/src/auth/service.py index b9f436b..25c2fa7 100644 --- a/src/auth/service.py +++ b/src/auth/service.py @@ -17,7 +17,7 @@ from urllib.request import urlopen from fastapi import Depends from fastapi.security import OpenIdConnect -from src.auth.exceptions import UnauthorizedException +from src.exceptions import UnauthorizedException from src.auth.config import auth_settings from src.user.service import add_user_to_db from src.database import db_dependency diff --git a/src/exceptions.py b/src/exceptions.py index 5a6ed3a..1f5fdcb 100644 --- a/src/exceptions.py +++ b/src/exceptions.py @@ -36,3 +36,12 @@ class ForbiddenException(HTTPException): status_code=status.HTTP_403_FORBIDDEN, detail=detail, ) + + +class UnauthorizedException(HTTPException): + def __init__(self, message: Optional[str] = None) -> None: + detail = "Not authorized" if not message else message + super().__init__( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=detail, + ) diff --git a/src/iam/router.py b/src/iam/router.py index 810c2aa..b1b2dab 100644 --- a/src/iam/router.py +++ b/src/iam/router.py @@ -27,7 +27,6 @@ from src.schemas import GroupSummary, OrgSummary, ResourceName from src.service.exceptions import ServiceNotFoundException from src.exceptions import ConflictException, ForbiddenException from src.database import db_dependency -from src.auth.exceptions import UnauthorizedException from src.auth.service import claims_dependency from src.auth.dependencies import ( org_model_root_claim_query_dependency, @@ -171,7 +170,7 @@ async def get_group_permissions( Gets a list of permissions granted to the group. Also returns a summary for the org and group. """ if group_model.org_id != org_model.id: - raise UnauthorizedException("Group does not belong to this organization") + raise ForbiddenException("Group does not belong to this organization") return { "organisation": org_model, "group": group_model, @@ -198,7 +197,7 @@ async def get_group_users( Gets a list of users assigned to the group. Also returns a summary for the org and group. """ if group_model.org_id != org_model.id: - raise UnauthorizedException("Group does not belong to this organization") + raise ForbiddenException("Group does not belong to this organization") return { "organisation": org_model, "group": group_model, @@ -266,7 +265,7 @@ async def add_group_permission( Grants a permission to a group. Returns a list of the permissions in the group as well as a summary for the org and group. """ if group_model.org_id != org_model.id: - raise UnauthorizedException("Group does not belong to this organization") + raise ForbiddenException("Group does not belong to this organization") if perm_model in group_model.permission_rel: raise ConflictException("Group already has this permission") @@ -311,7 +310,7 @@ async def add_group_user( The user's email address must match the email on their OIDC profile. """ if group_model.org_id != org_model.id: - raise UnauthorizedException("Group does not belong to this organization") + raise ForbiddenException("Group does not belong to this organization") if user_model in group_model.user_rel: raise ConflictException("User already in group") @@ -351,7 +350,7 @@ async def remove_group_permissions( Removes a permission from the group. """ if group_model.org_id != org_model.id: - raise UnauthorizedException("Group does not belong to this organization") + raise ForbiddenException("Group does not belong to this organization") group_model.permission_rel.remove(perm_model) db.flush() @@ -370,8 +369,9 @@ async def remove_group_permissions( response_model=IAMDeleteGroupUserResponse, responses={ status.HTTP_401_UNAUTHORIZED: { - "description": "Group not in org | User not authenticated | User does not have permission" + "description": "User not authenticated | User does not have permission" }, + status.HTTP_403_FORBIDDEN: {"description": "Group not in org"}, }, ) async def remove_group_user( @@ -384,7 +384,7 @@ async def remove_group_user( Removes a user from the group. """ if group_model.org_id != org_model.id: - raise UnauthorizedException("Group does not belong to this organization") + raise ForbiddenException("Group does not belong to this organization") user_model.group_rel.remove(group_model) db.flush() diff --git a/src/iam/service.py b/src/iam/service.py index 6fe4d5f..e7d6699 100644 --- a/src/iam/service.py +++ b/src/iam/service.py @@ -11,7 +11,7 @@ from datetime import datetime, timedelta, timezone from src.iam.schemas import IAMCAoRRequest from src.service.models import Service from src.database import db_dependency -from src.auth.exceptions import UnauthorizedException +from src.exceptions import UnauthorizedException from src.utils import send_email, generate_jwt diff --git a/src/organisation/router.py b/src/organisation/router.py index 10f3521..99c10f6 100644 --- a/src/organisation/router.py +++ b/src/organisation/router.py @@ -23,9 +23,12 @@ from fastapi import APIRouter, status from fastapi.params import Query from sqlalchemy.exc import IntegrityError -from src.auth.exceptions import UnauthorizedException from src.contact.schemas import ContactModel -from src.exceptions import UnprocessableContentException, ConflictException +from src.exceptions import ( + UnprocessableContentException, + ConflictException, + ForbiddenException, +) from src.contact.models import Contact from src.contact.schemas import ContactAddress from src.contact.exceptions import ContactNotFoundException @@ -86,7 +89,7 @@ router = APIRouter( status.HTTP_422_UNPROCESSABLE_CONTENT: { "description": "Missing or invalid org_id query parameter" }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be org root user." }, }, @@ -204,7 +207,7 @@ async def create_org( status.HTTP_422_UNPROCESSABLE_CONTENT: { "description": "Invalid data in request." }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be org root user." }, }, @@ -259,7 +262,7 @@ async def update_questionnaire( status.HTTP_422_UNPROCESSABLE_CONTENT: { "description": "Invalid data in request." }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be super admin." }, }, @@ -290,7 +293,7 @@ async def update_status( status.HTTP_422_UNPROCESSABLE_CONTENT: { "description": "Org ID missing or invalid." }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be org root user." }, }, @@ -314,7 +317,7 @@ async def get_users(org_model: org_model_root_claim_query_dependency): status.HTTP_200_OK: { "description": "Successfully added user to the organisation." }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be org root user." }, status.HTTP_422_UNPROCESSABLE_CONTENT: { @@ -355,7 +358,7 @@ async def add_user_to_org( status.HTTP_204_NO_CONTENT: { "description": "Successfully deleted organisation." }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be super admin." }, status.HTTP_422_UNPROCESSABLE_CONTENT: { @@ -383,12 +386,22 @@ async def delete_organisation_by_id( status.HTTP_204_NO_CONTENT: { "description": "Successfully deleted organisation." }, - status.HTTP_401_UNAUTHORIZED: { - "description": "Not authorised. Must be root user." - }, status.HTTP_422_UNPROCESSABLE_CONTENT: { "description": "Org ID missing or invalid." }, + status.HTTP_403_FORBIDDEN: { + "description": "Forbidden", + "content": { + "application/json": { + "examples": { + "invalid_state": { + "summary": "Organisation is no longer in pre-approval state." + }, + "not_root": {"summary": "Not authorised. Must be root user."}, + } + } + }, + }, }, ) async def delete_preapproved_organisation_by_id( @@ -400,7 +413,7 @@ async def delete_preapproved_organisation_by_id( """ org_status = StatusEnum(org_model.status) if not org_status.is_pre_approval: - raise UnauthorizedException( + raise ForbiddenException( message="Organisation is no longer in pre-approval state." ) @@ -456,7 +469,7 @@ async def update_root_user( status.HTTP_422_UNPROCESSABLE_CONTENT: { "description": "Org ID missing or invalid." }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be org root user." }, }, @@ -479,7 +492,7 @@ async def get_org_groups(org_model: org_model_root_claim_query_dependency): status_code=status.HTTP_204_NO_CONTENT, responses={ status.HTTP_204_NO_CONTENT: {"description": "Successfully removed user."}, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be org root user." }, status.HTTP_422_UNPROCESSABLE_CONTENT: { @@ -512,7 +525,7 @@ async def remove_user_from_org( status.HTTP_422_UNPROCESSABLE_CONTENT: { "description": "Invalid data in request." }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be org root user." }, }, @@ -557,7 +570,7 @@ async def get_contact( status.HTTP_422_UNPROCESSABLE_CONTENT: { "description": "Invalid data in request." }, - status.HTTP_401_UNAUTHORIZED: { + status.HTTP_403_FORBIDDEN: { "description": "Not authorised. Must be org root user." }, }, diff --git a/src/utils.py b/src/utils.py index 5f1df8d..e3bdb25 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,9 +1,8 @@ from datetime import datetime, timezone from joserfc import jwt, jwk, errors -from src.auth.exceptions import UnauthorizedException from src.config import settings - +from src.exceptions import ForbiddenException, UnauthorizedException KEY = jwk.import_key(settings.SECRET_KEY.get_secret_value(), "oct") @@ -33,7 +32,7 @@ async def verify_email_token(user_model, token): raise UnauthorizedException("Invitation expired.") if user_model.email != claimed_email: - raise UnauthorizedException("The logged in user and email do not match.") + raise ForbiddenException("The logged in user and email do not match.") return email_claims diff --git a/test/test_auth_root.py b/test/test_auth_root.py index c53b42c..5a8ed36 100644 --- a/test/test_auth_root.py +++ b/test/test_auth_root.py @@ -46,7 +46,7 @@ def add_second_org(db_session): async def test_get_org_auth_root(no_su_client: AsyncClient): resp = await no_su_client.get("/org?org_id=2") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -65,7 +65,7 @@ async def test_patch_org_questionnaire_auth_root(no_su_client: AsyncClient): }, ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -73,7 +73,7 @@ async def test_patch_org_questionnaire_auth_root(no_su_client: AsyncClient): async def test_get_org_users_auth_root(no_su_client: AsyncClient): resp = await no_su_client.get("/org/users?org_id=2") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -81,7 +81,7 @@ async def test_get_org_users_auth_root(no_su_client: AsyncClient): async def test_get_org_groups_auth_root(no_su_client: AsyncClient): resp = await no_su_client.get("/org/groups?org_id=2") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -89,7 +89,7 @@ async def test_get_org_groups_auth_root(no_su_client: AsyncClient): async def test_get_org_contact_auth_root(no_su_client: AsyncClient): resp = await no_su_client.get("/org/contact?org_id=2&contact_type=billing") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -104,7 +104,7 @@ async def test_patch_org_contact_auth_root(no_su_client: AsyncClient): }, ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -112,7 +112,7 @@ async def test_patch_org_contact_auth_root(no_su_client: AsyncClient): async def test_get_service_auth_root(no_su_client: AsyncClient): resp = await no_su_client.get("/service/?org_id=2") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -120,7 +120,7 @@ async def test_get_service_auth_root(no_su_client: AsyncClient): async def test_get_iam_group_permissions_auth_root(no_su_client: AsyncClient): resp = await no_su_client.get("/iam/group/permissions?org_id=2&group_id=1") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -128,7 +128,7 @@ async def test_get_iam_group_permissions_auth_root(no_su_client: AsyncClient): async def test_get_iam_group_users_auth_root(no_su_client: AsyncClient): resp = await no_su_client.get("/iam/group/users?org_id=2&group_id=1") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -138,7 +138,7 @@ async def test_post_iam_group_auth_root(no_su_client: AsyncClient): "/iam/group", json={"name": "New Group", "organisation_id": 2} ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -153,7 +153,7 @@ async def test_put_iam_group_permission_auth_root( json={"permission_id": 1, "group_id": 2, "organisation_id": 2}, ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -173,7 +173,7 @@ async def test_put_iam_group_user_auth_root(no_su_client: AsyncClient, db_sessio "/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 2} ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -181,7 +181,7 @@ async def test_put_iam_group_user_auth_root(no_su_client: AsyncClient, db_sessio async def test_get_iam_permissions_auth_root(no_su_client: AsyncClient): resp = await no_su_client.get("/iam/permissions?org_id=2") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] @@ -191,5 +191,5 @@ async def test_post_iam_permissions_search_auth_root(no_su_client: AsyncClient): "/iam/permissions/search", json={"organisation_id": 2, "action": "read"} ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be the org's root user" in resp.json()["detail"] diff --git a/test/test_auth_su.py b/test/test_auth_su.py index 2e438fb..18ed386 100644 --- a/test/test_auth_su.py +++ b/test/test_auth_su.py @@ -20,7 +20,7 @@ pytestmark = [ async def test_get_user_auth_su(no_su_client: AsyncClient): resp = await no_su_client.get("/user/?user_id=1") assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Must be super admin" @@ -30,7 +30,7 @@ async def test_patch_org_status_auth_su(no_su_client: AsyncClient): "/org/status", json={"organisation_id": 1, "status": "submitted"} ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Must be super admin" @@ -52,7 +52,7 @@ async def test_patch_org_root_user_auth_su(no_su_client: AsyncClient, db_session "/org/root_user", json={"organisation_id": 1, "user_id": 2} ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Must be super admin" @@ -60,7 +60,7 @@ async def test_patch_org_root_user_auth_su(no_su_client: AsyncClient, db_session async def test_patch_service_key_auth_su(no_su_client: AsyncClient): resp = await no_su_client.patch("/service/key", json={"service_id": 1}) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Must be super admin" @@ -68,7 +68,7 @@ async def test_patch_service_key_auth_su(no_su_client: AsyncClient): async def test_post_service_auth_su(no_su_client: AsyncClient): resp = await no_su_client.post("/service/", json={"name": "New Test Service"}) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Must be super admin" @@ -79,7 +79,7 @@ async def test_post_perm_auth_su(no_su_client: AsyncClient, db_session): json={"service_id": 1, "resource": "test_resource", "action": "create"}, ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Must be super admin" @@ -99,5 +99,5 @@ async def test_post_org_user_auth_su(no_su_client: AsyncClient, db_session): "/org/user", json={"organisation_id": 1, "user_id": 2} ) assert resp.status_code != 422 - assert resp.status_code == 401 + assert resp.status_code == 403 assert "Must be super admin" in resp.json()["detail"] diff --git a/test/test_iam.py b/test/test_iam.py index ab3b0df..ea34831 100644 --- a/test/test_iam.py +++ b/test/test_iam.py @@ -205,7 +205,7 @@ async def test_get_group_permissions_mismatch( db_session.flush() resp = await default_client.get(f"/iam/group/permissions?{query}") - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Group does not belong to this organization" @@ -271,7 +271,7 @@ async def test_get_group_users_mismatch( db_session.flush() resp = await default_client.get(f"/iam/group/users?{query}") - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Group does not belong to this organization" @@ -453,7 +453,7 @@ async def test_put_group_perm_mismatch( db_session.flush() resp = await default_client.put("/iam/group/permission", json=body) - assert resp.status_code == 401 + assert resp.status_code == 403 assert resp.json()["detail"] == "Group does not belong to this organization"