Compare commits

...

4 commits

Author SHA1 Message Date
d3d3b2ca63 feat: auth dependencies
These dependencies require `user_model_claims_dependency` which requires the `claims_dependency`. This caused an import loop error and therefore they must be defined in a different file from `claims_dependency`.

Resolves #6
2026-05-27 14:30:11 +01:00
75f5bc79da feat: service dependencies
Issue #6
2026-05-27 14:29:09 +01:00
d0c8c6c297 minor: typo in org exception 2026-05-27 14:29:09 +01:00
748544fb82 feat: user dependencies
In addition to the by-query and by-body db fetch dependencies. Users also have a by-claim dependency.

Issue #6
2026-05-27 14:29:09 +01:00
11 changed files with 160 additions and 112 deletions

View file

@ -9,3 +9,40 @@ Functions:
- List: Description
- Functions: Description
"""
from typing import Annotated, Any
from fastapi import Depends, HTTPException
from src.user.dependencies import user_model_claims_dependency
from src.organisation.dependencies import org_model_query_dependency
async def org_query_user_claims(org_model: org_model_query_dependency, user_model: user_model_claims_dependency):
if user_model in org_model.user_rel:
return True
raise HTTPException(status_code=401, detail="Not authorised")
org_query_user_claims_dependency = Annotated[dict[str, Any], Depends(org_query_user_claims)]
async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency):
if org_model.root_user_id == user_model.id:
return True
raise HTTPException(status_code=401, detail="Not authorised")
org_query_root_claims_dependency = Annotated[dict[str, Any], Depends(org_query_root_claims)]
async def is_super_admin(user_model: user_model_claims_dependency):
super_admin_emails = []
if user_model.email not in super_admin_emails:
raise HTTPException(status_code=401, detail="Not authorised")
return True
super_admin_dependency = Annotated[dict[str, Any], Depends(is_super_admin)]

View file

@ -13,16 +13,11 @@ from joserfc.errors import ExpiredTokenError
from joserfc.jwk import KeySet
from urllib.request import urlopen
from fastapi import Depends, HTTPException, Path
from fastapi import Depends, HTTPException
from fastapi.security import OpenIdConnect
from sqlalchemy.sql import exists
from src.auth.config import auth_settings
from src.user.service import add_user_to_db
from src.organisation.models import OrgUsers, Organisation as Org
from src.user.models import User
from src.database import db_dependency
from src.organisation.dependencies import org_model_query_dependency
oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
@ -65,58 +60,3 @@ async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]:
claims_dependency = Annotated[dict[str, Any], Depends(get_current_user)]
async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)):
org_exists = db.query(exists().where(Org.id == org_id)).scalar()
if not org_exists:
raise HTTPException(status_code=404, detail="Organisation not found")
db_id = claims.get("db_id", None)
if db_id is None:
raise HTTPException(status_code=404, detail="User not found in db")
exists_query = (db.query(OrgUsers)
.filter(OrgUsers.org_id == org_id,
OrgUsers.user_id == db_id
).exists()
)
org_user_exists = db.query(exists_query).scalar()
if not org_user_exists:
raise HTTPException(status_code=401, detail="Not authorised")
return org_user_exists
org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)]
async def is_org_root_query(claims: claims_dependency, db: db_dependency, org_model: org_model_query_dependency):
db_id = claims.get("db_id", None)
if db_id is None:
raise HTTPException(status_code=404, detail="User not found in db")
if org_model.root_user_id == db_id:
return db.query(User).filter(User.id == db_id).first()
raise HTTPException(status_code=401, detail="Not authorised")
root_user_query_dependency = Annotated[dict[str, Any], Depends(is_org_root_query)]
async def is_super_admin(claims: claims_dependency):
super_admin_ids = []
db_id = claims.get("db_id", None)
if db_id is None:
raise HTTPException(status_code=404, detail="User not found in db")
if db_id not in super_admin_ids:
raise HTTPException(status_code=401, detail="Not authorised")
return True
super_admin_dependency = Annotated[dict[str, Any], Depends(is_super_admin)]

View file

@ -12,7 +12,7 @@ from fastapi import HTTPException, status
class OrgNotFoundException(HTTPException):
def __init__(self, org_id: Optional[int] = None) -> None:
detail = "Organisation not found" if org_id is None else f"User with ID '{org_id}' was not found."
detail = "Organisation not found" if org_id is None else f"Organisation with ID '{org_id}' was not found."
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail,

View file

@ -22,7 +22,7 @@ from src.database import db_dependency
from src.contact.models import Contact
from src.user.models import User
from src.user.exceptions import UserNotFoundException
from src.auth.service import root_user_query_dependency, claims_dependency
from src.auth.service import claims_dependency
from src.organisation.dependencies import org_model_query_dependency, org_model_body_dependency
from src.organisation.constants import ContactType

View file

@ -9,3 +9,31 @@ Functions:
- List: Description
- Functions: Description
"""
from typing import Annotated
from fastapi import Depends, Query
from src.database import db_dependency
from src.service.exceptions import ServiceNotFoundException
from src.service.models import Service
from src.service.schemas import ServiceIDMixin
async def get_service_model_query(db: db_dependency, service_id: Annotated[int, Query(gt=0)]):
service_model = db.get(Service, service_id)
if service_model is None:
raise ServiceNotFoundException(service_id=service_id)
return service_model
service_model_query_dependency = Annotated[type[Service], Depends(get_service_model_query)]
async def get_service_model_body(db: db_dependency, request_model: ServiceIDMixin):
service_model = db.get(Service, request_model.service_id)
if service_model is None:
raise ServiceNotFoundException(service_id=request_model.service_id)
return service_model
service_model_body_dependency = Annotated[type[Service], Depends(get_service_model_body)]

