feat: iam rbac system

Endpoints and db architecture to support a role based IAM system.
This commit is contained in:
Chris Milne 2026-05-25 09:05:17 +01:00
parent 7b3ee9d5fa
commit 23f2ce98d7
31 changed files with 634 additions and 317 deletions

View file

@ -8,22 +8,22 @@ Endpoints:
- [patch]/{org_id}/status - Updates the status of an organisation
- [patch]/{org_id}/contact - Assigns a contact to an organisation (as billing, security, or owner)
- [get]/{org_id}/users - Retrieves all users associated with an organisation
- [get]/{org_id}/users/admins - Retrieves only the admin users of an organisation
- [post]/{org_id}/users - Adds a new user to an organisation
- [patch]/{org_id}/users - Updates details of an existing organisation user (e.g., admin status)
- [delete]/{org_id} - Deletes an organisation by ID
- [get]/{org_id}/contact/{contact_type} - Retrieves the contact of a specific type (owner, billing, security) for an organisation
"""
from typing import Annotated
from fastapi import APIRouter, HTTPException
from fastapi.params import Path
from fastapi import APIRouter, HTTPException, status
from fastapi.params import Path, Query
from sqlalchemy.sql import exists
from src.database import db_dependency
from src.contact.models import Contact
from src.iam.models import Group
from src.organisation.dependencies import org_model_dependency
from src.organisation.constants import ContactType
from src.organisation.models import Organisation as Org, OrgUsers
from src.organisation.schemas import OrgOrgPostRequest, OrgQuestionnairePatchRequest, OrgStatusPatchRequest, \
@ -127,17 +127,6 @@ async def get_users(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
return org_user_models
@router.get("/{org_id}/users/admins", response_model=list[OrgUserGetResponse])
async def get_admin_users(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_exists = db.query(exists().where(Org.id == org_id)).scalar()
if not org_exists:
raise HTTPException(status_code=404, detail="Organisation not found")
org_user_models = db.query(OrgUsers).filter(OrgUsers.org_id == org_id).filter(OrgUsers.is_admin == True).all()
return org_user_models
@router.post("/{org_id}/users")
async def add_user_to_org(db: db_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]):
org_model = (db.query(Org).filter(Org.id == org_id).first())
@ -150,27 +139,6 @@ async def add_user_to_org(db: db_dependency, user_request: OrgUserPostRequest, o
db.commit()
@router.patch("/{org_id}/users")
async def update_user_details(db: db_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]):
"""
Currently used only to update user admin status for organisation.
"""
org_model = (db.query(Org).filter(Org.id == org_id).first())
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
org_user_model = db.query(OrgUsers).filter(OrgUsers.org_id == org_id).filter(OrgUsers.user_id == user_request.user_id).first()
if org_user_model is None:
raise HTTPException(status_code=404, detail="Organisation user not found")
if user_request.is_admin is not None:
org_user_model.is_admin = user_request.is_admin
db.add(org_user_model)
db.commit()
@router.delete("/{org_id}")
async def delete_organisation_by_id(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_model = (db.query(Org).filter(Org.id == org_id).first())
@ -201,3 +169,39 @@ async def get_contact(db: db_dependency, contact_type: ContactType, org_id: Anno
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model
@router.get("/{org_id}/root_user")
async def get_org_root_user(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
root_user = org_model.root_user_id
return {"root_user": root_user}
@router.patch("/{org_id}/root_user")
async def update_root_user(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], root_user: Annotated[int, Query(gt=0)]):
# TODO: Request model, ditch query
# TODO: Verify root_user exists, possibly with a user_model_dependency
org_model.root_user_id = root_user
db.add(org_model)
db.commit()
# TODO: Response model
@router.get("/{org_id}/groups")
async def get_org_groups(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_group_models = db.query(Group).filter(Group.org_id == org_id).all()
# TODO: Response model
return org_group_models
@router.delete("/{org_id}/user")
async def remove_user_from_org(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_id: Annotated[int, Query(gt=0)]):
orguser_model = db.query(OrgUsers).filter(OrgUsers.org_id == org_id, OrgUsers.user_id == user_id).first()
if orguser_model is None:
raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)
db.delete(orguser_model)
db.commit()
pass