Compare commits

...

11 commits

Author SHA1 Message Date
8b89595531 feat: group invitation response model
All checks were successful
ci / lint_and_test (push) Successful in 14s
2026-06-12 10:02:35 +01:00
1a29da73c2 feat: use of service model dependency 2026-06-12 09:37:55 +01:00
f06b19340c feat: remove group permission check if exists 2026-06-12 09:30:34 +01:00
c94c26f2a6 feat: blocked orgs cannot access any endpoints 2026-06-11 16:39:32 +01:00
37a3972d04 feat: questionnaire only modifiable before submission 2026-06-11 16:35:53 +01:00
2b923becf0 feat: perm search ignores empty strings 2026-06-11 16:23:02 +01:00
1a6a6ad97d fix: remove trailing slash and plurals in paths 2026-06-11 16:14:22 +01:00
c74e895bf1 feat: return org summary when creating group 2026-06-11 16:07:31 +01:00
5d122a7690 feat: fully defined response code descriptions
Only done on three endpoints. This is a lot of repeated text.
2026-06-11 16:02:51 +01:00
c2e035dede feat: more accurate status codes
403 Forbidden replacing many 401 Unauthorized usages.
2026-06-11 14:58:05 +01:00
b3ae655009 feat: healthcheck endpoint structure 2026-06-11 14:27:08 +01:00
20 changed files with 300 additions and 126 deletions

View file

@ -2,7 +2,7 @@
This module hooks the routers for the main endpoints into a single router for importing to the app. This module hooks the routers for the main endpoints into a single router for importing to the app.
""" """
from fastapi import APIRouter from fastapi import APIRouter, status
from src.auth.router import router as auth_router from src.auth.router import router as auth_router
from src.contact.router import router as contact_router from src.contact.router import router as contact_router
@ -11,6 +11,7 @@ from src.user.router import router as user_router
from src.admin.router import router as admin_router from src.admin.router import router as admin_router
from src.iam.router import router as iam_router from src.iam.router import router as iam_router
from src.service.router import router as service_router from src.service.router import router as service_router
from src.schemas import CustomBaseModel
api_router = APIRouter(prefix="/api/v1") api_router = APIRouter(prefix="/api/v1")
@ -24,7 +25,16 @@ api_router.include_router(service_router)
api_router.include_router(iam_router) api_router.include_router(iam_router)
@api_router.get("/healthcheck", include_in_schema=False) class HealthCheckResponse(CustomBaseModel):
status: str
@api_router.get(
path="/healthcheck",
status_code=status.HTTP_200_OK,
response_model=HealthCheckResponse,
include_in_schema=False,
)
def healthcheck(): def healthcheck():
"""Simple healthcheck endpoint.""" """Simple health check endpoint."""
return {"status": "ok"} return {"status": "ok"}

View file

