cloud-api/src/organisation/router.py
luxferre 3e4f68dd9b fix: unique violations
Directly using Psycopg error instead of the error code.

Also, raise all other IntegrityErrors instead of silently dropping them.
2026-06-15 14:38:14 +01:00

665 lines
19 KiB
Python

"""
Router endpoints for organisation module
Endpoints:
- [GET](/org/id): [root user]: Get details about an organisation(id)
- [POST](/org/): [oidc claim]: Creates an organisation, adds the current user as a user and sets them to be the root user
- [PATCH](/org/questionnaire): [root user]: Updates the org's intake questionnaire and optionally be submitted for review
- [PATCH](/org/status): [super admin]: Allows a super admin to update an org(id) status(Status enum)
- [GET](/org/users): [root user]: Gets a list of the org(id) users(email)
- [POST](/org/users): [root user]: Adds a new user(id) to the org(id)
- [DELETE](/org/): [super admin]: Deletes an organisation(id)
- [PATCH](/org/root_user): [super admin]: Updates an org(id) root user(id)
- [GET](/org/groups): [root user]: Gets a list of the org(id) groups(name)
- [DELETE](/org/user): [root user]: Removes a user(id) from an org(id)
- [GET](/org/contact): [root user]: Gets the (contact_type) contact for an org(id)
- [PATCH](/org/contact): [root user]: Updates the (contact_type) contact for an org(id). Any number of details can be changed.
"""
from datetime import datetime, timezone
from typing import Annotated
from sqlalchemy.exc import IntegrityError
from psycopg.errors import UniqueViolation
from fastapi import APIRouter, status
from fastapi.params import Query
from src.contact.schemas import ContactModel
from src.exceptions import (
UnprocessableContentException,
ConflictException,
ForbiddenException,
)
from src.contact.models import Contact
from src.contact.schemas import ContactAddress
from src.contact.exceptions import ContactNotFoundException
from src.database import db_dependency
from src.iam.service import assign_default_user_group, assign_default_root_group
from src.organisation.schemas_questionnaires import QuestionnaireQuestionsVersion0
from src.user.dependencies import (
user_model_body_dependency,
user_model_claims_dependency,
user_model_query_dependency,
)
from src.auth.dependencies import (
super_admin_dependency,
org_model_root_claim_query_dependency,
org_model_root_claim_body_dependency,
)
from src.organisation.dependencies import (
org_model_body_dependency,
org_model_query_dependency,
)
from src.organisation.constants import ContactType, Status as StatusEnum
from src.organisation.models import Organisation as Org
from src.organisation.schemas import (
OrgPostOrgRequest,
OrgPatchQuestionnaireRequest,
OrgPatchStatusRequest,
OrgPatchContactRequest,
OrgPostUserRequest,
OrgGetUserResponse,
OrgGetContactResponse,
OrgGetOrgResponse,
OrgPatchRootRequest,
OrgGetGroupResponse,
OrgPostOrgResponse,
OrgPatchQuestionnaireResponse,
OrgPatchStatusResponse,
OrgPostUserResponse,
OrgPatchRootResponse,
Questionnaire,
OrgPatchContactResponse,
QuestionnaireMetadata,
)
router = APIRouter(
prefix="/org",
tags=["Organisation"],
)
@router.get(
"",
summary="Get org details by ID.",
response_model=OrgGetOrgResponse,
status_code=status.HTTP_200_OK,
responses={
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
status.HTTP_404_NOT_FOUND: {"description": "Organisation not found"},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Missing or invalid org_id query parameter"
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user."
},
},
)
async def get_org_by_id(
db: db_dependency, org_model: org_model_root_claim_query_dependency
):
"""
Returns organisation details including key member email addresses
"""
response = {
"organisation_id": org_model.id,
"name": org_model.name,
"status": org_model.status,
"intake_questionnaire": org_model.intake_questionnaire,
"root_user_email": org_model.root_user_email,
"billing_contact": {
"id": org_model.billing_contact_id,
"email": org_model.billing_contact_rel.email,
},
"owner_contact": {
"id": org_model.owner_contact_id,
"email": org_model.owner_contact_rel.email,
},
"security_contact": {
"id": org_model.security_contact_id,
"email": org_model.security_contact_rel.email,
},
}
return {"organisations": [response]}
@router.post(
"",
summary="Create new organisation.",
status_code=status.HTTP_201_CREATED,
response_model=OrgPostOrgResponse,
responses={
status.HTTP_201_CREATED: {"description": "Successfully created organisation."},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request."
},
status.HTTP_401_UNAUTHORIZED: {
"description": "User must be logged in with OIDC to create organisation."
},
status.HTTP_409_CONFLICT: {
"description": "Organisation with this name already exists."
},
},
)
async def create_org(
db: db_dependency,
user_model: user_model_claims_dependency,
request_model: OrgPostOrgRequest,
):
"""
Creates a new organisation with optional questionnaire (to be completed or submitted).
ALl organisations are given the "partial" status on creation. See update_questionnaire() for more details.
"""
if request_model.intake_questionnaire:
questionnaire_questions = request_model.intake_questionnaire.model_dump()
else:
questionnaire_questions = QuestionnaireQuestionsVersion0().model_dump()
questionnaire_metadata = QuestionnaireMetadata(version=0, submission_date=None)
intake_questionnaire = Questionnaire(
metadata=questionnaire_metadata,
questions=questionnaire_questions,
)
org_model = Org(
name=request_model.name,
intake_questionnaire=intake_questionnaire.model_dump(mode="json"),
)
org_model.status = "partial"
db.add(org_model)
try:
db.flush()
except IntegrityError as e:
if (
isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
):
raise ConflictException(
message="Organisation with this name already exists"
)
raise
# Adds currently logged-in user to org users list and sets them as root_user
org_model.user_rel.append(user_model)
org_model.root_user_rel = user_model
# Creates default user and default root IAM groups and assigns them
await assign_default_user_group(db, org_model, user_model)
await assign_default_root_group(db, org_model, user_model)
for contact_type in [
"billing_contact_id",
"security_contact_id",
"owner_contact_id",
]:
contact_model = Contact(org_id=org_model.id)
db.add(contact_model)
db.flush()
org_model.__setattr__(contact_type, contact_model.id)
response = OrgPostOrgResponse(**org_model.__dict__)
db.commit()
return response
@router.patch(
"/questionnaire",
summary="Update questionnaire.",
status_code=status.HTTP_200_OK,
response_model=OrgPatchQuestionnaireResponse,
responses={
status.HTTP_200_OK: {"description": "Successfully updated questionnaire."},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request."
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user."
},
},
)
async def update_questionnaire(
db: db_dependency,
org_model: org_model_root_claim_body_dependency,
request_model: OrgPatchQuestionnaireRequest,
):
"""
Route for updating questionnaire.
The partial bool allows for submission of partially completed questionnaire and/or
final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval.
"""
org_status = StatusEnum(org_model.status)
if not org_status.is_pre_submission:
raise ForbiddenException(
"Questionnaire may only be modified prior to submission."
)
update_data = request_model.intake_questionnaire.model_dump(exclude_none=True)
questionnaire = org_model.intake_questionnaire
questions_model = QuestionnaireQuestionsVersion0(**questionnaire["questions"])
for key, value in update_data.items():
if hasattr(questions_model, key):
setattr(questions_model, key, value)
else:
raise UnprocessableContentException("Invalid keys in update request")
metadata = QuestionnaireMetadata(version=questionnaire["metadata"]["version"])
# Allows for partially completed questionnaires to be saved without being submitted for review
if not request_model.partial:
org_model.status = "submitted"
metadata.submission_date = datetime.now(timezone.utc)
questionnaire_model = Questionnaire(
metadata=metadata,
questions=questions_model,
)
org_model.intake_questionnaire = questionnaire_model.model_dump(mode="json")
db.flush()
response = OrgPatchQuestionnaireResponse(**org_model.__dict__)
db.commit()
return response
@router.patch(
"/status",
summary="Update status of organisation.",
status_code=status.HTTP_200_OK,
response_model=OrgPatchStatusResponse,
responses={
status.HTTP_200_OK: {
"description": "Successfully updated organisation status."
},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request."
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be super admin."
},
},
)
async def update_status(
db: db_dependency,
org_model: org_model_body_dependency,
su: super_admin_dependency,
request_model: OrgPatchStatusRequest,
):
"""
Sets an organisation's status. This is the endpoint for approving or denying an organisation after reviewing the questionnaire.
"""
org_model.status = request_model.status
db.flush()
response = OrgPatchStatusResponse(**org_model.__dict__)
db.commit()
return response
@router.get(
"/users",
summary="Get email addresses of users of the organisation.",
status_code=status.HTTP_200_OK,
response_model=OrgGetUserResponse,
responses={
status.HTTP_200_OK: {"description": "Successful retrieval of users."},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Org ID missing or invalid."
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user."
},
},
)
async def get_users(org_model: org_model_root_claim_query_dependency):
"""
Returns a list of the email addresses of all users of the organisation.
"""
return {
"users": [{"email": user.email, "id": user.id} for user in org_model.user_rel],
"organisation": org_model,
}
@router.post(
"/user",
summary="Add user to the organisation.",
status_code=status.HTTP_200_OK,
response_model=OrgPostUserResponse,
responses={
status.HTTP_200_OK: {
"description": "Successfully added user to the organisation."
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user."
},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request."
},
status.HTTP_409_CONFLICT: {
"description": "User is already a member of the organisation."
},
},
)
async def add_user_to_org(
db: db_dependency,
org_model: org_model_body_dependency,
user_model: user_model_body_dependency,
su: super_admin_dependency,
request_model: OrgPostUserRequest,
):
"""
Adds a user to the organisation.
"""
if user_model in org_model.user_rel:
raise ConflictException(message="User already a part of this organisation")
org_model.user_rel.append(user_model)
db.flush()
await assign_default_user_group(db=db, org_model=org_model, user_model=user_model)
response = {
"organisation": org_model,
"users": [{"id": user.id, "email": user.email} for user in org_model.user_rel],
}
db.commit()
return response
@router.delete(
"",
summary="Delete organisation from the hub.",
status_code=status.HTTP_204_NO_CONTENT,
responses={
status.HTTP_204_NO_CONTENT: {
"description": "Successfully deleted organisation."
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be super admin."
},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Org ID missing or invalid."
},
},
)
async def delete_organisation_by_id(
db: db_dependency,
org_model: org_model_query_dependency,
su: super_admin_dependency,
):
"""
Removes an organisation from the hub.
"""
db.delete(org_model)
db.commit()
@router.delete(
"/self",
summary="Delete organisation from the hub as root user before it has been approved.",
status_code=status.HTTP_204_NO_CONTENT,
responses={
status.HTTP_204_NO_CONTENT: {
"description": "Successfully deleted organisation."
},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Unprocessable content.",
"content": {
"application/json": {
"examples": {
"org_id": {"summary": "Invalid or missing org ID."},
"oidc_claims": {"summary": "Invalid or missing OIDC claims."},
}
}
},
},
status.HTTP_401_UNAUTHORIZED: {
"description": "Unauthorized",
"content": {
"application/json": {
"examples": {
"awaiting_approval": {
"summary": "Organisation has not yet been approved."
},
"expired_token": {"summary": "User token has expired."},
"oidc": {"summary": "Failed to verify OIDC claims."},
}
}
},
},
status.HTTP_403_FORBIDDEN: {
"description": "Forbidden",
"content": {
"application/json": {
"examples": {
"invalid_state": {
"summary": "Organisation is no longer in pre-approval state."
},
"not_root": {"summary": "Not authorised. Must be root user."},
}
}
},
},
status.HTTP_404_NOT_FOUND: {
"description": "Not found",
"content": {
"application/json": {
"examples": {
"db_id": {
"summary": "User not found in db when checking claims."
},
"user_model": {"summary": "User model not found in db."},
"org_model": {"summary": "Org model not found in db."},
}
}
},
},
},
)
async def delete_preapproved_organisation_by_id(
db: db_dependency,
org_model: org_model_root_claim_query_dependency,
):
"""
Removes an organisation from the hub before it has been approved, if user is root.
"""
org_status = StatusEnum(org_model.status)
if not org_status.is_pre_approval:
raise ForbiddenException(
message="Organisation is no longer in pre-approval state."
)
db.delete(org_model)
db.commit()
@router.patch(
"/root_user",
summary="Update the root user of the organisation.",
status_code=status.HTTP_200_OK,
response_model=OrgPatchRootResponse,
responses={
status.HTTP_200_OK: {"description": "Successfully updated root user."},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request."
},
status.HTTP_401_UNAUTHORIZED: {
"description": "Not authorised. Must be super admin."
},
},
)
async def update_root_user(
db: db_dependency,
org_model: org_model_body_dependency,
user_model: user_model_body_dependency,
su: super_admin_dependency,
request_model: OrgPatchRootRequest,
):
"""
Promotes an existing organisation user to the root user, giving them full control of the org.
"""
if user_model not in org_model.user_rel:
raise UnprocessableContentException(
message="This user does not belong to your organisation."
)
org_model.root_user_rel = user_model
db.flush()
response = OrgPatchRootResponse(
name=org_model.name, root_user_email=org_model.root_user_email
)
db.commit()
return response
@router.get(
"/groups",
summary="Get all organisation IAM groups.",
status_code=status.HTTP_200_OK,
response_model=OrgGetGroupResponse,
responses={
status.HTTP_200_OK: {"description": "Successful retrieval of IAM groups."},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Org ID missing or invalid."
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user."
},
},
)
async def get_org_groups(org_model: org_model_root_claim_query_dependency):
"""
Returns a list of the names of all IAM groups created by the organisation.
"""
return {
"organisation": org_model,
"groups": [
{"id": group.id, "name": group.name} for group in org_model.group_rel
],
}
@router.delete(
"/user",
summary="Remove user from organisation.",
status_code=status.HTTP_204_NO_CONTENT,
responses={
status.HTTP_204_NO_CONTENT: {"description": "Successfully removed user."},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user."
},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request."
},
},
)
async def remove_user_from_org(
db: db_dependency,
org_model: org_model_root_claim_query_dependency,
user_model: user_model_query_dependency,
):
"""
Revokes a user's membership in an organisation.
"""
if user_model not in org_model.user_rel:
return
org_model.user_rel.remove(user_model)
db.commit()
@router.get(
"/contact",
summary="Get contact for organisation.",
status_code=status.HTTP_200_OK,
response_model=OrgGetContactResponse,
responses={
status.HTTP_200_OK: {"description": "Successful retrieval of contact."},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request."
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user."
},
},
)
async def get_contact(
org_model: org_model_root_claim_query_dependency,
contact_type: Annotated[
ContactType, Query(description="Must be billing|security|owner")
],
):
"""
Gets full details for a contact point at an organisation.
"""
match contact_type:
case "billing":
contact_model = org_model.billing_contact_rel
case "security":
contact_model = org_model.security_contact_rel
case "owner":
contact_model = org_model.owner_contact_rel
case _:
raise UnprocessableContentException("Invalid contact type")
if contact_model is None:
raise ContactNotFoundException()
address = ContactAddress.model_validate(contact_model)
contact_response = ContactModel.model_construct(
**contact_model.__dict__, address=address
)
return {"contact": contact_response, "organisation": org_model}
@router.patch(
"/contact",
summary="Update contact for organisation.",
status_code=status.HTTP_200_OK,
response_model=OrgPatchContactResponse,
responses={
status.HTTP_200_OK: {"description": "Successfully updated contact."},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid data in request."
},
status.HTTP_403_FORBIDDEN: {
"description": "Not authorised. Must be org root user."
},
},
)
async def update_contact(
db: db_dependency,
org_model: org_model_root_claim_body_dependency,
request_model: OrgPatchContactRequest,
):
"""
Updates details for a contact point at an organisation.
"""
match request_model.contact_type:
case "billing":
contact_model = org_model.billing_contact_rel
case "security":
contact_model = org_model.security_contact_rel
case "owner":
contact_model = org_model.owner_contact_rel
case _:
raise UnprocessableContentException("Invalid contact type")
if contact_model is None:
raise ContactNotFoundException()
update_data = request_model.model_dump(exclude_none=True)
for key, value in update_data.items():
if hasattr(contact_model, key):
setattr(contact_model, key, value)
else:
if key == "contact_type" or key == "organisation_id":
continue
raise UnprocessableContentException("Invalid keys in update request")
db.flush()
address = ContactAddress.model_validate(contact_model)
contact_response = ContactModel.model_construct(
**contact_model.__dict__, address=address
)
db.commit()
return {"contact": contact_response, "organisation": org_model}