feat: org status check moved
Accessing endpoints as super admin no longer requires the org to be approved.
This commit is contained in:
parent
a655eaf543
commit
092e12a892
4 changed files with 94 additions and 115 deletions
|
|
@ -9,8 +9,9 @@ Exports:
|
|||
"""
|
||||
|
||||
from typing import Annotated
|
||||
from fastapi import Depends
|
||||
from fastapi import Depends, Request
|
||||
|
||||
from src.auth.service import org_status_check
|
||||
from src.exceptions import ForbiddenException
|
||||
from src.user.dependencies import user_model_claims_dependency
|
||||
from src.user.models import User
|
||||
|
|
@ -37,16 +38,19 @@ async def org_query_root_claims(
|
|||
user_model: user_model_claims_dependency,
|
||||
org_model: org_model_query_dependency,
|
||||
su_emails: su_list_dependency,
|
||||
request: Request,
|
||||
):
|
||||
if org_model.root_user_id == user_model.id:
|
||||
return org_model
|
||||
|
||||
try:
|
||||
if await user_model_super_admin(user_model, su_emails):
|
||||
return org_model
|
||||
except ForbiddenException:
|
||||
pass
|
||||
|
||||
await org_status_check(org_model, request)
|
||||
|
||||
if org_model.root_user_id == user_model.id:
|
||||
return org_model
|
||||
|
||||
raise ForbiddenException(message="Must be the org's root user")
|
||||
|
||||
|
||||
|
|
@ -59,16 +63,19 @@ async def org_body_root_claims(
|
|||
user_model: user_model_claims_dependency,
|
||||
org_model: org_model_body_dependency,
|
||||
su_emails: su_list_dependency,
|
||||
request: Request,
|
||||
):
|
||||
if org_model.root_user_id == user_model.id:
|
||||
return org_model
|
||||
|
||||
try:
|
||||
if await user_model_super_admin(user_model, su_emails):
|
||||
return org_model
|
||||
except ForbiddenException:
|
||||
pass
|
||||
|
||||
await org_status_check(org_model, request)
|
||||
|
||||
if org_model.root_user_id == user_model.id:
|
||||
return org_model
|
||||
|
||||
raise ForbiddenException(message="Must be the org's root user")
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -14,10 +14,13 @@ from joserfc.errors import ExpiredTokenError
|
|||
from joserfc.jwk import KeySet
|
||||
from urllib.request import urlopen
|
||||
|
||||
from fastapi import Depends
|
||||
from fastapi import Depends, Request
|
||||
from fastapi.security import OpenIdConnect
|
||||
|
||||
from src.exceptions import UnauthorizedException
|
||||
from src.organisation.constants import Status as OrgStatus
|
||||
from src.organisation.exceptions import AwaitingApprovalException
|
||||
from src.organisation.models import Organisation as Org
|
||||
from src.exceptions import UnauthorizedException, ForbiddenException
|
||||
from src.auth.config import auth_settings
|
||||
from src.user.service import add_user_to_db
|
||||
from src.database import db_dependency
|
||||
|
|
@ -27,7 +30,7 @@ oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
|
|||
oidc_dependency = Annotated[str, Depends(oidc)]
|
||||
|
||||
|
||||
def get_dev_user():
|
||||
async def get_dev_user():
|
||||
return {"db_id": 1, "email": "chris@sr2.uk"}
|
||||
|
||||
|
||||
|
|
@ -61,3 +64,26 @@ async def get_current_user(
|
|||
|
||||
|
||||
claims_dependency = Annotated[dict[str, Any], Depends(get_current_user)]
|
||||
|
||||
|
||||
async def org_status_check(org_model: Org, request: Request):
|
||||
org_status = OrgStatus(org_model.status)
|
||||
if org_status.is_blocked:
|
||||
raise ForbiddenException("This organisation cannot perform this action.")
|
||||
|
||||
root = "/api/v1"
|
||||
|
||||
pre_approval_endpoints = [
|
||||
f"PATCH{root}/org/status",
|
||||
f"PATCH{root}/org/questionnaire",
|
||||
f"GET{root}/org",
|
||||
f"GET{root}/org/contact",
|
||||
f"PATCH{root}/org/contact",
|
||||
f"DELETE{root}/org/self",
|
||||
]
|
||||
current_request = f"{request.method}{request.url.path}"
|
||||
if (
|
||||
current_request not in pre_approval_endpoints
|
||||
and org_model.status != OrgStatus.APPROVED
|
||||
):
|
||||
raise AwaitingApprovalException(org_model.id)
|
||||
|
|
|
|||
|
|
@ -7,65 +7,38 @@ Exports:
|
|||
"""
|
||||
|
||||
from typing import Annotated, Optional
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from fastapi import Depends, Query, Request
|
||||
from fastapi import Depends, Query
|
||||
|
||||
from src.database import db_dependency
|
||||
from src.exceptions import ForbiddenException
|
||||
|
||||
from src.organisation.schemas import OrgIDMixin
|
||||
from src.organisation.models import Organisation as Org
|
||||
from src.organisation.exceptions import OrgNotFoundException, AwaitingApprovalException
|
||||
from src.organisation.constants import Status as OrgStatus
|
||||
|
||||
|
||||
def get_org_model(db: Session, request: Request, org_id: int):
|
||||
org_model = db.get(Org, org_id)
|
||||
if org_model is None:
|
||||
raise OrgNotFoundException(org_id)
|
||||
|
||||
org_status = OrgStatus(org_model.status)
|
||||
if org_status.is_blocked:
|
||||
raise ForbiddenException("This organisation cannot perform this action.")
|
||||
|
||||
root = "/api/v1"
|
||||
|
||||
pre_approval_endpoints = [
|
||||
f"PATCH{root}/org/status",
|
||||
f"PATCH{root}/org/questionnaire",
|
||||
f"GET{root}/org",
|
||||
f"GET{root}/org/contact",
|
||||
f"PATCH{root}/org/contact",
|
||||
f"DELETE{root}/org/self",
|
||||
]
|
||||
current_request = f"{request.method}{request.url.path}"
|
||||
if (
|
||||
current_request not in pre_approval_endpoints
|
||||
and org_model.status != OrgStatus.APPROVED
|
||||
):
|
||||
raise AwaitingApprovalException(org_id)
|
||||
|
||||
return org_model
|
||||
from src.organisation.exceptions import OrgNotFoundException
|
||||
|
||||
|
||||
def get_org_model_query(
|
||||
db: db_dependency, request: Request, org_id: Annotated[int, Query(gt=0)]
|
||||
db: db_dependency, org_id: Annotated[int, Query(gt=0)]
|
||||
) -> type[Org]:
|
||||
return get_org_model(db, request, org_id)
|
||||
org_model = db.get(Org, org_id)
|
||||
if org_model is None:
|
||||
raise OrgNotFoundException(org_id)
|
||||
return org_model
|
||||
|
||||
|
||||
org_model_query_dependency = Annotated[type[Org], Depends(get_org_model_query)]
|
||||
|
||||
|
||||
def get_org_model_body(
|
||||
db: db_dependency, request: Request, request_model: OrgIDMixin
|
||||
) -> type[Org]:
|
||||
def get_org_model_body(db: db_dependency, request_model: OrgIDMixin) -> type[Org]:
|
||||
org_id: Optional[int] = getattr(request_model, "organisation_id", None)
|
||||
if org_id is None:
|
||||
raise OrgNotFoundException()
|
||||
|
||||
return get_org_model(db, request, org_id)
|
||||
org_model = db.get(Org, org_id)
|
||||
if org_model is None:
|
||||
raise OrgNotFoundException(org_id)
|
||||
|
||||
return org_model
|
||||
|
||||
|
||||
org_model_body_dependency = Annotated[type[Org], Depends(get_org_model_body)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue