Domain Services
This document describes the business logic layer organized by domain. Each domain handles a specific area of functionality with its own routers, services, schemas, and dependencies.
Architecture Pattern
Each domain follows a consistent structure:
domains/{domain}/
├── router.py # FastAPI route handlers
├── service.py # Business logic
├── schemas.py # Pydantic models for requests/responses
├── dependencies.py # FastAPI dependency injection
└── models.py # Domain-specific SQLAlchemy models (optional)
Separation of concerns:
- Routers handle HTTP requests and responses
- Services contain all business logic and database operations
- Schemas define request/response formats and validation
- Dependencies provide reusable parameter extraction and validation
Authentication Domain
Handles user registration, login, and authentication.
Location: rego/domains/auth/
Key Components
Manager: Configured fastapi-users instance
# rego/domains/auth/manager.py
# FastAPIUsers instance parameterized on the User model with UUID identifiers.
# It is built from the user-manager dependency and the list of auth backends
# (a single backend, auth_backend, in this snippet).
fastapi_users = FastAPIUsers[User, UUID](
get_user_manager,
[auth_backend],
)
Backends: JWT authentication with Bearer token and cookie support
# rego/domains/auth/backends.py
# Bearer transport whose tokenUrl points at "auth/login".
bearer_transport = BearerTransport(tokenUrl="auth/login")
# Cookie transport using the cookie name "rego_auth".
# NOTE(review): cookie_transport is not passed to any backend in this snippet;
# only bearer_transport is wired into auth_backend below — confirm where (or
# whether) cookie authentication is actually registered.
cookie_transport = CookieTransport(cookie_name="rego_auth")
# The "jwt" backend combines the bearer transport with get_jwt_strategy.
auth_backend = AuthenticationBackend(
name="jwt",
transport=bearer_transport,
get_strategy=get_jwt_strategy,
)
Dependencies: Current user extraction
# rego/domains/auth/dependencies.py
# Dependency created with active=True.
current_user = fastapi_users.current_user(active=True)
# Dependency created with active=True and superuser=True.
current_superuser = fastapi_users.current_user(active=True, superuser=True)
Endpoints
- POST /auth/register - Create new user account
- POST /auth/login - Authenticate and get token
- POST /auth/logout - End session
- GET /auth/me - Get current user profile
- PATCH /auth/me - Update profile
- POST /auth/change-password - Change password
Users Domain
Manages user profiles and user-related operations.
Location: rego/domains/users/
Service: UserService
Handles user data operations beyond authentication.
Key methods:
async def get_by_id(user_id: UUID) -> User
async def get_by_ids(user_ids: list[UUID]) -> list[User]
async def search(query: str) -> list[User]
Dependencies
async def get_user_by_id(
    user_id: UUID,
    session: AsyncSession = Depends(get_async_session),
) -> User:
    """Load the user with the given primary key.

    Raises:
        HTTPException: 404 when the session lookup yields nothing.
    """
    found = await session.get(User, user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
Endpoints
- GET /users/{user_id} - Get user profile
- GET /users - Search users (by username or email)
Boards Domain
Manages board creation, membership, and board-level operations.
Location: rego/domains/boards/
Service: BoardService
Relationship loading:
Key methods:
async def create(
    payload: BoardCreate,
    current_user: User
) -> Board:
    """Create a board and register its creator as the owner.

    The board row and the owner membership row are committed together, and
    the board is refreshed before being returned.

    Args:
        payload: Supplies the board's title and description.
        current_user: Recorded as ``created_by`` and added as the owner.

    Returns:
        The newly created, refreshed board.
    """
    board = Board(title=payload.title, description=payload.description)
    board.created_by = current_user.id
    session.add(board)
    # Flush before reading board.id: if the primary key comes from a column
    # default or the database, it is only populated once the row is flushed.
    # Without this the membership row could be built with board_id=None.
    await session.flush()
    # Add creator as owner
    member = BoardMember(
        board_id=board.id,
        user_id=current_user.id,
        role=MemberRole.owner,
    )
    session.add(member)
    await session.commit()
    await self._refresh(board)
    return board
async def update(
    board: Board,
    payload: BoardUpdate,
    current_user: User
) -> Board:
    """Apply ``payload`` to the board, stamp the editor, and announce it.

    The base-class update is invoked with ``updated_by`` set to the acting
    user; afterwards a BOARD_UPDATED event carrying the public board
    representation is published.
    """
    audit_fields = {"updated_by": current_user.id}
    await super().update(board, payload, update=audit_fields)

    # Notify listeners about the change.
    event_kwargs = dict(
        event_type=EventType.BOARD_UPDATED,
        board_id=board.id,
        data=BoardPublic.model_validate(board),
        user_id=current_user.id,
    )
    await self.publish_board_event(**event_kwargs)
    return board
async def delete(board: Board, current_user: User) -> None:
    """Delete the board, then publish a BOARD_DELETED event.

    The event is emitted only after the delete has gone through, so
    subscribers are never told about a deletion that subsequently failed.

    Args:
        board: The board to delete.
        current_user: The user performing the deletion; reported in the event.
    """
    # Capture the id first: the instance should not be read after deletion.
    board_id = board.id
    await super().delete(board)
    await self.publish_board_event(
        event_type=EventType.BOARD_DELETED,
        board_id=board_id,
        data={"id": str(board_id)},
        user_id=current_user.id,
    )
async def invite_user(
    board: Board,
    payload: BoardInvite
) -> BoardInviteResponse:
    """Add the user named in ``payload`` to the board with the member role.

    Raises:
        HTTPException: 404 if the user does not exist, 409 if a membership
            row for this board and user is already present.
    """
    invitee = await session.get(User, payload.user_id)
    if not invitee:
        raise HTTPException(status_code=404, detail="User not found")

    # Look for an existing membership before inserting a new one.
    membership_query = select(BoardMember).where(
        BoardMember.board_id == board.id,
        BoardMember.user_id == invitee.id,
    )
    lookup = await session.execute(membership_query)
    if lookup.scalars().first():
        raise HTTPException(status_code=409, detail="User already member")

    session.add(
        BoardMember(
            board_id=board.id,
            user_id=invitee.id,
            role=MemberRole.member,
        )
    )
    await session.commit()
    return BoardInviteResponse(user_id=invitee.id, role=MemberRole.member)
Dependencies
async def get_board_by_id(board_id: UUID) -> Board:
"""Fetch board by ID without relationships"""
async def get_board_by_id_full(board_id: UUID) -> Board:
    """Fetch a board with its columns (and their cards), labels and members.

    Args:
        board_id: Primary key of the board to load.

    Returns:
        The board with the listed relationships eagerly loaded.

    Raises:
        HTTPException: 404 when no board has the given id.
    """
    result = await session.execute(
        select(Board)
        .where(Board.id == board_id)
        .options(
            selectinload(Board.columns).selectinload(Column.cards),
            selectinload(Board.labels),
            selectinload(Board.members),
        )
    )
    board = result.scalars().first()
    if not board:
        # Mirror the user lookup dependency: a missing entity is a 404.
        raise HTTPException(status_code=404, detail="Board not found")
    return board
Endpoints
- POST /boards - Create new board
- GET /boards - List user's boards
- GET /boards/{board_id} - Get board with full data
- PATCH /boards/{board_id} - Update board (owners only)
- DELETE /boards/{board_id} - Delete board (owners only)
- POST /boards/{board_id}/invite - Add member (owners only)
- GET /boards/{board_id}/labels - Get board labels
- GET /boards/{board_id}/labels/stats - Get label usage statistics
Columns Domain
Manages columns (lists) within boards.
Location: rego/domains/columns/
Service: ColumnService
Relationship loading:
Key methods:
async def create(
    payload: ColumnCreate,
    board: Board,
    current_user: User
) -> Column:
    """Create a column on the board, ranked by the requested position.

    When ``payload.position`` is None the next available rank is used;
    otherwise the rank is calculated for that position. A COLUMN_CREATED
    event is published after the column is committed and refreshed.
    """
    requested_position = payload.position
    if requested_position is None:
        rank = await self._get_next_rank(board.id)
    else:
        rank = await self._calculate_rank(board.id, requested_position)

    new_column = Column(
        board_id=board.id,
        title=payload.title,
        rank=rank,
        color=payload.color,
        is_done_column=payload.is_done_column,
    )
    new_column.created_by = current_user.id

    session.add(new_column)
    await session.commit()
    await self._refresh(new_column)

    # Tell board subscribers about the new column.
    await self.publish_board_event(
        event_type=EventType.COLUMN_CREATED,
        board_id=board.id,
        data=ColumnPublic.model_validate(new_column),
        user_id=current_user.id,
    )
    return new_column
async def move(
    column: Column,
    position: int,
    current_user: User
) -> Column:
    """Re-rank the column for ``position`` and publish COLUMN_MOVED.

    If ranking raises RebalanceNeeded, the board's columns are rebalanced
    and the rank assignment is attempted once more.
    """

    async def _assign_rank_and_commit() -> None:
        # Compute the rank for the target position, store it, and persist.
        column.rank = await self._calculate_rank(column.board_id, position)
        await session.commit()

    try:
        await _assign_rank_and_commit()
    except RebalanceNeeded:
        await self._rebalance_columns(column.board_id)
        await _assign_rank_and_commit()

    await self.publish_board_event(
        event_type=EventType.COLUMN_MOVED,
        board_id=column.board_id,
        data=ColumnPublic.model_validate(column),
        user_id=current_user.id,
    )
    return column
Endpoints
- POST /boards/{board_id}/columns - Create column
- PATCH /columns/{column_id} - Update column
- DELETE /columns/{column_id} - Delete column
- POST /columns/{column_id}/move - Move to new position
Cards Domain
Manages cards (tasks) within columns.
Location: rego/domains/cards/
Service: CardService
Relationship loading:
class CardService(BaseService[Card]):
_refresh_relationships = ["labels", "assignees", "checklists", "attachments"]
Key methods:
async def create(
    payload: CardCreate,
    column: Column,
    current_user: User
) -> Card:
    """Create a card in ``column`` and run the follow-up notifications.

    The rank comes from ``payload.position`` when given, otherwise from the
    next free rank in the column. After commit and refresh, a CARD_CREATED
    event is published and CARD_CREATED automations are executed.
    """
    requested_position = payload.position
    if requested_position is None:
        rank = await self._get_next_rank(column.id)
    else:
        rank = await self._calculate_rank(column.id, requested_position)

    new_card = Card(
        column_id=column.id,
        title=payload.title,
        description=payload.description,
        rank=rank,
    )
    new_card.created_by = current_user.id

    session.add(new_card)
    await session.commit()
    await self._refresh(new_card)

    # Broadcast the new card to board subscribers.
    await self.publish_board_event(
        event_type=EventType.CARD_CREATED,
        board_id=column.board_id,
        data=CardPublic.model_validate(new_card),
        user_id=current_user.id,
    )

    # Give automation rules a chance to react to the new card.
    automation_context = {"card_id": new_card.id, "column_id": column.id}
    await automation_service.execute_automations(
        board_id=column.board_id,
        trigger_type=TriggerType.CARD_CREATED,
        context=automation_context,
    )
    return new_card
async def move(
    card: Card,
    payload: CardMove,
    current_user: User
) -> Card:
    """Move the card to ``payload.position``, optionally into another column.

    The column is switched only when ``payload.column_id`` is set and differs
    from the current one. A RebalanceNeeded during ranking triggers a
    rebalance of the (possibly new) column followed by one retry. CARD_MOVED
    is always published; CARD_MOVED automations run only on a column change.
    """
    origin_column_id = card.column_id

    target_column_id = payload.column_id
    if target_column_id and target_column_id != card.column_id:
        card.column_id = target_column_id

    # Rank within whichever column the card now belongs to.
    try:
        card.rank = await self._calculate_rank(
            card.column_id,
            payload.position,
        )
    except RebalanceNeeded:
        await self._rebalance_cards(card.column_id)
        card.rank = await self._calculate_rank(card.column_id, payload.position)

    await session.commit()
    await self._refresh(card)

    await self.publish_board_event(
        event_type=EventType.CARD_MOVED,
        board_id=card.column.board_id,
        data=CardPublic.model_validate(card),
        user_id=current_user.id,
    )

    # Automations only care about moves across columns.
    if origin_column_id == card.column_id:
        return card

    await automation_service.execute_automations(
        board_id=card.column.board_id,
        trigger_type=TriggerType.CARD_MOVED,
        context={"card_id": card.id, "column_id": card.column_id},
    )
    return card
async def assign_users(
    card: Card,
    user_ids: list[UUID],
    current_user: User
) -> Card:
    """Assign the given users to the card, skipping existing assignees.

    A CARD_ASSIGNED event is published once per user that was actually
    added; users who were already assigned produce no event.

    Args:
        card: The card to assign users to.
        user_ids: Ids of the users to assign.
        current_user: The acting user, reported in the published events.

    Returns:
        The refreshed card.
    """
    # Users are already validated by dependency
    users = await get_users_by_ids(user_ids)

    # Track who is genuinely new so events match real assignments.
    newly_assigned = []
    for user in users:
        if user not in card.assignees:
            card.assignees.append(user)
            newly_assigned.append(user)

    await session.commit()
    await self._refresh(card)

    # One event per new assignment (not per requested user).
    for _ in newly_assigned:
        await self.publish_board_event(
            event_type=EventType.CARD_ASSIGNED,
            board_id=card.column.board_id,
            data=CardPublic.model_validate(card),
            user_id=current_user.id,
        )
    return card
async def add_label(
    card: Card,
    label_id: UUID,
    current_user: User
) -> Card:
    """Attach a label to the card.

    When the label is already on the card nothing changes, so no event is
    published and no LABEL_ADDED automation is triggered.

    Args:
        card: The card to label.
        label_id: Id of the label to attach.
        current_user: The acting user, reported in the published event.

    Returns:
        The card (refreshed when a label was added).

    Raises:
        HTTPException: 404 when the label does not exist.
    """
    label = await session.get(Label, label_id)
    if not label:
        raise HTTPException(status_code=404, detail="Label not found")

    if label in card.labels:
        # Nothing was added: do not announce or trigger a "label added".
        return card

    card.labels.append(label)
    await session.commit()
    await self._refresh(card)

    # Publish event
    await self.publish_board_event(
        event_type=EventType.CARD_LABELED,
        board_id=card.column.board_id,
        data=CardPublic.model_validate(card),
        user_id=current_user.id,
    )
    # Check automations
    await automation_service.execute_automations(
        board_id=card.column.board_id,
        trigger_type=TriggerType.LABEL_ADDED,
        context={"card_id": card.id, "label_id": label.id},
    )
    return card
Endpoints
- POST /boards/{board_id}/columns/{column_id}/cards - Create card
- GET /cards/{card_id} - Get card details
- PATCH /cards/{card_id} - Update card
- DELETE /cards/{card_id} - Delete card
- POST /cards/{card_id}/move - Move card
- PUT /cards/{card_id}/assignees - Set assignees
- POST /cards/{card_id}/labels/{label_id} - Add label
- DELETE /cards/{card_id}/labels/{label_id} - Remove label
Labels Domain
Manages labels for categorizing cards.
Location: rego/domains/labels/
Service: LabelService
Key methods:
async def create(
    payload: LabelCreate,
    board: Board,
    current_user: User
) -> Label:
    """Create a label on the board and publish LABEL_CREATED.

    The label takes its name and color from ``payload`` and records the
    acting user as its creator.
    """
    new_label = Label(
        board_id=board.id,
        name=payload.name,
        color=payload.color,
    )
    new_label.created_by = current_user.id

    session.add(new_label)
    await session.commit()
    await session.refresh(new_label)

    public_label = LabelPublic.model_validate(new_label)
    await self.publish_board_event(
        event_type=EventType.LABEL_CREATED,
        board_id=board.id,
        data=public_label,
        user_id=current_user.id,
    )
    return new_label
Endpoints
- POST /boards/{board_id}/labels - Create label
- PATCH /labels/{label_id} - Update label
- DELETE /labels/{label_id} - Delete label
Checklists Domain
Manages checklists and checklist items within cards.
Location: rego/domains/checklists/
Service: ChecklistService
Relationship loading:
Key methods:
async def create(
    payload: ChecklistCreate,
    card: Card,
    current_user: User
) -> Checklist:
    """Create a checklist on the card, ranked after the existing ones.

    Publishes CHECKLIST_CREATED once the checklist is committed and
    refreshed.
    """
    next_rank = await self._get_next_rank(card.id)

    new_checklist = Checklist(
        card_id=card.id,
        title=payload.title,
        rank=next_rank,
    )
    new_checklist.created_by = current_user.id

    session.add(new_checklist)
    await session.commit()
    await self._refresh(new_checklist)

    await self.publish_board_event(
        event_type=EventType.CHECKLIST_CREATED,
        board_id=card.column.board_id,
        data=ChecklistPublic.model_validate(new_checklist),
        user_id=current_user.id,
    )
    return new_checklist
async def create_item(
    payload: ChecklistItemCreate,
    checklist: Checklist,
    current_user: User
) -> ChecklistItem:
    """Create an item in the checklist, ranked after the existing ones.

    Publishes CHECKLIST_ITEM_CREATED for the board that owns the checklist's
    card once the item is committed and refreshed.
    """
    next_rank = await self._get_next_rank(checklist.id)

    new_item = ChecklistItem(
        checklist_id=checklist.id,
        content=payload.content,
        rank=next_rank,
    )
    new_item.created_by = current_user.id

    session.add(new_item)
    await session.commit()
    await session.refresh(new_item)

    await self.publish_board_event(
        event_type=EventType.CHECKLIST_ITEM_CREATED,
        board_id=checklist.card.column.board_id,
        data=ChecklistItemPublic.model_validate(new_item),
        user_id=current_user.id,
    )
    return new_item
Endpoints
- POST /cards/{card_id}/checklists - Create checklist
- PATCH /checklists/{checklist_id} - Update checklist
- DELETE /checklists/{checklist_id} - Delete checklist
- POST /checklists/{checklist_id}/move - Move checklist
- POST /checklists/{checklist_id}/items - Create item
- PATCH /items/{item_id} - Update item
- DELETE /items/{item_id} - Delete item
- POST /items/{item_id}/move - Move item
- PUT /items/{item_id}/assignees - Set assignees
Attachments Domain
Manages file attachments for cards.
Location: rego/domains/attachments/
Service: AttachmentService
Key methods:
async def create(
    file: UploadFile,
    card: Card,
    current_user: User
) -> Attachment:
    """Store an uploaded file as an attachment on the card.

    Args:
        file: The uploaded file; its name, content type and bytes are stored.
        card: The card that receives the attachment.
        current_user: Recorded as the attachment's creator.

    Returns:
        The persisted attachment.

    Raises:
        HTTPException: 400 when the upload exceeds MAX_FILE_SIZE bytes.
    """
    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large file into memory.
    content = await file.read(MAX_FILE_SIZE + 1)
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"File too large. Max size: {MAX_FILE_SIZE} bytes"
        )

    attachment = Attachment(
        card_id=card.id,
        name=file.filename,
        mime_type=file.content_type,
        size=len(content),
        blob=content,
    )
    attachment.created_by = current_user.id
    session.add(attachment)
    await session.commit()
    # Reload the columns read below; by default commit() expires the instance.
    # blob is left out so the upload is not fetched again (the download path
    # treats it as deferred).
    await session.refresh(
        attachment,
        ["id", "card_id", "name", "mime_type", "size", "created_by"],
    )

    # The event carries the AttachmentPublic representation of the attachment;
    # presumably that schema omits the blob — TODO confirm.
    await self.publish_board_event(
        event_type=EventType.ATTACHMENT_CREATED,
        board_id=card.column.board_id,
        data=AttachmentPublic.model_validate(attachment),
        user_id=current_user.id,
    )
    return attachment
