feat: iam endpoint req/res models

This commit is contained in:
Chris Milne 2026-05-26 16:25:14 +01:00
parent fa8439cc6c
commit d4f1b73deb
3 changed files with 216 additions and 81 deletions

View file

@ -22,6 +22,19 @@ class Permission(Base):
UniqueConstraint("service_id", "resource", "action", name="uniq_permission_resource_and_action")
service_rel = relationship("Service", foreign_keys=[service_id])
@property
def service_name(self):
return self.service_rel.name
group_rel = relationship(
"Group",
secondary="group_permissions",
back_populates="permission_rel"
)
class Group(Base):
__tablename__ = "group"
@ -38,6 +51,12 @@ class Group(Base):
org_rel = relationship("Organisation", back_populates="group_rel")
permission_rel = relationship(
"Permission",
secondary="group_permissions",
back_populates="group_rel"
)
class GroupPermissions(Base):
__tablename__ = "group_permissions"

View file

@ -5,11 +5,16 @@ Endpoints:
- List: Description
- Endpoints: Description
"""
from typing import Annotated, Optional
from typing import Annotated
from fastapi import APIRouter, Query, HTTPException
from fastapi import APIRouter, Query, HTTPException, status
from src.database import db_dependency
from src.iam.schemas import IAMGetGroupPermissionsResponse, IAMGetGroupUsersResponse, IAMPostGroupRequest, \
GroupResponse, IAMPostGroupResponse, IAMPutGroupPermissionRequest, IAMPutGroupPermissionResponse, \
IAMPutGroupUserRequest, IAMPutGroupUserResponse, IAMDeleteGroupPermissionRequest, IAMDeleteGroupPermissionResponse, \
IAMDeleteGroupUserRequest, IAMDeleteGroupUserResponse, IAMGetPermissionsResponse, IAMPostPermissionRequest, \
IAMPostPermissionResponse, PermissionResponse, IAMDeletePermissionRequest, IAMGetPermissionsSearchRequest, IAMGetPermissionsSearchResponse
from src.schemas import ResourceName
from src.auth.service import claims_dependency
from src.user.models import User
@ -58,135 +63,154 @@ async def can_act_on_resource(valid_key: service_key_dependency, db: db_dependen
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/group/permissions")
@router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse)
async def get_group_permissions(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
# TODO: iam_admin_dependency
group_perms = db.query(Perm).join(GPerms).filter(GPerms.group_id==group_id).all()
# TODO: root_user_dependency & org_id query param
group_model = db.get(Group, group_id)
if group_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Group not found")
# TODO: Response model
return group_perms
return {"permissions": group_model.permission_rel}
@router.get("/group/users")
@router.get("/group/users", response_model=IAMGetGroupUsersResponse)
async def get_group_users(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
# TODO: iam_admin_dependency
group_users = db.query(User).join(UserGroups).filter(UserGroups.group_id == group_id).all()
# TODO: root_user_dependency & org_id query param
group_model = db.get(Group, group_id)
if group_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Group not found")
# TODO: Response model
return group_users
return {"users": group_model.user_rel}
@router.post("/group")
async def create_group(db: db_dependency, group_name: str, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
group_model = Group(name=group_name, org_id=org_id)
@router.post("/group", response_model=IAMPostGroupResponse)
async def create_group(db: db_dependency, group_request: IAMPostGroupRequest, org_model: org_model_dependency, org_id: Annotated[int, Query(gt=0)]):
# TODO: root_user_dependency
group_model = Group(name=group_request.name, org_id=org_id)
db.add(group_model)
db.flush()
response = GroupResponse(**group_model.__dict__)
db.commit()
# TODO: Response model
return {"group": response}
@router.put("/group/permissions")
async def add_group_permissions(db: db_dependency, group_id: int, permission_id: int, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
g_perm_model = GPerms(group_id=group_id, permission_id=permission_id)
@router.put("/group/permission", response_model=IAMPutGroupPermissionResponse)
async def add_group_permission(db: db_dependency, request_model: IAMPutGroupPermissionRequest, org_model: org_model_dependency, org_id: Annotated[int, Query(gt=0)]):
# TODO: root_user_dependency
group_model = db.get(Group, request_model.group_id)
if group_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Group not found")
perm_model = db.get(Perm, request_model.permission_id)
if perm_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Permission not found")
db.add(g_perm_model)
group_model.permission_rel.append(perm_model)
db.flush()
response = IAMPutGroupPermissionResponse(group=GroupResponse(**group_model.__dict__), permissions=group_model.permission_rel)
db.commit()
# TODO: Response model
return response
@router.put("/group/users")
async def add_group_users(db: db_dependency, group_id: int, user_ids: list[int], org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
for user_id in user_ids:
user_group_model = UserGroups(group_id=group_id, user_id=user_id, org_id=org_id)
db.add(user_group_model)
@router.put("/group/user")
async def add_group_user(db: db_dependency, request_model: IAMPutGroupUserRequest, org_model: org_model_dependency, org_id: Annotated[int, Query(gt=0)]):
# TODO: root_user_dependency
group_model = db.get(Group, request_model.group_id)
if group_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Group not found")
user_model = db.get(User, request_model.user_id)
if user_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
group_model.user_rel.append(user_model)
db.flush()
response = IAMPutGroupUserResponse(group=GroupResponse(**group_model.__dict__), users=group_model.user_rel)
db.commit()
# TODO: Response model
return response
@router.delete("/group/permissions")
async def remove_group_permissions(db: db_dependency, group_id: int, org_model: org_model_dependency, org_id: int, permission_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
g_perm_model = db.query(GPerms).filter(GPerms.group_id == group_id, GPerms.permission_id == permission_id).first()
if g_perm_model is None:
return
async def remove_group_permissions(db: db_dependency, request_model: IAMDeleteGroupPermissionRequest, org_model: org_model_dependency, org_id: Annotated[int, Query(gt=0)]):
# TODO: root_user_dependency
group_model = db.get(Group, request_model.group_id)
if group_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Group not found")
perm_model = db.get(Perm, request_model.permission_id)
if perm_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Permission not found")
db.delete(g_perm_model)
group_model.permission_rel.remove(perm_model)
db.flush()
response = IAMDeleteGroupPermissionResponse(group=GroupResponse(**group_model.__dict__),
permissions=group_model.permission_rel)
db.commit()
return
# TODO: Response model
return response
@router.delete("/group/user")
async def remove_group_user(db: db_dependency, group_id: int, user_id: int, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
user_group_model = db.query(UserGroups).filter(UserGroups.group_id == group_id, UserGroups.user_id == user_id).first()
if user_group_model is None:
return
async def remove_group_user(db: db_dependency, request_model: IAMDeleteGroupUserRequest, org_model: org_model_dependency, org_id: Annotated[int, Query(gt=0)]):
# TODO: root_user_dependency
group_model = db.get(Group, request_model.group_id)
if group_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Group not found")
user_model = db.get(User, request_model.user_id)
if user_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
db.delete(user_group_model)
user_model.group_rel.remove(group_model)
db.flush()
response = IAMDeleteGroupUserResponse(group=GroupResponse(**group_model.__dict__), users=group_model.user_rel)
db.commit()
return
# TODO: Response model
return response
@router.get("/permissions")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: request model
@router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Query(gt=0)]):
# TODO: root_user_dependency
permission_models = db.query(Perm).all()
# TODO: Response model
return permission_models
return {"permissions": permission_models}
@router.post("/permission")
async def create_new_permission(db: db_dependency, service_id: int, resource: str, action: str):
async def create_new_permission(db: db_dependency, request_mode: IAMPostPermissionRequest):
# TODO: super_admin_dependency
perm_model = Perm(service_id=service_id, resource=resource, action=action)
perm_model = Perm(**request_mode.__dict__)
db.add(perm_model)
db.flush()
response = IAMPostPermissionResponse(permission=PermissionResponse(**perm_model.__dict__))
db.commit()
return response
@router.delete("/permission")
async def delete_permission(db: db_dependency, service_id: int, resource: str, action: str, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
perm_model = db.query(Perm).filter(Perm.service_id==service_id, Perm.resource==resource, Perm.action==action).first()
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(db: db_dependency, request_model: IAMDeletePermissionRequest):
# TODO: super_admin_dependency
perm_model = db.query(Perm).filter(Perm.service_id==request_model.service_id, Perm.resource==request_model.resource, Perm.action==request_model.action).first()
if perm_model is None:
return
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Permission not found")
db.delete(perm_model)
db.commit()
return
# TODO: Response model
@router.get("/permissions/search")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int, service_id: Optional[int] = None, resource: Optional[str] = None, action: Optional[str] = None):
# TODO: iam_admin_dependency
# TODO: request model
@router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(db: db_dependency, search: IAMGetPermissionsSearchRequest):
# TODO: super_admin_dependency
permission_query = db.query(Perm)
if service_id is not None:
permission_query = permission_query.filter(Perm.service_id == service_id)
if search.service_id is not None:
permission_query = permission_query.filter(Perm.service_id == search.service_id)
if resource is not None:
permission_query = permission_query.filter(Perm.resource == resource)
if search.resource is not None:
permission_query = permission_query.filter(Perm.resource == search.resource)
if action is not None:
permission_query = permission_query.filter(Perm.action == action)
if search.action is not None:
permission_query = permission_query.filter(Perm.action == search. action)
permission_models = permission_query.all()
# TODO: Response model
return permission_models
return {"permissions": permission_models}

View file

@ -1,7 +1,99 @@
"""
Pydantic models for <this module>
Pydantic models for the IAM module
Models:
- List: Description
- Models: Description
"""
from typing import Optional
from pydantic import EmailStr, ConfigDict
from src.schemas import CustomBaseModel
from src.organisation.constants import Status, ContactType
from src.contact.schemas import ContactAddress
class UserResponse(CustomBaseModel):
id: int
first_name: str
last_name: str
email: EmailStr
class PermissionResponse(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
service_name: str
resource: str
action: str
class GroupResponse(CustomBaseModel):
id: int
name: str
class IAMGetGroupPermissionsResponse(CustomBaseModel):
permissions: list[PermissionResponse]
class IAMGetGroupUsersResponse(CustomBaseModel):
users : list[UserResponse]
class IAMPostGroupRequest(CustomBaseModel):
name: str
class IAMPostGroupResponse(CustomBaseModel):
group: GroupResponse
class IAMPutGroupPermissionRequest(CustomBaseModel):
group_id: int
permission_id: int
class IAMPutGroupPermissionResponse(CustomBaseModel):
group: GroupResponse
permissions: list[PermissionResponse]
class IAMPutGroupUserRequest(CustomBaseModel):
group_id: int
user_id: int
class IAMPutGroupUserResponse(CustomBaseModel):
group: GroupResponse
users: list[UserResponse]
class IAMDeleteGroupPermissionRequest(CustomBaseModel):
group_id: int
permission_id: int
class IAMDeleteGroupPermissionResponse(CustomBaseModel):
group: GroupResponse
permissions: list[PermissionResponse]
class IAMDeleteGroupUserRequest(CustomBaseModel):
group_id: int
user_id: int
class IAMDeleteGroupUserResponse(CustomBaseModel):
group: GroupResponse
users: list[UserResponse]
class IAMGetPermissionsResponse(CustomBaseModel):
permissions: list[PermissionResponse]
class IAMPostPermissionRequest(CustomBaseModel):
service_id: int
resource: str
action: str
class IAMPostPermissionResponse(CustomBaseModel):
permission: PermissionResponse
class IAMDeletePermissionRequest(CustomBaseModel):
service_id: int
resource: str
action: str
class IAMGetPermissionsSearchRequest(CustomBaseModel):
service_id: Optional[int] = None
resource: Optional[str] = None
action: Optional[str] = None
class IAMGetPermissionsSearchResponse(CustomBaseModel):
permissions: list[PermissionResponse]