misp/src/auth/service.py

62 lines
1.6 KiB
Python

"""
Module-specific business logic for the auth module.
Exports:
- claims_dependency
- authed_dependency
"""
import asyncio
import json
from typing import Annotated, Any
from urllib.request import urlopen

import requests
from fastapi import Depends, HTTPException
from fastapi.security import OpenIdConnect
from joserfc import jwt
from joserfc.errors import JoseError
from joserfc.jwk import KeySet

from src.auth.config import auth_settings
# OpenID Connect security scheme, pointed at the discovery URL held in
# auth_settings.OIDC_CONFIG.
oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
# Annotated alias: a ``str`` parameter whose value is resolved through
# ``Depends(oidc)``. Consumed by get_current_user below.
oidc_dependency = Annotated[str, Depends(oidc)]
def _fetch_jwks(timeout: float = 10.0) -> KeySet:
    """Download the identity provider's signing keys (blocking).

    Reads the discovery document at ``auth_settings.OIDC_CONFIG``, follows its
    ``jwks_uri`` entry and imports the JSON found there as a key set.

    Args:
        timeout: Seconds to wait on each of the two HTTP requests.

    Returns:
        The key set imported from the JWKS endpoint.
    """
    # The context manager closes the HTTP response once the body is read.
    with urlopen(auth_settings.OIDC_CONFIG, timeout=timeout) as config_response:
        config = json.loads(config_response.read())
    key_response = requests.get(config["jwks_uri"], timeout=timeout)
    # Fail on an HTTP error status instead of parsing an error body as keys.
    key_response.raise_for_status()
    return KeySet.import_key_set(key_response.json())


async def get_current_user(oidc_auth_string: oidc_dependency) -> dict[str, Any]:
    """Decode the caller's bearer token and return its validated claims.

    The token is decoded against the provider's current key set, then its
    ``exp``, ``aud`` (must be ``"account"``) and ``iss`` (must equal
    ``auth_settings.OIDC_ISSUER``) claims are validated.

    Args:
        oidc_auth_string: Authorization value supplied by the ``oidc``
            dependency, with or without a leading ``Bearer`` scheme.

    Returns:
        The claims of the decoded token.

    Raises:
        HTTPException: 401 when the token cannot be decoded or a required
            claim is missing or invalid. Failures while fetching the key set
            (network, HTTP status, malformed JSON) propagate unchanged.
    """
    # urlopen/requests block, so run them in a worker thread rather than
    # stalling the event loop for every other in-flight request.
    # NOTE(review): the key set is re-fetched on every call; consider caching.
    jwk_keys = await asyncio.to_thread(_fetch_jwks)

    # Strip only a leading auth scheme (case-insensitively); a value without
    # the scheme is used as-is, as it was before.
    scheme, _, credentials = oidc_auth_string.partition(" ")
    raw_token = credentials if scheme.lower() == "bearer" else oidc_auth_string

    claims_options = {
        "exp": {"essential": True},
        "aud": {"essential": True, "value": "account"},
        "iss": {"essential": True, "value": auth_settings.OIDC_ISSUER},
    }
    claims_registry = jwt.JWTClaimsRegistry(**claims_options)

    try:
        token = jwt.decode(raw_token.strip(), jwk_keys)
        claims_registry.validate(token.claims)
    # ValueError is caught as well because some joserfc releases appear to
    # report an unknown key id that way - confirm against the pinned version.
    except (JoseError, ValueError) as err:
        # A bad token is a client error, not an internal server error.
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from err
    return token.claims
# Annotated alias: a ``dict[str, Any]`` parameter resolved through
# ``Depends(get_current_user)``, i.e. the claims of the caller's token.
claims_dependency = Annotated[dict[str, Any], Depends(get_current_user)]
async def is_authed_user(claims: claims_dependency) -> bool:
    """Check that the caller's email is on the authorised-users list.

    Args:
        claims: Token claims supplied by the ``claims_dependency``.

    Returns:
        ``True`` when the ``email`` claim is present, non-empty and listed in
        ``auth_settings.AUTHORISED_USERS``.

    Raises:
        HTTPException: 403 when the ``email`` claim is missing, empty or not
            in the authorised list.
    """
    allowed_emails: list[str] = auth_settings.AUTHORISED_USERS
    email = claims.get("email")
    # Guard clause: the only non-raising path is a listed, non-empty email.
    if email and email in allowed_emails:
        return True
    raise HTTPException(status_code=403, detail="Not authenticated")
# Annotated alias: a ``bool`` parameter resolved through
# ``Depends(is_authed_user)``; resolving it raises HTTP 403 when the caller's
# email is not in auth_settings.AUTHORISED_USERS.
authed_dependency = Annotated[bool, Depends(is_authed_user)]