""" Router endpoints for contact module Endpoints: - [get]/{contact_id} - Returns non-address type details for contact - [get]/{contact_id}/address - Returns address details for contact - [get]/{contact_id}/orgs - Returns a list of orgs which the contact is assigned to, and what they are assigned as - [post]/ - Creates a new contact - [patch]/{contact_id} - Updates the details of an existing contact - [delete]/{contact_id} - Deletes a contact by ID """ from typing import Annotated from fastapi import APIRouter, HTTPException from fastapi.params import Path from sqlalchemy import or_ from src.contact.schemas import ContactContactGetResponse, ContactAddressGetResponse, ContactContactPostRequest, \ ContactUpdateRequest, ContactOrgGetResponse from src.contact.models import Contact from src.database import db_dependency from src.organisation.models import Organisation as Org from src.organisation.constants import ContactType router = APIRouter( prefix="/contact", tags=["contact"], ) @router.get("/{contact_id}", response_model=ContactContactGetResponse) async def get_contact_details_by_id(contact_id: int, db: db_dependency): contact_model = (db.query(Contact).filter(Contact.id == contact_id).first()) if contact_model is None: raise HTTPException(status_code=404, detail="Contact not found") return contact_model @router.get("/{contact_id}/address", response_model=ContactAddressGetResponse) async def get_contact_address_by_id(contact_id: int, db: db_dependency): contact_model = (db.query(Contact).filter(Contact.id == contact_id).first()) if contact_model is None: raise HTTPException(status_code=404, detail="Contact not found") return contact_model @router.post("/") async def create_contact(db: db_dependency, contact_request: ContactContactPostRequest): contact_model = Contact(**contact_request.model_dump()) db.add(contact_model) db.commit() @router.patch("/{contact_id}") async def update_contact(db: db_dependency, contact_request: ContactUpdateRequest, contact_id: Annotated[int, Path(gt=0)]): contact_model = (db.query(Contact).filter(Contact.id == contact_id).first()) if contact_model is None: raise HTTPException(status_code=404, detail="Contact not found") update_data = contact_request.model_dump(exclude_none=True) for key, value in update_data.items(): if hasattr(contact_model, key): setattr(contact_model, key, value) else: raise HTTPException(status_code=422, detail="Invalid keys in update request") db.add(contact_model) db.commit() @router.delete("/{contact_id}") async def delete_contact(db: db_dependency, contact_id: Annotated[int, Path(gt=0)]): contact_model = (db.query(Contact).filter(Contact.id == contact_id).first()) if contact_model is None: raise HTTPException(status_code=404, detail="Contact not found") db.delete(contact_model) db.commit() @router.get("/{contact_id}/orgs", response_model=list[ContactOrgGetResponse]) async def get_contact_orgs(db: db_dependency, contact_id: Annotated[int, Path(gt=0)]): contact_model = (db.query(Contact).filter(Contact.id == contact_id).first()) if contact_model is None: raise HTTPException(status_code=404, detail="Contact not found") org_models = (db.query(Org).filter( or_( Org.owner_contact_id == contact_id, Org.billing_contact_id == contact_id, Org.security_contact_id == contact_id ) ).all()) response = [] for org in org_models: types=[] if org.owner_contact_id == contact_id: types.append(ContactType.OWNER) if org.billing_contact_id == contact_id: types.append(ContactType.BILLING) if org.security_contact_id == contact_id: types.append(ContactType.SECURITY) org_response_model = ContactOrgGetResponse( name=str(org.name), contact_types=types, ) response.append(org_response_model) return response