async def download(attachment: Attachment) -> StreamingResponse:
    """Return the attachment's bytes as a file download.

    The stored name originates from the uploader, so it is never placed in
    the header verbatim: an ASCII-only fallback is used for ``filename`` and
    the exact name is sent percent-encoded via ``filename*``.

    Args:
        attachment: The attachment whose blob should be streamed.

    Returns:
        A streaming response with the attachment's MIME type and a
        Content-Disposition header marking it as an attachment.
    """
    from urllib.parse import quote

    # Load blob (deferred by default)
    await session.refresh(attachment, ["blob"])

    original_name = attachment.name or "download"
    # Replace anything that could break out of the quoted string or the
    # header itself (quotes, backslashes, control and non-ASCII characters).
    ascii_fallback = "".join(
        ch if ch.isascii() and ch.isprintable() and ch not in '"\\' else "_"
        for ch in original_name
    )
    encoded_name = quote(original_name, safe="")
    disposition = (
        f'attachment; filename="{ascii_fallback}"; '
        f"filename*=UTF-8''{encoded_name}"
    )

    return StreamingResponse(
        io.BytesIO(attachment.blob),
        media_type=attachment.mime_type,
        headers={"Content-Disposition": disposition},
    )
Endpoints
- POST /cards/{card_id}/attachments - Upload file
- GET /attachments/{attachment_id} - Download file
- DELETE /attachments/{attachment_id} - Delete file
Automations Domain
Manages automation rules for boards.
Location: rego/domains/automations/
Service: AutomationService
Key methods:
async def create(
    payload: AutomationRuleCreate,
    board: Board,
    current_user: User
) -> AutomationRule:
    """Validate and persist a new automation rule for the board.

    Trigger references are validated first, then action references, both
    against this board's id; only then is the rule stored.
    """
    for validate in (
        self._validate_trigger_references,
        self._validate_action_references,
    ):
        await validate(payload, board.id)

    new_rule = AutomationRule(
        board_id=board.id,
        name=payload.name,
        trigger_type=payload.trigger_type,
        trigger_column_id=payload.trigger_column_id,
        trigger_label_id=payload.trigger_label_id,
        action_type=payload.action_type,
        action_column_id=payload.action_column_id,
        action_user_id=payload.action_user_id,
        is_enabled=payload.is_enabled,
    )
    new_rule.created_by = current_user.id

    session.add(new_rule)
    await session.commit()
    return new_rule
