Compare commits

...

6 commits

Author SHA1 Message Date
bcdef91dd0 feat: user invite response models
All checks were successful
ci / lint_and_test (push) Successful in 13s
2026-06-11 14:14:31 +01:00
8925280f96 feat: questions union
Allows responses to include questionnaire questions of multiple versions
2026-06-11 13:38:14 +01:00
38c26cca7b docs: iam module meta doc 2026-06-11 13:07:48 +01:00
0a7f9092c7 feat: questionnaire shape update 2026-06-11 12:24:36 +01:00
c268097306 feat: helper for generating module docstrings 2026-06-11 09:57:34 +01:00
dd0478d5e7 docs: iam router
Issue: #13
2026-06-11 09:36:51 +01:00
12 changed files with 289 additions and 62 deletions

View file

@ -2,18 +2,20 @@
Router endpoints for IAM Router endpoints for IAM
Endpoints: Endpoints:
- [POST](/iam/can_act_on_resource): [API key & user claim]: Service access point to verify user permissions - [POST](/api/v1/iam/can_act_on_resource): [API Key & User JWT]: Used for services to check user access permission
- [GET](/iam/group/permissions): [root user]: Gets list of perms(service, resource, action) the given group(id) has - [GET](/api/v1/iam/group/permissions): [Root User]: Gets a list of permissions granted to a group
- [DELETE](/iam/group/permissions): [root user]: Removes a given perm(id) from the given group(id) - [GET](/api/v1/iam/group/users): [Root User]: Gets a list of users assigned to a group
- [GET](/iam/group/users): [root user]: Gets a list of users(id, name, email) that are assigned to the given group(id) - [POST](/api/v1/iam/group): [Root User]: Creates a new group
- [POST](/iam/group): [root user]: Creates a new group for the given org(id) - [PUT](/api/v1/iam/group/permission): [Root User]: Grants a permission to a group
- [PUT](/iam/group/permission): [root user]: Assigns a perm(id) to the given group(id) - [PUT](/api/v1/iam/group/user): [Root User]: Directly adds a user to the group
- [PUT](/iam/group/user): [root user]: Assigns a user(id) to a group(id) - [DELETE](/api/v1/iam/group/permissions): [Root User]: Removes a permission from the group
- [DELETE](/iam/group/user): [root user]: Removes a user(id) from the given group(id) - [DELETE](/api/v1/iam/group/user): [Root User]: Removes a user from the group
- [GET](/iam/permissions): [root user]: Gets a list of all permissions - [GET](/api/v1/iam/permissions): [Root User]: Returns a full list of permissions
- [POST](/iam/permission): [super admin]: Creates a new permission - [POST](/api/v1/iam/permission): [Super Admin]: Creates a new permission for a service
- [DELETE](/iam/permission): [super admin]: Removes a permission - [DELETE](/api/v1/iam/permission): [Super Admin]: Deletes a permission
- [GET](/iam/permissions/search): [root user]: Returns a list of permissions matching a filter(service|resource|action) - [POST](/api/v1/iam/permissions/search): [Root User]: Search list of permissions
- [PUT](/api/v1/iam/group/user/invitation): [Root User]: Send an email invitation for non-org member to join a group
- [PUT](/api/v1/iam/group/user//invitation/accept): [User Claim & Invite JWT]: Accept email invitation to join an org's group
""" """
from fastapi import APIRouter, status, BackgroundTasks from fastapi import APIRouter, status, BackgroundTasks
@ -281,7 +283,21 @@ async def add_group_permission(
return response return response
@router.put("/group/user", response_model=IAMPutGroupUserResponse) @router.put(
path="/group/user",
summary="Directly adds a user to the group",
status_code=status.HTTP_200_OK,
response_model=IAMPutGroupUserResponse,
responses={
status.HTTP_401_UNAUTHORIZED: {
"description": "Group not in org | User not authenticated | User does not have permission"
},
status.HTTP_409_CONFLICT: {"description": "User is already in group"},
status.HTTP_403_FORBIDDEN: {
"description": "Only existing org members can be added directly."
},
},
)
async def add_group_user( async def add_group_user(
db: db_dependency, db: db_dependency,
group_model: group_model_body_dependency, group_model: group_model_body_dependency,
@ -289,6 +305,11 @@ async def add_group_user(
org_model: org_model_root_claim_body_dependency, org_model: org_model_root_claim_body_dependency,
request_model: IAMPutGroupUserRequest, request_model: IAMPutGroupUserRequest,
): ):
"""
Directly adds an organisation member to a group.\n
To add a non-member, use an email invitation instead.\n
The user's email address must match the email on their OIDC profile.
"""
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise UnauthorizedException("Group does not belong to this organization")
@ -309,13 +330,26 @@ async def add_group_user(
return response return response
@router.delete("/group/permissions") @router.delete(
path="/group/permissions",
summary="Removes a permission from the group",
status_code=status.HTTP_200_OK,
response_model=IAMDeleteGroupPermissionResponse,
responses={
status.HTTP_401_UNAUTHORIZED: {
"description": "Group not in org | User not authenticated | User does not have permission"
},
},
)
async def remove_group_permissions( async def remove_group_permissions(
db: db_dependency, db: db_dependency,
group_model: group_model_query_dependency, group_model: group_model_query_dependency,
perm_model: perm_model_query_dependency, perm_model: perm_model_query_dependency,
org_model: org_model_root_claim_query_dependency, org_model: org_model_root_claim_query_dependency,
): ):
"""
Removes a permission from the group.
"""
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise UnauthorizedException("Group does not belong to this organization")
@ -329,13 +363,26 @@ async def remove_group_permissions(
return response return response
@router.delete("/group/user") @router.delete(
path="/group/user",
summary="Removes a user from the group",
status_code=status.HTTP_200_OK,
response_model=IAMDeleteGroupUserResponse,
responses={
status.HTTP_401_UNAUTHORIZED: {
"description": "Group not in org | User not authenticated | User does not have permission"
},
},
)
async def remove_group_user( async def remove_group_user(
db: db_dependency, db: db_dependency,
group_model: group_model_query_dependency, group_model: group_model_query_dependency,
user_model: user_model_query_dependency, user_model: user_model_query_dependency,
org_model: org_model_root_claim_query_dependency, org_model: org_model_root_claim_query_dependency,
): ):
"""
Removes a user from the group.
"""
if group_model.org_id != org_model.id: if group_model.org_id != org_model.id:
raise UnauthorizedException("Group does not belong to this organization") raise UnauthorizedException("Group does not belong to this organization")
@ -349,21 +396,47 @@ async def remove_group_user(
return response return response
@router.get("/permissions", response_model=IAMGetPermissionsResponse) @router.get(
path="/permissions",
summary="Returns a full list of permissions",
status_code=status.HTTP_200_OK,
response_model=IAMGetPermissionsResponse,
responses={
status.HTTP_401_UNAUTHORIZED: {
"description": "User must be root user of an organisation."
},
},
)
async def get_permissions( async def get_permissions(
db: db_dependency, org_model: org_model_root_claim_query_dependency db: db_dependency, org_model: org_model_root_claim_query_dependency
): ):
"""
Returns a full list of permissions.
"""
permission_models = db.query(Perm).all() permission_models = db.query(Perm).all()
return {"permissions": permission_models} return {"permissions": permission_models}
@router.post("/permission", response_model=IAMPostPermissionResponse) @router.post(
path="/permission",
summary="Creates a new permission for a service",
status_code=status.HTTP_201_CREATED,
response_model=IAMPostPermissionResponse,
responses={
status.HTTP_401_UNAUTHORIZED: {"description": "Must be super user."},
status.HTTP_404_NOT_FOUND: {"description": "Service does not exist"},
status.HTTP_409_CONFLICT: {"description": "Permission already exists"},
},
)
async def create_new_permission( async def create_new_permission(
db: db_dependency, db: db_dependency,
su: super_admin_dependency, su: super_admin_dependency,
request_model: IAMPostPermissionRequest, request_model: IAMPostPermissionRequest,
): ):
"""
Allows a super admin to create a new IAM permission for a service.
"""
service_model = db.get(Service, request_model.service_id) service_model = db.get(Service, request_model.service_id)
if service_model is None: if service_model is None:
raise ServiceNotFoundException(service_id=request_model.service_id) raise ServiceNotFoundException(service_id=request_model.service_id)
@ -387,22 +460,40 @@ async def create_new_permission(
return {"permission": response} return {"permission": response}
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT) @router.delete(
path="/permission",
summary="Deletes a permission",
status_code=status.HTTP_204_NO_CONTENT,
responses={},
)
async def delete_permission( async def delete_permission(
db: db_dependency, db: db_dependency,
su: super_admin_dependency, su: super_admin_dependency,
perm_model: perm_model_query_dependency, perm_model: perm_model_query_dependency,
): ):
"""
Allows a super admin to remove a permission.
"""
db.delete(perm_model) db.delete(perm_model)
db.commit() db.commit()
@router.post("/permissions/search", response_model=IAMGetPermissionsSearchResponse) @router.post(
path="/permissions/search",
summary="Search list of permissions",
status_code=status.HTTP_200_OK,
response_model=IAMGetPermissionsSearchResponse,
responses={},
)
async def post_permissions( async def post_permissions(
db: db_dependency, db: db_dependency,
org_model: org_model_root_claim_body_dependency, org_model: org_model_root_claim_body_dependency,
request_model: IAMGetPermissionsSearchRequest, request_model: IAMGetPermissionsSearchRequest,
): ):
"""
Returns a list of permissions filtered by the queries provided.\n
If a query is null, it will be ignored.
"""
permission_query = db.query(Perm) permission_query = db.query(Perm)
if request_model.service_id is not None: if request_model.service_id is not None:
@ -424,9 +515,10 @@ async def post_permissions(
@router.put( @router.put(
"/group/user/invitation", path="/group/user/invitation",
summary="Send an email invitation for non-org member to join a group", summary="Send an email invitation for non-org member to join a group",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
responses={},
) )
async def invitation( async def invitation(
background_tasks: BackgroundTasks, background_tasks: BackgroundTasks,
@ -434,11 +526,17 @@ async def invitation(
group_model: group_model_body_dependency, group_model: group_model_body_dependency,
request_model: IAMPutGroupInvitationRequest, request_model: IAMPutGroupInvitationRequest,
): ):
org_id = org_model.id """
org_name = org_model.name Sends an email invitation to join a group.\n
This is intended for inviting no-members to a group, giving them permission to access org resources.\n
i.e. Allowing somebody in a partner organisation to view metrics.\n
Can also be used for inviting organisaion members if needed.
"""
org_id: int = org_model.id
org_name: str = org_model.name
user_email = request_model.user_email user_email = request_model.user_email
group_id = group_model.id group_id: int = group_model.id
group_name = group_model.name group_name: str = group_model.name
background_tasks.add_task( background_tasks.add_task(
send_user_group_invitation, send_user_group_invitation,
@ -453,32 +551,42 @@ async def invitation(
@router.put( @router.put(
"/group/user//invitation/accept", path="/group/user//invitation/accept",
summary="Accept email invitation to join an org's group", summary="Accept email invitation to join an org's group",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
responses={
status.HTTP_404_NOT_FOUND: {"description": "User|Org|Group not found"},
status.HTTP_403_FORBIDDEN: {
"description": "Group and organisation do not match"
},
status.HTTP_409_CONFLICT: {"description": "User is already in the group"},
},
) )
async def accept_invitation( async def accept_invitation(
db: db_dependency, db: db_dependency,
user_model: user_model_claims_dependency, user_model: user_model_claims_dependency,
request_model: IAMPutGroupInvitationAcceptRequest, request_model: IAMPutGroupInvitationAcceptRequest,
): ):
"""
Accepts an invitation to join an org's group
"""
email_claims = await verify_email_token( email_claims = await verify_email_token(
token=request_model.jwt, user_model=user_model token=request_model.jwt, user_model=user_model
) )
org_model = db.get(Org, email_claims["org_id"]) org_model = db.get(Org, email_claims["org_id"])
if org_model is None: if org_model is None:
raise OrgNotFoundException() raise OrgNotFoundException(email_claims["org_id"])
group_model = db.get(Group, email_claims["group_id"]) group_model = db.get(Group, email_claims["group_id"])
if group_model is None: if group_model is None:
raise GroupNotFoundException() raise GroupNotFoundException(email_claims["group_id"])
if group_model not in org_model.group_rel: if group_model not in org_model.group_rel:
raise UnauthorizedException("Group and org do not match.") raise ForbiddenException("Group and org do not match.")
if user_model in group_model.user_rel: if user_model in group_model.user_rel:
raise ConflictException(message="User already in group.") raise ConflictException("User already in group.")
group_model.user_rel.append(user_model) group_model.user_rel.append(user_model)
db.commit() db.commit()

View file

@ -34,13 +34,17 @@ tags_metadata = [
"name": "User", "name": "User",
"description": "User related operations, includes getting information about the current user", "description": "User related operations, includes getting information about the current user",
}, },
{
"name": "Organisation",
"description": "Organisation related operations, includes getting lists of users etc associated with orgs",
},
{ {
"name": "Service", "name": "Service",
"description": "Services related operations, includes registering services and reissuing API keys", "description": "Services related operations, includes registering services and reissuing API keys",
}, },
{ {
"name": "Organisation", "name": "IAM",
"description": "Organisation related operations, includes getting lists of users etc associated with orgs", "description": "Operations related to the role based identity and access management system. This includes management of groups, permissions, and related users.",
}, },
] ]

View file

@ -16,6 +16,7 @@ Endpoints:
- [PATCH](/org/contact): [root user]: Updates the (contact_type) contact for an org(id). Any number of details can be changed. - [PATCH](/org/contact): [root user]: Updates the (contact_type) contact for an org(id). Any number of details can be changed.
""" """
from datetime import datetime, timezone
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, status from fastapi import APIRouter, status
@ -29,6 +30,7 @@ from src.contact.models import Contact
from src.contact.schemas import ContactAddress from src.contact.schemas import ContactAddress
from src.contact.exceptions import ContactNotFoundException from src.contact.exceptions import ContactNotFoundException
from src.database import db_dependency from src.database import db_dependency
from src.organisation.schemas_questionnaires import QuestionnaireQuestionsVersion0
from src.user.dependencies import ( from src.user.dependencies import (
user_model_body_dependency, user_model_body_dependency,
user_model_claims_dependency, user_model_claims_dependency,
@ -64,6 +66,7 @@ from src.organisation.schemas import (
OrgPatchRootResponse, OrgPatchRootResponse,
Questionnaire, Questionnaire,
OrgPatchContactResponse, OrgPatchContactResponse,
QuestionnaireMetadata,
) )
router = APIRouter( router = APIRouter(
@ -88,7 +91,9 @@ router = APIRouter(
}, },
}, },
) )
async def get_org_by_id(org_model: org_model_root_claim_query_dependency): async def get_org_by_id(
db: db_dependency, org_model: org_model_root_claim_query_dependency
):
""" """
Returns organisation details including key member email addresses Returns organisation details including key member email addresses
""" """
@ -143,10 +148,21 @@ async def create_org(
ALl organisations are given the "partial" status on creation. See update_questionnaire() for more details. ALl organisations are given the "partial" status on creation. See update_questionnaire() for more details.
""" """
if request_model.intake_questionnaire: if request_model.intake_questionnaire:
intake_questionnaire = request_model.intake_questionnaire.model_dump() questionnaire_questions = request_model.intake_questionnaire.model_dump()
else: else:
intake_questionnaire = None questionnaire_questions = QuestionnaireQuestionsVersion0().model_dump()
org_model = Org(name=request_model.name, intake_questionnaire=intake_questionnaire)
questionnaire_metadata = QuestionnaireMetadata(version=0, submission_date=None)
intake_questionnaire = Questionnaire(
metadata=questionnaire_metadata,
questions=questionnaire_questions,
)
org_model = Org(
name=request_model.name,
intake_questionnaire=intake_questionnaire.model_dump(mode="json"),
)
org_model.status = "partial" org_model.status = "partial"
@ -204,18 +220,27 @@ async def update_questionnaire(
final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval. final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval.
""" """
update_data = request_model.intake_questionnaire.model_dump(exclude_none=True) update_data = request_model.intake_questionnaire.model_dump(exclude_none=True)
questionnaire_model = Questionnaire(**org_model.intake_questionnaire) questionnaire = org_model.intake_questionnaire
questions_model = QuestionnaireQuestionsVersion0(**questionnaire["questions"])
for key, value in update_data.items(): for key, value in update_data.items():
if hasattr(questionnaire_model, key): if hasattr(questions_model, key):
setattr(questionnaire_model, key, value) setattr(questions_model, key, value)
else: else:
raise UnprocessableContentException("Invalid keys in update request") raise UnprocessableContentException("Invalid keys in update request")
metadata = QuestionnaireMetadata(version=questionnaire["metadata"]["version"])
# Allows for partially completed questionnaires to be saved without being submitted for review # Allows for partially completed questionnaires to be saved without being submitted for review
if not request_model.partial: if not request_model.partial:
org_model.status = "submitted" org_model.status = "submitted"
metadata.submission_date = datetime.now(timezone.utc)
org_model.intake_questionnaire = questionnaire_model.model_dump() questionnaire_model = Questionnaire(
metadata=metadata,
questions=questions_model,
)
org_model.intake_questionnaire = questionnaire_model.model_dump(mode="json")
db.flush() db.flush()
response = OrgPatchQuestionnaireResponse(**org_model.__dict__) response = OrgPatchQuestionnaireResponse(**org_model.__dict__)
db.commit() db.commit()

View file

@ -7,6 +7,7 @@ Models follow the nomenclature of:
""" """
from typing import Optional from typing import Optional
from datetime import datetime
from pydantic import EmailStr, ConfigDict from pydantic import EmailStr, ConfigDict
@ -21,12 +22,20 @@ from src.schemas import (
from src.contact.schemas import ContactModel from src.contact.schemas import ContactModel
from src.organisation.constants import Status, ContactType from src.organisation.constants import Status, ContactType
from src.organisation.schemas_questionnaires import (
QuestionnaireQuestionsVersion0 as CurrentQuestions,
questionnaire_union,
)
class QuestionnaireMetadata(CustomBaseModel):
version: int
submission_date: Optional[datetime] = None
class Questionnaire(CustomBaseModel): class Questionnaire(CustomBaseModel):
question_one: Optional[str] = None metadata: QuestionnaireMetadata
question_two: Optional[str] = None questions: questionnaire_union
question_three: Optional[str] = None
class ContactSummary(CustomBaseModel): class ContactSummary(CustomBaseModel):
@ -47,7 +56,7 @@ class OrgSchema(OrgIDMixin):
class OrgPostOrgRequest(CustomBaseModel): class OrgPostOrgRequest(CustomBaseModel):
name: str name: str
intake_questionnaire: Optional[Questionnaire] = None intake_questionnaire: Optional[CurrentQuestions] = None
class OrgPostOrgResponse(CustomBaseModel): class OrgPostOrgResponse(CustomBaseModel):
@ -57,7 +66,7 @@ class OrgPostOrgResponse(CustomBaseModel):
class OrgPatchQuestionnaireRequest(OrgIDMixin): class OrgPatchQuestionnaireRequest(OrgIDMixin):
intake_questionnaire: Questionnaire intake_questionnaire: CurrentQuestions
partial: bool partial: bool

View file

@ -0,0 +1,16 @@
from typing import Optional
from src.schemas import CustomBaseModel
class QuestionnaireQuestions(CustomBaseModel):
pass
class QuestionnaireQuestionsVersion0(QuestionnaireQuestions):
question_one: Optional[str] = None
question_two: Optional[str] = None
question_three: Optional[str] = None
questionnaire_union = QuestionnaireQuestionsVersion0 # | QuestionnaireQuestionsVersion1

View file

@ -17,6 +17,8 @@ from src.user.schemas import (
UserPostInvitationRequest, UserPostInvitationRequest,
UserPostInvitationAcceptRequest, UserPostInvitationAcceptRequest,
UserGetSelfOrgsResponse, UserGetSelfOrgsResponse,
UserPostInvitationResponse,
UserPostInvitationAcceptResponse,
) )
from src.user.dependencies import ( from src.user.dependencies import (
user_model_claims_dependency, user_model_claims_dependency,
@ -153,6 +155,7 @@ async def get_user_orgs(user_model: user_model_claims_dependency):
"/invitation", "/invitation",
summary="Send an email invitation for a user to join an org", summary="Send an email invitation for a user to join an org",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
response_model=UserPostInvitationResponse,
) )
async def invitation( async def invitation(
background_tasks: BackgroundTasks, background_tasks: BackgroundTasks,
@ -167,13 +170,19 @@ async def invitation(
send_invitation, org_id=org_id, org_name=org_name, user_email=user_email send_invitation, org_id=org_id, org_name=org_name, user_email=user_email
) )
return "Invitation sent" response = {
"organisation": org_model,
"invited_email": user_email,
}
return response
@router.post( @router.post(
"/invitation/accept", "/invitation/accept",
summary="Accept email invitation to join an org", summary="Accept email invitation to join an org",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
response_model=UserPostInvitationAcceptResponse,
) )
async def accept_invitation( async def accept_invitation(
db: db_dependency, db: db_dependency,
@ -189,6 +198,13 @@ async def accept_invitation(
raise OrgNotFoundException() raise OrgNotFoundException()
org_model.user_rel.append(user_model) org_model.user_rel.append(user_model)
db.flush()
response = {
"organisation": org_model,
"user": user_model,
}
db.commit() db.commit()
return "Invitation accepted" return response

View file

@ -6,7 +6,7 @@ from typing import Optional
from pydantic import EmailStr from pydantic import EmailStr
from src.organisation.schemas import OrgSchema from src.organisation.schemas import OrgSchema
from src.schemas import CustomBaseModel, OrgIDMixin from src.schemas import CustomBaseModel, OrgIDMixin, OrgSummary, UserSummary
class OIDCClaims(CustomBaseModel): class OIDCClaims(CustomBaseModel):
@ -60,3 +60,13 @@ class UserPostInvitationAcceptRequest(CustomBaseModel):
class UserGetSelfOrgsResponse(CustomBaseModel): class UserGetSelfOrgsResponse(CustomBaseModel):
organisations: list[OrgSchema] organisations: list[OrgSchema]
class UserPostInvitationResponse(CustomBaseModel):
organisation: OrgSummary
invited_email: EmailStr
class UserPostInvitationAcceptResponse(CustomBaseModel):
organisation: OrgSummary
user: UserSummary

View file

@ -103,7 +103,10 @@ def _seed(db):
owner_contact_id=2, owner_contact_id=2,
security_contact_id=3, security_contact_id=3,
status="approved", status="approved",
intake_questionnaire={"question_two": "answer two"}, intake_questionnaire={
"metadata": {"version": 0, "submission_date": None},
"questions": {"question_two": "answer two"},
},
) )
) )
db.add(Service(name="Test Service", api_key="123456789")) db.add(Service(name="Test Service", api_key="123456789"))
@ -167,11 +170,25 @@ def get_testable_routes():
if method in {"HEAD", "OPTIONS"}: if method in {"HEAD", "OPTIONS"}:
continue continue
routes.append((method, route.path, route.status_code, route.response_model)) routes.append(
(
method,
route.path,
route.status_code,
route.response_model,
route.summary,
)
)
return routes return routes
# with open("endpoints.txt", "w") as f: # with open("endpoints.txt", "w") as f:
# for ep in get_testable_routes(): # for ep in get_testable_routes():
# f.write(f"[{ep[0]}]{ep[1]}({ep[2]}) -> {ep[2]}: {ep[3]}\n") # f.write(f"[{ep[0]}]({ep[1]}) -> {ep[2]}: {ep[3]}\n")
#
#
### Docstring formatted output ###
# with open("endpoints.txt", "w") as f:
# for ep in get_testable_routes():
# f.write(f"- [{ep[0]}]({ep[1]}): []: {ep[4]}\n")

View file

@ -32,7 +32,10 @@ async def test_get_org_auth_root_su(default_client: AsyncClient, db_session):
owner_contact_id=2, owner_contact_id=2,
security_contact_id=3, security_contact_id=3,
status="approved", status="approved",
intake_questionnaire={}, intake_questionnaire={
"metadata": {"version": 0, "submission_date": None},
"questions": {},
},
) )
) )
db_session.flush() db_session.flush()

View file

@ -587,7 +587,7 @@ async def test_post_perm_success(default_client: AsyncClient, db_session):
"/iam/permission", "/iam/permission",
json={"service_id": 1, "resource": "test_resource", "action": "create"}, json={"service_id": 1, "resource": "test_resource", "action": "create"},
) )
assert resp.status_code == 200 assert resp.status_code == 201
data = resp.json() data = resp.json()

View file

@ -103,9 +103,13 @@ async def test_patch_org_questionnaire_partial_success(
assert data["status"] == "partial" assert data["status"] == "partial"
assert "intake_questionnaire" in data assert "intake_questionnaire" in data
assert isinstance(data["intake_questionnaire"], dict) assert isinstance(data["intake_questionnaire"], dict)
assert data["intake_questionnaire"]["question_one"] == "new answer one" metadata = data["intake_questionnaire"]["metadata"]
assert data["intake_questionnaire"]["question_two"] == "answer two" assert metadata["version"] == 0
assert data["intake_questionnaire"]["question_three"] is None assert metadata["submission_date"] is None
questions = data["intake_questionnaire"]["questions"]
assert questions["question_one"] == "new answer one"
assert questions["question_two"] == "answer two"
assert questions["question_three"] is None
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -172,9 +176,13 @@ async def test_patch_org_questionnaire_submit_success(
assert data["status"] == "submitted" assert data["status"] == "submitted"
assert "intake_questionnaire" in data assert "intake_questionnaire" in data
assert isinstance(data["intake_questionnaire"], dict) assert isinstance(data["intake_questionnaire"], dict)
assert data["intake_questionnaire"]["question_one"] == "new answer one" metadata = data["intake_questionnaire"]["metadata"]
assert data["intake_questionnaire"]["question_two"] == "answer two" assert metadata["version"] == 0
assert data["intake_questionnaire"]["question_three"] is None assert metadata["submission_date"] is not None
questions = data["intake_questionnaire"]["questions"]
assert questions["question_one"] == "new answer one"
assert questions["question_two"] == "answer two"
assert questions["question_three"] is None
@pytest.mark.parametrize( @pytest.mark.parametrize(

View file

@ -70,7 +70,15 @@ async def test_post_user_invitation_success(default_client: AsyncClient):
resp = await default_client.post("/user/invitation", json=body) resp = await default_client.post("/user/invitation", json=body)
assert resp.status_code == 200 assert resp.status_code == 200
assert resp.json() == "Invitation sent" data = resp.json()
assert "organisation" in data
assert isinstance(data["organisation"], dict)
assert data["organisation"]["id"] == 1
assert data["organisation"]["name"] == "Test Org"
assert "invited_email" in data
assert isinstance(data["invited_email"], str)
assert data["invited_email"] == "admin@test.com"
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -161,9 +169,12 @@ async def test_get_self_orgs_dynamic(default_client: AsyncClient):
"security_contact": {"email": "security@test.org", "id": 3}, "security_contact": {"email": "security@test.org", "id": 3},
"billing_contact": {"email": "billing@test.org", "id": 1}, "billing_contact": {"email": "billing@test.org", "id": 1},
"intake_questionnaire": { "intake_questionnaire": {
"question_one": None, "questions": {
"question_three": None, "question_one": None,
"question_two": "answer two", "question_three": None,
"question_two": "answer two",
},
"metadata": {"version": 0, "submission_date": None},
}, },
} }
] ]