View file

@ -5,3 +5,15 @@ Exceptions:
- List: Description
- Exceptions: Description
"""
from typing import Optional
from fastapi import HTTPException, status
class ServiceNotFoundException(HTTPException):
def __init__(self, service_id: Optional[int] = None) -> None:
detail = "Service not found" if service_id is None else f"Service with ID '{service_id}' was not found."
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail,
)

View file

@ -5,17 +5,15 @@ Endpoints:
- List: Description
- Endpoints: Description
"""
from typing import Annotated
from fastapi import APIRouter, HTTPException, status
from fastapi.params import Path
from fastapi import APIRouter, status
from src.database import db_dependency
from src.service.models import Service
from src.service.utils import generate_api_key
from src.service.dependencies import service_model_body_dependency
from src.service.schemas import ServiceGetServiceResponse, ServicePostServiceRequest, ServicePostServiceResponse, \
ServiceWithKeyResponse, ServicePatchKeyResponse
ServiceWithKeyResponse, ServicePatchKeyResponse, ServicePatchKeyRequest, ServiceDeleteServiceRequest
router = APIRouter(
tags=["Service"],
@ -41,27 +39,19 @@ async def register_service(db: db_dependency, service_request: ServicePostServic
db.commit()
return {"service": response}
@router.patch("/{service_id}/key", response_model=ServicePatchKeyResponse)
async def regenerate_api_key(db: db_dependency, service_id: Annotated[int, Path(gt=0,description="Service database ID")]):
@router.patch("/key", response_model=ServicePatchKeyResponse)
async def regenerate_api_key(db: db_dependency, service_model: service_model_body_dependency, request_model: ServicePatchKeyRequest):
# TODO: super_admin_dependency
service_model = db.get(Service, service_id)
if service_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Service not found")
key = generate_api_key()
service_model.api_key = key
db.flush()
response = ServiceWithKeyResponse(**service_model.__dict__)
db.commit()
return {"service": response}
@router.delete("/{service_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_service(db: db_dependency, service_id: Annotated[int, Path(gt=0,description="Service database ID")]):
@router.delete("/", status_code=status.HTTP_204_NO_CONTENT)
async def remove_service(db: db_dependency, service_model: service_model_body_dependency, request_model: ServiceDeleteServiceRequest):
# TODO: super_admin_dependency
service_model = db.get(Service, service_id)
if service_model is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Service not found")
db.delete(service_model)
db.commit()

View file

@ -9,6 +9,9 @@ from pydantic import ConfigDict
from src.schemas import CustomBaseModel
class ServiceIDMixin(CustomBaseModel):
service_id: int
class ServiceResponse(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
@ -27,5 +30,11 @@ class ServicePostServiceRequest(CustomBaseModel):
class ServicePostServiceResponse(CustomBaseModel):
service: ServiceWithKeyResponse
class ServicePatchKeyRequest(ServiceIDMixin):
pass
class ServicePatchKeyResponse(CustomBaseModel):
service: ServiceWithKeyResponse
class ServiceDeleteServiceRequest(ServiceIDMixin):
pass

View file

@ -9,3 +9,46 @@ Functions:
- List: Description
- Functions: Description
"""
from typing import Annotated
from fastapi import Depends, Query
from src.user.exceptions import UserNotFoundException
from src.user.models import User
from src.auth.service import claims_dependency
from src.database import db_dependency
from src.user.schemas import UserIDMixin
async def get_user_model_claims(claims: claims_dependency, db: db_dependency):
user_id = claims.get("db_id", None)
if user_id is None:
raise UserNotFoundException()
user_model = db.get(User, user_id)
if user_model is None:
raise UserNotFoundException(user_id=user_id)
return user_model
user_model_claims_dependency = Annotated[type[User], Depends(get_user_model_claims)]
async def get_user_model_query(db: db_dependency, user_id: Annotated[int, Query(gt=0)]):
user_model = db.get(User, user_id)
if user_model is None:
raise UserNotFoundException(user_id=user_id)
return user_model
user_model_query_dependency = Annotated[type[User], Depends(get_user_model_query)]
async def get_user_model_body(db: db_dependency, request_model: UserIDMixin):
user_model = db.get(User, request_model.user_id)
if user_model is None:
raise UserNotFoundException(user_id=request_model.user_id)
return user_model
user_model_body_dependency = Annotated[type[User], Depends(get_user_model_body)]

View file

@ -11,15 +11,11 @@ Endpoints:
- [get]/{user_id}/orgs/admin - Retrieves only admin organisations for a specific user
- [delete]/{user_id} - Deletes a user from the db by their db ID
"""
from typing import Annotated
from fastapi import APIRouter
from fastapi.params import Path
from starlette import status
from src.user.models import User
from src.user.schemas import UserResponse, OIDCClaims
from src.user.exceptions import UserNotFoundException
from src.user.schemas import UserResponse, OIDCClaims, UserDeleteUserRequest
from src.user.dependencies import user_model_claims_dependency, user_model_query_dependency, user_model_body_dependency
from src.auth.service import claims_dependency
from src.database import db_dependency
@ -45,46 +41,31 @@ async def current_user_claims(user: claims_dependency):
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def current_user(user: claims_dependency, db: db_dependency):
async def current_user(user_model: user_model_claims_dependency):
"""
Returns the database details associated with the currently logged-in user.
"""
user_id = user.get("db_id", None)
if user_id is None:
raise UserNotFoundException()
user_model = (db.query(User).filter(User.id == user_id).first())
if user_model is None:
raise UserNotFoundException(user_id=user_id)
return user_model
@router.get("/{user_id}", response_model=UserResponse, status_code=status.HTTP_200_OK, responses={
@router.get("/", response_model=UserResponse, status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_user_by_id(db: db_dependency, user_id: Annotated[int, Path(gt=0,description="User database ID")]):
async def get_user_by_id(user_model: user_model_query_dependency):
"""
Returns the database details associated with the provided user ID.
"""
user_model = (db.query(User).filter(User.id == user_id).first())
if user_model is None:
raise UserNotFoundException(user_id=user_id)
return user_model
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT, responses={
@router.delete("/", status_code=status.HTTP_204_NO_CONTENT, responses={
status.HTTP_204_NO_CONTENT: {"description": "User deleted"},
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
})
async def delete_user_by_id(user_id: Annotated[int, Path(gt=0)], db: db_dependency):
async def delete_user_by_id(db: db_dependency, user_model: user_model_body_dependency, request_model: UserDeleteUserRequest):
"""
Deletes the user with the provided ID from the database. This will not remove them from OIDC, and they will be automatically readded on next login.
"""
user_model = (db.query(User).filter(User.id == user_id).first())
if user_model is None:
raise UserNotFoundException(user_id=user_id)
db.delete(user_model)
db.commit()

View file

@ -9,6 +9,10 @@ from typing import Optional
from src.schemas import CustomBaseModel
class UserIDMixin(CustomBaseModel):
user_id: int
class OIDCClaims(CustomBaseModel):
exp: int
iat: int
@ -52,3 +56,7 @@ class UserResponse(CustomBaseModel):
class OrgResponse(CustomBaseModel):
org_id: int
name: str
class UserDeleteUserRequest(UserIDMixin):
pass