Skip to content

Core Components

This document covers the foundational infrastructure components that power the Rego backend.

Database Layer

Engine Configuration

The database engine is configured as a singleton using SQLAlchemy's async engine:

Location: rego/core/databases/engine.py

@lru_cache
def get_async_engine() -> AsyncEngine:
    settings = get_settings()
    return create_async_engine(
        settings.postgres.url,
        echo=not settings.env.is_production,  # SQL logging in dev
        pool_pre_ping=True,                    # Health check before use
        pool_size=5,                           # Connection pool size
        max_overflow=10,                       # Extra connections under load
    )

Key features:

  • Connection pooling: Maintains 5 persistent connections with up to 10 overflow
  • Health checks: pool_pre_ping validates connections before use
  • Query logging: Enabled in development for debugging
  • Async support: Full async/await throughout

Session Management

Database sessions are managed per-request using FastAPI's dependency injection:

Location: rego/core/databases/session.py

async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        yield session

Usage in endpoints:

@router.get("/cards/{card_id}")
async def get_card(
    card_id: UUID,
    session: AsyncSession = Depends(get_async_session)
):
    result = await session.get(Card, card_id)
    return result

Benefits:

  • Automatic session lifecycle management
  • Proper cleanup even on exceptions
  • No manual commit/rollback needed for reads
  • Services handle commits explicitly

Models Architecture

All models inherit from a base class that provides common functionality:

Location: rego/core/models.py

class BaseTable(Base):
    __abstract__ = True

    id: Mapped[UUID] = mapped_column(default_factory=uuid4, primary_key=True)

    # Audit fields
    created_by: Mapped[UUID | None] = mapped_column(default=None)
    updated_by: Mapped[UUID | None] = mapped_column(default=None)

    # Timestamps (server-side defaults)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Optimistic locking
    version: Mapped[int] = mapped_column(default=1)

Features:

  • UUID primary keys: More secure than sequential integers
  • Audit trail: Track who created/modified each record
  • Automatic timestamps: Server-side defaults ensure consistency
  • Optimistic locking: Version field for concurrent update detection
  • Timezone-aware: All timestamps use UTC

Relationships

The models use SQLAlchemy's declarative relationships:

class Card(BaseTable):
    column_id: Mapped[UUID] = mapped_column(ForeignKey("column.id"))

    # Back-reference to parent
    column: Mapped[Column] = relationship(back_populates="cards")

    # Many-to-many with association table
    labels: Mapped[list[Label]] = relationship(
        secondary="cardlabel",
        back_populates="cards"
    )

    # Many-to-many with extra data (use association object)
    assignees: Mapped[list[User]] = relationship(
        secondary="cardassignee",
        back_populates="assigned_cards"
    )

Patterns:

  • Cascade deletes: cascade="all, delete-orphan" for parent-child
  • Foreign key constraints: Enforced at database level
  • Lazy loading: Relationships loaded on-demand unless eager-loaded
  • Bidirectional: Both sides of relationship are navigable

Redis Integration

Redis serves two critical functions: real-time event distribution and rate limiting.

Location: rego/core/databases/redis.py

@lru_cache
def get_redis() -> Redis:
    settings = get_settings()
    return Redis.from_url(
        settings.redis.url,
        encoding="utf-8",
        decode_responses=True
    )

Use Cases

  1. Pub/Sub for events: Broadcast changes to all connected clients
  2. Presence tracking: Track which users are viewing each board
  3. Rate limiting: Store request counts per user/IP

Connection Management

  • Single Redis connection pool shared across the application
  • Automatic reconnection on connection loss
  • Graceful degradation if Redis unavailable (events fail silently)

Event System

The event system enables real-time updates across all connected clients, even with multiple backend instances.

Architecture

┌─────────┐      ┌─────────┐      ┌───────────┐      ┌──────────┐
│ Client  │ ───> │  API    │ ───> │   Redis   │ ───> │ WebSocket│
│ Request │      │ Handler │      │  Pub/Sub  │      │ Manager  │
└─────────┘      └─────────┘      └───────────┘      └──────────┘
                                                             v
                                                      ┌──────────────┐
                                                      │ All Connected│
                                                      │   Clients    │
                                                      └──────────────┘

Event Publisher

Location: rego/core/events/publisher.py

class EventPublisher:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def publish(self, message: WSMessage) -> None:
        channel = f"{message.channel.value}:{message.channel_id}"
        await self.redis.publish(channel, message.payload.serialize())

Publishers are used in services to broadcast changes:

await self.publish_board_event(
    event_type=EventType.CARD_CREATED,
    board_id=board.id,
    data=CardPublic.model_validate(card),
    user_id=current_user.id,
)

Event Subscriber

