63 lines
1.7 KiB
Python
63 lines
1.7 KiB
Python
"""
|
|
Module-specific business logic for the auth module.
|
|
|
|
Exports:
|
|
- claims_dependency
|
|
- authed_dependency
|
|
"""
|
|
import json
|
|
|
|
from typing import Annotated
|
|
from authlib.jose import jwt
|
|
from urllib.request import urlopen
|
|
|
|
from fastapi import Depends, HTTPException
|
|
from fastapi.security import OpenIdConnect
|
|
from authlib.jose.rfc7517.jwk import JsonWebKey
|
|
from authlib.jose.rfc7517.key_set import KeySet
|
|
from authlib.oauth2.rfc7523.validator import JWTBearerToken
|
|
|
|
from src.auth.config import auth_settings
|
|
|
|
|
|
# OpenID Connect security scheme; its discovery URL comes from auth_settings.OIDC_CONFIG.
oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
|
|
# Dependency alias: resolves `oidc` and injects its result as a str
# (presumably the raw "Bearer <token>" Authorization header value -- confirm).
oidc_dependency = Annotated[str, Depends(oidc)]
|
|
|
|
|
|
def _fetch_json(url: str, timeout: float = 10.0) -> dict:
    """Download *url* and return its body parsed as JSON.

    The response is opened as a context manager so the connection is closed
    even if reading or parsing fails.
    """
    with urlopen(url, timeout=timeout) as response:
        return json.loads(response.read())


async def get_current_user(oidc_auth_string: oidc_dependency) -> JWTBearerToken:
    """Validate the caller's bearer token and return its claims.

    The OIDC discovery document is read from ``auth_settings.OIDC_CONFIG``,
    its ``jwks_uri`` is followed to obtain the signing keys, and the token is
    decoded against those keys. The claims must contain ``exp``, an ``aud``
    equal to ``"account"`` and an ``iss`` equal to
    ``auth_settings.OIDC_ISSUER``.

    Args:
        oidc_auth_string: Value supplied by the ``oidc`` dependency; a leading
            ``"Bearer "`` scheme prefix is stripped before decoding.

    Returns:
        The decoded and validated claims.

    Raises:
        HTTPException: 503 when the discovery document or key set cannot be
            fetched or parsed; 401 when the token cannot be decoded or fails
            claim validation.
    """
    # Imported locally so this function does not rely on changes to the
    # module's import section.
    import asyncio

    from authlib.jose.errors import JoseError

    # NOTE(review): the discovery document and JWKS are downloaded on every
    # request; consider caching them with a short TTL.
    try:
        # urlopen is blocking, so run it in a worker thread rather than
        # stalling the event loop for every other in-flight request.
        config = await asyncio.to_thread(_fetch_json, auth_settings.OIDC_CONFIG)
        jwks = await asyncio.to_thread(_fetch_json, config["jwks_uri"])
        jwk_keys: KeySet = JsonWebKey.import_key_set(jwks)
    except (OSError, ValueError, KeyError) as err:
        # URLError/timeouts are OSError; malformed JSON or key sets raise
        # ValueError; a discovery document without jwks_uri raises KeyError.
        raise HTTPException(
            status_code=503,
            detail="Unable to retrieve OIDC signing keys",
        ) from err

    claims_options = {
        "exp": {"essential": True},
        "aud": {"essential": True, "value": "account"},
        "iss": {"essential": True, "value": auth_settings.OIDC_ISSUER},
    }

    # Strip only the leading scheme; str.replace would remove every occurrence.
    token = oidc_auth_string.removeprefix("Bearer ")

    try:
        claims: JWTBearerToken = jwt.decode(
            token,
            jwk_keys,
            claims_options=claims_options,
            claims_cls=JWTBearerToken,
        )
        claims.validate()
    except (JoseError, ValueError) as err:
        # A bad signature, malformed token, unknown key id, or a missing,
        # expired or mismatched claim is a client error, not a server fault.
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from err

    return claims
|
|
|
|
|
|
# Dependency alias: runs get_current_user and injects the JWTBearerToken it returns.
claims_dependency = Annotated[JWTBearerToken, Depends(get_current_user)]
|
|
|
|
|
|
async def is_authed_user(claims: claims_dependency) -> bool:
    """Check that the token's ``email`` claim is on the allow-list.

    Args:
        claims: Validated token claims supplied by ``claims_dependency``.

    Returns:
        ``True`` when the ``email`` claim is present and allow-listed.

    Raises:
        HTTPException: 403 when the claim is missing, empty, or not on the
            allow-list.
    """
    allow_list: list[str] = ["chris@sr2.uk"]
    email = claims.get("email")

    # Guard clause: the only successful outcome is an allow-listed e-mail.
    if email and email in allow_list:
        return True

    raise HTTPException(status_code=403, detail="Not authenticated")
|
|
|
|
|
|
# Dependency alias: runs is_authed_user, which returns a bool or raises HTTPException (403).
authed_dependency = Annotated[bool, Depends(is_authed_user)]
|