""" Router endpoints for IAM Endpoints: - [POST](/iam/can_act_on_resource): [API key & user claim]: Service access point to verify user permissions - [GET](/iam/group/permissions): [root user]: Gets list of perms(service, resource, action) the given group(id) has - [DELETE](/iam/group/permissions): [root user]: Removes a given perm(id) from the given group(id) - [GET](/iam/group/users): [root user]: Gets a list of users(id, name, email) that are assigned to the given group(id) - [POST](/iam/group): [root user]: Creates a new group for the given org(id) - [PUT](/iam/group/permission): [root user]: Assigns a perm(id) to the given group(id) - [PUT](/iam/group/user): [root user]: Assigns a user(id) to a group(id) - [DELETE](/iam/group/user): [root user]: Removes a user(id) from the given group(id) - [GET](/iam/permissions): [root user]: Gets a list of all permissions - [POST](/iam/permission): [super admin]: Creates a new permission - [DELETE](/iam/permission): [super admin]: Removes a permission - [GET](/iam/permissions/search): [root user]: Returns a list of permissions matching a filter(service|resource|action) """ from fastapi import APIRouter, status from sqlalchemy.exc import IntegrityError from psycopg import errors from src.exceptions import ConflictException from src.database import db_dependency from src.schemas import ResourceName from src.auth.exceptions import UnauthorizedException from src.auth.service import claims_dependency from src.auth.dependencies import org_model_root_claim_query_dependency, org_model_root_claim_body_dependency, \ super_admin_dependency from src.user.models import User from src.user.dependencies import user_model_body_dependency from src.organisation.models import Organisation as Org from src.service.models import Service from src.iam.service import service_key_dependency from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups from src.iam.dependencies import group_model_query_dependency, group_model_body_dependency, perm_model_body_dependency from src.iam.schemas import IAMGetGroupPermissionsResponse, IAMGetGroupUsersResponse, IAMPostGroupRequest, \ GroupSchema, IAMPostGroupResponse, IAMPutGroupPermissionRequest, IAMPutGroupPermissionResponse, \ IAMPutGroupUserRequest, IAMPutGroupUserResponse, IAMDeleteGroupPermissionRequest, IAMDeleteGroupPermissionResponse, \ IAMDeleteGroupUserRequest, IAMDeleteGroupUserResponse, IAMGetPermissionsResponse, IAMPostPermissionRequest, \ IAMPostPermissionResponse, PermissionSchema, IAMDeletePermissionRequest, IAMGetPermissionsSearchRequest, IAMGetPermissionsSearchResponse router = APIRouter( tags=["IAM"], prefix="/iam", ) @router.post("/can_act_on_resource") async def can_act_on_resource(valid_key: service_key_dependency, db: db_dependency, user_claims: claims_dependency, rn: ResourceName, action: str) -> bool: try: user_id = user_claims["db_id"] rn_org = rn.organisation rn_service = rn.service rn_resource = rn.resource result = (db.query(Perm) .join(Service, Service.id == Perm.service_id) .join(GPerms, GPerms.permission_id == Perm.id) .join(Group, Group.id == GPerms.group_id) .join(Org, Org.id == Group.org_id) .join(UserGroups, UserGroups.group_id == Group.id) .join(User, User.id == UserGroups.user_id) .filter(User.id == user_id) .filter(Org.name == rn_org) .filter(Service.name == rn_service) .filter(Perm.resource == rn_resource) .filter(Perm.action == action) ).first() if result: return True else: return False except Exception: raise UnauthorizedException() @router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse) async def get_group_permissions(group_model: group_model_query_dependency, org_model: org_model_root_claim_query_dependency): if group_model.org_id != org_model.id: raise UnauthorizedException() return {"permissions": group_model.permission_rel} @router.get("/group/users", response_model=IAMGetGroupUsersResponse) async def get_group_users(group_model: group_model_query_dependency, org_model: org_model_root_claim_query_dependency): if group_model.org_id != org_model.id: raise UnauthorizedException() return {"users": group_model.user_rel} @router.post("/group", response_model=IAMPostGroupResponse) async def create_group(db: db_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPostGroupRequest): group_model = Group(name=request_model.name, org_id=org_model.id) db.add(group_model) try: db.flush() except IntegrityError as e: if isinstance(e.orig, errors.UniqueViolation): raise ConflictException("Group with this name already exists") response = GroupSchema(**group_model.__dict__) db.commit() return {"group": response} @router.put("/group/permission", response_model=IAMPutGroupPermissionResponse) async def add_group_permission(db: db_dependency, group_model: group_model_body_dependency, perm_model: perm_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupPermissionRequest): if group_model.org_id != org_model.id: raise UnauthorizedException() if perm_model in group_model.permission_rel: raise ConflictException("Group already has this permission") group_model.permission_rel.append(perm_model) db.flush() response = IAMPutGroupPermissionResponse(group=GroupSchema(**group_model.__dict__), permissions=group_model.permission_rel) db.commit() return response @router.put("/group/user", response_model=IAMPutGroupUserResponse) async def add_group_user(db: db_dependency, group_model: group_model_body_dependency, user_model: user_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupUserRequest): if group_model.org_id != org_model.id: raise UnauthorizedException() if user_model in group_model.user_rel: raise ConflictException("User already in group") group_model.user_rel.append(user_model) db.flush() response = IAMPutGroupUserResponse(group=GroupSchema(**group_model.__dict__), users=group_model.user_rel) db.commit() return response @router.delete("/group/permissions") async def remove_group_permissions(db: db_dependency, group_model: group_model_body_dependency, perm_model: perm_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMDeleteGroupPermissionRequest): if group_model.org_id != org_model.id: raise UnauthorizedException() group_model.permission_rel.remove(perm_model) db.flush() response = IAMDeleteGroupPermissionResponse(group=GroupSchema(**group_model.__dict__), permissions=group_model.permission_rel) db.commit() return response @router.delete("/group/user") async def remove_group_user(db: db_dependency, group_model: group_model_body_dependency, user_model: user_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMDeleteGroupUserRequest): if group_model.org_id != org_model.id: raise UnauthorizedException() user_model.group_rel.remove(group_model) db.flush() response = IAMDeleteGroupUserResponse(group=GroupSchema(**group_model.__dict__), users=group_model.user_rel) db.commit() return response @router.get("/permissions", response_model=IAMGetPermissionsResponse) async def get_permissions(db: db_dependency, org_model: org_model_root_claim_query_dependency): permission_models = db.query(Perm).all() return {"permissions": permission_models} @router.post("/permission") async def create_new_permission(db: db_dependency, su: super_admin_dependency, request_mode: IAMPostPermissionRequest): perm_model = Perm(**request_mode.__dict__) try: db.add(perm_model) except IntegrityError as e: if isinstance(e.orig, errors.UniqueViolation): raise ConflictException(message="Permission already exists") db.flush() response = IAMPostPermissionResponse(permission=PermissionSchema(**perm_model.__dict__)) db.commit() return response @router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT) async def delete_permission(db: db_dependency, su: super_admin_dependency, perm_model: perm_model_body_dependency, request_model: IAMDeletePermissionRequest): db.delete(perm_model) db.commit() @router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse) async def get_permissions(db: db_dependency, org_model: org_model_root_claim_body_dependency, search: IAMGetPermissionsSearchRequest): permission_query = db.query(Perm) if search.service_id is not None: permission_query = permission_query.filter(Perm.service_id == search.service_id) if search.resource is not None: permission_query = permission_query.filter(Perm.resource == search.resource) if search.action is not None: permission_query = permission_query.filter(Perm.action == search. action) permission_models = permission_query.all() return {"permissions": permission_models}