@ -11,6 +11,7 @@ Exports:
from typing import Annotated from typing import Annotated
from fastapi import Depends from fastapi import Depends
from src.exceptions import ForbiddenException
from src.user.dependencies import user_model_claims_dependency from src.user.dependencies import user_model_claims_dependency
from src.user.models import User from src.user.models import User
from src.organisation.dependencies import ( from src.organisation.dependencies import (
@ -19,8 +20,6 @@ from src.organisation.dependencies import (
) )
from src.organisation.models import Organisation as Org from src.organisation.models import Organisation as Org
from src.auth.exceptions import UnauthorizedException
async def org_query_user_claims( async def org_query_user_claims(
org_model: org_model_query_dependency, user_model: user_model_claims_dependency org_model: org_model_query_dependency, user_model: user_model_claims_dependency
@ -28,7 +27,7 @@ async def org_query_user_claims(
if user_model in org_model.user_rel: if user_model in org_model.user_rel:
return True return True
raise UnauthorizedException() raise ForbiddenException()
org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)] org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)]
@ -45,10 +44,10 @@ async def org_query_root_claims(
try: try:
if await user_model_super_admin(user_model, su_emails): if await user_model_super_admin(user_model, su_emails):
return org_model return org_model
except UnauthorizedException: except ForbiddenException:
pass pass
raise UnauthorizedException(message="Must be the org's root user") raise ForbiddenException(message="Must be the org's root user")
org_model_root_claim_query_dependency = Annotated[ org_model_root_claim_query_dependency = Annotated[
@ -67,10 +66,10 @@ async def org_body_root_claims(
try: try:
if await user_model_super_admin(user_model, su_emails): if await user_model_super_admin(user_model, su_emails):
return org_model return org_model
except UnauthorizedException: except ForbiddenException:
pass pass
raise UnauthorizedException(message="Must be the org's root user") raise ForbiddenException(message="Must be the org's root user")
org_model_root_claim_body_dependency = Annotated[ org_model_root_claim_body_dependency = Annotated[
@ -99,7 +98,7 @@ async def user_model_super_admin(
if user_model.email in super_admin_emails: if user_model.email in super_admin_emails:
return user_model return user_model
raise UnauthorizedException(message="Must be super admin") raise ForbiddenException(message="Must be super admin")
super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)] super_admin_dependency = Annotated[type[User], Depends(user_model_super_admin)]

View file

@ -4,16 +4,3 @@ Module specific exceptions for the auth module
Exceptions: Exceptions:
- UnauthorizedException: Takes an optional message string - UnauthorizedException: Takes an optional message string
""" """
from typing import Optional
from fastapi import HTTPException, status
class UnauthorizedException(HTTPException):
def __init__(self, message: Optional[str] = None) -> None:
detail = "Not authorized" if not message else message
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=detail,
)

View file

@ -17,7 +17,7 @@ from urllib.request import urlopen
from fastapi import Depends from fastapi import Depends
from fastapi.security import OpenIdConnect from fastapi.security import OpenIdConnect
from src.auth.exceptions import UnauthorizedException from src.exceptions import UnauthorizedException
from src.auth.config import auth_settings from src.auth.config import auth_settings
from src.user.service import add_user_to_db from src.user.service import add_user_to_db
from src.database import db_dependency from src.database import db_dependency

View file

@ -36,3 +36,12 @@ class ForbiddenException(HTTPException):
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail=detail, detail=detail,
) )
class UnauthorizedException(HTTPException):
def __init__(self, message: Optional[str] = None) -> None:
detail = "Not authorized" if not message else message
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=detail,
)

View file

@ -24,10 +24,13 @@ from sqlalchemy.exc import IntegrityError
from src.iam.exceptions import GroupNotFoundException from src.iam.exceptions import GroupNotFoundException
from src.organisation.exceptions import OrgNotFoundException from src.organisation.exceptions import OrgNotFoundException
from src.schemas import GroupSummary, OrgSummary, ResourceName from src.schemas import GroupSummary, OrgSummary, ResourceName
from src.service.exceptions import ServiceNotFoundException from src.service.dependencies import service_model_body_dependency
from src.exceptions import ConflictException, ForbiddenException from src.exceptions import (
ConflictException,
ForbiddenException,
UnprocessableContentException,
)
from src.database import db_dependency from src.database import db_dependency
from src.auth.exceptions import UnauthorizedException
from src.auth.service import claims_dependency from src.auth.service import claims_dependency
from src.auth.dependencies import ( from src.auth.dependencies import (
org_model_root_claim_query_dependency, org_model_root_claim_query_dependency,
@ -57,7 +60,6 @@ from src.iam.dependencies import (
perm_model_query_dependency, perm_model_query_dependency,
) )
from src.iam.schemas import ( from src.iam.schemas import (
GroupSchema,
IAMCAoRRequest, IAMCAoRRequest,
IAMGetGroupPermissionsResponse, IAMGetGroupPermissionsResponse,
IAMGetGroupUsersResponse, IAMGetGroupUsersResponse,
@ -77,6 +79,8 @@ from src.iam.schemas import (
IAMPutGroupInvitationRequest, IAMPutGroupInvitationRequest,
IAMPutGroupInvitationAcceptRequest, IAMPutGroupInvitationAcceptRequest,
IAMCAoRResponse, IAMCAoRResponse,
IAMPutGroupInvitationAcceptResponse,
IAMPutGroupInvitationResponse,
) )
from src.utils import verify_email_token from src.utils import verify_email_token
@ -158,9 +162,56 @@ async def can_act_on_resource(
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
response_model=IAMGetGroupPermissionsResponse, response_model=IAMGetGroupPermissionsResponse,
responses={ responses={
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Unprocessable content.",
"content": {
"application/json": {
"examples": {
"org_id": {"summary": "Invalid or missing org ID."},
"oidc_claims": {"summary": "Invalid or missing OIDC claims."},
}
}
},
},
status.HTTP_401_UNAUTHORIZED: { status.HTTP_401_UNAUTHORIZED: {
"description": "Group does not belong to this organisation" "description": "Unauthorized",
} "content": {
"application/json": {
"examples": {
"awaiting_approval": {
"summary": "Organisation has not yet been approved."
},
"expired_token": {"summary": "User token has expired."},
"oidc": {"summary": "Failed to verify OIDC claims."},
}
}
},
},
status.HTTP_403_FORBIDDEN: {
"description": "Forbidden",
"content": {
"application/json": {
"examples": {
"not_root": {"summary": "Not authorised. Must be root user."},
}
}
},
},
status.HTTP_404_NOT_FOUND: {
"description": "Not found",
"content": {
"application/json": {
"examples": {
"db_id": {
"summary": "User not found in db when checking claims."
},
"user_model": {"summary": "User model not found in db."},
"org_model": {"summary": "Org model not found in db."},
"group_model": {"summary": "Group model not found in db."},
}
}
},
},
}, },
) )
async def get_group_permissions( async def get_group_permissions(
@ -171,7 +222,7 @@ async def get_group_permissions(
Gets a list of permissions granted to the group. Also returns a summary for the org and group. Gets a list of permissions granted to the group. Also returns a summary for the org and group.
""" """
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise ForbiddenException("Group does not belong to this organization")
return { return {
"organisation": org_model, "organisation": org_model,
"group": group_model, "group": group_model,
@ -198,7 +249,7 @@ async def get_group_users(
Gets a list of users assigned to the group. Also returns a summary for the org and group. Gets a list of users assigned to the group. Also returns a summary for the org and group.
""" """
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise ForbiddenException("Group does not belong to this organization")
return { return {
"organisation": org_model, "organisation": org_model,
"group": group_model, "group": group_model,
@ -236,9 +287,10 @@ async def create_group(
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
): ):
raise ConflictException("Group with this name already exists") raise ConflictException("Group with this name already exists")
response = GroupSchema(**group_model.__dict__) group_response = GroupSummary(**group_model.__dict__)
org_response = OrgSummary(**org_model.__dict__)
db.commit() db.commit()
return {"group": response} return {"group": group_response, "organisation": org_response}
@router.put( @router.put(
@ -266,7 +318,7 @@ async def add_group_permission(
Grants a permission to a group. Returns a list of the permissions in the group as well as a summary for the org and group. Grants a permission to a group. Returns a list of the permissions in the group as well as a summary for the org and group.
""" """
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise ForbiddenException("Group does not belong to this organization")
if perm_model in group_model.permission_rel: if perm_model in group_model.permission_rel:
raise ConflictException("Group already has this permission") raise ConflictException("Group already has this permission")
@ -311,7 +363,7 @@ async def add_group_user(
The user's email address must match the email on their OIDC profile. The user's email address must match the email on their OIDC profile.
""" """
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise ForbiddenException("Group does not belong to this organization")
if user_model in group_model.user_rel: if user_model in group_model.user_rel:
raise ConflictException("User already in group") raise ConflictException("User already in group")
@ -324,14 +376,14 @@ async def add_group_user(
group_model.user_rel.append(user_model) group_model.user_rel.append(user_model)
db.flush() db.flush()
response = IAMPutGroupUserResponse( response = IAMPutGroupUserResponse(
group=GroupSchema(**group_model.__dict__), users=group_model.user_rel group=GroupSummary(**group_model.__dict__), users=group_model.user_rel
) )
db.commit() db.commit()
return response return response
@router.delete( @router.delete(
path="/group/permissions", path="/group/permission",
summary="Removes a permission from the group", summary="Removes a permission from the group",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
response_model=IAMDeleteGroupPermissionResponse, response_model=IAMDeleteGroupPermissionResponse,
@ -341,7 +393,7 @@ async def add_group_user(
}, },
}, },
) )
async def remove_group_permissions( async def remove_group_permission(
db: db_dependency, db: db_dependency,
group_model: group_model_query_dependency, group_model: group_model_query_dependency,
perm_model: perm_model_query_dependency, perm_model: perm_model_query_dependency,
@ -351,12 +403,15 @@ async def remove_group_permissions(
Removes a permission from the group. Removes a permission from the group.
""" """
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise ForbiddenException("Group does not belong to this organization")
if perm_model not in group_model.permission_rel:
raise UnprocessableContentException("Permission not granted to group")
group_model.permission_rel.remove(perm_model) group_model.permission_rel.remove(perm_model)
db.flush() db.flush()
response = IAMDeleteGroupPermissionResponse( response = IAMDeleteGroupPermissionResponse(
group=GroupSchema(**group_model.__dict__), group=GroupSummary(**group_model.__dict__),
permissions=group_model.permission_rel, permissions=group_model.permission_rel,
) )
db.commit() db.commit()
@ -370,8 +425,9 @@ async def remove_group_permissions(
response_model=IAMDeleteGroupUserResponse, response_model=IAMDeleteGroupUserResponse,
responses={ responses={
status.HTTP_401_UNAUTHORIZED: { status.HTTP_401_UNAUTHORIZED: {
"description": "Group not in org | User not authenticated | User does not have permission" "description": "User not authenticated | User does not have permission"
}, },
status.HTTP_403_FORBIDDEN: {"description": "Group not in org"},
}, },
) )
async def remove_group_user( async def remove_group_user(
@ -384,12 +440,12 @@ async def remove_group_user(
Removes a user from the group. Removes a user from the group.
""" """
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise ForbiddenException("Group does not belong to this organization")
user_model.group_rel.remove(group_model) user_model.group_rel.remove(group_model)
db.flush() db.flush()
response = IAMDeleteGroupUserResponse( response = IAMDeleteGroupUserResponse(
group=GroupSchema(**group_model.__dict__), users=group_model.user_rel group=GroupSummary(**group_model.__dict__), users=group_model.user_rel
) )
db.commit() db.commit()
@ -433,13 +489,11 @@ async def create_new_permission(
db: db_dependency, db: db_dependency,
su: super_admin_dependency, su: super_admin_dependency,
request_model: IAMPostPermissionRequest, request_model: IAMPostPermissionRequest,
service_model: service_model_body_dependency, # Used to verify service model exists
): ):
""" """
Allows a super admin to create a new IAM permission for a service. Allows a super admin to create a new IAM permission for a service.
""" """
service_model = db.get(Service, request_model.service_id)
if service_model is None:
raise ServiceNotFoundException(service_id=request_model.service_id)
perm_model = Perm(**request_model.__dict__) perm_model = Perm(**request_model.__dict__)
db.add(perm_model) db.add(perm_model)
try: try:
@ -496,17 +550,17 @@ async def post_permissions(
""" """
permission_query = db.query(Perm) permission_query = db.query(Perm)
if request_model.service_id is not None: if not (request_model.service_id is None or request_model.service_id == ""):
permission_query = permission_query.filter( permission_query = permission_query.filter(
Perm.service_id == request_model.service_id Perm.service_id == request_model.service_id
) )
if request_model.resource is not None: if not (request_model.resource is None or request_model.resource == ""):
permission_query = permission_query.filter( permission_query = permission_query.filter(
Perm.resource == request_model.resource Perm.resource == request_model.resource
) )
if request_model.action is not None: if not (request_model.action is None or request_model.action == ""):
permission_query = permission_query.filter(Perm.action == request_model.action) permission_query = permission_query.filter(Perm.action == request_model.action)
permission_models = permission_query.all() permission_models = permission_query.all()
@ -518,6 +572,7 @@ async def post_permissions(
path="/group/user/invitation", path="/group/user/invitation",
summary="Send an email invitation for non-org member to join a group", summary="Send an email invitation for non-org member to join a group",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
response_model=IAMPutGroupInvitationResponse,
responses={}, responses={},
) )
async def invitation( async def invitation(
@ -547,13 +602,20 @@ async def invitation(
group_name=group_name, group_name=group_name,
) )
return "Invitation sent" response = {
"organisation": org_model,
"group": group_model,
"invited_email": user_email,
}
return response
@router.put( @router.put(
path="/group/user//invitation/accept", path="/group/user//invitation/accept",
summary="Accept email invitation to join an org's group", summary="Accept email invitation to join an org's group",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
response_model=IAMPutGroupInvitationAcceptResponse,
responses={ responses={
status.HTTP_404_NOT_FOUND: {"description": "User|Org|Group not found"}, status.HTTP_404_NOT_FOUND: {"description": "User|Org|Group not found"},
status.HTTP_403_FORBIDDEN: { status.HTTP_403_FORBIDDEN: {
@ -589,6 +651,13 @@ async def accept_invitation(
raise ConflictException("User already in group.") raise ConflictException("User already in group.")
group_model.user_rel.append(user_model) group_model.user_rel.append(user_model)
db.flush()
response = {
"organisation": org_model,
"user": user_model,
"group": {"details": group_model, "permissions": group_model.permission_rel},
}
db.commit() db.commit()
return "Invitation accepted" return response

View file

@ -42,9 +42,9 @@ class PermissionSchema(CustomBaseModel):
action: str action: str
class GroupSchema(CustomBaseModel): class GroupDetails(CustomBaseModel):
id: int details: GroupSummary
name: str permissions: list[PermissionSchema]
class IAMCAoRRequest(CustomBaseModel): class IAMCAoRRequest(CustomBaseModel):
@ -76,7 +76,8 @@ class IAMPostGroupRequest(OrgIDMixin):
class IAMPostGroupResponse(CustomBaseModel): class IAMPostGroupResponse(CustomBaseModel):
group: GroupSchema organisation: OrgSummary
group: GroupSummary
class IAMPutGroupPermissionRequest(GroupIDMixin, PermIDMixin, OrgIDMixin): class IAMPutGroupPermissionRequest(GroupIDMixin, PermIDMixin, OrgIDMixin):
@ -94,17 +95,17 @@ class IAMPutGroupUserRequest(GroupIDMixin, UserIDMixin, OrgIDMixin):
class IAMPutGroupUserResponse(CustomBaseModel): class IAMPutGroupUserResponse(CustomBaseModel):
group: GroupSchema group: GroupSummary
users: list[UserSchema] users: list[UserSchema]
class IAMDeleteGroupPermissionResponse(CustomBaseModel): class IAMDeleteGroupPermissionResponse(CustomBaseModel):
group: GroupSchema group: GroupSummary
permissions: list[PermissionSchema] permissions: list[PermissionSchema]
class IAMDeleteGroupUserResponse(CustomBaseModel): class IAMDeleteGroupUserResponse(CustomBaseModel):
group: GroupSchema group: GroupSummary
users: list[UserSchema] users: list[UserSchema]
@ -135,5 +136,17 @@ class IAMPutGroupInvitationRequest(OrgIDMixin, GroupIDMixin):
user_email: EmailStr user_email: EmailStr
class IAMPutGroupInvitationResponse(CustomBaseModel):
organisation: OrgSummary
group: GroupSummary
invited_email: EmailStr
class IAMPutGroupInvitationAcceptRequest(CustomBaseModel): class IAMPutGroupInvitationAcceptRequest(CustomBaseModel):
jwt: str jwt: str
class IAMPutGroupInvitationAcceptResponse(CustomBaseModel):
organisation: OrgSummary
user: UserSummary
group: GroupDetails

View file

@ -11,7 +11,7 @@ from datetime import datetime, timedelta, timezone
from src.iam.schemas import IAMCAoRRequest from src.iam.schemas import IAMCAoRRequest
from src.service.models import Service from src.service.models import Service
from src.database import db_dependency from src.database import db_dependency
from src.auth.exceptions import UnauthorizedException from src.exceptions import UnauthorizedException
from src.utils import send_email, generate_jwt from src.utils import send_email, generate_jwt

View file

@ -31,7 +31,15 @@ class Status(StrEnum):
@property @property
def is_pre_approval(self): def is_pre_approval(self):
return self in (self.PARTIAL, self.SUBMITTED) return self in (self.PARTIAL, self.SUBMITTED, self.REMEDIATION)
@property
def is_pre_submission(self):
return self in (self.PARTIAL, self.REMEDIATION)
@property
def is_blocked(self):
return self in (self.REMOVED, self.REJECTED)
class ContactType(StrEnum): class ContactType(StrEnum):

View file

@ -12,6 +12,7 @@ from sqlalchemy.orm import Session
from fastapi import Depends, Query, Request from fastapi import Depends, Query, Request
from src.database import db_dependency from src.database import db_dependency
from src.exceptions import ForbiddenException
from src.organisation.schemas import OrgIDMixin from src.organisation.schemas import OrgIDMixin
from src.organisation.models import Organisation as Org from src.organisation.models import Organisation as Org
@ -24,6 +25,10 @@ def get_org_model(db: Session, request: Request, org_id: int):
if org_model is None: if org_model is None:
raise OrgNotFoundException(org_id) raise OrgNotFoundException(org_id)
org_status = OrgStatus(org_model.status)
if org_status.is_blocked:
raise ForbiddenException("This organisation cannot perform this action.")
root = "/api/v1" root = "/api/v1"
pre_approval_endpoints = [ pre_approval_endpoints = [
@ -58,7 +63,7 @@ def get_org_model_body(
) -> type[Org]: ) -> type[Org]:
org_id: Optional[int] = getattr(request_model, "organisation_id", None) org_id: Optional[int] = getattr(request_model, "organisation_id", None)
if org_id is None: if org_id is None:
raise OrgNotFoundException raise OrgNotFoundException()
return get_org_model(db, request, org_id) return get_org_model(db, request, org_id)

View file

@ -23,9 +23,12 @@ from fastapi import APIRouter, status
from fastapi.params import Query from fastapi.params import Query
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from src.auth.exceptions import UnauthorizedException
from src.contact.schemas import ContactModel from src.contact.schemas import ContactModel
from src.exceptions import UnprocessableContentException, ConflictException from src.exceptions import (
UnprocessableContentException,
ConflictException,
ForbiddenException,
)
from src.contact.models import Contact from src.contact.models import Contact
from src.contact.schemas import ContactAddress from src.contact.schemas import ContactAddress
from src.contact.exceptions import ContactNotFoundException from src.contact.exceptions import ContactNotFoundException
@ -86,7 +89,7 @@ router = APIRouter(
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Missing or invalid org_id query parameter" "description": "Missing or invalid org_id query parameter"
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user." "description": "Not authorised. Must be org root user."
}, },
}, },
@ -204,7 +207,7 @@ async def create_org(
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request." "description": "Invalid data in request."
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user." "description": "Not authorised. Must be org root user."
}, },
}, },
@ -219,6 +222,11 @@ async def update_questionnaire(
The partial bool allows for submission of partially completed questionnaire and/or The partial bool allows for submission of partially completed questionnaire and/or
final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval. final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval.
""" """
org_status = StatusEnum(org_model.status)
if not org_status.is_pre_submission:
raise ForbiddenException(
"Questionnaire may only be modified prior to submission."
)
update_data = request_model.intake_questionnaire.model_dump(exclude_none=True) update_data = request_model.intake_questionnaire.model_dump(exclude_none=True)
questionnaire = org_model.intake_questionnaire questionnaire = org_model.intake_questionnaire
questions_model = QuestionnaireQuestionsVersion0(**questionnaire["questions"]) questions_model = QuestionnaireQuestionsVersion0(**questionnaire["questions"])
@ -259,7 +267,7 @@ async def update_questionnaire(
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request." "description": "Invalid data in request."
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be super admin." "description": "Not authorised. Must be super admin."
}, },
}, },
@ -290,7 +298,7 @@ async def update_status(
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Org ID missing or invalid." "description": "Org ID missing or invalid."
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user." "description": "Not authorised. Must be org root user."
}, },
}, },
@ -314,7 +322,7 @@ async def get_users(org_model: org_model_root_claim_query_dependency):
status.HTTP_200_OK: { status.HTTP_200_OK: {
"description": "Successfully added user to the organisation." "description": "Successfully added user to the organisation."
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user." "description": "Not authorised. Must be org root user."
}, },
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
@ -355,7 +363,7 @@ async def add_user_to_org(
status.HTTP_204_NO_CONTENT: { status.HTTP_204_NO_CONTENT: {
"description": "Successfully deleted organisation." "description": "Successfully deleted organisation."
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be super admin." "description": "Not authorised. Must be super admin."
}, },
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
@ -383,11 +391,57 @@ async def delete_organisation_by_id(
status.HTTP_204_NO_CONTENT: { status.HTTP_204_NO_CONTENT: {
"description": "Successfully deleted organisation." "description": "Successfully deleted organisation."
}, },
status.HTTP_401_UNAUTHORIZED: {
"description": "Not authorised. Must be root user."
},
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Org ID missing or invalid." "description": "Unprocessable content.",
"content": {
"application/json": {
"examples": {
"org_id": {"summary": "Invalid or missing org ID."},
"oidc_claims": {"summary": "Invalid or missing OIDC claims."},
}
}
},
},
status.HTTP_401_UNAUTHORIZED: {
"description": "Unauthorized",
"content": {
"application/json": {
"examples": {
"awaiting_approval": {
"summary": "Organisation has not yet been approved."
},
"expired_token": {"summary": "User token has expired."},
"oidc": {"summary": "Failed to verify OIDC claims."},
}
}
},
},
status.HTTP_403_FORBIDDEN: {
"description": "Forbidden",
"content": {
"application/json": {
"examples": {
"invalid_state": {
"summary": "Organisation is no longer in pre-approval state."
},
"not_root": {"summary": "Not authorised. Must be root user."},
}
}
},
},
status.HTTP_404_NOT_FOUND: {
"description": "Not found",
"content": {
"application/json": {
"examples": {
"db_id": {
"summary": "User not found in db when checking claims."
},
"user_model": {"summary": "User model not found in db."},
"org_model": {"summary": "Org model not found in db."},
}
}
},
}, },
}, },
) )
@ -400,7 +454,7 @@ async def delete_preapproved_organisation_by_id(
""" """
org_status = StatusEnum(org_model.status) org_status = StatusEnum(org_model.status)
if not org_status.is_pre_approval: if not org_status.is_pre_approval:
raise UnauthorizedException( raise ForbiddenException(
message="Organisation is no longer in pre-approval state." message="Organisation is no longer in pre-approval state."
) )
@ -456,7 +510,7 @@ async def update_root_user(
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Org ID missing or invalid." "description": "Org ID missing or invalid."
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user." "description": "Not authorised. Must be org root user."
}, },
}, },
@ -479,7 +533,7 @@ async def get_org_groups(org_model: org_model_root_claim_query_dependency):
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
responses={ responses={
status.HTTP_204_NO_CONTENT: {"description": "Successfully removed user."}, status.HTTP_204_NO_CONTENT: {"description": "Successfully removed user."},
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user." "description": "Not authorised. Must be org root user."
}, },
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
@ -512,7 +566,7 @@ async def remove_user_from_org(
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request." "description": "Invalid data in request."
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user." "description": "Not authorised. Must be org root user."
}, },
}, },
@ -557,7 +611,7 @@ async def get_contact(
status.HTTP_422_UNPROCESSABLE_CONTENT: { status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request." "description": "Invalid data in request."
}, },
status.HTTP_401_UNAUTHORIZED: { status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user." "description": "Not authorised. Must be org root user."
}, },
}, },