async def execute_automations(
    board_id: UUID,
    trigger_type: TriggerType,
    context: dict,
) -> None:
    """Run the action of every board rule whose trigger matches ``context``.

    Rules are fetched for the board and trigger type, then checked one at a
    time, in order; matching rules have their action executed.
    """
    candidates = await self.get_board_rules(board_id, trigger_type)
    for candidate in candidates:
        if not self._matches_trigger(candidate, context):
            continue
        await self._execute_action(candidate, context, board_id)
def _matches_trigger(rule: AutomationRule, context: dict) -> bool:
    """Tell whether ``context`` satisfies the rule's trigger.

    CARD_MOVED rules compare the context's ``column_id`` with the rule's
    trigger column; LABEL_ADDED rules compare ``label_id`` with the rule's
    trigger label. Every other trigger type yields False.
    """
    # NOTE(review): card creation fires TriggerType.CARD_CREATED, but there is
    # no branch for it here, so such rules can never match — confirm intent.
    kind = rule.trigger_type
    if kind == TriggerType.CARD_MOVED:
        return context.get("column_id") == rule.trigger_column_id
    if kind == TriggerType.LABEL_ADDED:
        return context.get("label_id") == rule.trigger_label_id
    return False
async def _execute_action(
    rule: AutomationRule,
    context: dict,
    board_id: UUID
) -> bool:
    """Carry out the rule's action on the card named in ``context``.

    Args:
        rule: The automation rule whose action should run.
        context: Trigger context; ``card_id`` identifies the target card.
        board_id: Board on which the resulting event is published.

    Returns:
        True when the action changed the card, False when it was skipped
        (card or user missing, nothing to change, or unknown action type).
    """
    card_id = context.get("card_id")
    if card_id is None:
        return False
    card = await session.get(Card, card_id)
    if card is None:
        # The card may have been deleted since the trigger fired.
        return False

    if rule.action_type == ActionType.MOVE_TO_COLUMN:
        # Prevent infinite loops
        if card.column_id == rule.action_column_id:
            return False
        # Move card
        # NOTE(review): only column_id changes here; the card's rank is left
        # as it was — confirm that is acceptable in the target column.
        card.column_id = rule.action_column_id
        await session.commit()
        # Publish event (user_id=None for system events)
        await self.publish_board_event(
            event_type=EventType.CARD_MOVED,
            board_id=board_id,
            data=CardPublic.model_validate(card),
            user_id=None,
        )
        return True

    if rule.action_type == ActionType.ASSIGN_MEMBER:
        # Check not already assigned
        if rule.action_user_id in [u.id for u in card.assignees]:
            return False
        user = await session.get(User, rule.action_user_id)
        if user is None:
            # The configured user no longer exists; never append None.
            return False
        card.assignees.append(user)
        await session.commit()
        await self.publish_board_event(
            event_type=EventType.CARD_ASSIGNED,
            board_id=board_id,
            data=CardPublic.model_validate(card),
            user_id=None,
        )
        return True

    return False
