Compare commits

..

No commits in common. "b3689c8af6855e51eed6e805c7531c20dad86bc5" and "83a24a91f4099a7f7879065bd501cbabf5e5913f" have entirely different histories.

37 changed files with 559 additions and 859 deletions

View file

@ -10,8 +10,6 @@ from src.config import SQLALCHEMY_DATABASE_URI
from src.contact.models import Contact from src.contact.models import Contact
from src.organisation.models import Organisation, OrgUsers from src.organisation.models import Organisation, OrgUsers
from src.user.models import User from src.user.models import User
from src.service.models import Service
from src.iam.models import Permission, Group, GroupPermissions, UserGroups
from src.database import Base from src.database import Base
# this is the Alembic Config object, which provides # this is the Alembic Config object, which provides

View file

@ -1,83 +0,0 @@
"""Init IAM
Revision ID: a147965e644e
Revises: 8fe51426321d
Create Date: 2026-05-22 15:59:36.469374
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'a147965e644e'
down_revision: Union[str, Sequence[str], None] = '8fe51426321d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('service',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(), nullable=True),
sa.Column('api_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('api_key'),
sa.UniqueConstraint('name')
)
op.create_table('permission',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('resource', sa.String(), nullable=False),
sa.Column('action', sa.String(), nullable=False),
sa.Column('service_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['service_id'], ['service.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_table('group',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('org_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['org_id'], ['organisation.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name')
)
op.create_table('group_permissions',
sa.Column('group_id', sa.Integer(), nullable=False),
sa.Column('permission_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['group_id'], ['group.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['permission_id'], ['permission.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('group_id', 'permission_id')
)
op.create_table('user_groups',
sa.Column('org_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('group_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['group_id'], ['group.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['org_id'], ['organisation.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('org_id', 'user_id', 'group_id')
)
op.add_column('organisation', sa.Column('root_user_id', sa.Integer(), nullable=True))
op.create_unique_constraint("organisation_name_key", 'organisation', ['name'])
op.create_foreign_key("organisation_root_user_fkey", 'organisation', 'user', ['root_user_id'], ['id'])
op.drop_column('orgusers', 'is_admin')
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('orgusers', sa.Column('is_admin', sa.BOOLEAN(), server_default=sa.text('false'), autoincrement=False, nullable=False))
op.drop_constraint("organisation_root_user_fkey", 'organisation', type_='foreignkey')
op.drop_constraint("organisation_name_key", 'organisation', type_='unique')
op.drop_column('organisation', 'root_user_id')
op.drop_table('user_groups')
op.drop_table('group_permissions')
op.drop_table('group')
op.drop_table('permission')
op.drop_table('service')
# ### end Alembic commands ###

View file

@ -1,34 +0,0 @@
"""Contact model changes
Revision ID: 8132c4b88665
Revises: a147965e644e
Create Date: 2026-05-25 13:09:22.635058
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '8132c4b88665'
down_revision: Union[str, Sequence[str], None] = 'a147965e644e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('contact', sa.Column('org_id', sa.Integer(), nullable=False))
op.create_foreign_key(None, 'contact', 'organisation', ['org_id'], ['id'], ondelete='CASCADE')
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, 'contact', type_='foreignkey')
op.drop_column('contact', 'org_id')
# ### end Alembic commands ###

View file

@ -5,9 +5,43 @@ Endpoints:
- List: Description - List: Description
- Endpoints: Description - Endpoints: Description
""" """
from fastapi import APIRouter from typing import Annotated
from fastapi import APIRouter, HTTPException
from fastapi.params import Path
from src.organisation.constants import ContactType
from src.organisation.schemas import OrgContactGetResponse
from src.organisation.models import Organisation as Org
from src.contact.models import Contact
from src.auth.service import claims_dependency, org_or_super_admin_dependency
from src.database import db_dependency
router = APIRouter( router = APIRouter(
tags=["admin"], tags=["admin"],
prefix="/admin", prefix="/admin",
) )
@router.get("/{org_id}/contact/{contact_type}", response_model=OrgContactGetResponse)
async def get_contact(db: db_dependency, user: claims_dependency, is_admin: org_or_super_admin_dependency, contact_type: ContactType, org_id: Annotated[int, Path(gt=0)]):
org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
match contact_type:
case "billing":
contact_id = org_model.billing_contact_id
case "security":
contact_id = org_model.security_contact_id
case "owner":
contact_id = org_model.owner_contact_id
case _:
raise HTTPException(status_code=422, detail="Invalid contact type")
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model

View file

@ -8,8 +8,6 @@ from src.contact.router import router as contact_router
from src.organisation.router import router as organisation_router from src.organisation.router import router as organisation_router
from src.user.router import router as user_router from src.user.router import router as user_router
from src.admin.router import router as admin_router from src.admin.router import router as admin_router
from src.iam.router import router as iam_router
from src.service.router import router as service_router
api_router = APIRouter() api_router = APIRouter()
@ -19,8 +17,6 @@ api_router.include_router(contact_router)
api_router.include_router(organisation_router) api_router.include_router(organisation_router)
api_router.include_router(user_router) api_router.include_router(user_router)
api_router.include_router(admin_router) api_router.include_router(admin_router)
api_router.include_router(service_router)
api_router.include_router(iam_router)
@api_router.get("/healthcheck", include_in_schema=False) @api_router.get("/healthcheck", include_in_schema=False)

View file

@ -9,3 +9,53 @@ from fastapi import APIRouter
router = APIRouter( router = APIRouter(
tags=["auth"], tags=["auth"],
) )
# oauth = OAuth()
# oauth.register(
# name="oidc",
# server_metadata_url=auth_settings.OIDC_CONFIG,
# client_id=auth_settings.CLIENT_ID,
# client_secret=None,
# code_challenge_method="S256",
# client_kwargs={
# "code_challenge_method": "S256",
# "scope": "openid profile email",
# }
# )
# @auth_router.get('/login')
# async def login(request: Request):
# redirect_uri = request.url_for('auth')
# return await oauth.oidc.authorize_redirect(request, redirect_uri, code_challenge_method="S256")
#
#
# @auth_router.get('/auth', include_in_schema=False)
# async def auth(db: db_dependency, request: Request):
# token = await oauth.oidc.authorize_access_token(request)
# user = token.get("userinfo")
# request.session["user"] = user
#
# try:
# valid_user = OIDCUser(first_name=user["given_name"], last_name=user["family_name"], email=user["email"], oidc_id=user["sub"])
# except Exception as e:
# print(e)
# raise HTTPException(status_code=422, detail="Invalid or missing OIDC data")
#
# user_exists = db.query(exists().where(User.oidc_id == valid_user.oidc_id)).scalar()
#
# if not user_exists:
# user_model = User(**valid_user.model_dump())
# db.add(user_model)
# db.commit()
#
# return RedirectResponse(url="/")
#
#
# @auth_router.get('/logout')
# async def logout(request: Request):
# request.session.pop('user', None)
# return RedirectResponse(url='/')

