Building Real-Time Applications with WebSockets
Introduction
Real-time applications have become essential for modern web experiences. From chat applications to live dashboards, WebSockets provide full-duplex communication channels over a single TCP connection.
WebSocket Basics
Native WebSocket API
/**
 * Small client built on the native WebSocket API.
 *
 * Adds JSON (de)serialisation, a listener registry (`on` / `emit`) and
 * automatic reconnection with exponential backoff after abnormal closes.
 */
class WebSocketClient {
  /**
   * @param {string} url - WebSocket endpoint, e.g. `ws://localhost:3000`.
   */
  constructor(url) {
    this.url = url;
    this.socket = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    this.listeners = new Map();
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this.reconnectTimer = null;
    // True once close() has been called; suppresses automatic reconnection.
    this.closedByUser = false;
  }

  /**
   * Opens a new socket to `this.url` and wires its lifecycle handlers.
   * Emits `connect`, `message`, `<data.type>`, `error` and `disconnect`.
   */
  connect() {
    // A fresh connect() supersedes an earlier close() and any pending retry.
    this.closedByUser = false;
    this.clearReconnectTimer();

    this.socket = new WebSocket(this.url);

    this.socket.onopen = () => {
      console.log('WebSocket connected');
      this.reconnectAttempts = 0;
      this.emit('connect');
    };

    this.socket.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit('message', data);

        // Emit specific event types
        if (data.type) {
          this.emit(data.type, data);
        }
      } catch (error) {
        console.error('Failed to parse message:', error);
        this.emit('error', error);
      }
    };

    this.socket.onerror = (error) => {
      console.error('WebSocket error:', error);
      this.emit('error', error);
    };

    this.socket.onclose = (event) => {
      console.log('WebSocket disconnected:', event.code, event.reason);
      this.emit('disconnect', { code: event.code, reason: event.reason });

      // Reconnect only after an abnormal close (1000 = normal closure) that
      // the caller did not request through close().
      if (event.code !== 1000 && !this.closedByUser) {
        this.attemptReconnect();
      }
    };
  }

  /**
   * Sends a message if the socket is open.
   *
   * @param {string|*} data - Strings are sent as-is; anything else is JSON-encoded.
   * @returns {boolean} true when handed to the socket, false when not open.
   */
  send(data) {
    if (this.socket && this.socket.readyState === WebSocket.OPEN) {
      const message = typeof data === 'string' ? data : JSON.stringify(data);
      this.socket.send(message);
      return true;
    }
    return false;
  }

  /**
   * Closes the connection and cancels any scheduled reconnection attempt.
   *
   * @param {number} [code=1000] - WebSocket close code.
   * @param {string} [reason='Normal closure'] - Human-readable close reason.
   */
  close(code = 1000, reason = 'Normal closure') {
    // Without this, a retry timer scheduled by an earlier abnormal close
    // would fire later and silently reopen the connection.
    this.closedByUser = true;
    this.clearReconnectTimer();

    if (this.socket) {
      this.socket.close(code, reason);
    }
  }

  /**
   * Registers a listener for an event.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Invoked with the event payload.
   * @returns {Function} Call it to remove the listener again.
   */
  on(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(callback);

    // Return unsubscribe function
    return () => {
      const callbacks = this.listeners.get(event);
      const index = callbacks.indexOf(callback);
      if (index > -1) {
        callbacks.splice(index, 1);
      }
    };
  }

  /**
   * Invokes every listener registered for `event` with `data`.
   *
   * @param {string} event - Event name.
   * @param {*} [data] - Payload passed to each listener.
   */
  emit(event, data) {
    const callbacks = this.listeners.get(event);
    if (callbacks) {
      // Iterate over a snapshot: a listener may unsubscribe itself while it
      // runs, and splicing the live array would skip the next listener.
      [...callbacks].forEach((callback) => callback(data));
    }
  }

  /**
   * Schedules the next reconnection attempt. The delay grows by a factor of
   * 1.5 per attempt; emits `reconnect_failed` once the maximum is reached.
   */
  attemptReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached');
      this.emit('reconnect_failed');
      return;
    }

    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(1.5, this.reconnectAttempts - 1);

    console.log(`Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts})`);

    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      if (!this.closedByUser && this.socket?.readyState === WebSocket.CLOSED) {
        this.connect();
      }
    }, delay);
  }

  /** Cancels the pending reconnect timer, if any. */
  clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }
}