Location: rego/core/events/subscriber.py

The subscriber runs as a background task, listening to Redis channels and forwarding messages to WebSocket connections:

class EventSubscriber:
    async def start(self):
        pubsub = self.redis.pubsub()
        await pubsub.psubscribe("board:*")  # Subscribe to all boards

        async for message in pubsub.listen():
            if message["type"] == "pmessage":
                # Extract board_id from channel
                channel = message["channel"]
                board_id = UUID(channel.split(":")[-1])

                # Forward to all WebSocket connections for this board
                await self.manager.broadcast_to_board(
                    board_id,
                    message["data"]
                )

Event Types

Location: rego/core/events/types.py

All events are strongly typed using enums:

class EventType(str, Enum):
    # Board events
    BOARD_UPDATED = "board.updated"
    BOARD_DELETED = "board.deleted"

    # Column events
    COLUMN_CREATED = "column.created"
    COLUMN_UPDATED = "column.updated"
    COLUMN_DELETED = "column.deleted"
    COLUMN_MOVED = "column.moved"

    # Card events
    CARD_CREATED = "card.created"
    CARD_UPDATED = "card.updated"
    CARD_DELETED = "card.deleted"
    CARD_MOVED = "card.moved"
    CARD_LABELED = "card.labeled"
    CARD_UNLABELED = "card.unlabeled"

    # ... and more

Message Structure

Events are structured as:

class WSPayload(BaseModel):
    type: EventType                  # What happened
    data: dict | BaseModel           # The resource that changed
    user_id: UUID | None             # Who made the change (or None for system)
    timestamp: datetime              # When it happened
    version: str = "1.0"             # Schema version
class WSMessage(BaseModel):
    channel: WSChannel               # Always "board" currently
    channel_id: UUID                 # Which board
    payload: WSPayload               # The actual event data

WebSocket Manager

The connection manager handles all active WebSocket connections.

Location: rego/core/events/manager.py

class ConnectionManager:
    def __init__(self):
        # Connections grouped by board
        self.connections: dict[UUID, list[WebSocket]] = defaultdict(list)
        self._lock = asyncio.Lock()  # Thread-safe operations

Key Operations

Connect:

async def connect(self, board_id: UUID, websocket: WebSocket):
    await websocket.accept()
    async with self._lock:
        self.connections[board_id].append(websocket)

Disconnect:

async def disconnect(self, board_id: UUID, websocket: WebSocket):
    async with self._lock:
        self.connections[board_id].remove(websocket)
        if not self.connections[board_id]:
            del self.connections[board_id]  # Cleanup empty boards

Broadcast:

async def broadcast_to_board(self, board_id: UUID, message: str):
    async with self._lock:
        connections_snapshot = list(self.connections[board_id])

    # Send outside lock to prevent blocking
    for connection in connections_snapshot:
        try:
            await connection.send_text(message)
        except Exception:
            await self.disconnect(board_id, connection)  # Dead connection

Thread Safety

All operations use asyncio locks to prevent race conditions:

  • Multiple clients connecting/disconnecting simultaneously
  • Broadcasting while connections are being removed
  • Reading stats while modifying connections

Service Layer

All business logic lives in service classes that inherit from BaseService.

Location: rego/core/services/base.py

Base Service Features

class BaseService(Generic[T]):
    _refresh_relationships: list[str] | None = None

    def __init__(self, session: AsyncSession):
        self.session = session
        self.publisher = EventPublisher(redis=get_redis())

Automatic relationship loading:

class CardService(BaseService[Card]):
    _refresh_relationships = ["labels", "assignees", "attachments"]

    async def create(self, payload: CardCreate, user: User) -> Card:
        card = Card(...)
        self.session.add(card)
        await self.session.commit()

        # Automatically refreshes with labels, assignees, attachments
        await self._refresh(card)
        return card

Update helper:

async def update(
    self,
    instance: T,
    obj: dict | BaseModel,
    update: dict | None = None
) -> T:
    self._update_from_dict(instance, obj, update=update)
    await self.session.commit()
    await self._post_update(instance)
    return instance

Event publishing:

await self.publish_board_event(
    event_type=EventType.CARD_UPDATED,
    board_id=card.column.board_id,
    data=CardPublic.model_validate(card),
    user_id=current_user.id,
)

Ranking System

The ranking system enables efficient drag-and-drop reordering without updating all items.

Location: rego/core/services/rank.py

Algorithm

Items are ordered using 63-bit integers. When inserting between two items, we calculate the midpoint:

def rank_move(left: int | None, right: int | None) -> int:
    """Calculate rank for item between left and right neighbors"""

    if left is None and right is None:
        return MAX // 2  # Middle of entire space

    if left is None:
        return (MIN + right) // 2  # Before first item

    if right is None:
        return (left + MAX) // 2  # After last item

    if right - left <= 1:
        raise RebalanceNeeded()  # No space between

    return (left + right) // 2