View file

@ -40,13 +40,34 @@ router = APIRouter(
@router.get( @router.get(
"/", "",
summary="Get all services", summary="Get all services",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
response_model=ServiceGetServiceResponse, response_model=ServiceGetServiceResponse,
responses={ responses={
status.HTTP_200_OK: {"description": "Successful retrieval from database"}, status.HTTP_200_OK: {"description": "Successful retrieval from database"},
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"}, status.HTTP_401_UNAUTHORIZED: {
"description": "Unauthorized",
"content": {
"application/json": {
"examples": {
"awaiting_approval": {
"summary": "Organisation has not yet been approved."
},
}
}
},
},
status.HTTP_403_FORBIDDEN: {
"description": "Forbidden",
"content": {
"application/json": {
"examples": {
"not_root": {"summary": "Not authorised. Must be root user."},
}
}
},
},
}, },
) )
async def get_all_services( async def get_all_services(
@ -61,7 +82,7 @@ async def get_all_services(
@router.post( @router.post(
"/", "",
summary="Register a new service.", summary="Register a new service.",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
response_model=ServicePostServiceResponse, response_model=ServicePostServiceResponse,
@ -127,7 +148,7 @@ async def regenerate_api_key(
@router.delete( @router.delete(
"/", "",
summary="Remove a service.", summary="Remove a service.",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
responses={ responses={

View file

@ -76,7 +76,7 @@ async def current_user(user_model: user_model_claims_dependency):
@router.get( @router.get(
"/", "",
summary="Get user hub details by ID.", summary="Get user hub details by ID.",
response_model=UserResponse, response_model=UserResponse,
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
@ -95,7 +95,7 @@ async def get_user_by_id(
@router.delete( @router.delete(
"/", "",
summary="Delete user from hub by ID.", summary="Delete user from hub by ID.",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
responses={ responses={

View file

@ -1,9 +1,8 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from joserfc import jwt, jwk, errors from joserfc import jwt, jwk, errors
from src.auth.exceptions import UnauthorizedException
from src.config import settings from src.config import settings
from src.exceptions import ForbiddenException, UnauthorizedException
KEY = jwk.import_key(settings.SECRET_KEY.get_secret_value(), "oct") KEY = jwk.import_key(settings.SECRET_KEY.get_secret_value(), "oct")
@ -33,7 +32,7 @@ async def verify_email_token(user_model, token):
raise UnauthorizedException("Invitation expired.") raise UnauthorizedException("Invitation expired.")
if user_model.email != claimed_email: if user_model.email != claimed_email:
raise UnauthorizedException("The logged in user and email do not match.") raise ForbiddenException("The logged in user and email do not match.")
return email_claims return email_claims

View file

@ -138,7 +138,7 @@ async def test_patch_org_contact_auth_approval(default_client: AsyncClient):
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_service_auth_approval(default_client: AsyncClient): async def test_get_service_auth_approval(default_client: AsyncClient):
resp = await default_client.get("/service/?org_id=1") resp = await default_client.get("/service?org_id=1")
assert resp.status_code != 422 assert resp.status_code != 422
assert "has not been approved." in resp.json()["detail"] assert "has not been approved." in resp.json()["detail"]

View file

@ -46,7 +46,7 @@ def add_second_org(db_session):
async def test_get_org_auth_root(no_su_client: AsyncClient): async def test_get_org_auth_root(no_su_client: AsyncClient):
resp = await no_su_client.get("/org?org_id=2") resp = await no_su_client.get("/org?org_id=2")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -65,7 +65,7 @@ async def test_patch_org_questionnaire_auth_root(no_su_client: AsyncClient):
}, },
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -73,7 +73,7 @@ async def test_patch_org_questionnaire_auth_root(no_su_client: AsyncClient):
async def test_get_org_users_auth_root(no_su_client: AsyncClient): async def test_get_org_users_auth_root(no_su_client: AsyncClient):
resp = await no_su_client.get("/org/users?org_id=2") resp = await no_su_client.get("/org/users?org_id=2")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -81,7 +81,7 @@ async def test_get_org_users_auth_root(no_su_client: AsyncClient):
async def test_get_org_groups_auth_root(no_su_client: AsyncClient): async def test_get_org_groups_auth_root(no_su_client: AsyncClient):
resp = await no_su_client.get("/org/groups?org_id=2") resp = await no_su_client.get("/org/groups?org_id=2")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -89,7 +89,7 @@ async def test_get_org_groups_auth_root(no_su_client: AsyncClient):
async def test_get_org_contact_auth_root(no_su_client: AsyncClient): async def test_get_org_contact_auth_root(no_su_client: AsyncClient):
resp = await no_su_client.get("/org/contact?org_id=2&contact_type=billing") resp = await no_su_client.get("/org/contact?org_id=2&contact_type=billing")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -104,15 +104,15 @@ async def test_patch_org_contact_auth_root(no_su_client: AsyncClient):
}, },
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_service_auth_root(no_su_client: AsyncClient): async def test_get_service_auth_root(no_su_client: AsyncClient):
resp = await no_su_client.get("/service/?org_id=2") resp = await no_su_client.get("/service?org_id=2")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -120,7 +120,7 @@ async def test_get_service_auth_root(no_su_client: AsyncClient):
async def test_get_iam_group_permissions_auth_root(no_su_client: AsyncClient): async def test_get_iam_group_permissions_auth_root(no_su_client: AsyncClient):
resp = await no_su_client.get("/iam/group/permissions?org_id=2&group_id=1") resp = await no_su_client.get("/iam/group/permissions?org_id=2&group_id=1")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -128,7 +128,7 @@ async def test_get_iam_group_permissions_auth_root(no_su_client: AsyncClient):
async def test_get_iam_group_users_auth_root(no_su_client: AsyncClient): async def test_get_iam_group_users_auth_root(no_su_client: AsyncClient):
resp = await no_su_client.get("/iam/group/users?org_id=2&group_id=1") resp = await no_su_client.get("/iam/group/users?org_id=2&group_id=1")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -138,7 +138,7 @@ async def test_post_iam_group_auth_root(no_su_client: AsyncClient):
"/iam/group", json={"name": "New Group", "organisation_id": 2} "/iam/group", json={"name": "New Group", "organisation_id": 2}
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -153,7 +153,7 @@ async def test_put_iam_group_permission_auth_root(
json={"permission_id": 1, "group_id": 2, "organisation_id": 2}, json={"permission_id": 1, "group_id": 2, "organisation_id": 2},
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -173,7 +173,7 @@ async def test_put_iam_group_user_auth_root(no_su_client: AsyncClient, db_sessio
"/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 2} "/iam/group/user", json={"user_id": 2, "group_id": 1, "organisation_id": 2}
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -181,7 +181,7 @@ async def test_put_iam_group_user_auth_root(no_su_client: AsyncClient, db_sessio
async def test_get_iam_permissions_auth_root(no_su_client: AsyncClient): async def test_get_iam_permissions_auth_root(no_su_client: AsyncClient):
resp = await no_su_client.get("/iam/permissions?org_id=2") resp = await no_su_client.get("/iam/permissions?org_id=2")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]
@ -191,5 +191,5 @@ async def test_post_iam_permissions_search_auth_root(no_su_client: AsyncClient):
"/iam/permissions/search", json={"organisation_id": 2, "action": "read"} "/iam/permissions/search", json={"organisation_id": 2, "action": "read"}
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be the org's root user" in resp.json()["detail"] assert "Must be the org's root user" in resp.json()["detail"]

View file

@ -18,9 +18,9 @@ pytestmark = [
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_user_auth_su(no_su_client: AsyncClient): async def test_get_user_auth_su(no_su_client: AsyncClient):
resp = await no_su_client.get("/user/?user_id=1") resp = await no_su_client.get("/user?user_id=1")
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Must be super admin" assert resp.json()["detail"] == "Must be super admin"
@ -30,7 +30,7 @@ async def test_patch_org_status_auth_su(no_su_client: AsyncClient):
"/org/status", json={"organisation_id": 1, "status": "submitted"} "/org/status", json={"organisation_id": 1, "status": "submitted"}
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Must be super admin" assert resp.json()["detail"] == "Must be super admin"
@ -52,7 +52,7 @@ async def test_patch_org_root_user_auth_su(no_su_client: AsyncClient, db_session
"/org/root_user", json={"organisation_id": 1, "user_id": 2} "/org/root_user", json={"organisation_id": 1, "user_id": 2}
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Must be super admin" assert resp.json()["detail"] == "Must be super admin"
@ -60,15 +60,15 @@ async def test_patch_org_root_user_auth_su(no_su_client: AsyncClient, db_session
async def test_patch_service_key_auth_su(no_su_client: AsyncClient): async def test_patch_service_key_auth_su(no_su_client: AsyncClient):
resp = await no_su_client.patch("/service/key", json={"service_id": 1}) resp = await no_su_client.patch("/service/key", json={"service_id": 1})
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Must be super admin" assert resp.json()["detail"] == "Must be super admin"
@pytest.mark.anyio @pytest.mark.anyio
async def test_post_service_auth_su(no_su_client: AsyncClient): async def test_post_service_auth_su(no_su_client: AsyncClient):
resp = await no_su_client.post("/service/", json={"name": "New Test Service"}) resp = await no_su_client.post("/service", json={"name": "New Test Service"})
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Must be super admin" assert resp.json()["detail"] == "Must be super admin"
@ -79,7 +79,7 @@ async def test_post_perm_auth_su(no_su_client: AsyncClient, db_session):
json={"service_id": 1, "resource": "test_resource", "action": "create"}, json={"service_id": 1, "resource": "test_resource", "action": "create"},
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Must be super admin" assert resp.json()["detail"] == "Must be super admin"
@ -99,5 +99,5 @@ async def test_post_org_user_auth_su(no_su_client: AsyncClient, db_session):
"/org/user", json={"organisation_id": 1, "user_id": 2} "/org/user", json={"organisation_id": 1, "user_id": 2}
) )
assert resp.status_code != 422 assert resp.status_code != 422
assert resp.status_code == 401 assert resp.status_code == 403
assert "Must be super admin" in resp.json()["detail"] assert "Must be super admin" in resp.json()["detail"]

View file

@ -205,7 +205,7 @@ async def test_get_group_permissions_mismatch(
db_session.flush() db_session.flush()
resp = await default_client.get(f"/iam/group/permissions?{query}") resp = await default_client.get(f"/iam/group/permissions?{query}")
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Group does not belong to this organization" assert resp.json()["detail"] == "Group does not belong to this organization"
@ -271,7 +271,7 @@ async def test_get_group_users_mismatch(
db_session.flush() db_session.flush()
resp = await default_client.get(f"/iam/group/users?{query}") resp = await default_client.get(f"/iam/group/users?{query}")
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Group does not belong to this organization" assert resp.json()["detail"] == "Group does not belong to this organization"
@ -453,7 +453,7 @@ async def test_put_group_perm_mismatch(
db_session.flush() db_session.flush()
resp = await default_client.put("/iam/group/permission", json=body) resp = await default_client.put("/iam/group/permission", json=body)
assert resp.status_code == 401 assert resp.status_code == 403
assert resp.json()["detail"] == "Group does not belong to this organization" assert resp.json()["detail"] == "Group does not belong to this organization"
@ -785,7 +785,7 @@ async def test_post_perm_search_status_checks(
@pytest.mark.anyio @pytest.mark.anyio
async def test_delete_group_permissions_success(default_client: AsyncClient): async def test_delete_group_permissions_success(default_client: AsyncClient):
resp = await default_client.delete( resp = await default_client.delete(
"/iam/group/permissions?org_id=1&group_id=1&perm_id=1" "/iam/group/permission?org_id=1&group_id=1&perm_id=1"
) )
data = resp.json() data = resp.json()

View file

@ -15,7 +15,7 @@ pytestmark = [
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_services_success(default_client: AsyncClient): async def test_get_services_success(default_client: AsyncClient):
resp = await default_client.get("/service/?org_id=1") resp = await default_client.get("/service?org_id=1")
data = resp.json() data = resp.json()
assert resp.status_code == 200 assert resp.status_code == 200
@ -32,14 +32,14 @@ async def test_get_services_success(default_client: AsyncClient):
async def test_get_services_status_checks( async def test_get_services_status_checks(
default_client: AsyncClient, query: str, expected_status: int default_client: AsyncClient, query: str, expected_status: int
): ):
resp = await default_client.get(f"/service/?{query}") resp = await default_client.get(f"/service?{query}")
assert resp.status_code == expected_status assert resp.status_code == expected_status
@pytest.mark.anyio @pytest.mark.anyio
async def test_post_service_success(default_client: AsyncClient): async def test_post_service_success(default_client: AsyncClient):
resp = await default_client.post("/service/", json={"name": "New Test Service"}) resp = await default_client.post("/service", json={"name": "New Test Service"})
data = resp.json() data = resp.json()
assert resp.status_code == 200 assert resp.status_code == 200
@ -62,7 +62,7 @@ async def test_post_service_success(default_client: AsyncClient):
async def test_post_service_status_checks( async def test_post_service_status_checks(
default_client: AsyncClient, body: dict[str, str], expected_status: int default_client: AsyncClient, body: dict[str, str], expected_status: int
): ):
resp = await default_client.post("/service/", json=body) resp = await default_client.post("/service", json=body)
assert resp.status_code == expected_status assert resp.status_code == expected_status
@ -100,6 +100,6 @@ async def test_patch_services_status_checks(
@pytest.mark.anyio @pytest.mark.anyio
async def test_delete_service_success(default_client: AsyncClient): async def test_delete_service_success(default_client: AsyncClient):
resp = await default_client.delete("/service/?service_id=1") resp = await default_client.delete("/service?service_id=1")
assert resp.status_code == 204 assert resp.status_code == 204

View file

@ -32,7 +32,7 @@ async def test_get_self_db_success(default_client: AsyncClient):
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_user_success(default_client: AsyncClient): async def test_get_user_success(default_client: AsyncClient):
resp = await default_client.get("/user/?user_id=1") resp = await default_client.get("/user?user_id=1")
data = resp.json() data = resp.json()
assert resp.status_code == 200 assert resp.status_code == 200
@ -52,14 +52,14 @@ async def test_get_user_success(default_client: AsyncClient):
async def test_get_user_status_checks( async def test_get_user_status_checks(
default_client: AsyncClient, query: str, expected_status: int default_client: AsyncClient, query: str, expected_status: int
): ):
resp = await default_client.get(f"/user/?{query}") resp = await default_client.get(f"/user?{query}")
assert resp.status_code == expected_status assert resp.status_code == expected_status
@pytest.mark.anyio @pytest.mark.anyio
async def test_delete_user_success(default_client: AsyncClient): async def test_delete_user_success(default_client: AsyncClient):
resp = await default_client.delete("/user/?user_id=1") resp = await default_client.delete("/user?user_id=1")
assert resp.status_code == 204 assert resp.status_code == 204