// Usage
const wsClient = new WebSocketClient('ws://localhost:3000');
wsClient.connect();

// Subscribe to messages
const unsubscribe = wsClient.on('message', (data) => {
  console.log('Received:', data);
});

// Send message
wsClient.send({ type: 'chat', message: 'Hello World' });

// Cleanup
// unsubscribe();
// wsClient.close();
Socket.io Implementation
Server-side with Node.js
// server.js
// Socket.IO chat server: authenticates each connection, relays chat messages
// and typing indicators inside rooms, and tracks per-user presence.
// `verifyToken`, `generateId` and `saveMessage` are expected to be provided
// elsewhere in the application.
import { Server } from 'socket.io';
import { createServer } from 'http';
import express from 'express';

const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
  cors: {
    origin: process.env.CLIENT_URL || 'http://localhost:5173',
    credentials: true
  },
  connectionStateRecovery: {
    maxDisconnectionDuration: 2 * 60 * 1000, // 2 minutes
    skipMiddlewares: true
  }
});

/** Builds the Socket.IO room name used for a chat room id. */
const roomName = (roomId) => `room:${roomId}`;

/** Room ids arrive from clients, so accept only non-empty strings or finite numbers. */
const isValidRoomId = (roomId) =>
  (typeof roomId === 'string' && roomId.length > 0) || Number.isFinite(roomId);

// Middleware for authentication
io.use(async (socket, next) => {
  try {
    const token = socket.handshake.auth.token;
    if (!token) {
      return next(new Error('Authentication error'));
    }

    // Verify token (JWT example)
    const user = await verifyToken(token);
    if (!user) {
      // The connection handler dereferences socket.data.user, so refuse the
      // connection instead of letting it crash later.
      return next(new Error('Authentication failed'));
    }
    socket.data.user = user;
    next();
  } catch (error) {
    next(new Error('Authentication failed'));
  }
});

