""" Module specific business logic for auth module Exports: - claims_dependency - authed_dependency """ import json import requests from typing import Annotated, Any from joserfc import jwt from urllib.request import urlopen from fastapi import Depends, HTTPException from fastapi.security import OpenIdConnect from joserfc.jwk import KeySet from src.auth.config import auth_settings oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG) oidc_dependency = Annotated[str, Depends(oidc)] async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]: config_url = urlopen(auth_settings.OIDC_CONFIG) config = json.loads(config_url.read()) jwks_uri = config["jwks_uri"] key_response = requests.get(jwks_uri) jwk_keys = KeySet.import_key_set(key_response.json()) claims_options = { "exp": {"essential": True}, "aud": {"essential": True, "value": "account"}, "iss": {"essential": True, "value": auth_settings.OIDC_ISSUER}, } token = jwt.decode( oidc_auth_string.replace("Bearer ", ""), jwk_keys ) claims_requests = jwt.JWTClaimsRegistry(**claims_options) claims_requests.validate(token.claims) return token.claims claims_dependency = Annotated[dict[str, Any], Depends(get_current_user)] async def is_authed_user(claims: claims_dependency) -> bool: authed_users: list[str] = ["chris@sr2.uk"] user_email = claims.get("email", None) if not user_email or user_email not in authed_users: raise HTTPException(status_code=403, detail="Not authenticated") return claims.get("email") in authed_users authed_dependency = Annotated[bool, Depends(is_authed_user)]