
Real-time Events & WebSockets

This document explains how real-time collaboration works in Rego using WebSockets and Redis pub/sub.

Overview

Rego provides real-time updates to all connected clients whenever data changes. When a user creates, updates, or deletes a resource, all other users viewing the same board see the changes immediately without refreshing.

This is achieved through:

  1. WebSocket connections - Persistent connections between clients and server
  2. Redis pub/sub - Message distribution across multiple backend instances
  3. Event system - Structured messages for all state changes

Architecture

┌─────────────┐
│   Client 1  │──┐
└─────────────┘  │
                 │    ┌──────────────┐      ┌───────────┐
┌─────────────┐  │    │              │      │           │
│   Client 2  │──┼───▶│  Backend A   │─────▶│   Redis   │
└─────────────┘  │    │  (WebSocket) │      │  Pub/Sub  │
                 │    └──────────────┘      └───────────┘
┌─────────────┐  │                                │
│   Client 3  │──┘                                │
└─────────────┘                                   │
                      ┌──────────────┐            │
                      │              │            │
                      │  Backend B   │◀───────────┘
                      │  (WebSocket) │
                      └──────┬───────┘
                             │
                      ┌──────┴──────┐
                      │   Client 4  │
                      └─────────────┘

Flow:

  1. User performs action (e.g., creates a card)
  2. Backend A processes request and updates database
  3. Backend A publishes event to Redis channel
  4. Redis broadcasts to all subscribed backend instances
  5. Each backend forwards event to its connected WebSocket clients
  6. All clients receive update and refresh their UI
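The flow can be modeled in a single process to show why clients on both instances see the event, including clients on the instance that published it. This is a minimal sketch. The hypothetical `InMemoryBroker` stands in for Redis pub/sub, and plain lists stand in for WebSocket clients. None of these names come from Rego.

```python
import asyncio
from collections import defaultdict


class InMemoryBroker:
    """Hypothetical stand-in for Redis pub/sub: every subscriber gets every message."""

    def __init__(self) -> None:
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, channel: str, payload: str) -> None:
        for queue in self.subscribers:
            await queue.put((channel, payload))


class Backend:
    """Hypothetical backend instance; 'clients' are lists that record what they receive."""

    def __init__(self, broker: InMemoryBroker) -> None:
        self.broker = broker
        self.inbox = broker.subscribe()  # each instance subscribes, including the publisher
        self.clients: dict[str, list[list[str]]] = defaultdict(list)

    async def handle_action(self, board_id: str, event: str) -> None:
        # Steps 1-3: process the request (database write elided), then publish
        await self.broker.publish(f"board:{board_id}", event)

    async def forward_one(self) -> None:
        # Steps 4-5: take one message from the broker, push it to local clients of that board
        channel, payload = await self.inbox.get()
        for client in self.clients[channel]:
            client.append(payload)


async def demo() -> tuple[list[str], list[str]]:
    broker = InMemoryBroker()
    backend_a, backend_b = Backend(broker), Backend(broker)
    client_1: list[str] = []
    client_4: list[str] = []
    backend_a.clients["board:42"].append(client_1)
    backend_b.clients["board:42"].append(client_4)

    await backend_a.handle_action("42", "card.created")
    await asyncio.gather(backend_a.forward_one(), backend_b.forward_one())
    return client_1, client_4


print(asyncio.run(demo()))  # (['card.created'], ['card.created'])
```

In the real system, Redis delivers to each subscribed process over the network, but the shape is the same. Publishing never writes to clients directly. Every client, including those on the publishing instance, receives the event through its own instance's subscription.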

WebSocket Connection

Establishing Connection

Clients connect to board-specific WebSocket endpoints:

const token = localStorage.getItem('auth_token');
const boardId = '123e4567-e89b-12d3-a456-426614174000';

const ws = new WebSocket(
  `ws://localhost:8000/ws/boards/${boardId}?token=${token}`
);

ws.onopen = () => {
  console.log('Connected to board');
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  handleBoardEvent(message);
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = () => {
  console.log('Disconnected from board');
  // Implement reconnection logic
};

Authentication

WebSocket connections must be authenticated with a JWT token:

  • Token passed as query parameter: ?token=<jwt>
  • Server validates token before accepting connection
  • Invalid/expired tokens result in immediate closure (code 1008)

Authorization

The server verifies the user has access to the requested board:

  1. Extract user from JWT token
  2. Check BoardMember table for membership
  3. Reject connection if user is not a member

Location: rego/domains/websocket/router.py

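@router.websocket("/boards/{board_id}")
async def board_websocket(
    websocket: WebSocket,
    board_id: UUID,
    session: AsyncSession = Depends(get_session),  # dependency name assumed
):
    token = websocket.query_params.get("token")
    user = await get_user_from_token(token, session)

    # If accept() hasn't run yet, close() rejects the handshake (ASGI servers
    # answer HTTP 403), and browsers then report close code 1006, not 1008.
    if not user:
        await websocket.close(code=1008)
        return

    # Check board membership
    membership = await get_membership(user.id, board_id)
    if not membership:
        await websocket.close(code=1008)
        return

    await manager.connect(board_id, websocket)
    try:
        # Keep the socket open until the client disconnects (message handling elided)
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        await manager.disconnect(board_id, websocket)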

Connection Management

Connection Manager

The ConnectionManager maintains all active WebSocket connections grouped by board.

Location: rego/core/events/manager.py

class ConnectionManager:
    def __init__(self):
        self.connections: dict[UUID, list[WebSocket]] = defaultdict(list)
        self._lock = asyncio.Lock()

Key operations:

  • connect() - Add client to board's connection list
  • disconnect() - Remove client and cleanup
  • broadcast_to_board() - Send message to all clients viewing a board
  • get_stats() - Return connection statistics
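The listed operations could look roughly like the following. This is a minimal sketch and not the code in rego/core/events/manager.py. It assumes each socket exposes Starlette-style `accept()` and `send_text()` coroutines, captured by a hypothetical `SocketLike` protocol so the sketch runs without FastAPI. It also drops any socket whose send fails during a broadcast.

```python
import asyncio
from collections import defaultdict
from typing import Protocol
from uuid import UUID


class SocketLike(Protocol):
    """Hypothetical protocol matching the parts of starlette's WebSocket used here."""

    async def accept(self) -> None: ...
    async def send_text(self, data: str) -> None: ...


class ConnectionManager:
    def __init__(self) -> None:
        self.connections: dict[UUID, list[SocketLike]] = defaultdict(list)
        self._lock = asyncio.Lock()

    async def connect(self, board_id: UUID, websocket: SocketLike) -> None:
        await websocket.accept()
        async with self._lock:
            self.connections[board_id].append(websocket)

    async def disconnect(self, board_id: UUID, websocket: SocketLike) -> None:
        async with self._lock:
            sockets = self.connections.get(board_id, [])
            if websocket in sockets:
                sockets.remove(websocket)
            if not sockets:
                # Drop empty boards so the dict doesn't grow without bound
                self.connections.pop(board_id, None)

    async def broadcast_to_board(self, board_id: UUID, message: str) -> None:
        # Copy under the lock, send outside it so one slow client can't block connect()
        async with self._lock:
            targets = list(self.connections.get(board_id, []))
        dead: list[SocketLike] = []
        for websocket in targets:
            try:
                await websocket.send_text(message)
            except Exception:
                dead.append(websocket)
        for websocket in dead:
            await self.disconnect(board_id, websocket)

    def get_stats(self) -> dict[str, int]:
        return {
            "boards": len(self.connections),
            "connections": sum(len(s) for s in self.connections.values()),
        }
```

Sending outside the lock trades strict consistency for throughput. A socket that joins mid-broadcast simply misses that one message.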

Heartbeat Mechanism

To detect dead connections, the server sends periodic ping messages:

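// Server sends every 30 seconds:
// {"type": "ping", "board_id": "123e4567-e89b-12d3-a456-426614174000"}

// Client must answer each ping with a pong before handling other events
ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  if (message.type === 'ping') {
    ws.send(JSON.stringify({ type: 'pong' }));
    return;
  }
  handleBoardEvent(message);
};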

If the client doesn't respond within 10 seconds, the server closes the connection.

Why this matters:

  • Network issues may leave connections open indefinitely
  • Clients may close without sending a proper close frame
  • Heartbeat ensures stale connections are cleaned up
  • Prevents memory leaks and ghost connections
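A server-side heartbeat loop using the timings above (ping every 30 seconds, close after 10 seconds without a pong) might look like the sketch below. It assumes the receive loop calls a hypothetical `HeartbeatState.pong_received()` when a `{"type": "pong"}` message arrives. It also takes `send` and `close` callables instead of a real WebSocket so that it runs standalone. All names here are illustrative.

```python
import asyncio
import json
from typing import Awaitable, Callable


class HeartbeatState:
    """Hypothetical per-connection state; the receive loop calls pong_received()."""

    def __init__(self) -> None:
        self._pong = asyncio.Event()

    def expect_pong(self) -> None:
        self._pong.clear()  # discard any pong that arrived before this ping

    def pong_received(self) -> None:
        self._pong.set()

    async def wait_for_pong(self, timeout: float) -> bool:
        try:
            await asyncio.wait_for(self._pong.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def heartbeat(
    board_id: str,
    state: HeartbeatState,
    send: Callable[[str], Awaitable[None]],
    close: Callable[[], Awaitable[None]],
    interval: float = 30.0,
    timeout: float = 10.0,
) -> None:
    """Ping every `interval` seconds; close if no pong arrives within `timeout`."""
    while True:
        await asyncio.sleep(interval)
        state.expect_pong()
        await send(json.dumps({"type": "ping", "board_id": board_id}))
        if not await state.wait_for_pong(timeout):
            await close()  # treat the connection as dead
            return
```

In a FastAPI app, `send` could be `websocket.send_text` and `close` a small wrapper around `websocket.close()`. The heartbeat would be started with `asyncio.create_task()` on connect and cancelled on disconnect.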

Automatic Reconnection

Clients should implement reconnection logic:

let ws;
let reconnectInterval = 1000;
const maxReconnectInterval = 30000;

function connect() {
  ws = new WebSocket(`ws://localhost:8000/ws/boards/${boardId}?token=${token}`);

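  ws.onopen = () => {
    console.log('Connected');
    reconnectInterval = 1000; // Reset backoff
  };

  // Handlers must be attached to every new socket; they don't carry over
  ws.onmessage = (event) => {
    const message = JSON.parse(event.data);
    if (message.type === 'ping') {
      ws.send(JSON.stringify({ type: 'pong' })); // heartbeat reply
      return;
    }
    handleBoardEvent(message);
  };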

  ws.onclose = () => {
    console.log('Disconnected, reconnecting...');
    setTimeout(() => {
      reconnectInterval = Math.min(reconnectInterval * 2, maxReconnectInterval);
      connect();
    }, reconnectInterval);
  };
}

connect();
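With the values above, the client waits 1 s, 2 s, 4 s, 8 s and 16 s, then 30 s before every later attempt, until a successful connection resets the interval. The sketch below reproduces that schedule in Python for brevity, using a hypothetical `backoff_delays` helper. It also shows "full jitter", a common technique that the client above does not use. Full jitter spreads out reconnects when many clients drop at the same moment.

```python
import random


def backoff_delays(attempts: int, base_ms: int = 1000, cap_ms: int = 30000) -> list[int]:
    """Delays before each reconnect attempt, mirroring the doubling-with-cap logic above."""
    delays, interval = [], base_ms
    for _ in range(attempts):
        delays.append(interval)
        interval = min(interval * 2, cap_ms)
    return delays


def with_full_jitter(delays: list[int], rng: random.Random) -> list[int]:
    """Full jitter: wait a random time between 0 and each computed delay."""
    return [rng.randint(0, d) for d in delays]


print(backoff_delays(7))  # [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```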

Event Types

All events are strongly typed using the EventType enum.

Location: rego/core/events/types.py

Board Events

BOARD_UPDATED = "board.updated"           # Board details changed
BOARD_DELETED = "board.deleted"           # Board was deleted
BOARD_MEMBER_ADDED = "board.member_added" # New member joined
BOARD_MEMBER_REMOVED = "board.member_removed" # Member removed

Column Events

COLUMN_CREATED = "column.created"  # New column added
COLUMN_UPDATED = "column.updated"  # Column renamed or color changed
COLUMN_DELETED = "column.deleted"  # Column removed
COLUMN_MOVED = "column.moved"      # Column reordered

Card Events

CARD_CREATED = "card.created"      # New card added
CARD_UPDATED = "card.updated"      # Card modified (title, description, etc.)
CARD_DELETED = "card.deleted"      # Card removed
CARD_MOVED = "card.moved"          # Card moved to different column/position
CARD_LABELED = "card.labeled"      # Label added to card
CARD_UNLABELED = "card.unlabeled"  # Label removed from card
CARD_ASSIGNED = "card.assigned"    # User assigned to card
CARD_UNASSIGNED = "card.unassigned" # User unassigned from card

Label Events

LABEL_CREATED = "label.created"  # New label created
LABEL_UPDATED = "label.updated"  # Label name/color changed
LABEL_DELETED = "label.deleted"  # Label removed

Checklist Events

CHECKLIST_CREATED = "checklist.created"           # New checklist added
CHECKLIST_UPDATED = "checklist.updated"           # Checklist renamed
CHECKLIST_DELETED = "checklist.deleted"           # Checklist removed
CHECKLIST_MOVED = "checklist.moved"               # Checklist reordered
CHECKLIST_ITEM_CREATED = "checklist_item.created" # New item added
CHECKLIST_ITEM_UPDATED = "checklist_item.updated" # Item modified
CHECKLIST_ITEM_DELETED = "checklist_item.deleted" # Item removed
CHECKLIST_ITEM_MOVED = "checklist_item.moved"     # Item reordered
CHECKLIST_ITEM_ASSIGNED = "checklist_item.assigned"     # User assigned
CHECKLIST_ITEM_UNASSIGNED = "checklist_item.unassigned" # User unassigned

Attachment Events

ATTACHMENT_CREATED = "attachment.created"  # File uploaded
ATTACHMENT_DELETED = "attachment.deleted"  # File removed
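Every value follows a `resource.action` pattern. Because of this, a `str`-backed enum can map raw strings from the wire straight back to members, and the resource prefix is easy to extract. The sketch below uses a subset of the members listed above. The real enum lives in rego/core/events/types.py and may be defined differently. The `resource` property is hypothetical.

```python
from enum import Enum


class EventType(str, Enum):
    CARD_CREATED = "card.created"
    CARD_MOVED = "card.moved"
    COLUMN_DELETED = "column.deleted"
    CHECKLIST_ITEM_ASSIGNED = "checklist_item.assigned"

    @property
    def resource(self) -> str:
        # "checklist_item.assigned" -> "checklist_item"
        return self.value.split(".", 1)[0]


event = EventType("card.moved")       # parse a raw "type" string; unknown values raise ValueError
print(event is EventType.CARD_MOVED)  # True
print(EventType.CHECKLIST_ITEM_ASSIGNED.resource)  # checklist_item
```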

Event Message Format

All events follow a consistent structure:

{
  "type": "card.created",
  "data": {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "title": "New card",
    "column_id": "456e7890-e89b-12d3-a456-426614174000",
    "rank": 1234567890,
    "description": null,
    "labels": [],
    "assignees": [],
    "created_at": "2024-01-15T10:30:00Z",
    "updated_at": "2024-01-15T10:30:00Z"
  },
  "user_id": "789e0123-e89b-12d3-a456-426614174000",
  "timestamp": "2024-01-15T10:30:00Z",
  "version": "1.0"
}

Fields:

  • type - Event type (from EventType enum)
  • data - The resource that changed (full object)
  • user_id - User who triggered the change (null for system/automation)
  • timestamp - When the event occurred (UTC)
  • version - Event schema version for backward compatibility

Client Event Handling

Basic Handler

function handleBoardEvent(event) {
  switch (event.type) {
    case 'card.created':
      addCardToUI(event.data);
      break;

    case 'card.updated':
      updateCardInUI(event.data);
      break;

    case 'card.deleted':
      removeCardFromUI(event.data.id);
      break;

    case 'card.moved':
      moveCardInUI(event.data);
      break;

    case 'column.created':
      addColumnToUI(event.data);
      break;

    // ... handle other event types

    default:
      console.warn('Unknown event type:', event.type);
  }
}

Optimistic Updates

For better UX, clients should update their UI immediately (optimistically) before the WebSocket event arrives:

async function createCard(payload) {
  // 1. Optimistic UI update
  const tempCard = {
    id: generateTempId(),
    ...payload,
    isOptimistic: true
  };
  addCardToUI(tempCard);

  try {
    // 2. Send API request
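    const response = await fetch('/cards', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${token}`
      },
      body: JSON.stringify(payload)
    });
    // fetch() only rejects on network errors, so treat HTTP errors as failures too
    if (!response.ok) {
      throw new Error(`Request failed: ${response.status}`);
    }
    const realCard = await response.json();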

    // 3. Replace temp with real data
    replaceCardInUI(tempCard.id, realCard);

    // 4. WebSocket event will arrive shortly
    // If it's our own event (matching user_id), ignore it

  } catch (error) {
    // 5. Rollback optimistic update on error
    removeCardFromUI(tempCard.id);
    showError('Failed to create card');
  }
}

Filtering Own Events

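To avoid duplicate updates, clients should skip events they triggered themselves. Comparing user_id also skips the same user's changes made from another tab or device, so those changes only appear in that view after a reload: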

const currentUserId = getCurrentUser().id;

function handleBoardEvent(event) {
  // Skip events we triggered ourselves (already updated optimistically)
  if (event.user_id === currentUserId) {
    return;
  }

  // Process events from other users
  switch (event.type) {
    // ... handle events
  }
}

Presence Tracking

The WebSocket system tracks which users are currently viewing each board.

How It Works

  1. When a user connects, their presence is stored in Redis with TTL
  2. Heartbeat refreshes the TTL every 30 seconds
  3. When disconnected, presence is removed
  4. If connection dies without proper close, TTL expires after 60 seconds

Redis keys:

board:{board_id}:presence:{user_id} = "online"
TTL: 60 seconds
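The TTL scheme maps onto three Redis commands:

  • SET ... EX 60 on connect and on every heartbeat
  • DEL on a clean disconnect
  • SCAN over board:{board_id}:presence:* to list who is online

The sketch below assumes a redis-py `redis.asyncio.Redis` client created with `decode_responses=True`, so keys come back as `str`. The function names are hypothetical, and the real implementation may differ.

```python
from typing import Any

PRESENCE_TTL_SECONDS = 60


def presence_key(board_id: str, user_id: str) -> str:
    return f"board:{board_id}:presence:{user_id}"


async def mark_online(redis: Any, board_id: str, user_id: str) -> None:
    # Called on connect and on each heartbeat: (re)sets the value and restarts the TTL
    await redis.set(presence_key(board_id, user_id), "online", ex=PRESENCE_TTL_SECONDS)


async def mark_offline(redis: Any, board_id: str, user_id: str) -> None:
    # Clean disconnect; if this never runs, the TTL expires the key instead
    await redis.delete(presence_key(board_id, user_id))


async def online_user_ids(redis: Any, board_id: str) -> list[str]:
    prefix = presence_key(board_id, "")
    # SCAN is incremental, unlike KEYS, so it doesn't block Redis on large keyspaces
    return sorted([key[len(prefix):] async for key in redis.scan_iter(match=prefix + "*")])
```

This key layout has one side effect. If a user has the board open in two tabs and closes one, the user's key is deleted even though the other tab is still connected. In this sketch, the remaining tab's next heartbeat re-creates the key.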

Presence API

Get list of users currently viewing a board:

GET /ws/boards/{board_id}/presence
Authorization: Bearer <token>

Response:

{
  "board_id": "123e4567-e89b-12d3-a456-426614174000",
  "online_users": [
    {
      "user_id": "456e7890-e89b-12d3-a456-426614174000",
      "username": "alice"
    },
    {
      "user_id": "789e0123-e89b-12d3-a456-426614174000",
      "username": "bob"
    }
  ]
}

Presence Events

When users join or leave, special events are broadcast:

User joined:

{
  "type": "user_joined",
  "user_id": "456e7890-e89b-12d3-a456-426614174000",
  "username": "alice"
}

User left:

{
  "type": "user_left",
  "user_id": "456e7890-e89b-12d3-a456-426614174000",
  "username": "alice"
}

Client Implementation

let onlineUsers = new Set();

function handlePresenceEvent(event) {
  if (event.type === 'user_joined') {
    onlineUsers.add(event.user_id);
    showUserAvatar(event.username);
  } else if (event.type === 'user_left') {
    onlineUsers.delete(event.user_id);
    hideUserAvatar(event.username);
  }
}

// Refresh full presence list periodically
setInterval(async () => {
  const response = await fetch(`/ws/boards/${boardId}/presence`, {
    headers: { 'Authorization': `Bearer ${token}` }
  });
  const presence = await response.json();
  onlineUsers = new Set(presence.online_users.map(u => u.user_id));
  updatePresenceUI(presence.online_users);
}, 30000);

Publishing Events

Services publish events after successfully updating the database.

Location: rego/core/services/base.py

class BaseService:
    async def publish_board_event(
        self,
        event_type: EventType,
        board_id: UUID,
        data: dict | BaseModel,
        user_id: UUID | None = None,
    ):
        message = WSMessage(
            channel=WSChannel.BOARD,
            channel_id=board_id,
            payload=WSPayload(
                type=event_type,
                data=data,
                user_id=user_id,
            ),
        )
        await self.publisher.publish(message)

Example from CardService:

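async def create(self, payload: CardCreate, user: User) -> Card:
    # `column` (the card's parent Column) is loaded earlier in this method; elided here
    card = Card(...)
    self.session.add(card)
    await self.session.commit()
    await self._refresh(card)

    # Publish only after the commit succeeds, so clients never see unsaved data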
    await self.publish_board_event(
        event_type=EventType.CARD_CREATED,
        board_id=column.board_id,
        data=CardPublic.model_validate(card),
        user_id=user.id,
    )

    return card

Redis Pub/Sub

Publisher

Location: rego/core/events/publisher.py

class EventPublisher:
    async def publish(self, message: WSMessage) -> None:
        channel = f"{message.channel.value}:{message.channel_id}"
        await self.redis.publish(channel, message.payload.serialize())

Channel naming:

board:{board_id}

Example: board:123e4567-e89b-12d3-a456-426614174000

Subscriber

Location: rego/core/events/subscriber.py

The subscriber runs as a background task during application startup:

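import asyncio
from contextlib import asynccontextmanager, suppress

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    redis = get_redis()
    manager = get_websocket_manager()
    subscriber = EventSubscriber(redis, manager)
    subscriber_task = asyncio.create_task(subscriber.start())

    yield

    # Shutdown: awaiting a cancelled task raises CancelledError, so suppress it
    subscriber_task.cancel()
    with suppress(asyncio.CancelledError):
        await subscriber_task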

The subscriber listens to all board channels using pattern matching:

class EventSubscriber:
    async def start(self):
        pubsub = self.redis.pubsub()
        await pubsub.psubscribe("board:*")

        async for message in pubsub.listen():
            if message["type"] == "pmessage":
                channel = message["channel"]
                board_id = self._extract_board_id(channel)
                await self.manager.broadcast_to_board(
                    board_id,
                    message["data"]
                )

Error Handling

Connection Errors

Client-side:

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
  // Connection will automatically close and trigger onclose
};

ws.onclose = (event) => {
  if (event.code === 1008) {
    // Policy violation (auth failure)
    console.error('Authentication failed');
    redirectToLogin();
  } else {
    // Normal close or network issue
    console.log('Connection closed, reconnecting...');
    setTimeout(connect, 1000);
  }
};

Server-side:

The server handles various error conditions:

  • Invalid/expired token → Close with code 1008
  • User not board member → Close with code 1008
  • Dead connection → Detected via heartbeat timeout
  • Redis unavailable → Events fail silently (logged)

Event Publishing Failures

If Redis is unavailable, event publishing fails gracefully:

async def publish(self, message: WSMessage) -> None:
    channel = f"{message.channel.value}:{message.channel_id}"
    try:
        await self.redis.publish(channel, message.payload.serialize())
    except Exception:
        # Don't re-raise: event publishing shouldn't break user operations
        logger.exception("Failed to publish event to %s", channel)

Why fail silently?

  • API requests should succeed even if real-time updates fail
  • Users can refresh to see changes
  • Redis issues shouldn't cascade to application errors

Performance Considerations

Message Size

Keep event payloads reasonable:

  • Full resource objects are sent (not just IDs)
  • Relationships are included (labels, assignees)
  • Large fields (attachments) use deferred loading
  • Average message size: 1-5 KB

Connection Limits

Each WebSocket connection consumes:

  • Memory for connection state
  • Redis key for presence tracking
  • Background task for heartbeat

Limits:

  • ~10,000 concurrent connections per backend instance (estimated)
  • Multiple backend instances can scale horizontally
  • Consider load balancer sticky sessions for WebSockets

Redis Channels

  • Each board has its own Redis channel
  • Pattern subscription (board:*) matches all boards
  • Single subscriber per backend instance
  • Messages only sent to relevant connections

Testing WebSockets

Manual Testing

Use a WebSocket client like wscat:

# Install wscat
npm install -g wscat

# Connect to board (replace TOKEN and BOARD_ID)
wscat -c "ws://localhost:8000/ws/boards/BOARD_ID?token=TOKEN"

# Server will send ping messages
< {"type": "ping", "board_id": "..."}

# Respond with pong
> {"type": "pong"}

# In another terminal, create a card via API
# You'll see the event:
< {"type": "card.created", "data": {...}, ...}

Automated Testing

Test WebSocket connections in pytest:

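async def test_websocket_connection(client, auth_token, test_board, test_column):
    async with client.websocket_connect(
        f"/ws/boards/{test_board.id}?token={auth_token}"
    ) as websocket:
        # Connection established

        # Create a card via API (test_column is an assumed fixture: a column on test_board)
        response = await client.post(
            f"/boards/{test_board.id}/columns/{test_column.id}/cards",
            json={"title": "Test card"},
            headers={"Authorization": f"Bearer {auth_token}"},
        )
        # Fail fast instead of blocking on receive_json() if the request failed
        assert response.status_code < 300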

        # Receive event via WebSocket
        message = await websocket.receive_json()
        assert message["type"] == "card.created"
        assert message["data"]["title"] == "Test card"

Troubleshooting

Connection immediately closes

  • Check token is valid and not expired
  • Verify user is a member of the board
  • Check WebSocket URL is correct
  • Ensure token is in query parameter, not header

Not receiving events

  • Verify WebSocket connection is established
  • Check Redis is running and connected
  • Ensure event subscriber task is running (check /health)
  • Verify user has board access

Events arrive but UI doesn't update

  • Check event handler is registered
  • Verify event type is being handled
  • Check for JavaScript errors in console
  • Ensure data structure matches expectations

Duplicate events

  • If you apply optimistic updates, filter out your own events using user_id
  • Otherwise, drop the optimistic update and rely on the WebSocket event
  • Check event handler isn't registered multiple times

Presence not updating

  • Check Redis connection
  • Verify heartbeat is working (send pong responses)
  • Check TTL on presence keys (should be 60s)
  • Ensure cleanup happens on disconnect