Compare commits

..

5 commits

Author SHA1 Message Date
657f91d73d feat: org dependencies
Org endpoints use query/body model dependencies to perform initial db lookups.

Issue #6

Org ID path params have been replaced with either query params (get endpoints) or body values.

Resolves #10

Endpoints in other modules that rely on an org model lookup have also been updated.
2026-05-27 12:21:30 +01:00
c6a2b301dc feat: iam dependencies
IAM endpoints now use dependencies to perform most initial database get requests.

Issue #6
2026-05-27 12:21:30 +01:00
d4f1b73deb feat: iam endpoint req/res models 2026-05-26 16:25:14 +01:00
fa8439cc6c feat: auth bypass for dev and testing
ENVIRONMENT must be "local" and DISABLE_AUTH set for this to be active. Both of these default to production values to prevent this being enabled accidentally.

Resolves #5
2026-05-26 11:42:49 +01:00
652dfb7b4a feat: service module req/res models 2026-05-26 10:16:59 +01:00
15 changed files with 430 additions and 183 deletions

View file

@ -22,12 +22,15 @@ from src.user.service import add_user_to_db
from src.organisation.models import OrgUsers, Organisation as Org from src.organisation.models import OrgUsers, Organisation as Org
from src.user.models import User from src.user.models import User
from src.database import db_dependency from src.database import db_dependency
from src.organisation.dependencies import org_model_dependency from src.organisation.dependencies import org_model_query_dependency
oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG) oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
oidc_dependency = Annotated[str, Depends(oidc)] oidc_dependency = Annotated[str, Depends(oidc)]
def get_dev_user():
return {"db_id": 1}
async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]: async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]:
config_url = urlopen(auth_settings.OIDC_CONFIG) config_url = urlopen(auth_settings.OIDC_CONFIG)
@ -51,7 +54,7 @@ async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]:
try: try:
claims_requests.validate(token.claims) claims_requests.validate(token.claims)
except ExpiredTokenError as e: except ExpiredTokenError:
raise HTTPException(status_code=401, detail="Token expired") raise HTTPException(status_code=401, detail="Token expired")
db_id = await add_user_to_db(token.claims) db_id = await add_user_to_db(token.claims)
@ -90,7 +93,7 @@ async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int
org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)] org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)]
async def is_org_root(claims: claims_dependency, db: db_dependency, org_model: org_model_dependency, org_id: int = Path(gt=0)): async def is_org_root_query(claims: claims_dependency, db: db_dependency, org_model: org_model_query_dependency):
db_id = claims.get("db_id", None) db_id = claims.get("db_id", None)
if db_id is None: if db_id is None:
raise HTTPException(status_code=404, detail="User not found in db") raise HTTPException(status_code=404, detail="User not found in db")
@ -101,7 +104,7 @@ async def is_org_root(claims: claims_dependency, db: db_dependency, org_model: o
raise HTTPException(status_code=401, detail="Not authorised") raise HTTPException(status_code=401, detail="Not authorised")
root_user_dependency = Annotated[dict[str, Any], Depends(is_org_root)] root_user_query_dependency = Annotated[dict[str, Any], Depends(is_org_root_query)]
async def is_super_admin(claims: claims_dependency): async def is_super_admin(claims: claims_dependency):

View file

@ -23,6 +23,7 @@ class Config(CustomBaseSettings):
APP_VERSION: str = "0.1" APP_VERSION: str = "0.1"
ENVIRONMENT: Environment = Environment.PRODUCTION ENVIRONMENT: Environment = Environment.PRODUCTION
SECRET_KEY: SecretStr = "" SECRET_KEY: SecretStr = ""
DISABLE_AUTH: bool = False
CORS_ORIGINS: list[str] = ["*"] CORS_ORIGINS: list[str] = ["*"]
CORS_ORIGINS_REGEX: str | None = None CORS_ORIGINS_REGEX: str | None = None

View file

@ -1,5 +1,5 @@
""" """
Router dependencies for <this module> Router dependencies for the IAM module
Classes: Classes:
- List: Description - List: Description
@ -9,3 +9,48 @@ Functions:
- List: Description - List: Description
- Functions: Description - Functions: Description
""" """
from typing import Annotated, Optional
from fastapi import Depends, Query
from src.database import db_dependency
from src.iam.models import Group
from src.iam.exceptions import GroupNotFoundException, PermNotFoundException
from src.iam.schemas import GroupIDMixin, PermIDMixin
def get_group_model_query(db: db_dependency, group_id: Annotated[int, Query(gt=0)]) -> type[Group]:
group_model = db.get(Group, group_id)
if group_model is None:
raise GroupNotFoundException(group_id)
return group_model
group_model_query_dependency = Annotated[type[Group], Depends(get_group_model_query)]
def get_group_model_body(db: db_dependency, request_model: Optional[GroupIDMixin] = None) -> type[Group]:
group_id = getattr(request_model, "group_id", None)
if group_id is None:
raise GroupNotFoundException()
group_model = db.get(Group, group_id)
if group_model is None:
raise GroupNotFoundException(group_id)
return group_model
group_model_body_dependency = Annotated[type[Group], Depends(get_group_model_body)]
def get_perm_model_body(db: db_dependency, request_model: Optional[PermIDMixin] = None) -> type[Group]:
perm_id = getattr(request_model, "permission_id", None)
if perm_id is None:
raise PermNotFoundException
group_model = db.get(Group, perm_id)
if group_model is None:
raise PermNotFoundException(perm_id)
return group_model
perm_model_body_dependency = Annotated[type[Group], Depends(get_perm_model_body)]

View file

@ -1,7 +1,28 @@
""" """
Module specific exceptions for <this module> Module specific exceptions for the IAM module
Exceptions: Exceptions:
- List: Description - List: Description
- Exceptions: Description - Exceptions: Description
""" """
from typing import Optional
from fastapi import HTTPException, status
class GroupNotFoundException(HTTPException):
def __init__(self, group_id: Optional[int] = None) -> None:
detail = "Group not found" if group_id is None else f"User with ID '{group_id}' was not found."
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail,
)
class PermNotFoundException(HTTPException):
def __init__(self, perm_id: Optional[int] = None) -> None:
detail = "Permission not found" if perm_id is None else f"User with ID '{perm_id}' was not found."
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail,
)

