cloud-api/src/iam/router.py
luxferre 0b521414b3
All checks were successful
ci / lint_and_test (push) Successful in 14s
feat: add group user by id restriction
Adding by ID can only be done for existing org members
2026-06-10 14:48:22 +01:00

402 lines
12 KiB
Python

"""
Router endpoints for IAM
Endpoints:
- [POST](/iam/can_act_on_resource): [API key & user claim]: Service access point to verify user permissions
- [GET](/iam/group/permissions): [root user]: Gets list of perms(service, resource, action) the given group(id) has
- [DELETE](/iam/group/permissions): [root user]: Removes a given perm(id) from the given group(id)
- [GET](/iam/group/users): [root user]: Gets a list of users(id, name, email) that are assigned to the given group(id)
- [POST](/iam/group): [root user]: Creates a new group for the given org(id)
- [PUT](/iam/group/permission): [root user]: Assigns a perm(id) to the given group(id)
- [PUT](/iam/group/user): [root user]: Assigns a user(id) to a group(id)
- [DELETE](/iam/group/user): [root user]: Removes a user(id) from the given group(id)
- [GET](/iam/permissions): [root user]: Gets a list of all permissions
- [POST](/iam/permission): [super admin]: Creates a new permission
- [DELETE](/iam/permission): [super admin]: Removes a permission
- [POST](/iam/permissions/search): [root user]: Returns a list of permissions matching a filter(service|resource|action)
"""
from fastapi import APIRouter, status, BackgroundTasks
from sqlalchemy.exc import IntegrityError
from src.iam.exceptions import GroupNotFoundException
from src.organisation.exceptions import OrgNotFoundException
from src.schemas import GroupSummary, OrgSummary
from src.service.exceptions import ServiceNotFoundException
from src.exceptions import ConflictException, ForbiddenException
from src.database import db_dependency
from src.auth.exceptions import UnauthorizedException
from src.auth.service import claims_dependency
from src.auth.dependencies import (
org_model_root_claim_query_dependency,
org_model_root_claim_body_dependency,
super_admin_dependency,
)
from src.user.models import User
from src.user.dependencies import (
user_model_body_dependency,
user_model_query_dependency,
user_model_claims_dependency,
)
from src.organisation.models import Organisation as Org
from src.service.models import Service
from src.iam.service import service_key_dependency, send_user_group_invitation
from src.iam.models import (
Permission as Perm,
GroupPermissions as GPerms,
Group,
UserGroups,
)
from src.iam.dependencies import (
group_model_query_dependency,
group_model_body_dependency,
perm_model_body_dependency,
perm_model_query_dependency,
)
from src.iam.schemas import (
GroupSchema,
IAMCAoRRequest,
IAMGetGroupPermissionsResponse,
IAMGetGroupUsersResponse,
IAMPostGroupRequest,
IAMPostGroupResponse,
IAMPutGroupPermissionRequest,
IAMPutGroupPermissionResponse,
IAMPutGroupUserRequest,
IAMPutGroupUserResponse,
IAMDeleteGroupPermissionResponse,
IAMDeleteGroupUserResponse,
IAMGetPermissionsResponse,
IAMPostPermissionRequest,
IAMPostPermissionResponse,
IAMGetPermissionsSearchRequest,
IAMGetPermissionsSearchResponse,
IAMPutGroupInvitationRequest,
IAMPutGroupInvitationAcceptRequest,
)
from src.utils import verify_email_token
# All IAM endpoints in this module are registered on this router under /iam.
router = APIRouter(prefix="/iam", tags=["IAM"])
@router.post("/can_act_on_resource")
async def can_act_on_resource(
    valid_key: service_key_dependency,
    db: db_dependency,
    user_claims: claims_dependency,
    request_model: IAMCAoRRequest,
) -> bool:
    """
    Report whether the calling user may perform an action on a resource.

    Searches for a permission that matches the requested service name,
    resource and action, is attached to a group of the requested
    organisation, and whose group contains the user identified by
    ``user_claims["db_id"]``.

    Returns:
        True when a matching permission exists, False otherwise. Any error
        raised while evaluating the request also yields False (fail closed);
        the error is logged so that outages are not mistaken for denials.
    """
    import logging  # function-scope import keeps this block self-contained

    try:
        rn = request_model.rn
        action = request_model.action
        user_id = user_claims["db_id"]
        match = (
            db.query(Perm)
            .join(Service, Service.id == Perm.service_id)
            .join(GPerms, GPerms.permission_id == Perm.id)
            .join(Group, Group.id == GPerms.group_id)
            .join(Org, Org.id == Group.org_id)
            .join(UserGroups, UserGroups.group_id == Group.id)
            .join(User, User.id == UserGroups.user_id)
            .filter(User.id == user_id)
            .filter(Org.name == rn.organisation)
            .filter(Service.name == rn.service)
            .filter(Perm.resource == rn.resource)
            .filter(Perm.action == action)
            .first()
        )
        return bool(match)
    except Exception:
        # Deny on any failure, but leave a trace: silently returning False
        # would make a broken query indistinguishable from a real denial.
        logging.getLogger(__name__).exception(
            "can_act_on_resource failed; denying access"
        )
        return False
