"""
Router endpoints for identity and access management (IAM).

Endpoints (all mounted under the ``/iam`` prefix):
    - POST   /can_act_on_resource: Check whether the calling user holds a permission
    - GET    /group/permissions:   List the permissions attached to a group
    - GET    /group/users:         List the users that belong to a group
    - POST   /group:               Create a group
    - PUT    /group/permissions:   Attach a permission to a group
    - PUT    /group/users:         Add users to a group
    - DELETE /group/permissions:   Detach a permission from a group
    - DELETE /group/user:          Remove a user from a group
    - GET    /permissions:         List every permission
    - POST   /permission:          Create a permission
    - DELETE /permission:          Delete a permission
    - GET    /permissions/search:  List permissions matching optional filters
"""
import logging
from typing import Annotated, Optional

from fastapi import APIRouter, Query, HTTPException

from src.database import db_dependency
from src.schemas import ResourceName
from src.auth.service import claims_dependency
from src.user.models import User
from src.organisation.models import Organisation as Org
from src.service.models import Service
from src.organisation.dependencies import org_model_dependency
from src.iam.service import service_key_dependency
from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups

logger = logging.getLogger(__name__)

router = APIRouter(
    tags=["IAM"],
    prefix="/iam",
)


@router.post("/can_act_on_resource")
async def can_act_on_resource(
    valid_key: service_key_dependency,
    db: db_dependency,
    user_claims: claims_dependency,
    rn: ResourceName,
    action: str,
) -> bool:
    """Report whether the calling user may perform ``action`` on the resource named by ``rn``."""
    # ``valid_key`` is not read in the body; presumably the dependency itself
    # rejects callers without a valid service key -- TODO confirm.
    try:
        user_id = user_claims["db_id"]
        # A permission counts only if it is reachable from the user through one
        # of their groups, and that group belongs to the organisation in ``rn``.
        match = (
            db.query(Perm)
            .join(Service, Service.id == Perm.service_id)
            .join(GPerms, GPerms.permission_id == Perm.id)
            .join(Group, Group.id == GPerms.group_id)
            .join(Org, Org.id == Group.org_id)
            .join(UserGroups, UserGroups.group_id == Group.id)
            .join(User, User.id == UserGroups.user_id)
            .filter(User.id == user_id)
            .filter(Org.name == rn.organisation)
            .filter(Service.name == rn.service)
            .filter(Perm.resource == rn.resource)
            .filter(Perm.action == action)
            .first()
        )
    except Exception as exc:
        # Log the traceback server-side; the client only gets a generic 500 so
        # no internal detail leaks through the response.
        logger.exception("can_act_on_resource failed")
        raise HTTPException(status_code=500, detail="Internal server error") from exc

    return match is not None


