"""
Router endpoints for IAM

Endpoints:
- [POST](/iam/can_act_on_resource): [API key & user claim]: Service access point to verify user permissions
- [GET](/iam/group/permissions): [root user]: Gets list of perms(service, resource, action) the given group(id) has
- [DELETE](/iam/group/permissions): [root user]: Removes a given perm(id) from the given group(id)
- [GET](/iam/group/users): [root user]: Gets a list of users(id, name, email) that are assigned to the given group(id)
- [POST](/iam/group): [root user]: Creates a new group for the given org(id)
- [PUT](/iam/group/permission): [root user]: Assigns a perm(id) to the given group(id)
- [PUT](/iam/group/user): [root user]: Assigns a user(id) to a group(id)
- [DELETE](/iam/group/user): [root user]: Removes a user(id) from the given group(id)
- [GET](/iam/permissions): [root user]: Gets a list of all permissions
- [POST](/iam/permission): [super admin]: Creates a new permission
- [DELETE](/iam/permission): [super admin]: Removes a permission
- [POST](/iam/permissions/search): [root user]: Returns a list of permissions matching a filter(service|resource|action)
- [PUT](/iam/group/user/invitation): [root user]: Sends an email invitation to join the given group(id)
- [PUT](/iam/group/user/invitation/accept): [user claim]: Accepts an emailed invitation(jwt) to join a group
"""

import logging

from fastapi import APIRouter, status, BackgroundTasks
from sqlalchemy.exc import IntegrityError

from src.iam.exceptions import GroupNotFoundException
from src.organisation.exceptions import OrgNotFoundException
from src.schemas import GroupSummary, OrgSummary
from src.service.exceptions import ServiceNotFoundException
from src.exceptions import ConflictException
from src.database import db_dependency
from src.auth.exceptions import UnauthorizedException
from src.auth.service import claims_dependency
from src.auth.dependencies import (
    org_model_root_claim_query_dependency,
    org_model_root_claim_body_dependency,
    super_admin_dependency,
)
from src.user.models import User
from src.user.dependencies import (
    user_model_body_dependency,
    user_model_query_dependency,
    user_model_claims_dependency,
)
from src.organisation.models import Organisation as Org
from src.service.models import Service
from src.iam.service import service_key_dependency, send_user_group_invitation
from src.iam.models import (
    Permission as Perm,
    GroupPermissions as GPerms,
    Group,
    UserGroups,
)
from src.iam.dependencies import (
    group_model_query_dependency,
    group_model_body_dependency,
    perm_model_body_dependency,
    perm_model_query_dependency,
)
from src.iam.schemas import (
    GroupSchema,
    IAMCAoRRequest,
    IAMGetGroupPermissionsResponse,
    IAMGetGroupUsersResponse,
    IAMPostGroupRequest,
    IAMPostGroupResponse,
    IAMPutGroupPermissionRequest,
    IAMPutGroupPermissionResponse,
    IAMPutGroupUserRequest,
    IAMPutGroupUserResponse,
    IAMDeleteGroupPermissionResponse,
    IAMDeleteGroupUserResponse,
    IAMGetPermissionsResponse,
    IAMPostPermissionRequest,
    IAMPostPermissionResponse,
    IAMGetPermissionsSearchRequest,
    IAMGetPermissionsSearchResponse,
    IAMPutGroupInvitationRequest,
    IAMPutGroupInvitationAcceptRequest,
)
from src.utils import verify_email_token

logger = logging.getLogger(__name__)

router = APIRouter(
    tags=["IAM"],
    prefix="/iam",
)


def _is_unique_violation(exc: IntegrityError) -> bool:
    """Return True when *exc* wraps a unique-constraint violation."""
    return (
        getattr(exc.orig, "pgcode", None) == "23505"  # Postgres unique violation
        or "UNIQUE constraint failed" in str(exc.orig)  # SQLite unique violation
    )


@router.post("/can_act_on_resource")
async def can_act_on_resource(
    valid_key: service_key_dependency,
    db: db_dependency,
    user_claims: claims_dependency,
    request_model: IAMCAoRRequest,
) -> bool:
    """Tell a calling service whether the claimed user may perform an action.

    Returns True when a permission row links the user (via a group of the
    named organisation) to the requested service, resource and action.
    Any failure while evaluating the request denies access.
    """
    try:
        rn = request_model.rn
        action = request_model.action
        user_id = user_claims["db_id"]

        rn_org = rn.organisation
        rn_service = rn.service
        rn_resource = rn.resource

        result = (
            db.query(Perm)
            .join(Service, Service.id == Perm.service_id)
            .join(GPerms, GPerms.permission_id == Perm.id)
            .join(Group, Group.id == GPerms.group_id)
            .join(Org, Org.id == Group.org_id)
            .join(UserGroups, UserGroups.group_id == Group.id)
            .join(User, User.id == UserGroups.user_id)
            .filter(User.id == user_id)
            .filter(Org.name == rn_org)
            .filter(Service.name == rn_service)
            .filter(Perm.resource == rn_resource)
            .filter(Perm.action == action)
        ).first()
        return result is not None
    except Exception:
        # Fail closed: an authorization check must never grant access because
        # of an internal error, but the error should not vanish silently.
        logger.exception("can_act_on_resource failed; denying access")
        return False


