2026-04-06 12:41:49 +01:00
|
|
|
"""
|
|
|
|
|
Module-specific business logic for the auth module.
|
|
|
|
|
|
|
|
|
|
Exports:
|
|
|
|
|
- claims_dependency
|
|
|
|
|
"""
|
|
|
|
|
import json
|
2026-05-19 09:49:27 +01:00
|
|
|
import requests
|
2026-04-06 12:41:49 +01:00
|
|
|
|
2026-05-19 09:49:27 +01:00
|
|
|
from typing import Annotated, Any
|
|
|
|
|
from joserfc import jwt
|
2026-05-20 10:50:49 +01:00
|
|
|
from joserfc.errors import ExpiredTokenError
|
2026-05-19 09:49:27 +01:00
|
|
|
from joserfc.jwk import KeySet
|
2026-04-06 12:41:49 +01:00
|
|
|
from urllib.request import urlopen
|
|
|
|
|
|
2026-05-27 14:58:10 +01:00
|
|
|
from fastapi import Depends
|
2026-04-06 12:41:49 +01:00
|
|
|
from fastapi.security import OpenIdConnect
|
|
|
|
|
|
2026-05-27 14:58:10 +01:00
|
|
|
from src.auth.exceptions import UnauthorizedException
|
2026-04-06 12:41:49 +01:00
|
|
|
from src.auth.config import auth_settings
|
|
|
|
|
from src.user.service import add_user_to_db
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# OpenID Connect security scheme, configured with the discovery document URL
# taken from auth_settings.OIDC_CONFIG.
oidc = OpenIdConnect(
    openIdConnectUrl=auth_settings.OIDC_CONFIG,
)

# Annotated alias: a ``str`` parameter whose value is produced by running the
# ``oidc`` scheme as a FastAPI dependency.
oidc_dependency = Annotated[
    str,
    Depends(oidc),
]
|
|
|
|
|
|
2026-05-26 11:42:49 +01:00
|
|
|
def get_dev_user():
    """Return a fixed user record whose ``db_id`` is 1.

    A new dict is built on every call, so callers may mutate the result
    without affecting later calls.

    NOTE(review): presumably a stand-in for ``get_current_user`` during local
    development -- confirm against the call sites.
    """
    dev_user = dict(db_id=1)
    return dev_user
|
|
|
|
|
|
2026-04-06 12:41:49 +01:00
|
|
|
|
2026-05-19 09:49:27 +01:00
|
|
|
def _fetch_jwk_set(timeout: float = 10.0) -> KeySet:
    """Download the identity provider's signing keys.

    Reads the OIDC discovery document at ``auth_settings.OIDC_CONFIG``,
    follows its ``jwks_uri`` entry and imports the JSON served there as a
    key set.

    Args:
        timeout: Seconds to wait on each of the two HTTP requests, so an
            unresponsive identity provider cannot hang the caller forever.

    Returns:
        The key set built from the JWKS endpoint's JSON response.

    Raises:
        urllib.error.URLError: The discovery document could not be fetched.
        KeyError: The discovery document has no ``jwks_uri`` entry.
        requests.RequestException: The JWKS request failed, timed out or
            returned an HTTP error status.
    """
    # The context manager closes the HTTP response once it has been read.
    with urlopen(auth_settings.OIDC_CONFIG, timeout=timeout) as config_response:
        config = json.loads(config_response.read())

    key_response = requests.get(config["jwks_uri"], timeout=timeout)
    # Fail with a clear HTTP error instead of trying to parse an error page.
    key_response.raise_for_status()
    return KeySet.import_key_set(key_response.json())


async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]:
    """Validate the bearer token and return its claims plus the user's ``db_id``.

    The token is decoded with the identity provider's key set and its claims
    are checked: ``exp`` must be present, ``aud`` must be ``"account"`` and
    ``iss`` must equal ``auth_settings.OIDC_ISSUER``. The claims are then
    passed to ``add_user_to_db`` and the value it returns is stored in the
    claims under ``"db_id"``.

    Args:
        oidc_auth_string: Value supplied by the ``oidc`` dependency; a leading
            ``"Bearer "`` scheme prefix is stripped before decoding.

    Returns:
        The token claims with an added ``"db_id"`` entry.

    Raises:
        UnauthorizedException: The token is expired, or decoding/claim
            validation failed with any other joserfc error.
    """
    # Local imports keep this block self-contained; both packages are already
    # dependencies of this module.
    import asyncio

    from joserfc.errors import JoseError

    # urlopen/requests are blocking; run them in a worker thread so the event
    # loop keeps serving other requests while the keys are downloaded.
    jwk_keys = await asyncio.to_thread(_fetch_jwk_set)

    claims_options = {
        "exp": {"essential": True},
        "aud": {"essential": True, "value": "account"},
        "iss": {"essential": True, "value": auth_settings.OIDC_ISSUER},
    }
    claims_requests = jwt.JWTClaimsRegistry(**claims_options)

    try:
        # removeprefix strips only the leading scheme, never text inside the token.
        token = jwt.decode(oidc_auth_string.removeprefix("Bearer "), jwk_keys)
        claims_requests.validate(token.claims)
    except ExpiredTokenError as err:
        raise UnauthorizedException(message="Token is expired") from err
    except JoseError as err:
        # Bad signature, malformed token, or missing/invalid claim: reject the
        # request as unauthorized instead of surfacing an unhandled error.
        raise UnauthorizedException(message="Token is invalid") from err

    db_id = await add_user_to_db(token.claims)
    token.claims["db_id"] = db_id
    return token.claims
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Annotated alias for route parameters: a ``dict[str, Any]`` whose value is
# produced by running ``get_current_user`` as a FastAPI dependency.
claims_dependency = Annotated[
    dict[str, Any],
    Depends(get_current_user),
]
|