// Connection handler
io.on('connection', (socket) => {
  console.log(`User connected: ${socket.id}`, socket.data.user);

  // Join user to their room
  socket.join(`user:${socket.data.user.id}`);

  // Handle chat messages
  socket.on('chat:message', async (data, callback) => {
    // Only treat the last argument as an acknowledgement if it is callable.
    const ack = typeof callback === 'function' ? callback : null;

    try {
      // Payloads come from the client and may be missing or malformed.
      const { content, roomId } = data ?? {};
      if (typeof content !== 'string' || content.length === 0 || !isValidRoomId(roomId)) {
        throw new Error('Invalid message payload');
      }
      // Only members of a room may post to it.
      if (!socket.rooms.has(roomName(roomId))) {
        throw new Error('Not a member of this room');
      }

      const message = {
        id: generateId(),
        userId: socket.data.user.id,
        username: socket.data.user.username,
        content,
        timestamp: new Date().toISOString(),
        roomId
      };

      // Save to database
      await saveMessage(message);

      // Broadcast to room (everyone except the sender)
      socket.to(roomName(roomId)).emit('chat:message', message);

      // Send to sender
      socket.emit('chat:message:sent', message);

      // Acknowledge
      ack?.({ success: true, message });
    } catch (error) {
      console.error('Failed to send message:', error);
      ack?.({ success: false, error: error.message });
    }
  });

  // Handle typing indicators
  socket.on('chat:typing', (data) => {
    const { roomId, isTyping } = data ?? {};
    // Ignore malformed payloads and rooms the sender has not joined; an
    // exception thrown here would otherwise be uncaught.
    if (!isValidRoomId(roomId) || !socket.rooms.has(roomName(roomId))) {
      return;
    }

    socket.to(roomName(roomId)).emit('chat:typing', {
      userId: socket.data.user.id,
      username: socket.data.user.username,
      isTyping: Boolean(isTyping)
    });
  });

  // Handle room joining
  socket.on('room:join', (roomId) => {
    if (!isValidRoomId(roomId)) {
      return;
    }

    socket.join(roomName(roomId));
    socket.data.roomId = roomId;

    // Notify others in room
    socket.to(roomName(roomId)).emit('room:user_joined', {
      userId: socket.data.user.id,
      username: socket.data.user.username
    });
  });

  // Notify room members while the socket is still in its rooms. 'disconnecting'
  // fires before the rooms are left, so every joined chat room is covered,
  // not just the most recently joined one.
  socket.on('disconnecting', () => {
    for (const room of socket.rooms) {
      if (room.startsWith('room:')) {
        socket.to(room).emit('room:user_left', {
          userId: socket.data.user.id,
          username: socket.data.user.username
        });
      }
    }
  });

  // Handle disconnection
  socket.on('disconnect', (reason) => {
    console.log(`User disconnected: ${socket.id}`, reason);
  });

  // Error handling
  socket.on('error', (error) => {
    console.error('Socket error:', error);
  });
});

// Presence tracking: user id -> set of socket ids currently in `user:<id>`.
const USER_ROOM_PREFIX = 'user:';
const userSockets = new Map();

io.of('/').adapter.on('join-room', (room, id) => {
  if (room.startsWith(USER_ROOM_PREFIX)) {
    const userId = room.slice(USER_ROOM_PREFIX.length);
    if (!userSockets.has(userId)) {
      userSockets.set(userId, new Set());
    }
    userSockets.get(userId).add(id);

    // Update user status
    io.emit('user:online', { userId, socketCount: userSockets.get(userId).size });
  }
});

io.of('/').adapter.on('leave-room', (room, id) => {
  if (room.startsWith(USER_ROOM_PREFIX)) {
    const userId = room.slice(USER_ROOM_PREFIX.length);
    const sockets = userSockets.get(userId);
    if (sockets) {
      sockets.delete(id);
      if (sockets.size === 0) {
        userSockets.delete(userId);
        // User went offline (last socket for this user is gone)
        io.emit('user:offline', { userId });
      }
    }
  }
});