View file

@ -20,9 +20,7 @@ from sqlalchemy.sql import exists
from src.auth.config import auth_settings from src.auth.config import auth_settings
from src.user.service import add_user_to_db from src.user.service import add_user_to_db
from src.organisation.models import OrgUsers, Organisation as Org from src.organisation.models import OrgUsers, Organisation as Org
from src.user.models import User
from src.database import db_dependency from src.database import db_dependency
from src.organisation.dependencies import org_model_dependency
oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG) oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
@ -90,18 +88,31 @@ async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int
org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)] org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)]
async def is_org_root(claims: claims_dependency, db: db_dependency, org_model: org_model_dependency, org_id: int = Path(gt=0)): async def is_org_admin(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)):
org_exists = db.query(exists().where(Org.id == org_id)).scalar()
if not org_exists:
raise HTTPException(status_code=404, detail="Organisation not found")
db_id = claims.get("db_id", None) db_id = claims.get("db_id", None)
if db_id is None: if db_id is None:
raise HTTPException(status_code=404, detail="User not found in db") raise HTTPException(status_code=404, detail="User not found in db")
if org_model.root_user_id == db_id: exists_query = (db.query(OrgUsers)
return db.query(User).filter(User.id == db_id).first() .filter(OrgUsers.org_id == org_id,
OrgUsers.user_id == db_id,
OrgUsers.is_admin == True
).exists()
)
raise HTTPException(status_code=401, detail="Not authorised") org_admin_exists = db.query(exists_query).scalar()
if not org_admin_exists:
raise HTTPException(status_code=401, detail="Not authorised")
return org_admin_exists
root_user_dependency = Annotated[dict[str, Any], Depends(is_org_root)] org_admin_dependency = Annotated[dict[str, Any], Depends(is_org_admin)]
async def is_super_admin(claims: claims_dependency): async def is_super_admin(claims: claims_dependency):
@ -117,3 +128,114 @@ async def is_super_admin(claims: claims_dependency):
super_admin_dependency = Annotated[dict[str, Any], Depends(is_super_admin)] super_admin_dependency = Annotated[dict[str, Any], Depends(is_super_admin)]
async def is_admin(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)):
try:
await is_super_admin(claims)
return True
except HTTPException as e:
pass
try:
await is_org_admin(claims, db, org_id)
return True
except HTTPException as e:
raise HTTPException(status_code=401, detail="Not authorised")
org_or_super_admin_dependency = Annotated[dict[str, Any], Depends(is_admin)]
# Middleware version of user auth
# import json
# import logging
#
# from threading import Timer
# from urllib.request import urlopen
# from starlette.requests import HTTPConnection, Request
#
# from authlib.jose.rfc7517.jwk import JsonWebKey
# from authlib.jose.rfc7517.key_set import KeySet
# from authlib.oauth2 import OAuth2Error, ResourceProtector
# from authlib.oauth2.rfc6749 import MissingAuthorizationError
# from authlib.oauth2.rfc7523 import JWTBearerTokenValidator
# from authlib.oauth2.rfc7523.validator import JWTBearerToken
#
# from starlette.authentication import (
# AuthCredentials,
# AuthenticationBackend,
# AuthenticationError,
# SimpleUser,
# )
#
# logger = logging.getLogger(__name__)
#
#
# class RepeatTimer(Timer):
# def __init__(self, *args, **kwargs) -> None:
# super().__init__(*args, **kwargs)
# self.daemon = True
#
# def run(self):
# while not self.finished.wait(self.interval):
# self.function(*self.args, **self.kwargs)
#
#
# class BearerTokenValidator(JWTBearerTokenValidator):
# def __init__(self, issuer: str, audience: str):
# self._issuer = issuer
# self._jwks_uri: str | None = None
# super().__init__(public_key=self.fetch_key(), issuer=issuer)
# self.claims_options = {
# "exp": {"essential": True},
# "aud": {"essential": True, "value": audience},
# "iss": {"essential": True, "value": issuer},
# }
# self._timer = RepeatTimer(3600, self.refresh)
# self._timer.start()
#
# def refresh(self):
# try:
# self.public_key = self.fetch_key()
# except Exception as exc:
# logger.warning(f"Could not update jwks public key: {exc}")
#
# def fetch_key(self) -> KeySet:
# """Fetch the jwks_uri document and return the KeySet."""
# response = urlopen(self.jwks_uri)
# logger.debug(f"OK GET {self.jwks_uri}")
# return JsonWebKey.import_key_set(json.loads(response.read()))
#
# @property
# def jwks_uri(self) -> str:
# """The jwks_uri field of the openid-configuration document."""
# if self._jwks_uri is None:
# config_url = urlopen(f"{self._issuer}/.well-known/openid-configuration")
# config = json.loads(config_url.read())
# self._jwks_uri = config["jwks_uri"]
# return self._jwks_uri
#
#
# class BearerTokenAuthBackend(AuthenticationBackend):
# def __init__(self, issuer: str, audience: str) -> None:
# rp = ResourceProtector()
# validator = BearerTokenValidator(
# issuer=issuer,
# audience=audience,
# )
# rp.register_token_validator(validator)
# self.resource_protector = rp
#
# async def authenticate(self, conn: HTTPConnection):
# if "Authorization" not in conn.headers:
# return
# request = Request(conn.scope)
# try:
# token: JWTBearerToken = self.resource_protector.validate_request(
# scopes=["openid"],
# request=request,
# )
# except (MissingAuthorizationError, OAuth2Error) as error:
# raise AuthenticationError(error.description) from error
# scope: str = token.get_scope()
# scopes = scope.split()
# scopes.append("authenticated")
# return AuthCredentials(scopes=scopes), SimpleUser(username=token["email"])

View file

