1
0
Fork 0
forked from sr2/cloud-api

feat: sua expiry handling

This commit is contained in:
Chris Milne 2026-06-10 14:14:22 +01:00
parent 294baadcb7
commit ec572aa4c1
3 changed files with 25 additions and 13 deletions

View file

@ -75,7 +75,7 @@ from src.iam.schemas import (
IAMPutGroupInvitationRequest, IAMPutGroupInvitationRequest,
IAMPutGroupInvitationAcceptRequest, IAMPutGroupInvitationAcceptRequest,
) )
from src.utils import decode_jwt from src.utils import verify_email_token
router = APIRouter( router = APIRouter(
tags=["IAM"], tags=["IAM"],
@ -373,11 +373,9 @@ async def accept_invitation(
user_model: user_model_claims_dependency, user_model: user_model_claims_dependency,
request_model: IAMPutGroupInvitationAcceptRequest, request_model: IAMPutGroupInvitationAcceptRequest,
): ):
email_claims = await decode_jwt(request_model.jwt) email_claims = await verify_email_token(
claimed_email = email_claims["email"] token=request_model.jwt, user_model=user_model
)
if user_model.email != claimed_email:
raise UnauthorizedException("The logged in user and email do not match.")
org_model = db.get(Org, email_claims["org_id"]) org_model = db.get(Org, email_claims["org_id"])
if org_model is None: if org_model is None:

View file

@ -10,7 +10,6 @@ Endpoints:
from fastapi import APIRouter, status, BackgroundTasks from fastapi import APIRouter, status, BackgroundTasks
from src.auth.exceptions import UnauthorizedException
from src.organisation.exceptions import OrgNotFoundException from src.organisation.exceptions import OrgNotFoundException
from src.user.schemas import ( from src.user.schemas import (
UserResponse, UserResponse,
@ -32,7 +31,7 @@ from src.auth.dependencies import (
) )
from src.auth.service import claims_dependency from src.auth.service import claims_dependency
from src.database import db_dependency from src.database import db_dependency
from src.utils import decode_jwt from src.utils import verify_email_token
router = APIRouter( router = APIRouter(
prefix="/user", prefix="/user",
@ -181,11 +180,9 @@ async def accept_invitation(
user_model: user_model_claims_dependency, user_model: user_model_claims_dependency,
request_model: UserPostInvitationAcceptRequest, request_model: UserPostInvitationAcceptRequest,
): ):
email_claims = await decode_jwt(request_model.jwt) email_claims = await verify_email_token(
claimed_email = email_claims["email"] token=request_model.jwt, user_model=user_model
)
if user_model.email != claimed_email:
raise UnauthorizedException("The logged in user and email do not match.")
org_model = db.get(Org, email_claims["org_id"]) org_model = db.get(Org, email_claims["org_id"])
if org_model is None: if org_model is None:

View file

@ -1,3 +1,4 @@
from datetime import datetime, timezone
from joserfc import jwt, jwk, errors from joserfc import jwt, jwk, errors
from src.auth.exceptions import UnauthorizedException from src.auth.exceptions import UnauthorizedException
@ -21,6 +22,22 @@ async def decode_jwt(encoded):
raise UnauthorizedException("Invalid JWS") raise UnauthorizedException("Invalid JWS")
async def verify_email_token(user_model, token):
email_claims = await decode_jwt(token)
claimed_email = email_claims["email"]
expiry = datetime.fromtimestamp(email_claims["exp"], timezone.utc)
if expiry < datetime.now(timezone.utc):
raise UnauthorizedException("Invitation expired.")
if user_model.email != claimed_email:
raise UnauthorizedException("The logged in user and email do not match.")
return email_claims
async def send_email(recipient: str, subject: str, body: str): async def send_email(recipient: str, subject: str, body: str):
print(recipient) print(recipient)
print(subject) print(subject)