const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});
Client-side with Socket.io
javascript1// client.js 2import { io } from 'socket.io-client'; 3 4class RealTimeClient { 5 constructor(options = {}) { 6 this.options = { 7 autoConnect: true, 8 reconnection: true, 9 reconnectionAttempts: Infinity, 10 reconnectionDelay: 1000, 11 reconnectionDelayMax: 5000, 12 timeout: 20000, 13 ...options 14 }; 15 16 this.socket = null; 17 this.listeners = new Map(); 18 this.queue = []; 19 this.isConnected = false; 20 } 21 22 connect(token) { 23 if (this.socket?.connected) { 24 return this.socket; 25 } 26 27 this.socket = io(this.options.url, { 28 ...this.options, 29 auth: { token }, 30 transports: ['websocket', 'polling'] 31 }); 32 33 // Connection events 34 this.socket.on('connect', () => { 35 this.isConnected = true; 36 console.log('Connected to server'); 37 this.emit('connect'); 38 39 // Process queued messages 40 this.processQueue(); 41 }); 42 43 this.socket.on('disconnect', (reason) => { 44 this.isConnected = false; 45 console.log('Disconnected:', reason); 46 this.emit('disconnect', reason); 47 }); 48 49 this.socket.on('connect_error', (error) => { 50 console.error('Connection error:', error); 51 this.emit('error', error); 52 }); 53 54 // Handle incoming events 55 this.socket.onAny((event, ...args) => { 56 this.emit(event, ...args); 57 }); 58 59 return this.socket; 60 } 61 62 emit(event, data, callback) { 63 if (this.isConnected && this.socket) { 64 return this.socket.emit(event, data, callback); 65 } else { 66 // Queue message for when connection is restored 67 this.queue.push({ event, data, callback }); 68 return false; 69 } 70 } 71 72 on(event, callback) { 73 if (!this.listeners.has(event)) { 74 this.listeners.set(event, []); 75 } 76 this.listeners.get(event).push(callback); 77 78 // Return unsubscribe function 79 return () => { 80 const callbacks = this.listeners.get(event); 81 const index = callbacks.indexOf(callback); 82 if (index > -1) { 83 callbacks.splice(index, 1); 84 } 85 }; 86 } 87 88 processQueue() { 89 while (this.queue.length > 0) { 90 const { 
event, data, callback } = this.queue.shift(); 91 this.emit(event, data, callback); 92 } 93 } 94 95 disconnect() { 96 if (this.socket) { 97 this.socket.disconnect(); 98 } 99 } 100} 101 102// Usage with React 103import React, { createContext, useContext, useEffect, useState } from 'react'; 104 105const SocketContext = createContext(null); 106 107export function SocketProvider({ children, token }) { 108 const [socket, setSocket] = useState(null); 109 const [isConnected, setIsConnected] = useState(false); 110 111 useEffect(() => { 112 const client = new RealTimeClient({ 113 url: process.env.REACT_APP_WS_URL 114 }); 115 116 const socketInstance = client.connect(token); 117 setSocket(client); 118 119 const unsubscribeConnect = client.on('connect', () => { 120 setIsConnected(true); 121 }); 122 123 const unsubscribeDisconnect = client.on('disconnect', () => { 124 setIsConnected(false); 125 }); 126 127 return () => { 128 unsubscribeConnect(); 129 unsubscribeDisconnect(); 130 client.disconnect(); 131 }; 132 }, [token]); 133 134 return ( 135 <SocketContext.Provider value={{ socket, isConnected }}> 136 {children} 137 </SocketContext.Provider> 138 ); 139} 140 141export function useSocket() { 142 const context = useContext(SocketContext); 143 if (!context) { 144 throw new Error('useSocket must be used within SocketProvider'); 145 } 146 return context; 147} 148 149export function useSocketEvent(event, callback) { 150 const { socket } = useSocket(); 151 152 useEffect(() => { 153 if (socket) { 154 const unsubscribe = socket.on(event, callback); 155 return unsubscribe; 156 } 157 }, [socket, event, callback]); 158} 159 160// Chat component using the hook 161function ChatRoom({ roomId }) { 162 const { socket, isConnected } = useSocket(); 163 const [messages, setMessages] = useState([]); 164 const [typingUsers, setTypingUsers] = useState(new Set()); 165 166 // Join room on mount 167 useEffect(() => { 168 if (socket && isConnected) { 169 socket.emit('room:join', roomId); 170 } 171 }, 
[socket, isConnected, roomId]); 172 173 // Listen for messages 174 useSocketEvent('chat:message', (message) => { 175 setMessages(prev => [...prev, message]); 176 }); 177 178 // Listen for typing indicators 179 useSocketEvent('chat:typing', ({ userId, username, isTyping }) => { 180 setTypingUsers(prev => { 181 const newSet = new Set(prev); 182 if (isTyping) { 183 newSet.add(username); 184 } else { 185 newSet.delete(username); 186 } 187 return newSet; 188 }); 189 }); 190 191 const sendMessage = (content) => { 192 if (socket) { 193 socket.emit('chat:message', { content, roomId }, (response) => { 194 if (!response.success) { 195 console.error('Failed to send message:', response.error); 196 } 197 }); 198 } 199 }; 200 201 const sendTypingIndicator = (isTyping) => { 202 if (socket) { 203 socket.emit('chat:typing', { roomId, isTyping }); 204 } 205 }; 206 207 return ( 208 <div className="chat-room"> 209 <div className="messages"> 210 {messages.map(message => ( 211 <Message key={message.id} message={message} /> 212 ))} 213 </div> 214 215 {typingUsers.size > 0 && ( 216 <div className="typing-indicator"> 217 {Array.from(typingUsers).join(', ')} is typing... 218 </div> 219 )} 220 221 <MessageInput 222 onSend={sendMessage} 223 onTyping={sendTypingIndicator} 224 /> 225 </div> 226 ); 227}
Scaling WebSocket Servers
Using Redis for Horizontal Scaling
// scalable-server.js
// Runs one Socket.IO server per CPU core and links them through the Redis
// adapter so that broadcasts reach clients connected to any worker.
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { createServer } from 'http';
import express from 'express';
import cluster from 'cluster';
import os from 'os';