@router.get("/group/permissions")
async def get_group_permissions(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
    """Return every permission attached to the group ``group_id``."""
    # TODO: iam_admin_dependency
    group_perms = (
        db.query(Perm)
        .join(GPerms)
        .filter(GPerms.group_id == group_id)
        .all()
    )
    # TODO: Response model
    return group_perms


@router.get("/group/users")
async def get_group_users(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
    """Return every user that belongs to the group ``group_id``."""
    # TODO: iam_admin_dependency
    group_users = (
        db.query(User)
        .join(UserGroups)
        .filter(UserGroups.group_id == group_id)
        .all()
    )
    # TODO: Response model
    return group_users


@router.post("/group")
async def create_group(
    db: db_dependency,
    group_name: str,
    org_model: org_model_dependency,
    org_id: int,
):
    """Create a group called ``group_name`` in the organisation ``org_id``."""
    # ``org_model`` is not read here; presumably the dependency validates the
    # organisation for the request -- TODO confirm.
    # TODO: iam_admin_dependency
    # TODO: Request model
    group_model = Group(name=group_name, org_id=org_id)
    db.add(group_model)
    db.commit()
    # TODO: Response model


@router.put("/group/permissions")
async def add_group_permissions(
    db: db_dependency,
    group_id: int,
    permission_id: int,
    org_model: org_model_dependency,
    org_id: int,
):
    """Attach the permission ``permission_id`` to the group ``group_id``."""
    # ``org_model`` and ``org_id`` are accepted but not used in the body.
    # TODO: iam_admin_dependency
    # TODO: Request model
    g_perm_model = GPerms(group_id=group_id, permission_id=permission_id)
    db.add(g_perm_model)
    db.commit()
    # TODO: Response model


@router.put("/group/users")
async def add_group_users(
    db: db_dependency,
    group_id: int,
    user_ids: list[int],
    org_model: org_model_dependency,
    org_id: int,
):
    """Add every user in ``user_ids`` to the group ``group_id``."""
    # TODO: iam_admin_dependency
    # TODO: Request model
    for user_id in user_ids:
        user_group_model = UserGroups(group_id=group_id, user_id=user_id, org_id=org_id)
        db.add(user_group_model)
    # Commit once after the loop so the whole batch is persisted together.
    db.commit()
    # TODO: Response model


@router.delete("/group/permissions")
async def remove_group_permissions(
    db: db_dependency,
    group_id: int,
    org_model: org_model_dependency,
    org_id: int,
    permission_id: int,
):
    """Detach ``permission_id`` from ``group_id``; do nothing if they are not linked."""
    # ``org_model`` and ``org_id`` are accepted but not used in the body.
    # TODO: iam_admin_dependency
    # TODO: Request model
    g_perm_model = (
        db.query(GPerms)
        .filter(GPerms.group_id == group_id, GPerms.permission_id == permission_id)
        .first()
    )
    if g_perm_model is None:
        return
    db.delete(g_perm_model)
    db.commit()
    # TODO: Response model


@router.delete("/group/user")
async def remove_group_user(
    db: db_dependency,
    group_id: int,
    user_id: int,
    org_model: org_model_dependency,
    org_id: int,
):
    """Remove ``user_id`` from ``group_id``; do nothing if the user is not a member."""
    # ``org_model`` and ``org_id`` are accepted but not used in the body.
    # TODO: iam_admin_dependency
    # TODO: Request model
    user_group_model = (
        db.query(UserGroups)
        .filter(UserGroups.group_id == group_id, UserGroups.user_id == user_id)
        .first()
    )
    if user_group_model is None:
        return
    db.delete(user_group_model)
    db.commit()
    # TODO: Response model


@router.get("/permissions")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int):
    """Return every permission, without filtering."""
    # ``org_model`` and ``org_id`` are accepted but do not restrict the query.
    # TODO: iam_admin_dependency
    # TODO: request model
    permission_models = db.query(Perm).all()
    # TODO: Response model
    return permission_models


@router.post("/permission")
async def create_new_permission(db: db_dependency, service_id: int, resource: str, action: str):
    """Create a permission for ``action`` on ``resource`` of the service ``service_id``."""
    # TODO: super_admin_dependency
    perm_model = Perm(service_id=service_id, resource=resource, action=action)
    db.add(perm_model)
    db.commit()


@router.delete("/permission")
async def delete_permission(
    db: db_dependency,
    service_id: int,
    resource: str,
    action: str,
    org_model: org_model_dependency,
    org_id: int,
):
    """Delete the first permission matching the given triple; do nothing if none matches."""
    # ``org_model`` and ``org_id`` are accepted but not used in the body.
    # TODO: iam_admin_dependency
    # TODO: Request model
    perm_model = (
        db.query(Perm)
        .filter(
            Perm.service_id == service_id,
            Perm.resource == resource,
            Perm.action == action,
        )
        .first()
    )
    if perm_model is None:
        return
    db.delete(perm_model)
    db.commit()
    # TODO: Response model


# NOTE(review): this handler reuses the name ``get_permissions`` and therefore
# rebinds the module-level name defined for ``GET /permissions`` above. Both
# routes are still registered because the decorator runs at definition time,
# but the name is kept only for compatibility -- consider renaming.
@router.get("/permissions/search")
async def get_permissions(
    db: db_dependency,
    org_model: org_model_dependency,
    org_id: int,
    service_id: Optional[int] = None,
    resource: Optional[str] = None,
    action: Optional[str] = None,
):
    """Return the permissions matching whichever of the optional filters are supplied."""
    # ``org_model`` and ``org_id`` are accepted but do not restrict the query.
    # TODO: iam_admin_dependency
    # TODO: request model
    permission_query = db.query(Perm)
    # Each filter is applied only when the caller supplied it, so omitting all
    # three returns every permission.
    if service_id is not None:
        permission_query = permission_query.filter(Perm.service_id == service_id)
    if resource is not None:
        permission_query = permission_query.filter(Perm.resource == resource)
    if action is not None:
        permission_query = permission_query.filter(Perm.action == action)

    permission_models = permission_query.all()
    # TODO: Response model
    return permission_models