Rebalancing

When space runs out (items too close together), services trigger a rebalance that redistributes all items evenly:

async def _rebalance_ranks(self, items: list[Card]):
    """Redistribute items evenly across the rank space"""
    count = len(items)
    spacing = MAX // (count + 1)

    for i, item in enumerate(items):
        item.rank = spacing * (i + 1)

    await self.session.commit()

Usage Example

Moving a card to a new position:

# Get neighbors at target position
prev_card = await get_card_at_position(target_position - 1)
next_card = await get_card_at_position(target_position)

try:
    new_rank = rank_move(
        prev_card.rank if prev_card else None,
        next_card.rank if next_card else None
    )
    card.rank = new_rank
except RebalanceNeeded:
    await self._rebalance_ranks(all_cards_in_column)
    # Retry after rebalance

Settings Management

All configuration is centralized and type-safe using Pydantic.

Location: rego/core/settings/

Settings Structure

class Settings(BaseModel):
    jwt: JWTSettings
    env: EnvironmentSettings
    postgres: PostgresSettings
    redis: RedisSettings
    cors: CORSSettings
    rate_limit: RateLimitSettings

Environment Variables

Each settings class loads from environment variables:

class PostgresSettings(BaseSettings):
    host: str = Field(default="localhost", alias="POSTGRES_HOST")
    port: int = Field(default=5432, alias="POSTGRES_PORT")
    user: str = Field(default="postgres", alias="POSTGRES_USER")
    password: str = Field(default="password", alias="POSTGRES_PASSWORD")
    database: str = Field(default="rego", alias="POSTGRES_DB")

    @property
    def url(self) -> str:
        return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"

Production Validation

Settings include validators that enforce security requirements in production:

@model_validator(mode="after")
def validate_secret_security(self):
    if not self.env.is_production:
        return self

    if self.jwt.secret == "CHANGE_ME":
        raise ValueError("JWT_SECRET must be changed in production")

    if len(self.jwt.secret) < 32:
        raise ValueError("JWT_SECRET must be at least 32 characters")

    return self

Usage

Settings are accessed via dependency injection:

@router.get("/info")
async def get_info(settings: Settings = Depends(get_settings)):
    return {
        "environment": settings.env.environment,
        "debug": settings.env.enable_debug,
    }

Rate Limiting

The rate limiting system protects the API from abuse and ensures fair usage.

Location: rego/core/middlewares/ratelimit.py

Configuration

Rate limits are defined per operation type:

class RateLimitSettings(BaseSettings):
    enabled: bool = True
    storage_url: str = "memory://"  # or Redis URL for distributed

    # Limits in "requests/period" format
    read: str = "100/minute"        # GET requests
    write: str = "50/minute"        # POST/PUT/PATCH/DELETE
    default: str = "60/minute"      # Everything else

Per-Endpoint Limits

Individual endpoints can override defaults:

@router.post("/cards")
@rate_limit("10/minute")  # Custom limit for this endpoint
async def create_card(...):
    ...

Identification

Rate limits are tracked per:

  • Authenticated users: By user ID from JWT token
  • Anonymous users: By IP address (respects X-Forwarded-For)

Algorithm

Uses a moving window rate limiter:

  • Tracks request timestamps in a sliding window
  • More accurate than fixed windows
  • Prevents burst traffic at window boundaries

Response Headers

All responses include rate limit information:

RateLimit-Limit: 100
RateLimit-Remaining: 87
RateLimit-Reset: 42
Retry-After: 42

Middleware Stack

The application uses several middleware layers (applied in order):

  1. CORS Middleware: Handles cross-origin requests
  2. Rate Limit Middleware: Throttles excessive requests
  3. FastAPI Middleware: Request routing and validation
  4. Exception Handlers: Standardized error responses

CORS Configuration

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors.allowed_origins,
    allow_credentials=True,
    allow_methods=settings.cors.allowed_methods,
    allow_headers=settings.cors.allowed_headers,
)

In development: allow_origins=["*"]
In production: Specific frontend URLs only

Custom Types

The application defines custom SQLAlchemy types for complex data.

Location: rego/core/types/

Color Type

Stores colors as hex strings but validates format:

class ColorType(TypeDecorator):
    impl = String(7)  # #RRGGBB

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        return str(value)  # Convert Color object to string

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        return Color(value)  # Convert string to Color object

Usage in models:

class Label(BaseTable):
    color: Mapped[Color | None] = mapped_column(ColorType(), default=None)

This ensures colors are always valid hex codes and provides a consistent interface.