feat: contact model restructure

Blank contacts are now generated on org creation and assigned to each contact type. These contacts are linked to the org, only accessible to the org, and removed when the org is removed.

With this all contact endpoints have been removed. Contact manipulation is done via the org only.
This commit is contained in:
Chris Milne 2026-05-25 15:15:50 +01:00
parent 707482adc2
commit 2b6d923ae1
7 changed files with 146 additions and 165 deletions

View file

@ -5,7 +5,7 @@ Models:
- Contact: id[pk], email, first_name, last_name, phonenumber, vat_number
street_address, post_office_box_number, address_locality, country_code, address_region, postal_code
"""
from sqlalchemy import Column, Integer, String
from sqlalchemy import Column, Integer, String, ForeignKey
from src.database import Base
@ -27,3 +27,5 @@ class Contact(Base):
country_code = Column(String) # Eg GB
address_region = Column(String, default=None, nullable=True)
postal_code = Column(String)
org_id = Column(Integer, ForeignKey("organisation.id", ondelete="CASCADE"), nullable=False)

View file

@ -9,110 +9,10 @@ Endpoints:
- [patch]/{contact_id} - Updates the details of an existing contact
- [delete]/{contact_id} - Deletes a contact by ID
"""
from typing import Annotated
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from fastapi.params import Path
from sqlalchemy import or_
from src.contact.schemas import ContactContactGetResponse, ContactAddressGetResponse, ContactContactPostRequest, \
ContactUpdateRequest, ContactOrgGetResponse
from src.contact.models import Contact
from src.database import db_dependency
from src.organisation.models import Organisation as Org
from src.organisation.constants import ContactType
router = APIRouter(
prefix="/contact",
tags=["contact"],
)
@router.get("/{contact_id}", response_model=ContactContactGetResponse)
async def get_contact_details_by_id(contact_id: int, db: db_dependency):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model
@router.get("/{contact_id}/address", response_model=ContactAddressGetResponse)
async def get_contact_address_by_id(contact_id: int, db: db_dependency):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
return contact_model
@router.post("/")
async def create_contact(db: db_dependency, contact_request: ContactContactPostRequest):
contact_model = Contact(**contact_request.model_dump())
db.add(contact_model)
db.commit()
@router.patch("/{contact_id}")
async def update_contact(db: db_dependency, contact_request: ContactUpdateRequest, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
update_data = contact_request.model_dump(exclude_none=True)
for key, value in update_data.items():
if hasattr(contact_model, key):
setattr(contact_model, key, value)
else:
raise HTTPException(status_code=422, detail="Invalid keys in update request")
db.add(contact_model)
db.commit()
@router.delete("/{contact_id}")
async def delete_contact(db: db_dependency, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
db.delete(contact_model)
db.commit()
@router.get("/{contact_id}/orgs", response_model=list[ContactOrgGetResponse])
async def get_contact_orgs(db: db_dependency, contact_id: Annotated[int, Path(gt=0)]):
contact_model = (db.query(Contact).filter(Contact.id == contact_id).first())
if contact_model is None:
raise HTTPException(status_code=404, detail="Contact not found")
org_models = (db.query(Org).filter(
or_(
Org.owner_contact_id == contact_id,
Org.billing_contact_id == contact_id,
Org.security_contact_id == contact_id
)
).all())
response = []
for org in org_models:
types=[]
if org.owner_contact_id == contact_id:
types.append(ContactType.OWNER)
if org.billing_contact_id == contact_id:
types.append(ContactType.BILLING)
if org.security_contact_id == contact_id:
types.append(ContactType.SECURITY)
org_response_model = ContactOrgGetResponse(
name=str(org.name),
contact_types=types,
)
response.append(org_response_model)
return response
)

View file

@ -7,12 +7,24 @@ Models:
"""
from typing import Optional
from pydantic import EmailStr
from pydantic import EmailStr, ConfigDict
from src.organisation.constants import ContactType
from src.schemas import CustomBaseModel
class ContactAddress(CustomBaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
post_office_box_number: Optional[str] = None
street_address: Optional[str] = None
street_address_line_2: Optional[str] = None
locality: Optional[str] = None
address_region: Optional[str] = None
country_code: Optional[str] = None
postal_code: Optional[str] = None
class ContactContactGetResponse(CustomBaseModel):
email: str
first_name: str