""" Router endpoints for IAM Endpoints: - [POST](/iam/can_act_on_resource): [API key & user claim]: Service access point to verify user permissions - [GET](/iam/group/permissions): [root user]: Gets list of perms(service, resource, action) the given group(id) has - [DELETE](/iam/group/permissions): [root user]: Removes a given perm(id) from the given group(id) - [GET](/iam/group/users): [root user]: Gets a list of users(id, name, email) that are assigned to the given group(id) - [POST](/iam/group): [root user]: Creates a new group for the given org(id) - [PUT](/iam/group/permission): [root user]: Assigns a perm(id) to the given group(id) - [PUT](/iam/group/user): [root user]: Assigns a user(id) to a group(id) - [DELETE](/iam/group/user): [root user]: Removes a user(id) from the given group(id) - [GET](/iam/permissions): [root user]: Gets a list of all permissions - [POST](/iam/permission): [super admin]: Creates a new permission - [DELETE](/iam/permission): [super admin]: Removes a permission - [GET](/iam/permissions/search): [root user]: Returns a list of permissions matching a filter(service|resource|action) """ from fastapi import APIRouter, status, BackgroundTasks from sqlalchemy.exc import IntegrityError from src.iam.exceptions import GroupNotFoundException from src.organisation.exceptions import OrgNotFoundException from src.service.exceptions import ServiceNotFoundException from src.exceptions import ConflictException from src.database import db_dependency from src.auth.exceptions import UnauthorizedException from src.auth.service import claims_dependency from src.auth.dependencies import ( org_model_root_claim_query_dependency, org_model_root_claim_body_dependency, super_admin_dependency, ) from src.user.models import User from src.user.dependencies import ( user_model_body_dependency, user_model_query_dependency, user_model_claims_dependency, ) from src.organisation.models import Organisation as Org from src.service.models import Service from src.iam.service import service_key_dependency, send_user_group_invitation from src.iam.models import ( Permission as Perm, GroupPermissions as GPerms, Group, UserGroups, ) from src.iam.dependencies import ( group_model_query_dependency, group_model_body_dependency, perm_model_body_dependency, perm_model_query_dependency, ) from src.iam.schemas import ( GroupSchema, IAMCAoRRequest, IAMGetGroupPermissionsResponse, IAMGetGroupUsersResponse, IAMPostGroupRequest, IAMPostGroupResponse, IAMPutGroupPermissionRequest, IAMPutGroupPermissionResponse, IAMPutGroupUserRequest, IAMPutGroupUserResponse, IAMDeleteGroupPermissionResponse, IAMDeleteGroupUserResponse, IAMGetPermissionsResponse, IAMPostPermissionRequest, IAMPostPermissionResponse, IAMGetPermissionsSearchRequest, IAMGetPermissionsSearchResponse, IAMPutGroupInvitationRequest, IAMPutGroupInvitationAcceptRequest, ) from src.utils import decode_jwt router = APIRouter( tags=["IAM"], prefix="/iam", ) @router.post("/can_act_on_resource") async def can_act_on_resource( valid_key: service_key_dependency, db: db_dependency, user_claims: claims_dependency, request_model: IAMCAoRRequest, ) -> bool: try: rn = request_model.rn action = request_model.action user_id = user_claims["db_id"] rn_org = rn.organisation rn_service = rn.service rn_resource = rn.resource result = ( db.query(Perm) .join(Service, Service.id == Perm.service_id) .join(GPerms, GPerms.permission_id == Perm.id) .join(Group, Group.id == GPerms.group_id) .join(Org, Org.id == Group.org_id) .join(UserGroups, UserGroups.group_id == Group.id) .join(User, User.id == UserGroups.user_id) .filter(User.id == user_id) .filter(Org.name == rn_org) .filter(Service.name == rn_service) .filter(Perm.resource == rn_resource) .filter(Perm.action == action) ).first() if result: return True else: return False except Exception: return False @router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse) async def get_group_permissions( group_model: group_model_query_dependency, org_model: org_model_root_claim_query_dependency, ): if group_model.org_id != org_model.id: raise UnauthorizedException("Group does not belong to this organization") return {"permissions": group_model.permission_rel} @router.get("/group/users", response_model=IAMGetGroupUsersResponse) async def get_group_users( group_model: group_model_query_dependency, org_model: org_model_root_claim_query_dependency, ): if group_model.org_id != org_model.id: raise UnauthorizedException("Group does not belong to this organization") return {"users": group_model.user_rel} @router.post("/group", response_model=IAMPostGroupResponse) async def create_group( db: db_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPostGroupRequest, ): group_model = Group(name=request_model.name, org_id=org_model.id) db.add(group_model) try: db.flush() except IntegrityError as e: if ( getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation ): raise ConflictException("Group with this name already exists") response = GroupSchema(**group_model.__dict__) db.commit() return {"group": response} @router.put("/group/permission", response_model=IAMPutGroupPermissionResponse) async def add_group_permission( db: db_dependency, group_model: group_model_body_dependency, perm_model: perm_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupPermissionRequest, ): if group_model.org_id != org_model.id: raise UnauthorizedException("Group does not belong to this organization") if perm_model in group_model.permission_rel: raise ConflictException("Group already has this permission") group_model.permission_rel.append(perm_model) db.flush() response = IAMPutGroupPermissionResponse( group=GroupSchema(**group_model.__dict__), permissions=group_model.permission_rel, ) db.commit() return response @router.put("/group/user", response_model=IAMPutGroupUserResponse) async def add_group_user( db: db_dependency, group_model: group_model_body_dependency, user_model: user_model_body_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMPutGroupUserRequest, ): if group_model.org_id != org_model.id: raise UnauthorizedException("Group does not belong to this organization") if user_model in group_model.user_rel: raise ConflictException("User already in group") group_model.user_rel.append(user_model) db.flush() response = IAMPutGroupUserResponse( group=GroupSchema(**group_model.__dict__), users=group_model.user_rel ) db.commit() return response @router.delete("/group/permissions") async def remove_group_permissions( db: db_dependency, group_model: group_model_query_dependency, perm_model: perm_model_query_dependency, org_model: org_model_root_claim_query_dependency, ): if group_model.org_id != org_model.id: raise UnauthorizedException("Group does not belong to this organization") group_model.permission_rel.remove(perm_model) db.flush() response = IAMDeleteGroupPermissionResponse( group=GroupSchema(**group_model.__dict__), permissions=group_model.permission_rel, ) db.commit() return response @router.delete("/group/user") async def remove_group_user( db: db_dependency, group_model: group_model_query_dependency, user_model: user_model_query_dependency, org_model: org_model_root_claim_query_dependency, ): if group_model.org_id != org_model.id: raise UnauthorizedException("Group does not belong to this organization") user_model.group_rel.remove(group_model) db.flush() response = IAMDeleteGroupUserResponse( group=GroupSchema(**group_model.__dict__), users=group_model.user_rel ) db.commit() return response @router.get("/permissions", response_model=IAMGetPermissionsResponse) async def get_permissions( db: db_dependency, org_model: org_model_root_claim_query_dependency ): permission_models = db.query(Perm).all() return {"permissions": permission_models} @router.post("/permission", response_model=IAMPostPermissionResponse) async def create_new_permission( db: db_dependency, su: super_admin_dependency, request_model: IAMPostPermissionRequest, ): service_model = db.get(Service, request_model.service_id) if service_model is None: raise ServiceNotFoundException(service_id=request_model.service_id) perm_model = Perm(**request_model.__dict__) db.add(perm_model) try: db.flush() except IntegrityError as e: if ( getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation ): raise ConflictException(message="Permission already exists") response = { "id": perm_model.id, "service_name": perm_model.service_name, "resource": perm_model.resource, "action": perm_model.action, } db.commit() return {"permission": response} @router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT) async def delete_permission( db: db_dependency, su: super_admin_dependency, perm_model: perm_model_query_dependency, ): db.delete(perm_model) db.commit() @router.post("/permissions/search", response_model=IAMGetPermissionsSearchResponse) async def post_permissions( db: db_dependency, org_model: org_model_root_claim_body_dependency, request_model: IAMGetPermissionsSearchRequest, ): permission_query = db.query(Perm) if request_model.service_id is not None: permission_query = permission_query.filter( Perm.service_id == request_model.service_id ) if request_model.resource is not None: permission_query = permission_query.filter( Perm.resource == request_model.resource ) if request_model.action is not None: permission_query = permission_query.filter(Perm.action == request_model.action) permission_models = permission_query.all() return {"permissions": permission_models} @router.put( "/group/user/invitation", summary="Send an email invitation for non-org member to join a group", status_code=status.HTTP_200_OK, ) async def invitation( background_tasks: BackgroundTasks, org_model: org_model_root_claim_body_dependency, group_model: group_model_body_dependency, request_model: IAMPutGroupInvitationRequest, ): org_id = org_model.id org_name = org_model.name user_email = request_model.user_email group_id = group_model.id group_name = group_model.name background_tasks.add_task( send_user_group_invitation, org_id=org_id, org_name=org_name, user_email=user_email, group_id=group_id, group_name=group_name, ) return "Invitation sent" @router.put( "/group/user//invitation/accept", summary="Accept email invitation to join an org's group", status_code=status.HTTP_200_OK, ) async def accept_invitation( db: db_dependency, user_model: user_model_claims_dependency, request_model: IAMPutGroupInvitationAcceptRequest, ): email_claims = await decode_jwt(request_model.jwt) claimed_email = email_claims["email"] if user_model.email != claimed_email: raise UnauthorizedException("The logged in user and email do not match.") org_model = db.get(Org, email_claims["org_id"]) if org_model is None: raise OrgNotFoundException() group_model = db.get(Group, email_claims["group_id"]) if group_model is None: raise GroupNotFoundException() if group_model not in org_model.group_rel: raise UnauthorizedException("Group and org do not match.") if user_model in group_model.user_rel: raise ConflictException(message="User already in group.") group_model.user_rel.append(user_model) db.commit() return "Invitation accepted"