diff --git a/src/auth/service.py b/src/auth/service.py index 28f1c9c..52a5275 100644 --- a/src/auth/service.py +++ b/src/auth/service.py @@ -6,16 +6,15 @@ Exports: - authed_dependency """ import json +import requests -from typing import Annotated -from authlib.jose import jwt +from typing import Annotated, Any +from joserfc import jwt from urllib.request import urlopen from fastapi import Depends, HTTPException from fastapi.security import OpenIdConnect -from authlib.jose.rfc7517.jwk import JsonWebKey -from authlib.jose.rfc7517.key_set import KeySet -from authlib.oauth2.rfc7523.validator import JWTBearerToken +from joserfc.jwk import KeySet from src.auth.config import auth_settings @@ -24,12 +23,12 @@ oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG) oidc_dependency = Annotated[str, Depends(oidc)] -async def get_current_user(oidc_auth_string: oidc_dependency) -> JWTBearerToken: +async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]: config_url = urlopen(auth_settings.OIDC_CONFIG) config = json.loads(config_url.read()) jwks_uri = config["jwks_uri"] - key_response = urlopen(jwks_uri) - jwk_keys: KeySet = JsonWebKey.import_key_set(json.loads(key_response.read())) + key_response = requests.get(jwks_uri) + jwk_keys = KeySet.import_key_set(key_response.json()) claims_options = { "exp": {"essential": True}, @@ -37,19 +36,19 @@ async def get_current_user(oidc_auth_string: oidc_dependency) -> JWTBearerToken: "iss": {"essential": True, "value": auth_settings.OIDC_ISSUER}, } - claims: JWTBearerToken = jwt.decode( + token = jwt.decode( oidc_auth_string.replace("Bearer ", ""), - jwk_keys, - claims_options=claims_options, - claims_cls=JWTBearerToken, + jwk_keys ) - claims.validate() + claims_requests = jwt.JWTClaimsRegistry(**claims_options) - return claims + claims_requests.validate(token.claims) + + return token.claims -claims_dependency = Annotated[JWTBearerToken, Depends(get_current_user)] +claims_dependency = Annotated[dict[str, Any], Depends(get_current_user)] async def is_authed_user(claims: claims_dependency) -> bool: