feat: all unique constraints tested

This commit is contained in:
Chris Milne 2026-06-08 16:05:20 +01:00
parent 52990aae13
commit e9b272811f
7 changed files with 28 additions and 11 deletions

View file

@ -33,8 +33,13 @@ class Permission(Base):
service_id = Column(Integer, ForeignKey("service.id", ondelete="CASCADE")) service_id = Column(Integer, ForeignKey("service.id", ondelete="CASCADE"))
UniqueConstraint( __table_args__ = (
"service_id", "resource", "action", name="uniq_permission_resource_and_action" UniqueConstraint(
"service_id",
"resource",
"action",
name="uniq_permission_resource_and_action",
),
) )
service_rel = relationship("Service", foreign_keys=[service_id]) service_rel = relationship("Service", foreign_keys=[service_id])

View file

@ -18,7 +18,6 @@ Endpoints:
from fastapi import APIRouter, status from fastapi import APIRouter, status
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from psycopg import errors
from src.service.exceptions import ServiceNotFoundException from src.service.exceptions import ServiceNotFoundException
from src.exceptions import ConflictException from src.exceptions import ConflictException
@ -264,12 +263,15 @@ async def create_new_permission(
if service_model is None: if service_model is None:
raise ServiceNotFoundException(service_id=request_model.service_id) raise ServiceNotFoundException(service_id=request_model.service_id)
perm_model = Perm(**request_model.__dict__) perm_model = Perm(**request_model.__dict__)
db.add(perm_model)
try: try:
db.add(perm_model) db.flush()
except IntegrityError as e: except IntegrityError as e:
if isinstance(e.orig, errors.UniqueViolation): if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
):
raise ConflictException(message="Permission already exists") raise ConflictException(message="Permission already exists")
db.flush()
response = { response = {
"service_name": perm_model.service_name, "service_name": perm_model.service_name,
"resource": perm_model.resource, "resource": perm_model.resource,

View file

@ -20,7 +20,6 @@ from typing import Annotated
from fastapi import APIRouter, status from fastapi import APIRouter, status
from fastapi.params import Query from fastapi.params import Query
from psycopg.errors import UniqueViolation
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from src.contact.schemas import ContactModel from src.contact.schemas import ContactModel
@ -143,7 +142,10 @@ async def create_org(
try: try:
db.flush() db.flush()
except IntegrityError as e: except IntegrityError as e:
if isinstance(e.orig, UniqueViolation): if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
):
raise ConflictException( raise ConflictException(
message="Organisation with this name already exists" message="Organisation with this name already exists"
) )

View file

@ -9,7 +9,6 @@ Endpoints:
""" """
from fastapi import APIRouter, status from fastapi import APIRouter, status
from psycopg.errors import UniqueViolation
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from src.exceptions import ConflictException from src.exceptions import ConflictException
@ -87,7 +86,10 @@ async def register_service(
try: try:
db.flush() db.flush()
except IntegrityError as e: except IntegrityError as e:
if isinstance(e.orig, UniqueViolation): if (
getattr(e.orig, "pgcode", None) == "23505" # Postgres unique violation
or "UNIQUE constraint failed" in str(e.orig) # SQLite unique violation
):
raise ConflictException(message="Service with this name already exists") raise ConflictException(message="Service with this name already exists")
response = ServiceWithKeySchema(**service_model.__dict__) response = ServiceWithKeySchema(**service_model.__dict__)
db.commit() db.commit()

View file

@ -549,6 +549,10 @@ async def test_post_perm_success(default_client: AsyncClient, db_session):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"body, expected_status", "body, expected_status",
[ [
(
{"service_id": 1, "resource": "test_resource", "action": "read"},
409,
),
# service_id tests # service_id tests
( (
{"service_id": 42, "resource": "test_resource", "action": "read"}, {"service_id": 42, "resource": "test_resource", "action": "read"},

View file

@ -49,6 +49,7 @@ async def test_post_org_success(default_client: AsyncClient):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"body, expected_status", "body, expected_status",
[ [
({"name": "Test Org"}, 409),
({"name": 42}, 422), ({"name": 42}, 422),
({}, 422), ({}, 422),
({"name": "New Test Org", "intake_questionnaire": {"question_one": 42}}, 422), ({"name": "New Test Org", "intake_questionnaire": {"question_one": 42}}, 422),

View file

@ -46,12 +46,13 @@ async def test_post_service_success(default_client: AsyncClient):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"body, expected_status", "body, expected_status",
[ [
({"name": "Test Service"}, 409),
({"name": 42}, 422), ({"name": 42}, 422),
({}, 422), ({}, 422),
], ],
) )
@pytest.mark.anyio @pytest.mark.anyio
async def test_post_services_status_checks( async def test_post_service_status_checks(
default_client: AsyncClient, body: dict[str, str], expected_status: int default_client: AsyncClient, body: dict[str, str], expected_status: int
): ):
resp = await default_client.post("/service/", json=body) resp = await default_client.post("/service/", json=body)