@ -5,7 +5,7 @@ Models:
- Contact: id[pk], email, first_name, last_name, phonenumber, vat_number - Contact: id[pk], email, first_name, last_name, phonenumber, vat_number
street_address, post_office_box_number, address_locality, country_code, address_region, postal_code street_address, post_office_box_number, address_locality, country_code, address_region, postal_code
""" """
from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy import Column, Integer, String
from src.database import Base from src.database import Base
@ -27,5 +27,3 @@ class Contact(Base):
country_code = Column(String) # Eg GB country_code = Column(String) # Eg GB
address_region = Column(String, default=None, nullable=True) address_region = Column(String, default=None, nullable=True)
postal_code = Column(String) postal_code = Column(String)
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), nullable=False)

View file

@ -9,10 +9,110 @@ Endpoints:
- [patch]/{contact_id} - Updates the details of an existing contact - [patch]/{contact_id} - Updates the details of an existing contact
- [delete]/{contact_id} - Deletes a contact by ID - [delete]/{contact_id} - Deletes a contact by ID
""" """
from fastapi import APIRouter from typing import Annotated
from fastapi import APIRouter, HTTPException
from fastapi.params import Path
from sqlalchemy import or_
from src.contact.schemas import ContactContactGetResponse, ContactAddressGetResponse, ContactContactPostRequest, \
ContactUpdateRequest, ContactOrgGetResponse
from src.contact.models import Contact
from src.database import db_dependency
from src.organisation.models import Organisation as Org
from src.organisation.constants import ContactType
router = APIRouter( router = APIRouter(
prefix="/contact", prefix="/contact",
tags=["contact"], tags=["contact"],
) )
@router.get("/{contact_id}", response_model=ContactContactGetResponse)
async def get_contact_details_by_id(contact_id: int, db: db_dependency):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model
@router.get("/{contact_id}/address", response_model=ContactAddressGetResponse)
async def get_contact_address_by_id(contact_id: int, db: db_dependency):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model
@router.post("/")
async def create_contact(db: db_dependency, contact_request: ContactContactPostRequest):
contact_model = Contact(**contact_request.model_dump())
db.add(contact_model)
db.commit()
@router.patch("/{contact_id}")
async def update_contact(db: db_dependency, contact_request: ContactUpdateRequest, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
update_data = contact_request.model_dump(exclude_none=True)
for key, value in update_data.items():
if hasattr(contact_model, key):
setattr(contact_model, key, value)
else:
raise HTTPException(status_code=422, detail="Invalid keys in update request")
db.add(contact_model)
db.commit()
@router.delete("/{contact_id}")
async def delete_contact(db: db_dependency, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
db.delete(contact_model)
db.commit()
@router.get("/{contact_id}/orgs", response_model=list[ContactOrgGetResponse])
async def get_contact_orgs(db: db_dependency, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
org_models = (db.query(Org).filter(
or_(
Org.owner_contact_id == contact_id,
Org.billing_contact_id == contact_id,
Org.security_contact_id == contact_id
)
).all())
response = []
for org in org_models:
types=[]
if org.owner_contact_id == contact_id:
types.append(ContactType.OWNER)
if org.billing_contact_id == contact_id:
types.append(ContactType.BILLING)
if org.security_contact_id == contact_id:
types.append(ContactType.SECURITY)
org_response_model = ContactOrgGetResponse(
name=str(org.name),
contact_types=types,
)
response.append(org_response_model)
return response

View file

@ -7,24 +7,12 @@ Models:
""" """
from typing import Optional from typing import Optional
from pydantic import EmailStr, ConfigDict from pydantic import EmailStr
from src.organisation.constants import ContactType from src.organisation.constants import ContactType
from src.schemas import CustomBaseModel from src.schemas import CustomBaseModel
class ContactAddress(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
post_office_box_number: Optional[str] = None
street_address: Optional[str] = None
street_address_line_2: Optional[str] = None
locality: Optional[str] = None
address_region: Optional[str] = None
country_code: Optional[str] = None
postal_code: Optional[str] = None
class ContactContactGetResponse(CustomBaseModel): class ContactContactGetResponse(CustomBaseModel):
email: str email: str
first_name: str first_name: str

View file

@ -17,16 +17,13 @@ engine = create_engine(SQLALCHEMY_DATABASE_URI.get_secret_value())
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db(): def get_db():
db = SessionLocal() with SessionLocal.begin() as db:
try: try:
yield db yield db
except: finally:
db.rollback() db.rollback() # Anything not explicitly commited is rolled back
raise db.close()
finally:
db.close()
db_dependency = Annotated[Session, Depends(get_db)] db_dependency = Annotated[Session, Depends(get_db)]

View file

@ -1,7 +0,0 @@
"""
Configurations for <this module>
Configurations:
- List: Description
- Configs: Description
"""

View file

@ -1,7 +0,0 @@
"""
Constants and error codes for <this module>
Constants:
- List: Description
- Consts: Description
"""

View file

@ -1,11 +0,0 @@
"""
Router dependencies for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""

View file

@ -1,7 +0,0 @@
"""
Module specific exceptions for <this module>
Exceptions:
- List: Description
- Exceptions: Description
"""

View file

@ -1,52 +0,0 @@
"""
Database models for the IAM module
Models:
- List: Description
- Models: Description
"""
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.orm import relationship
from src.database import Base
class Permission(Base):
__tablename__ = "permission"
id = Column(Integer, primary_key=True)
resource = Column(String, nullable=False)
action = Column(String, nullable=False)
service_id = Column(Integer, ForeignKey("service.id", ondelete="CASCADE"))
UniqueConstraint("service_id", "resource", "action", name="uniq_permission_resource_and_action")
class Group(Base):
__tablename__ = "group"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False, unique=True)
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"))
user_rel = relationship(
"User",
secondary="user_groups",
back_populates="group_rel"
)
org_rel = relationship("Organisation", back_populates="group_rel")
class GroupPermissions(Base):
__tablename__ = "group_permissions"
group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True)
permission_id = Column(Integer, ForeignKey("permission.id", ondelete="CASCADE"), primary_key=True)
class UserGroups(Base):
__tablename__ = "user_groups"
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True)
user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True)
group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True)

View file

@ -1,192 +0,0 @@
"""
Router endpoints for <this module>
Endpoints:
- List: Description
- Endpoints: Description
"""
from typing import Annotated, Optional
from fastapi import APIRouter, Query, HTTPException
from src.database import db_dependency
from src.schemas import ResourceName
from src.auth.service import claims_dependency
from src.user.models import User
from src.organisation.models import Organisation as Org
from src.service.models import Service
from src.organisation.dependencies import org_model_dependency
from src.iam.service import service_key_dependency
from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups
router = APIRouter(
tags=["IAM"],
prefix="/iam",
)
@router.post("/can_act_on_resource")
async def can_act_on_resource(valid_key: service_key_dependency, db: db_dependency, user_claims: claims_dependency,
rn: ResourceName, action: str) -> bool:
try:
user_id = user_claims["db_id"]
rn_org = rn.organisation
rn_service = rn.service
rn_resource = rn.resource
result = (db.query(Perm)
.join(Service, Service.id == Perm.service_id)
.join(GPerms, GPerms.permission_id == Perm.id)
.join(Group, Group.id == GPerms.group_id)
.join(Org, Org.id == Group.org_id)
.join(UserGroups, UserGroups.group_id == Group.id)
.join(User, User.id == UserGroups.user_id)
.filter(User.id == user_id)
.filter(Org.name == rn_org)
.filter(Service.name == rn_service)
.filter(Perm.resource == rn_resource)
.filter(Perm.action == action)
).first()
if result:
return True
else:
return False
except Exception as e:
print(e)
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/group/permissions")
async def get_group_permissions(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
# TODO: iam_admin_dependency
group_perms = db.query(Perm).join(GPerms).filter(GPerms.group_id==group_id).all()
# TODO: Response model
return group_perms
@router.get("/group/users")
async def get_group_users(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
# TODO: iam_admin_dependency
group_users = db.query(User).join(UserGroups).filter(UserGroups.group_id == group_id).all()
# TODO: Response model
return group_users
@router.post("/group")
async def create_group(db: db_dependency, group_name: str, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
group_model = Group(name=group_name, org_id=org_id)
db.add(group_model)
db.commit()
# TODO: Response model
@router.put("/group/permissions")
async def add_group_permissions(db: db_dependency, group_id: int, permission_id: int, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
g_perm_model = GPerms(group_id=group_id, permission_id=permission_id)
db.add(g_perm_model)
db.commit()
# TODO: Response model
@router.put("/group/users")
async def add_group_users(db: db_dependency, group_id: int, user_ids: list[int], org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
for user_id in user_ids:
user_group_model = UserGroups(group_id=group_id, user_id=user_id, org_id=org_id)
db.add(user_group_model)
db.commit()
# TODO: Response model
@router.delete("/group/permissions")
async def remove_group_permissions(db: db_dependency, group_id: int, org_model: org_model_dependency, org_id: int, permission_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
g_perm_model = db.query(GPerms).filter(GPerms.group_id == group_id, GPerms.permission_id == permission_id).first()
if g_perm_model is None:
return
db.delete(g_perm_model)
db.commit()
return
# TODO: Response model
@router.delete("/group/user")
async def remove_group_user(db: db_dependency, group_id: int, user_id: int, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
user_group_model = db.query(UserGroups).filter(UserGroups.group_id == group_id, UserGroups.user_id == user_id).first()
if user_group_model is None:
return
db.delete(user_group_model)
db.commit()
return
# TODO: Response model
@router.get("/permissions")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: request model
permission_models = db.query(Perm).all()
# TODO: Response model
return permission_models
@router.post("/permission")
async def create_new_permission(db: db_dependency, service_id: int, resource: str, action: str):
# TODO: super_admin_dependency
perm_model = Perm(service_id=service_id, resource=resource, action=action)
db.add(perm_model)
db.commit()
@router.delete("/permission")
async def delete_permission(db: db_dependency, service_id: int, resource: str, action: str, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
perm_model = db.query(Perm).filter(Perm.service_id==service_id, Perm.resource==resource, Perm.action==action).first()
if perm_model is None:
return
db.delete(perm_model)
db.commit()
return
# TODO: Response model
@router.get("/permissions/search")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int, service_id: Optional[int] = None, resource: Optional[str] = None, action: Optional[str] = None):
# TODO: iam_admin_dependency
# TODO: request model
permission_query = db.query(Perm)
if service_id is not None:
permission_query = permission_query.filter(Perm.service_id == service_id)
if resource is not None:
permission_query = permission_query.filter(Perm.resource == resource)
if action is not None:
permission_query = permission_query.filter(Perm.action == action)
permission_models = permission_query.all()
# TODO: Response model
return permission_models

View file

@ -1,7 +0,0 @@
"""
Pydantic models for <this module>
Models:
- List: Description
- Models: Description
"""

View file

@ -1,26 +0,0 @@
"""
Module specific business logic for <this module>
Exports service_key_dependency
"""
from typing import Annotated
from src.service.models import Service
from src.database import db_dependency
from src.schemas import ResourceName
from fastapi import HTTPException, status, Request, Depends
def valid_service_key(db: db_dependency, request: Request, rn: ResourceName) -> bool:
api_key = request.headers.get("X-API-Key", None)
if not api_key:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
service = rn.service
result = db.query(Service).filter(Service.name == service).filter(Service.api_key == api_key).first()
if result is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return True
service_key_dependency = Annotated[bool, Depends(valid_service_key)]

View file

@ -1,11 +0,0 @@
"""
Non-business logic reusable functions and classes for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""

View file

@ -9,20 +9,3 @@ Functions:
- List: Description - List: Description
- Functions: Description - Functions: Description
""" """
from typing import Annotated
from fastapi import HTTPException, Depends
from src.database import db_dependency
from src.organisation.models import Organisation as Org
def get_org_model(db: db_dependency, org_id: int) -> type[Org]:
org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
return org_model
org_model_dependency = Annotated[type[Org], Depends(get_org_model)]

View file

@ -6,8 +6,7 @@ Models:
billing_contact_id[fk], security_contact_id[fk], owner_contact_id[fk] billing_contact_id[fk], security_contact_id[fk], owner_contact_id[fk]
- OrgUsers: org_id[fk][cpk], user_id[fk][cpk], is_admin - OrgUsers: org_id[fk][cpk], user_id[fk][cpk], is_admin
""" """
from sqlalchemy import Column, Integer, String, ForeignKey, JSON from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, JSON, false
from sqlalchemy.orm import relationship
from src.database import Base from src.database import Base
@ -16,36 +15,18 @@ class Organisation(Base):
__tablename__ = "organisation" __tablename__ = "organisation"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
name = Column(String, unique=True) name = Column(String)
status = Column(String, default="partial") status = Column(String, default="partial")
intake_questionnaire = Column(JSON) intake_questionnaire = Column(JSON)
root_user_id = Column(Integer, ForeignKey("user.id"))
billing_contact_id = Column(Integer, ForeignKey("contact.id")) billing_contact_id = Column(Integer, ForeignKey("contact.id"))
security_contact_id = Column(Integer, ForeignKey("contact.id")) security_contact_id = Column(Integer, ForeignKey("contact.id"))
owner_contact_id = Column(Integer, ForeignKey("contact.id")) owner_contact_id = Column(Integer, ForeignKey("contact.id"))
user_rel = relationship(
"User",
secondary="orgusers",
back_populates="organisation_rel"
)
group_rel = relationship("Group", back_populates="org_rel")
root_user_rel = relationship("User", foreign_keys=[root_user_id])
@property
def root_user_email(self):
return self.root_user_rel.email if self.root_user_rel else None
billing_contact_rel = relationship("Contact", foreign_keys=[billing_contact_id])
security_contact_rel = relationship("Contact", foreign_keys=[security_contact_id])
owner_contact_rel = relationship("Contact", foreign_keys=[owner_contact_id])
class OrgUsers(Base): class OrgUsers(Base):
__tablename__ = "orgusers" __tablename__ = "orgusers"
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True) org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True)
user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True) user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True)
is_admin = Column(Boolean, nullable=False, server_default=false())

