cloud-api/src/iam/router.py
2026-05-27 16:26:34 +01:00

198 lines
7.8 KiB
Python

"""
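Router endpoints for the IAM module (all routes are mounted under the ``/iam`` prefix).
Endpoints:
- POST /iam/can_act_on_resource: Check whether the calling user may perform an action on a resource
- GET /iam/group/permissions: List the permissions attached to a group
- GET /iam/group/users: List the users that belong to a group
- POST /iam/group: Create a group in an organisation
- PUT /iam/group/permission: Attach a permission to a group
- PUT /iam/group/user: Add a user to a group
- DELETE /iam/group/permissions: Detach a permission from a group
- DELETE /iam/group/user: Remove a user from a group
- GET /iam/permissions: List all permissions
- POST /iam/permission: Create a permission (super admin only)
- DELETE /iam/permission: Delete a permission (super admin only)
- GET /iam/permissions/search: Filter permissions by service, resource and/or action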
"""
from fastapi import APIRouter, status
from sqlalchemy.exc import IntegrityError
from psycopg import errors
from src.exceptions import Conflict
from src.database import db_dependency
from src.schemas import ResourceName
from src.auth.exceptions import UnauthorizedException
from src.auth.service import claims_dependency
from src.auth.dependencies import org_model_root_claim_query_dependency, org_model_root_claim_body_dependency, \
super_admin_dependency
from src.user.models import User
from src.user.dependencies import user_model_body_dependency
from src.organisation.models import Organisation as Org
from src.service.models import Service
from src.iam.service import service_key_dependency
from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups
from src.iam.dependencies import group_model_query_dependency, group_model_body_dependency, perm_model_body_dependency
from src.iam.schemas import IAMGetGroupPermissionsResponse, IAMGetGroupUsersResponse, IAMPostGroupRequest, \
GroupResponse, IAMPostGroupResponse, IAMPutGroupPermissionRequest, IAMPutGroupPermissionResponse, \
IAMPutGroupUserRequest, IAMPutGroupUserResponse, IAMDeleteGroupPermissionRequest, IAMDeleteGroupPermissionResponse, \
IAMDeleteGroupUserRequest, IAMDeleteGroupUserResponse, IAMGetPermissionsResponse, IAMPostPermissionRequest, \
IAMPostPermissionResponse, PermissionResponse, IAMDeletePermissionRequest, IAMGetPermissionsSearchRequest, IAMGetPermissionsSearchResponse
# Every endpoint declared below is grouped under the "IAM" tag and served
# beneath the "/iam" URL prefix.
router = APIRouter(prefix="/iam", tags=["IAM"])
@router.post("/can_act_on_resource")
async def can_act_on_resource(
    valid_key: service_key_dependency,
    db: db_dependency,
    user_claims: claims_dependency,
    rn: ResourceName,
    action: str,
) -> bool:
    """Tell whether the calling user may perform ``action`` on the resource ``rn``.

    A permission row must exist that matches the resource name's service and
    resource plus the requested action, and that is attached to a group which
    belongs to the named organisation and has the user as a member.

    Args:
        valid_key: Result of ``service_key_dependency``; not read here, but
            declaring it makes FastAPI run that dependency first.
        db: Database session.
        user_claims: Claims of the caller; ``"db_id"`` is used as the user id.
        rn: Resource name carrying ``organisation``, ``service`` and ``resource``.
        action: Action to look up on the permission.

    Returns:
        ``True`` if a matching permission is found, otherwise ``False``.

    Raises:
        UnauthorizedException: If anything at all fails during the lookup
            (missing claim, database error, ...), so the check fails closed.
    """
    try:
        caller_id = user_claims["db_id"]
        org_name, service_name, resource_name = rn.organisation, rn.service, rn.resource

        # Walk permission -> service / group -> organisation / membership -> user.
        permission_lookup = (
            db.query(Perm)
            .join(Service, Service.id == Perm.service_id)
            .join(GPerms, GPerms.permission_id == Perm.id)
            .join(Group, Group.id == GPerms.group_id)
            .join(Org, Org.id == Group.org_id)
            .join(UserGroups, UserGroups.group_id == Group.id)
            .join(User, User.id == UserGroups.user_id)
            .filter(
                User.id == caller_id,
                Org.name == org_name,
                Service.name == service_name,
                Perm.resource == resource_name,
                Perm.action == action,
            )
        )
        return bool(permission_lookup.first())
    except Exception:
        raise UnauthorizedException()
@router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse)
async def get_group_permissions(
    group_model: group_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """List the permissions attached to a group.

    Args:
        group_model: Group resolved by ``group_model_query_dependency``.
        org_model: Organisation resolved by
            ``org_model_root_claim_query_dependency``.

    Returns:
        A dict with the group's ``permission_rel`` under ``"permissions"``.

    Raises:
        UnauthorizedException: If the group does not belong to ``org_model``.
    """
    group_is_in_org = group_model.org_id == org_model.id
    if not group_is_in_org:
        raise UnauthorizedException()

    return {"permissions": group_model.permission_rel}
@router.get("/group/users", response_model=IAMGetGroupUsersResponse)
async def get_group_users(
    group_model: group_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """List the users that belong to a group.

    Args:
        group_model: Group resolved by ``group_model_query_dependency``.
        org_model: Organisation resolved by
            ``org_model_root_claim_query_dependency``.

    Returns:
        A dict with the group's ``user_rel`` under ``"users"``.

    Raises:
        UnauthorizedException: If the group does not belong to ``org_model``.
    """
    # Reject groups owned by a different organisation. The comparison used to
    # be `==`, which refused the caller's own groups and exposed everyone
    # else's.
    if group_model.org_id != org_model.id:
        raise UnauthorizedException()
    return {"users": group_model.user_rel}
@router.post("/group", response_model=IAMPostGroupResponse)
async def create_group(
    db: db_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPostGroupRequest,
):
    """Create a new group inside the organisation ``org_model``.

    Args:
        db: Database session.
        org_model: Organisation resolved by
            ``org_model_root_claim_body_dependency``; the group is created with
            this organisation's id.
        request_model: Request body; ``name`` becomes the group name.

    Returns:
        A dict with the created group (as ``GroupResponse``) under ``"group"``.

    Raises:
        Conflict: If the insert violates a unique constraint.
        IntegrityError: Any other integrity failure is propagated unchanged.
    """
    group_model = Group(name=request_model.name, org_id=org_model.id)
    db.add(group_model)
    try:
        db.flush()
    except IntegrityError as e:
        if isinstance(e.orig, errors.UniqueViolation):
            raise Conflict("Group with this name already exists") from e
        # Any other integrity failure used to be swallowed here, after which
        # the code went on to commit a session whose flush had failed.
        raise
    # The response is built before commit(); presumably because the session
    # expires loaded attributes on commit -- TODO confirm session config.
    response = GroupResponse(**group_model.__dict__)
    db.commit()
    return {"group": response}
@router.put("/group/permission", response_model=IAMPutGroupPermissionResponse)
async def add_group_permission(
    db: db_dependency,
    group_model: group_model_body_dependency,
    perm_model: perm_model_body_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPutGroupPermissionRequest,
):
    """Attach a permission to a group.

    Args:
        db: Database session.
        group_model: Group resolved by ``group_model_body_dependency``.
        perm_model: Permission resolved by ``perm_model_body_dependency``.
        org_model: Organisation resolved by
            ``org_model_root_claim_body_dependency``.
        request_model: Request body schema; not read directly in this handler.

    Returns:
        ``IAMPutGroupPermissionResponse`` with the group and its updated
        permission list.

    Raises:
        UnauthorizedException: If the group does not belong to ``org_model``.
        Conflict: If the group already has the permission.
    """
    # Only groups owned by the caller's organisation may be modified. The
    # comparison used to be `==`, which inverted the check.
    if group_model.org_id != org_model.id:
        raise UnauthorizedException()
    if perm_model in group_model.permission_rel:
        raise Conflict("Group already has this permission")
    group_model.permission_rel.append(perm_model)
    db.flush()
    # Build the response before commit(), matching the other handlers in this
    # module.
    response = IAMPutGroupPermissionResponse(
        group=GroupResponse(**group_model.__dict__),
        permissions=group_model.permission_rel,
    )
    db.commit()
    return response
@router.put("/group/user")
async def add_group_user(
    db: db_dependency,
    group_model: group_model_body_dependency,
    user_model: user_model_body_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPutGroupUserRequest,
):
    """Add a user to a group.

    Args:
        db: Database session.
        group_model: Group resolved by ``group_model_body_dependency``.
        user_model: User resolved by ``user_model_body_dependency``.
        org_model: Organisation resolved by
            ``org_model_root_claim_body_dependency``.
        request_model: Request body schema; not read directly in this handler.

    Returns:
        ``IAMPutGroupUserResponse`` with the group and its updated user list.

    Raises:
        UnauthorizedException: If the group does not belong to ``org_model``.
        Conflict: If the user is already a member of the group.
    """
    # Only groups owned by the caller's organisation may be modified. The
    # comparison used to be `==`, which inverted the check.
    if group_model.org_id != org_model.id:
        raise UnauthorizedException()
    if user_model in group_model.user_rel:
        raise Conflict("User already in group")
    group_model.user_rel.append(user_model)
    db.flush()
    # Build the response before commit(), matching the other handlers in this
    # module.
    response = IAMPutGroupUserResponse(
        group=GroupResponse(**group_model.__dict__),
        users=group_model.user_rel,
    )
    db.commit()
    return response
@router.delete("/group/permissions")
async def remove_group_permissions(
    db: db_dependency,
    group_model: group_model_body_dependency,
    perm_model: perm_model_body_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMDeleteGroupPermissionRequest,
):
    """Detach a permission from a group.

    Args:
        db: Database session.
        group_model: Group resolved by ``group_model_body_dependency``.
        perm_model: Permission resolved by ``perm_model_body_dependency``.
        org_model: Organisation resolved by
            ``org_model_root_claim_body_dependency``.
        request_model: Request body schema; not read directly in this handler.

    Returns:
        ``IAMDeleteGroupPermissionResponse`` with the group and its remaining
        permissions.

    Raises:
        UnauthorizedException: If the group does not belong to ``org_model``.
    """
    # Only groups owned by the caller's organisation may be modified. The
    # comparison used to be `==`, which inverted the check.
    if group_model.org_id != org_model.id:
        raise UnauthorizedException()
    # NOTE(review): no membership check precedes remove(); if the permission
    # is not attached to the group this presumably raises and surfaces as a
    # server error -- confirm whether a dedicated error response is wanted.
    group_model.permission_rel.remove(perm_model)
    db.flush()
    response = IAMDeleteGroupPermissionResponse(
        group=GroupResponse(**group_model.__dict__),
        permissions=group_model.permission_rel,
    )
    db.commit()
    return response
@router.delete("/group/user")
async def remove_group_user(
    db: db_dependency,
    group_model: group_model_body_dependency,
    user_model: user_model_body_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMDeleteGroupUserRequest,
):
    """Remove a user from a group.

    Args:
        db: Database session.
        group_model: Group resolved by ``group_model_body_dependency``.
        user_model: User resolved by ``user_model_body_dependency``.
        org_model: Organisation resolved by
            ``org_model_root_claim_body_dependency``.
        request_model: Request body schema; not read directly in this handler.

    Returns:
        ``IAMDeleteGroupUserResponse`` with the group and its remaining users.

    Raises:
        UnauthorizedException: If the group does not belong to ``org_model``.
    """
    # Only groups owned by the caller's organisation may be modified. The
    # comparison used to be `==`, which inverted the check.
    if group_model.org_id != org_model.id:
        raise UnauthorizedException()
    # The membership is removed from the user's side of the relationship.
    # NOTE(review): no membership check precedes remove(); a user who is not
    # in the group presumably triggers an unhandled error -- confirm.
    user_model.group_rel.remove(group_model)
    db.flush()
    response = IAMDeleteGroupUserResponse(
        group=GroupResponse(**group_model.__dict__),
        users=group_model.user_rel,
    )
    db.commit()
    return response
@router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(
    db: db_dependency,
    org_model: org_model_root_claim_body_dependency,
):
    """List every permission stored in the database.

    Args:
        db: Database session.
        org_model: Result of ``org_model_root_claim_body_dependency``; not read
            here, but declaring it makes FastAPI run that dependency first.

    Returns:
        A dict with all ``Permission`` rows under ``"permissions"``.
    """
    return {"permissions": db.query(Perm).all()}
@router.post("/permission")
async def create_new_permission(
    db: db_dependency,
    su: super_admin_dependency,
    request_mode: IAMPostPermissionRequest,
):
    """Create a new permission.

    Args:
        db: Database session.
        su: Result of ``super_admin_dependency``; not read here, but declaring
            it makes FastAPI run that dependency first.
        request_mode: Request body; its fields are passed straight to the
            ``Permission`` constructor.

    Returns:
        ``IAMPostPermissionResponse`` wrapping the created permission.

    Raises:
        Conflict: If the insert violates a unique constraint.
        IntegrityError: Any other integrity failure is propagated unchanged.
    """
    perm_model = Perm(**request_mode.__dict__)
    db.add(perm_model)
    try:
        # The INSERT is emitted by flush(), not by add(), so this is where a
        # constraint violation surfaces. The try block used to wrap add()
        # only, which made the Conflict branch unreachable.
        db.flush()
    except IntegrityError as e:
        if isinstance(e.orig, errors.UniqueViolation):
            raise Conflict(message="Permission already exists") from e
        raise
    # Build the response before commit(), matching the other handlers in this
    # module.
    response = IAMPostPermissionResponse(permission=PermissionResponse(**perm_model.__dict__))
    db.commit()
    return response
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(
    db: db_dependency,
    su: super_admin_dependency,
    perm_model: perm_model_body_dependency,
    request_model: IAMDeletePermissionRequest,
):
    """Delete a permission and answer with 204 No Content.

    Args:
        db: Database session.
        su: Result of ``super_admin_dependency``; not read here, but declaring
            it makes FastAPI run that dependency first.
        perm_model: Permission resolved by ``perm_model_body_dependency``.
        request_model: Request body schema; not read directly in this handler.
    """
    db.delete(perm_model)
    db.commit()
    return None
@router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(
    db: db_dependency,
    org_model: org_model_root_claim_body_dependency,
    search: IAMGetPermissionsSearchRequest,
):
    """Search permissions by any combination of service, resource and action.

    Each criterion on ``search`` that is ``None`` is ignored; the remaining
    ones are combined with AND.

    NOTE(review): this handler has the same Python name as the ``/permissions``
    handler defined earlier in this file. Both routes are registered through
    their decorators, but the module-level name ends up bound to this one.

    Args:
        db: Database session.
        org_model: Result of ``org_model_root_claim_body_dependency``; not read
            here, but declaring it makes FastAPI run that dependency first.
        search: Optional ``service_id``, ``resource`` and ``action`` filters.

    Returns:
        A dict with the matching ``Permission`` rows under ``"permissions"``.
    """
    criteria = (
        (Perm.service_id, search.service_id),
        (Perm.resource, search.resource),
        (Perm.action, search.action),
    )

    matches = db.query(Perm)
    for column, wanted in criteria:
        if wanted is None:
            continue
        matches = matches.filter(column == wanted)

    return {"permissions": matches.all()}