// `isMaster` is deprecated in favour of `isPrimary`; fall back for old Node.
const isPrimary = cluster.isPrimary ?? cluster.isMaster;

if (isPrimary) {
  const numCPUs = os.cpus().length;
  console.log(`Master ${process.pid} is running`);
  console.log(`Forking ${numCPUs} workers...`);

  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died`);
    // Do not resurrect workers that were shut down on purpose.
    if (worker.exitedAfterDisconnect) {
      return;
    }
    cluster.fork(); // Restart worker
  });
} else {
  startWorker().catch((error) => {
    console.error(`Worker ${process.pid} failed to start:`, error);
    process.exit(1);
  });
}

/**
 * Boots one worker: connects to Redis, attaches the adapter, registers the
 * Socket.IO handlers and only then starts listening.
 *
 * NOTE(review): all workers share one port here. With the HTTP long-polling
 * transport enabled, requests of one session must reach the same worker, so
 * sticky sessions are required in front of the cluster — confirm the
 * deployment provides them.
 */
async function startWorker() {
  const app = express();
  const httpServer = createServer(app);
  const io = new Server(httpServer);

  // Redis setup for pub/sub
  const pubClient = createClient({
    url: process.env.REDIS_URL
  });
  const subClient = pubClient.duplicate();

  // Log connection problems instead of letting an unhandled 'error' event
  // take the worker down.
  pubClient.on('error', (error) => console.error('Redis pub client error:', error));
  subClient.on('error', (error) => console.error('Redis sub client error:', error));

  // Wait for Redis before accepting clients, otherwise early broadcasts
  // would stay local to this worker.
  await Promise.all([pubClient.connect(), subClient.connect()]);
  io.adapter(createAdapter(pubClient, subClient));
  console.log(`Worker ${process.pid} started with Redis adapter`);

  // Middleware to attach worker ID
  io.use((socket, next) => {
    socket.data.workerId = process.pid;
    next();
  });

  io.on('connection', (socket) => {
    console.log(`Client connected to worker ${socket.data.workerId}`);

    // Join user to a room across all workers
    socket.on('join:global', (roomId) => {
      socket.join(roomId);
      // Notify all instances about the join
      io.of('/').to(roomId).emit('user:joined', {
        userId: socket.id,
        workerId: socket.data.workerId
      });
    });

    // Broadcast to all instances
    socket.on('broadcast:global', (data) => {
      io.emit('global:message', {
        ...data,
        fromWorker: socket.data.workerId
      });
    });
  });

  const PORT = process.env.PORT || 3000;
  httpServer.listen(PORT, () => {
    console.log(`Worker ${process.pid} listening on port ${PORT}`);
  });
}
Load Balancing with Nginx
# nginx.conf
# Reverse proxy in front of several Socket.IO nodes.
upstream socket_nodes {
    # Route each client IP to the same node so that all requests of one
    # session (including HTTP long-polling) reach the same server.
    ip_hash;
    server 127.0.0.1:3001;
    server 127.0.0.1:3002;
    server 127.0.0.1:3003;
    server 127.0.0.1:3004;
}

server {
    listen 80;
    server_name example.com;

    location / {
        proxy_pass http://socket_nodes;

        # WebSocket upgrade requires HTTP/1.1 and the hop-by-hop
        # Upgrade/Connection headers to be forwarded explicitly.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        # Preserve information about the original request.
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        # Connection establishment should fail fast so a dead node is
        # skipped quickly; nginx notes this value cannot usually exceed
        # 75 seconds anyway.
        proxy_connect_timeout 60s;
        # Long-lived WebSocket connections may legitimately stay idle.
        proxy_send_timeout 7d;
        proxy_read_timeout 7d;
    }
}
Performance Optimization
// performance-optimized-client.js

/**
 * Client-side helper that reduces WebSocket traffic by batching messages,
 * dropping empty fields, shortening keys and throttling frequent events.
 */
class OptimizedWebSocketClient {
  /**
   * @param {object} [options]
   * @param {object|null} [options.socket=null] - Object with `readyState` and
   *   `send(payload)` used by the default `sendRaw()`.
   * @param {object} [options.keyMap={}] - Long key -> short key replacements
   *   applied by `compressData()`.
   * @param {number} [options.batchSize=10] - Messages per batch.
   * @param {number} [options.batchTimeout=100] - Max wait in ms before a
   *   partially filled batch is sent.
   */
  constructor(options = {}) {
    const { socket = null, keyMap = {}, batchSize = 10, batchTimeout = 100 } = options;

    this.socket = socket;
    this.keyMap = keyMap;
    this.batchQueue = [];
    this.batchSize = batchSize;
    this.batchTimeout = batchTimeout; // ms
    this.batchTimer = null;
    this.stats = {
      messagesSent: 0,
      batchesSent: 0,
      bytesSent: 0
    };
  }

  /**
   * Queues a message for batched delivery.
   *
   * @param {string} event - Event name.
   * @param {object} data - Payload; compressed with `compressData()`.
   */
  sendOptimized(event, data) {
    // Compress data before sending
    const compressed = this.compressData(data);

    // Add to batch queue
    this.batchQueue.push({ event, data: compressed });

    // Start batch timer if not already running
    if (!this.batchTimer) {
      this.batchTimer = setTimeout(() => this.sendBatch(), this.batchTimeout);
    }

    // Send immediately if batch size reached
    if (this.batchQueue.length >= this.batchSize) {
      this.sendBatch();
    }
  }

  /** Sends up to `batchSize` queued messages as one WebSocket message. */
  sendBatch() {
    // Always reset the timer first so a stale handle can never block the
    // scheduling of the next batch.
    clearTimeout(this.batchTimer);
    this.batchTimer = null;

    if (this.batchQueue.length === 0) return;

    const batch = this.batchQueue.splice(0, this.batchSize);

    // Send as single WebSocket message
    const batchMessage = {
      type: 'batch',
      timestamp: Date.now(),
      messages: batch
    };
    const payload = JSON.stringify(batchMessage);

    this.sendRaw(payload);

    this.stats.batchesSent++;
    this.stats.messagesSent += batch.length;
    this.stats.bytesSent += new TextEncoder().encode(payload).length;

    // Update stats every 100 batches
    if (this.stats.batchesSent % 100 === 0) {
      this.logStats();
    }

    // Anything left over gets its own timer instead of waiting for the
    // next sendOptimized() call.
    if (this.batchQueue.length > 0) {
      this.batchTimer = setTimeout(() => this.sendBatch(), this.batchTimeout);
    }
  }

  /**
   * Writes an already serialised payload to the socket. Override this to
   * use a different transport.
   *
   * @param {string} payload - Serialised message.
   * @returns {boolean} true when handed to the socket, false otherwise.
   */
  sendRaw(payload) {
    const OPEN = 1; // WebSocket.OPEN
    if (!this.socket || this.socket.readyState !== OPEN) {
      console.warn('OptimizedWebSocketClient: socket is not open, payload dropped');
      return false;
    }
    this.socket.send(payload);
    return true;
  }

  /** Logs the current send statistics. */
  logStats() {
    console.log('WebSocket send stats:', { ...this.stats });
  }

  /**
   * Removes null/undefined fields and replaces keys found in `keyMap`.
   *
   * @param {*} data - Plain object to compress; any other value (null,
   *   primitives, arrays) is returned unchanged.
   * @returns {*} The compressed copy.
   */
  compressData(data) {
    if (data === null || typeof data !== 'object' || Array.isArray(data)) {
      return data;
    }

    // Simple compression - remove null/undefined, use short keys
    const compressed = {};

    for (const [key, value] of Object.entries(data)) {
      if (value !== null && value !== undefined) {
        // Use short key if available. Own-property check so inherited names
        // such as "constructor" are never mistaken for mappings.
        const hasShortKey = Object.prototype.hasOwnProperty.call(this.keyMap, key);
        const shortKey = (hasShortKey && this.keyMap[key]) || key;
        compressed[shortKey] = value;
      }
    }

    return compressed;
  }

  /**
   * Creates an emitter that forwards at most one message per event name
   * every `delay` ms; calls arriving sooner are dropped.
   *
   * @param {number} [delay=100] - Minimum gap in ms between two sends of
   *   the same event.
   * @returns {(event: string, data: object) => void}
   */
  createThrottledEmitter(delay = 100) {
    const lastCall = new Map();

    return (event, data) => {
      const now = Date.now();
      const last = lastCall.get(event) || 0;

      if (now - last >= delay) {
        lastCall.set(event, now);
        this.sendOptimized(event, data);
      }
    };
  }

  /**
   * Buffers `send()` calls while disconnected and replays them on reconnect.
   * Requires the client to provide `on(event, cb)`, `once(event, cb)` and
   * `send(event, data)`.
   *
   * @throws {TypeError} When one of the required methods is missing.
   */
  setupReconnectionBuffer() {
    for (const method of ['on', 'once', 'send']) {
      if (typeof this[method] !== 'function') {
        throw new TypeError(`setupReconnectionBuffer requires a ${method}() method on the client`);
      }
    }

    const buffer = [];
    const maxBufferSize = 1000;
    let isBuffering = false;

    this.on('disconnect', () => {
      // A second 'disconnect' before the next 'connect' must not capture the
      // buffering stub as the "original" send, or the real one is lost.
      if (isBuffering) return;
      isBuffering = true;

      // Start buffering messages
      const originalSend = this.send;
      this.send = (event, data) => {
        if (buffer.length < maxBufferSize) {
          buffer.push({ event, data, timestamp: Date.now() });
        }
        return false;
      };

      // Restore original send on reconnect
      this.once('connect', () => {
        isBuffering = false;
        this.send = originalSend;

        // Send buffered messages
        const pending = buffer.splice(0);
        pending.forEach((item) => {
          this.send(item.event, item.data);
        });
      });
    });
  }
}
Monitoring and Analytics
// monitoring.js

/**
 * Collects connection and traffic metrics from a socket server and
 * periodically posts them to a monitoring endpoint.
 */
class WebSocketMonitor {
  /**
   * @param {object} socket - Server object that emits `connection` (with a
   *   per-client socket) and `error` events.
   * @param {object} [options]
   * @param {string} [options.endpoint='/api/metrics'] - URL metrics are POSTed to.
   * @param {number} [options.exportIntervalMs=60000] - Export period in ms.
   */
  constructor(socket, options = {}) {
    const { endpoint = '/api/metrics', exportIntervalMs = 60000 } = options;

    this.socket = socket;
    this.endpoint = endpoint;
    this.exportIntervalMs = exportIntervalMs;
    this.startedAt = Date.now();
    this.exportTimer = null;
    this.metrics = {
      connections: 0,
      messagesReceived: 0,
      messagesSent: 0,
      bytesReceived: 0,
      bytesSent: 0,
      errors: 0,
      reconnections: 0
    };

    this.setupMonitoring();
  }

  /**
   * Approximate size of a payload: string length, or the length of its JSON
   * form. Values that cannot be serialised (circular structures, functions
   * such as acknowledgement callbacks) count as 0 instead of throwing.
   *
   * @param {*} data - First argument of an emitted/received event.
   * @returns {number}
   */
  static payloadSize(data) {
    if (typeof data === 'string') {
      return data.length;
    }
    if (!data) {
      return 0;
    }
    try {
      return JSON.stringify(data).length;
    } catch {
      // Measuring must never break the message flow it observes.
      return 0;
    }
  }

  /** Hooks into the server's events and starts the periodic export. */
  setupMonitoring() {
    // Track connections
    this.socket.on('connection', (socket) => {
      this.metrics.connections++;

      // Track per-socket metrics
      socket.metrics = {
        connectedAt: Date.now(),
        messagesSent: 0,
        messagesReceived: 0
      };

      // Track message events by wrapping the socket's emit
      const originalEmit = socket.emit;
      socket.emit = (event, data, ...args) => {
        socket.metrics.messagesSent++;
        this.metrics.messagesSent++;
        this.metrics.bytesSent += WebSocketMonitor.payloadSize(data);

        return originalEmit.call(socket, event, data, ...args);
      };

      socket.onAny((event, data) => {
        socket.metrics.messagesReceived++;
        this.metrics.messagesReceived++;
        this.metrics.bytesReceived += WebSocketMonitor.payloadSize(data);
      });

      // Track disconnection
      socket.on('disconnect', () => {
        this.metrics.connections--;
        const duration = Date.now() - socket.metrics.connectedAt;
        console.log(`Socket disconnected after ${duration}ms`);
      });
    });

    // Track errors
    this.socket.on('error', () => {
      this.metrics.errors++;
    });

    // Export metrics periodically
    this.exportTimer = setInterval(() => {
      this.exportMetrics();
    }, this.exportIntervalMs);
    // Where timers support it, do not let monitoring alone keep the process alive.
    this.exportTimer.unref?.();
  }

  /** Stops the periodic export started by `setupMonitoring()`. */
  stop() {
    if (this.exportTimer !== null) {
      clearInterval(this.exportTimer);
      this.exportTimer = null;
    }
  }

  /**
   * Posts a snapshot of the metrics to the monitoring endpoint. Failures are
   * logged, never thrown.
   *
   * @returns {Promise<void>}
   */
  async exportMetrics() {
    const metrics = {
      ...this.metrics,
      timestamp: new Date().toISOString(),
      avgMessageSize: this.metrics.messagesSent > 0
        ? this.metrics.bytesSent / this.metrics.messagesSent
        : 0
    };

    try {
      // Send to monitoring service
      await fetch(this.endpoint, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(metrics)
      });
    } catch (error) {
      console.error('Failed to export metrics:', error);
    }
  }

  /**
   * @returns {{healthy: boolean, connections: number, messageRate: number, errorRate: number}}
   *   `messageRate` and `errorRate` are averages per minute since the monitor
   *   was created; the first minute is treated as one full minute.
   */
  getHealth() {
    const elapsedMinutes = Math.max(1, (Date.now() - this.startedAt) / 60000);

    return {
      healthy: this.metrics.errors < 10,
      connections: this.metrics.connections,
      messageRate: this.metrics.messagesSent / elapsedMinutes, // Per minute
      errorRate: this.metrics.errors / elapsedMinutes
    };
  }
}