Domain Services

This document describes the business logic layer organized by domain. Each domain handles a specific area of functionality with its own routers, services, schemas, and dependencies.

Architecture Pattern

Each domain follows a consistent structure:

domains/{domain}/
├── router.py         # FastAPI route handlers
├── service.py        # Business logic
├── schemas.py        # Pydantic models for requests/responses
├── dependencies.py   # FastAPI dependency injection
└── models.py         # Domain-specific SQLAlchemy models (optional)

Separation of concerns:

  • Routers handle HTTP requests and responses
  • Services contain all business logic and database operations
  • Schemas define request/response formats and validation
  • Dependencies provide reusable parameter extraction and validation

Authentication Domain

Handles user registration, login, and authentication.

Location: rego/domains/auth/

Key Components

Manager: Configured fastapi-users instance

# rego/domains/auth/manager.py
fastapi_users = FastAPIUsers[User, UUID](
    get_user_manager,
    [auth_backend],
)

Backends: JWT authentication with Bearer token and cookie support

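# rego/domains/auth/backends.py
bearer_transport = BearerTransport(tokenUrl="auth/login")
cookie_transport = CookieTransport(cookie_name="rego_auth")

# Only the bearer transport is wired into a backend in this excerpt;
# cookie_transport needs its own AuthenticationBackend to take effect.
auth_backend = AuthenticationBackend(
    name="jwt",
    transport=bearer_transport,
    get_strategy=get_jwt_strategy,
)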

Dependencies: Current user extraction

# rego/domains/auth/dependencies.py
current_user = fastapi_users.current_user(active=True)
current_superuser = fastapi_users.current_user(active=True, superuser=True)

Endpoints

  • POST /auth/register - Create new user account
  • POST /auth/login - Authenticate and get token
  • POST /auth/logout - End session
  • GET /auth/me - Get current user profile
  • PATCH /auth/me - Update profile
  • POST /auth/change-password - Change password

Users Domain

Manages user profiles and user-related operations.

Location: rego/domains/users/

Service: UserService

Handles user data operations beyond authentication.

Key methods:

async def get_by_id(user_id: UUID) -> User
async def get_by_ids(user_ids: list[UUID]) -> list[User]
async def search(query: str) -> list[User]

Dependencies

async def get_user_by_id(
    user_id: UUID,
    session: AsyncSession = Depends(get_async_session)
) -> User:
    """Fetch user by ID, raise 404 if not found"""
    user = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Endpoints

  • GET /users/{user_id} - Get user profile
  • GET /users - Search users (by username or email)

Boards Domain

Manages board creation, membership, and board-level operations.

Location: rego/domains/boards/

Service: BoardService

Relationship loading:

class BoardService(BaseService[Board]):
    _refresh_relationships = ["columns", "labels", "members"]

Key methods:

async def create(
    payload: BoardCreate,
    current_user: User
) -> Board:
    """Create board and add creator as owner"""
    board = Board(title=payload.title, description=payload.description)
    board.created_by = current_user.id
    session.add(board)
    await session.flush()  # ensures board.id is populated if the key is generated at flush time

    # Add creator as owner
    member = BoardMember(
        board_id=board.id,
        user_id=current_user.id,
        role=MemberRole.owner
    )
    session.add(member)

    await session.commit()
    await self._refresh(board)
    return board

async def update(
    board: Board,
    payload: BoardUpdate,
    current_user: User
) -> Board:
    """Update board details"""
    await super().update(board, payload, update={"updated_by": current_user.id})

    # Publish event
    await self.publish_board_event(
        event_type=EventType.BOARD_UPDATED,
        board_id=board.id,
        data=BoardPublic.model_validate(board),
        user_id=current_user.id,
    )
    return board

async def delete(board: Board, current_user: User) -> None:
    """Delete board and cascade to all related data"""
    board_id = board.id  # capture before the instance is deleted
    await super().delete(board)

    # Publish only after the delete has succeeded
    await self.publish_board_event(
        event_type=EventType.BOARD_DELETED,
        board_id=board_id,
        data={"id": str(board_id)},
        user_id=current_user.id,
    )

async def invite_user(
    board: Board,
    payload: BoardInvite
) -> BoardInviteResponse:
    """Add user to board as member"""
    user = await session.get(User, payload.user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Check if already member
    existing = await session.execute(
        select(BoardMember).where(
            BoardMember.board_id == board.id,
            BoardMember.user_id == user.id
        )
    )
    if existing.scalars().first():
        raise HTTPException(status_code=409, detail="User already member")

    # Add as member
    member = BoardMember(
        board_id=board.id,
        user_id=user.id,
        role=MemberRole.member
    )
    session.add(member)
    await session.commit()

    return BoardInviteResponse(user_id=user.id, role=MemberRole.member)

Dependencies

async def get_board_by_id(board_id: UUID) -> Board:
    """Fetch board by ID without relationships"""

async def get_board_by_id_full(board_id: UUID) -> Board:
    """Fetch board with all relationships loaded"""
    result = await session.execute(
        select(Board)
        .where(Board.id == board_id)
        .options(
            selectinload(Board.columns).selectinload(Column.cards),
            selectinload(Board.labels),
            selectinload(Board.members),
        )
    )
    board = result.scalars().first()
    if not board:
        raise HTTPException(status_code=404, detail="Board not found")
    return board

Endpoints

  • POST /boards - Create new board
  • GET /boards - List user's boards
  • GET /boards/{board_id} - Get board with full data
  • PATCH /boards/{board_id} - Update board (owners only)
  • DELETE /boards/{board_id} - Delete board (owners only)
  • POST /boards/{board_id}/invite - Add member (owners only)
  • GET /boards/{board_id}/labels - Get board labels
  • GET /boards/{board_id}/labels/stats - Get label usage statistics

Columns Domain

Manages columns (lists) within boards.

Location: rego/domains/columns/

Service: ColumnService

Relationship loading:

class ColumnService(BaseService[Column]):
    _refresh_relationships = ["cards"]

Key methods:

async def create(
    payload: ColumnCreate,
    board: Board,
    current_user: User
) -> Column:
    """Create column at specified position"""
    # Calculate rank
    if payload.position is not None:
        rank = await self._calculate_rank(board.id, payload.position)
    else:
        rank = await self._get_next_rank(board.id)

    column = Column(
        board_id=board.id,
        title=payload.title,
        rank=rank,
        color=payload.color,
        is_done_column=payload.is_done_column,
    )
    column.created_by = current_user.id

    session.add(column)
    await session.commit()
    await self._refresh(column)

    # Publish event
    await self.publish_board_event(
        event_type=EventType.COLUMN_CREATED,
        board_id=board.id,
        data=ColumnPublic.model_validate(column),
        user_id=current_user.id,
    )
    return column

async def move(
    column: Column,
    position: int,
    current_user: User
) -> Column:
    """Move column to new position"""
    try:
        new_rank = await self._calculate_rank(column.board_id, position)
        column.rank = new_rank
        await session.commit()
    except RebalanceNeeded:
        await self._rebalance_columns(column.board_id)
        new_rank = await self._calculate_rank(column.board_id, position)
        column.rank = new_rank
        await session.commit()

    await self.publish_board_event(
        event_type=EventType.COLUMN_MOVED,
        board_id=column.board_id,
        data=ColumnPublic.model_validate(column),
        user_id=current_user.id,
    )
    return column
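
The helpers _calculate_rank and _rebalance_columns are not shown in this document. The following is a minimal sketch of how midpoint ranking with a rebalance fallback could work, assuming integer ranks and a list of sibling ranks that excludes the item being moved. The names calculate_rank, rebalance, and RANK_GAP are hypothetical, and the real implementation may use a different scheme.

```python
RANK_GAP = 1024  # hypothetical spacing between neighbouring ranks


class RebalanceNeeded(Exception):
    """Raised when two neighbours leave no free rank between them."""


def calculate_rank(ranks: list[int], position: int) -> int:
    """Return a rank that places an item at `position` among sorted `ranks`."""
    position = max(0, min(position, len(ranks)))  # clamp out-of-range positions
    if not ranks:
        return RANK_GAP
    if position == len(ranks):
        return ranks[-1] + RANK_GAP  # append after the last item
    lower = ranks[position - 1] if position > 0 else 0
    upper = ranks[position]
    if upper - lower < 2:
        raise RebalanceNeeded  # no integer fits strictly between the neighbours
    return (lower + upper) // 2


def rebalance(count: int) -> list[int]:
    """Reassign evenly spaced ranks to `count` items, preserving their order."""
    return [RANK_GAP * (i + 1) for i in range(count)]
```

With this scheme a move normally touches only one row. Sibling rows are rewritten only when the gap between two neighbours is exhausted, which is the case the except RebalanceNeeded branch handles.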

Endpoints

  • POST /boards/{board_id}/columns - Create column
  • PATCH /columns/{column_id} - Update column
  • DELETE /columns/{column_id} - Delete column
  • POST /columns/{column_id}/move - Move to new position

Cards Domain

Manages cards (tasks) within columns.

Location: rego/domains/cards/

Service: CardService

Relationship loading:

class CardService(BaseService[Card]):
    _refresh_relationships = ["labels", "assignees", "checklists", "attachments"]

Key methods:

async def create(
    payload: CardCreate,
    column: Column,
    current_user: User
) -> Card:
    """Create card in column"""
    # Calculate rank
    if payload.position is not None:
        rank = await self._calculate_rank(column.id, payload.position)
    else:
        rank = await self._get_next_rank(column.id)

    card = Card(
        column_id=column.id,
        title=payload.title,
        description=payload.description,
        rank=rank,
    )
    card.created_by = current_user.id

    session.add(card)
    await session.commit()
    await self._refresh(card)

    # Publish event
    await self.publish_board_event(
        event_type=EventType.CARD_CREATED,
        board_id=column.board_id,
        data=CardPublic.model_validate(card),
        user_id=current_user.id,
    )

    # Check automations
    await automation_service.execute_automations(
        board_id=column.board_id,
        trigger_type=TriggerType.CARD_CREATED,
        context={"card_id": card.id, "column_id": column.id}
    )

    return card

async def move(
    card: Card,
    payload: CardMove,
    current_user: User
) -> Card:
    """Move card to different column/position"""
    old_column_id = card.column_id

    # Update column if changed
    if payload.column_id and payload.column_id != card.column_id:
        card.column_id = payload.column_id

    # Calculate new rank
    try:
        new_rank = await self._calculate_rank(
            card.column_id,
            payload.position
        )
        card.rank = new_rank
    except RebalanceNeeded:
        await self._rebalance_cards(card.column_id)
        new_rank = await self._calculate_rank(card.column_id, payload.position)
        card.rank = new_rank

    await session.commit()
    await self._refresh(card)

    # Publish event
    await self.publish_board_event(
        event_type=EventType.CARD_MOVED,
        board_id=card.column.board_id,
        data=CardPublic.model_validate(card),
        user_id=current_user.id,
    )

    # Check automations if column changed
    if old_column_id != card.column_id:
        await automation_service.execute_automations(
            board_id=card.column.board_id,
            trigger_type=TriggerType.CARD_MOVED,
            context={"card_id": card.id, "column_id": card.column_id}
        )

    return card

async def assign_users(
    card: Card,
    user_ids: list[UUID],
    current_user: User
) -> Card:
    """Assign users to card"""
    # Users are already validated by dependency
    users = await get_users_by_ids(user_ids)

    # Add users not already assigned
    newly_assigned = [user for user in users if user not in card.assignees]
    card.assignees.extend(newly_assigned)

    await session.commit()
    await self._refresh(card)

    # Publish one event per new assignment (none for users already assigned)
    for _ in newly_assigned:
        await self.publish_board_event(
            event_type=EventType.CARD_ASSIGNED,
            board_id=card.column.board_id,
            data=CardPublic.model_validate(card),
            user_id=current_user.id,
        )

    return card

async def add_label(
    card: Card,
    label_id: UUID,
    current_user: User
) -> Card:
    """Add label to card"""
    label = await session.get(Label, label_id)
    if not label:
        raise HTTPException(status_code=404, detail="Label not found")

    if label not in card.labels:
        card.labels.append(label)
        await session.commit()
        await self._refresh(card)

        # Publish event
        await self.publish_board_event(
            event_type=EventType.CARD_LABELED,
            board_id=card.column.board_id,
            data=CardPublic.model_validate(card),
            user_id=current_user.id,
        )

        # Check automations
        await automation_service.execute_automations(
            board_id=card.column.board_id,
            trigger_type=TriggerType.LABEL_ADDED,
            context={"card_id": card.id, "label_id": label.id}
        )

    return card

Endpoints

  • POST /boards/{board_id}/columns/{column_id}/cards - Create card
  • GET /cards/{card_id} - Get card details
  • PATCH /cards/{card_id} - Update card
  • DELETE /cards/{card_id} - Delete card
  • POST /cards/{card_id}/move - Move card
  • PUT /cards/{card_id}/assignees - Set assignees
  • POST /cards/{card_id}/labels/{label_id} - Add label
  • DELETE /cards/{card_id}/labels/{label_id} - Remove label

Labels Domain

Manages labels for categorizing cards.

Location: rego/domains/labels/

Service: LabelService

Key methods:

async def create(
    payload: LabelCreate,
    board: Board,
    current_user: User
) -> Label:
    """Create label for board"""
    label = Label(
        board_id=board.id,
        name=payload.name,
        color=payload.color,
    )
    label.created_by = current_user.id

    session.add(label)
    await session.commit()
    await session.refresh(label)

    await self.publish_board_event(
        event_type=EventType.LABEL_CREATED,
        board_id=board.id,
        data=LabelPublic.model_validate(label),
        user_id=current_user.id,
    )
    return label

Endpoints

  • POST /boards/{board_id}/labels - Create label
  • PATCH /labels/{label_id} - Update label
  • DELETE /labels/{label_id} - Delete label

Checklists Domain

Manages checklists and checklist items within cards.

Location: rego/domains/checklists/

Service: ChecklistService

Relationship loading:

class ChecklistService(BaseService[Checklist]):
    _refresh_relationships = ["items"]

Key methods:

async def create(
    payload: ChecklistCreate,
    card: Card,
    current_user: User
) -> Checklist:
    """Create checklist in card"""
    rank = await self._get_next_rank(card.id)

    checklist = Checklist(
        card_id=card.id,
        title=payload.title,
        rank=rank,
    )
    checklist.created_by = current_user.id

    session.add(checklist)
    await session.commit()
    await self._refresh(checklist)

    await self.publish_board_event(
        event_type=EventType.CHECKLIST_CREATED,
        board_id=card.column.board_id,
        data=ChecklistPublic.model_validate(checklist),
        user_id=current_user.id,
    )
    return checklist

async def create_item(
    payload: ChecklistItemCreate,
    checklist: Checklist,
    current_user: User
) -> ChecklistItem:
    """Create item in checklist"""
    rank = await self._get_next_rank(checklist.id)

    item = ChecklistItem(
        checklist_id=checklist.id,
        content=payload.content,
        rank=rank,
    )
    item.created_by = current_user.id

    session.add(item)
    await session.commit()
    await session.refresh(item)

    await self.publish_board_event(
        event_type=EventType.CHECKLIST_ITEM_CREATED,
        board_id=checklist.card.column.board_id,
        data=ChecklistItemPublic.model_validate(item),
        user_id=current_user.id,
    )
    return item

Endpoints

  • POST /cards/{card_id}/checklists - Create checklist
  • PATCH /checklists/{checklist_id} - Update checklist
  • DELETE /checklists/{checklist_id} - Delete checklist
  • POST /checklists/{checklist_id}/move - Move checklist
  • POST /checklists/{checklist_id}/items - Create item
  • PATCH /items/{item_id} - Update item
  • DELETE /items/{item_id} - Delete item
  • POST /items/{item_id}/move - Move item
  • PUT /items/{item_id}/assignees - Set assignees

Attachments Domain

Manages file attachments for cards.

Location: rego/domains/attachments/

Service: AttachmentService

Key methods:

async def create(
    file: UploadFile,
    card: Card,
    current_user: User
) -> Attachment:
    """Upload file attachment to card"""
    # Read file content
    content = await file.read()

    # Validate size
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"File too large. Max size: {MAX_FILE_SIZE} bytes"
        )

    attachment = Attachment(
        card_id=card.id,
        name=file.filename,
        mime_type=file.content_type,
        size=len(content),
        blob=content,
    )
    attachment.created_by = current_user.id

    session.add(attachment)
    await session.commit()

    # Don't include blob in event
    await self.publish_board_event(
        event_type=EventType.ATTACHMENT_CREATED,
        board_id=card.column.board_id,
        data=AttachmentPublic.model_validate(attachment),
        user_id=current_user.id,
    )
    return attachment

async def download(attachment: Attachment) -> StreamingResponse:
    """Download attachment file"""
    # Load blob (deferred by default)
    await session.refresh(attachment, ["blob"])

    return StreamingResponse(
        io.BytesIO(attachment.blob),
        media_type=attachment.mime_type,
        headers={
            "Content-Disposition": f'attachment; filename="{attachment.name}"'
        }
    )
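
The create method above reads the whole upload into memory before checking its size. A possible alternative, sketched here under assumptions, reads in chunks and stops as soon as the limit is exceeded. It relies only on the file object exposing an awaitable read(size), as UploadFile does. FileTooLarge, read_limited, FakeUpload, and the concrete limit are hypothetical stand-ins, not part of the codebase.

```python
import asyncio
import io

MAX_FILE_SIZE = 10 * 1024 * 1024  # assumed limit; the real value is not shown here
CHUNK_SIZE = 64 * 1024


class FileTooLarge(Exception):
    """Stand-in for the HTTPException raised by the service."""


async def read_limited(file, limit: int = MAX_FILE_SIZE) -> bytes:
    """Read `file` in chunks, aborting as soon as `limit` bytes are exceeded."""
    buffer = bytearray()
    while chunk := await file.read(CHUNK_SIZE):
        buffer.extend(chunk)
        if len(buffer) > limit:
            raise FileTooLarge(f"File too large. Max size: {limit} bytes")
    return bytes(buffer)


class FakeUpload:
    """Minimal async file object with the same read(size) coroutine shape."""

    def __init__(self, data: bytes) -> None:
        self._stream = io.BytesIO(data)

    async def read(self, size: int = -1) -> bytes:
        return self._stream.read(size)
```

For example, asyncio.run(read_limited(FakeUpload(b"abc"), limit=3)) returns the three bytes, while a four-byte payload with the same limit raises FileTooLarge.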

Endpoints

  • POST /cards/{card_id}/attachments - Upload file
  • GET /attachments/{attachment_id} - Download file
  • DELETE /attachments/{attachment_id} - Delete file

Automations Domain

Manages automation rules for boards.

Location: rego/domains/automations/

Service: AutomationService

Key methods:

async def create(
    payload: AutomationRuleCreate,
    board: Board,
    current_user: User
) -> AutomationRule:
    """Create automation rule"""
    # Validate trigger and action references
    await self._validate_trigger_references(payload, board.id)
    await self._validate_action_references(payload, board.id)

    rule = AutomationRule(
        board_id=board.id,
        name=payload.name,
        trigger_type=payload.trigger_type,
        trigger_column_id=payload.trigger_column_id,
        trigger_label_id=payload.trigger_label_id,
        action_type=payload.action_type,
        action_column_id=payload.action_column_id,
        action_user_id=payload.action_user_id,
        is_enabled=payload.is_enabled,
    )
    rule.created_by = current_user.id

    session.add(rule)
    await session.commit()
    return rule

async def execute_automations(
    board_id: UUID,
    trigger_type: TriggerType,
    context: dict,
) -> None:
    """Execute matching automation rules"""
    # Get all enabled rules for this trigger
    rules = await self.get_board_rules(board_id, trigger_type)

    for rule in rules:
        if self._matches_trigger(rule, context):
            await self._execute_action(rule, context, board_id)

def _matches_trigger(rule: AutomationRule, context: dict) -> bool:
    """Check if rule matches context"""
    if rule.trigger_type == TriggerType.CARD_MOVED:
        return context.get("column_id") == rule.trigger_column_id
    elif rule.trigger_type == TriggerType.LABEL_ADDED:
        return context.get("label_id") == rule.trigger_label_id
    # Any other trigger type, including CARD_CREATED, never matches in this excerpt
    return False

async def _execute_action(
    rule: AutomationRule,
    context: dict,
    board_id: UUID
) -> bool:
    """Execute automation action"""
    card_id = context.get("card_id")
    card = await session.get(Card, card_id)
    if card is None:
        return False  # card no longer exists, nothing to act on

    if rule.action_type == ActionType.MOVE_TO_COLUMN:
        # Prevent infinite loops
        if card.column_id == rule.action_column_id:
            return False

        # Move card
        card.column_id = rule.action_column_id
        await session.commit()

        # Publish event (user_id=None for system events)
        await self.publish_board_event(
            event_type=EventType.CARD_MOVED,
            board_id=board_id,
            data=CardPublic.model_validate(card),
            user_id=None,
        )
        return True

    elif rule.action_type == ActionType.ASSIGN_MEMBER:
        # Check not already assigned
        if rule.action_user_id in [u.id for u in card.assignees]:
            return False

        user = await session.get(User, rule.action_user_id)
        if user is None:
            return False  # target user no longer exists
        card.assignees.append(user)
        await session.commit()

        await self.publish_board_event(
            event_type=EventType.CARD_ASSIGNED,
            board_id=board_id,
            data=CardPublic.model_validate(card),
            user_id=None,
        )
        return True

    return False
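
To make the matching and the loop guard concrete, here is a small in-memory sketch that mirrors the logic above without a database. Rule, matches_trigger, and should_move are simplified, hypothetical stand-ins for AutomationRule and the service methods, and the enum values are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from uuid import UUID, uuid4


class TriggerType(str, Enum):
    CARD_CREATED = "card_created"
    CARD_MOVED = "card_moved"
    LABEL_ADDED = "label_added"


@dataclass
class Rule:
    trigger_type: TriggerType
    trigger_column_id: UUID | None = None
    trigger_label_id: UUID | None = None
    action_column_id: UUID | None = None


def matches_trigger(rule: Rule, context: dict) -> bool:
    """Same comparison as _matches_trigger: match on the configured column or label."""
    if rule.trigger_type == TriggerType.CARD_MOVED:
        return context.get("column_id") == rule.trigger_column_id
    if rule.trigger_type == TriggerType.LABEL_ADDED:
        return context.get("label_id") == rule.trigger_label_id
    return False


def should_move(current_column_id: UUID, rule: Rule) -> bool:
    """Mirror of the loop guard: skip the move if the card is already in the target."""
    return current_column_id != rule.action_column_id


review, done = uuid4(), uuid4()
rule = Rule(TriggerType.CARD_MOVED, trigger_column_id=review, action_column_id=done)
```

A card arriving in review matches the rule and is moved to done. A card that is already in done does not pass should_move, so the action is skipped.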

Endpoints

  • POST /boards/{board_id}/automations - Create rule (owners only)
  • GET /boards/{board_id}/automations - List rules
  • PATCH /automations/{rule_id} - Update rule (owners only)
  • DELETE /automations/{rule_id} - Delete rule (owners only)

Access Control Domain

Provides reusable dependencies for authorization checks.

Location: rego/domains/access/

Dependencies

def require_board_access(
    required_role: MemberRole | None = None,
    get_board: Callable = get_board_by_id_full,
) -> Callable:
    """Ensure user is board member with optional role requirement"""

def require_board_members(
    get_board: Callable = get_board_by_id_full,
) -> Callable:
    """Validate multiple users are board members"""

def require_user_is_board_member(
    get_board: Callable = get_board_by_id_full,
) -> Callable:
    """Validate specific user is board member"""

These are used throughout other domains to enforce board-level permissions.
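
Only the signatures of these factories are shown above. The sketch below illustrates the general shape of such a dependency factory: a function that captures the required role and returns a checker. It is framework-free so it can run on its own. The integer role ordering, the Board dataclass, and AccessDenied (standing in for a 403 HTTPException) are assumptions, not the project's actual definitions.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import IntEnum
from typing import Callable
from uuid import UUID, uuid4


class MemberRole(IntEnum):
    # Assumed ordering: a higher value implies more privileges.
    member = 1
    owner = 2


class AccessDenied(Exception):
    """Stand-in for HTTPException(status_code=403)."""


@dataclass
class Board:
    id: UUID
    members: dict[UUID, MemberRole] = field(default_factory=dict)


def require_board_access(
    required_role: MemberRole | None = None,
) -> Callable[[Board, UUID], Board]:
    """Build a checker enforcing membership and, optionally, a minimum role."""

    def checker(board: Board, user_id: UUID) -> Board:
        role = board.members.get(user_id)
        if role is None:
            raise AccessDenied("Not a board member")
        if required_role is not None and role < required_role:
            raise AccessDenied("Insufficient permissions")
        return board  # hand the validated board on to the route handler

    return checker
```

In a FastAPI route the returned checker would be passed to Depends, so "owners only" endpoints can be declared with require_board_access(MemberRole.owner) while read endpoints use require_board_access().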

WebSocket Domain

Manages WebSocket connections for real-time updates.

Location: rego/domains/websocket/

Router

WebSocket endpoint:

@router.websocket("/boards/{board_id}")
async def board_websocket(
    websocket: WebSocket,
    board_id: UUID,
):
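    # Authenticate from query param; reject missing or invalid tokens
    # (assumes get_user_from_token returns None when the token is invalid)
    token = websocket.query_params.get("token")
    user = await get_user_from_token(token) if token else None
    if user is None:
        await websocket.close(code=1008)
        return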

    # Verify board membership
    is_member = await check_board_membership(user.id, board_id)
    if not is_member:
        await websocket.close(code=1008)
        return

    # Accept connection
    await manager.connect(board_id, websocket)

    # Register presence
    await redis.setex(
        f"board:{board_id}:presence:{user.id}",
        PRESENCE_TTL,
        "online"
    )

    # Broadcast user joined
    await manager.broadcast_to_board(
        board_id,
        json.dumps({"type": "user_joined", "user_id": str(user.id)})
    )

    # Start heartbeat
    heartbeat = asyncio.create_task(
        heartbeat_task(websocket, board_id, user.id)
    )

    try:
        while True:
            message = await websocket.receive_json()
            if message.get("type") == "pong":
                pong_received.set()
    except WebSocketDisconnect:
        pass
    finally:
        heartbeat.cancel()
        await manager.disconnect(board_id, websocket)
        await redis.delete(f"board:{board_id}:presence:{user.id}")

Endpoints

  • WS /ws/boards/{board_id} - Connect to board updates
  • GET /ws/boards/{board_id}/presence - Get online users

Common Service Patterns

Transaction Management

Services handle commits explicitly:

async def create(...):
    entity = Model(...)
    session.add(entity)
    await session.commit()  # Explicit commit
    await self._refresh(entity)
    return entity

Error Handling

Use HTTPException for client errors:

if not entity:
    raise HTTPException(status_code=404, detail="Entity not found")

if not has_permission:
    raise HTTPException(status_code=403, detail="Insufficient permissions")

Event Publishing

Always publish events after successful database updates:

await session.commit()
await self._refresh(entity)

await self.publish_board_event(
    event_type=EventType.ENTITY_CREATED,
    board_id=board.id,
    data=EntityPublic.model_validate(entity),
    user_id=current_user.id,
)
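
The ordering matters because an exception during commit should prevent subscribers from seeing an event for a write that never happened. A small, self-contained sketch of that guarantee follows. The names commit_then_publish and demo are hypothetical, and the callables stand in for session.commit and publish_board_event.

```python
import asyncio
from typing import Awaitable, Callable


async def commit_then_publish(
    commit: Callable[[], Awaitable[None]],
    publish: Callable[[dict], Awaitable[None]],
    event: dict,
) -> None:
    """Publish `event` only if `commit` finished without raising."""
    await commit()
    await publish(event)


async def demo() -> list[dict]:
    sent: list[dict] = []

    async def publish(event: dict) -> None:
        sent.append(event)

    async def ok_commit() -> None:
        return None

    async def failing_commit() -> None:
        raise RuntimeError("constraint violated")

    await commit_then_publish(ok_commit, publish, {"type": "card_created"})
    try:
        await commit_then_publish(failing_commit, publish, {"type": "card_moved"})
    except RuntimeError:
        pass  # subscribers never hear about the failed write
    return sent
```

Running asyncio.run(demo()) yields only the card_created event, since the second commit raises before publish is reached.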

Relationship Loading

Use _refresh_relationships for consistent loading:

class CardService(BaseService[Card]):
    _refresh_relationships = ["labels", "assignees", "checklists", "attachments"]

    # Automatically applied in _refresh() and _post_update()
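
BaseService itself is not shown in this document. As a rough sketch of how a class-level _refresh_relationships list could drive _refresh, the example below uses a fake session that records calls. It assumes _refresh forwards the list to the session's refresh method through its attribute_names parameter; the actual base class may work differently.

```python
import asyncio


class FakeSession:
    """Records refresh calls; stands in for SQLAlchemy's AsyncSession."""

    def __init__(self) -> None:
        self.calls: list[tuple[object, list[str] | None]] = []

    async def refresh(self, instance, attribute_names=None) -> None:
        self.calls.append((instance, attribute_names))


class BaseService:
    # Subclasses override this with the relationships they want reloaded.
    _refresh_relationships: list[str] = []

    def __init__(self, session) -> None:
        self.session = session

    async def _refresh(self, entity) -> None:
        # An empty list falls back to refreshing the whole instance.
        names = self._refresh_relationships or None
        await self.session.refresh(entity, attribute_names=names)


class ColumnService(BaseService):
    _refresh_relationships = ["cards"]
```

Declaring the list once per service keeps every create, update, and move method returning entities with the same relationships loaded.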

Validation

Validate business rules before database operations:

# Check entity exists
entity = await session.get(Model, entity_id)
if not entity:
    raise HTTPException(status_code=404)

# Check permissions
if not user_is_owner:
    raise HTTPException(status_code=403)

# Check constraints
if duplicate_exists:
    raise HTTPException(status_code=409)

# Proceed with operation

Together, these patterns keep HTTP handling (routers), business logic (services), and data validation (schemas and dependencies) cleanly separated.