Compare commits

...

10 commits

Author SHA1 Message Date
b3689c8af6 feat: org router refactor
- All TODOs done.
- org_model_dependency used for all applicable routes
- ORM relationships used to reduce number of queries being made and simplify endpoint code.
- Missing request and response models added.
- Small bug fixes
2026-05-25 16:54:45 +01:00
2b6d923ae1 feat: contact model restructure
Blank contacts are now generated on org creation and assigned to each contact type. These contacts are linked to the org, only accessible to the org, and removed when the org is removed.

With this all contact endpoints have been removed. Contact manipulation is done via the org only.
2026-05-25 15:15:50 +01:00
707482adc2 feat: condensed org get endpoints
The process also added improved ORM relationships for multiple models.
2026-05-25 12:40:28 +01:00
a80767d870 feat: condensed user get endpoints
The process also added improved ORM relationships for multiple models.
2026-05-25 12:06:24 +01:00
4ff184fe86 feat: sqlalchemy defined orgusers relationship 2026-05-25 10:21:15 +01:00
d51adb4e55 feat: org root user dependency 2026-05-25 09:54:46 +01:00
2a20172d78 fix: questionnaire patch route update for new model 2026-05-25 09:54:19 +01:00
804e21b871 fix: handling for unset org contacts 2026-05-25 09:32:40 +01:00
23f2ce98d7 feat: iam rbac system
Endpoints and db architecture to support a role based IAM system.
2026-05-25 09:05:17 +01:00
7b3ee9d5fa feat: db dependency upgrade
No longer using .begin() and context manager. This means commits must be explicit (already were) but also, it allows for sanity checks within routes after commits.
2026-05-21 16:55:15 +01:00
37 changed files with 859 additions and 559 deletions

View file

@ -10,6 +10,8 @@ from src.config import SQLALCHEMY_DATABASE_URI
from src.contact.models import Contact from src.contact.models import Contact
from src.organisation.models import Organisation, OrgUsers from src.organisation.models import Organisation, OrgUsers
from src.user.models import User from src.user.models import User
from src.service.models import Service
from src.iam.models import Permission, Group, GroupPermissions, UserGroups
from src.database import Base from src.database import Base
# this is the Alembic Config object, which provides # this is the Alembic Config object, which provides

View file

@ -0,0 +1,83 @@
"""Init IAM
Revision ID: a147965e644e
Revises: 8fe51426321d
Create Date: 2026-05-22 15:59:36.469374
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'a147965e644e'
down_revision: Union[str, Sequence[str], None] = '8fe51426321d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('service',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(), nullable=True),
sa.Column('api_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('api_key'),
sa.UniqueConstraint('name')
)
op.create_table('permission',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('resource', sa.String(), nullable=False),
sa.Column('action', sa.String(), nullable=False),
sa.Column('service_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['service_id'], ['service.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_table('group',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('org_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['org_id'], ['organisation.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name')
)
op.create_table('group_permissions',
sa.Column('group_id', sa.Integer(), nullable=False),
sa.Column('permission_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['group_id'], ['group.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['permission_id'], ['permission.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('group_id', 'permission_id')
)
op.create_table('user_groups',
sa.Column('org_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('group_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['group_id'], ['group.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['org_id'], ['organisation.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('org_id', 'user_id', 'group_id')
)
op.add_column('organisation', sa.Column('root_user_id', sa.Integer(), nullable=True))
op.create_unique_constraint("organisation_name_key", 'organisation', ['name'])
op.create_foreign_key("organisation_root_user_fkey", 'organisation', 'user', ['root_user_id'], ['id'])
op.drop_column('orgusers', 'is_admin')
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('orgusers', sa.Column('is_admin', sa.BOOLEAN(), server_default=sa.text('false'), autoincrement=False, nullable=False))
op.drop_constraint("organisation_root_user_fkey", 'organisation', type_='foreignkey')
op.drop_constraint("organisation_name_key", 'organisation', type_='unique')
op.drop_column('organisation', 'root_user_id')
op.drop_table('user_groups')
op.drop_table('group_permissions')
op.drop_table('group')
op.drop_table('permission')
op.drop_table('service')
# ### end Alembic commands ###

View file

@ -0,0 +1,34 @@
"""Contact model changes
Revision ID: 8132c4b88665
Revises: a147965e644e
Create Date: 2026-05-25 13:09:22.635058
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '8132c4b88665'
down_revision: Union[str, Sequence[str], None] = 'a147965e644e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('contact', sa.Column('org_id', sa.Integer(), nullable=False))
op.create_foreign_key(None, 'contact', 'organisation', ['org_id'], ['id'], ondelete='CASCADE')
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, 'contact', type_='foreignkey')
op.drop_column('contact', 'org_id')
# ### end Alembic commands ###

View file

@ -5,43 +5,9 @@ Endpoints:
- List: Description - List: Description
- Endpoints: Description - Endpoints: Description
""" """
from typing import Annotated from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from fastapi.params import Path
from src.organisation.constants import ContactType
from src.organisation.schemas import OrgContactGetResponse
from src.organisation.models import Organisation as Org
from src.contact.models import Contact
from src.auth.service import claims_dependency, org_or_super_admin_dependency
from src.database import db_dependency
router = APIRouter( router = APIRouter(
tags=["admin"], tags=["admin"],
prefix="/admin", prefix="/admin",
) )
@router.get("/{org_id}/contact/{contact_type}", response_model=OrgContactGetResponse)
async def get_contact(db: db_dependency, user: claims_dependency, is_admin: org_or_super_admin_dependency, contact_type: ContactType, org_id: Annotated[int, Path(gt=0)]):
org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
match contact_type:
case "billing":
contact_id = org_model.billing_contact_id
case "security":
contact_id = org_model.security_contact_id
case "owner":
contact_id = org_model.owner_contact_id
case _:
raise HTTPException(status_code=422, detail="Invalid contact type")
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model

View file

@ -8,6 +8,8 @@ from src.contact.router import router as contact_router
from src.organisation.router import router as organisation_router from src.organisation.router import router as organisation_router
from src.user.router import router as user_router from src.user.router import router as user_router
from src.admin.router import router as admin_router from src.admin.router import router as admin_router
from src.iam.router import router as iam_router
from src.service.router import router as service_router
api_router = APIRouter() api_router = APIRouter()
@ -17,6 +19,8 @@ api_router.include_router(contact_router)
api_router.include_router(organisation_router) api_router.include_router(organisation_router)
api_router.include_router(user_router) api_router.include_router(user_router)
api_router.include_router(admin_router) api_router.include_router(admin_router)
api_router.include_router(service_router)
api_router.include_router(iam_router)
@api_router.get("/healthcheck", include_in_schema=False) @api_router.get("/healthcheck", include_in_schema=False)

View file