@router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse)
async def get_group_permissions(
    group_model: group_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """
    Return a group's permissions together with its organisation and group.

    Raises:
        UnauthorizedException: the group's ``org_id`` differs from the id of
            the organisation resolved for the caller.
    """
    group_is_in_org = group_model.org_id == org_model.id
    if not group_is_in_org:
        raise UnauthorizedException("Group does not belong to this organization")

    return dict(
        organisation=org_model,
        group=group_model,
        permissions=group_model.permission_rel,
    )
@router.get("/group/users", response_model=IAMGetGroupUsersResponse)
async def get_group_users(
    group_model: group_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """
    Return the users assigned to a group, with its organisation and group.

    Raises:
        UnauthorizedException: the group's ``org_id`` differs from the id of
            the organisation resolved for the caller.
    """
    if group_model.org_id == org_model.id:
        payload = {
            "organisation": org_model,
            "group": group_model,
            "users": group_model.user_rel,
        }
        return payload

    raise UnauthorizedException("Group does not belong to this organization")
@router.post("/group", response_model=IAMPostGroupResponse)
async def create_group(
    db: db_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPostGroupRequest,
):
    """
    Create a group named ``request_model.name`` in the resolved organisation.

    Returns:
        ``{"group": GroupSchema}`` describing the newly created group.

    Raises:
        ConflictException: flushing the new row hit a unique-constraint
            violation (Postgres code 23505 or the SQLite equivalent).
        IntegrityError: any other integrity failure is re-raised unchanged.
    """
    group_model = Group(name=request_model.name, org_id=org_model.id)
    db.add(group_model)
    try:
        db.flush()
    except IntegrityError as e:
        is_unique_violation = (
            getattr(e.orig, "pgcode", None) == "23505"  # Postgres unique violation
            or "UNIQUE constraint failed" in str(e.orig)  # SQLite unique violation
        )
        if is_unique_violation:
            raise ConflictException("Group with this name already exists") from e
        # Other integrity errors must not be swallowed: falling through would
        # build a response and commit on a session whose flush already failed.
        raise
    # Response is built before the commit, as in the original flow
    # (presumably because commit expires loaded attributes - confirm).
    response = GroupSchema(**group_model.__dict__)
    db.commit()
    return {"group": response}
@router.put("/group/permission", response_model=IAMPutGroupPermissionResponse)
async def add_group_permission(
    db: db_dependency,
    group_model: group_model_body_dependency,
    perm_model: perm_model_body_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPutGroupPermissionRequest,
):
    """
    Attach a permission to a group and return the group's permission list.

    Raises:
        UnauthorizedException: the group's ``org_id`` differs from the id of
            the organisation resolved for the caller.
        ConflictException: the permission is already attached to the group.
    """
    group_is_in_org = group_model.org_id == org_model.id
    if not group_is_in_org:
        raise UnauthorizedException("Group does not belong to this organization")

    already_granted = perm_model in group_model.permission_rel
    if already_granted:
        raise ConflictException("Group already has this permission")

    group_model.permission_rel.append(perm_model)
    db.flush()

    # Assemble the response between flush and commit, as the original does.
    org_summary = OrgSummary(**org_model.__dict__)
    group_summary = GroupSummary(**group_model.__dict__)
    response = IAMPutGroupPermissionResponse(
        organisation=org_summary,
        group=group_summary,
        permissions=group_model.permission_rel,
    )
    db.commit()
    return response
@router.put("/group/user", response_model=IAMPutGroupUserResponse)
async def add_group_user(
    db: db_dependency,
    group_model: group_model_body_dependency,
    user_model: user_model_body_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMPutGroupUserRequest,
):
    """
    Add an existing organisation member to a group.

    Raises:
        UnauthorizedException: the group's ``org_id`` differs from the id of
            the organisation resolved for the caller.
        ConflictException: the user is already in the group.
        ForbiddenException: the user is not in the organisation's
            ``user_rel`` collection; such users must be invited by e-mail.
    """
    group_is_in_org = group_model.org_id == org_model.id
    if not group_is_in_org:
        raise UnauthorizedException("Group does not belong to this organization")

    already_member = user_model in group_model.user_rel
    if already_member:
        raise ConflictException("User already in group")

    is_org_member = user_model in org_model.user_rel
    if not is_org_member:
        raise ForbiddenException(
            "Adding users directly can only be done with org members. Use email invitation instead."
        )

    group_model.user_rel.append(user_model)
    db.flush()

    # Assemble the response between flush and commit, as the original does.
    group_schema = GroupSchema(**group_model.__dict__)
    response = IAMPutGroupUserResponse(group=group_schema, users=group_model.user_rel)
    db.commit()
    return response
@router.delete("/group/permissions")
async def remove_group_permissions(
    db: db_dependency,
    group_model: group_model_query_dependency,
    perm_model: perm_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """
    Detach a permission from a group and return the remaining permissions.

    Raises:
        UnauthorizedException: the group's ``org_id`` differs from the id of
            the organisation resolved for the caller.
        ConflictException: the group does not currently hold the permission.
    """
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")
    # Without this guard, list.remove() raises ValueError for an unassigned
    # permission and the request fails with an unhandled server error.
    if perm_model not in group_model.permission_rel:
        raise ConflictException("Group does not have this permission")
    group_model.permission_rel.remove(perm_model)
    db.flush()
    response = IAMDeleteGroupPermissionResponse(
        group=GroupSchema(**group_model.__dict__),
        permissions=group_model.permission_rel,
    )
    db.commit()
    return response
@router.delete("/group/user")
async def remove_group_user(
    db: db_dependency,
    group_model: group_model_query_dependency,
    user_model: user_model_query_dependency,
    org_model: org_model_root_claim_query_dependency,
):
    """
    Remove a user from a group and return the group's remaining users.

    Raises:
        UnauthorizedException: the group's ``org_id`` differs from the id of
            the organisation resolved for the caller.
        ConflictException: the user is not currently a member of the group.
    """
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")
    # Without this guard, list.remove() raises ValueError for a non-member
    # and the request fails with an unhandled server error. The check uses
    # the same collection that is mutated below.
    if group_model not in user_model.group_rel:
        raise ConflictException("User is not in group")
    user_model.group_rel.remove(group_model)
    db.flush()
    response = IAMDeleteGroupUserResponse(
        group=GroupSchema(**group_model.__dict__), users=group_model.user_rel
    )
    db.commit()
    return response
@router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(
    db: db_dependency, org_model: org_model_root_claim_query_dependency
):
    """
    Return every permission row.

    ``org_model`` is not read in the body; it is declared so that its
    dependency is resolved for the request.
    """
    return {"permissions": db.query(Perm).all()}
@router.post("/permission", response_model=IAMPostPermissionResponse)
async def create_new_permission(
    db: db_dependency,
    su: super_admin_dependency,
    request_model: IAMPostPermissionRequest,
):
    """
    Create a permission for an existing service.

    Returns:
        ``{"permission": {...}}`` with the new permission's id, service name,
        resource and action.

    Raises:
        ServiceNotFoundException: no service has ``request_model.service_id``.
        ConflictException: flushing the new row hit a unique-constraint
            violation (Postgres code 23505 or the SQLite equivalent).
        IntegrityError: any other integrity failure is re-raised unchanged.
    """
    service_model = db.get(Service, request_model.service_id)
    if service_model is None:
        raise ServiceNotFoundException(service_id=request_model.service_id)
    perm_model = Perm(**request_model.__dict__)
    db.add(perm_model)
    try:
        db.flush()
    except IntegrityError as e:
        is_unique_violation = (
            getattr(e.orig, "pgcode", None) == "23505"  # Postgres unique violation
            or "UNIQUE constraint failed" in str(e.orig)  # SQLite unique violation
        )
        if is_unique_violation:
            raise ConflictException(message="Permission already exists") from e
        # Other integrity errors must not be swallowed: falling through would
        # build a response and commit on a session whose flush already failed.
        raise
    # Response is built before the commit, as in the original flow.
    response = {
        "id": perm_model.id,
        "service_name": perm_model.service_name,
        "resource": perm_model.resource,
        "action": perm_model.action,
    }
    db.commit()
    return {"permission": response}
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(
    db: db_dependency,
    su: super_admin_dependency,
    perm_model: perm_model_query_dependency,
):
    """
    Delete the resolved permission and commit.

    Produces no response body; the route is declared with status 204.
    """
    db.delete(perm_model)
    db.commit()
    return None
@router.post("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def post_permissions(
    db: db_dependency,
    org_model: org_model_root_claim_body_dependency,
    request_model: IAMGetPermissionsSearchRequest,
):
    """
    Search permissions by any combination of service id, resource and action.

    A criterion left as ``None`` in the request is not applied; with no
    criteria set, every permission is returned.
    """
    permission_query = db.query(Perm)

    # Pair each filterable column with the value requested for it.
    criteria = (
        (Perm.service_id, request_model.service_id),
        (Perm.resource, request_model.resource),
        (Perm.action, request_model.action),
    )
    for column, wanted in criteria:
        if wanted is None:
            continue
        permission_query = permission_query.filter(column == wanted)

    return {"permissions": permission_query.all()}
@router.put(
    "/group/user/invitation",
    summary="Send an email invitation for non-org member to join a group",
    status_code=status.HTTP_200_OK,
)
async def invitation(
    background_tasks: BackgroundTasks,
    org_model: org_model_root_claim_body_dependency,
    group_model: group_model_body_dependency,
    request_model: IAMPutGroupInvitationRequest,
):
    """
    Schedule a group invitation for ``request_model.user_email``.

    ``send_user_group_invitation`` is queued as a background task with the
    organisation and group ids and names, so the response is returned
    without waiting for it to run.

    Raises:
        UnauthorizedException: the group's ``org_id`` differs from the id of
            the organisation resolved for the caller.
    """
    # Same ownership check as the other group endpoints: without it an
    # invitation could name a group that belongs to a different organisation.
    if group_model.org_id != org_model.id:
        raise UnauthorizedException("Group does not belong to this organization")
    background_tasks.add_task(
        send_user_group_invitation,
        org_id=org_model.id,
        org_name=org_model.name,
        user_email=request_model.user_email,
        group_id=group_model.id,
        group_name=group_model.name,
    )
    return "Invitation sent"
@router.put(
    "/group/user/invitation/accept",
    summary="Accept email invitation to join an org's group",
    status_code=status.HTTP_200_OK,
)
@router.put(
    # Original path with an accidental double slash; kept as a hidden alias
    # so clients already calling it keep working.
    "/group/user//invitation/accept",
    summary="Accept email invitation to join an org's group",
    status_code=status.HTTP_200_OK,
    include_in_schema=False,
)
async def accept_invitation(
    db: db_dependency,
    user_model: user_model_claims_dependency,
    request_model: IAMPutGroupInvitationAcceptRequest,
):
    """
    Add the calling user to the group named in an invitation token.

    The token in ``request_model.jwt`` is passed to ``verify_email_token``
    along with the calling user; the returned claims supply ``org_id`` and
    ``group_id``.

    Raises:
        OrgNotFoundException: no organisation has the claimed ``org_id``.
        GroupNotFoundException: no group has the claimed ``group_id``.
        UnauthorizedException: the group is not in the organisation's
            ``group_rel`` collection.
        ConflictException: the user is already in the group.
    """
    email_claims = await verify_email_token(
        token=request_model.jwt, user_model=user_model
    )
    org_model = db.get(Org, email_claims["org_id"])
    if org_model is None:
        raise OrgNotFoundException()
    group_model = db.get(Group, email_claims["group_id"])
    if group_model is None:
        raise GroupNotFoundException()
    if group_model not in org_model.group_rel:
        raise UnauthorizedException("Group and org do not match.")
    if user_model in group_model.user_rel:
        raise ConflictException(message="User already in group.")
    group_model.user_rel.append(user_model)
    db.commit()
    return "Invitation accepted"