Building Scalable Real-Time Features with WebSockets and Redis
A comprehensive guide to implementing real-time collaboration features—covering connection management, conflict resolution, and scaling to thousands of concurrent users.
- WebSocket connection lifecycle and reconnection strategies
- Redis pub/sub for horizontal scaling across multiple servers
- Operational transformation for conflict-free concurrent edits
- Production-ready error handling and monitoring
- Performance optimizations for 10,000+ concurrent connections
The Real-Time Challenge
Building real-time features sounds straightforward: open a WebSocket connection, send messages back and forth. But when you need to scale to thousands of concurrent users editing the same document, coordinating presence across multiple servers, and handling network failures gracefully, the complexity explodes.
I learned this the hard way while building a collaborative document editor. Our initial prototype worked beautifully with 10 users. At 100 users, we started seeing conflicts. At 1,000 users, our single Node.js server couldn't handle the load. At 10,000 users, everything fell apart.
This article covers the architecture patterns, code implementations, and lessons learned from building a production-ready real-time system.
WebSocket Connection Management
The foundation of any real-time system is reliable connection management. WebSockets provide full-duplex communication, but they're not magical—connections drop, networks fail, and clients go offline.
Basic WebSocket Server Setup
Let's start with a basic Socket.io server implementation:
// server.js
import express from 'express';
import { createServer } from 'http';
import { Server } from 'socket.io';
import Redis from 'ioredis';

const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
  cors: { origin: process.env.CLIENT_URL },
  // Heartbeat: pingInterval is how often pings are sent, pingTimeout is how
  // long to wait for a reply before the connection is considered closed
  pingTimeout: 60000,
  pingInterval: 25000,
});

// Redis clients for pub/sub (a subscribed connection cannot issue other
// commands, so publishing and subscribing need separate clients)
const publisher = new Redis(process.env.REDIS_URL);
const subscriber = new Redis(process.env.REDIS_URL);

// Connection tracking
const connections = new Map();

// Minimal cleanup so the map does not grow without bound;
// extend with room and presence cleanup as needed
function handleDisconnect(socket) {
  connections.delete(socket.id);
}

io.on('connection', (socket) => {
  console.log(`Client connected: ${socket.id}`);

  // Track connection metadata
  connections.set(socket.id, {
    userId: socket.handshake.auth.userId,
    connectedAt: Date.now(),
    rooms: new Set(),
  });

  socket.on('disconnect', (reason) => {
    console.log(`Client disconnected: ${socket.id}, reason: ${reason}`);
    handleDisconnect(socket);
  });

  socket.on('error', (error) => {
    console.error(`Socket error for ${socket.id}:`, error);
  });
});

httpServer.listen(3000, () => {
  console.log('WebSocket server running on port 3000');
});
Reconnection Strategy
Clients need to handle disconnections gracefully. Here's a robust reconnection strategy:
// client.js
import { io } from 'socket.io-client';

class RealtimeClient {
  constructor(url, options = {}) {
    // Initialize state before any event handler can run
    this.messageQueue = [];
    this.isConnected = false;
    // Assumption: callers may pass an onConnectionFailed callback
    this.onConnectionFailed = options.onConnectionFailed ?? (() => {});

    this.socket = io(url, {
      auth: { userId: options.userId },
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionDelayMax: 5000,
      reconnectionAttempts: Infinity,
      transports: ['websocket', 'polling'],
    });
    this.setupEventHandlers();
  }

  setupEventHandlers() {
    this.socket.on('connect', () => {
      console.log('Connected to server');
      this.isConnected = true;
      this.flushMessageQueue();
    });

    this.socket.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
      this.isConnected = false;
      if (reason === 'io server disconnect') {
        // Server disconnected us, reconnect manually
        this.socket.connect();
      }
    });

    // In Socket.IO v3 and later, reconnection events are emitted by the
    // Manager (socket.io), not by the socket itself
    this.socket.io.on('reconnect_attempt', (attemptNumber) => {
      console.log(`Reconnection attempt ${attemptNumber}`);
    });

    // Only fires when reconnectionAttempts is finite
    this.socket.io.on('reconnect_failed', () => {
      console.error('Reconnection failed');
      this.handleConnectionFailure();
    });
  }

  send(event, data) {
    if (this.isConnected) {
      this.socket.emit(event, data);
    } else {
      // Queue messages while disconnected
      this.messageQueue.push({ event, data, timestamp: Date.now() });
    }
  }

  flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const { event, data } = this.messageQueue.shift();
      this.socket.emit(event, data);
    }
  }

  handleConnectionFailure() {
    // Show user-friendly error
    this.onConnectionFailed({
      message: 'Unable to connect to server. Please check your connection.',
    });
  }
}
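The queue above records a timestamp for each message but never reads it, so a client that reconnects after a long outage would replay arbitrarily old events. One possible refinement, sketched here, is to cap the queue and drop stale entries on flush. The class name `OfflineQueue` and the `maxSize` and `maxAgeMs` options are illustrative assumptions, not part of Socket.IO.

```javascript
// Hypothetical helper: a bounded offline queue that drops stale messages.
class OfflineQueue {
  constructor({ maxSize = 100, maxAgeMs = 30000, now = () => Date.now() } = {}) {
    this.maxSize = maxSize;
    this.maxAgeMs = maxAgeMs;
    this.now = now; // injectable clock, which makes the queue easy to test
    this.items = [];
  }

  push(event, data) {
    this.items.push({ event, data, timestamp: this.now() });
    // When full, discard the oldest entry rather than growing without bound
    if (this.items.length > this.maxSize) this.items.shift();
  }

  // Calls emit(event, data) for every message that is still fresh, in the
  // order it was queued, then leaves the queue empty. Returns the number sent.
  flush(emit) {
    const cutoff = this.now() - this.maxAgeMs;
    const pending = this.items;
    this.items = [];
    let sent = 0;
    for (const { event, data, timestamp } of pending) {
      if (timestamp >= cutoff) {
        emit(event, data);
        sent++;
      }
    }
    return sent;
  }
}
```

In `RealtimeClient`, `send` could call `queue.push(event, data)` while disconnected, and the `connect` handler could call `queue.flush((event, data) => this.socket.emit(event, data))`. Whether dropping is acceptable depends on the event: cursor positions can usually be discarded, document edits usually cannot.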
Horizontal Scaling with Redis Pub/Sub
Once you need to scale beyond a single server, you face a critical problem: WebSocket connections are stateful and tied to specific server instances. If User A connects to Server 1 and User B connects to Server 2, how do they communicate?
The answer is Redis pub/sub, which acts as a message broker between server instances.
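To make the fan-out concrete, here is a minimal in-memory simulation of the pattern. `FakeBroker` stands in for Redis and `FakeServer` for one Socket.IO process; both are hypothetical names used only for illustration, and nothing here touches a real network.

```javascript
// In-memory stand-in for Redis pub/sub (illustration only).
class FakeBroker {
  constructor() {
    this.subscribers = new Map(); // channel -> Set of handler functions
  }
  subscribe(channel, handler) {
    if (!this.subscribers.has(channel)) this.subscribers.set(channel, new Set());
    this.subscribers.get(channel).add(handler);
  }
  publish(channel, message) {
    for (const handler of this.subscribers.get(channel) ?? []) handler(message);
  }
}

// Stand-in for one server process holding its own local client connections.
class FakeServer {
  constructor(name, broker) {
    this.name = name;
    this.broker = broker;
    this.localClients = new Map(); // userId -> array of received messages
    // Every server subscribes to the same channel
    broker.subscribe('chat', (raw) => this.deliverLocally(JSON.parse(raw)));
  }
  connect(userId) {
    this.localClients.set(userId, []);
  }
  // A server never writes to another server's sockets; it only publishes
  send(from, text) {
    this.broker.publish('chat', JSON.stringify({ from, text }));
  }
  deliverLocally(message) {
    for (const inbox of this.localClients.values()) inbox.push(message);
  }
}

const broker = new FakeBroker();
const server1 = new FakeServer('server-1', broker);
const server2 = new FakeServer('server-2', broker);
server1.connect('userA');
server2.connect('userB');

// User A is connected to server 1, yet user B on server 2 receives the message
server1.send('userA', 'hello');
const inboxB = server2.localClients.get('userB');
console.log(inboxB.length, inboxB[0].text);
```

Each server only ever delivers to sockets it owns; the broker is what turns a local publish into a delivery on every instance.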
Implementing Redis Adapter
// server.js (continued)
import { createAdapter } from '@socket.io/redis-adapter';

const pubClient = new Redis(process.env.REDIS_URL);
const subClient = pubClient.duplicate();
io.adapter(createAdapter(pubClient, subClient));

// Now messages are automatically distributed across all servers
io.to('document:123').emit('update', {
  type: 'text-change',
  changes: [/* ... */],
});
Custom Event Distribution
For more control, you can implement custom pub/sub logic:
class RoomManager {
  constructor(io, redis) {
    this.io = io;
    // A subscribed Redis connection cannot issue other commands,
    // so publishing and subscribing use separate connections
    this.publisher = redis.duplicate();
    this.subscriber = redis.duplicate();
    this.rooms = new Map();
    this.setupSubscriber();
  }

  setupSubscriber() {
    this.subscriber.psubscribe('room:*', (err) => {
      if (err) console.error('Subscribe error:', err);
    });

    this.subscriber.on('pmessage', (pattern, channel, message) => {
      // Strip only the 'room:' prefix so room IDs that contain ':'
      // (such as 'document:123') stay intact
      const roomId = channel.slice('room:'.length);
      const data = JSON.parse(message);
      // Broadcast to local connections only. io.local bypasses the adapter,
      // so this stays local even when the Redis adapter is installed
      this.io.local.to(roomId).emit(data.event, data.payload);
    });
  }

  async broadcastToRoom(roomId, event, payload) {
    // Publish to Redis for cross-server distribution
    await this.publisher.publish(`room:${roomId}`, JSON.stringify({
      event,
      payload,
      timestamp: Date.now(),
      serverId: process.env.SERVER_ID,
    }));
  }

  async joinRoom(socket, roomId, metadata = {}) {
    socket.join(roomId);

    // Track room membership in Redis
    await this.publisher.sadd(`room:${roomId}:members`, socket.id);
    await this.publisher.hset(`member:${socket.id}`, {
      userId: metadata.userId,
      roomId,
      joinedAt: Date.now(),
    });

    // Broadcast presence update
    await this.broadcastToRoom(roomId, 'user-joined', {
      userId: metadata.userId,
      socketId: socket.id,
    });
  }

  async leaveRoom(socket, roomId) {
    socket.leave(roomId);

    // Read the member record before deleting it; otherwise userId is lost
    const metadata = await this.publisher.hgetall(`member:${socket.id}`);
    await this.publisher.srem(`room:${roomId}:members`, socket.id);
    await this.publisher.del(`member:${socket.id}`);

    await this.broadcastToRoom(roomId, 'user-left', {
      userId: metadata.userId,
      socketId: socket.id,
    });
  }
}
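`broadcastToRoom` stamps each message with a `serverId`, but the subscriber never reads it. One way that field could be used is to let a server deliver to its own sockets immediately and then ignore its own messages when they come back from Redis. The helper names below (`encodeRoomMessage`, `decodeRoomMessage`) are assumptions for illustration, not part of the class above.

```javascript
const CHANNEL_PREFIX = 'room:';

// Build the channel name and JSON body for a room broadcast
function encodeRoomMessage(roomId, event, payload, serverId) {
  return {
    channel: `${CHANNEL_PREFIX}${roomId}`,
    body: JSON.stringify({ event, payload, serverId, timestamp: Date.now() }),
  };
}

// Parse an incoming pub/sub message. Returns null when the message should be
// ignored: wrong channel, malformed body, or published by this same server.
function decodeRoomMessage(channel, body, localServerId) {
  if (!channel.startsWith(CHANNEL_PREFIX)) return null;
  let data;
  try {
    data = JSON.parse(body);
  } catch {
    return null; // one bad message should not crash the subscriber
  }
  if (data === null || typeof data !== 'object') return null;
  if (data.serverId === localServerId) return null;
  return {
    // Remove the prefix rather than splitting on ':' so 'document:123' survives
    roomId: channel.slice(CHANNEL_PREFIX.length),
    event: data.event,
    payload: data.payload,
  };
}
```

With this scheme the publishing server emits to its local sockets right away and every other server emits on receipt, which takes the Redis round trip out of the sender's own delivery path.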
Operational Transformation for Conflict Resolution
When multiple users edit the same document simultaneously, you need a strategy to resolve conflicts. Operational Transformation (OT) is the battle-tested approach used by Google Docs and other collaborative editors.
Basic OT Implementation
class OperationalTransform {
  // Transform op1 so that it can be applied after op2, which was applied
  // first. Always returns the adjusted op1.
  static transform(op1, op2) {
    // Insert vs Insert
    if (op1.type === 'insert' && op2.type === 'insert') {
      // op2 landed before op1 (same position: lower client ID goes first)
      const op2GoesFirst =
        op2.position < op1.position ||
        (op2.position === op1.position && op2.clientId < op1.clientId);
      return op2GoesFirst
        ? { ...op1, position: op1.position + op2.text.length }
        : op1;
    }

    // Insert vs Delete
    if (op1.type === 'insert' && op2.type === 'delete') {
      if (op1.position <= op2.position) {
        return op1;
      } else if (op1.position >= op2.position + op2.length) {
        return { ...op1, position: op1.position - op2.length };
      } else {
        // Insert is within deleted range
        return { ...op1, position: op2.position };
      }
    }

    // Delete vs Insert
    if (op1.type === 'delete' && op2.type === 'insert') {
      if (op2.position <= op1.position) {
        return { ...op1, position: op1.position + op2.text.length };
      } else if (op2.position >= op1.position + op1.length) {
        return op1;
      } else {
        // Insert landed inside the range being deleted. A complete
        // implementation splits the delete in two; this basic version
        // keeps only the part before the inserted text.
        return { ...op1, length: op2.position - op1.position };
      }
    }

    // Delete vs Delete
    if (op1.type === 'delete' && op2.type === 'delete') {
      if (op1.position + op1.length <= op2.position) {
        return op1;
      } else if (op2.position + op2.length <= op1.position) {
        return { ...op1, position: op1.position - op2.length };
      } else {
        // Overlapping deletes - drop the part op2 already removed
        const start = Math.max(op1.position, op2.position);
        const end = Math.min(
          op1.position + op1.length,
          op2.position + op2.length
        );
        return {
          ...op1,
          position: Math.min(op1.position, op2.position),
          length: op1.length - (end - start),
        };
      }
    }

    return op1;
  }

  // Apply operation to document
  static apply(document, operation) {
    if (operation.type === 'insert') {
      return (
        document.slice(0, operation.position) +
        operation.text +
        document.slice(operation.position)
      );
    } else if (operation.type === 'delete') {
      return (
        document.slice(0, operation.position) +
        document.slice(operation.position + operation.length)
      );
    }
    return document;
  }
}
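// Server-side operation handling
class DocumentServer {
  constructor() {
    this.documents = new Map();
    this.operationHistory = new Map();
  }

  async applyOperation(documentId, operation, clientVersion) {
    const doc = this.documents.get(documentId);
    if (!doc) throw new Error(`Unknown document: ${documentId}`);

    // Store the history array on first use so that pushes are not lost
    if (!this.operationHistory.has(documentId)) {
      this.operationHistory.set(documentId, []);
    }
    const history = this.operationHistory.get(documentId);

    // Assumes doc.version starts at 0 and grows by one per applied
    // operation, so a version doubles as an index into history
    if (
      !Number.isInteger(clientVersion) ||
      clientVersion < 0 ||
      clientVersion > history.length
    ) {
      throw new Error(`Invalid client version: ${clientVersion}`);
    }

    // Transform against all operations since client's version
    let transformedOp = operation;
    for (let i = clientVersion; i < history.length; i++) {
      transformedOp = OperationalTransform.transform(
        transformedOp,
        history[i]
      );
    }

    // Apply transformed operation
    const newContent = OperationalTransform.apply(
      doc.content,
      transformedOp
    );
    doc.content = newContent;
    doc.version++;
    history.push(transformedOp);

    return {
      operation: transformedOp,
      version: doc.version,
    };
  }
}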
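A quick way to sanity-check any transform function is the convergence property: two sites that apply the same pair of concurrent operations in opposite orders must end with identical text. The sketch below checks this for inserts only, using its own small `transformInsert` and `applyInsert` helpers (hypothetical names, independent of the classes above).

```javascript
// Shift `op` right if `applied` inserted text before its position.
// Ties on position are broken by clientId so both sites agree on the order.
function transformInsert(op, applied) {
  const appliedGoesFirst =
    applied.position < op.position ||
    (applied.position === op.position && applied.clientId < op.clientId);
  return appliedGoesFirst
    ? { ...op, position: op.position + applied.text.length }
    : op;
}

function applyInsert(text, op) {
  return text.slice(0, op.position) + op.text + text.slice(op.position);
}

const base = 'Hello world';
const opA = { clientId: 'a', position: 5, text: ',' };  // right after "Hello"
const opB = { clientId: 'b', position: 11, text: '!' }; // at the end of the text

// Site 1 applies A first, then B transformed against A
const site1 = applyInsert(applyInsert(base, opA), transformInsert(opB, opA));
// Site 2 applies B first, then A transformed against B
const site2 = applyInsert(applyInsert(base, opB), transformInsert(opA, opB));

console.log(site1); // "Hello, world!"
console.log(site2); // "Hello, world!"
```

The same check extends to deletes and mixed pairs, and it is worth running over randomly generated operations, since the edge cases (equal positions, overlapping ranges) are where transform functions usually break.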
Performance Optimization
Per-User Connection Limits
Limit the number of connections per client to prevent resource exhaustion:
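// Counts are per server process; with several servers, keep them in Redis
const connectionLimiter = new Map();
const MAX_CONNECTIONS_PER_USER = 5;

io.use((socket, next) => {
  const userId = socket.handshake.auth.userId;
  const userConnections = connectionLimiter.get(userId) || 0;
  if (userConnections >= MAX_CONNECTIONS_PER_USER) {
    return next(new Error('Too many connections'));
  }
  connectionLimiter.set(userId, userConnections + 1);

  socket.on('disconnect', () => {
    const remaining = (connectionLimiter.get(userId) || 1) - 1;
    // Delete the entry at zero so the map does not keep every user ever seen
    if (remaining <= 0) {
      connectionLimiter.delete(userId);
    } else {
      connectionLimiter.set(userId, remaining);
    }
  });
  next();
});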
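Capping concurrent connections does not stop a client from connecting and disconnecting in a tight loop. A complementary check is to limit connection attempts per time window. The sketch below uses a sliding window held in process memory; `AttemptLimiter` and its parameters are illustrative names, and a multi-server deployment would need to keep these counters in shared storage such as Redis.

```javascript
class AttemptLimiter {
  constructor({ maxAttempts = 10, windowMs = 60000, now = () => Date.now() } = {}) {
    this.maxAttempts = maxAttempts;
    this.windowMs = windowMs;
    this.now = now; // injectable clock for testing
    this.attempts = new Map(); // key -> timestamps of recent allowed attempts
  }

  // Returns true and records the attempt if the key is under its limit,
  // otherwise returns false without recording anything.
  allow(key) {
    const cutoff = this.now() - this.windowMs;
    // Keep only attempts that are still inside the window
    const recent = (this.attempts.get(key) ?? []).filter((t) => t > cutoff);
    if (recent.length >= this.maxAttempts) {
      this.attempts.set(key, recent);
      return false;
    }
    recent.push(this.now());
    this.attempts.set(key, recent);
    return true;
  }
}
```

Inside `io.use`, the key could be the user ID or `socket.handshake.address`, and the middleware would call `next(new Error('Rate limit exceeded'))` when `allow` returns false.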
Message Batching
Reduce network overhead by batching rapid updates:
import { EventEmitter } from 'events';

// Extends EventEmitter so that this.emit('batch', ...) is defined
class MessageBatcher extends EventEmitter {
  constructor(flushInterval = 50) {
    super();
    this.queue = [];
    this.flushInterval = flushInterval;
    this.timer = null;
  }

  add(message) {
    this.queue.push(message);
    if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    // Always reset the timer so that the next add() schedules a new flush
    clearTimeout(this.timer);
    this.timer = null;
    if (this.queue.length === 0) return;
    const batch = this.queue.splice(0);
    this.emit('batch', batch);
  }
}
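Batching reduces the number of frames, and a batch can often be shrunk further before it is sent. For example, a burst of single-character inserts from one typist can be merged into one insert. The function below is a sketch that assumes operations shaped like `{ type, position, text }` as in the OT section, all from the same client and in the order they were made; `coalesceInserts` is a hypothetical name.

```javascript
// Merge runs of adjacent inserts: an insert that starts exactly where the
// previous insert ended is appended to it. Everything else passes through.
function coalesceInserts(ops) {
  const merged = [];
  for (const op of ops) {
    const last = merged[merged.length - 1];
    const continuesLast =
      last !== undefined &&
      last.type === 'insert' &&
      op.type === 'insert' &&
      op.position === last.position + last.text.length;
    if (continuesLast) {
      // Replace with a new object so the caller's input is not mutated
      merged[merged.length - 1] = { ...last, text: last.text + op.text };
    } else {
      merged.push(op);
    }
  }
  return merged;
}

const batch = [
  { type: 'insert', position: 0, text: 'H' },
  { type: 'insert', position: 1, text: 'i' },
  { type: 'delete', position: 0, length: 1 },
  { type: 'insert', position: 5, text: '!' },
];
const compact = coalesceInserts(batch);
console.log(compact.length); // 3
```

The merge is only safe within one client's own sequence of operations; operations from different clients still have to go through the transform step individually.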
Binary Protocol
For high-throughput scenarios, use binary encoding instead of JSON:
// Using MessagePack for efficient binary serialization
import msgpack from 'msgpack-lite';

// Sender: encode the payload to binary before emitting
socket.emit('update', msgpack.encode({
  type: 'change',
  changes: largeChangeset,
}));

// Receiver: decode the binary payload back into an object
socket.on('update', (data) => {
  const decoded = msgpack.decode(data);
  applyChanges(decoded.changes);
});
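To see where the savings come from without pulling in a library, the sketch below hand-encodes a single insert operation into a fixed binary layout and compares its size with the JSON form. This is not MessagePack and not a recommended wire format; the layout (1-byte type tag, 4-byte position, UTF-8 text) and the function names are assumptions chosen for illustration.

```javascript
const TYPE_INSERT = 1;

// Layout: [type: uint8][position: uint32, big-endian][text: UTF-8 bytes]
function encodeInsert(op) {
  const textBytes = new TextEncoder().encode(op.text);
  const bytes = new Uint8Array(5 + textBytes.length);
  const view = new DataView(bytes.buffer);
  view.setUint8(0, TYPE_INSERT);
  view.setUint32(1, op.position); // DataView writes big-endian by default
  bytes.set(textBytes, 5);
  return bytes;
}

function decodeInsert(bytes) {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  if (view.getUint8(0) !== TYPE_INSERT) throw new Error('Unknown operation type');
  return {
    type: 'insert',
    position: view.getUint32(1),
    text: new TextDecoder().decode(bytes.subarray(5)),
  };
}

const op = { type: 'insert', position: 1024, text: 'hello' };
const binary = encodeInsert(op);
const json = new TextEncoder().encode(JSON.stringify(op));

console.log(binary.length); // 10
console.log(binary.length < json.length); // true
```

Most of the JSON bytes are repeated field names and punctuation, which a schema-based layout omits entirely. MessagePack sits in between: it still carries field names but encodes numbers and lengths compactly.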
Monitoring and Observability
Production real-time systems need comprehensive monitoring:
class MetricsCollector {
  constructor() {
    this.metrics = {
      connections: 0,
      messagesPerSecond: 0,
      avgLatency: 0,
      errors: 0,
    };
    // Per-interval counters; without these, the first ++ would produce NaN
    this.messageCount = 0;
    this.totalLatency = 0;
    setInterval(() => this.calculateMetrics(), 1000);
  }

  recordMessage(latency) {
    this.messageCount++;
    this.totalLatency += latency;
  }

  recordError(error) {
    this.metrics.errors++;
    console.error('Real-time error:', error);
  }

  calculateMetrics() {
    this.metrics.messagesPerSecond = this.messageCount;
    // 0 / 0 is NaN, so fall back to 0 when no messages arrived
    this.metrics.avgLatency = this.totalLatency / this.messageCount || 0;

    // Reset counters
    this.messageCount = 0;
    this.totalLatency = 0;

    // Export to monitoring system
    this.exportMetrics();
  }

  exportMetrics() {
    // Send to Prometheus, DataDog, etc.
    console.log('Metrics:', this.metrics);
  }
}
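An average can look healthy while a minority of messages are very slow, so it is common to track tail latency as well. The sketch below computes a percentile with the nearest-rank method over the samples collected in one interval; `percentile` is a hypothetical helper, and keeping every sample in memory is only reasonable at modest message rates.

```javascript
// Nearest-rank percentile: the smallest sample such that at least p percent
// of all samples are less than or equal to it. Returns 0 for an empty list.
function percentile(samples, p) {
  if (samples.length === 0) return 0;
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(rank, 1) - 1];
}

const latenciesMs = [12, 15, 11, 14, 13, 250, 12, 16, 13, 14];
const average = latenciesMs.reduce((sum, v) => sum + v, 0) / latenciesMs.length;

console.log(average);                     // 37
console.log(percentile(latenciesMs, 50)); // 13
console.log(percentile(latenciesMs, 95)); // 250
```

Here one slow message pulls the average to 37 ms even though the typical message takes 13 ms, and only the 95th percentile reveals the 250 ms outlier.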
Production Checklist
- ✓ Implement connection rate limiting
- ✓ Add authentication and authorization
- ✓ Set up Redis persistence for critical data
- ✓ Configure the load balancer with sticky sessions (needed whenever the HTTP long-polling fallback is enabled) and use the Redis adapter for cross-server broadcasts
- ✓ Add comprehensive error logging
- ✓ Implement graceful shutdown handling
- ✓ Set up health check endpoints
- ✓ Test reconnection scenarios extensively
- ✓ Monitor memory usage and connection counts
- ✓ Document message protocols and versioning
Lessons Learned
Start with Socket.io, not raw WebSockets. The battle-tested reconnection logic and fallback transports are worth the overhead.
Redis is non-negotiable for scaling. Don't try to build cross-server communication yourself.
Operational Transformation is complex but necessary. For document collaboration, there's no simpler alternative that works reliably.
Monitor everything. Real-time systems fail in subtle ways. You need visibility into connection states, message rates, and latencies.
Design for disconnections. Networks fail constantly. Your system should handle disconnections as a normal part of operation, not an exceptional case.