@ -8,54 +8,4 @@ from fastapi import APIRouter
router = APIRouter( router = APIRouter(
tags=["auth"], tags=["auth"],
) )
# oauth = OAuth()
# oauth.register(
# name="oidc",
# server_metadata_url=auth_settings.OIDC_CONFIG,
# client_id=auth_settings.CLIENT_ID,
# client_secret=None,
# code_challenge_method="S256",
# client_kwargs={
# "code_challenge_method": "S256",
# "scope": "openid profile email",
# }
# )
# @auth_router.get('/login')
# async def login(request: Request):
# redirect_uri = request.url_for('auth')
# return await oauth.oidc.authorize_redirect(request, redirect_uri, code_challenge_method="S256")
#
#
# @auth_router.get('/auth', include_in_schema=False)
# async def auth(db: db_dependency, request: Request):
# token = await oauth.oidc.authorize_access_token(request)
# user = token.get("userinfo")
# request.session["user"] = user
#
# try:
# valid_user = OIDCUser(first_name=user["given_name"], last_name=user["family_name"], email=user["email"], oidc_id=user["sub"])
# except Exception as e:
# print(e)
# raise HTTPException(status_code=422, detail="Invalid or missing OIDC data")
#
# user_exists = db.query(exists().where(User.oidc_id == valid_user.oidc_id)).scalar()
#
# if not user_exists:
# user_model = User(**valid_user.model_dump())
# db.add(user_model)
# db.commit()
#
# return RedirectResponse(url="/")
#
#
# @auth_router.get('/logout')
# async def logout(request: Request):
# request.session.pop('user', None)
# return RedirectResponse(url='/')

View file

@ -20,7 +20,9 @@ from sqlalchemy.sql import exists
from src.auth.config import auth_settings from src.auth.config import auth_settings
from src.user.service import add_user_to_db from src.user.service import add_user_to_db
from src.organisation.models import OrgUsers, Organisation as Org from src.organisation.models import OrgUsers, Organisation as Org
from src.user.models import User
from src.database import db_dependency from src.database import db_dependency
from src.organisation.dependencies import org_model_dependency
oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG) oidc = OpenIdConnect(openIdConnectUrl=auth_settings.OIDC_CONFIG)
@ -88,31 +90,18 @@ async def is_org_user(claims: claims_dependency, db: db_dependency, org_id: int
org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)] org_user_dependency = Annotated[dict[str, Any], Depends(is_org_user)]
async def is_org_admin(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)): async def is_org_root(claims: claims_dependency, db: db_dependency, org_model: org_model_dependency, org_id: int = Path(gt=0)):
org_exists = db.query(exists().where(Org.id == org_id)).scalar()
if not org_exists:
raise HTTPException(status_code=404, detail="Organisation not found")
db_id = claims.get("db_id", None) db_id = claims.get("db_id", None)
if db_id is None: if db_id is None:
raise HTTPException(status_code=404, detail="User not found in db") raise HTTPException(status_code=404, detail="User not found in db")
exists_query = (db.query(OrgUsers) if org_model.root_user_id == db_id:
.filter(OrgUsers.org_id == org_id, return db.query(User).filter(User.id == db_id).first()
OrgUsers.user_id == db_id,
OrgUsers.is_admin == True
).exists()
)
org_admin_exists = db.query(exists_query).scalar() raise HTTPException(status_code=401, detail="Not authorised")
if not org_admin_exists:
raise HTTPException(status_code=401, detail="Not authorised")
return org_admin_exists
org_admin_dependency = Annotated[dict[str, Any], Depends(is_org_admin)] root_user_dependency = Annotated[dict[str, Any], Depends(is_org_root)]
async def is_super_admin(claims: claims_dependency): async def is_super_admin(claims: claims_dependency):
@ -128,114 +117,3 @@ async def is_super_admin(claims: claims_dependency):
super_admin_dependency = Annotated[dict[str, Any], Depends(is_super_admin)] super_admin_dependency = Annotated[dict[str, Any], Depends(is_super_admin)]
async def is_admin(claims: claims_dependency, db: db_dependency, org_id: int = Path(gt=0)):
try:
await is_super_admin(claims)
return True
except HTTPException as e:
pass
try:
await is_org_admin(claims, db, org_id)
return True
except HTTPException as e:
raise HTTPException(status_code=401, detail="Not authorised")
org_or_super_admin_dependency = Annotated[dict[str, Any], Depends(is_admin)]
# Middleware version of user auth
# import json
# import logging
#
# from threading import Timer
# from urllib.request import urlopen
# from starlette.requests import HTTPConnection, Request
#
# from authlib.jose.rfc7517.jwk import JsonWebKey
# from authlib.jose.rfc7517.key_set import KeySet
# from authlib.oauth2 import OAuth2Error, ResourceProtector
# from authlib.oauth2.rfc6749 import MissingAuthorizationError
# from authlib.oauth2.rfc7523 import JWTBearerTokenValidator
# from authlib.oauth2.rfc7523.validator import JWTBearerToken
#
# from starlette.authentication import (
# AuthCredentials,
# AuthenticationBackend,
# AuthenticationError,
# SimpleUser,
# )
#
# logger = logging.getLogger(__name__)
#
#
# class RepeatTimer(Timer):
# def __init__(self, *args, **kwargs) -> None:
# super().__init__(*args, **kwargs)
# self.daemon = True
#
# def run(self):
# while not self.finished.wait(self.interval):
# self.function(*self.args, **self.kwargs)
#
#
# class BearerTokenValidator(JWTBearerTokenValidator):
# def __init__(self, issuer: str, audience: str):
# self._issuer = issuer
# self._jwks_uri: str | None = None
# super().__init__(public_key=self.fetch_key(), issuer=issuer)
# self.claims_options = {
# "exp": {"essential": True},
# "aud": {"essential": True, "value": audience},
# "iss": {"essential": True, "value": issuer},
# }
# self._timer = RepeatTimer(3600, self.refresh)
# self._timer.start()
#
# def refresh(self):
# try:
# self.public_key = self.fetch_key()
# except Exception as exc:
# logger.warning(f"Could not update jwks public key: {exc}")
#
# def fetch_key(self) -> KeySet:
# """Fetch the jwks_uri document and return the KeySet."""
# response = urlopen(self.jwks_uri)
# logger.debug(f"OK GET {self.jwks_uri}")
# return JsonWebKey.import_key_set(json.loads(response.read()))
#
# @property
# def jwks_uri(self) -> str:
# """The jwks_uri field of the openid-configuration document."""
# if self._jwks_uri is None:
# config_url = urlopen(f"{self._issuer}/.well-known/openid-configuration")
# config = json.loads(config_url.read())
# self._jwks_uri = config["jwks_uri"]
# return self._jwks_uri
#
#
# class BearerTokenAuthBackend(AuthenticationBackend):
# def __init__(self, issuer: str, audience: str) -> None:
# rp = ResourceProtector()
# validator = BearerTokenValidator(
# issuer=issuer,
# audience=audience,
# )
# rp.register_token_validator(validator)
# self.resource_protector = rp
#
# async def authenticate(self, conn: HTTPConnection):
# if "Authorization" not in conn.headers:
# return
# request = Request(conn.scope)
# try:
# token: JWTBearerToken = self.resource_protector.validate_request(
# scopes=["openid"],
# request=request,
# )
# except (MissingAuthorizationError, OAuth2Error) as error:
# raise AuthenticationError(error.description) from error
# scope: str = token.get_scope()
# scopes = scope.split()
# scopes.append("authenticated")
# return AuthCredentials(scopes=scopes), SimpleUser(username=token["email"])

