1
0
Fork 0
forked from sr2/cloud-api

feat: caor docs and response model

This commit is contained in:
Chris Milne 2026-06-10 16:16:56 +01:00
parent 0b521414b3
commit ec41d1ed05
4 changed files with 45 additions and 9 deletions

View file

@ -28,7 +28,7 @@ oidc_dependency = Annotated[str, Depends(oidc)]
def get_dev_user(): def get_dev_user():
return {"db_id": 1} return {"db_id": 1, "email": "chris@sr2.uk"}
async def get_current_user( async def get_current_user(

View file

@ -21,7 +21,7 @@ from sqlalchemy.exc import IntegrityError
from src.iam.exceptions import GroupNotFoundException from src.iam.exceptions import GroupNotFoundException
from src.organisation.exceptions import OrgNotFoundException from src.organisation.exceptions import OrgNotFoundException
from src.schemas import GroupSummary, OrgSummary from src.schemas import GroupSummary, OrgSummary, ResourceName
from src.service.exceptions import ServiceNotFoundException from src.service.exceptions import ServiceNotFoundException
from src.exceptions import ConflictException, ForbiddenException from src.exceptions import ConflictException, ForbiddenException
from src.database import db_dependency from src.database import db_dependency
@ -74,6 +74,7 @@ from src.iam.schemas import (
IAMGetPermissionsSearchResponse, IAMGetPermissionsSearchResponse,
IAMPutGroupInvitationRequest, IAMPutGroupInvitationRequest,
IAMPutGroupInvitationAcceptRequest, IAMPutGroupInvitationAcceptRequest,
IAMCAoRResponse,
) )
from src.utils import verify_email_token from src.utils import verify_email_token
@ -83,13 +84,35 @@ router = APIRouter(
) )
@router.post("/can_act_on_resource") @router.post(
path="/can_act_on_resource",
summary="Used for services to check user access permission",
status_code=status.HTTP_200_OK,
response_model=IAMCAoRResponse,
responses={
status.HTTP_401_UNAUTHORIZED: {
"description": "API Key missing or invalid | Issue verifying user OIDC claims"
},
},
)
async def can_act_on_resource( async def can_act_on_resource(
valid_key: service_key_dependency, valid_key: service_key_dependency,
db: db_dependency, db: db_dependency,
user_claims: claims_dependency, user_claims: claims_dependency,
request_model: IAMCAoRRequest, request_model: IAMCAoRRequest,
) -> bool: ):
"""
This endpoint is not meant for the Hub frontend to interact with.
Services accessing this endpoint must be already registered within the Hub and been issued an API key.
Resource Names have an instance property but permissions do not presently have that level of granularity.
"""
response = {
"allowed": False,
"rn": ResourceName(organisation="", service="", resource=""),
"action": "",
"user": {"id": 0, "email": ""},
}
try: try:
rn = request_model.rn rn = request_model.rn
action = request_model.action action = request_model.action
@ -98,6 +121,10 @@ async def can_act_on_resource(
rn_service = rn.service rn_service = rn.service
rn_resource = rn.resource rn_resource = rn.resource
response["user"] = {"id": user_id, "email": user_claims["email"]}
response["action"] = action
response["rn"] = rn
result = ( result = (
db.query(Perm) db.query(Perm)
.join(Service, Service.id == Perm.service_id) .join(Service, Service.id == Perm.service_id)
@ -114,11 +141,13 @@ async def can_act_on_resource(
).first() ).first()
if result: if result:
return True response["allowed"] = True
else: else:
return False response["allowed"] = False
except Exception: except Exception:
return False response["allowed"] = False
return response
@router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse) @router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse)

View file

@ -52,6 +52,13 @@ class IAMCAoRRequest(CustomBaseModel):
rn: ResourceName rn: ResourceName
class IAMCAoRResponse(CustomBaseModel):
allowed: bool
user: UserSummary
action: str
rn: ResourceName
class IAMGetGroupPermissionsResponse(CustomBaseModel): class IAMGetGroupPermissionsResponse(CustomBaseModel):
organisation: OrgSummary organisation: OrgSummary
group: GroupSummary group: GroupSummary

View file

@ -36,7 +36,7 @@ async def test_post_act_on_resource_endpoint_success(default_client: AsyncClient
data = resp.json() data = resp.json()
assert resp.status_code == 200 assert resp.status_code == 200
assert data is True assert data["allowed"] is True
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -148,7 +148,7 @@ async def test_act_on_resource_logic(
data = resp.json() data = resp.json()
assert resp.status_code == 200 assert resp.status_code == 200
assert data == expected_response assert data["allowed"] == expected_response
@pytest.mark.anyio @pytest.mark.anyio