Real-time Events & WebSockets
This document explains how real-time collaboration works in Rego using WebSockets and Redis pub/sub.
Overview
Rego provides real-time updates to all connected clients whenever data changes. When a user creates, updates, or deletes a resource, all other users viewing the same board see the changes immediately without refreshing.
This is achieved through:
- WebSocket connections - Persistent connections between clients and server
- Redis pub/sub - Message distribution across multiple backend instances
- Event system - Structured messages for all state changes
Architecture
┌─────────────┐
│  Client 1   │──┐
└─────────────┘  │
                 │    ┌──────────────┐      ┌───────────┐
┌─────────────┐  │    │              │      │           │
│  Client 2   │──┼───▶│  Backend A   │─────▶│   Redis   │
└─────────────┘  │    │ (WebSocket)  │      │  Pub/Sub  │
                 │    └──────────────┘      └───────────┘
┌─────────────┐  │                                │
│  Client 3   │──┘                                │
└─────────────┘                                   │
                                                  │
                      ┌──────────────┐            │
                      │              │            │
                      │  Backend B   │◀───────────┘
                      │ (WebSocket)  │
                      └──────────────┘
                             │
                             ▼
                      ┌─────────────┐
                      │  Client 4   │
                      └─────────────┘
Flow:
- User performs action (e.g., creates a card)
- Backend A processes request and updates database
- Backend A publishes event to Redis channel
- Redis broadcasts to all subscribed backend instances
- Each backend forwards event to its connected WebSocket clients
- All clients receive update and refresh their UI
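The fan-out in this flow can be sketched without Redis or real sockets. The example below is an illustration only: FakeBroker and FakeBackend are hypothetical stand-ins (not Rego classes), a plain list plays the role of each client's WebSocket, and the database write is omitted.

```python
import asyncio
import json
from collections import defaultdict


class FakeBroker:
    """In-memory stand-in for Redis pub/sub: every subscriber gets every message."""

    def __init__(self) -> None:
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, channel: str, payload: str) -> None:
        for queue in self.subscribers:
            await queue.put((channel, payload))


class FakeBackend:
    """One backend instance with its locally connected clients, keyed by board."""

    def __init__(self, name: str, broker: FakeBroker) -> None:
        self.name = name
        self.broker = broker
        self.inbox = broker.subscribe()
        self.clients: dict[str, list[list[dict]]] = defaultdict(list)

    def connect(self, board_id: str) -> list[dict]:
        received: list[dict] = []  # stands in for a client's WebSocket
        self.clients[board_id].append(received)
        return received

    async def handle_request(self, board_id: str, event: dict) -> None:
        # Steps 2-3: (database write omitted) then publish to the board channel.
        await self.broker.publish(f"board:{board_id}", json.dumps(event))

    async def drain(self) -> None:
        # Steps 4-5: forward every queued message to local clients of that board.
        while not self.inbox.empty():
            channel, payload = await self.inbox.get()
            board_id = channel.split(":", 1)[1]
            for client in self.clients[board_id]:
                client.append(json.loads(payload))


async def demo() -> tuple[list[dict], list[dict]]:
    broker = FakeBroker()
    backend_a, backend_b = FakeBackend("A", broker), FakeBackend("B", broker)
    client_1 = backend_a.connect("board-1")
    client_4 = backend_b.connect("board-1")
    await backend_a.handle_request("board-1", {"type": "card.created"})
    await backend_a.drain()
    await backend_b.drain()
    return client_1, client_4
```

Running asyncio.run(demo()) delivers the same event to a client on Backend A and a client on Backend B, even though only Backend A handled the request.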
WebSocket Connection
Establishing Connection
Clients connect to board-specific WebSocket endpoints:
const token = localStorage.getItem('auth_token');
const boardId = '123e4567-e89b-12d3-a456-426614174000';

const ws = new WebSocket(
  `ws://localhost:8000/ws/boards/${boardId}?token=${token}`
);

ws.onopen = () => {
  console.log('Connected to board');
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  handleBoardEvent(message);
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = () => {
  console.log('Disconnected from board');
  // Implement reconnection logic
};
Authentication
WebSocket connections must be authenticated with a JWT token:
- Token passed as query parameter: ?token=<jwt>
- Server validates token before accepting connection
- Invalid/expired tokens result in immediate closure (code 1008)
Authorization
The server verifies the user has access to the requested board:
- Extract user from JWT token
- Check BoardMember table for membership
- Reject connection if user is not a member
Location: rego/domains/websocket/router.py
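@router.websocket("/boards/{board_id}")
async def board_websocket(
    websocket: WebSocket,
    board_id: UUID,
    # Assumption: the original snippet uses `session` without showing its
    # source; `get_session` is a hypothetical dependency name.
    session=Depends(get_session),
):
    token = websocket.query_params.get("token")
    user = await get_user_from_token(token, session) if token else None
    if not user:
        await websocket.close(code=1008)  # Policy violation: auth failed
        return

    # Check board membership
    membership = await get_membership(user.id, board_id)
    if not membership:
        await websocket.close(code=1008)  # Policy violation: not a member
        return

    await manager.connect(board_id, websocket)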
Connection Management
Connection Manager
The ConnectionManager maintains all active WebSocket connections grouped by board.
Location: rego/core/events/manager.py
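import asyncio
from collections import defaultdict
from uuid import UUID
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        # Active sockets per board; the lock guards concurrent mutation
        self.connections: dict[UUID, list[WebSocket]] = defaultdict(list)
        self._lock = asyncio.Lock()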
Key operations:
- connect() - Add client to board's connection list
- disconnect() - Remove client and cleanup
- broadcast_to_board() - Send message to all clients viewing a board
- get_stats() - Return connection statistics
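The four key operations could look roughly like the sketch below. This is not the code in rego/core/events/manager.py: the class name ConnectionManagerSketch, the SupportsSendText protocol, the shape of the get_stats() result, and the policy of dropping a client after a failed send are all assumptions made for illustration.

```python
import asyncio
from collections import defaultdict
from typing import Protocol
from uuid import UUID


class SupportsSendText(Protocol):
    """The only WebSocket capability this sketch relies on."""

    async def send_text(self, data: str) -> None: ...


class ConnectionManagerSketch:
    def __init__(self) -> None:
        self.connections: dict[UUID, list[SupportsSendText]] = defaultdict(list)
        self._lock = asyncio.Lock()

    async def connect(self, board_id: UUID, websocket: SupportsSendText) -> None:
        async with self._lock:
            self.connections[board_id].append(websocket)

    async def disconnect(self, board_id: UUID, websocket: SupportsSendText) -> None:
        async with self._lock:
            sockets = self.connections.get(board_id, [])
            if websocket in sockets:
                sockets.remove(websocket)
            if not sockets:
                # Drop empty boards so the dict does not grow without bound.
                self.connections.pop(board_id, None)

    async def broadcast_to_board(self, board_id: UUID, message: str) -> None:
        # Copy under the lock, send outside it, so a slow client cannot block connect().
        async with self._lock:
            targets = list(self.connections.get(board_id, []))
        for websocket in targets:
            try:
                await websocket.send_text(message)
            except Exception:
                # Assumed policy: a failed send means the connection is dead.
                await self.disconnect(board_id, websocket)

    def get_stats(self) -> dict[str, int]:
        return {
            "boards": len(self.connections),
            "connections": sum(len(s) for s in self.connections.values()),
        }
```

Sending outside the lock is a deliberate choice in this sketch: holding the lock during network I/O would make every connect and disconnect wait for the slowest client on the board.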
Heartbeat Mechanism
To detect dead connections, the server sends periodic ping messages:
// Server sends every 30 seconds
{
  "type": "ping",
  "board_id": "123e4567-e89b-12d3-a456-426614174000"
}

// Client must respond with pong
ws.send(JSON.stringify({
  "type": "pong"
}));
If the client doesn't respond within 10 seconds, the server closes the connection.
Why this matters:
- Network issues may leave connections open indefinitely
- Clients may close without sending proper close frame
- Heartbeat ensures stale connections are cleaned up
- Prevents memory leaks and ghost connections
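A server-side heartbeat loop along these lines would implement the behavior described above. It is a sketch under assumptions: the function name heartbeat and its callback parameters (send_text, wait_for_pong, close) are hypothetical, chosen so the loop can be exercised without a real WebSocket; only the 30-second interval and 10-second timeout come from this document.

```python
import asyncio
import json
from typing import Awaitable, Callable

PING_INTERVAL_SECONDS = 30.0  # from the text above
PONG_TIMEOUT_SECONDS = 10.0   # from the text above


async def heartbeat(
    send_text: Callable[[str], Awaitable[None]],
    wait_for_pong: Callable[[], Awaitable[None]],
    close: Callable[[], Awaitable[None]],
    board_id: str,
    interval: float = PING_INTERVAL_SECONDS,
    timeout: float = PONG_TIMEOUT_SECONDS,
) -> None:
    """Ping until the client misses a pong, then close the connection."""
    while True:
        await asyncio.sleep(interval)
        await send_text(json.dumps({"type": "ping", "board_id": board_id}))
        try:
            # Give the client a bounded time to answer this ping.
            await asyncio.wait_for(wait_for_pong(), timeout=timeout)
        except asyncio.TimeoutError:
            await close()
            return
```

Passing the interval and timeout as parameters keeps the loop testable with millisecond values while production code uses the defaults.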
Automatic Reconnection
Clients should implement reconnection logic:
let ws;
let reconnectInterval = 1000;
const maxReconnectInterval = 30000;

function connect() {
  ws = new WebSocket(`ws://localhost:8000/ws/boards/${boardId}?token=${token}`);

  ws.onopen = () => {
    console.log('Connected');
    reconnectInterval = 1000; // Reset backoff
  };

  ws.onclose = () => {
    console.log('Disconnected, reconnecting...');
    setTimeout(() => {
      reconnectInterval = Math.min(reconnectInterval * 2, maxReconnectInterval);
      connect();
    }, reconnectInterval);
  };
}

connect();
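To make the backoff arithmetic concrete, the short Python sketch below reproduces the delay sequence the client code above would use across consecutive failed attempts. The helper name backoff_schedule is hypothetical; the 1000 ms start, the doubling, and the 30000 ms cap are taken from the snippet.

```python
def backoff_schedule(
    initial_ms: int = 1000, max_ms: int = 30000, attempts: int = 7
) -> list[int]:
    """Return the wait before each reconnection attempt, in milliseconds."""
    delays = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(delay)
        # Double after every failed attempt, but never exceed the cap.
        delay = min(delay * 2, max_ms)
    return delays


print(backoff_schedule())
# [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

A successful connection resets the delay to the initial value, so the sequence restarts from 1000 ms after the next disconnect.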
Event Types
All events are strongly typed using the EventType enum.
Location: rego/core/events/types.py
Board Events
BOARD_UPDATED = "board.updated" # Board details changed
BOARD_DELETED = "board.deleted" # Board was deleted
BOARD_MEMBER_ADDED = "board.member_added" # New member joined
BOARD_MEMBER_REMOVED = "board.member_removed" # Member removed
Column Events
COLUMN_CREATED = "column.created" # New column added
COLUMN_UPDATED = "column.updated" # Column renamed or color changed
COLUMN_DELETED = "column.deleted" # Column removed
COLUMN_MOVED = "column.moved" # Column reordered
Card Events
CARD_CREATED = "card.created" # New card added
CARD_UPDATED = "card.updated" # Card modified (title, description, etc.)
CARD_DELETED = "card.deleted" # Card removed
CARD_MOVED = "card.moved" # Card moved to different column/position
CARD_LABELED = "card.labeled" # Label added to card
CARD_UNLABELED = "card.unlabeled" # Label removed from card
CARD_ASSIGNED = "card.assigned" # User assigned to card
CARD_UNASSIGNED = "card.unassigned" # User unassigned from card
Label Events
LABEL_CREATED = "label.created" # New label created
LABEL_UPDATED = "label.updated" # Label name/color changed
LABEL_DELETED = "label.deleted" # Label removed
Checklist Events
CHECKLIST_CREATED = "checklist.created" # New checklist added
CHECKLIST_UPDATED = "checklist.updated" # Checklist renamed
CHECKLIST_DELETED = "checklist.deleted" # Checklist removed
CHECKLIST_MOVED = "checklist.moved" # Checklist reordered
CHECKLIST_ITEM_CREATED = "checklist_item.created" # New item added
CHECKLIST_ITEM_UPDATED = "checklist_item.updated" # Item modified
CHECKLIST_ITEM_DELETED = "checklist_item.deleted" # Item removed
CHECKLIST_ITEM_MOVED = "checklist_item.moved" # Item reordered
CHECKLIST_ITEM_ASSIGNED = "checklist_item.assigned" # User assigned
CHECKLIST_ITEM_UNASSIGNED = "checklist_item.unassigned" # User unassigned
Attachment Events
ATTACHMENT_CREATED = "attachment.created" # File uploaded
ATTACHMENT_DELETED = "attachment.deleted" # File removed
Event Message Format
All events follow a consistent structure:
{
  "type": "card.created",
  "data": {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "title": "New card",
    "column_id": "456e7890-e89b-12d3-a456-426614174000",
    "rank": 1234567890,
    "description": null,
    "labels": [],
    "assignees": [],
    "created_at": "2024-01-15T10:30:00Z",
    "updated_at": "2024-01-15T10:30:00Z"
  },
  "user_id": "789e0123-e89b-12d3-a456-426614174000",
  "timestamp": "2024-01-15T10:30:00Z",
  "version": "1.0"
}
Fields:
- type - Event type (from EventType enum)
- data - The resource that changed (full object)
- user_id - User who triggered the change (null for system/automation)
- timestamp - When the event occurred (UTC)
- version - Event schema version for backward compatibility
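As an illustration of the envelope described by these fields, the sketch below assembles one from plain Python values. It is not Rego's WSPayload: the function build_event, the constant EVENT_SCHEMA_VERSION, and the injectable now parameter are assumptions made for this example.

```python
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID

EVENT_SCHEMA_VERSION = "1.0"


def build_event(
    event_type: str,
    data: dict[str, Any],
    user_id: Optional[UUID] = None,
    now: Optional[datetime] = None,
) -> dict[str, Any]:
    """Wrap a changed resource in the five-field event envelope."""
    moment = now or datetime.now(timezone.utc)
    return {
        "type": event_type,
        "data": data,
        # None marks a system or automation event.
        "user_id": str(user_id) if user_id is not None else None,
        # Always emit UTC with a trailing "Z", matching the example message.
        "timestamp": moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "version": EVENT_SCHEMA_VERSION,
    }
```

Accepting now as a parameter keeps the timestamp deterministic in tests; callers in production would omit it.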
Client Event Handling
Basic Handler
function handleBoardEvent(event) {
  switch (event.type) {
    case 'card.created':
      addCardToUI(event.data);
      break;

    case 'card.updated':
      updateCardInUI(event.data);
      break;

    case 'card.deleted':
      removeCardFromUI(event.data.id);
      break;

    case 'card.moved':
      moveCardInUI(event.data);
      break;

    case 'column.created':
      addColumnToUI(event.data);
      break;

    // ... handle other event types

    default:
      console.warn('Unknown event type:', event.type);
  }
}
Optimistic Updates
For better UX, clients should update their UI immediately (optimistically) before the WebSocket event arrives:
async function createCard(payload) {
  // 1. Optimistic UI update
  const tempCard = {
    id: generateTempId(),
    ...payload,
    isOptimistic: true
  };
  addCardToUI(tempCard);

  try {
    // 2. Send API request
    const response = await fetch('/cards', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload)
    });
    // fetch() only rejects on network failure, so treat HTTP errors as failures too
    if (!response.ok) {
      throw new Error(`Request failed with status ${response.status}`);
    }
    const realCard = await response.json();

    // 3. Replace temp with real data
    replaceCardInUI(tempCard.id, realCard);

    // 4. WebSocket event will arrive shortly
    // If it's our own event (matching user_id), ignore it
  } catch (error) {
    // 5. Rollback optimistic update on error
    removeCardFromUI(tempCard.id);
    showError('Failed to create card');
  }
}
Filtering Own Events
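To avoid duplicate updates, clients should check whether they triggered the event themselves. Note that user_id identifies the user, not the connection, so this filter also drops events the same user triggered from another tab or device: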
const currentUserId = getCurrentUser().id;

function handleBoardEvent(event) {
  // Skip events we triggered ourselves (already updated optimistically)
  if (event.user_id === currentUserId) {
    return;
  }

  // Process events from other users
  switch (event.type) {
    // ... handle events
  }
}
Presence Tracking
The WebSocket system tracks which users are currently viewing each board.
How It Works
- When a user connects, their presence is stored in Redis with TTL
- Heartbeat refreshes the TTL every 30 seconds
- When disconnected, presence is removed
- If connection dies without proper close, TTL expires after 60 seconds
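The TTL behavior in these steps can be modeled in memory, which makes the timing easy to reason about. The class below is a hypothetical stand-in, not Rego's Redis-backed implementation: PresenceSketch, its method names, and the injectable clock are assumptions; only the 60-second TTL and the 30-second refresh come from the list above.

```python
import time
from typing import Callable

PRESENCE_TTL_SECONDS = 60.0  # from the text above


class PresenceSketch:
    """In-memory stand-in for Redis keys with a TTL (not the real storage)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._expires_at: dict[tuple[str, str], float] = {}

    def touch(self, board_id: str, user_id: str) -> None:
        # Called on connect and on every heartbeat: (re)start the TTL.
        self._expires_at[(board_id, user_id)] = self._clock() + PRESENCE_TTL_SECONDS

    def remove(self, board_id: str, user_id: str) -> None:
        # Called on a clean disconnect.
        self._expires_at.pop((board_id, user_id), None)

    def online_users(self, board_id: str) -> set[str]:
        now = self._clock()
        return {
            user
            for (board, user), expires in self._expires_at.items()
            if board == board_id and expires > now
        }
```

Because the TTL is twice the heartbeat interval, a user stays listed through one missed refresh and disappears only after the second.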
Redis keys:
Presence API
Get list of users currently viewing a board:
Response:
{
  "board_id": "123e4567-e89b-12d3-a456-426614174000",
  "online_users": [
    {
      "user_id": "456e7890-e89b-12d3-a456-426614174000",
      "username": "alice"
    },
    {
      "user_id": "789e0123-e89b-12d3-a456-426614174000",
      "username": "bob"
    }
  ]
}
Presence Events
When users join or leave, special events are broadcast:
User joined:
User left:
Client Implementation
let onlineUsers = new Set();

function handlePresenceEvent(event) {
  if (event.type === 'user_joined') {
    onlineUsers.add(event.user_id);
    showUserAvatar(event.username);
  } else if (event.type === 'user_left') {
    onlineUsers.delete(event.user_id);
    hideUserAvatar(event.username);
  }
}

// Refresh full presence list periodically
setInterval(async () => {
  const response = await fetch(`/ws/boards/${boardId}/presence`);
  const presence = await response.json();
  onlineUsers = new Set(presence.online_users.map(u => u.user_id));
  updatePresenceUI(presence.online_users);
}, 30000);
Publishing Events
Services publish events after successfully updating the database.
Location: rego/core/services/base.py
class BaseService:
    async def publish_board_event(
        self,
        event_type: EventType,
        board_id: UUID,
        data: dict | BaseModel,
        user_id: UUID | None = None,
    ):
        message = WSMessage(
            channel=WSChannel.BOARD,
            channel_id=board_id,
            payload=WSPayload(
                type=event_type,
                data=data,
                user_id=user_id,
            ),
        )
        await self.publisher.publish(message)
Example from CardService:
async def create(self, payload: CardCreate, user: User) -> Card:
    # `column` (the card's target column) is assumed to be loaded earlier in
    # the method; that lookup is not shown in this excerpt.
    card = Card(...)
    self.session.add(card)
    await self.session.commit()
    await self._refresh(card)

    # Publish event only after the commit has succeeded
    await self.publish_board_event(
        event_type=EventType.CARD_CREATED,
        board_id=column.board_id,
        data=CardPublic.model_validate(card),
        user_id=user.id,
    )

    return card
Redis Pub/Sub
Publisher
Location: rego/core/events/publisher.py
class EventPublisher:
    async def publish(self, message: WSMessage) -> None:
        channel = f"{message.channel.value}:{message.channel_id}"
        await self.redis.publish(channel, message.payload.serialize())
Channel naming: <channel>:<channel_id>, which for board events is board:<board_id>
Example: board:123e4567-e89b-12d3-a456-426614174000
Subscriber
Location: rego/core/events/subscriber.py
The subscriber runs as a background task during application startup:
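import asyncio
from contextlib import asynccontextmanager, suppress
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    redis = get_redis()
    manager = get_websocket_manager()
    subscriber = EventSubscriber(redis, manager)
    subscriber_task = asyncio.create_task(subscriber.start())

    yield

    # Shutdown
    subscriber_task.cancel()
    # Awaiting a cancelled task raises CancelledError; suppress it for a clean exit
    with suppress(asyncio.CancelledError):
        await subscriber_task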
The subscriber listens to all board channels using pattern matching:
class EventSubscriber:
    async def start(self):
        pubsub = self.redis.pubsub()
        await pubsub.psubscribe("board:*")

        async for message in pubsub.listen():
            if message["type"] == "pmessage":
                channel = message["channel"]
                board_id = self._extract_board_id(channel)

                await self.manager.broadcast_to_board(
                    board_id,
                    message["data"]
                )
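The _extract_board_id helper is not shown in this document. A plausible version, given the board:<board_id> channel format, is sketched below as standalone functions; the names board_channel and extract_board_id and the error handling are assumptions. The bytes branch is there because redis-py returns channel names as bytes unless the client was created with decode_responses=True.

```python
from typing import Union
from uuid import UUID


def board_channel(board_id: UUID) -> str:
    """Build the Redis channel name for a board."""
    return f"board:{board_id}"


def extract_board_id(channel: Union[str, bytes]) -> UUID:
    """Parse the board ID back out of a channel name."""
    if isinstance(channel, bytes):
        channel = channel.decode("utf-8")
    prefix, _, raw_id = channel.partition(":")
    if prefix != "board" or not raw_id:
        raise ValueError(f"not a board channel: {channel!r}")
    # UUID() raises ValueError for a malformed ID.
    return UUID(raw_id)
```

Returning a UUID rather than a string matters if the connection manager keys its dictionary by UUID, as the ConnectionManager excerpt does: a string key would never match.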
Error Handling
Connection Errors
Client-side:
ws.onerror = (error) => {
  console.error('WebSocket error:', error);
  // Connection will automatically close and trigger onclose
};

ws.onclose = (event) => {
  if (event.code === 1008) {
    // Policy violation (auth failure)
    console.error('Authentication failed');
    redirectToLogin();
  } else {
    // Normal close or network issue
    console.log('Connection closed, reconnecting...');
    setTimeout(connect, 1000);
  }
};
Server-side:
The server handles various error conditions:
- Invalid/expired token → Close with code 1008
- User not board member → Close with code 1008
- Dead connection → Detected via heartbeat timeout
- Redis unavailable → Event is dropped and the error is logged; the API request still succeeds
Event Publishing Failures
If Redis is unavailable, event publishing fails gracefully:
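async def publish(self, message: WSMessage) -> None:
    channel = f"{message.channel.value}:{message.channel_id}"
    try:
        await self.redis.publish(channel, message.payload.serialize())
    except Exception:
        # logger.exception records the traceback along with the message
        logger.exception("Failed to publish event to %s", channel)
        # Don't re-raise - event publishing shouldn't break user operations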
Why fail silently?
- API requests should succeed even if real-time updates fail
- Users can refresh to see changes
- Redis issues shouldn't cascade to application errors
Performance Considerations
Message Size
Keep event payloads reasonable:
- Full resource objects are sent (not just IDs)
- Relationships are included (labels, assignees)
- Large fields (attachments) use deferred loading
- Average message size: 1-5 KB
Connection Limits
Each WebSocket connection consumes:
- Memory for connection state
- Redis key for presence tracking
- Background task for heartbeat
Limits:
- ~10,000 concurrent connections per backend instance (estimated)
- Multiple backend instances can scale horizontally
- Consider load balancer sticky sessions for WebSockets
Redis Channels
- Each board has its own Redis channel
- Pattern subscription (board:*) matches all boards
- Single subscriber per backend instance
- Messages only sent to relevant connections
Testing WebSockets
Manual Testing
Use a WebSocket client like wscat:
# Install wscat
npm install -g wscat
# Connect to board (replace TOKEN and BOARD_ID)
wscat -c "ws://localhost:8000/ws/boards/BOARD_ID?token=TOKEN"
# Server will send ping messages
< {"type": "ping", "board_id": "..."}
# Respond with pong
> {"type": "pong"}
# In another terminal, create a card via API
# You'll see the event:
< {"type": "card.created", "data": {...}, ...}
Automated Testing
Test WebSocket connections in pytest:
async def test_websocket_connection(client, auth_token, test_board, test_column):
    # test_column is an assumed fixture; the original snippet referenced an
    # undefined column_id.
    async with client.websocket_connect(
        f"/ws/boards/{test_board.id}?token={auth_token}"
    ) as websocket:
        # Connection established

        # Create a card via API
        response = await client.post(
            f"/boards/{test_board.id}/columns/{test_column.id}/cards",
            json={"title": "Test card"}
        )

        # Receive event via WebSocket
        message = await websocket.receive_json()
        assert message["type"] == "card.created"
        assert message["data"]["title"] == "Test card"
Troubleshooting
Connection immediately closes
- Check token is valid and not expired
- Verify user is a member of the board
- Check WebSocket URL is correct
- Ensure token is in query parameter, not header
Not receiving events
- Verify WebSocket connection is established
- Check Redis is running and connected
- Ensure event subscriber task is running (check /health)
- Verify user has board access
Events arrive but UI doesn't update
- Check event handler is registered
- Verify event type is being handled
- Check for JavaScript errors in console
- Ensure data structure matches expectations
Duplicate events
- If you apply optimistic updates, make sure the echoed WebSocket event is not applied a second time
- Filter out own events using user_id
- Check event handler isn't registered multiple times
Presence not updating
- Check Redis connection
- Verify heartbeat is working (send pong responses)
- Check TTL on presence keys (should be 60s)
- Ensure cleanup happens on disconnect