View file

@ -5,7 +5,7 @@ Models:
- Contact: id[pk], email, first_name, last_name, phonenumber, vat_number - Contact: id[pk], email, first_name, last_name, phonenumber, vat_number
street_address, post_office_box_number, address_locality, country_code, address_region, postal_code street_address, post_office_box_number, address_locality, country_code, address_region, postal_code
""" """
from sqlalchemy import Column, Integer, String from sqlalchemy import Column, Integer, String, ForeignKey
from src.database import Base from src.database import Base
@ -27,3 +27,5 @@ class Contact(Base):
country_code = Column(String) # Eg GB country_code = Column(String) # Eg GB
address_region = Column(String, default=None, nullable=True) address_region = Column(String, default=None, nullable=True)
postal_code = Column(String) postal_code = Column(String)
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), nullable=False)

View file

@ -9,110 +9,10 @@ Endpoints:
- [patch]/{contact_id} - Updates the details of an existing contact - [patch]/{contact_id} - Updates the details of an existing contact
- [delete]/{contact_id} - Deletes a contact by ID - [delete]/{contact_id} - Deletes a contact by ID
""" """
from typing import Annotated from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from fastapi.params import Path
from sqlalchemy import or_
from src.contact.schemas import ContactContactGetResponse, ContactAddressGetResponse, ContactContactPostRequest, \
ContactUpdateRequest, ContactOrgGetResponse
from src.contact.models import Contact
from src.database import db_dependency
from src.organisation.models import Organisation as Org
from src.organisation.constants import ContactType
router = APIRouter( router = APIRouter(
prefix="/contact", prefix="/contact",
tags=["contact"], tags=["contact"],
) )
@router.get("/{contact_id}", response_model=ContactContactGetResponse)
async def get_contact_details_by_id(contact_id: int, db: db_dependency):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model
@router.get("/{contact_id}/address", response_model=ContactAddressGetResponse)
async def get_contact_address_by_id(contact_id: int, db: db_dependency):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model
@router.post("/")
async def create_contact(db: db_dependency, contact_request: ContactContactPostRequest):
contact_model = Contact(**contact_request.model_dump())
db.add(contact_model)
db.commit()
@router.patch("/{contact_id}")
async def update_contact(db: db_dependency, contact_request: ContactUpdateRequest, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
update_data = contact_request.model_dump(exclude_none=True)
for key, value in update_data.items():
if hasattr(contact_model, key):
setattr(contact_model, key, value)
else:
raise HTTPException(status_code=422, detail="Invalid keys in update request")
db.add(contact_model)
db.commit()
@router.delete("/{contact_id}")
async def delete_contact(db: db_dependency, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
db.delete(contact_model)
db.commit()
@router.get("/{contact_id}/orgs", response_model=list[ContactOrgGetResponse])
async def get_contact_orgs(db: db_dependency, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
org_models = (db.query(Org).filter(
or_(
Org.owner_contact_id == contact_id,
Org.billing_contact_id == contact_id,
Org.security_contact_id == contact_id
)
).all())
response = []
for org in org_models:
types=[]
if org.owner_contact_id == contact_id:
types.append(ContactType.OWNER)
if org.billing_contact_id == contact_id:
types.append(ContactType.BILLING)
if org.security_contact_id == contact_id:
types.append(ContactType.SECURITY)
org_response_model = ContactOrgGetResponse(
name=str(org.name),
contact_types=types,
)
response.append(org_response_model)
return response

View file

@ -7,12 +7,24 @@ Models:
""" """
from typing import Optional from typing import Optional
from pydantic import EmailStr from pydantic import EmailStr, ConfigDict
from src.organisation.constants import ContactType from src.organisation.constants import ContactType
from src.schemas import CustomBaseModel from src.schemas import CustomBaseModel
class ContactAddress(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
post_office_box_number: Optional[str] = None
street_address: Optional[str] = None
street_address_line_2: Optional[str] = None
locality: Optional[str] = None
address_region: Optional[str] = None
country_code: Optional[str] = None
postal_code: Optional[str] = None
class ContactContactGetResponse(CustomBaseModel): class ContactContactGetResponse(CustomBaseModel):
email: str email: str
first_name: str first_name: str

View file

@ -17,13 +17,16 @@ engine = create_engine(SQLALCHEMY_DATABASE_URI.get_secret_value())
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db(): def get_db():
with SessionLocal.begin() as db: db = SessionLocal()
try: try:
yield db yield db
finally: except:
db.rollback() # Anything not explicitly commited is rolled back db.rollback()
db.close() raise
finally:
db.close()
db_dependency = Annotated[Session, Depends(get_db)] db_dependency = Annotated[Session, Depends(get_db)]

7
src/iam/config.py Normal file
View file

@ -0,0 +1,7 @@
"""
Configurations for <this module>
Configurations:
- List: Description
- Configs: Description
"""

7
src/iam/constants.py Normal file
View file

@ -0,0 +1,7 @@
"""
Constants and error codes for <this module>
Constants:
- List: Description
- Consts: Description
"""

11
src/iam/dependencies.py Normal file
View file

@ -0,0 +1,11 @@
"""
Router dependencies for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""

7
src/iam/exceptions.py Normal file
View file

@ -0,0 +1,7 @@
"""
Module specific exceptions for <this module>
Exceptions:
- List: Description
- Exceptions: Description
"""

52
src/iam/models.py Normal file
View file

@ -0,0 +1,52 @@
"""
Database models for the IAM module
Models:
- List: Description
- Models: Description
"""
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.orm import relationship
from src.database import Base
class Permission(Base):
__tablename__ = "permission"
id = Column(Integer, primary_key=True)
resource = Column(String, nullable=False)
action = Column(String, nullable=False)
service_id = Column(Integer, ForeignKey("service.id", ondelete="CASCADE"))
UniqueConstraint("service_id", "resource", "action", name="uniq_permission_resource_and_action")
class Group(Base):
__tablename__ = "group"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False, unique=True)
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"))
user_rel = relationship(
"User",
secondary="user_groups",
back_populates="group_rel"
)
org_rel = relationship("Organisation", back_populates="group_rel")
class GroupPermissions(Base):
__tablename__ = "group_permissions"
group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True)
permission_id = Column(Integer, ForeignKey("permission.id", ondelete="CASCADE"), primary_key=True)
class UserGroups(Base):
__tablename__ = "user_groups"
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True)
user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True)
group_id = Column(Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True)

192
src/iam/router.py Normal file
View file

@ -0,0 +1,192 @@
"""
Router endpoints for <this module>
Endpoints:
- List: Description
- Endpoints: Description
"""
from typing import Annotated, Optional
from fastapi import APIRouter, Query, HTTPException
from src.database import db_dependency
from src.schemas import ResourceName
from src.auth.service import claims_dependency
from src.user.models import User
from src.organisation.models import Organisation as Org
from src.service.models import Service
from src.organisation.dependencies import org_model_dependency
from src.iam.service import service_key_dependency
from src.iam.models import Permission as Perm, GroupPermissions as GPerms, Group, UserGroups
router = APIRouter(
tags=["IAM"],
prefix="/iam",
)
@router.post("/can_act_on_resource")
async def can_act_on_resource(valid_key: service_key_dependency, db: db_dependency, user_claims: claims_dependency,
rn: ResourceName, action: str) -> bool:
try:
user_id = user_claims["db_id"]
rn_org = rn.organisation
rn_service = rn.service
rn_resource = rn.resource
result = (db.query(Perm)
.join(Service, Service.id == Perm.service_id)
.join(GPerms, GPerms.permission_id == Perm.id)
.join(Group, Group.id == GPerms.group_id)
.join(Org, Org.id == Group.org_id)
.join(UserGroups, UserGroups.group_id == Group.id)
.join(User, User.id == UserGroups.user_id)
.filter(User.id == user_id)
.filter(Org.name == rn_org)
.filter(Service.name == rn_service)
.filter(Perm.resource == rn_resource)
.filter(Perm.action == action)
).first()
if result:
return True
else:
return False
except Exception as e:
print(e)
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/group/permissions")
async def get_group_permissions(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
# TODO: iam_admin_dependency
group_perms = db.query(Perm).join(GPerms).filter(GPerms.group_id==group_id).all()
# TODO: Response model
return group_perms
@router.get("/group/users")
async def get_group_users(db: db_dependency, group_id: Annotated[int, Query(gt=0)]):
# TODO: iam_admin_dependency
group_users = db.query(User).join(UserGroups).filter(UserGroups.group_id == group_id).all()
# TODO: Response model
return group_users
@router.post("/group")
async def create_group(db: db_dependency, group_name: str, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
group_model = Group(name=group_name, org_id=org_id)
db.add(group_model)
db.commit()
# TODO: Response model
@router.put("/group/permissions")
async def add_group_permissions(db: db_dependency, group_id: int, permission_id: int, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
g_perm_model = GPerms(group_id=group_id, permission_id=permission_id)
db.add(g_perm_model)
db.commit()
# TODO: Response model
@router.put("/group/users")
async def add_group_users(db: db_dependency, group_id: int, user_ids: list[int], org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
for user_id in user_ids:
user_group_model = UserGroups(group_id=group_id, user_id=user_id, org_id=org_id)
db.add(user_group_model)
db.commit()
# TODO: Response model
@router.delete("/group/permissions")
async def remove_group_permissions(db: db_dependency, group_id: int, org_model: org_model_dependency, org_id: int, permission_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
g_perm_model = db.query(GPerms).filter(GPerms.group_id == group_id, GPerms.permission_id == permission_id).first()
if g_perm_model is None:
return
db.delete(g_perm_model)
db.commit()
return
# TODO: Response model
@router.delete("/group/user")
async def remove_group_user(db: db_dependency, group_id: int, user_id: int, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
user_group_model = db.query(UserGroups).filter(UserGroups.group_id == group_id, UserGroups.user_id == user_id).first()
if user_group_model is None:
return
db.delete(user_group_model)
db.commit()
return
# TODO: Response model
@router.get("/permissions")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: request model
permission_models = db.query(Perm).all()
# TODO: Response model
return permission_models
@router.post("/permission")
async def create_new_permission(db: db_dependency, service_id: int, resource: str, action: str):
# TODO: super_admin_dependency
perm_model = Perm(service_id=service_id, resource=resource, action=action)
db.add(perm_model)
db.commit()
@router.delete("/permission")
async def delete_permission(db: db_dependency, service_id: int, resource: str, action: str, org_model: org_model_dependency, org_id: int):
# TODO: iam_admin_dependency
# TODO: Request model
perm_model = db.query(Perm).filter(Perm.service_id==service_id, Perm.resource==resource, Perm.action==action).first()
if perm_model is None:
return
db.delete(perm_model)
db.commit()
return
# TODO: Response model
@router.get("/permissions/search")
async def get_permissions(db: db_dependency, org_model: org_model_dependency, org_id: int, service_id: Optional[int] = None, resource: Optional[str] = None, action: Optional[str] = None):
# TODO: iam_admin_dependency
# TODO: request model
permission_query = db.query(Perm)
if service_id is not None:
permission_query = permission_query.filter(Perm.service_id == service_id)
if resource is not None:
permission_query = permission_query.filter(Perm.resource == resource)
if action is not None:
permission_query = permission_query.filter(Perm.action == action)
permission_models = permission_query.all()
# TODO: Response model
return permission_models

7
src/iam/schemas.py Normal file
View file

@ -0,0 +1,7 @@
"""
Pydantic models for <this module>
Models:
- List: Description
- Models: Description
"""

26
src/iam/service.py Normal file
View file

@ -0,0 +1,26 @@
"""
Module specific business logic for <this module>
Exports service_key_dependency
"""
from typing import Annotated
from src.service.models import Service
from src.database import db_dependency
from src.schemas import ResourceName
from fastapi import HTTPException, status, Request, Depends
def valid_service_key(db: db_dependency, request: Request, rn: ResourceName) -> bool:
api_key = request.headers.get("X-API-Key", None)
if not api_key:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
service = rn.service
result = db.query(Service).filter(Service.name == service).filter(Service.api_key == api_key).first()
if result is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return True
service_key_dependency = Annotated[bool, Depends(valid_service_key)]

11
src/iam/utils.py Normal file
View file

@ -0,0 +1,11 @@
"""
Non-business logic reusable functions and classes for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""

View file

@ -8,4 +8,21 @@ Classes:
Functions: Functions:
- List: Description - List: Description
- Functions: Description - Functions: Description
""" """
from typing import Annotated
from fastapi import HTTPException, Depends
from src.database import db_dependency
from src.organisation.models import Organisation as Org
def get_org_model(db: db_dependency, org_id: int) -> type[Org]:
org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
return org_model
org_model_dependency = Annotated[type[Org], Depends(get_org_model)]

View file

@ -6,7 +6,8 @@ Models:
billing_contact_id[fk], security_contact_id[fk], owner_contact_id[fk] billing_contact_id[fk], security_contact_id[fk], owner_contact_id[fk]
- OrgUsers: org_id[fk][cpk], user_id[fk][cpk], is_admin - OrgUsers: org_id[fk][cpk], user_id[fk][cpk], is_admin
""" """
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, JSON, false from sqlalchemy import Column, Integer, String, ForeignKey, JSON
from sqlalchemy.orm import relationship
from src.database import Base from src.database import Base
@ -15,18 +16,36 @@ class Organisation(Base):
__tablename__ = "organisation" __tablename__ = "organisation"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
name = Column(String) name = Column(String, unique=True)
status = Column(String, default="partial") status = Column(String, default="partial")
intake_questionnaire = Column(JSON) intake_questionnaire = Column(JSON)
root_user_id = Column(Integer, ForeignKey("user.id"))
billing_contact_id = Column(Integer, ForeignKey("contact.id")) billing_contact_id = Column(Integer, ForeignKey("contact.id"))
security_contact_id = Column(Integer, ForeignKey("contact.id")) security_contact_id = Column(Integer, ForeignKey("contact.id"))
owner_contact_id = Column(Integer, ForeignKey("contact.id")) owner_contact_id = Column(Integer, ForeignKey("contact.id"))
user_rel = relationship(
"User",
secondary="orgusers",
back_populates="organisation_rel"
)
group_rel = relationship("Group", back_populates="org_rel")
root_user_rel = relationship("User", foreign_keys=[root_user_id])
@property
def root_user_email(self):
return self.root_user_rel.email if self.root_user_rel else None
billing_contact_rel = relationship("Contact", foreign_keys=[billing_contact_id])
security_contact_rel = relationship("Contact", foreign_keys=[security_contact_id])
owner_contact_rel = relationship("Contact", foreign_keys=[owner_contact_id])
class OrgUsers(Base): class OrgUsers(Base):
__tablename__ = "orgusers" __tablename__ = "orgusers"
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True) org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True)
user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True) user_id = Column(Integer, ForeignKey("user.id", ondelete="CASCADE"), primary_key=True)
is_admin = Column(Boolean, nullable=False, server_default=false())

View file

@ -8,27 +8,30 @@ Endpoints:
- [patch]/{org_id}/status - Updates the status of an organisation - [patch]/{org_id}/status - Updates the status of an organisation
- [patch]/{org_id}/contact - Assigns a contact to an organisation (as billing, security, or owner) - [patch]/{org_id}/contact - Assigns a contact to an organisation (as billing, security, or owner)
- [get]/{org_id}/users - Retrieves all users associated with an organisation - [get]/{org_id}/users - Retrieves all users associated with an organisation
- [get]/{org_id}/users/admins - Retrieves only the admin users of an organisation
- [post]/{org_id}/users - Adds a new user to an organisation - [post]/{org_id}/users - Adds a new user to an organisation
- [patch]/{org_id}/users - Updates details of an existing organisation user (e.g., admin status)
- [delete]/{org_id} - Deletes an organisation by ID - [delete]/{org_id} - Deletes an organisation by ID
- [get]/{org_id}/contact/{contact_type} - Retrieves the contact of a specific type (owner, billing, security) for an organisation - [get]/{org_id}/contact/{contact_type} - Retrieves the contact of a specific type (owner, billing, security) for an organisation
""" """
from typing import Annotated from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException, status
from fastapi.params import Path from fastapi.params import Path, Query
from sqlalchemy.sql import exists
from src.contact.schemas import ContactAddress
from src.database import db_dependency from src.database import db_dependency
from src.contact.models import Contact from src.contact.models import Contact
from src.user.models import User
from src.user.exceptions import UserNotFoundException
from src.auth.service import root_user_dependency, claims_dependency
from src.organisation.dependencies import org_model_dependency
from src.organisation.constants import ContactType from src.organisation.constants import ContactType
from src.organisation.models import Organisation as Org, OrgUsers from src.organisation.models import Organisation as Org
from src.organisation.schemas import OrgOrgPostRequest, OrgQuestionnairePatchRequest, OrgStatusPatchRequest, \ from src.organisation.schemas import OrgOrgPostRequest, OrgQuestionnairePatchRequest, OrgStatusPatchRequest, \
OrgContactPatchRequest, \ OrgContactPatchRequest, \
OrgUserPostRequest, OrgUserGetResponse, OrgContactGetResponse, OrgOrgGetResponse OrgUserPostRequest, OrgUserGetResponse, OrgContactGetResponse, OrgOrgGetResponse, OrgRootPatchRequest, \
OrgGroupGetResponse, OrgUserDeleteRequest
router = APIRouter( router = APIRouter(
prefix="/org", prefix="/org",
@ -37,167 +40,165 @@ router = APIRouter(
@router.get("/id/{org_id}", response_model=OrgOrgGetResponse) @router.get("/id/{org_id}", response_model=OrgOrgGetResponse)
async def get_org_by_id(db: db_dependency, org_id: Annotated[int, Path(gt=0)]): async def get_org_by_id(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
org_model = (db.query(Org).filter(Org.id == org_id).first())
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
response = { response = {
"name": org_model.name, "name": org_model.name,
"status": org_model.status, "status": org_model.status,
"owner_contact": (db.query(Contact).filter(Contact.id == org_model.owner_contact_id).first()), "owner_contact": org_model.owner_contact_rel.email,
"billing_contact": (db.query(Contact).filter(Contact.id == org_model.billing_contact_id).first()), "billing_contact": org_model.billing_contact_rel.email,
"security_contact": (db.query(Contact).filter(Contact.id == org_model.security_contact_id).first()), "security_contact": org_model.security_contact_rel.email,
"root_user": org_model.root_user_email
} }
return response return response
@router.post("/") @router.post("/")
async def create_org(db: db_dependency, org_request: OrgOrgPostRequest): async def create_org(db: db_dependency, user: claims_dependency, org_request: OrgOrgPostRequest):
org_model = Org(**org_request.model_dump()) db_id: Optional[int] = user.get("db_id", None)
if db_id is None:
raise UserNotFoundException()
org_model = Org(name=org_request.name, intake_questionnaire=org_request.intake_questionnaire.model_dump())
org_model.status = "partial" # Status is always set to partial at first, see update_questionnaire() doc org_model.status = "partial" # Status is always set to partial at first, see update_questionnaire() doc
db.add(org_model) db.add(org_model)
db.flush()
# Adds currently logged-in user to org users list and sets them as root_user
user_model = db.get(User, db_id)
org_model.user_rel.append(user_model)
org_model.root_user_rel = user_model
for contact_type in ["billing_contact_id", "security_contact_id", "owner_contact_id"]:
contact_model = Contact(org_id=org_model.id)
db.add(contact_model)
db.flush()
org_model.__setattr__(contact_type, contact_model.id)
db.commit() db.commit()
@router.patch("/{org_id}/questionnaire") @router.patch("/{org_id}/questionnaire")
async def update_questionnaire(db: db_dependency, q_request: OrgQuestionnairePatchRequest, org_id: Annotated[int, Path(gt=0)]): async def update_questionnaire(db: db_dependency, org_model: org_model_dependency, q_request: OrgQuestionnairePatchRequest, org_id: Annotated[int, Path(gt=0)]):
""" """
Route for updating questionnaire. Route for updating questionnaire.
The partial bool allows for submission of partially completed questionnaire and/or The partial bool allows for submission of partially completed questionnaire and/or
final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval. final "are you sure" check before setting the org to be in "submitted" status, awaiting admin approval.
""" """
org_model = db.query(Org).filter(Org.id == org_id).first() org_model.intake_questionnaire = q_request.intake_questionnaire.model_dump()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
org_model.intake_questionnaire = q_request.intake_questionnaire
# Allows for partially completed questionnaires to be saved without being submitted for review # Allows for partially completed questionnaires to be saved without being submitted for review
if not q_request.partial: if not q_request.partial:
org_model.status = "submitted" org_model.status = "submitted"
db.add(org_model)
db.commit() db.commit()
@router.patch("/{org_id}/status") @router.patch("/{org_id}/status")
async def update_status(db: db_dependency, status_request: OrgStatusPatchRequest, org_id: Annotated[int, Path(gt=0)]): async def update_status(db: db_dependency, org_model: org_model_dependency, status_request: OrgStatusPatchRequest, org_id: Annotated[int, Path(gt=0)]):
org_model = db.query(Org).filter(Org.id == org_id).first()
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
org_model.status = status_request.status org_model.status = status_request.status
db.add(org_model)
db.commit() db.commit()
@router.patch("/{org_id}/contact") @router.get("/{org_id}/users", response_model=OrgUserGetResponse)
async def update_contact(db: db_dependency, contact_request: OrgContactPatchRequest, org_id: Annotated[int, Path(gt=0)]): async def get_users(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
org_model = db.query(Org).filter(Org.id == org_id).first() return {"users": [user.email for user in org_model.user_rel]}
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
match contact_request.contact_type:
case "billing":
org_model.billing_contact_id = contact_request.contact_id
case "security":
org_model.security_contact_id = contact_request.contact_id
case "owner":
org_model.owner_contact_id = contact_request.contact_id
case _:
raise HTTPException(status_code=422, detail="Invalid contact type")
db.add(org_model)
db.commit()
@router.get("/{org_id}/users", response_model=list[OrgUserGetResponse])
async def get_users(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_exists = db.query(exists().where(Org.id == org_id)).scalar()
if not org_exists:
raise HTTPException(status_code=404, detail="Organisation not found")
org_user_models = db.query(OrgUsers).filter(OrgUsers.org_id == org_id).all()
return org_user_models
@router.get("/{org_id}/users/admins", response_model=list[OrgUserGetResponse])
async def get_admin_users(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_exists = db.query(exists().where(Org.id == org_id)).scalar()
if not org_exists:
raise HTTPException(status_code=404, detail="Organisation not found")
org_user_models = db.query(OrgUsers).filter(OrgUsers.org_id == org_id).filter(OrgUsers.is_admin == True).all()
return org_user_models
@router.post("/{org_id}/users") @router.post("/{org_id}/users")
async def add_user_to_org(db: db_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]): async def add_user_to_org(db: db_dependency, org_model: org_model_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]):
org_model = (db.query(Org).filter(Org.id == org_id).first()) user_model = db.get(User, user_request.user_id)
if org_model is None: if user_model in org_model.user_rel:
raise HTTPException(status_code=404, detail="Organisation not found") return
org_model.user_rel.append(user_model)
org_user_model = OrgUsers(**user_request.model_dump(), org_id=org_id)
db.add(org_user_model)
db.commit() db.commit()
@router.patch("/{org_id}/users") @router.delete("/{org_id}", status_code=status.HTTP_204_NO_CONTENT)
async def update_user_details(db: db_dependency, user_request: OrgUserPostRequest, org_id: Annotated[int, Path(gt=0)]): async def delete_organisation_by_id(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
"""
Currently used only to update user admin status for organisation.
"""
org_model = (db.query(Org).filter(Org.id == org_id).first())
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
org_user_model = db.query(OrgUsers).filter(OrgUsers.org_id == org_id).filter(OrgUsers.user_id == user_request.user_id).first()
if org_user_model is None:
raise HTTPException(status_code=404, detail="Organisation user not found")
if user_request.is_admin is not None:
org_user_model.is_admin = user_request.is_admin
db.add(org_user_model)
db.commit()
@router.delete("/{org_id}")
async def delete_organisation_by_id(db: db_dependency, org_id: Annotated[int, Path(gt=0)]):
org_model = (db.query(Org).filter(Org.id == org_id).first())
if org_model is None:
raise HTTPException(status_code=404, detail="Organisation not found")
db.delete(org_model) db.delete(org_model)
db.commit() db.commit()
@router.get("/{org_id}/contact/{contact_type}", response_model=OrgContactGetResponse) @router.patch("/{org_id}/root_user", status_code=status.HTTP_204_NO_CONTENT)
async def get_contact(db: db_dependency, contact_type: ContactType, org_id: Annotated[int, Path(gt=0)]): async def update_root_user(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_request: OrgRootPatchRequest):
org_model = db.query(Org).filter(Org.id == org_id).first() root_user_model = db.get(User, user_request.user_id)
if org_model is None: if root_user_model is None:
raise HTTPException(status_code=404, detail="Organisation not found") raise UserNotFoundException(user_id=user_request.user_id)
org_model.root_user_rel = root_user_model
db.commit()
@router.get("/{org_id}/groups", response_model=OrgGroupGetResponse)
async def get_org_groups(org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)]):
return {"groups": [group.name for group in org_model.group_rel]}
@router.delete("/{org_id}/user", status_code=status.HTTP_204_NO_CONTENT)
async def remove_user_from_org(db: db_dependency, org_model: org_model_dependency, org_id: Annotated[int, Path(gt=0)], user_request: OrgUserDeleteRequest):
user_id = user_request.user_id
user = db.get(User, user_id)
if user is None:
raise UserNotFoundException(user_id=user_id)
if user not in org_model.user_rel:
raise HTTPException(status_code=status.HTTP_204_NOT_FOUND)
org_model.user_rel.remove(user)
db.commit()
@router.get("/{org_id}/contact", response_model=OrgContactGetResponse)
async def get_contact(org_model: org_model_dependency, contact_type: Annotated[ContactType, Query()], org_id: Annotated[int, Path(gt=0)]):
match contact_type: match contact_type:
case "billing": case "billing":
contact_id = org_model.billing_contact_id contact_model = org_model.billing_contact_rel
case "security": case "security":
contact_id = org_model.security_contact_id contact_model = org_model.security_contact_rel
case "owner": case "owner":
contact_id = org_model.owner_contact_id contact_model = org_model.owner_contact_rel
case _: case _:
raise HTTPException(status_code=422, detail="Invalid contact type") raise HTTPException(status_code=422, detail="Invalid contact type")
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None: if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found") raise HTTPException(status_code=404, detail="Contact not found")
return contact_model return OrgContactGetResponse.model_construct(
**contact_model.__dict__,
address=ContactAddress.model_validate(contact_model)
)
@router.patch("/{org_id}/contact", response_model=OrgContactGetResponse)
async def update_contact(db: db_dependency, org_model: org_model_dependency, contact_type: Annotated[ContactType, Query()], contact_request: OrgContactPatchRequest, org_id: Annotated[int, Path(gt=0)]):
match contact_type:
case "billing":
contact_model = org_model.billing_contact_rel
case "security":
contact_model = org_model.security_contact_rel
case "owner":
contact_model = org_model.owner_contact_rel
case _:
raise HTTPException(status_code=422, detail="Invalid contact type")
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
update_data = contact_request.model_dump(exclude_none=True)
for key, value in update_data.items():
if hasattr(contact_model, key):
setattr(contact_model, key, value)
else:
raise HTTPException(status_code=422, detail="Invalid keys in update request")
db.flush()
response = OrgContactGetResponse.model_construct(
**contact_model.__dict__,
address=ContactAddress.model_validate(contact_model)
)
db.commit()
return response

View file

@ -7,9 +7,11 @@ Models:
""" """
from typing import Optional from typing import Optional
from pydantic import EmailStr, ConfigDict
from src.schemas import CustomBaseModel from src.schemas import CustomBaseModel
from src.organisation.constants import Status, ContactType from src.organisation.constants import Status, ContactType
from src.contact.schemas import ContactAddress
class OrgQuestionnaire(CustomBaseModel): class OrgQuestionnaire(CustomBaseModel):
question_one: str question_one: str
@ -21,10 +23,6 @@ class OrgOrgPostRequest(CustomBaseModel):
name: str name: str
intake_questionnaire: Optional[OrgQuestionnaire] = None intake_questionnaire: Optional[OrgQuestionnaire] = None
billing_contact_id: Optional[int] = None
security_contact_id: Optional[int] = None
owner_contact_id: Optional[int] = None
class OrgQuestionnairePatchRequest(CustomBaseModel): class OrgQuestionnairePatchRequest(CustomBaseModel):
intake_questionnaire: OrgQuestionnaire intake_questionnaire: OrgQuestionnaire
partial: bool partial: bool
@ -33,27 +31,49 @@ class OrgStatusPatchRequest(CustomBaseModel):
status: Status status: Status
class OrgContactPatchRequest(CustomBaseModel): class OrgContactPatchRequest(CustomBaseModel):
contact_id: int email: Optional[EmailStr] = None
contact_type: ContactType first_name: Optional[str] = None
last_name: Optional[str] = None
phonenumber: Optional[str] = None
vat_number: Optional[str] = None
post_office_box_number: Optional[str] = None
street_address: Optional[str] = None
street_address_line_2: Optional[str] = None
locality: Optional[str] = None
address_region: Optional[str] = None
country_code: Optional[str] = None
postal_code: Optional[str] = None
class OrgUserPostRequest(CustomBaseModel): class OrgUserPostRequest(CustomBaseModel):
user_id: int user_id: int
is_admin: Optional[bool] = False
class OrgUserDeleteRequest(CustomBaseModel):
user_id: int
class OrgRootPatchRequest(CustomBaseModel):
user_id: int
class OrgUserGetResponse(CustomBaseModel): class OrgUserGetResponse(CustomBaseModel):
user_id: int users: list[str]
is_admin: bool
class OrgGroupGetResponse(CustomBaseModel):
groups: list[str]
class OrgContactGetResponse(CustomBaseModel): class OrgContactGetResponse(CustomBaseModel):
email: str model_config = ConfigDict(from_attributes=True, extra="ignore")
first_name: str
last_name: str email: Optional[str] = None
phonenumber: str first_name: Optional[str] = None
last_name: Optional[str] = None
phonenumber: Optional[str] = None
vat_number: Optional[str] = None vat_number: Optional[str] = None
address: ContactAddress
class OrgOrgGetResponse(CustomBaseModel): class OrgOrgGetResponse(CustomBaseModel):
name: str name: str
status: Status status: Status
owner_contact: OrgContactGetResponse root_user: Optional[str] = None
billing_contact: OrgContactGetResponse owner_contact: Optional[str] = None
security_contact: OrgContactGetResponse billing_contact: Optional[str] = None
security_contact: Optional[str] = None

View file

@ -1,5 +1,13 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional
class CustomBaseModel(BaseModel): class CustomBaseModel(BaseModel):
pass pass
class ResourceName(CustomBaseModel):
service: str
organisation: str
resource: str
instance: Optional[str] = None

7
src/service/config.py Normal file
View file

@ -0,0 +1,7 @@
"""
Configurations for <this module>
Configurations:
- List: Description
- Configs: Description
"""

7
src/service/constants.py Normal file
View file

@ -0,0 +1,7 @@
"""
Constants and error codes for <this module>
Constants:
- List: Description
- Consts: Description
"""

View file

@ -0,0 +1,11 @@
"""
Router dependencies for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""

View file

@ -0,0 +1,7 @@
"""
Module specific exceptions for <this module>
Exceptions:
- List: Description
- Exceptions: Description
"""

18
src/service/models.py Normal file
View file

@ -0,0 +1,18 @@
"""
Database models for the services module
Models:
- List: Description
- Models: Description
"""
from sqlalchemy import Column, Integer, String
from src.database import Base
class Service(Base):
__tablename__ = "service"
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
api_key = Column(String, unique=True)

63
src/service/router.py Normal file
View file

@ -0,0 +1,63 @@
"""
Router endpoints for <this module>
Endpoints:
- List: Description
- Endpoints: Description
"""
from fastapi import APIRouter
from src.database import db_dependency
from src.service.models import Service
from src.service.utils import generate_api_key
router = APIRouter(
tags=["Service"],
prefix="/service",
)
@router.get("/")
async def get_all_services(db: db_dependency):
# TODO: user_dependency
# TODO: request model
permission_models = db.query(Service).all()
# TODO: Response model
return permission_models
@router.post("/")
async def register_service(db: db_dependency, service_name: str):
# TODO: super_admin_dependency
# TODO: request model
key = generate_api_key()
service_model = Service(name=service_name, api_key=key)
db.add(service_model)
db.commit()
# TODO: response model
@router.patch("/{service_id}/key")
async def regenerate_api_key(db: db_dependency, service_id: int):
# TODO: super_admin_dependency
# TODO: request model
key = generate_api_key()
service_model = db.query(Service).filter(Service.id==service_id).first()
service_model.api_key = key
db.add(service_model)
db.commit()
# TODO: response model
@router.delete("/{service_id}")
async def remove_service(db: db_dependency, service_id: int):
# TODO: super_admin_dependency
# TODO: request model
service_model = db.query(Service).filter(Service.id==service_id).first()
if service_model is None:
return
db.delete(service_model)
db.commit()
# TODO: response model

7
src/service/schemas.py Normal file
View file

@ -0,0 +1,7 @@
"""
Pydantic models for <this module>
Models:
- List: Description
- Models: Description
"""

11
src/service/service.py Normal file
View file

@ -0,0 +1,11 @@
"""
Module specific business logic for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""

16
src/service/utils.py Normal file
View file

@ -0,0 +1,16 @@
"""
Non-business logic reusable functions and classes for <this module>
Classes:
- List: Description
- Classes: Description
Functions:
- List: Description
- Functions: Description
"""
import uuid
def generate_api_key() -> str:
return str(uuid.uuid4())

View file

@ -4,9 +4,13 @@ Database models for user module
Models: Models:
- User - id[pk], email, first_name, last_name, oidc_id - User - id[pk], email, first_name, last_name, oidc_id
""" """
from collections import defaultdict
from sqlalchemy import Column, Integer, String from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from src.database import Base from src.database import Base
from src.iam.models import Group
class User(Base): class User(Base):
@ -17,3 +21,26 @@ class User(Base):
first_name = Column(String) first_name = Column(String)
last_name = Column(String) last_name = Column(String)
oidc_id = Column(String, index=True, unique=True) oidc_id = Column(String, index=True, unique=True)
organisation_rel = relationship(
"Organisation",
secondary="orgusers",
back_populates="user_rel"
)
@property
def organisations(self):
return [org.name for org in self.organisation_rel]
group_rel = relationship(
"Group",
secondary="user_groups",
back_populates="user_rel"
)
@property
def groups(self):
result = defaultdict(list)
for group in self.group_rel:
result[group.org_rel.name].append(group.name)
return dict(result)

View file

@ -15,15 +15,12 @@ from typing import Annotated
from fastapi import APIRouter from fastapi import APIRouter
from fastapi.params import Path from fastapi.params import Path
from sqlalchemy.sql import exists
from starlette import status from starlette import status
from src.user.models import User from src.user.models import User
from src.user.schemas import UserResponse, OrgResponse, OIDCClaims from src.user.schemas import UserResponse, OIDCClaims
from src.user.exceptions import UserNotFoundException from src.user.exceptions import UserNotFoundException
from src.organisation.models import OrgUsers, Organisation
from src.auth.service import claims_dependency from src.auth.service import claims_dependency
from src.database import db_dependency from src.database import db_dependency
@ -63,55 +60,6 @@ async def current_user(user: claims_dependency, db: db_dependency):
return user_model return user_model
@router.get("/self/orgs", response_model=list[OrgResponse], status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_current_organisations(db: db_dependency, user: claims_dependency):
"""
Returns all organisations associated with the currently logged-in user.
"""
user_id = user.get("db_id", None)
if user_id is None:
raise UserNotFoundException()
user_exists = db.query(exists().where(User.id == user_id)).scalar()
if not user_exists:
raise UserNotFoundException(user_id=user_id)
org_user_models = (db.query(OrgUsers.org_id, OrgUsers.is_admin, Organisation.name)
.join(OrgUsers, Organisation.id == OrgUsers.org_id)
.filter(OrgUsers.user_id == user_id)
.all()
)
return org_user_models
@router.get("/self/orgs/admin", response_model=list[OrgResponse], status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_current_admin_organisations(db: db_dependency, user: claims_dependency):
"""
Returns the organisations for which the currently logged-in user is an admin.
"""
user_id = user.get("db_id", None)
if user_id is None:
raise UserNotFoundException()
user_exists = db.query(exists().where(User.id == user_id)).scalar()
if not user_exists:
raise UserNotFoundException(user_id=user_id)
org_user_models = (db.query(OrgUsers.org_id, OrgUsers.is_admin, Organisation.name)
.join(OrgUsers, Organisation.id == OrgUsers.org_id)
.filter(OrgUsers.user_id == user_id)
.filter(OrgUsers.is_admin == True)
.all()
)
return org_user_models
@router.get("/{user_id}", response_model=UserResponse, status_code=status.HTTP_200_OK, responses={ @router.get("/{user_id}", response_model=UserResponse, status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"}, status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"}, status.HTTP_200_OK: {"description": "Successful retrieval from database"},
@ -127,49 +75,6 @@ async def get_user_by_id(db: db_dependency, user_id: Annotated[int, Path(gt=0,de
return user_model return user_model
@router.get("/{user_id}/orgs", response_model=list[OrgResponse], status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_organisations(db: db_dependency, user_id: Annotated[int, Path(gt=0,description="User database ID")]):
"""
Returns all organisations associated with the provided user ID.
"""
user_exists = db.query(exists().where(User.id == user_id)).scalar()
if not user_exists:
raise UserNotFoundException(user_id=user_id)
org_user_models = (db.query(OrgUsers.org_id, OrgUsers.is_admin, Organisation.name)
.join(OrgUsers, Organisation.id == OrgUsers.org_id)
.filter(OrgUsers.user_id == user_id)
.all()
)
return org_user_models
@router.get("/{user_id}/orgs/admin", response_model=list[OrgResponse], status_code=status.HTTP_200_OK, responses={
status.HTTP_404_NOT_FOUND: {"description": "User not found"},
status.HTTP_200_OK: {"description": "Successful retrieval from database"},
})
async def get_admin_organisations(db: db_dependency, user_id: Annotated[int, Path(gt=0,description="User database ID")]):
"""
Returns the organisations for which the user with the provided user ID is an admin.
"""
user_exists = db.query(exists().where(User.id == user_id)).scalar()
if not user_exists:
raise UserNotFoundException(user_id=user_id)
org_user_models = (db.query(OrgUsers.org_id, OrgUsers.is_admin, Organisation.name)
.join(OrgUsers, Organisation.id == OrgUsers.org_id)
.filter(OrgUsers.user_id == user_id)
.filter(OrgUsers.is_admin == True)
.all()
)
return org_user_models
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT, responses={ @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT, responses={
status.HTTP_204_NO_CONTENT: {"description": "User deleted"}, status.HTTP_204_NO_CONTENT: {"description": "User deleted"},
status.HTTP_404_NOT_FOUND: {"description": "User not found"}, status.HTTP_404_NOT_FOUND: {"description": "User not found"},

View file

@ -5,6 +5,7 @@ Models:
- List: Description - List: Description
- Models: Description - Models: Description
""" """
from typing import Optional
from src.schemas import CustomBaseModel from src.schemas import CustomBaseModel
@ -44,9 +45,10 @@ class UserResponse(CustomBaseModel):
first_name: str first_name: str
last_name: str last_name: str
email: str email: str
organisations: list[Optional[str]]
groups: dict[str, list[str]]
class OrgResponse(CustomBaseModel): class OrgResponse(CustomBaseModel):
org_id: int org_id: int
name: str name: str
is_admin: bool