Compare commits

..

No commits in common. "657f91d73d3a6d88a43acac64c9f644273acadb0" and "b3689c8af6855e51eed6e805c7531c20dad86bc5" have entirely different histories.

15 changed files with 177 additions and 424 deletions

View file

@ -22,15 +22,12 @@ from src.user.service import add_user_to_db
from src.organisation.models import OrgUsers, Organisation as Org
from src.user.models import User
from src.database import db_dependency
from src.organisation.dependencies import org_model_query_dependency
from src.organisation.dependencies import org_model_dependency
oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
oidc_dependency = Annotated[str, Depends(oidc)]
def get_dev_user():
return {"db_id": 1}
async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]:
config_url = urlopen(auth_settings.OIDC_CONFIG)
@ -54,7 +51,7 @@ async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]:
try:
claims_requests.validate(token.claims)
except ExpiredTokenError:
except ExpiredTokenError as e:
raise HTTPException(status_code=401, detail="Token expired")
db_id = await add_user_to_db(token.claims)
@ -93,7 +90,7 @@ async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int
org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)]
async def is_org_root_query(claims: claims_dependency, db: db_dependency, org_model: org_model_query_dependency):
async def is_org_root(claims: claims_dependency, db: db_dependency, org_model: org_model_dependency, org_id: int = Path(gt=0)):
db_id = claims.get("db_id", None)
if db_id is None:
raise HTTPException(status_code=404, detail="User not found in db")
@ -104,7 +101,7 @@ async def is_org_root_query(claims: claims_dependency, db: db_dependency, org_mo
raise HTTPException(status_code=401, detail="Not authorised")
root_user_query_dependency = Annotated[dict[str, Any], Depends(is_org_root_query)]
root_user_dependency = Annotated[dict[str, Any], Depends(is_org_root)]
async def is_super_admin(claims: claims_dependency):

View file

@ -23,7 +23,6 @@ class Config(CustomBaseSettings):
APP_VERSION: str = "0.1"
ENVIRONMENT: Environment = Environment.PRODUCTION
SECRET_KEY: SecretStr = ""
DISABLE_AUTH: bool = False
CORS_ORIGINS: list[str] = ["*"]
CORS_ORIGINS_REGEX: str | None = None

View file

@ -1,5 +1,5 @@
"""
Router dependencies for the IAM module
Router dependencies for <this module>
Classes:
- List: Description
@ -8,49 +8,4 @@ Classes:
Functions:
- List: Description
- Functions: Description
"""
from typing import Annotated, Optional
from fastapi import Depends, Query
from src.database import db_dependency
from src.iam.models import Group
from src.iam.exceptions import GroupNotFoundException, PermNotFoundException
from src.iam.schemas import GroupIDMixin, PermIDMixin
def get_group_model_query(db: db_dependency, group_id: Annotated[int, Query(gt=0)]) -> type[Group]:
group_model = db.get(Group, group_id)
if group_model is None:
raise GroupNotFoundException(group_id)
return group_model
group_model_query_dependency = Annotated[type[Group], Depends(get_group_model_query)]
def get_group_model_body(db: db_dependency, request_model: Optional[GroupIDMixin] = None) -> type[Group]:
group_id = getattr(request_model, "group_id", None)
if group_id is None:
raise GroupNotFoundException()
group_model = db.get(Group, group_id)
if group_model is None:
raise GroupNotFoundException(group_id)
return group_model
group_model_body_dependency = Annotated[type[Group], Depends(get_group_model_body)]
def get_perm_model_body(db: db_dependency, request_model: Optional[PermIDMixin] = None) -> type[Group]:
perm_id = getattr(request_model, "permission_id", None)
if perm_id is None:
raise PermNotFoundException
group_model = db.get(Group, perm_id)
if group_model is None:
raise PermNotFoundException(perm_id)
return group_model
perm_model_body_dependency = Annotated[type[Group], Depends(get_perm_model_body)]
"""

View file

@ -1,28 +1,7 @@
"""
Module specific exceptions for the IAM module
Module specific exceptions for <this module>
Exceptions:
- List: Description
- Exceptions: Description
"""
from typing import Optional
from fastapi import HTTPException, status
class GroupNotFoundException(HTTPException):
def __init__(self, group_id: Optional[int] = None) -> None:
detail = "Group not found" if group_id is None else f"User with ID '{group_id}' was not found."
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail,
)
class PermNotFoundException(HTTPException):
def __init__(self, perm_id: Optional[int] = None) -> None:
detail = "Permission not found" if perm_id is None else f"User with ID '{perm_id}' was not found."
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail,
)
"""

View file

@ -22,19 +22,6 @@ class Permission(Base):
UniqueConstraint("service_id", "resource", "action", name="uniq_permission_resource_and_action")
service_rel = relationship("Service", foreign_keys=[service_id])
@property
def service_name(self):
return self.service_rel.name
group_rel = relationship(
"Group",
secondary="group_permissions",
back_populates="permission_rel"
)
class Group(Base):
__tablename__ = "group"
@ -51,12 +38,6 @@ class Group(Base):
org_rel = relationship("Organisation", back_populates="group_rel")
permission_rel = relationship(
"Permission",
secondary="group_permissions",
back_populates="group_rel"
)
class GroupPermissions(Base):
__tablename__ = "group_permissions"

View file

@ -5,25 +5,20 @@ Endpoints:
- List: Description
- Endpoints: Description
"""
from fastapi import APIRouter, HTTPException, status
from typing import Annotated, Optional
from fastapi import APIRouter, Query, HTTPException
from src.database import db_dependency
from src.iam.schemas import IAMGetGroupPermissionsResponse, IAMGetGroupUsersResponse, IAMPostGroupRequest, \
GroupResponse, IAMPostGroupResponse, IAMPutGroupPermissionRequest, IAMPutGroupPermissionResponse, \
IAMPutGroupUserRequest, IAMPutGroupUserResponse, IAMDeleteGroupPermissionRequest, IAMDeleteGroupPermissionResponse, \
IAMDeleteGroupUserRequest, IAMDeleteGroupUserResponse, IAMGetPermissionsResponse, IAMPostPermissionRequest, \
IAMPostPermissionResponse, PermissionResponse, IAMDeletePermissionRequest, IAMGetPermissionsSearchRequest, IAMGetPermissionsSearchResponse
from src.schemas import ResourceName
from src.auth.service import claims_dependency
from src.user.exceptions import UserNotFoundException
from src.user.models import User
from src.organisation.models import Organisation as Org
from src.service.models import Service
from src.organisation.dependencies import org_model_body_dependency
from src.organisation.dependencies import org_model_dependency
from src.iam.service import service_key_dependency
from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups
from src.iam.dependencies import group_model_query_dependency, group_model_body_dependency, perm_model_body_dependency
router = APIRouter(
tags=["IAM"],
@ -63,125 +58,135 @@ async def can_act_on_resource(valid_key: service_key_dependency, db: db_dependen
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse)
async def get_group_permissions(group_model: group_model_query_dependency):
# TODO: root_user_dependency
return {"permissions": group_model.permission_rel}
@router.get("/group/permissions")
async def get_group_permissions(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
# TODO: iam_admin_dependency
group_perms = db.query(Perm).join(GPerms).filter(GPerms.group_id==group_id).all()
# TODO: Response model
return group_perms
@router.get("/group/users", response_model=IAMGetGroupUsersResponse)
async def get_group_users(group_model: group_model_query_dependency):
# TODO: root_user_dependency
return {"users": group_model.user_rel}
@router.get("/group/users")
async def get_group_users(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
# TODO: iam_admin_dependency
group_users = db.query(User).join(UserGroups).filter(UserGroups.group_id == group_id).all()
# TODO: Response model
return group_users
@router.post("/group", response_model=IAMPostGroupResponse)
async def create_group(db: db_dependency, request_model: IAMPostGroupRequest, org_model: org_model_body_dependency):
# TODO: root_user_dependency
# TODO: get org ID from dependency instead of query (needs updated dep first)
group_model = Group(name=request_model.name, org_id=org_model.id)
@router.post("/group")
async def create_group(db: db_dependency, group_name: str, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
group_model = Group(name=group_name, org_id=org_id)
db.add(group_model)
db.flush()
response = GroupResponse(**group_model.__dict__)
db.commit()
return {"group": response}
# TODO: Response model
@router.put("/group/permission", response_model=IAMPutGroupPermissionResponse)
async def add_group_permission(db: db_dependency, group_model: group_model_body_dependency, perm_model: perm_model_body_dependency, request_model: IAMPutGroupPermissionRequest):
# TODO: root_user_dependency
group_model.permission_rel.append(perm_model)
@router.put("/group/permissions")
async def add_group_permissions(db: db_dependency, group_id: int, permission_id: int, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
g_perm_model = GPerms(group_id=group_id, permission_id=permission_id)
db.flush()
response = IAMPutGroupPermissionResponse(group=GroupResponse(**group_model.__dict__), permissions=group_model.permission_rel)
db.add(g_perm_model)
db.commit()
return response
# TODO: Response model
@router.put("/group/user")
async def add_group_user(db: db_dependency, group_model: group_model_body_dependency, request_model: IAMPutGroupUserRequest):
# TODO: root_user_dependency
# TODO: user_model_dependency
user_model = db.get(User, request_model.user_id)
if user_model is None:
raise UserNotFoundException(user_id=request_model.user_id)
@router.put("/group/users")
async def add_group_users(db: db_dependency, group_id: int, user_ids: list[int], org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
for user_id in user_ids:
user_group_model = UserGroups(group_id=group_id, user_id=user_id, org_id=org_id)
db.add(user_group_model)
group_model.user_rel.append(user_model)
db.flush()
response = IAMPutGroupUserResponse(group=GroupResponse(**group_model.__dict__), users=group_model.user_rel)
db.commit()
return response
# TODO: Response model
@router.delete("/group/permissions")
async def remove_group_permissions(db: db_dependency, group_model: group_model_body_dependency, perm_model: perm_model_body_dependency, request_model: IAMDeleteGroupPermissionRequest):
# TODO: root_user_dependency
group_model.permission_rel.remove(perm_model)
db.flush()
response = IAMDeleteGroupPermissionResponse(group=GroupResponse(**group_model.__dict__),
permissions=group_model.permission_rel)
async def remove_group_permissions(db: db_dependency, group_id: int, org_model: org_model_dependency, org_id: int, permission_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
g_perm_model = db.query(GPerms).filter(GPerms.group_id == group_id, GPerms.permission_id == permission_id).first()
if g_perm_model is None:
return
db.delete(g_perm_model)
db.commit()
return response
return
# TODO: Response model
@router.delete("/group/user")
async def remove_group_user(db: db_dependency, group_model: group_model_body_dependency, request_model: IAMDeleteGroupUserRequest):
# TODO: root_user_dependency
# TODO: User model dependency
user_model = db.get(User, request_model.user_id)
if user_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
async def remove_group_user(db: db_dependency, group_id: int, user_id: int, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
user_group_model = db.query(UserGroups).filter(UserGroups.group_id == group_id, UserGroups.user_id == user_id).first()
if user_group_model is None:
return
user_model.group_rel.remove(group_model)
db.flush()
response = IAMDeleteGroupUserResponse(group=GroupResponse(**group_model.__dict__), users=group_model.user_rel)
db.delete(user_group_model)
db.commit()
return response
return
# TODO: Response model
@router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(db: db_dependency):
# TODO: root_user_dependency
@router.get("/permissions")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: request model
permission_models = db.query(Perm).all()
return {"permissions": permission_models}
# TODO: Response model
return permission_models
@router.post("/permission")
async def create_new_permission(db: db_dependency, request_mode: IAMPostPermissionRequest):
async def create_new_permission(db: db_dependency, service_id: int, resource: str, action: str):
# TODO: super_admin_dependency
perm_model = Perm(**request_mode.__dict__)
perm_model = Perm(service_id=service_id, resource=resource, action=action)
db.add(perm_model)
db.flush()
response = IAMPostPermissionResponse(permission=PermissionResponse(**perm_model.__dict__))
db.commit()
return response
@router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(db: db_dependency, perm_model: perm_model_body_dependency, request_model: IAMDeletePermissionRequest):
# TODO: super_admin_dependency
@router.delete("/permission")
async def delete_permission(db: db_dependency, service_id: int, resource: str, action: str, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
perm_model = db.query(Perm).filter(Perm.service_id==service_id, Perm.resource==resource, Perm.action==action).first()
if perm_model is None:
return
db.delete(perm_model)
db.commit()
return
# TODO: Response model
@router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(db: db_dependency, search: IAMGetPermissionsSearchRequest):
# TODO: root_user_dependency
@router.get("/permissions/search")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int, service_id: Optional[int] = None, resource: Optional[str] = None, action: Optional[str] = None):
# TODO: iam_admin_dependency
# TODO: request model
permission_query = db.query(Perm)
if search.service_id is not None:
permission_query = permission_query.filter(Perm.service_id == search.service_id)
if service_id is not None:
permission_query = permission_query.filter(Perm.service_id == service_id)
if search.resource is not None:
permission_query = permission_query.filter(Perm.resource == search.resource)
if resource is not None:
permission_query = permission_query.filter(Perm.resource == resource)
if search.action is not None:
permission_query = permission_query.filter(Perm.action == search. action)
if action is not None:
permission_query = permission_query.filter(Perm.action == action)
permission_models = permission_query.all()
return {"permissions": permission_models}
# TODO: Response model
return permission_models

View file

@ -1,98 +1,7 @@
"""
Pydantic models for the IAM module
Pydantic models for <this module>
Models:
- List: Description
- Models: Description
"""
from typing import Optional
from pydantic import EmailStr, ConfigDict
from src.organisation.schemas import OrgIDMixin
from src.schemas import CustomBaseModel
class UserResponse(CustomBaseModel):
id: int
first_name: str
last_name: str
email: EmailStr
class PermissionResponse(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
service_name: str
resource: str
action: str
class GroupResponse(CustomBaseModel):
id: int
name: str
class GroupIDMixin(CustomBaseModel):
group_id: int
class PermIDMixin(CustomBaseModel):
permission_id: int
class IAMGetGroupPermissionsResponse(CustomBaseModel):
permissions: list[PermissionResponse]
class IAMGetGroupUsersResponse(CustomBaseModel):
users : list[UserResponse]
class IAMPostGroupRequest(OrgIDMixin):
name: str
class IAMPostGroupResponse(CustomBaseModel):
group: GroupResponse
class IAMPutGroupPermissionRequest(GroupIDMixin, PermIDMixin):
pass
class IAMPutGroupPermissionResponse(CustomBaseModel):
group: GroupResponse
permissions: list[PermissionResponse]
class IAMPutGroupUserRequest(GroupIDMixin):
user_id: int
class IAMPutGroupUserResponse(CustomBaseModel):
group: GroupResponse
users: list[UserResponse]
class IAMDeleteGroupPermissionRequest(GroupIDMixin, PermIDMixin):
pass
class IAMDeleteGroupPermissionResponse(CustomBaseModel):
group: GroupResponse
permissions: list[PermissionResponse]
class IAMDeleteGroupUserRequest(GroupIDMixin):
user_id: int
class IAMDeleteGroupUserResponse(CustomBaseModel):
group: GroupResponse
users: list[UserResponse]
class IAMGetPermissionsResponse(CustomBaseModel):
permissions: list[PermissionResponse]
class IAMPostPermissionRequest(CustomBaseModel):
service_id: int
resource: str
action: str
class IAMPostPermissionResponse(CustomBaseModel):
permission: PermissionResponse
class IAMDeletePermissionRequest(PermIDMixin):
pass
class IAMGetPermissionsSearchRequest(CustomBaseModel):
service_id: Optional[int] = None
resource: Optional[str] = None
action: Optional[str] = None
class IAMGetPermissionsSearchResponse(CustomBaseModel):
permissions: list[PermissionResponse]
"""

View file

@ -12,7 +12,6 @@ from src.config import settings
from src.api import api_router
from src.auth.config import auth_settings
from src.auth.service import get_current_user, get_dev_user
@asynccontextmanager
@ -23,8 +22,8 @@ async def lifespan(_application: FastAPI) -> AsyncGenerator:
if settings.ENVIRONMENT.is_deployed:
# Just a precaution, should be False anyway
settings.DISABLE_AUTH = False
# Do this only on prod
pass
tags_metadata = [
@ -58,8 +57,4 @@ app.add_middleware(
allow_headers=settings.CORS_HEADERS,
)
if settings.ENVIRONMENT == "local" and settings.DISABLE_AUTH:
app.dependency_overrides[get_current_user] = get_dev_user
app.include_router(api_router)

View file

@ -11,33 +11,18 @@ Functions:
"""
from typing import Annotated
from fastapi import Depends, Query
from fastapi import HTTPException, Depends
from src.database import db_dependency
from src.organisation.schemas import OrgIDMixin
from src.organisation.models import Organisation as Org
from src.organisation.exceptions import OrgNotFoundException
def get_org_model_query(db: db_dependency, org_id: Annotated[int, Query(gt=0)]) -> type[Org]:
org_model = db.get(Org, org_id)
def get_org_model(db: db_dependency, org_id: int) -> type[Org]:
org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise OrgNotFoundException(org_id)
raise HTTPException(status_code=404, detail="Organisation not found")
return org_model
org_model_query_dependency = Annotated[type[Org], Depends(get_org_model_query)]
def get_org_model_body(db: db_dependency, request_model: OrgIDMixin) -> type[Org]:
org_id = getattr(request_model, "organisation_id", None)
if org_id is None:
raise OrgNotFoundException
org_model = db.get(Org, org_id)
if org_model is None:
raise OrgNotFoundException(org_id)
return org_model
org_model_body_dependency = Annotated[type[Org], Depends(get_org_model_body)]
org_model_dependency = Annotated[type[Org], Depends(get_org_model)]

View file

@ -4,16 +4,4 @@ Module specific exceptions for organisation module
Exceptions:
- List: Description
- Exceptions: Description
"""
from typing import Optional
from fastapi import HTTPException, status
class OrgNotFoundException(HTTPException):
def __init__(self, org_id: Optional[int] = None) -> None:
detail = "Organisation not found" if org_id is None else f"User with ID '{org_id}' was not found."
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail,
)
"""

View file

@ -15,22 +15,23 @@ Endpoints:
from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, status
from fastapi.params import Query
from fastapi.params import Path, Query
from src.contact.schemas import ContactAddress
from src.database import db_dependency
from src.contact.models import Contact
from src.user.models import User
from src.user.exceptions import UserNotFoundException
from src.auth.service import root_user_query_dependency, claims_dependency
from src.auth.service import root_user_dependency, claims_dependency
from src.organisation.dependencies import org_model_query_dependency, org_model_body_dependency
from src.organisation.dependencies import org_model_dependency
from src.organisation.constants import ContactType
from src.organisation.models import Organisation as Org
from src.organisation.schemas import OrgOrgPostRequest, OrgQuestionnairePatchRequest, OrgStatusPatchRequest, \
OrgContactPatchRequest, \
OrgUserPostRequest, OrgUserGetResponse, OrgContactGetResponse, OrgOrgGetResponse, OrgRootPatchRequest, \
OrgGroupGetResponse, OrgUserDeleteRequest, OrgDeleteOrgRequest
OrgGroupGetResponse, OrgUserDeleteRequest
router = APIRouter(
prefix="/org",
@ -38,8 +39,8 @@ router = APIRouter(
)
@router.get("/id", response_model=OrgOrgGetResponse)
async def get_org_by_id(org_model: org_model_query_dependency):
@router.get("/id/{org_id}", response_model=OrgOrgGetResponse)
async def get_org_by_id(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
response = {
"name": org_model.name,
"status": org_model.status,
@ -53,16 +54,12 @@ async def get_org_by_id(org_model: org_model_query_dependency):
@router.post("/")
async def create_org(db: db_dependency, user: claims_dependency, request_model: OrgOrgPostRequest):
async def create_org(db: db_dependency, user: claims_dependency, org_request: OrgOrgPostRequest):
db_id: Optional[int] = user.get("db_id", None)
if db_id is None:
raise UserNotFoundException()
if request_model.intake_questionnaire:
intake_questionnaire = request_model.intake_questionnaire.model_dump()
else:
intake_questionnaire = None
org_model = Org(name=request_model.name, intake_questionnaire=intake_questionnaire)
org_model = Org(name=org_request.name, intake_questionnaire=org_request.intake_questionnaire.model_dump())
org_model.status = "partial" # Status is always set to partial at first, see update_questionnaire() doc
@ -80,70 +77,67 @@ async def create_org(db: db_dependency, user: claims_dependency, request_model:
db.commit()
@router.patch("/questionnaire")
async def update_questionnaire(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgQuestionnairePatchRequest):
@router.patch("/{org_id}/questionnaire")
async def update_questionnaire(db: db_dependency, org_model: org_model_dependency, q_request: OrgQuestionnairePatchRequest, org_id: Annotated[int, Path(gt=0)]):
"""
Route for updating questionnaire.
The partial bool allows for submission of partially completed questionnaire and/or
final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval.
"""
org_model.intake_questionnaire = request_model.intake_questionnaire.model_dump()
org_model.intake_questionnaire = q_request.intake_questionnaire.model_dump()
# Allows for partially completed questionnaires to be saved without being submitted for review
if not request_model.partial:
if not q_request.partial:
org_model.status = "submitted"
db.commit()
@router.patch("/status")
async def update_status(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgStatusPatchRequest):
org_model.status = request_model.status
@router.patch("/{org_id}/status")
async def update_status(db: db_dependency, org_model: org_model_dependency, status_request: OrgStatusPatchRequest, org_id: Annotated[int, Path(gt=0)]):
org_model.status = status_request.status
db.commit()
@router.get("/users", response_model=OrgUserGetResponse)
async def get_users(org_model: org_model_query_dependency):
@router.get("/{org_id}/users", response_model=OrgUserGetResponse)
async def get_users(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
return {"users": [user.email for user in org_model.user_rel]}
@router.post("/users")
async def add_user_to_org(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgUserPostRequest):
# TODO: user_model_body_dependency
user_model = db.get(User, request_model.user_id)
@router.post("/{org_id}/users")
async def add_user_to_org(db: db_dependency, org_model: org_model_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]):
user_model = db.get(User, user_request.user_id)
if user_model in org_model.user_rel:
return
org_model.user_rel.append(user_model)
db.commit()
@router.delete("/", status_code=status.HTTP_204_NO_CONTENT)
async def delete_organisation_by_id(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgDeleteOrgRequest):
@router.delete("/{org_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_organisation_by_id(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
db.delete(org_model)
db.commit()
@router.patch("/root_user", status_code=status.HTTP_204_NO_CONTENT)
async def update_root_user(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgRootPatchRequest):
# TODO: user_model_body_dependency
root_user_model = db.get(User, request_model.user_id)
@router.patch("/{org_id}/root_user", status_code=status.HTTP_204_NO_CONTENT)
async def update_root_user(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_request: OrgRootPatchRequest):
root_user_model = db.get(User, user_request.user_id)
if root_user_model is None:
raise UserNotFoundException(user_id=request_model.user_id)
raise UserNotFoundException(user_id=user_request.user_id)
org_model.root_user_rel = root_user_model
db.commit()
@router.get("/groups", response_model=OrgGroupGetResponse)
async def get_org_groups(org_model: org_model_query_dependency):
@router.get("/{org_id}/groups", response_model=OrgGroupGetResponse)
async def get_org_groups(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
return {"groups": [group.name for group in org_model.group_rel]}
@router.delete("/user", status_code=status.HTTP_204_NO_CONTENT)
async def remove_user_from_org(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgUserDeleteRequest):
# TODO: user_model_body_dependency
user_id = request_model.user_id
@router.delete("/{org_id}/user", status_code=status.HTTP_204_NO_CONTENT)
async def remove_user_from_org(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_request: OrgUserDeleteRequest):
user_id = user_request.user_id
user = db.get(User, user_id)
if user is None:
@ -155,9 +149,8 @@ async def remove_user_from_org(db: db_dependency, org_model: org_model_body_depe
org_model.user_rel.remove(user)
db.commit()
@router.get("/contact", response_model=OrgContactGetResponse)
async def get_contact(org_model: org_model_query_dependency, contact_type: Annotated[ContactType, Query()]):
@router.get("/{org_id}/contact", response_model=OrgContactGetResponse)
async def get_contact(org_model: org_model_dependency, contact_type: Annotated[ContactType, Query()], org_id: Annotated[int, Path(gt=0)]):
match contact_type:
case "billing":
contact_model = org_model.billing_contact_rel
@ -177,9 +170,10 @@ async def get_contact(org_model: org_model_query_dependency, contact_type: Annot
)
@router.patch("/contact", response_model=OrgContactGetResponse)
async def update_contact(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgContactPatchRequest):
match request_model.contact_type:
@router.patch("/{org_id}/contact", response_model=OrgContactGetResponse)
async def update_contact(db: db_dependency, org_model: org_model_dependency, contact_type: Annotated[ContactType, Query()], contact_request: OrgContactPatchRequest, org_id: Annotated[int, Path(gt=0)]):
match contact_type:
case "billing":
contact_model = org_model.billing_contact_rel
case "security":
@ -192,7 +186,7 @@ async def update_contact(db: db_dependency, org_model: org_model_body_dependency
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
update_data = request_model.model_dump(exclude_none=True)
update_data = contact_request.model_dump(exclude_none=True)
for key, value in update_data.items():
if hasattr(contact_model, key):
setattr(contact_model, key, value)

View file

@ -18,23 +18,19 @@ class OrgQuestionnaire(CustomBaseModel):
question_two: str
question_three: str
class OrgIDMixin(CustomBaseModel):
organisation_id: int
class OrgOrgPostRequest(CustomBaseModel):
name: str
intake_questionnaire: Optional[OrgQuestionnaire] = None
class OrgQuestionnairePatchRequest(OrgIDMixin):
class OrgQuestionnairePatchRequest(CustomBaseModel):
intake_questionnaire: OrgQuestionnaire
partial: bool
class OrgStatusPatchRequest(OrgIDMixin):
class OrgStatusPatchRequest(CustomBaseModel):
status: Status
class OrgContactPatchRequest(OrgIDMixin):
contact_type: ContactType
class OrgContactPatchRequest(CustomBaseModel):
email: Optional[EmailStr] = None
first_name: Optional[str] = None
last_name: Optional[str] = None
@ -48,13 +44,13 @@ class OrgContactPatchRequest(OrgIDMixin):
country_code: Optional[str] = None
postal_code: Optional[str] = None
class OrgUserPostRequest(OrgIDMixin):
class OrgUserPostRequest(CustomBaseModel):
user_id: int
class OrgUserDeleteRequest(OrgIDMixin):
class OrgUserDeleteRequest(CustomBaseModel):
user_id: int
class OrgRootPatchRequest(OrgIDMixin):
class OrgRootPatchRequest(CustomBaseModel):
user_id: int
class OrgUserGetResponse(CustomBaseModel):
@ -81,6 +77,3 @@ class OrgOrgGetResponse(CustomBaseModel):
owner_contact: Optional[str] = None
billing_contact: Optional[str] = None
security_contact: Optional[str] = None
class OrgDeleteOrgRequest(OrgIDMixin):
pass

View file

@ -5,63 +5,59 @@ Endpoints:
- List: Description
- Endpoints: Description
"""
from typing import Annotated
from fastapi import APIRouter, HTTPException, status
from fastapi.params import Path
from fastapi import APIRouter
from src.database import db_dependency
from src.service.models import Service
from src.service.utils import generate_api_key
from src.service.schemas import ServiceGetServiceResponse, ServicePostServiceRequest, ServicePostServiceResponse, \
ServiceWithKeyResponse, ServicePatchKeyResponse
router = APIRouter(
tags=["Service"],
prefix="/service",
)
@router.get("/", response_model=ServiceGetServiceResponse)
@router.get("/")
async def get_all_services(db: db_dependency):
# TODO: user_dependency
# TODO: request model
permission_models = db.query(Service).all()
return {"services": permission_models}
# TODO: Response model
return permission_models
@router.post("/", response_model=ServicePostServiceResponse)
async def register_service(db: db_dependency, service_request: ServicePostServiceRequest):
@router.post("/")
async def register_service(db: db_dependency, service_name: str):
# TODO: super_admin_dependency
# TODO: request model
key = generate_api_key()
service_model = Service(name=service_request.name, api_key=key)
service_model = Service(name=service_name, api_key=key)
db.add(service_model)
db.flush()
response = ServiceWithKeyResponse(**service_model.__dict__)
db.commit()
return {"service": response}
# TODO: response model
@router.patch("/{service_id}/key", response_model=ServicePatchKeyResponse)
async def regenerate_api_key(db: db_dependency, service_id: Annotated[int, Path(gt=0,description="Service database ID")]):
@router.patch("/{service_id}/key")
async def regenerate_api_key(db: db_dependency, service_id: int):
# TODO: super_admin_dependency
service_model = db.get(Service, service_id)
if service_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Service not found")
# TODO: request model
key = generate_api_key()
service_model = db.query(Service).filter(Service.id==service_id).first()
service_model.api_key = key
db.flush()
response = ServiceWithKeyResponse(**service_model.__dict__)
db.add(service_model)
db.commit()
return {"service": response}
# TODO: response model
@router.delete("/{service_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_service(db: db_dependency, service_id: Annotated[int, Path(gt=0,description="Service database ID")]):
@router.delete("/{service_id}")
async def remove_service(db: db_dependency, service_id: int):
# TODO: super_admin_dependency
service_model = db.get(Service, service_id)
# TODO: request model
service_model = db.query(Service).filter(Service.id==service_id).first()
if service_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Service not found")
return
db.delete(service_model)
db.commit()
# TODO: response model

View file

@ -1,31 +1,7 @@
"""
Pydantic models for the service module
Pydantic models for <this module>
Models:
- List: Description
- Models: Description
"""
from pydantic import ConfigDict
from src.schemas import CustomBaseModel
class ServiceResponse(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
id: int
name: str
class ServiceWithKeyResponse(ServiceResponse):
api_key: str
class ServiceGetServiceResponse(CustomBaseModel):
services: list[ServiceResponse]
class ServicePostServiceRequest(CustomBaseModel):
name: str
class ServicePostServiceResponse(CustomBaseModel):
service: ServiceWithKeyResponse
class ServicePatchKeyResponse(CustomBaseModel):
service: ServiceWithKeyResponse
"""

View file

@ -10,6 +10,7 @@ from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from src.database import Base
from src.iam.models import Group
class User(Base):