Compare commits

...

3 commits

Author SHA1 Message Date
662b9c8e26 feat: permission permissions
All checks were successful
ci / lint_and_test (push) Successful in 16s
Orgs can only grant permissions to groups that they themselves have been granted access to.

Super admin bypasses not added, flagged as todos.
2026-06-16 13:51:31 +01:00
0a867c9c90 minor: relationship key definitions to strings
Using the objects directly was causing type checking issues. Strings are equivalent so no functional change.
2026-06-16 11:19:22 +01:00
3e4f68dd9b fix: unique violations
Directly using Psycopg error instead of the error code.

Also, raise all other IntegrityErrors instead of silently dropping them.
2026-06-15 14:38:14 +01:00
8 changed files with 94 additions and 14 deletions

View file

@ -0,0 +1,38 @@
"""org perm perms join table
Revision ID: 85edbf9a176c
Revises: 98e20aae555c
Create Date: 2026-06-16 13:31:57.427953
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '85edbf9a176c'
down_revision: Union[str, Sequence[str], None] = '98e20aae555c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('org_permissions',
sa.Column('org_id', sa.Integer(), nullable=False),
sa.Column('permission_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['org_id'], ['organisation.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['permission_id'], ['permission.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('org_id', 'permission_id')
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('org_permissions')
# ### end Alembic commands ###

View file

@ -42,7 +42,7 @@ class Permission(Base):
),
)
service_rel = relationship("Service", foreign_keys=[service_id])
service_rel = relationship("Service", foreign_keys="Permission.service_id")
@property
def service_name(self):
@ -52,6 +52,10 @@ class Permission(Base):
"Group", secondary="group_permissions", back_populates="permission_rel"
)
org_rel = relationship(
"Organisation", secondary="org_permissions", back_populates="permission_rel"
)
class Group(Base):
__tablename__ = "group"
@ -95,3 +99,13 @@ class UserGroups(Base):
group_id = Column(
Integer, ForeignKey("group.id", ondelete="CASCADE"), primary_key=True
)
class OrgPermissions(Base):
__tablename__ = "org_permissions"
org_id = Column(
Integer, ForeignKey("organisation.id", ondelete="CASCADE"), primary_key=True
)
permission_id = Column(
Integer, ForeignKey("permission.id", ondelete="CASCADE"), primary_key=True
)

View file

@ -20,6 +20,7 @@ Endpoints:
from fastapi import APIRouter, status, BackgroundTasks
from sqlalchemy.exc import IntegrityError
from psycopg.errors import UniqueViolation
from src.iam.exceptions import GroupNotFoundException
from src.organisation.exceptions import OrgNotFoundException
@ -283,10 +284,11 @@ async def create_group(
db.flush()
except IntegrityError as e:
if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation
isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
):
raise ConflictException("Group with this name already exists")
raise
group_response = GroupSummary(**group_model.__dict__)
org_response = OrgSummary(**org_model.__dict__)
db.commit()
@ -323,6 +325,9 @@ async def add_group_permission(
if perm_model in group_model.permission_rel:
raise ConflictException("Group already has this permission")
if perm_model not in org_model.permission_rel: # TODO: and not su
raise ForbiddenException("You cannot grant this permission")
group_model.permission_rel.append(perm_model)
db.flush()
@ -469,8 +474,10 @@ async def get_permissions(
"""
Returns a full list of permissions.
"""
permission_models = db.query(Perm).all()
# TODO: if su:
# permission_models = db.query(Perm).all()
# else
permission_models = db.query(Perm).filter(Perm.org_rel.any(id=org_model.id)).all()
return {"permissions": permission_models}
@ -500,10 +507,11 @@ async def create_new_permission(
db.flush()
except IntegrityError as e:
if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation
isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
):
raise ConflictException(message="Permission already exists")
raise
response = {
"id": perm_model.id,
"service_name": perm_model.service_name,
@ -563,6 +571,9 @@ async def post_permissions(
if not (request_model.action is None or request_model.action == ""):
permission_query = permission_query.filter(Perm.action == request_model.action)
# TODO: if not su:
permission_query = permission_query.filter(Perm.org_rel.any(id=org_model.id))
permission_models = permission_query.all()
return {"permissions": permission_models}

View file

@ -39,15 +39,25 @@ class Organisation(Base):
)
group_rel = relationship("Group", back_populates="org_rel")
root_user_rel = relationship("User", foreign_keys=[root_user_id])
root_user_rel = relationship("User", foreign_keys="Organisation.root_user_id")
@property
def root_user_email(self):
return self.root_user_rel.email if self.root_user_rel else None
billing_contact_rel = relationship("Contact", foreign_keys=[billing_contact_id])
security_contact_rel = relationship("Contact", foreign_keys=[security_contact_id])
owner_contact_rel = relationship("Contact", foreign_keys=[owner_contact_id])
billing_contact_rel = relationship(
"Contact", foreign_keys="Organisation.billing_contact_id"
)
security_contact_rel = relationship(
"Contact", foreign_keys="Organisation.security_contact_id"
)
owner_contact_rel = relationship(
"Contact", foreign_keys="Organisation.owner_contact_id"
)
permission_rel = relationship(
"Permission", secondary="org_permissions", back_populates="org_rel"
)
class OrgUsers(Base):

View file

@ -18,10 +18,11 @@ Endpoints:
from datetime import datetime, timezone
from typing import Annotated
from sqlalchemy.exc import IntegrityError
from psycopg.errors import UniqueViolation
from fastapi import APIRouter, status
from fastapi.params import Query
from sqlalchemy.exc import IntegrityError
from src.contact.schemas import ContactModel
from src.exceptions import (
@ -175,12 +176,13 @@ async def create_org(
db.flush()
except IntegrityError as e:
if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation
isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
):
raise ConflictException(
message="Organisation with this name already exists"
)
raise
# Adds currently logged-in user to org users list and sets them as root_user
org_model.user_rel.append(user_model)
org_model.root_user_rel = user_model

View file

@ -10,6 +10,7 @@ Endpoints:
from fastapi import APIRouter, status
from sqlalchemy.exc import IntegrityError
from psycopg.errors import UniqueViolation
from src.exceptions import ConflictException
from src.database import db_dependency
@ -110,10 +111,11 @@ async def register_service(
db.flush()
except IntegrityError as e:
if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation
isinstance(e.orig, UniqueViolation) # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
):
raise ConflictException(message="Service with this name already exists")
raise
response = ServiceWithKeySchema(**service_model.__dict__)
db.commit()
return {"service": response}

View file

@ -10,7 +10,7 @@ from src.user.models import User
from src.service.models import Service
from src.organisation.models import Organisation as Org, OrgUsers
from src.contact.models import Contact
from src.iam.models import Group, Permission
from src.iam.models import Group, Permission, OrgPermissions
from src.auth.service import get_current_user, get_dev_user
from src.auth.dependencies import empty_su_list, get_super_admin_list, testing_su_list
from src.main import app # inited FastAPI app
@ -163,6 +163,9 @@ def _seed(db):
db.add(Service(name="Test Service", api_key="123456789"))
db.add(Permission(service_id=1, resource="test_resource", action="read"))
db.add(Permission(service_id=1, resource="test_resource", action="move"))
db.add(Permission(service_id=1, resource="test_resource", action="delete"))
db.add(OrgPermissions(org_id=1, permission_id=1))
db.add(OrgPermissions(org_id=1, permission_id=2))
db.add(Group(name="Org One Group", org_id=1))
db.add(Group(name="Org Two Group", org_id=2))
db.add(Group(name="Org One Group Two", org_id=1))

View file

@ -437,7 +437,7 @@ async def test_post_perm_success(default_client: AsyncClient):
assert "permission" in data
assert isinstance(data["permission"], dict)
assert data["permission"]["id"] == 3
assert data["permission"]["id"] == 4
assert data["permission"]["service_name"] == "Test Service"
assert data["permission"]["resource"] == "test_resource"
assert data["permission"]["action"] == "create"