Endpoints
- POST /boards/{board_id}/automations - Create rule (owners only)
- GET /boards/{board_id}/automations - List rules
- PATCH /automations/{rule_id} - Update rule (owners only)
- DELETE /automations/{rule_id} - Delete rule (owners only)
Access Control Domain
Provides reusable dependencies for authorization checks.
Location: rego/domains/access/
Dependencies
def require_board_access(
required_role: MemberRole | None = None,
get_board: Callable = get_board_by_id_full,
) -> Callable:
"""Ensure user is board member with optional role requirement"""
def require_board_members(
get_board: Callable = get_board_by_id_full,
) -> Callable:
"""Validate multiple users are board members"""
def require_user_is_board_member(
get_board: Callable = get_board_by_id_full,
) -> Callable:
"""Validate specific user is board member"""
These are used throughout other domains to enforce board-level permissions.
WebSocket Domain
Manages WebSocket connections for real-time updates.
Location: rego/domains/websocket/
Router
WebSocket endpoint:
@router.websocket("/boards/{board_id}")
async def board_websocket(
    websocket: WebSocket,
    board_id: UUID,
):
    """Serve the real-time connection for one board.

    The caller is authenticated from the ``token`` query parameter and must
    be a member of the board; otherwise the socket is closed with code 1008.
    While connected, a presence key is kept in Redis and a heartbeat task
    runs. Both are cleaned up when the connection ends, whether setup failed
    or the client disconnected.
    """
    # Authenticate from query param
    token = websocket.query_params.get("token")
    # presumably get_user_from_token returns None for an invalid token — verify
    user = await get_user_from_token(token) if token else None
    if user is None:
        await websocket.close(code=1008)
        return

    # Verify board membership
    is_member = await check_board_membership(user.id, board_id)
    if not is_member:
        await websocket.close(code=1008)
        return

    # Accept connection
    await manager.connect(board_id, websocket)

    presence_key = f"board:{board_id}:presence:{user.id}"
    heartbeat = None
    try:
        # Setup runs inside the try so that a failure here still reaches the
        # cleanup below instead of leaking the connection or presence key.
        await redis.setex(presence_key, PRESENCE_TTL, "online")

        # Broadcast user joined
        await manager.broadcast_to_board(
            board_id,
            json.dumps({"type": "user_joined", "user_id": str(user.id)})
        )

        # Start heartbeat
        heartbeat = asyncio.create_task(
            heartbeat_task(websocket, board_id, user.id)
        )

        while True:
            message = await websocket.receive_json()
            # Clients may send any JSON value; only objects carry a "type".
            if isinstance(message, dict) and message.get("type") == "pong":
                # NOTE(review): pong_received is not defined in this snippet;
                # confirm how it is shared with heartbeat_task.
                pong_received.set()
    except WebSocketDisconnect:
        pass
    finally:
        if heartbeat is not None:
            heartbeat.cancel()
        await manager.disconnect(board_id, websocket)
        await redis.delete(presence_key)
