Building Scalable Real-Time Features with WebSockets and Redis

A comprehensive guide to implementing real-time collaboration features—covering connection management, conflict resolution, and scaling to thousands of concurrent users.

What You'll Learn
  • WebSocket connection lifecycle and reconnection strategies
  • Redis pub/sub for horizontal scaling across multiple servers
  • Operational transformation for conflict-free concurrent edits
  • Production-ready error handling and monitoring
  • Performance optimizations for 10,000+ concurrent connections

The Real-Time Challenge

Building real-time features sounds straightforward: open a WebSocket connection, send messages back and forth. But when you need to scale to thousands of concurrent users editing the same document, coordinating presence across multiple servers, and handling network failures gracefully, the complexity explodes.

I learned this the hard way while building a collaborative document editor. Our initial prototype worked beautifully with 10 users. At 100 users, we started seeing conflicts. At 1,000 users, our single Node.js server couldn't handle the load. At 10,000 users, everything fell apart.

This article covers the architecture patterns, code implementations, and lessons learned from building a production-ready real-time system.

WebSocket Connection Management

The foundation of any real-time system is reliable connection management. WebSockets provide full-duplex communication, but they're not magical—connections drop, networks fail, and clients go offline.

Basic WebSocket Server Setup

Let's start with a basic Socket.io server implementation:

// server.js
import express from 'express';
import { createServer } from 'http';
import { Server } from 'socket.io';
import Redis from 'ioredis';

const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
  cors: { origin: process.env.CLIENT_URL },
  pingTimeout: 60000,
  pingInterval: 25000,
});

// Redis clients for pub/sub
const publisher = new Redis(process.env.REDIS_URL);
const subscriber = new Redis(process.env.REDIS_URL);

// Connection tracking
const connections = new Map();

io.on('connection', (socket) => {
  console.log(`Client connected: ${socket.id}`);

  // Track connection metadata
  connections.set(socket.id, {
    userId: socket.handshake.auth.userId,
    connectedAt: Date.now(),
    rooms: new Set(),
  });

  socket.on('disconnect', (reason) => {
    console.log(`Client disconnected: ${socket.id}, reason: ${reason}`);
    // Drop the tracking entry so the map does not grow without bound
    connections.delete(socket.id);
  });

  socket.on('error', (error) => {
    console.error(`Socket error for ${socket.id}:`, error);
  });
});

httpServer.listen(3000, () => {
  console.log('WebSocket server running on port 3000');
});

Reconnection Strategy

Clients need to handle disconnections gracefully. Here's a robust reconnection strategy:

// client.js
import { io } from 'socket.io-client';

class RealtimeClient {
  constructor(url, options = {}) {
    this.socket = io(url, {
      auth: { userId: options.userId },
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionDelayMax: 5000,
      reconnectionAttempts: Infinity,
      transports: ['websocket', 'polling'],
    });

    // Initialise state before wiring up the handlers that read it
    this.messageQueue = [];
    this.isConnected = false;
    this.setupEventHandlers();
  }

  setupEventHandlers() {
    this.socket.on('connect', () => {
      console.log('Connected to server');
      this.isConnected = true;
      this.flushMessageQueue();
    });

    this.socket.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
      this.isConnected = false;

      if (reason === 'io server disconnect') {
        // Server disconnected us, reconnect manually
        this.socket.connect();
      }
    });

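    // In Socket.IO v3 and later, reconnection events are emitted by the
    // Manager (socket.io), not by the socket itself
    this.socket.io.on('reconnect_attempt', (attemptNumber) => {
      console.log(`Reconnection attempt ${attemptNumber}`);
    });

    // Only fires when reconnectionAttempts is finite
    this.socket.io.on('reconnect_failed', () => {
      console.error('Reconnection failed');
      this.handleConnectionFailure();
    });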
  }

  send(event, data) {
    if (this.isConnected) {
      this.socket.emit(event, data);
    } else {
      // Queue messages while disconnected
      this.messageQueue.push({ event, data, timestamp: Date.now() });
    }
  }

  flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const { event, data } = this.messageQueue.shift();
      this.socket.emit(event, data);
    }
  }

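  handleConnectionFailure() {
    // Show user-friendly error through an optional callback that the caller
    // assigns, e.g. client.onConnectionFailed = ({ message }) => { ... }
    this.onConnectionFailed?.({
      message: 'Unable to connect to server. Please check your connection.',
    });
  }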
}
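
The `reconnectionDelay` and `reconnectionDelayMax` options bound how long the client waits between attempts. The general technique behind settings like these is exponential backoff with jitter, which keeps thousands of clients that dropped at the same moment from reconnecting in lockstep. A minimal sketch of the idea, assuming a hypothetical `backoffDelay` helper (an illustration of the technique, not Socket.IO's internal code):

```javascript
// Hypothetical helper: delay in ms before the nth reconnection attempt (1-based).
// `random` is injectable so the result is deterministic under test.
function backoffDelay(
  attempt,
  { min = 1000, max = 5000, factor = 2, jitter = 0.5 } = {},
  random = Math.random
) {
  // Exponential growth, capped at max: min, min*factor, min*factor^2, ...
  const base = Math.min(max, min * Math.pow(factor, attempt - 1));
  // Spread the delay by up to +/- (jitter * base)
  const spread = base * jitter * (random() * 2 - 1);
  // Never exceed max and never go negative
  return Math.min(max, Math.max(0, Math.round(base + spread)));
}
```

With the defaults, the un-jittered delays grow 1000, 2000, 4000 and then stay at the 5000 ms cap.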

Horizontal Scaling with Redis Pub/Sub

Once you need to scale beyond a single server, you face a critical problem: WebSocket connections are stateful and tied to specific server instances. If User A connects to Server 1 and User B connects to Server 2, how do they communicate?

The answer is Redis pub/sub, which acts as a message broker between server instances.
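
To make the fan-out pattern concrete before bringing in Redis, here is a small in-memory simulation. `FakeBroker` and `FakeServer` are hypothetical stand-ins invented for this sketch. The point is only that each server publishes to the broker and relays whatever it hears to its own local clients:

```javascript
// In-memory stand-in for Redis pub/sub (hypothetical, illustration only)
class FakeBroker {
  constructor() {
    this.subscribers = new Map(); // channel -> Set of callbacks
  }
  subscribe(channel, callback) {
    if (!this.subscribers.has(channel)) this.subscribers.set(channel, new Set());
    this.subscribers.get(channel).add(callback);
  }
  publish(channel, message) {
    for (const callback of this.subscribers.get(channel) ?? []) callback(message);
  }
}

// Each server only knows about its own local clients
class FakeServer {
  constructor(name, broker) {
    this.name = name;
    this.broker = broker;
    this.localClients = []; // each client is modelled as an inbox array
    // Every server subscribes and relays what it hears to local clients
    broker.subscribe('updates', (message) => {
      for (const inbox of this.localClients) inbox.push(message);
    });
  }
  connect() {
    const inbox = [];
    this.localClients.push(inbox);
    return inbox;
  }
  // A server never writes to remote clients directly; it publishes instead
  broadcast(message) {
    this.broker.publish('updates', message);
  }
}

const broker = new FakeBroker();
const server1 = new FakeServer('server-1', broker);
const server2 = new FakeServer('server-2', broker);
const userA = server1.connect();
const userB = server2.connect();

// User A's server publishes; User B receives it through server 2
server1.broadcast('hello from A');
```

Note that the publishing server also receives its own message through the subscription, which is why it does not need a separate local broadcast step.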

Implementing Redis Adapter

// server.js (continued)
import { createAdapter } from '@socket.io/redis-adapter';

const pubClient = new Redis(process.env.REDIS_URL);
const subClient = pubClient.duplicate();

io.adapter(createAdapter(pubClient, subClient));

// Now messages are automatically distributed across all servers
io.to('document:123').emit('update', {
  type: 'text-change',
  changes: [/* ... */],
});

Custom Event Distribution

For more control, you can implement custom pub/sub logic:

class RoomManager {
  constructor(io, redis) {
    this.io = io;
    this.publisher = redis.duplicate();
    this.subscriber = redis.duplicate();
    this.rooms = new Map();

    this.setupSubscriber();
  }

  setupSubscriber() {
    this.subscriber.psubscribe('room:*', (err) => {
      if (err) console.error('Subscribe error:', err);
    });

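    this.subscriber.on('pmessage', (pattern, channel, message) => {
      // Strip only the 'room:' prefix so room IDs that contain ':'
      // (e.g. 'document:123') stay intact
      const roomId = channel.slice('room:'.length);
      const data = JSON.parse(message);

      // Broadcast to local connections only; io.local avoids re-broadcasting
      // through the Redis adapter when one is installed
      this.io.local.to(roomId).emit(data.event, data.payload);
    });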
  }

  async broadcastToRoom(roomId, event, payload) {
    // Publish to Redis for cross-server distribution
    await this.publisher.publish(`room:${roomId}`, JSON.stringify({
      event,
      payload,
      timestamp: Date.now(),
      serverId: process.env.SERVER_ID,
    }));
  }

  async joinRoom(socket, roomId, metadata = {}) {
    socket.join(roomId);

    // Track room membership in Redis
    await this.publisher.sadd(`room:${roomId}:members`, socket.id);
    await this.publisher.hset(`member:${socket.id}`, {
      userId: metadata.userId,
      roomId,
      joinedAt: Date.now(),
    });

    // Broadcast presence update
    await this.broadcastToRoom(roomId, 'user-joined', {
      userId: metadata.userId,
      socketId: socket.id,
    });
  }

  async leaveRoom(socket, roomId) {
    socket.leave(roomId);

    // Read the member metadata before deleting it; otherwise userId is lost
    const metadata = await this.publisher.hgetall(`member:${socket.id}`);

    await this.publisher.srem(`room:${roomId}:members`, socket.id);
    await this.publisher.del(`member:${socket.id}`);

    await this.broadcastToRoom(roomId, 'user-left', {
      userId: metadata.userId,
      socketId: socket.id,
    });
  }
}

Operational Transformation for Conflict Resolution

When multiple users edit the same document simultaneously, you need a strategy to resolve conflicts. Operational Transformation (OT) is the battle-tested approach used by Google Docs and other collaborative editors.
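
To see why transformation is needed at all, consider two users inserting concurrently into the same text. The sketch below handles inserts only, and the helper names `applyInsert` and `transformInsert` are hypothetical, chosen for this example:

```javascript
// Two users start from the same text and insert concurrently
const base = 'helo';
const opA = { type: 'insert', position: 3, text: 'l', clientId: 'a' }; // fixes the typo
const opB = { type: 'insert', position: 4, text: '!', clientId: 'b' }; // appends '!'

function applyInsert(doc, op) {
  return doc.slice(0, op.position) + op.text + doc.slice(op.position);
}

// Shift `op` right if `applied` inserted before its position.
// Ties are broken by clientId so both sides make the same choice.
function transformInsert(op, applied) {
  const appliedFirst =
    applied.position < op.position ||
    (applied.position === op.position && applied.clientId < op.clientId);
  return appliedFirst
    ? { ...op, position: op.position + applied.text.length }
    : op;
}

// Naive: user A applies B's operation unchanged. The '!' lands mid-word.
const naive = applyInsert(applyInsert(base, opA), opB);

// Transformed: each site applies its own op, then the other's transformed op
const siteA = applyInsert(applyInsert(base, opA), transformInsert(opB, opA));
const siteB = applyInsert(applyInsert(base, opB), transformInsert(opA, opB));
```

Here `naive` is `'hell!o'`, while `siteA` and `siteB` both end up as `'hello!'`: the two sites converge even though they applied the operations in different orders.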

Basic OT Implementation

class OperationalTransform {
  // Transform op1 so it can be applied after op2, which has already been
  // applied. Always returns the adjusted op1.
  static transform(op1, op2) {
    // Insert vs Insert
    if (op1.type === 'insert' && op2.type === 'insert') {
      // op2 landed earlier, or wins the tie on client ID: shift op1 right
      const op2First =
        op2.position < op1.position ||
        (op2.position === op1.position && op2.clientId < op1.clientId);
      return op2First
        ? { ...op1, position: op1.position + op2.text.length }
        : op1;
    }

    // Insert vs Delete
    if (op1.type === 'insert' && op2.type === 'delete') {
      if (op1.position <= op2.position) {
        return op1; // Insert is before the deleted range
      } else if (op1.position >= op2.position + op2.length) {
        return { ...op1, position: op1.position - op2.length };
      } else {
        // Insert is within deleted range
        return { ...op1, position: op2.position };
      }
    }

    // Delete vs Insert
    if (op1.type === 'delete' && op2.type === 'insert') {
      if (op2.position <= op1.position) {
        return { ...op1, position: op1.position + op2.text.length };
      } else if (op2.position >= op1.position + op1.length) {
        return op1; // Insert is after the deleted range
      } else {
        // Insert landed inside the range being deleted. Preserving it means
        // splitting the delete in two, which a single returned operation
        // cannot express in this basic version.
        throw new Error('Delete spanning a concurrent insert is not supported');
      }
    }

    // Delete vs Delete
    if (op1.type === 'delete' && op2.type === 'delete') {
      if (op1.position + op1.length <= op2.position) {
        return op1; // op1 is entirely before op2
      } else if (op2.position + op2.length <= op1.position) {
        return { ...op1, position: op1.position - op2.length };
      } else {
        // Overlapping deletes - drop the part op2 already removed
        const start = Math.max(op1.position, op2.position);
        const end = Math.min(
          op1.position + op1.length,
          op2.position + op2.length
        );
        return {
          ...op1,
          position: Math.min(op1.position, op2.position),
          length: op1.length - (end - start),
        };
      }
    }

    return op1;
  }

  // Apply operation to document
  static apply(document, operation) {
    if (operation.type === 'insert') {
      return (
        document.slice(0, operation.position) +
        operation.text +
        document.slice(operation.position)
      );
    } else if (operation.type === 'delete') {
      return (
        document.slice(0, operation.position) +
        document.slice(operation.position + operation.length)
      );
    }
    return document;
  }
}

// Server-side operation handling
class DocumentServer {
  constructor() {
    this.documents = new Map();
    this.operationHistory = new Map();
  }

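  async applyOperation(documentId, operation, clientVersion) {
    const doc = this.documents.get(documentId);
    if (!doc) {
      throw new Error(`Unknown document: ${documentId}`);
    }

    // Store the history array on first use so later pushes are not lost
    let history = this.operationHistory.get(documentId);
    if (!history) {
      history = [];
      this.operationHistory.set(documentId, history);
    }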

    // Transform against all operations since client's version
    let transformedOp = operation;
    for (let i = clientVersion; i < history.length; i++) {
      transformedOp = OperationalTransform.transform(
        transformedOp,
        history[i]
      );
    }

    // Apply transformed operation
    const newContent = OperationalTransform.apply(
      doc.content,
      transformedOp
    );

    doc.content = newContent;
    doc.version++;
    history.push(transformedOp);

    return {
      operation: transformedOp,
      version: doc.version,
    };
  }
}

Performance Optimization

Connection Limiting

Cap the number of simultaneous connections per user to prevent resource exhaustion. The counter below lives in process memory, so the limit is enforced per server instance:

const connectionLimiter = new Map();

io.use((socket, next) => {
  const userId = socket.handshake.auth.userId;
  const userConnections = connectionLimiter.get(userId) || 0;

  if (userConnections >= 5) {
    return next(new Error('Too many connections'));
  }

  connectionLimiter.set(userId, userConnections + 1);

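  socket.on('disconnect', () => {
    // Remove the entry at zero so the map does not keep a key per past user
    const remaining = (connectionLimiter.get(userId) || 1) - 1;
    if (remaining > 0) {
      connectionLimiter.set(userId, remaining);
    } else {
      connectionLimiter.delete(userId);
    }
  });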

  next();
});
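
Capping concurrent connections does not limit how often a client may attempt to connect, which the production checklist also calls for. One common approach is a sliding-window counter. A minimal sketch, assuming a hypothetical `SlidingWindowLimiter` class that is in-memory and per-process, so a multi-server deployment would need shared state such as Redis:

```javascript
// Hypothetical sliding-window limiter: allows at most `limit` attempts
// per key within any window of `windowMs` milliseconds
class SlidingWindowLimiter {
  constructor({ limit = 10, windowMs = 60000 } = {}) {
    this.limit = limit;
    this.windowMs = windowMs;
    this.attempts = new Map(); // key -> array of attempt timestamps (ms)
  }

  // Returns true and records the attempt if it is allowed, false otherwise.
  // `now` is injectable so the behaviour can be tested without real timers.
  allow(key, now = Date.now()) {
    const cutoff = now - this.windowMs;
    // Keep only the attempts that are still inside the window
    const recent = (this.attempts.get(key) ?? []).filter((t) => t > cutoff);
    if (recent.length >= this.limit) {
      this.attempts.set(key, recent);
      return false;
    }
    recent.push(now);
    this.attempts.set(key, recent);
    return true;
  }
}
```

In a Socket.IO middleware this could be keyed on `socket.handshake.address`, calling `next(new Error(...))` when `allow` returns false. Behind a proxy the address may need to come from a forwarded header instead.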

Message Batching

Reduce network overhead by batching rapid updates:

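import { EventEmitter } from 'node:events';

// Extends EventEmitter so that flush() can emit 'batch' events
class MessageBatcher extends EventEmitter {
  constructor(flushInterval = 50) {
    super();
    this.queue = [];
    this.flushInterval = flushInterval;
    this.timer = null;
  }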

  add(message) {
    this.queue.push(message);

    if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.queue.length === 0) return;

    const batch = this.queue.splice(0);
    this.emit('batch', batch);

    clearTimeout(this.timer);
    this.timer = null;
  }
}

Binary Protocol

For high-throughput scenarios, use binary encoding instead of JSON:

// Using MessagePack for efficient binary serialization
import msgpack from 'msgpack-lite';

socket.emit('update', msgpack.encode({
  type: 'change',
  changes: largeChangeset,
}));

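socket.on('update', (data) => {
  // Binary payloads arrive as a Buffer in Node.js and an ArrayBuffer in
  // browsers; wrapping in Uint8Array gives the decoder a type it accepts in both
  const decoded = msgpack.decode(new Uint8Array(data));
  applyChanges(decoded.changes);
});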
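
For messages with a small, fixed shape, a hand-rolled layout can be more compact than a general-purpose serializer. The sketch below is an illustration only: the 9-byte layout and the `encodeOp` / `decodeOp` names are assumptions made for this example, and it covers operations that carry just a type, a position, and a length:

```javascript
// Hypothetical fixed layout: [type: uint8][position: uint32][length: uint32]
const TYPE_CODES = { insert: 1, delete: 2 };
const TYPE_NAMES = { 1: 'insert', 2: 'delete' };

function encodeOp(op) {
  const buffer = new ArrayBuffer(9);
  const view = new DataView(buffer);
  view.setUint8(0, TYPE_CODES[op.type]);
  view.setUint32(1, op.position); // DataView writes big-endian by default
  view.setUint32(5, op.length);
  return buffer;
}

function decodeOp(buffer) {
  const view = new DataView(buffer);
  return {
    type: TYPE_NAMES[view.getUint8(0)],
    position: view.getUint32(1),
    length: view.getUint32(5),
  };
}

const op = { type: 'delete', position: 1024, length: 12 };
const binarySize = encodeOp(op).byteLength; // always 9 bytes
const jsonSize = JSON.stringify(op).length; // grows with key names and digits
```

The trade-off is flexibility: every change to the message shape requires a coordinated change to both encoder and decoder, which is one reason to version the protocol.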

Monitoring and Observability

Production real-time systems need comprehensive monitoring:

class MetricsCollector {
  constructor() {
    this.metrics = {
      connections: 0,
      messagesPerSecond: 0,
      avgLatency: 0,
      errors: 0,
    };

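    // Per-interval counters; they must start at 0 or the first increment yields NaN
    this.messageCount = 0;
    this.totalLatency = 0;

    setInterval(() => this.calculateMetrics(), 1000);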
  }

  recordMessage(latency) {
    this.messageCount++;
    this.totalLatency += latency;
  }

  recordError(error) {
    this.metrics.errors++;
    console.error('Real-time error:', error);
  }

  calculateMetrics() {
    this.metrics.messagesPerSecond = this.messageCount;
    this.metrics.avgLatency = this.totalLatency / this.messageCount || 0;

    // Reset counters
    this.messageCount = 0;
    this.totalLatency = 0;

    // Export to monitoring system
    this.exportMetrics();
  }

  exportMetrics() {
    // Send to Prometheus, DataDog, etc.
    console.log('Metrics:', this.metrics);
  }
}
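
An average latency on its own can hide a slow tail, so it is often worth tracking percentiles as well. A minimal sketch using the nearest-rank method, with a hypothetical `percentile` helper and made-up sample values:

```javascript
// Nearest-rank percentile: the smallest sample with at least p% of all
// samples at or below it. Returns 0 for an empty sample set.
function percentile(samples, p) {
  if (samples.length === 0) return 0;
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(0, rank - 1)];
}

// Made-up latencies in ms: nine fast messages and one slow outlier
const latencies = [12, 15, 11, 14, 13, 250, 12, 16, 14, 13];

const mean = latencies.reduce((sum, v) => sum + v, 0) / latencies.length;
const p50 = percentile(latencies, 50);
const p95 = percentile(latencies, 95);
```

For this sample the mean is 37 ms, the median is 13 ms, and the 95th percentile is 250 ms. The mean describes neither the typical message nor the worst case.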

Production Checklist

Before Going Live
  • ✓ Implement connection rate limiting
  • ✓ Add authentication and authorization
  • ✓ Set up Redis persistence for critical data
  • ✓ Configure load balancer with sticky sessions (required when the polling transport is enabled) and use the Redis adapter for cross-server broadcasts
  • ✓ Add comprehensive error logging
  • ✓ Implement graceful shutdown handling
  • ✓ Set up health check endpoints
  • ✓ Test reconnection scenarios extensively
  • ✓ Monitor memory usage and connection counts
  • ✓ Document message protocols and versioning

Lessons Learned

Start with Socket.io, not raw WebSockets. The battle-tested reconnection logic and fallback transports are worth the overhead.

Redis is non-negotiable for scaling. Don't try to build cross-server communication yourself.

Operational Transformation is complex but necessary. For document collaboration, there's no simpler alternative that works reliably.

Monitor everything. Real-time systems fail in subtle ways. You need visibility into connection states, message rates, and latencies.

Design for disconnections. Networks fail constantly. Your system should handle disconnections as a normal part of operation, not an exceptional case.