View file

@ -8,30 +8,27 @@ Endpoints:
- [patch]/{org_id}/status - Updates the status of an organisation - [patch]/{org_id}/status - Updates the status of an organisation
- [patch]/{org_id}/contact - Assigns a contact to an organisation (as billing, security, or owner) - [patch]/{org_id}/contact - Assigns a contact to an organisation (as billing, security, or owner)
- [get]/{org_id}/users - Retrieves all users associated with an organisation - [get]/{org_id}/users - Retrieves all users associated with an organisation
- [get]/{org_id}/users/admins - Retrieves only the admin users of an organisation
- [post]/{org_id}/users - Adds a new user to an organisation - [post]/{org_id}/users - Adds a new user to an organisation
- [patch]/{org_id}/users - Updates details of an existing organisation user (e.g., admin status)
- [delete]/{org_id} - Deletes an organisation by ID - [delete]/{org_id} - Deletes an organisation by ID
- [get]/{org_id}/contact/{contact_type} - Retrieves the contact of a specific type (owner, billing, security) for an organisation - [get]/{org_id}/contact/{contact_type} - Retrieves the contact of a specific type (owner, billing, security) for an organisation
""" """
from typing import Annotated, Optional from typing import Annotated
from fastapi import APIRouter, HTTPException, status from fastapi import APIRouter, HTTPException
from fastapi.params import Path, Query from fastapi.params import Path
from sqlalchemy.sql import exists
from src.contact.schemas import ContactAddress
from src.database import db_dependency from src.database import db_dependency
from src.contact.models import Contact from src.contact.models import Contact
from src.user.models import User
from src.user.exceptions import UserNotFoundException
from src.auth.service import root_user_dependency, claims_dependency
from src.organisation.dependencies import org_model_dependency
from src.organisation.constants import ContactType from src.organisation.constants import ContactType
from src.organisation.models import Organisation as Org from src.organisation.models import Organisation as Org, OrgUsers
from src.organisation.schemas import OrgOrgPostRequest, OrgQuestionnairePatchRequest, OrgStatusPatchRequest, \ from src.organisation.schemas import OrgOrgPostRequest, OrgQuestionnairePatchRequest, OrgStatusPatchRequest, \
OrgContactPatchRequest, \ OrgContactPatchRequest, \
OrgUserPostRequest, OrgUserGetResponse, OrgContactGetResponse, OrgOrgGetResponse, OrgRootPatchRequest, \ OrgUserPostRequest, OrgUserGetResponse, OrgContactGetResponse, OrgOrgGetResponse
OrgGroupGetResponse, OrgUserDeleteRequest
router = APIRouter( router = APIRouter(
prefix="/org", prefix="/org",
@ -40,165 +37,167 @@ router = APIRouter(
@router.get("/id/{org_id}", response_model=OrgOrgGetResponse) @router.get("/id/{org_id}", response_model=OrgOrgGetResponse)
async def get_org_by_id(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]): async def get_org_by_id(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_model = (db.query(Org).filter(Org.id == org_id).first())
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
response = { response = {
"name": org_model.name, "name": org_model.name,
"status": org_model.status, "status": org_model.status,
"owner_contact": org_model.owner_contact_rel.email, "owner_contact": (db.query(Contact).filter(Contact.id == org_model.owner_contact_id).first()),
"billing_contact": org_model.billing_contact_rel.email, "billing_contact": (db.query(Contact).filter(Contact.id == org_model.billing_contact_id).first()),
"security_contact": org_model.security_contact_rel.email, "security_contact": (db.query(Contact).filter(Contact.id == org_model.security_contact_id).first()),
"root_user": org_model.root_user_email
} }
return response return response
@router.post("/") @router.post("/")
async def create_org(db: db_dependency, user: claims_dependency, org_request: OrgOrgPostRequest): async def create_org(db: db_dependency, org_request: OrgOrgPostRequest):
db_id: Optional[int] = user.get("db_id", None) org_model = Org(**org_request.model_dump())
if db_id is None:
raise UserNotFoundException()
org_model = Org(name=org_request.name, intake_questionnaire=org_request.intake_questionnaire.model_dump())
org_model.status = "partial" # Status is always set to partial at first, see update_questionnaire() doc org_model.status = "partial" # Status is always set to partial at first, see update_questionnaire() doc
db.add(org_model) db.add(org_model)
db.flush()
# Adds currently logged-in user to org users list and sets them as root_user
user_model = db.get(User, db_id)
org_model.user_rel.append(user_model)
org_model.root_user_rel = user_model
for contact_type in ["billing_contact_id", "security_contact_id", "owner_contact_id"]:
contact_model = Contact(org_id=org_model.id)
db.add(contact_model)
db.flush()
org_model.__setattr__(contact_type, contact_model.id)
db.commit() db.commit()
@router.patch("/{org_id}/questionnaire") @router.patch("/{org_id}/questionnaire")
async def update_questionnaire(db: db_dependency, org_model: org_model_dependency, q_request: OrgQuestionnairePatchRequest, org_id: Annotated[int, Path(gt=0)]): async def update_questionnaire(db: db_dependency, q_request: OrgQuestionnairePatchRequest, org_id: Annotated[int, Path(gt=0)]):
""" """
Route for updating questionnaire. Route for updating questionnaire.
The partial bool allows for submission of partially completed questionnaire and/or The partial bool allows for submission of partially completed questionnaire and/or
final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval. final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval.
""" """
org_model.intake_questionnaire = q_request.intake_questionnaire.model_dump() org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
org_model.intake_questionnaire = q_request.intake_questionnaire
# Allows for partially completed questionnaires to be saved without being submitted for review # Allows for partially completed questionnaires to be saved without being submitted for review
if not q_request.partial: if not q_request.partial:
org_model.status = "submitted" org_model.status = "submitted"
db.add(org_model)
db.commit() db.commit()
@router.patch("/{org_id}/status") @router.patch("/{org_id}/status")
async def update_status(db: db_dependency, org_model: org_model_dependency, status_request: OrgStatusPatchRequest, org_id: Annotated[int, Path(gt=0)]): async def update_status(db: db_dependency, status_request: OrgStatusPatchRequest, org_id: Annotated[int, Path(gt=0)]):
org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
org_model.status = status_request.status org_model.status = status_request.status
db.add(org_model)
db.commit() db.commit()
@router.get("/{org_id}/users", response_model=OrgUserGetResponse) @router.patch("/{org_id}/contact")
async def get_users(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]): async def update_contact(db: db_dependency, contact_request: OrgContactPatchRequest, org_id: Annotated[int, Path(gt=0)]):
return {"users": [user.email for user in org_model.user_rel]} org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
match contact_request.contact_type:
case "billing":
org_model.billing_contact_id = contact_request.contact_id
case "security":
org_model.security_contact_id = contact_request.contact_id
case "owner":
org_model.owner_contact_id = contact_request.contact_id
case _:
raise HTTPException(status_code=422, detail="Invalid contact type")
db.add(org_model)
db.commit()
@router.get("/{org_id}/users", response_model=list[OrgUserGetResponse])
async def get_users(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_exists = db.query(exists().where(Org.id == org_id)).scalar()
if not org_exists:
raise HTTPException(status_code=404, detail="Organisation not found")
org_user_models = db.query(OrgUsers).filter(OrgUsers.org_id == org_id).all()
return org_user_models
@router.get("/{org_id}/users/admins", response_model=list[OrgUserGetResponse])
async def get_admin_users(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_exists = db.query(exists().where(Org.id == org_id)).scalar()
if not org_exists:
raise HTTPException(status_code=404, detail="Organisation not found")
org_user_models = db.query(OrgUsers).filter(OrgUsers.org_id == org_id).filter(OrgUsers.is_admin == True).all()
return org_user_models
@router.post("/{org_id}/users") @router.post("/{org_id}/users")
async def add_user_to_org(db: db_dependency, org_model: org_model_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]): async def add_user_to_org(db: db_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]):
user_model = db.get(User, user_request.user_id) org_model = (db.query(Org).filter(Org.id == org_id).first())
if user_model in org_model.user_rel: if org_model is None:
return raise HTTPException(status_code=404, detail="Organisation not found")
org_model.user_rel.append(user_model)
org_user_model = OrgUsers(**user_request.model_dump(), org_id=org_id)
db.add(org_user_model)
db.commit() db.commit()
@router.delete("/{org_id}", status_code=status.HTTP_204_NO_CONTENT) @router.patch("/{org_id}/users")
async def delete_organisation_by_id(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]): async def update_user_details(db: db_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]):
"""
Currently used only to update user admin status for organisation.
"""
org_model = (db.query(Org).filter(Org.id == org_id).first())
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
org_user_model = db.query(OrgUsers).filter(OrgUsers.org_id == org_id).filter(OrgUsers.user_id == user_request.user_id).first()
if org_user_model is None:
raise HTTPException(status_code=404, detail="Organisation user not found")
if user_request.is_admin is not None:
org_user_model.is_admin = user_request.is_admin
db.add(org_user_model)
db.commit()
@router.delete("/{org_id}")
async def delete_organisation_by_id(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_model = (db.query(Org).filter(Org.id == org_id).first())
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
db.delete(org_model) db.delete(org_model)
db.commit() db.commit()
@router.patch("/{org_id}/root_user", status_code=status.HTTP_204_NO_CONTENT) @router.get("/{org_id}/contact/{contact_type}", response_model=OrgContactGetResponse)
async def update_root_user(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_request: OrgRootPatchRequest): async def get_contact(db: db_dependency, contact_type: ContactType, org_id: Annotated[int, Path(gt=0)]):
root_user_model = db.get(User, user_request.user_id) org_model = db.query(Org).filter(Org.id == org_id).first()
if root_user_model is None: if org_model is None:
raise UserNotFoundException(user_id=user_request.user_id) raise HTTPException(status_code=404, detail="Organisation not found")
org_model.root_user_rel = root_user_model
db.commit()
@router.get("/{org_id}/groups", response_model=OrgGroupGetResponse)
async def get_org_groups(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
return {"groups": [group.name for group in org_model.group_rel]}
@router.delete("/{org_id}/user", status_code=status.HTTP_204_NO_CONTENT)
async def remove_user_from_org(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_request: OrgUserDeleteRequest):
user_id = user_request.user_id
user = db.get(User, user_id)
if user is None:
raise UserNotFoundException(user_id=user_id)
if user not in org_model.user_rel:
raise HTTPException(status_code=status.HTTP_204_NOT_FOUND)
org_model.user_rel.remove(user)
db.commit()
@router.get("/{org_id}/contact", response_model=OrgContactGetResponse)
async def get_contact(org_model: org_model_dependency, contact_type: Annotated[ContactType, Query()], org_id: Annotated[int, Path(gt=0)]):
match contact_type: match contact_type:
case "billing": case "billing":
contact_model = org_model.billing_contact_rel contact_id = org_model.billing_contact_id
case "security": case "security":
contact_model = org_model.security_contact_rel contact_id = org_model.security_contact_id
case "owner": case "owner":
contact_model = org_model.owner_contact_rel contact_id = org_model.owner_contact_id
case _: case _:
raise HTTPException(status_code=422, detail="Invalid contact type") raise HTTPException(status_code=422, detail="Invalid contact type")
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None: if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found") raise HTTPException(status_code=404, detail="Contact not found")
return OrgContactGetResponse.model_construct( return contact_model
**contact_model.__dict__,
address=ContactAddress.model_validate(contact_model)
)
@router.patch("/{org_id}/contact", response_model=OrgContactGetResponse)
async def update_contact(db: db_dependency, org_model: org_model_dependency, contact_type: Annotated[ContactType, Query()], contact_request: OrgContactPatchRequest, org_id: Annotated[int, Path(gt=0)]):
match contact_type:
case "billing":
contact_model = org_model.billing_contact_rel
case "security":
contact_model = org_model.security_contact_rel
case "owner":
contact_model = org_model.owner_contact_rel
case _:
raise HTTPException(status_code=422, detail="Invalid contact type")
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
update_data = contact_request.model_dump(exclude_none=True)
for key, value in update_data.items():
if hasattr(contact_model, key):
setattr(contact_model, key, value)
else:
raise HTTPException(status_code=422, detail="Invalid keys in update request")
db.flush()
response = OrgContactGetResponse.model_construct(
**contact_model.__dict__,
address=ContactAddress.model_validate(contact_model)
)
db.commit()
return response

View file

@ -7,11 +7,9 @@ Models:
""" """
from typing import Optional from typing import Optional
from pydantic import EmailStr, ConfigDict
from src.schemas import CustomBaseModel from src.schemas import CustomBaseModel
from src.organisation.constants import Status, ContactType from src.organisation.constants import Status, ContactType
from src.contact.schemas import ContactAddress
class OrgQuestionnaire(CustomBaseModel): class OrgQuestionnaire(CustomBaseModel):
question_one: str question_one: str
@ -23,6 +21,10 @@ class OrgOrgPostRequest(CustomBaseModel):
name: str name: str
intake_questionnaire: Optional[OrgQuestionnaire] = None intake_questionnaire: Optional[OrgQuestionnaire] = None
billing_contact_id: Optional[int] = None
security_contact_id: Optional[int] = None
owner_contact_id: Optional[int] = None
class OrgQuestionnairePatchRequest(CustomBaseModel): class OrgQuestionnairePatchRequest(CustomBaseModel):
intake_questionnaire: OrgQuestionnaire intake_questionnaire: OrgQuestionnaire
partial: bool partial: bool
@ -31,49 +33,27 @@ class OrgStatusPatchRequest(CustomBaseModel):
status: Status status: Status
class OrgContactPatchRequest(CustomBaseModel): class OrgContactPatchRequest(CustomBaseModel):
email: Optional[EmailStr] = None contact_id: int
first_name: Optional[str] = None contact_type: ContactType
last_name: Optional[str] = None
phonenumber: Optional[str] = None
vat_number: Optional[str] = None
post_office_box_number: Optional[str] = None
street_address: Optional[str] = None
street_address_line_2: Optional[str] = None
locality: Optional[str] = None
address_region: Optional[str] = None
country_code: Optional[str] = None
postal_code: Optional[str] = None
class OrgUserPostRequest(CustomBaseModel): class OrgUserPostRequest(CustomBaseModel):
user_id: int user_id: int
is_admin: Optional[bool] = False
class OrgUserDeleteRequest(CustomBaseModel):
user_id: int
class OrgRootPatchRequest(CustomBaseModel):
user_id: int
class OrgUserGetResponse(CustomBaseModel): class OrgUserGetResponse(CustomBaseModel):
users: list[str] user_id: int
is_admin: bool
class OrgGroupGetResponse(CustomBaseModel):
groups: list[str]
class OrgContactGetResponse(CustomBaseModel): class OrgContactGetResponse(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore") email: str
first_name: str
email: Optional[str] = None last_name: str
first_name: Optional[str] = None phonenumber: str
last_name: Optional[str] = None
phonenumber: Optional[str] = None
vat_number: Optional[str] = None vat_number: Optional[str] = None
address: ContactAddress
class OrgOrgGetResponse(CustomBaseModel): class OrgOrgGetResponse(CustomBaseModel):
name: str name: str
status: Status status: Status
root_user: Optional[str] = None owner_contact: OrgContactGetResponse
owner_contact: Optional[str] = None billing_contact: OrgContactGetResponse
billing_contact: Optional[str] = None security_contact: OrgContactGetResponse
security_contact: Optional[str] = None

View file

@ -1,13 +1,5 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional
class CustomBaseModel(BaseModel): class CustomBaseModel(BaseModel):
pass pass
class ResourceName(CustomBaseModel):
service: str
organisation: str
resource: str
instance: Optional[str] = None

View file

@ -1,7 +0,0 @@
"""
Configurations for <this module>
Configurations:
- List: Description
- Configs: Description
"""

View file

@ -1,7 +0,0 @@
"""
Constants and error codes for <this module>
Constants:
- List: Description
- Consts: Description
"""

View file

@ -1,11 +0,0 @@
"""
Router dependencies for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""

View file

@ -1,7 +0,0 @@
"""
Module specific exceptions for <this module>
Exceptions:
- List: Description
- Exceptions: Description
"""

View file

@ -1,18 +0,0 @@
"""
Database models for the services module
Models:
- List: Description
- Models: Description
"""
from sqlalchemy import Column, Integer, String
from src.database import Base
class Service(Base):
__tablename__ = "service"
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
api_key = Column(String, unique=True)

View file

@ -1,63 +0,0 @@
"""
Router endpoints for <this module>
Endpoints:
- List: Description
- Endpoints: Description
"""
from fastapi import APIRouter
from src.database import db_dependency
from src.service.models import Service
from src.service.utils import generate_api_key
router = APIRouter(
tags=["Service"],
prefix="/service",
)
@router.get("/")
async def get_all_services(db: db_dependency):
# TODO: user_dependency
# TODO: request model
permission_models = db.query(Service).all()
# TODO: Response model
return permission_models
@router.post("/")
async def register_service(db: db_dependency, service_name: str):
# TODO: super_admin_dependency
# TODO: request model
key = generate_api_key()
service_model = Service(name=service_name, api_key=key)
db.add(service_model)
db.commit()
# TODO: response model
@router.patch("/{service_id}/key")
async def regenerate_api_key(db: db_dependency, service_id: int):
# TODO: super_admin_dependency
# TODO: request model
key = generate_api_key()
service_model = db.query(Service).filter(Service.id==service_id).first()
service_model.api_key = key
db.add(service_model)
db.commit()
# TODO: response model
@router.delete("/{service_id}")
async def remove_service(db: db_dependency, service_id: int):
# TODO: super_admin_dependency
# TODO: request model
service_model = db.query(Service).filter(Service.id==service_id).first()
if service_model is None:
return
db.delete(service_model)
db.commit()
# TODO: response model

View file

@ -1,7 +0,0 @@
"""
Pydantic models for <this module>
Models:
- List: Description
- Models: Description
"""

View file

@ -1,11 +0,0 @@
"""
Module specific business logic for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""

View file

@ -1,16 +0,0 @@
"""
Non-business logic reusable functions and classes for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""
import uuid
def generate_api_key() -> str:
return str(uuid.uuid4())

View file

@ -4,13 +4,9 @@ Database models for user module
Models: Models:
- User - id[pk], email, first_name, last_name, oidc_id - User - id[pk], email, first_name, last_name, oidc_id
""" """
from collections import defaultdict
from sqlalchemy import Column, Integer, String from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from src.database import Base from src.database import Base
from src.iam.models import Group
class User(Base): class User(Base):
@ -21,26 +17,3 @@ class User(Base):
first_name = Column(String) first_name = Column(String)
last_name = Column(String) last_name = Column(String)
oidc_id = Column(String, index=True, unique=True) oidc_id = Column(String, index=True, unique=True)
organisation_rel = relationship(
"Organisation",
secondary="orgusers",
back_populates="user_rel"
)
@property
def organisations(self):
return [org.name for org in self.organisation_rel]
group_rel = relationship(
"Group",
secondary="user_groups",
back_populates="user_rel"
)
@property
def groups(self):
result = defaultdict(list)
for group in self.group_rel:
result[group.org_rel.name].append(group.name)
return dict(result)

View file

@ -15,12 +15,15 @@ from typing import Annotated
from fastapi import APIRouter from fastapi import APIRouter
from fastapi.params import Path from fastapi.params import Path
from sqlalchemy.sql import exists
from starlette import status from starlette import status
from src.user.models import User from src.user.models import User
from src.user.schemas import UserResponse, OIDCClaims from src.user.schemas import UserResponse, OrgResponse, OIDCClaims
from src.user.exceptions import UserNotFoundException from src.user.exceptions import UserNotFoundException
from src.organisation.models import OrgUsers, Organisation
from src.auth.service import claims_dependency from src.auth.service import claims_dependency
from src.database import db_dependency from src.database import db_dependency
@ -60,6 +63,55 @@ async def current_user(user: claims_dependency, db: db_dependency):
return user_model return user_model
@router.get("/self/orgs", response_model=list[OrgResponse], status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_current_organisations(db: db_dependency, user: claims_dependency):
"""
Returns all organisations associated with the currently logged-in user.
"""
user_id = user.get("db_id", None)
if user_id is None:
raise UserNotFoundException()
user_exists = db.query(exists().where(User.id == user_id)).scalar()
if not user_exists:
raise UserNotFoundException(user_id=user_id)
org_user_models = (db.query(OrgUsers.org_id, OrgUsers.is_admin, Organisation.name)
.join(OrgUsers, Organisation.id == OrgUsers.org_id)
.filter(OrgUsers.user_id == user_id)
.all()
)
return org_user_models
@router.get("/self/orgs/admin", response_model=list[OrgResponse], status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_current_admin_organisations(db: db_dependency, user: claims_dependency):
"""
Returns the organisations for which the currently logged-in user is an admin.
"""
user_id = user.get("db_id", None)
if user_id is None:
raise UserNotFoundException()
user_exists = db.query(exists().where(User.id == user_id)).scalar()
if not user_exists:
raise UserNotFoundException(user_id=user_id)
org_user_models = (db.query(OrgUsers.org_id, OrgUsers.is_admin, Organisation.name)
.join(OrgUsers, Organisation.id == OrgUsers.org_id)
.filter(OrgUsers.user_id == user_id)
.filter(OrgUsers.is_admin == True)
.all()
)
return org_user_models
@router.get("/{user_id}", response_model=UserResponse, status_code=status.HTTP_200_OK, responses={ @router.get("/{user_id}", response_model=UserResponse, status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"}, status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"}, status.HTTP_200_OK: {"description": "Successful retrieval from database"},
@ -75,6 +127,49 @@ async def get_user_by_id(db: db_dependency, user_id: Annotated[int, Path(gt=0,de
return user_model return user_model
@router.get("/{user_id}/orgs", response_model=list[OrgResponse], status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_organisations(db: db_dependency, user_id: Annotated[int, Path(gt=0,description="User database ID")]):
"""
Returns all organisations associated with the provided user ID.
"""
user_exists = db.query(exists().where(User.id == user_id)).scalar()
if not user_exists:
raise UserNotFoundException(user_id=user_id)
org_user_models = (db.query(OrgUsers.org_id, OrgUsers.is_admin, Organisation.name)
.join(OrgUsers, Organisation.id == OrgUsers.org_id)
.filter(OrgUsers.user_id == user_id)
.all()
)
return org_user_models
@router.get("/{user_id}/orgs/admin", response_model=list[OrgResponse], status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_admin_organisations(db: db_dependency, user_id: Annotated[int, Path(gt=0,description="User database ID")]):
"""
Returns the organisations for which the user with the provided user ID is an admin.
"""
user_exists = db.query(exists().where(User.id == user_id)).scalar()
if not user_exists:
raise UserNotFoundException(user_id=user_id)
org_user_models = (db.query(OrgUsers.org_id, OrgUsers.is_admin, Organisation.name)
.join(OrgUsers, Organisation.id == OrgUsers.org_id)
.filter(OrgUsers.user_id == user_id)
.filter(OrgUsers.is_admin == True)
.all()
)
return org_user_models
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT, responses={ @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT, responses={
status.HTTP_204_NO_CONTENT: {"description": "User deleted"}, status.HTTP_204_NO_CONTENT: {"description": "User deleted"},
status.HTTP_404_NOT_FOUND: {"description": "User not found"}, status.HTTP_404_NOT_FOUND: {"description": "User not found"},

View file

@ -5,7 +5,6 @@ Models:
- List: Description - List: Description
- Models: Description - Models: Description
""" """
from typing import Optional
from src.schemas import CustomBaseModel from src.schemas import CustomBaseModel
@ -45,10 +44,9 @@ class UserResponse(CustomBaseModel):
first_name: str first_name: str
last_name: str last_name: str
email: str email: str
organisations: list[Optional[str]]
groups: dict[str, list[str]]
class OrgResponse(CustomBaseModel): class OrgResponse(CustomBaseModel):
org_id: int org_id: int
name: str name: str
is_admin: bool