feat: user and org defaults
All checks were successful
ci / lint_and_test (push) Successful in 15s

Root and User defaults made more generic and merged.

Root user group assignment merged with org default perm assignment.

Root user granted all default org permissions at org creation.
This commit is contained in:
Chris Milne 2026-06-17 10:49:58 +01:00
parent 2c5edd1b0f
commit d5854cc2c4
4 changed files with 104 additions and 57 deletions

View file

@ -34,9 +34,8 @@ from src.contact.models import Contact
from src.contact.schemas import ContactAddress
from src.contact.exceptions import ContactNotFoundException
from src.database import db_dependency
from src.iam.service import assign_default_user_group, assign_default_root_group
from src.organisation.schemas_questionnaires import QuestionnaireQuestionsVersion0
from src.organisation.service import add_default_org_permissions
from src.organisation.service import assign_defaults
from src.user.dependencies import (
user_model_body_dependency,
user_model_claims_dependency,
@ -47,6 +46,7 @@ from src.auth.dependencies import (
org_model_root_claim_query_dependency,
org_model_root_claim_body_dependency,
)
from src.iam.models import Group
from src.organisation.dependencies import (
org_model_body_dependency,
@ -189,9 +189,10 @@ async def create_org(
org_model.user_rel.append(user_model)
org_model.root_user_rel = user_model
# Creates default user and default root IAM groups and assigns them
await assign_default_user_group(db, org_model, user_model)
await assign_default_root_group(db, org_model, user_model)
background_tasks.add_task(
assign_defaults, db, org_id=org_model.id, user_id=user_model.id
)
for contact_type in [
"billing_contact_id",
"security_contact_id",
@ -202,7 +203,6 @@ async def create_org(
db.flush()
org_model.__setattr__(contact_type, contact_model.id)
response = OrgPostOrgResponse(**org_model.__dict__)
background_tasks.add_task(add_default_org_permissions, db, org_model.id)
db.commit()
return response
@ -357,7 +357,14 @@ async def add_user_to_org(
raise ConflictException(message="User already a part of this organisation")
org_model.user_rel.append(user_model)
db.flush()
await assign_default_user_group(db=db, org_model=org_model, user_model=user_model)
group_model = (
db.query(Group)
.filter(Group.org_id == org_model.id)
.filter(Group.name == "Default Users")
.first()
)
if group_model is not None:
user_model.group_rel.append(group_model)
response = {
"organisation": org_model,
"users": [{"id": user.id, "email": user.email} for user in org_model.user_rel],

View file

@ -3,27 +3,20 @@ Reusable business logic functions for the organisation module
"""
from sqlalchemy.orm import Session
from typing import cast
from src.iam.service import assign_default_group
from src.organisation.models import Organisation as Org
from src.iam.models import Permission as Perm
from src.user.models import User
async def add_default_org_permissions(
db: Session,
org_id: int,
org_model: Org,
perm_list: list[int],
):
default_org_permissions = [
1, # test_service res_one read
2, # test_service res_one create
10, # tor-bridge-service collector read
13, # tor-bridge-service samples read
]
org_model = db.get(Org, org_id)
if org_model is None:
print("Org not found while adding defaults")
return
for permission in default_org_permissions:
for permission in perm_list:
perm_model = db.get(Perm, permission)
if perm_model is None:
@ -36,3 +29,43 @@ async def add_default_org_permissions(
db.flush()
db.commit()
async def assign_defaults(
db: Session,
org_id: int,
user_id: int,
):
default_org_permissions = []
default_user_permissions = []
org_model = db.get(Org, org_id)
if org_model is None:
print("Org not found while adding defaults")
return
user_model = db.get(User, user_id)
if user_model is None:
print("User not found while adding defaults")
return
org_model = cast(Org, org_model)
user_model = cast(User, user_model)
await add_default_org_permissions(db, org_model, default_org_permissions)
await assign_default_group(
db=db,
org_model=org_model,
user_model=user_model,
group_name="Default Users",
perm_list=default_user_permissions,
)
await assign_default_group(
db=db,
org_model=org_model,
user_model=user_model,
group_name="Root User",
perm_list=default_org_permissions,
)
db.commit()