@router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse)
async def get_group_permissions(
    group_model: group_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """List the permissions attached to a group of the caller's organisation.

    Raises:
        UnauthorizedException: the group belongs to a different organisation.
    """
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")

    return {
        "organisation": org_model,
        "group": group_model,
        "permissions": group_model.permission_rel,
    }


@router.get("/group/users", response_model=IAMGetGroupUsersResponse)
async def get_group_users(
    group_model: group_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """List the users assigned to a group of the caller's organisation.

    Raises:
        UnauthorizedException: the group belongs to a different organisation.
    """
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")

    return {
        "organisation": org_model,
        "group": group_model,
        "users": group_model.user_rel,
    }


@router.post("/group", response_model=IAMPostGroupResponse)
async def create_group(
    db: db_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPostGroupRequest,
):
    """Create a new group inside the given organisation.

    Raises:
        ConflictException: a unique constraint rejected the new group.
        IntegrityError: any other integrity failure (re-raised unchanged).
    """
    group_model = Group(name=request_model.name, org_id=org_model.id)
    db.add(group_model)

    try:
        db.flush()
    except IntegrityError as exc:
        duplicate = _is_unique_violation(exc)
        # A failed flush leaves the session unusable until it is rolled back.
        db.rollback()
        if duplicate:
            raise ConflictException("Group with this name already exists") from exc
        # Previously swallowed, which let execution fall through to commit().
        raise

    # Snapshot the row before commit.
    # NOTE(review): presumably because commit expires loaded attributes - confirm.
    response = GroupSchema(**group_model.__dict__)
    db.commit()
    return {"group": response}


@router.put("/group/permission", response_model=IAMPutGroupPermissionResponse)
async def add_group_permission(
    db: db_dependency,
    group_model: group_model_body_dependency,
    perm_model: perm_model_body_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPutGroupPermissionRequest,
):
    """Attach a permission to a group of the caller's organisation.

    ``request_model`` is not read here; it declares the request body schema.

    Raises:
        UnauthorizedException: the group belongs to a different organisation.
        ConflictException: the group already has the permission.
    """
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")

    if perm_model in group_model.permission_rel:
        raise ConflictException("Group already has this permission")

    group_model.permission_rel.append(perm_model)
    db.flush()

    # Build the response before commit, while the instances are still loaded.
    response = IAMPutGroupPermissionResponse(
        organisation=OrgSummary(**org_model.__dict__),
        group=GroupSummary(**group_model.__dict__),
        permissions=group_model.permission_rel,
    )
    db.commit()
    return response


@router.put("/group/user", response_model=IAMPutGroupUserResponse)
async def add_group_user(
    db: db_dependency,
    group_model: group_model_body_dependency,
    user_model: user_model_body_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPutGroupUserRequest,
):
    """Assign a user to a group of the caller's organisation.

    ``request_model`` is not read here; it declares the request body schema.

    Raises:
        UnauthorizedException: the group belongs to a different organisation.
        ConflictException: the user is already a member of the group.
    """
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")

    if user_model in group_model.user_rel:
        raise ConflictException("User already in group")

    group_model.user_rel.append(user_model)
    db.flush()

    # Build the response before commit, while the instances are still loaded.
    response = IAMPutGroupUserResponse(
        group=GroupSchema(**group_model.__dict__), users=group_model.user_rel
    )
    db.commit()
    return response


@router.delete("/group/permissions")
async def remove_group_permissions(
    db: db_dependency,
    group_model: group_model_query_dependency,
    perm_model: perm_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """Detach a permission from a group of the caller's organisation.

    Raises:
        UnauthorizedException: the group belongs to a different organisation.
        ConflictException: the group does not hold the permission.
    """
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")

    # list.remove() would raise a bare ValueError (HTTP 500) for a missing item.
    if perm_model not in group_model.permission_rel:
        raise ConflictException("Group does not have this permission")

    group_model.permission_rel.remove(perm_model)
    db.flush()

    response = IAMDeleteGroupPermissionResponse(
        group=GroupSchema(**group_model.__dict__),
        permissions=group_model.permission_rel,
    )
    db.commit()
    return response


@router.delete("/group/user")
async def remove_group_user(
    db: db_dependency,
    group_model: group_model_query_dependency,
    user_model: user_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """Remove a user from a group of the caller's organisation.

    Raises:
        UnauthorizedException: the group belongs to a different organisation.
        ConflictException: the user is not a member of the group.
    """
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")

    # list.remove() would raise a bare ValueError (HTTP 500) for a missing item.
    if group_model not in user_model.group_rel:
        raise ConflictException("User not in group")

    user_model.group_rel.remove(group_model)
    db.flush()

    response = IAMDeleteGroupUserResponse(
        group=GroupSchema(**group_model.__dict__), users=group_model.user_rel
    )
    db.commit()
    return response


@router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(
    db: db_dependency, org_model: org_model_root_claim_query_dependency
):
    """Return every permission row.

    ``org_model`` is not read here; the dependency is kept for its access check.
    """
    permission_models = db.query(Perm).all()
    return {"permissions": permission_models}


@router.post("/permission", response_model=IAMPostPermissionResponse)
async def create_new_permission(
    db: db_dependency,
    su: super_admin_dependency,
    request_model: IAMPostPermissionRequest,
):
    """Create a permission for an existing service.

    Raises:
        ServiceNotFoundException: ``service_id`` does not match a service.
        ConflictException: a unique constraint rejected the new permission.
        IntegrityError: any other integrity failure (re-raised unchanged).
    """
    service_model = db.get(Service, request_model.service_id)
    if service_model is None:
        raise ServiceNotFoundException(service_id=request_model.service_id)

    perm_model = Perm(**request_model.__dict__)
    db.add(perm_model)

    try:
        db.flush()
    except IntegrityError as exc:
        duplicate = _is_unique_violation(exc)
        # A failed flush leaves the session unusable until it is rolled back.
        db.rollback()
        if duplicate:
            raise ConflictException(message="Permission already exists") from exc
        # Previously swallowed, which let execution fall through to commit().
        raise

    # Read the attributes before commit, while the instance is still loaded.
    response = {
        "id": perm_model.id,
        "service_name": perm_model.service_name,
        "resource": perm_model.resource,
        "action": perm_model.action,
    }
    db.commit()
    return {"permission": response}


@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(
    db: db_dependency,
    su: super_admin_dependency,
    perm_model: perm_model_query_dependency,
):
    """Delete the given permission and commit."""
    db.delete(perm_model)
    db.commit()


@router.post("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def post_permissions(
    db: db_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMGetPermissionsSearchRequest,
):
    """Return the permissions matching every filter supplied in the body.

    Filters left as ``None`` are ignored, so an empty filter returns all rows.
    """
    permission_query = db.query(Perm)

    if request_model.service_id is not None:
        permission_query = permission_query.filter(
            Perm.service_id == request_model.service_id
        )
    if request_model.resource is not None:
        permission_query = permission_query.filter(
            Perm.resource == request_model.resource
        )
    if request_model.action is not None:
        permission_query = permission_query.filter(Perm.action == request_model.action)

    permission_models = permission_query.all()
    return {"permissions": permission_models}


@router.put(
    "/group/user/invitation",
    summary="Send an email invitation for non-org member to join a group",
    status_code=status.HTTP_200_OK,
)
async def invitation(
    background_tasks: BackgroundTasks,
    org_model: org_model_root_claim_body_dependency,
    group_model: group_model_body_dependency,
    request_model: IAMPutGroupInvitationRequest,
):
    """Queue an invitation email asking a user to join one of the org's groups.

    The email is sent from a background task, so the response is returned
    before delivery is attempted.

    Raises:
        UnauthorizedException: the group belongs to a different organisation.
    """
    # Same ownership check as the other group endpoints; without it the email
    # could name a group of an organisation the caller does not administer.
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")

    background_tasks.add_task(
        send_user_group_invitation,
        org_id=org_model.id,
        org_name=org_model.name,
        user_email=request_model.user_email,
        group_id=group_model.id,
        group_name=group_model.name,
    )
    return "Invitation sent"


@router.put(
    "/group/user/invitation/accept",
    summary="Accept email invitation to join an org's group",
    status_code=status.HTTP_200_OK,
)
@router.put(
    # Legacy path with an accidental double slash, kept so existing callers
    # keep working; hidden from the generated schema.
    "/group/user//invitation/accept",
    summary="Accept email invitation to join an org's group",
    status_code=status.HTTP_200_OK,
    include_in_schema=False,
)
async def accept_invitation(
    db: db_dependency,
    user_model: user_model_claims_dependency,
    request_model: IAMPutGroupInvitationAcceptRequest,
):
    """Add the authenticated user to the group named in an invitation token.

    Raises:
        OrgNotFoundException: the organisation in the token does not exist.
        GroupNotFoundException: the group in the token does not exist.
        UnauthorizedException: the group is not part of that organisation.
        ConflictException: the user is already a member of the group.
    """
    email_claims = await verify_email_token(
        token=request_model.jwt, user_model=user_model
    )

    org_model = db.get(Org, email_claims["org_id"])
    if org_model is None:
        raise OrgNotFoundException()

    group_model = db.get(Group, email_claims["group_id"])
    if group_model is None:
        raise GroupNotFoundException()

    if group_model not in org_model.group_rel:
        raise UnauthorizedException("Group and org do not match.")

    if user_model in group_model.user_rel:
        raise ConflictException(message="User already in group.")

    group_model.user_rel.append(user_model)
    db.commit()
    return "Invitation accepted"