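"""
Router endpoints for identity and access management (IAM).

Endpoints (all under the ``/iam`` prefix):
    - POST   /can_act_on_resource: check whether the calling user holds a permission
    - GET    /group/permissions:   list the permissions attached to a group
    - GET    /group/users:         list the users that belong to a group
    - POST   /group:               create a group inside an organisation
    - PUT    /group/permission:    attach a permission to a group
    - PUT    /group/user:          add a user to a group
    - DELETE /group/permissions:   detach a permission from a group
    - DELETE /group/user:          remove a user from a group
    - GET    /permissions:         list every permission
    - POST   /permission:          create a permission
    - DELETE /permission:          delete a permission
    - GET    /permissions/search:  filter permissions by service, resource and action

NOTE(review): only /can_act_on_resource declares credential dependencies in this
module (service key and user claims). Every group and permission management route
still carries a ``TODO: root_user_dependency`` / ``super_admin_dependency`` marker
and declares no authentication or authorization dependency here. Unless the model
dependencies in src.iam.dependencies enforce one, those checks have to be in place
before the routes are reachable by untrusted callers.
"""
import logging
from typing import Annotated

from fastapi import APIRouter, Query, HTTPException, status

from src.database import db_dependency
from src.iam.schemas import (
    IAMGetGroupPermissionsResponse, IAMGetGroupUsersResponse, IAMPostGroupRequest,
    GroupResponse, IAMPostGroupResponse, IAMPutGroupPermissionRequest, IAMPutGroupPermissionResponse,
    IAMPutGroupUserRequest, IAMPutGroupUserResponse, IAMDeleteGroupPermissionRequest,
    IAMDeleteGroupPermissionResponse, IAMDeleteGroupUserRequest, IAMDeleteGroupUserResponse,
    IAMGetPermissionsResponse, IAMPostPermissionRequest, IAMPostPermissionResponse, PermissionResponse,
    IAMDeletePermissionRequest, IAMGetPermissionsSearchRequest, IAMGetPermissionsSearchResponse,
)
from src.schemas import ResourceName
from src.auth.service import claims_dependency
from src.user.exceptions import UserNotFoundException
from src.user.models import User
from src.organisation.models import Organisation as Org
from src.service.models import Service
from src.organisation.dependencies import org_model_dependency
from src.iam.service import service_key_dependency
from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups
from src.iam.dependencies import group_model_query_dependency, group_model_body_dependency, perm_model_body_dependency

logger = logging.getLogger(__name__)

router = APIRouter(
    tags=["IAM"],
    prefix="/iam",
)


@router.post("/can_act_on_resource")
async def can_act_on_resource(valid_key: service_key_dependency, db: db_dependency, user_claims: claims_dependency,
                              rn: ResourceName, action: str) -> bool:
    """Return whether the calling user may perform *action* on the resource named by *rn*.

    The answer is True only when a single permission row joins all of: the user
    (through a group membership), an organisation named ``rn.organisation`` that
    owns that group, a service named ``rn.service``, and a permission whose
    resource and action equal ``rn.resource`` and *action*. All comparisons are
    exact equality.

    ``valid_key`` is not read in the body; it is declared so that
    ``service_key_dependency`` is resolved for every call.

    Raises:
        HTTPException: 500 when the lookup fails for any reason. A failure is
            never turned into an "allowed" answer.
    """
    try:
        user_id = user_claims["db_id"]
        match = (db.query(Perm)
                 .join(Service, Service.id == Perm.service_id)
                 .join(GPerms, GPerms.permission_id == Perm.id)
                 .join(Group, Group.id == GPerms.group_id)
                 .join(Org, Org.id == Group.org_id)
                 .join(UserGroups, UserGroups.group_id == Group.id)
                 .join(User, User.id == UserGroups.user_id)
                 .filter(User.id == user_id)
                 .filter(Org.name == rn.organisation)
                 .filter(Service.name == rn.service)
                 .filter(Perm.resource == rn.resource)
                 .filter(Perm.action == action)
                 ).first()
        return match is not None
    except Exception:
        # Keep the traceback in the application log (print() lost it and bypassed log routing);
        # the client still only sees a generic message.
        logger.exception("can_act_on_resource lookup failed")
        raise HTTPException(status_code=500, detail="Internal server error")


@router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse)
async def get_group_permissions(db: db_dependency, group_model: group_model_query_dependency):
    """Return the permissions related to the group resolved by ``group_model_query_dependency``."""
    # TODO: root_user_dependency
    # NOTE(review): no caller check is declared here; confirm the group dependency restricts which
    # groups a caller may read until the root-user dependency exists.
    return {"permissions": group_model.permission_rel}


@router.get("/group/users", response_model=IAMGetGroupUsersResponse)
async def get_group_users(db: db_dependency, group_model: group_model_query_dependency):
    """Return the users related to the group resolved by ``group_model_query_dependency``."""
    # TODO: root_user_dependency
    # NOTE(review): same gap as get_group_permissions; membership lists are returned without a caller check.
    return {"users": group_model.user_rel}


@router.post("/group", response_model=IAMPostGroupResponse)
async def create_group(db: db_dependency, group_request: IAMPostGroupRequest, org_model: org_model_dependency,
                       org_id: Annotated[int, Query(gt=0)]):
    """Create a group named ``group_request.name`` with ``org_id`` as its organisation."""
    # TODO: root_user_dependency
    # TODO: get org ID from dependency instead of query (needs updated dep first)
    # NOTE(review): org_id is taken from the query string while org_model is resolved separately and
    # is not read below. Confirm org_model_dependency validates this same org_id; otherwise the group
    # can land in an organisation other than the one that was checked.
    group_model = Group(name=group_request.name, org_id=org_id)
    db.add(group_model)
    # Flush so database-generated columns are populated before the response is built.
    db.flush()
    # NOTE(review): __dict__ of an ORM instance also holds SQLAlchemy's internal state entry;
    # this presumably relies on GroupResponse ignoring unknown fields -- confirm.
    response = GroupResponse(**group_model.__dict__)
    db.commit()
    return {"group": response}


@router.put("/group/permission", response_model=IAMPutGroupPermissionResponse)
async def add_group_permission(db: db_dependency, group_model: group_model_body_dependency,
                               perm_model: perm_model_body_dependency, request_model: IAMPutGroupPermissionRequest):
    """Attach ``perm_model`` to ``group_model`` and return the group with its permissions.

    Repeating the request for a permission the group already has leaves the group unchanged.
    """
    # TODO: root_user_dependency
    # Appending an already-attached permission would queue a duplicate link row on flush.
    if perm_model not in group_model.permission_rel:
        group_model.permission_rel.append(perm_model)
    db.flush()
    response = IAMPutGroupPermissionResponse(group=GroupResponse(**group_model.__dict__),
                                             permissions=group_model.permission_rel)
    db.commit()
    return response


@router.put("/group/user")
async def add_group_user(db: db_dependency, group_model: group_model_body_dependency,
                         request_model: IAMPutGroupUserRequest):
    """Add the user ``request_model.user_id`` to ``group_model`` and return the group with its users.

    Repeating the request for an existing member leaves the group unchanged.

    Raises:
        UserNotFoundException: when no user has the requested id.
    """
    # TODO: root_user_dependency
    # TODO: user_model_dependency
    user_model = db.get(User, request_model.user_id)
    if user_model is None:
        raise UserNotFoundException(user_id=request_model.user_id)
    # Appending an existing member would queue a duplicate membership row on flush.
    if user_model not in group_model.user_rel:
        group_model.user_rel.append(user_model)
    db.flush()
    response = IAMPutGroupUserResponse(group=GroupResponse(**group_model.__dict__), users=group_model.user_rel)
    db.commit()
    return response


@router.delete("/group/permissions")
async def remove_group_permissions(db: db_dependency, group_model: group_model_body_dependency,
                                   perm_model: perm_model_body_dependency,
                                   request_model: IAMDeleteGroupPermissionRequest):
    """Detach ``perm_model`` from ``group_model`` and return the group with its remaining permissions.

    Raises:
        HTTPException: 404 when the permission is not attached to the group.
    """
    # TODO: root_user_dependency
    try:
        group_model.permission_rel.remove(perm_model)
    except ValueError:
        # list.remove() raises ValueError for a missing item; report it instead of failing with a 500.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail="Permission is not assigned to this group") from None
    db.flush()
    response = IAMDeleteGroupPermissionResponse(group=GroupResponse(**group_model.__dict__),
                                                permissions=group_model.permission_rel)
    db.commit()
    return response


@router.delete("/group/user")
async def remove_group_user(db: db_dependency, group_model: group_model_body_dependency,
                            request_model: IAMDeleteGroupUserRequest):
    """Remove the user ``request_model.user_id`` from ``group_model`` and return the group with its users.

    Raises:
        HTTPException: 404 when the user does not exist or is not a member of the group.
    """
    # TODO: root_user_dependency
    # TODO: User model dependency
    user_model = db.get(User, request_model.user_id)
    if user_model is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    try:
        user_model.group_rel.remove(group_model)
    except ValueError:
        # list.remove() raises ValueError for a missing item; report it instead of failing with a 500.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail="User is not a member of this group") from None
    db.flush()
    response = IAMDeleteGroupUserResponse(group=GroupResponse(**group_model.__dict__), users=group_model.user_rel)
    db.commit()
    return response


@router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(db: db_dependency):
    """Return every permission row."""
    # TODO: root_user_dependency
    # NOTE(review): the full permission catalogue is returned with no caller check and no paging.
    permission_models = db.query(Perm).all()
    return {"permissions": permission_models}


@router.post("/permission")
async def create_new_permission(db: db_dependency, request_mode: IAMPostPermissionRequest):
    """Create a permission from the request body and return it."""
    # TODO: super_admin_dependency
    # NOTE(review): every field of the request schema is copied onto the ORM model, so
    # IAMPostPermissionRequest must expose only columns a client is allowed to set.
    perm_model = Perm(**request_mode.__dict__)
    db.add(perm_model)
    # Flush so database-generated columns are populated before the response is built.
    db.flush()
    response = IAMPostPermissionResponse(permission=PermissionResponse(**perm_model.__dict__))
    db.commit()
    return response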
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(db: db_dependency, perm_model: perm_model_body_dependency,
                            request_model: IAMDeletePermissionRequest):
    """Delete the permission resolved by ``perm_model_body_dependency``; the reply is 204 with no body."""
    # TODO: super_admin_dependency
    # NOTE(review): no authentication or authorization dependency is declared on this route. Unless
    # perm_model_body_dependency enforces one, the super-admin check has to land before exposure.
    # request_model is not read here; presumably the dependency resolves perm_model from the same body.
    db.delete(perm_model)
    db.commit()


@router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(db: db_dependency, search: IAMGetPermissionsSearchRequest):
    """Return the permissions that match every search field that is set; fields left as None are ignored."""
    # TODO: super_admin_dependency
    # NOTE(review): this definition reuses the name of the GET /permissions handler defined earlier
    # in the module. Both routes stay registered, but the module-level name refers to this function
    # only; a distinct name would avoid the shadowing.
    # NOTE(review): if IAMGetPermissionsSearchRequest is a Pydantic model, FastAPI reads it from the
    # request body, which some clients and proxies drop on GET -- confirm this is intended.
    criteria = (
        (Perm.service_id, search.service_id),
        (Perm.resource, search.resource),
        (Perm.action, search.action),
    )
    matching = db.query(Perm)
    for column, wanted in criteria:
        if wanted is not None:
            matching = matching.filter(column == wanted)
    return {"permissions": matching.all()}