View file

@ -22,6 +22,19 @@ class Permission(Base):
UniqueConstraint("service_id", "resource", "action", name="uniq_permission_resource_and_action") UniqueConstraint("service_id", "resource", "action", name="uniq_permission_resource_and_action")
service_rel = relationship("Service", foreign_keys=[service_id])
@property
def service_name(self):
return self.service_rel.name
group_rel = relationship(
"Group",
secondary="group_permissions",
back_populates="permission_rel"
)
class Group(Base): class Group(Base):
__tablename__ = "group" __tablename__ = "group"
@ -38,6 +51,12 @@ class Group(Base):
org_rel = relationship("Organisation", back_populates="group_rel") org_rel = relationship("Organisation", back_populates="group_rel")
permission_rel = relationship(
"Permission",
secondary="group_permissions",
back_populates="group_rel"
)
class GroupPermissions(Base): class GroupPermissions(Base):
__tablename__ = "group_permissions" __tablename__ = "group_permissions"

View file

@ -5,20 +5,25 @@ Endpoints:
- List: Description - List: Description
- Endpoints: Description - Endpoints: Description
""" """
from typing import Annotated, Optional from fastapi import APIRouter, HTTPException, status
from fastapi import APIRouter, Query, HTTPException
from src.database import db_dependency from src.database import db_dependency
from src.iam.schemas import IAMGetGroupPermissionsResponse, IAMGetGroupUsersResponse, IAMPostGroupRequest, \
GroupResponse, IAMPostGroupResponse, IAMPutGroupPermissionRequest, IAMPutGroupPermissionResponse, \
IAMPutGroupUserRequest, IAMPutGroupUserResponse, IAMDeleteGroupPermissionRequest, IAMDeleteGroupPermissionResponse, \
IAMDeleteGroupUserRequest, IAMDeleteGroupUserResponse, IAMGetPermissionsResponse, IAMPostPermissionRequest, \
IAMPostPermissionResponse, PermissionResponse, IAMDeletePermissionRequest, IAMGetPermissionsSearchRequest, IAMGetPermissionsSearchResponse
from src.schemas import ResourceName from src.schemas import ResourceName
from src.auth.service import claims_dependency from src.auth.service import claims_dependency
from src.user.exceptions import UserNotFoundException
from src.user.models import User from src.user.models import User
from src.organisation.models import Organisation as Org from src.organisation.models import Organisation as Org
from src.service.models import Service from src.service.models import Service
from src.organisation.dependencies import org_model_dependency from src.organisation.dependencies import org_model_body_dependency
from src.iam.service import service_key_dependency from src.iam.service import service_key_dependency
from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups
from src.iam.dependencies import group_model_query_dependency, group_model_body_dependency, perm_model_body_dependency
router = APIRouter( router = APIRouter(
tags=["IAM"], tags=["IAM"],
@ -58,135 +63,125 @@ async def can_act_on_resource(valid_key: service_key_dependency, db: db_dependen
raise HTTPException(status_code=500, detail="Internal server error") raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/group/permissions") @router.get("/group/permissions", response_model=IAMGetGroupPermissionsResponse)
async def get_group_permissions(db: db_dependency, group_id: Annotated[int, Query(gt=0)]): async def get_group_permissions(group_model: group_model_query_dependency):
# TODO: iam_admin_dependency # TODO: root_user_dependency
group_perms = db.query(Perm).join(GPerms).filter(GPerms.group_id==group_id).all() return {"permissions": group_model.permission_rel}
# TODO: Response model
return group_perms
@router.get("/group/users") @router.get("/group/users", response_model=IAMGetGroupUsersResponse)
async def get_group_users(db: db_dependency, group_id: Annotated[int, Query(gt=0)]): async def get_group_users(group_model: group_model_query_dependency):
# TODO: iam_admin_dependency # TODO: root_user_dependency
group_users = db.query(User).join(UserGroups).filter(UserGroups.group_id == group_id).all() return {"users": group_model.user_rel}
# TODO: Response model
return group_users
@router.post("/group") @router.post("/group", response_model=IAMPostGroupResponse)
async def create_group(db: db_dependency, group_name: str, org_model: org_model_dependency, org_id: int): async def create_group(db: db_dependency, request_model: IAMPostGroupRequest, org_model: org_model_body_dependency):
# TODO: iam_admin_dependency # TODO: root_user_dependency
# TODO: Request model # TODO: get org ID from dependency instead of query (needs updated dep first)
group_model = Group(name=group_name, org_id=org_id) group_model = Group(name=request_model.name, org_id=org_model.id)
db.add(group_model) db.add(group_model)
db.flush()
response = GroupResponse(**group_model.__dict__)
db.commit() db.commit()
# TODO: Response model return {"group": response}
@router.put("/group/permissions") @router.put("/group/permission", response_model=IAMPutGroupPermissionResponse)
async def add_group_permissions(db: db_dependency, group_id: int, permission_id: int, org_model: org_model_dependency, org_id: int): async def add_group_permission(db: db_dependency, group_model: group_model_body_dependency, perm_model: perm_model_body_dependency, request_model: IAMPutGroupPermissionRequest):
# TODO: iam_admin_dependency # TODO: root_user_dependency
# TODO: Request model group_model.permission_rel.append(perm_model)
g_perm_model = GPerms(group_id=group_id, permission_id=permission_id)
db.add(g_perm_model) db.flush()
response = IAMPutGroupPermissionResponse(group=GroupResponse(**group_model.__dict__), permissions=group_model.permission_rel)
db.commit() db.commit()
# TODO: Response model return response
@router.put("/group/users") @router.put("/group/user")
async def add_group_users(db: db_dependency, group_id: int, user_ids: list[int], org_model: org_model_dependency, org_id: int): async def add_group_user(db: db_dependency, group_model: group_model_body_dependency, request_model: IAMPutGroupUserRequest):
# TODO: iam_admin_dependency # TODO: root_user_dependency
# TODO: Request model # TODO: user_model_dependency
for user_id in user_ids: user_model = db.get(User, request_model.user_id)
user_group_model = UserGroups(group_id=group_id, user_id=user_id, org_id=org_id) if user_model is None:
db.add(user_group_model) raise UserNotFoundException(user_id=request_model.user_id)
group_model.user_rel.append(user_model)
db.flush()
response = IAMPutGroupUserResponse(group=GroupResponse(**group_model.__dict__), users=group_model.user_rel)
db.commit() db.commit()
# TODO: Response model return response
@router.delete("/group/permissions") @router.delete("/group/permissions")
async def remove_group_permissions(db: db_dependency, group_id: int, org_model: org_model_dependency, org_id: int, permission_id: int): async def remove_group_permissions(db: db_dependency, group_model: group_model_body_dependency, perm_model: perm_model_body_dependency, request_model: IAMDeleteGroupPermissionRequest):
# TODO: iam_admin_dependency # TODO: root_user_dependency
# TODO: Request model group_model.permission_rel.remove(perm_model)
g_perm_model = db.query(GPerms).filter(GPerms.group_id == group_id, GPerms.permission_id == permission_id).first() db.flush()
if g_perm_model is None: response = IAMDeleteGroupPermissionResponse(group=GroupResponse(**group_model.__dict__),
return permissions=group_model.permission_rel)
db.delete(g_perm_model)
db.commit() db.commit()
return return response
# TODO: Response model
@router.delete("/group/user") @router.delete("/group/user")
async def remove_group_user(db: db_dependency, group_id: int, user_id: int, org_model: org_model_dependency, org_id: int): async def remove_group_user(db: db_dependency, group_model: group_model_body_dependency, request_model: IAMDeleteGroupUserRequest):
# TODO: iam_admin_dependency # TODO: root_user_dependency
# TODO: Request model # TODO: User model dependency
user_group_model = db.query(UserGroups).filter(UserGroups.group_id == group_id, UserGroups.user_id == user_id).first() user_model = db.get(User, request_model.user_id)
if user_group_model is None: if user_model is None:
return raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
db.delete(user_group_model) user_model.group_rel.remove(group_model)
db.flush()
response = IAMDeleteGroupUserResponse(group=GroupResponse(**group_model.__dict__), users=group_model.user_rel)
db.commit() db.commit()
return
# TODO: Response model return response
@router.get("/permissions") @router.get("/permissions", response_model=IAMGetPermissionsResponse)
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int): async def get_permissions(db: db_dependency):
# TODO: iam_admin_dependency # TODO: root_user_dependency
# TODO: request model
permission_models = db.query(Perm).all() permission_models = db.query(Perm).all()
# TODO: Response model return {"permissions": permission_models}
return permission_models
@router.post("/permission") @router.post("/permission")
async def create_new_permission(db: db_dependency, service_id: int, resource: str, action: str): async def create_new_permission(db: db_dependency, request_mode: IAMPostPermissionRequest):
# TODO: super_admin_dependency # TODO: super_admin_dependency
perm_model = Perm(service_id=service_id, resource=resource, action=action) perm_model = Perm(**request_mode.__dict__)
db.add(perm_model) db.add(perm_model)
db.flush()
response = IAMPostPermissionResponse(permission=PermissionResponse(**perm_model.__dict__))
db.commit() db.commit()
return response
@router.delete("/permission") @router.delete("/permission", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(db: db_dependency, service_id: int, resource: str, action: str, org_model: org_model_dependency, org_id: int): async def delete_permission(db: db_dependency, perm_model: perm_model_body_dependency, request_model: IAMDeletePermissionRequest):
# TODO: iam_admin_dependency # TODO: super_admin_dependency
# TODO: Request model
perm_model = db.query(Perm).filter(Perm.service_id==service_id, Perm.resource==resource, Perm.action==action).first()
if perm_model is None:
return
db.delete(perm_model) db.delete(perm_model)
db.commit() db.commit()
return
# TODO: Response model
@router.get("/permissions/search") @router.get("/permissions/search", response_model=IAMGetPermissionsSearchResponse)
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int, service_id: Optional[int] = None, resource: Optional[str] = None, action: Optional[str] = None): async def get_permissions(db: db_dependency, search: IAMGetPermissionsSearchRequest):
# TODO: iam_admin_dependency # TODO: root_user_dependency
# TODO: request model
permission_query = db.query(Perm) permission_query = db.query(Perm)
if service_id is not None: if search.service_id is not None:
permission_query = permission_query.filter(Perm.service_id == service_id) permission_query = permission_query.filter(Perm.service_id == search.service_id)
if resource is not None: if search.resource is not None:
permission_query = permission_query.filter(Perm.resource == resource) permission_query = permission_query.filter(Perm.resource == search.resource)
if action is not None: if search.action is not None:
permission_query = permission_query.filter(Perm.action == action) permission_query = permission_query.filter(Perm.action == search. action)
permission_models = permission_query.all() permission_models = permission_query.all()
# TODO: Response model return {"permissions": permission_models}
return permission_models

View file

@ -1,7 +1,98 @@
""" """
Pydantic models for <this module> Pydantic models for the IAM module
Models: Models:
- List: Description - List: Description
- Models: Description - Models: Description
""" """
from typing import Optional
from pydantic import EmailStr, ConfigDict
from src.organisation.schemas import OrgIDMixin
from src.schemas import CustomBaseModel
class UserResponse(CustomBaseModel):
id: int
first_name: str
last_name: str
email: EmailStr
class PermissionResponse(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
service_name: str
resource: str
action: str
class GroupResponse(CustomBaseModel):
id: int
name: str
class GroupIDMixin(CustomBaseModel):
group_id: int
class PermIDMixin(CustomBaseModel):
permission_id: int
class IAMGetGroupPermissionsResponse(CustomBaseModel):
permissions: list[PermissionResponse]
class IAMGetGroupUsersResponse(CustomBaseModel):
users : list[UserResponse]
class IAMPostGroupRequest(OrgIDMixin):
name: str
class IAMPostGroupResponse(CustomBaseModel):
group: GroupResponse
class IAMPutGroupPermissionRequest(GroupIDMixin, PermIDMixin):
pass
class IAMPutGroupPermissionResponse(CustomBaseModel):
group: GroupResponse
permissions: list[PermissionResponse]
class IAMPutGroupUserRequest(GroupIDMixin):
user_id: int
class IAMPutGroupUserResponse(CustomBaseModel):
group: GroupResponse
users: list[UserResponse]
class IAMDeleteGroupPermissionRequest(GroupIDMixin, PermIDMixin):
pass
class IAMDeleteGroupPermissionResponse(CustomBaseModel):
group: GroupResponse
permissions: list[PermissionResponse]
class IAMDeleteGroupUserRequest(GroupIDMixin):
user_id: int
class IAMDeleteGroupUserResponse(CustomBaseModel):
group: GroupResponse
users: list[UserResponse]
class IAMGetPermissionsResponse(CustomBaseModel):
permissions: list[PermissionResponse]
class IAMPostPermissionRequest(CustomBaseModel):
service_id: int
resource: str
action: str
class IAMPostPermissionResponse(CustomBaseModel):
permission: PermissionResponse
class IAMDeletePermissionRequest(PermIDMixin):
pass
class IAMGetPermissionsSearchRequest(CustomBaseModel):
service_id: Optional[int] = None
resource: Optional[str] = None
action: Optional[str] = None
class IAMGetPermissionsSearchResponse(CustomBaseModel):
permissions: list[PermissionResponse]

View file

@ -12,6 +12,7 @@ from src.config import settings
from src.api import api_router from src.api import api_router
from src.auth.config import auth_settings from src.auth.config import auth_settings
from src.auth.service import get_current_user, get_dev_user
@asynccontextmanager @asynccontextmanager
@ -22,8 +23,8 @@ async def lifespan(_application: FastAPI) -> AsyncGenerator:
if settings.ENVIRONMENT.is_deployed: if settings.ENVIRONMENT.is_deployed:
# Do this only on prod # Just a precaution, should be False anyway
pass settings.DISABLE_AUTH = False
tags_metadata = [ tags_metadata = [
@ -57,4 +58,8 @@ app.add_middleware(
allow_headers=settings.CORS_HEADERS, allow_headers=settings.CORS_HEADERS,
) )
if settings.ENVIRONMENT == "local" and settings.DISABLE_AUTH:
app.dependency_overrides[get_current_user] = get_dev_user
app.include_router(api_router) app.include_router(api_router)

View file

@ -11,18 +11,33 @@ Functions:
""" """
from typing import Annotated from typing import Annotated
from fastapi import HTTPException, Depends from fastapi import Depends, Query
from src.database import db_dependency from src.database import db_dependency
from src.organisation.schemas import OrgIDMixin
from src.organisation.models import Organisation as Org from src.organisation.models import Organisation as Org
from src.organisation.exceptions import OrgNotFoundException
def get_org_model(db: db_dependency, org_id: int) -> type[Org]: def get_org_model_query(db: db_dependency, org_id: Annotated[int, Query(gt=0)]) -> type[Org]:
org_model = db.query(Org).filter(Org.id == org_id).first() org_model = db.get(Org, org_id)
if org_model is None: if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found") raise OrgNotFoundException(org_id)
return org_model return org_model
org_model_dependency = Annotated[type[Org], Depends(get_org_model)] org_model_query_dependency = Annotated[type[Org], Depends(get_org_model_query)]
def get_org_model_body(db: db_dependency, request_model: OrgIDMixin) -> type[Org]:
org_id = getattr(request_model, "organisation_id", None)
if org_id is None:
raise OrgNotFoundException
org_model = db.get(Org, org_id)
if org_model is None:
raise OrgNotFoundException(org_id)
return org_model
org_model_body_dependency = Annotated[type[Org], Depends(get_org_model_body)]

View file

@ -5,3 +5,15 @@ Exceptions:
- List: Description - List: Description
- Exceptions: Description - Exceptions: Description
""" """
from typing import Optional
from fastapi import HTTPException, status
class OrgNotFoundException(HTTPException):
def __init__(self, org_id: Optional[int] = None) -> None:
detail = "Organisation not found" if org_id is None else f"User with ID '{org_id}' was not found."
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail,
)

View file

@ -15,23 +15,22 @@ Endpoints:
from typing import Annotated, Optional from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, status from fastapi import APIRouter, HTTPException, status
from fastapi.params import Path, Query from fastapi.params import Query
from src.contact.schemas import ContactAddress from src.contact.schemas import ContactAddress
from src.database import db_dependency from src.database import db_dependency
from src.contact.models import Contact from src.contact.models import Contact
from src.user.models import User from src.user.models import User
from src.user.exceptions import UserNotFoundException from src.user.exceptions import UserNotFoundException
from src.auth.service import root_user_dependency, claims_dependency from src.auth.service import root_user_query_dependency, claims_dependency
from src.organisation.dependencies import org_model_dependency from src.organisation.dependencies import org_model_query_dependency, org_model_body_dependency
from src.organisation.constants import ContactType from src.organisation.constants import ContactType
from src.organisation.models import Organisation as Org from src.organisation.models import Organisation as Org
from src.organisation.schemas import OrgOrgPostRequest, OrgQuestionnairePatchRequest, OrgStatusPatchRequest, \ from src.organisation.schemas import OrgOrgPostRequest, OrgQuestionnairePatchRequest, OrgStatusPatchRequest, \
OrgContactPatchRequest, \ OrgContactPatchRequest, \
OrgUserPostRequest, OrgUserGetResponse, OrgContactGetResponse, OrgOrgGetResponse, OrgRootPatchRequest, \ OrgUserPostRequest, OrgUserGetResponse, OrgContactGetResponse, OrgOrgGetResponse, OrgRootPatchRequest, \
OrgGroupGetResponse, OrgUserDeleteRequest OrgGroupGetResponse, OrgUserDeleteRequest, OrgDeleteOrgRequest
router = APIRouter( router = APIRouter(
prefix="/org", prefix="/org",
@ -39,8 +38,8 @@ router = APIRouter(
) )
@router.get("/id/{org_id}", response_model=OrgOrgGetResponse) @router.get("/id", response_model=OrgOrgGetResponse)
async def get_org_by_id(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]): async def get_org_by_id(org_model: org_model_query_dependency):
response = { response = {
"name": org_model.name, "name": org_model.name,
"status": org_model.status, "status": org_model.status,
@ -54,12 +53,16 @@ async def get_org_by_id(org_model: org_model_dependency, org_id: Annotated[int,
@router.post("/") @router.post("/")
async def create_org(db: db_dependency, user: claims_dependency, org_request: OrgOrgPostRequest): async def create_org(db: db_dependency, user: claims_dependency, request_model: OrgOrgPostRequest):
db_id: Optional[int] = user.get("db_id", None) db_id: Optional[int] = user.get("db_id", None)
if db_id is None: if db_id is None:
raise UserNotFoundException() raise UserNotFoundException()
org_model = Org(name=org_request.name, intake_questionnaire=org_request.intake_questionnaire.model_dump()) if request_model.intake_questionnaire:
intake_questionnaire = request_model.intake_questionnaire.model_dump()
else:
intake_questionnaire = None
org_model = Org(name=request_model.name, intake_questionnaire=intake_questionnaire)
org_model.status = "partial" # Status is always set to partial at first, see update_questionnaire() doc org_model.status = "partial" # Status is always set to partial at first, see update_questionnaire() doc
@ -77,67 +80,70 @@ async def create_org(db: db_dependency, user: claims_dependency, org_request: Or
db.commit() db.commit()
@router.patch("/{org_id}/questionnaire") @router.patch("/questionnaire")
async def update_questionnaire(db: db_dependency, org_model: org_model_dependency, q_request: OrgQuestionnairePatchRequest, org_id: Annotated[int, Path(gt=0)]): async def update_questionnaire(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgQuestionnairePatchRequest):
""" """
Route for updating questionnaire. Route for updating questionnaire.
The partial bool allows for submission of partially completed questionnaire and/or The partial bool allows for submission of partially completed questionnaire and/or
final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval. final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval.
""" """
org_model.intake_questionnaire = q_request.intake_questionnaire.model_dump() org_model.intake_questionnaire = request_model.intake_questionnaire.model_dump()
# Allows for partially completed questionnaires to be saved without being submitted for review # Allows for partially completed questionnaires to be saved without being submitted for review
if not q_request.partial: if not request_model.partial:
org_model.status = "submitted" org_model.status = "submitted"
db.commit() db.commit()
@router.patch("/{org_id}/status") @router.patch("/status")
async def update_status(db: db_dependency, org_model: org_model_dependency, status_request: OrgStatusPatchRequest, org_id: Annotated[int, Path(gt=0)]): async def update_status(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgStatusPatchRequest):
org_model.status = status_request.status org_model.status = request_model.status
db.commit() db.commit()
@router.get("/{org_id}/users", response_model=OrgUserGetResponse) @router.get("/users", response_model=OrgUserGetResponse)
async def get_users(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]): async def get_users(org_model: org_model_query_dependency):
return {"users": [user.email for user in org_model.user_rel]} return {"users": [user.email for user in org_model.user_rel]}
@router.post("/{org_id}/users") @router.post("/users")
async def add_user_to_org(db: db_dependency, org_model: org_model_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]): async def add_user_to_org(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgUserPostRequest):
user_model = db.get(User, user_request.user_id) # TODO: user_model_body_dependency
user_model = db.get(User, request_model.user_id)
if user_model in org_model.user_rel: if user_model in org_model.user_rel:
return return
org_model.user_rel.append(user_model) org_model.user_rel.append(user_model)
db.commit() db.commit()
@router.delete("/{org_id}", status_code=status.HTTP_204_NO_CONTENT) @router.delete("/", status_code=status.HTTP_204_NO_CONTENT)
async def delete_organisation_by_id(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]): async def delete_organisation_by_id(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgDeleteOrgRequest):
db.delete(org_model) db.delete(org_model)
db.commit() db.commit()
@router.patch("/{org_id}/root_user", status_code=status.HTTP_204_NO_CONTENT) @router.patch("/root_user", status_code=status.HTTP_204_NO_CONTENT)
async def update_root_user(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_request: OrgRootPatchRequest): async def update_root_user(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgRootPatchRequest):
root_user_model = db.get(User, user_request.user_id) # TODO: user_model_body_dependency
root_user_model = db.get(User, request_model.user_id)
if root_user_model is None: if root_user_model is None:
raise UserNotFoundException(user_id=user_request.user_id) raise UserNotFoundException(user_id=request_model.user_id)
org_model.root_user_rel = root_user_model org_model.root_user_rel = root_user_model
db.commit() db.commit()
@router.get("/{org_id}/groups", response_model=OrgGroupGetResponse) @router.get("/groups", response_model=OrgGroupGetResponse)
async def get_org_groups(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]): async def get_org_groups(org_model: org_model_query_dependency):
return {"groups": [group.name for group in org_model.group_rel]} return {"groups": [group.name for group in org_model.group_rel]}
@router.delete("/{org_id}/user", status_code=status.HTTP_204_NO_CONTENT) @router.delete("/user", status_code=status.HTTP_204_NO_CONTENT)
async def remove_user_from_org(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_request: OrgUserDeleteRequest): async def remove_user_from_org(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgUserDeleteRequest):
user_id = user_request.user_id # TODO: user_model_body_dependency
user_id = request_model.user_id
user = db.get(User, user_id) user = db.get(User, user_id)
if user is None: if user is None:
@ -149,8 +155,9 @@ async def remove_user_from_org(db: db_dependency, org_model: org_model_dependenc
org_model.user_rel.remove(user) org_model.user_rel.remove(user)
db.commit() db.commit()
@router.get("/{org_id}/contact", response_model=OrgContactGetResponse)
async def get_contact(org_model: org_model_dependency, contact_type: Annotated[ContactType, Query()], org_id: Annotated[int, Path(gt=0)]): @router.get("/contact", response_model=OrgContactGetResponse)
async def get_contact(org_model: org_model_query_dependency, contact_type: Annotated[ContactType, Query()]):
match contact_type: match contact_type:
case "billing": case "billing":
contact_model = org_model.billing_contact_rel contact_model = org_model.billing_contact_rel
@ -170,10 +177,9 @@ async def get_contact(org_model: org_model_dependency, contact_type: Annotated[C
) )
@router.patch("/contact", response_model=OrgContactGetResponse)
@router.patch("/{org_id}/contact", response_model=OrgContactGetResponse) async def update_contact(db: db_dependency, org_model: org_model_body_dependency, request_model: OrgContactPatchRequest):
async def update_contact(db: db_dependency, org_model: org_model_dependency, contact_type: Annotated[ContactType, Query()], contact_request: OrgContactPatchRequest, org_id: Annotated[int, Path(gt=0)]): match request_model.contact_type:
match contact_type:
case "billing": case "billing":
contact_model = org_model.billing_contact_rel contact_model = org_model.billing_contact_rel
case "security": case "security":
@ -186,7 +192,7 @@ async def update_contact(db: db_dependency, org_model: org_model_dependency, con
if contact_model is None: if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found") raise HTTPException(status_code=404, detail="Contact not found")
update_data = contact_request.model_dump(exclude_none=True) update_data = request_model.model_dump(exclude_none=True)
for key, value in update_data.items(): for key, value in update_data.items():
if hasattr(contact_model, key): if hasattr(contact_model, key):
setattr(contact_model, key, value) setattr(contact_model, key, value)

View file

@ -18,19 +18,23 @@ class OrgQuestionnaire(CustomBaseModel):
question_two: str question_two: str
question_three: str question_three: str
class OrgIDMixin(CustomBaseModel):
organisation_id: int
class OrgOrgPostRequest(CustomBaseModel): class OrgOrgPostRequest(CustomBaseModel):
name: str name: str
intake_questionnaire: Optional[OrgQuestionnaire] = None intake_questionnaire: Optional[OrgQuestionnaire] = None
class OrgQuestionnairePatchRequest(CustomBaseModel): class OrgQuestionnairePatchRequest(OrgIDMixin):
intake_questionnaire: OrgQuestionnaire intake_questionnaire: OrgQuestionnaire
partial: bool partial: bool
class OrgStatusPatchRequest(CustomBaseModel): class OrgStatusPatchRequest(OrgIDMixin):
status: Status status: Status
class OrgContactPatchRequest(CustomBaseModel): class OrgContactPatchRequest(OrgIDMixin):
contact_type: ContactType
email: Optional[EmailStr] = None email: Optional[EmailStr] = None
first_name: Optional[str] = None first_name: Optional[str] = None
last_name: Optional[str] = None last_name: Optional[str] = None
@ -44,13 +48,13 @@ class OrgContactPatchRequest(CustomBaseModel):
country_code: Optional[str] = None country_code: Optional[str] = None
postal_code: Optional[str] = None postal_code: Optional[str] = None
class OrgUserPostRequest(CustomBaseModel): class OrgUserPostRequest(OrgIDMixin):
user_id: int user_id: int
class OrgUserDeleteRequest(CustomBaseModel): class OrgUserDeleteRequest(OrgIDMixin):
user_id: int user_id: int
class OrgRootPatchRequest(CustomBaseModel): class OrgRootPatchRequest(OrgIDMixin):
user_id: int user_id: int
class OrgUserGetResponse(CustomBaseModel): class OrgUserGetResponse(CustomBaseModel):
@ -77,3 +81,6 @@ class OrgOrgGetResponse(CustomBaseModel):
owner_contact: Optional[str] = None owner_contact: Optional[str] = None
billing_contact: Optional[str] = None billing_contact: Optional[str] = None
security_contact: Optional[str] = None security_contact: Optional[str] = None
class OrgDeleteOrgRequest(OrgIDMixin):
pass

View file

@ -5,59 +5,63 @@ Endpoints:
- List: Description - List: Description
- Endpoints: Description - Endpoints: Description
""" """
from fastapi import APIRouter from typing import Annotated
from fastapi import APIRouter, HTTPException, status
from fastapi.params import Path
from src.database import db_dependency from src.database import db_dependency
from src.service.models import Service from src.service.models import Service
from src.service.utils import generate_api_key from src.service.utils import generate_api_key
from src.service.schemas import ServiceGetServiceResponse, ServicePostServiceRequest, ServicePostServiceResponse, \
ServiceWithKeyResponse, ServicePatchKeyResponse
router = APIRouter( router = APIRouter(
tags=["Service"], tags=["Service"],
prefix="/service", prefix="/service",
) )
@router.get("/") @router.get("/", response_model=ServiceGetServiceResponse)
async def get_all_services(db: db_dependency): async def get_all_services(db: db_dependency):
# TODO: user_dependency # TODO: user_dependency
# TODO: request model
permission_models = db.query(Service).all() permission_models = db.query(Service).all()
# TODO: Response model return {"services": permission_models}
return permission_models
@router.post("/") @router.post("/", response_model=ServicePostServiceResponse)
async def register_service(db: db_dependency, service_name: str): async def register_service(db: db_dependency, service_request: ServicePostServiceRequest):
# TODO: super_admin_dependency # TODO: super_admin_dependency
# TODO: request model
key = generate_api_key() key = generate_api_key()
service_model = Service(name=service_name, api_key=key) service_model = Service(name=service_request.name, api_key=key)
db.add(service_model) db.add(service_model)
db.flush()
response = ServiceWithKeyResponse(**service_model.__dict__)
db.commit() db.commit()
# TODO: response model return {"service": response}
@router.patch("/{service_id}/key") @router.patch("/{service_id}/key", response_model=ServicePatchKeyResponse)
async def regenerate_api_key(db: db_dependency, service_id: int): async def regenerate_api_key(db: db_dependency, service_id: Annotated[int, Path(gt=0,description="Service database ID")]):
# TODO: super_admin_dependency # TODO: super_admin_dependency
# TODO: request model service_model = db.get(Service, service_id)
key = generate_api_key()
service_model = db.query(Service).filter(Service.id==service_id).first()
service_model.api_key = key
db.add(service_model)
db.commit()
# TODO: response model
@router.delete("/{service_id}")
async def remove_service(db: db_dependency, service_id: int):
# TODO: super_admin_dependency
# TODO: request model
service_model = db.query(Service).filter(Service.id==service_id).first()
if service_model is None: if service_model is None:
return raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Service not found")
key = generate_api_key()
service_model.api_key = key
db.flush()
response = ServiceWithKeyResponse(**service_model.__dict__)
db.commit()
return {"service": response}
@router.delete("/{service_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_service(db: db_dependency, service_id: Annotated[int, Path(gt=0,description="Service database ID")]):
# TODO: super_admin_dependency
service_model = db.get(Service, service_id)
if service_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Service not found")
db.delete(service_model) db.delete(service_model)
db.commit() db.commit()
# TODO: response model

View file

@ -1,7 +1,31 @@
""" """
Pydantic models for <this module> Pydantic models for the service module
Models: Models:
- List: Description - List: Description
- Models: Description - Models: Description
""" """
from pydantic import ConfigDict
from src.schemas import CustomBaseModel
class ServiceResponse(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
id: int
name: str
class ServiceWithKeyResponse(ServiceResponse):
api_key: str
class ServiceGetServiceResponse(CustomBaseModel):
services: list[ServiceResponse]
class ServicePostServiceRequest(CustomBaseModel):
name: str
class ServicePostServiceResponse(CustomBaseModel):
service: ServiceWithKeyResponse
class ServicePatchKeyResponse(CustomBaseModel):
service: ServiceWithKeyResponse

View file

@ -10,7 +10,6 @@ from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from src.database import Base from src.database import Base
from src.iam.models import Group
class User(Base): class User(Base):