Endpoints
- WS /ws/boards/{board_id} - Connect to board updates
- GET /ws/boards/{board_id}/presence - Get online users
Common Service Patterns
Transaction Management
Services handle commits explicitly:
async def create(...):
entity = Model(...)
session.add(entity)
await session.commit() # Explicit commit
await self._refresh(entity)
return entity
Error Handling
Use HTTPException for client errors:
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
if not has_permission:
raise HTTPException(status_code=403, detail="Insufficient permissions")
Event Publishing
Always publish events after successful database updates:
await session.commit()
await self._refresh(entity)
await self.publish_board_event(
event_type=EventType.ENTITY_CREATED,
board_id=board.id,
data=EntityPublic.model_validate(entity),
user_id=current_user.id,
)
Relationship Loading
Use _refresh_relationships for consistent loading:
class CardService(BaseService[Card]):
_refresh_relationships = ["labels", "assignees", "checklists"]
# Automatically applied in _refresh() and _post_update()
Validation
Validate business rules before database operations:
# Check entity exists
entity = await session.get(Model, entity_id)
if not entity:
raise HTTPException(status_code=404)
# Check permissions
if not user_is_owner:
raise HTTPException(status_code=403)
# Check constraints
if duplicate_exists:
raise HTTPException(status_code=409)
# Proceed with operation
This pattern ensures clean separation between HTTP handling (routers), business logic (services), and data validation (schemas + dependencies).