Compare commits

...

3 commits

Author SHA1 Message Date
662b9c8e26 feat: permission permissions
All checks were successful
ci / lint_and_test (push) Successful in 16s
Orgs can only grant permissions to groups that they themselves have been granted access to.

Super admin bypasses not added, flagged as todos.
2026-06-16 13:51:31 +01:00
0a867c9c90 minor: relationship key definitions to strings
Using the objects directly was causing type checking issues. Strings are equivalent so no functional change.
2026-06-16 11:19:22 +01:00
3e4f68dd9b fix: unique violations
Directly using Psycopg error instead of the error code.

Also, raise all other IntegrityErrors instead of silently dropping them.
2026-06-15 14:38:14 +01:00
8 changed files with 94 additions and 14 deletions

View file

@ -0,0 +1,38 @@
"""org perm perms join table
Revision ID: 85edbf9a176c
Revises: 98e20aae555c
Create Date: 2026-06-16 13:31:57.427953
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '85edbf9a176c'
down_revision: Union[str, Sequence[str], None] = '98e20aae555c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('org_permissions',
sa.Column('org_id', sa.Integer(), nullable=False),
sa.Column('permission_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['org_id'], ['organisation.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['permission_id'], ['permission.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('org_id', 'permission_id')
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('org_permissions')
# ### end Alembic commands ###

View file

@ -42,7 +42,7 @@ class Permission(Base):
), ),
) )
service_rel = relationship("Service", foreign_keys=[service_id]) service_rel = relationship("Service", foreign_keys="Permission.service_id")
@property @property
def service_name(self): def service_name(self):
@ -52,6 +52,10 @@ class Permission(Base):
"Group", secondary="group_permissions", back_populates="permission_rel" "Group", secondary="group_permissions", back_populates="permission_rel"
) )
org_rel = relationship(
"Organisation", secondary="org_permissions", back_populates="permission_rel"
)
class Group(Base): class Group(Base):
__tablename__ = "group" __tablename__ = "group"
@ -95,3 +99,13 @@ class UserGroups(Base):
group_id = Column( group_id = Column(
Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True
) )
class OrgPermissions(Base):
__tablename__ = "org_permissions"
org_id = Column(
Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True
)
permission_id = Column(
Integer, ForeignKey("permission.id", ondelete="CASCADE"), primary_key=True
)

View file

@ -20,6 +20,7 @@ Endpoints:
from fastapi import APIRouter, status, BackgroundTasks from fastapi import APIRouter, status, BackgroundTasks
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from psycopg.errors import UniqueViolation
from src.iam.exceptions import GroupNotFoundException from src.iam.exceptions import GroupNotFoundException
from src.organisation.exceptions import OrgNotFoundException from src.organisation.exceptions import OrgNotFoundException
@ -283,10 +284,11 @@ async def create_group(
db.flush() db.flush()
except IntegrityError as e: except IntegrityError as e:
if ( if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
): ):
raise ConflictException("Group with this name already exists") raise ConflictException("Group with this name already exists")
raise
group_response = GroupSummary(**group_model.__dict__) group_response = GroupSummary(**group_model.__dict__)
org_response = OrgSummary(**org_model.__dict__) org_response = OrgSummary(**org_model.__dict__)
db.commit() db.commit()
@ -323,6 +325,9 @@ async def add_group_permission(
if perm_model in group_model.permission_rel: if perm_model in group_model.permission_rel:
raise ConflictException("Group already has this permission") raise ConflictException("Group already has this permission")
if perm_model not in org_model.permission_rel: # TODO: and not su
raise ForbiddenException("You cannot grant this permission")
group_model.permission_rel.append(perm_model) group_model.permission_rel.append(perm_model)
db.flush() db.flush()
@ -469,8 +474,10 @@ async def get_permissions(
""" """
Returns a full list of permissions. Returns a full list of permissions.
""" """
permission_models = db.query(Perm).all() # TODO: if su:
# permission_models = db.query(Perm).all()
# else
permission_models = db.query(Perm).filter(Perm.org_rel.any(id=org_model.id)).all()
return {"permissions": permission_models} return {"permissions": permission_models}
@ -500,10 +507,11 @@ async def create_new_permission(
db.flush() db.flush()
except IntegrityError as e: except IntegrityError as e:
if ( if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
): ):
raise ConflictException(message="Permission already exists") raise ConflictException(message="Permission already exists")
raise
response = { response = {
"id": perm_model.id, "id": perm_model.id,
"service_name": perm_model.service_name, "service_name": perm_model.service_name,
@ -563,6 +571,9 @@ async def post_permissions(
if not (request_model.action is None or request_model.action == ""): if not (request_model.action is None or request_model.action == ""):
permission_query = permission_query.filter(Perm.action == request_model.action) permission_query = permission_query.filter(Perm.action == request_model.action)
# TODO: if not su:
permission_query = permission_query.filter(Perm.org_rel.any(id=org_model.id))
permission_models = permission_query.all() permission_models = permission_query.all()
return {"permissions": permission_models} return {"permissions": permission_models}

View file

@ -39,15 +39,25 @@ class Organisation(Base):
) )
group_rel = relationship("Group", back_populates="org_rel") group_rel = relationship("Group", back_populates="org_rel")
root_user_rel = relationship("User", foreign_keys=[root_user_id]) root_user_rel = relationship("User", foreign_keys="Organisation.root_user_id")
@property @property
def root_user_email(self): def root_user_email(self):
return self.root_user_rel.email if self.root_user_rel else None return self.root_user_rel.email if self.root_user_rel else None
billing_contact_rel = relationship("Contact", foreign_keys=[billing_contact_id]) billing_contact_rel = relationship(
security_contact_rel = relationship("Contact", foreign_keys=[security_contact_id]) "Contact", foreign_keys="Organisation.billing_contact_id"
owner_contact_rel = relationship("Contact", foreign_keys=[owner_contact_id]) )
security_contact_rel = relationship(
"Contact", foreign_keys="Organisation.security_contact_id"
)
owner_contact_rel = relationship(
"Contact", foreign_keys="Organisation.owner_contact_id"
)
permission_rel = relationship(
"Permission", secondary="org_permissions", back_populates="org_rel"
)
class OrgUsers(Base): class OrgUsers(Base):

View file

@ -18,10 +18,11 @@ Endpoints:
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Annotated from typing import Annotated
from sqlalchemy.exc import IntegrityError
from psycopg.errors import UniqueViolation
from fastapi import APIRouter, status from fastapi import APIRouter, status
from fastapi.params import Query from fastapi.params import Query
from sqlalchemy.exc import IntegrityError
from src.contact.schemas import ContactModel from src.contact.schemas import ContactModel
from src.exceptions import ( from src.exceptions import (
@ -175,12 +176,13 @@ async def create_org(
db.flush() db.flush()
except IntegrityError as e: except IntegrityError as e:
if ( if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
): ):
raise ConflictException( raise ConflictException(
message="Organisation with this name already exists" message="Organisation with this name already exists"
) )
raise
# Adds currently logged-in user to org users list and sets them as root_user # Adds currently logged-in user to org users list and sets them as root_user
org_model.user_rel.append(user_model) org_model.user_rel.append(user_model)
org_model.root_user_rel = user_model org_model.root_user_rel = user_model

View file

@ -10,6 +10,7 @@ Endpoints:
from fastapi import APIRouter, status from fastapi import APIRouter, status
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from psycopg.errors import UniqueViolation
from src.exceptions import ConflictException from src.exceptions import ConflictException
from src.database import db_dependency from src.database import db_dependency
@ -110,10 +111,11 @@ async def register_service(
db.flush() db.flush()
except IntegrityError as e: except IntegrityError as e:
if ( if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
): ):
raise ConflictException(message="Service with this name already exists") raise ConflictException(message="Service with this name already exists")
raise
response = ServiceWithKeySchema(**service_model.__dict__) response = ServiceWithKeySchema(**service_model.__dict__)
db.commit() db.commit()
return {"service": response} return {"service": response}

View file

@ -10,7 +10,7 @@ from src.user.models import User
from src.service.models import Service from src.service.models import Service
from src.organisation.models import Organisation as Org, OrgUsers from src.organisation.models import Organisation as Org, OrgUsers
from src.contact.models import Contact from src.contact.models import Contact
from src.iam.models import Group, Permission from src.iam.models import Group, Permission, OrgPermissions
from src.auth.service import get_current_user, get_dev_user from src.auth.service import get_current_user, get_dev_user
from src.auth.dependencies import empty_su_list, get_super_admin_list, testing_su_list from src.auth.dependencies import empty_su_list, get_super_admin_list, testing_su_list
from src.main import app # inited FastAPI app from src.main import app # inited FastAPI app
@ -163,6 +163,9 @@ def _seed(db):
db.add(Service(name="Test Service", api_key="123456789")) db.add(Service(name="Test Service", api_key="123456789"))
db.add(Permission(service_id=1, resource="test_resource", action="read")) db.add(Permission(service_id=1, resource="test_resource", action="read"))
db.add(Permission(service_id=1, resource="test_resource", action="move")) db.add(Permission(service_id=1, resource="test_resource", action="move"))
db.add(Permission(service_id=1, resource="test_resource", action="delete"))
db.add(OrgPermissions(org_id=1, permission_id=1))
db.add(OrgPermissions(org_id=1, permission_id=2))
db.add(Group(name="Org One Group", org_id=1)) db.add(Group(name="Org One Group", org_id=1))
db.add(Group(name="Org Two Group", org_id=2)) db.add(Group(name="Org Two Group", org_id=2))
db.add(Group(name="Org One Group Two", org_id=1)) db.add(Group(name="Org One Group Two", org_id=1))

View file

@ -437,7 +437,7 @@ async def test_post_perm_success(default_client: AsyncClient):
assert "permission" in data assert "permission" in data
assert isinstance(data["permission"], dict) assert isinstance(data["permission"], dict)
assert data["permission"]["id"] == 3 assert data["permission"]["id"] == 4
assert data["permission"]["service_name"] == "Test Service" assert data["permission"]["service_name"] == "Test Service"
assert data["permission"]["resource"] == "test_resource" assert data["permission"]["resource"] == "test_resource"
assert data["permission"]["action"] == "create" assert data["permission"]["action"] == "create"