1
0
Fork 0
forked from sr2/cloud-api

minor: ruff formatter

All changes are either:
- Correcting tabs
- Adding/removing line breaks
- Adding trailing commas
This commit is contained in:
Chris Milne 2026-06-08 15:31:37 +01:00
parent b2e5dd2ebb
commit c689ac1e10
91 changed files with 1710 additions and 689 deletions

View file

@ -4,12 +4,14 @@ Configurations for the auth module
Exports:
- auth_settings: Contains OIDC information
"""
from src.config import CustomBaseSettings
class AuthConfig(CustomBaseSettings):
OIDC_CONFIG: str = ""
OIDC_ISSUER: str = ""
CLIENT_ID: str = ""
OIDC_CONFIG: str = ""
OIDC_ISSUER: str = ""
CLIENT_ID: str = ""
auth_settings = AuthConfig()

View file

@ -1,3 +1,3 @@
"""
Constants for the auth module
"""
"""

View file

@ -7,18 +7,24 @@ Exports:
- org_model_root_claim_body_dependency: org_model: verifies org exists and user is either root or su, gets org from body
- super_admin_dependency: user_model: verifies the user is a super admin
"""
from typing import Annotated
from fastapi import Depends
from src.user.dependencies import user_model_claims_dependency
from src.user.models import User
from src.organisation.dependencies import org_model_query_dependency, org_model_body_dependency
from src.organisation.dependencies import (
org_model_query_dependency,
org_model_body_dependency,
)
from src.organisation.models import Organisation as Org
from src.auth.exceptions import UnauthorizedException
async def org_query_user_claims(org_model: org_model_query_dependency, user_model: user_model_claims_dependency):
async def org_query_user_claims(
org_model: org_model_query_dependency, user_model: user_model_claims_dependency
):
if user_model in org_model.user_rel:
return True
@ -28,7 +34,11 @@ async def org_query_user_claims(org_model: org_model_query_dependency, user_mode
org_query_user_claims_dependency = Annotated[bool, Depends(org_query_user_claims)]
async def org_query_root_claims(user_model: user_model_claims_dependency, org_model: org_model_query_dependency, su_emails: su_list_dependency):
async def org_query_root_claims(
user_model: user_model_claims_dependency,
org_model: org_model_query_dependency,
su_emails: su_list_dependency,
):
if org_model.root_user_id == user_model.id:
return org_model
@ -41,10 +51,16 @@ async def org_query_root_claims(user_model: user_model_claims_dependency, org_mo
raise UnauthorizedException(message="Must be the org's root user")
org_model_root_claim_query_dependency = Annotated[type[Org], Depends(org_query_root_claims)]
org_model_root_claim_query_dependency = Annotated[
type[Org], Depends(org_query_root_claims)
]
async def org_body_root_claims(user_model: user_model_claims_dependency, org_model: org_model_body_dependency, su_emails: su_list_dependency):
async def org_body_root_claims(
user_model: user_model_claims_dependency,
org_model: org_model_body_dependency,
su_emails: su_list_dependency,
):
if org_model.root_user_id == user_model.id:
return org_model
@ -57,21 +73,29 @@ async def org_body_root_claims(user_model: user_model_claims_dependency, org_mod
raise UnauthorizedException(message="Must be the org's root user")
org_model_root_claim_body_dependency = Annotated[type[Org], Depends(org_body_root_claims)]
org_model_root_claim_body_dependency = Annotated[
type[Org], Depends(org_body_root_claims)
]
def get_super_admin_list():
return []
def empty_su_list():
return []
def testing_su_list():
return ["admin@test.com"]
su_list_dependency = Annotated[list[User], Depends(get_super_admin_list)]
async def user_model_super_admin(user_model: user_model_claims_dependency, super_admin_emails: su_list_dependency):
async def user_model_super_admin(
user_model: user_model_claims_dependency, super_admin_emails: su_list_dependency
):
if user_model.email in super_admin_emails:
return user_model

View file

@ -4,6 +4,7 @@ Module specific exceptions for the auth module
Exceptions:
- UnauthorizedException: Takes an optional message string
"""
from typing import Optional
from fastapi import HTTPException, status

View file

@ -1,3 +1,3 @@
"""
Database models for the auth module
"""
"""

View file

@ -4,8 +4,9 @@ Router endpoints for the auth module
Exports:
- router: fastapi.APIRouter
"""
from fastapi import APIRouter
router = APIRouter(
tags=["auth"],
)
)

View file

@ -1,3 +1,3 @@
"""
Pydantic models for the auth module
"""
"""

View file

@ -4,6 +4,7 @@ Module specific business logic for the auth module
Exports:
- claims_dependency: Dict[str, Any] containing OIDC claims and database ID
"""
import json
import requests
@ -25,11 +26,14 @@ from src.database import db_dependency
oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
oidc_dependency = Annotated[str, Depends(oidc)]
def get_dev_user():
return {"db_id": 1}
async def get_current_user(oidc_auth_string: oidc_dependency, db: db_dependency) -> dict[str, Any]:
async def get_current_user(
oidc_auth_string: oidc_dependency, db: db_dependency
) -> dict[str, Any]:
config_url = urlopen(auth_settings.OIDC_CONFIG)
config = json.loads(config_url.read())
jwks_uri = config["jwks_uri"]
@ -41,10 +45,7 @@ async def get_current_user(oidc_auth_string: oidc_dependency, db: db_dependency)
"iss": {"essential": True, "value": auth_settings.OIDC_ISSUER},
}
token = jwt.decode(
oidc_auth_string.replace("Bearer ", ""),
jwk_keys
)
token = jwt.decode(oidc_auth_string.replace("Bearer ", ""), jwk_keys)
claims_requests = jwt.JWTClaimsRegistry(**claims_options)

View file

@ -1,3 +1,3 @@
"""
Non-business logic reusable functions and classes for the auth module
"""
"""