Core Components
This document covers the foundational infrastructure components that power the Rego backend.
Database Layer
Engine Configuration
The database engine is configured as a singleton using SQLAlchemy's async engine:
Location: rego/core/databases/engine.py
@lru_cache
def get_async_engine() -> AsyncEngine:
    settings = get_settings()
    return create_async_engine(
        settings.postgres.url,
        echo=not settings.env.is_production,  # SQL logging in dev
        pool_pre_ping=True,   # Health check before use
        pool_size=5,          # Connection pool size
        max_overflow=10,      # Extra connections under load
    )
Key features:
- Connection pooling: Maintains 5 persistent connections with up to 10 overflow
- Health checks: pool_pre_ping validates connections before use
- Query logging: Enabled in development for debugging
- Async support: Full async/await throughout
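The sessions described in the next section come from a factory bound to this engine. A minimal sketch of that wiring, assuming SQLAlchemy 2.0's `async_sessionmaker` (the factory name `async_session_maker` matches the session module below; the exact options are assumptions):

```python
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

# Bind the factory once to the cached engine singleton.
# expire_on_commit=False keeps ORM objects readable after a commit,
# which matters because services commit explicitly.
async_session_maker = async_sessionmaker(
    get_async_engine(),
    class_=AsyncSession,
    expire_on_commit=False,
)
```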
Session Management
Database sessions are managed per-request using FastAPI's dependency injection:
Location: rego/core/databases/session.py
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        yield session
Usage in endpoints:
@router.get("/cards/{card_id}")
async def get_card(
    card_id: UUID,
    session: AsyncSession = Depends(get_async_session)
):
    result = await session.get(Card, card_id)
    return result
Benefits:
- Automatic session lifecycle management
- Proper cleanup even on exceptions
- No manual commit/rollback needed for reads
- Services handle commits explicitly
Models Architecture
All models inherit from a base class that provides common functionality:
Location: rego/core/models.py
class BaseTable(Base):
    __abstract__ = True

    id: Mapped[UUID] = mapped_column(default_factory=uuid4, primary_key=True)

    # Audit fields
    created_by: Mapped[UUID | None] = mapped_column(default=None)
    updated_by: Mapped[UUID | None] = mapped_column(default=None)

    # Timestamps (server-side defaults)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Optimistic locking
    version: Mapped[int] = mapped_column(default=1)
Features:
- UUID primary keys: Not guessable or enumerable, unlike sequential integers
- Audit trail: Track who created/modified each record
- Automatic timestamps: Server-side defaults ensure consistency
- Optimistic locking: Version field for concurrent update detection
- Timezone-aware: All timestamps use UTC
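As an illustration, a concrete model only needs to declare its own columns; everything else comes from the base class. The `Board` fields shown here are assumptions, not the actual schema:

```python
class Board(BaseTable):
    """Inherits id, audit fields, timestamps, and version from BaseTable."""
    name: Mapped[str] = mapped_column(String(100))
    description: Mapped[str | None] = mapped_column(default=None)
```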
Relationships
The models use SQLAlchemy's declarative relationships:
class Card(BaseTable):
    column_id: Mapped[UUID] = mapped_column(ForeignKey("column.id"))

    # Back-reference to parent
    column: Mapped[Column] = relationship(back_populates="cards")

    # Many-to-many with association table
    labels: Mapped[list[Label]] = relationship(
        secondary="cardlabel",
        back_populates="cards"
    )

    # Many-to-many with extra data (use association object)
    assignees: Mapped[list[User]] = relationship(
        secondary="cardassignee",
        back_populates="assigned_cards"
    )
Patterns:
- Cascade deletes: cascade="all, delete-orphan" for parent-child relationships
- Foreign key constraints: Enforced at database level
- Lazy loading: Relationships loaded on-demand unless eager-loaded
- Bidirectional: Both sides of relationship are navigable
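A hedged sketch of the parent side of the cascade pattern (assuming `Column` owns its cards; field names are illustrative):

```python
class Column(BaseTable):
    board_id: Mapped[UUID] = mapped_column(ForeignKey("board.id"))

    # Deleting a column deletes its cards; removing a card from the
    # collection deletes the orphaned row.
    cards: Mapped[list[Card]] = relationship(
        back_populates="column",
        cascade="all, delete-orphan",
    )
```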
Redis Integration
Redis serves several critical functions: real-time event distribution, presence tracking, and rate limiting.
Location: rego/core/databases/redis.py
@lru_cache
def get_redis() -> Redis:
    settings = get_settings()
    return Redis.from_url(
        settings.redis.url,
        encoding="utf-8",
        decode_responses=True
    )
Use Cases
- Pub/Sub for events: Broadcast changes to all connected clients
- Presence tracking: Track which users are viewing each board
- Rate limiting: Store request counts per user/IP
Connection Management
- Single Redis connection pool shared across the application
- Automatic reconnection on connection loss
- Graceful degradation if Redis unavailable (events fail silently)
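The fail-silent behaviour can be sketched as a thin wrapper around `publish`. The helper name `safe_publish` and the stub client are illustrative, not names from the codebase:

```python
import asyncio
import logging

logger = logging.getLogger("events")

async def safe_publish(redis, channel: str, payload: str) -> bool:
    """Publish an event, swallowing connection errors so a Redis
    outage never turns a successful write into a 500."""
    try:
        await redis.publish(channel, payload)
        return True
    except Exception:
        logger.warning("event publish failed; continuing", exc_info=True)
        return False

# Stub standing in for an unreachable Redis client (illustration only).
class DownRedis:
    async def publish(self, channel, payload):
        raise ConnectionError("redis unavailable")

print(asyncio.run(safe_publish(DownRedis(), "board:1", "{}")))  # prints False
```

The request that triggered the event still succeeds; only the broadcast is lost.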
Event System
The event system enables real-time updates across all connected clients, even with multiple backend instances.
Architecture
┌─────────┐      ┌─────────┐      ┌───────────┐      ┌──────────┐
│ Client  │ ───> │   API   │ ───> │   Redis   │ ───> │ WebSocket│
│ Request │      │ Handler │      │  Pub/Sub  │      │ Manager  │
└─────────┘      └─────────┘      └───────────┘      └──────────┘
                                                           │
                                                           v
                                                    ┌──────────────┐
                                                    │ All Connected│
                                                    │   Clients    │
                                                    └──────────────┘
Event Publisher
Location: rego/core/events/publisher.py
class EventPublisher:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def publish(self, message: WSMessage) -> None:
        channel = f"{message.channel.value}:{message.channel_id}"
        await self.redis.publish(channel, message.payload.serialize())
Publishers are used in services to broadcast changes:
await self.publish_board_event(
    event_type=EventType.CARD_CREATED,
    board_id=board.id,
    data=CardPublic.model_validate(card),
    user_id=current_user.id,
)
Event Subscriber
Location: rego/core/events/subscriber.py
The subscriber runs as a background task, listening to Redis channels and forwarding messages to WebSocket connections:
class EventSubscriber:
    async def start(self):
        pubsub = self.redis.pubsub()
        await pubsub.psubscribe("board:*")  # Subscribe to all boards

        async for message in pubsub.listen():
            if message["type"] == "pmessage":
                # Extract board_id from channel
                channel = message["channel"]
                board_id = UUID(channel.split(":")[-1])

                # Forward to all WebSocket connections for this board
                await self.manager.broadcast_to_board(
                    board_id,
                    message["data"]
                )
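The subscriber is typically started once per process. A hedged sketch of wiring it into FastAPI's lifespan (the constructor arguments and `manager` reference are assumptions about how the pieces connect):

```python
from contextlib import asynccontextmanager
import asyncio

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    subscriber = EventSubscriber(redis=get_redis(), manager=manager)  # assumed wiring
    task = asyncio.create_task(subscriber.start())  # runs alongside request handling
    yield
    task.cancel()  # stop listening on shutdown

app = FastAPI(lifespan=lifespan)
```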
Event Types
Location: rego/core/events/types.py
All events are strongly typed using enums:
class EventType(str, Enum):
    # Board events
    BOARD_UPDATED = "board.updated"
    BOARD_DELETED = "board.deleted"

    # Column events
    COLUMN_CREATED = "column.created"
    COLUMN_UPDATED = "column.updated"
    COLUMN_DELETED = "column.deleted"
    COLUMN_MOVED = "column.moved"

    # Card events
    CARD_CREATED = "card.created"
    CARD_UPDATED = "card.updated"
    CARD_DELETED = "card.deleted"
    CARD_MOVED = "card.moved"
    CARD_LABELED = "card.labeled"
    CARD_UNLABELED = "card.unlabeled"
    # ... and more
Message Structure
Events are structured as:
class WSPayload(BaseModel):
    type: EventType         # What happened
    data: dict | BaseModel  # The resource that changed
    user_id: UUID | None    # Who made the change (or None for system)
    timestamp: datetime     # When it happened
    version: str = "1.0"    # Schema version

class WSMessage(BaseModel):
    channel: WSChannel   # Always "board" currently
    channel_id: UUID     # Which board
    payload: WSPayload   # The actual event data
WebSocket Manager
The connection manager handles all active WebSocket connections.
Location: rego/core/events/manager.py
class ConnectionManager:
    def __init__(self):
        # Connections grouped by board
        self.connections: dict[UUID, list[WebSocket]] = defaultdict(list)
        self._lock = asyncio.Lock()  # Thread-safe operations
Key Operations
Connect:
async def connect(self, board_id: UUID, websocket: WebSocket):
    await websocket.accept()
    async with self._lock:
        self.connections[board_id].append(websocket)
Disconnect:
async def disconnect(self, board_id: UUID, websocket: WebSocket):
    async with self._lock:
        self.connections[board_id].remove(websocket)
        if not self.connections[board_id]:
            del self.connections[board_id]  # Cleanup empty boards
Broadcast:
async def broadcast_to_board(self, board_id: UUID, message: str):
    async with self._lock:
        connections_snapshot = list(self.connections[board_id])

    # Send outside lock to prevent blocking
    for connection in connections_snapshot:
        try:
            await connection.send_text(message)
        except Exception:
            await self.disconnect(board_id, connection)  # Dead connection
Thread Safety
All operations use asyncio locks to prevent race conditions:
- Multiple clients connecting/disconnecting simultaneously
- Broadcasting while connections are being removed
- Reading stats while modifying connections
Service Layer
All business logic lives in service classes that inherit from BaseService.
Location: rego/core/services/base.py
Base Service Features
class BaseService(Generic[T]):
    _refresh_relationships: list[str] | None = None

    def __init__(self, session: AsyncSession):
        self.session = session
        self.publisher = EventPublisher(redis=get_redis())
Automatic relationship loading:
class CardService(BaseService[Card]):
    _refresh_relationships = ["labels", "assignees", "attachments"]

    async def create(self, payload: CardCreate, user: User) -> Card:
        card = Card(...)
        self.session.add(card)
        await self.session.commit()
        # Automatically refreshes with labels, assignees, attachments
        await self._refresh(card)
        return card
Update helper:
async def update(
    self,
    instance: T,
    obj: dict | BaseModel,
    update: dict | None = None
) -> T:
    self._update_from_dict(instance, obj, update=update)
    await self.session.commit()
    await self._post_update(instance)
    return instance
Event publishing:
await self.publish_board_event(
    event_type=EventType.CARD_UPDATED,
    board_id=card.column.board_id,
    data=CardPublic.model_validate(card),
    user_id=current_user.id,
)
Ranking System
The ranking system enables efficient drag-and-drop reordering without updating all items.
Location: rego/core/services/rank.py
Algorithm
Items are ordered using 63-bit integers. When inserting between two items, we calculate the midpoint:
def rank_move(left: int | None, right: int | None) -> int:
    """Calculate rank for item between left and right neighbors"""
    if left is None and right is None:
        return MAX // 2  # Middle of entire space
    if left is None:
        return (MIN + right) // 2  # Before first item
    if right is None:
        return (left + MAX) // 2  # After last item
    if right - left <= 1:
        raise RebalanceNeeded()  # No space between
    return (left + right) // 2
Rebalancing
When space runs out (items too close together), services trigger a rebalance that redistributes all items evenly:
async def _rebalance_ranks(self, items: list[Card]):
    """Redistribute items evenly across the rank space"""
    count = len(items)
    spacing = MAX // (count + 1)
    for i, item in enumerate(items):
        item.rank = spacing * (i + 1)
    await self.session.commit()
Usage Example
Moving a card to a new position:
# Get neighbors at target position
prev_card = await get_card_at_position(target_position - 1)
next_card = await get_card_at_position(target_position)

try:
    new_rank = rank_move(
        prev_card.rank if prev_card else None,
        next_card.rank if next_card else None
    )
    card.rank = new_rank
except RebalanceNeeded:
    await self._rebalance_ranks(all_cards_in_column)
    # Retry after rebalance
Settings Management
All configuration is centralized and type-safe using Pydantic.
Location: rego/core/settings/
Settings Structure
class Settings(BaseModel):
    jwt: JWTSettings
    env: EnvironmentSettings
    postgres: PostgresSettings
    redis: RedisSettings
    cors: CORSSettings
    rate_limit: RateLimitSettings
Environment Variables
Each settings class loads from environment variables:
class PostgresSettings(BaseSettings):
    host: str = Field(default="localhost", alias="POSTGRES_HOST")
    port: int = Field(default=5432, alias="POSTGRES_PORT")
    user: str = Field(default="postgres", alias="POSTGRES_USER")
    password: str = Field(default="password", alias="POSTGRES_PASSWORD")
    database: str = Field(default="rego", alias="POSTGRES_DB")

    @property
    def url(self) -> str:
        return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"
Production Validation
Settings include validators that enforce security requirements in production:
@model_validator(mode="after")
def validate_secret_security(self):
    if not self.env.is_production:
        return self

    if self.jwt.secret == "CHANGE_ME":
        raise ValueError("JWT_SECRET must be changed in production")
    if len(self.jwt.secret) < 32:
        raise ValueError("JWT_SECRET must be at least 32 characters")
    return self
Usage
Settings are accessed via dependency injection:
@router.get("/info")
async def get_info(settings: Settings = Depends(get_settings)):
    return {
        "environment": settings.env.environment,
        "debug": settings.env.enable_debug,
    }
Rate Limiting
The rate limiting system protects the API from abuse and ensures fair usage.
Location: rego/core/middlewares/ratelimit.py
Configuration
Rate limits are defined per operation type:
class RateLimitSettings(BaseSettings):
    enabled: bool = True
    storage_url: str = "memory://"  # or Redis URL for distributed

    # Limits in "requests/period" format
    read: str = "100/minute"    # GET requests
    write: str = "50/minute"    # POST/PUT/PATCH/DELETE
    default: str = "60/minute"  # Everything else
Per-Endpoint Limits
Individual endpoints can override defaults:
@router.post("/cards")
@rate_limit("10/minute")  # Custom limit for this endpoint
async def create_card(...):
    ...
Identification
Rate limits are tracked per:
- Authenticated users: By user ID from JWT token
- Anonymous users: By IP address (respects X-Forwarded-For)
Algorithm
Uses a sliding-window rate limiter:
- Tracks request timestamps in a sliding window
- More accurate than fixed windows
- Prevents burst traffic at window boundaries
Response Headers
All responses include rate limit information in headers, typically `X-RateLimit-Limit`, `X-RateLimit-Remaining`, and `X-RateLimit-Reset` (exact names depend on the rate-limiting library), so clients can back off before being throttled.
Middleware Stack
The application uses several middleware layers (applied in order):
- CORS Middleware: Handles cross-origin requests
- Rate Limit Middleware: Throttles excessive requests
- FastAPI Middleware: Request routing and validation
- Exception Handlers: Standardized error responses
CORS Configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors.allowed_origins,
    allow_credentials=True,
    allow_methods=settings.cors.allowed_methods,
    allow_headers=settings.cors.allowed_headers,
)
In development: allow_origins=["*"]
In production: Specific frontend URLs only
Custom Types
The application defines custom SQLAlchemy types for complex data.
Location: rego/core/types/
Color Type
Stores colors as hex strings but validates format:
class ColorType(TypeDecorator):
    impl = String(7)  # #RRGGBB

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        return str(value)  # Convert Color object to string

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        return Color(value)  # Convert string to Color object
Usage in models:
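A minimal sketch of what declaring the type on a model might look like (the model and field names are assumptions):

```python
class Label(BaseTable):
    # Stored as a 7-character hex string, surfaced as a Color object
    color: Mapped[Color] = mapped_column(ColorType)
```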
This ensures colors are always valid hex codes and provides a consistent interface.