feat: auth library upgrade

The parts of Authlib used are now deprecated in favour of JoseRFC.
This commit is contained in:
Chris Milne 2026-05-19 09:49:27 +01:00
parent 376a7a9fe5
commit 34cb4414c9
3 changed files with 24 additions and 22 deletions

View file

@ -18,3 +18,4 @@ httptools
psycopg
email-validator
alembic
joserfc

View file

@ -5,16 +5,15 @@ Exports:
- claims_dependency
"""
import json
import requests
from typing import Annotated
from authlib.jose import jwt
from typing import Annotated, Any
from joserfc import jwt
from joserfc.jwk import KeySet
from urllib.request import urlopen
from fastapi import Depends, HTTPException, Path
from fastapi.security import OpenIdConnect
from authlib.jose.rfc7517.jwk import JsonWebKey
from authlib.jose.rfc7517.key_set import KeySet
from authlib.oauth2.rfc7523.validator import JWTBearerToken
from sqlalchemy.sql import exists
from src.auth.config import auth_settings
@ -27,12 +26,12 @@ oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
oidc_dependency = Annotated[str, Depends(oidc)]
async def get_current_user(oidc_auth_string: oidc_dependency) -> JWTBearerToken:
async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]:
config_url = urlopen(auth_settings.OIDC_CONFIG)
config = json.loads(config_url.read())
jwks_uri = config["jwks_uri"]
key_response = urlopen(jwks_uri)
jwk_keys: KeySet = JsonWebKey.import_key_set(json.loads(key_response.read()))
key_response = requests.get(jwks_uri)
jwk_keys = KeySet.import_key_set(key_response.json())
claims_options = {
"exp": {"essential": True},
@ -40,22 +39,23 @@ async def get_current_user(oidc_auth_string: oidc_dependency) -> JWTBearerToken:
"iss": {"essential": True, "value": auth_settings.OIDC_ISSUER},
}
claims: JWTBearerToken = jwt.decode(
token = jwt.decode(
oidc_auth_string.replace("Bearer ", ""),
jwk_keys,
claims_options=claims_options,
claims_cls=JWTBearerToken,
jwk_keys
)
claims.validate()
db_id = await add_user_to_db(claims)
claims_requests = jwt.JWTClaimsRegistry(**claims_options)
claims["db_id"] = db_id
claims_requests.validate(token.claims)
return claims
db_id = await add_user_to_db(token.claims)
token.claims["db_id"] = db_id
return token.claims
claims_dependency = Annotated[JWTBearerToken, Depends(get_current_user)]
claims_dependency = Annotated[dict[str, Any], Depends(get_current_user)]
async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)):
@ -81,7 +81,7 @@ async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int
return org_user_exists
org_user_dependency = Annotated[JWTBearerToken, Depends(is_org_user)]
org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)]
async def is_org_admin(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)):
@ -108,7 +108,7 @@ async def is_org_admin(claims: claims_dependency, db: db_dependency, org_id: int
return org_admin_exists
org_admin_dependency = Annotated[JWTBearerToken, Depends(is_org_admin)]
org_admin_dependency = Annotated[dict[str, Any], Depends(is_org_admin)]
async def is_super_admin(claims: claims_dependency):
@ -123,7 +123,7 @@ async def is_super_admin(claims: claims_dependency):
return True
super_admin_dependency = Annotated[JWTBearerToken, Depends(is_super_admin)]
super_admin_dependency = Annotated[dict[str, Any], Depends(is_super_admin)]

View file

@ -7,7 +7,8 @@ Functions:
Exports:
- add_user_to_db
"""
from authlib.jose import JWTClaims
from typing import Any
from fastapi import HTTPException
from src.user.schemas import OIDCUser
@ -15,7 +16,7 @@ from src.user.models import User
from src.database import get_db
async def add_user_to_db(user_claims: JWTClaims) -> int:
async def add_user_to_db(user_claims: dict[str, Any]) -> int:
try:
valid_user = OIDCUser(first_name=user_claims["given_name"], last_name=user_claims["family_name"], email=user_claims["email"], oidc_id=user_claims["sub"])
except Exception as e: