From 34cb4414c92d6e670bfc37c752e27bfdb380497d Mon Sep 17 00:00:00 2001 From: luxferre Date: Tue, 19 May 2026 09:49:27 +0100 Subject: [PATCH] feat: auth library upgrade The parts of Authlib used are now deprecated in favour of JoseRFC. --- requirements.txt | 1 + src/auth/service.py | 40 ++++++++++++++++++++-------------------- src/user/service.py | 5 +++-- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/requirements.txt b/requirements.txt index 007020b..b48c39d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ httptools psycopg email-validator alembic +joserfc diff --git a/src/auth/service.py b/src/auth/service.py index 80d49c5..aee2aae 100644 --- a/src/auth/service.py +++ b/src/auth/service.py @@ -5,16 +5,15 @@ Exports: - claims_dependency """ import json +import requests -from typing import Annotated -from authlib.jose import jwt +from typing import Annotated, Any +from joserfc import jwt +from joserfc.jwk import KeySet from urllib.request import urlopen from fastapi import Depends, HTTPException, Path from fastapi.security import OpenIdConnect -from authlib.jose.rfc7517.jwk import JsonWebKey -from authlib.jose.rfc7517.key_set import KeySet -from authlib.oauth2.rfc7523.validator import JWTBearerToken from sqlalchemy.sql import exists from src.auth.config import auth_settings @@ -27,12 +26,12 @@ oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG) oidc_dependency = Annotated[str, Depends(oidc)] -async def get_current_user(oidc_auth_string: oidc_dependency) -> JWTBearerToken: +async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]: config_url = urlopen(auth_settings.OIDC_CONFIG) config = json.loads(config_url.read()) jwks_uri = config["jwks_uri"] - key_response = urlopen(jwks_uri) - jwk_keys: KeySet = JsonWebKey.import_key_set(json.loads(key_response.read())) + key_response = requests.get(jwks_uri) + jwk_keys = KeySet.import_key_set(key_response.json()) claims_options = { "exp": {"essential": True}, @@ -40,22 +39,23 @@ async def get_current_user(oidc_auth_string: oidc_dependency) -> JWTBearerToken: "iss": {"essential": True, "value": auth_settings.OIDC_ISSUER}, } - claims: JWTBearerToken = jwt.decode( + token = jwt.decode( oidc_auth_string.replace("Bearer ", ""), - jwk_keys, - claims_options=claims_options, - claims_cls=JWTBearerToken, + jwk_keys ) - claims.validate() - db_id = await add_user_to_db(claims) + claims_requests = jwt.JWTClaimsRegistry(**claims_options) - claims["db_id"] = db_id + claims_requests.validate(token.claims) - return claims + db_id = await add_user_to_db(token.claims) + + token.claims["db_id"] = db_id + + return token.claims -claims_dependency = Annotated[JWTBearerToken, Depends(get_current_user)] +claims_dependency = Annotated[dict[str, Any], Depends(get_current_user)] async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)): @@ -81,7 +81,7 @@ async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int return org_user_exists -org_user_dependency = Annotated[JWTBearerToken, Depends(is_org_user)] +org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)] async def is_org_admin(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)): @@ -108,7 +108,7 @@ async def is_org_admin(claims: claims_dependency, db: db_dependency, org_id: int return org_admin_exists -org_admin_dependency = Annotated[JWTBearerToken, Depends(is_org_admin)] +org_admin_dependency = Annotated[dict[str, Any], Depends(is_org_admin)] async def is_super_admin(claims: claims_dependency): @@ -123,7 +123,7 @@ async def is_super_admin(claims: claims_dependency): return True -super_admin_dependency = Annotated[JWTBearerToken, Depends(is_super_admin)] +super_admin_dependency = Annotated[dict[str, Any], Depends(is_super_admin)] diff --git a/src/user/service.py b/src/user/service.py index 8549c4c..b97d153 100644 --- a/src/user/service.py +++ b/src/user/service.py @@ -7,7 +7,8 @@ Functions: Exports: - add_user_to_db """ -from authlib.jose import JWTClaims +from typing import Any + from fastapi import HTTPException from src.user.schemas import OIDCUser @@ -15,7 +16,7 @@ from src.user.models import User from src.database import get_db -async def add_user_to_db(user_claims: JWTClaims) -> int: +async def add_user_to_db(user_claims: dict[str, Any]) -> int: try: valid_user = OIDCUser(first_name=user_claims["given_name"], last_name=user_claims["family_name"], email=user_claims["email"], oidc_id=user_claims["sub"]) except Exception as e: