2026-06-10 14:14:22 +01:00
|
|
|
from datetime import datetime, timezone
|
2026-06-09 12:22:36 +01:00
|
|
|
from joserfc import jwt, jwk, errors
|
|
|
|
|
|
2026-06-09 14:45:40 +01:00
|
|
|
from src.auth.exceptions import UnauthorizedException
|
2026-06-09 12:22:36 +01:00
|
|
|
from src.config import settings
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
KEY = jwk.import_key(settings.SECRET_KEY.get_secret_value(), "oct")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def generate_jwt(claims):
|
|
|
|
|
jwt_token = jwt.encode(header={"alg": "HS256"}, key=KEY, claims=claims)
|
|
|
|
|
|
|
|
|
|
return jwt_token
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def decode_jwt(encoded):
|
|
|
|
|
try:
|
|
|
|
|
token = jwt.decode(encoded, key=KEY)
|
|
|
|
|
return token.claims
|
|
|
|
|
except errors.DecodeError:
|
|
|
|
|
raise UnauthorizedException("Invalid JWS")
|
|
|
|
|
|
|
|
|
|
|
2026-06-10 14:14:22 +01:00
|
|
|
async def verify_email_token(user_model, token):
|
|
|
|
|
email_claims = await decode_jwt(token)
|
|
|
|
|
|
|
|
|
|
claimed_email = email_claims["email"]
|
|
|
|
|
|
|
|
|
|
expiry = datetime.fromtimestamp(email_claims["exp"], timezone.utc)
|
|
|
|
|
|
|
|
|
|
if expiry < datetime.now(timezone.utc):
|
|
|
|
|
raise UnauthorizedException("Invitation expired.")
|
|
|
|
|
|
|
|
|
|
if user_model.email != claimed_email:
|
|
|
|
|
raise UnauthorizedException("The logged in user and email do not match.")
|
|
|
|
|
|
|
|
|
|
return email_claims
|
|
|
|
|
|
|
|
|
|
|
2026-06-09 12:22:36 +01:00
|
|
|
async def send_email(recipient: str, subject: str, body: str):
|
|
|
|
|
print(recipient)
|
|
|
|
|
print(subject)
|
|
|
|
|
print(body)
|