WebSocket realtime channel
Authenticate during the upgrade handshake; validate message frames with a schema; use heartbeats, reconnect logic, and backpressure to avoid resource exhaustion and zombie connections.
This page gives Agents a WebSocket real-time service reference: server-side connection management (examples use the ws library; Socket.io provides equivalent rooms and heartbeats), a heartbeat mechanism (ping/pong plus timeout disconnect), client-side exponential backoff reconnect, a standard JSON message format (type/payload/requestId), and room/channel management.
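Use WSS. Authenticate during the upgrade handshake; if the client cannot send credentials there, require an auth message as the first frame and close the connection if it does not arrive promptly. Re-check room/channel authorization on every subscribe. For Agent streaming, tag each message with a sequence number so clients can deduplicate and resume after reconnecting.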
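- Abuse prevention: per-IP and per-user quotas on connection count and message rate.
- Deployment testing: verify that proxies and load balancers forward the Upgrade and Connection headers, use idle timeouts longer than the heartbeat interval, and apply sticky sessions where instances keep per-connection state.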
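The quotas above can be sketched with two small in-memory helpers. This is a minimal sketch, not a library API: `TokenBucketLimiter` and `ConnectionQuota` are hypothetical names, the limits are placeholders, and all state lives in one process (a multi-instance deployment would need a shared store).

```typescript
// Hypothetical in-memory quota helpers (single process). Keys are strings such
// as `ip:203.0.113.7` or `user:42`; all limits are placeholders.

/** Token bucket: refills `ratePerSec` tokens per second, holding at most `burst`. */
class TokenBucketLimiter {
  private buckets = new Map<string, { tokens: number; last: number }>()

  constructor(
    private readonly ratePerSec: number,
    private readonly burst: number,
    private readonly now: () => number = Date.now, // injectable clock (ms) for tests
  ) {}

  /** Consumes one token for `key`; returns false when the key is over its rate. */
  tryConsume(key: string): boolean {
    const t = this.now()
    const b = this.buckets.get(key) ?? { tokens: this.burst, last: t }
    // Refill in proportion to elapsed time, never above the burst size
    b.tokens = Math.min(this.burst, b.tokens + ((t - b.last) / 1000) * this.ratePerSec)
    b.last = t
    this.buckets.set(key, b) // note: idle keys are never evicted in this sketch
    if (b.tokens < 1) return false
    b.tokens -= 1
    return true
  }
}

/** Caps concurrent connections per key; call release() from the 'close' handler. */
class ConnectionQuota {
  private counts = new Map<string, number>()

  constructor(private readonly maxPerKey: number) {}

  tryAcquire(key: string): boolean {
    const n = this.counts.get(key) ?? 0
    if (n >= this.maxPerKey) return false
    this.counts.set(key, n + 1)
    return true
  }

  release(key: string): void {
    const n = (this.counts.get(key) ?? 0) - 1
    if (n > 0) this.counts.set(key, n)
    else this.counts.delete(key)
  }
}
```

In the upgrade handler, `tryAcquire` would run for both the `ip:` and `user:` keys before `handleUpgrade` (releasing any key already acquired if a later check fails); in the message handler, a `false` from `tryConsume` would lead to an error reply or, on repeated violations, a close with 1008.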
WebSocket server connection management (ws library)
ws library server: connection management and authentication:
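// server/websocket.ts: ws library implementation
import { createServer, IncomingMessage } from 'http'
import { WebSocketServer, WebSocket } from 'ws'
import { verifyToken } from './auth' // app-specific: resolves to the user or throws
// handleMessage, sendMessage, sendError: see the message format section; leaveRoom: server/rooms.ts
export interface WsClient extends WebSocket {
  userId: string
  isAlive: boolean
  rooms: Set<string>
}
const httpServer = createServer() // or reuse your existing HTTP/Express server
// maxPayload caps inbound frame size (the ws default is 100 MiB)
export const wss = new WebSocketServer({ noServer: true, maxPayload: 64 * 1024 })
export const clients = new Map<string, WsClient>() // assumes one active connection per user
// HTTP server upgrade handshake: authenticate before accepting the WebSocket
httpServer.on('upgrade', async (req: IncomingMessage, socket, head) => {
  try {
    // Prefer the subprotocol header; the query-param fallback puts the token in the URL,
    // where proxies and access logs may record it
    const token = req.headers['sec-websocket-protocol']?.split(',')[0]?.trim()
      || new URL(req.url ?? '/', 'http://localhost').searchParams.get('token')
    if (!token) throw new Error('missing token')
    const user = await verifyToken(token)
    wss.handleUpgrade(req, socket, head, (ws) => {
      const client = ws as WsClient
      client.userId = user.id
      client.isAlive = true
      client.rooms = new Set()
      clients.set(user.id, client)
      wss.emit('connection', client, req)
    })
  } catch {
    // No WebSocket exists yet, so reject with a plain HTTP response
    socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n')
    socket.destroy()
  }
})
wss.on('connection', (ws: WsClient) => {
  ws.on('message', (data, isBinary) => {
    if (isBinary) return sendError(ws, 'unsupported_binary', 'Only text frames are accepted')
    handleMessage(ws, data.toString())
  })
  ws.on('close', (code, reason) => {
    // Clean up only if a newer connection for the same user has not replaced this one
    if (clients.get(ws.userId) === ws) {
      clients.delete(ws.userId)
      for (const roomId of ws.rooms) leaveRoom(ws, roomId) // stop room broadcasts to this user
    }
    console.log(`Client ${ws.userId} disconnected: ${code} ${reason.toString()}`)
  })
  ws.on('error', (err) => console.error(`Socket error for ${ws.userId}:`, err))
  ws.on('pong', () => { ws.isAlive = true })
  // Send connection confirmation
  sendMessage(ws, { type: 'connected', payload: { userId: ws.userId } })
})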
Standard message format (JSON: type/payload/requestId):
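// types/ws-messages.ts
export interface WsMessage<T = unknown> {
  type: string // message type, e.g. 'chat.send' | 'room.join'
  payload: T // business data
  requestId?: string // client-generated, for request-response correlation
  seq?: number // sequence number, for deduplication and resumption
  timestamp?: number // server-injected timestamp (ms since epoch)
}
// server/websocket.ts (continued): dispatcher and send helpers
// handleChatSend / handleRoomJoin / handleRoomLeave are app handlers that validate msg.payload
function handleMessage(ws: WsClient, raw: string) {
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    return sendError(ws, 'invalid_json', 'Message must be valid JSON')
  }
  // Envelope check; each handler still validates its own payload
  if (typeof parsed !== 'object' || parsed === null || typeof (parsed as WsMessage).type !== 'string') {
    return sendError(ws, 'invalid_message', 'Message must be an object with a string "type"')
  }
  const msg = parsed as WsMessage
  switch (msg.type) {
    case 'chat.send': return handleChatSend(ws, msg)
    case 'room.join': return handleRoomJoin(ws, msg)
    case 'room.leave': return handleRoomLeave(ws, msg)
    default:
      // Unknown type: log and ignore without closing (protocol evolution compatibility)
      console.warn(`Unknown message type: ${msg.type}`)
  }
}
function sendMessage(ws: WebSocket, msg: WsMessage) {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ ...msg, timestamp: Date.now() })) // inject server timestamp
  }
}
function sendError(ws: WebSocket, code: string, message: string, requestId?: string) {
  sendMessage(ws, { type: 'error', payload: { code, message }, requestId })
}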
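The dispatcher above does not validate payloads. Per-type validation can be sketched without dependencies as below; a schema library would normally take this role, and `parseFrame`, the error strings, and the payload rules (a 1-128 character `roomId`, a `text` field for `chat.send`) are assumptions for illustration.

```typescript
// Hypothetical dependency-free frame validator. Payload rules are illustrative;
// a schema library would normally replace the hand-written checks.

interface WsMessage<T = unknown> {
  type: string
  payload: T
  requestId?: string
  seq?: number
  timestamp?: number
}

type ParseResult = { ok: true; msg: WsMessage } | { ok: false; error: string }

// Returns an error description, or null when the payload is acceptable
type PayloadCheck = (payload: unknown) => string | null

function isObject(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null && !Array.isArray(v)
}

// A Map (not a plain object) so keys like "constructor" cannot hit prototype members
const payloadChecks = new Map<string, PayloadCheck>([
  ['room.join', (p) => {
    const roomId = isObject(p) ? p.roomId : undefined
    return typeof roomId === 'string' && roomId.length > 0 && roomId.length <= 128
      ? null
      : 'payload.roomId must be a string of 1-128 characters'
  }],
  ['chat.send', (p) => {
    const roomId = isObject(p) ? p.roomId : undefined
    const text = isObject(p) ? p.text : undefined
    return typeof roomId === 'string' && typeof text === 'string' && text.length > 0 && text.length <= 4000
      ? null
      : 'payload needs a roomId string and a text string of 1-4000 characters'
  }],
])

function parseFrame(raw: string): ParseResult {
  let data: unknown
  try {
    data = JSON.parse(raw)
  } catch {
    return { ok: false, error: 'invalid_json' }
  }
  if (!isObject(data)) return { ok: false, error: 'not_an_object' }
  const { type, payload, requestId } = data
  if (typeof type !== 'string') return { ok: false, error: 'missing_type' }
  if (requestId !== undefined && typeof requestId !== 'string') {
    return { ok: false, error: 'invalid_request_id' }
  }
  // Unknown types pass envelope validation so the dispatcher can log and ignore them
  const problem = payloadChecks.get(type)?.(payload) ?? null
  if (problem !== null) return { ok: false, error: problem }
  return { ok: true, msg: data as unknown as WsMessage }
}
```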
Heartbeat mechanism (ping/pong + timeout disconnect)
Server heartbeat: ping periodically and terminate zombie connections that miss the pong deadline:
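// server/heartbeat.ts
import { WebSocketServer } from 'ws'
import type { WsClient } from './websocket'
const HEARTBEAT_INTERVAL = 30_000 // send ping every 30s
const HEARTBEAT_TIMEOUT = 10_000 // disconnect if no pong within 10s (must be < HEARTBEAT_INTERVAL)
export function startHeartbeat(wss: WebSocketServer) {
  let sweep: ReturnType<typeof setTimeout> | undefined
  const interval = setInterval(() => {
    wss.clients.forEach((ws) => {
      const client = ws as WsClient
      client.isAlive = false // mark as pending; the 'pong' handler sets it back to true
      client.ping() // send WebSocket ping frame
    })
    // One sweep per round: terminate clients that have not answered within HEARTBEAT_TIMEOUT
    sweep = setTimeout(() => {
      wss.clients.forEach((ws) => {
        const client = ws as WsClient
        if (!client.isAlive) {
          console.log(`Terminating stale connection: ${client.userId}`)
          client.terminate()
        }
      })
    }, HEARTBEAT_TIMEOUT)
  }, HEARTBEAT_INTERVAL)
  wss.on('close', () => {
    clearInterval(interval)
    clearTimeout(sweep)
  })
  return interval
}
// Usage: call startHeartbeat(wss) once in server/websocket.ts. The 'pong' handler that
// sets isAlive = true is registered in the 'connection' handler there, so a second
// 'connection' listener is not needed.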
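Browsers reply to ping frames with pongs automatically, but the browser WebSocket API exposes neither a ping method nor pong events, so a browser client cannot detect a dead connection with protocol-level pings. A common workaround is an application-level heartbeat in which the client sends a ping message and treats any inbound message as proof of life. The tracker below is a hypothetical client-side helper for that scheme; the `'ping'`/`'pong'` message types are assumptions, not part of the format defined on this page.

```typescript
// Hypothetical client-side liveness tracker for application-level heartbeats.
// Any inbound message (including a server 'pong' reply) counts as proof of life.
class LivenessTracker {
  private lastSeen: number

  constructor(
    private readonly timeoutMs: number,
    private readonly now: () => number = Date.now, // injectable clock (ms) for tests
  ) {
    this.lastSeen = this.now()
  }

  /** Call from onmessage for every inbound frame. */
  touch(): void {
    this.lastSeen = this.now()
  }

  /** True once nothing has arrived for longer than timeoutMs. */
  isStale(): boolean {
    return this.now() - this.lastSeen > this.timeoutMs
  }
}
```

In a browser client this would pair with a timer that sends `{ type: 'ping', payload: null }` every few seconds and closes the socket when `isStale()` returns true; the server side would need a `'ping'` case in `handleMessage` that replies with `'pong'`.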
Client reconnect strategy (exponential backoff + max retries)
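// client/wsClient.ts: browser client (WsMessage as defined in types/ws-messages.ts)
import type { WsMessage } from '../types/ws-messages'
interface WsClientOptions {
  url: string // absolute URL, e.g. wss://example.com/ws
  maxRetries?: number
  baseDelayMs?: number
  maxDelayMs?: number
  onMessage?: (msg: WsMessage) => void
}
class ReconnectingWsClient {
  private ws: WebSocket | null = null
  private retryCount = 0
  private lastSeq = 0
  private stopped = false // set by close() to suppress reconnects
  // Non-recoverable close codes (do not reconnect): 1008 policy violation,
  // 4001/4003 application-defined auth failures
  private static readonly FATAL_CODES = new Set([1008, 4001, 4003])
  constructor(private options: WsClientOptions) {}
  connect() {
    const { url, maxRetries = 5, baseDelayMs = 1000, maxDelayMs = 30_000 } = this.options
    this.stopped = false
    // Resume from the last received seq; the URL API preserves any existing query params
    const target = new URL(url)
    target.searchParams.set('lastSeq', String(this.lastSeq))
    this.ws = new WebSocket(target.toString())
    this.ws.onopen = () => {
      this.retryCount = 0
      console.log('WebSocket connected')
    }
    this.ws.onmessage = (event) => {
      let msg: WsMessage
      try {
        msg = JSON.parse(event.data)
      } catch {
        return console.warn('Ignoring non-JSON frame')
      }
      if (typeof msg.seq === 'number') {
        if (msg.seq <= this.lastSeq) return // duplicate, e.g. replayed after reconnect
        this.lastSeq = msg.seq // record last received sequence number
      }
      this.options.onMessage?.(msg)
    }
    this.ws.onclose = (event) => {
      if (this.stopped) return // closed on purpose via close()
      if (ReconnectingWsClient.FATAL_CODES.has(event.code)) {
        console.error(`Fatal close code ${event.code}: ${event.reason}`)
        return // do not reconnect (auth failure, etc.)
      }
      if (this.retryCount >= maxRetries) {
        console.error('Max retries exceeded, giving up')
        return
      }
      // Exponential backoff with ±25% jitter, capped at maxDelayMs
      const delay = Math.min(
        2 ** this.retryCount * baseDelayMs * (0.75 + Math.random() * 0.5),
        maxDelayMs
      )
      this.retryCount++
      console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${this.retryCount}/${maxRetries})`)
      setTimeout(() => {
        if (!this.stopped) this.connect()
      }, delay)
    }
  }
  send(msg: WsMessage) {
    // Dropped while disconnected; add an outbound queue if delivery matters
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(msg))
    }
  }
  close() {
    this.stopped = true
    this.ws?.close(1000, 'Client closed')
  }
}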
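On the server, the `lastSeq` query parameter only helps if recent messages are retained. A minimal sketch, assuming one sequence stream per user or per Agent session: `ReplayBuffer` (a hypothetical name) stamps outgoing messages with increasing `seq` values and returns those after `lastSeq`, or `null` when the client must fall back to a full resync because the gap is no longer buffered or the client is ahead of the buffer (for example after a server restart reset the numbering).

```typescript
// Hypothetical per-stream replay buffer (one instance per user or Agent session).

interface SeqMessage {
  type: string
  payload: unknown
  seq: number
}

class ReplayBuffer {
  private buffer: SeqMessage[] = []
  private nextSeq = 1 // seq starts at 1, so lastSeq = 0 means "nothing received yet"

  constructor(private readonly capacity: number) {}

  /** Stamps a message with the next seq and retains it for replay. */
  append(type: string, payload: unknown): SeqMessage {
    const msg: SeqMessage = { type, payload, seq: this.nextSeq++ }
    this.buffer.push(msg)
    if (this.buffer.length > this.capacity) this.buffer.shift() // O(n); use a ring buffer at scale
    return msg
  }

  /**
   * Messages with seq > lastSeq, or null when the client must do a full resync:
   * either some missed messages were evicted, or the client is ahead of this buffer.
   */
  since(lastSeq: number): SeqMessage[] | null {
    if (lastSeq >= this.nextSeq) return null
    const oldest = this.buffer[0]?.seq ?? this.nextSeq
    if (lastSeq + 1 < oldest) return null
    return this.buffer.filter((m) => m.seq > lastSeq)
  }
}
```

On connection, the server would read `lastSeq` from the upgrade URL and send `since(lastSeq)` before live traffic, or a resync instruction when it returns `null`; after a resync the client would also need to reset its own `lastSeq`.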
Room/channel management implementation:
// server/rooms.ts (clients, sendMessage, WsClient and WsMessage come from the modules above)
const rooms = new Map<string, Set<string>>() // roomId -> Set<userId>
// Callers must authorize the user for roomId first (re-check on every subscribe)
function joinRoom(ws: WsClient, roomId: string) {
  if (!rooms.has(roomId)) rooms.set(roomId, new Set())
  rooms.get(roomId)!.add(ws.userId)
  ws.rooms.add(roomId)
  sendMessage(ws, { type: 'room.joined', payload: { roomId } })
}
function leaveRoom(ws: WsClient, roomId: string) {
  const members = rooms.get(roomId)
  members?.delete(ws.userId)
  ws.rooms.delete(roomId)
  if (members?.size === 0) rooms.delete(roomId) // drop empty rooms
}
// Reaches only clients connected to this process
function broadcastToRoom(roomId: string, msg: WsMessage, excludeUserId?: string) {
  const userIds = rooms.get(roomId)
  if (!userIds) return
  for (const userId of userIds) {
    if (userId === excludeUserId) continue
    const client = clients.get(userId)
    if (client) sendMessage(client, msg)
  }
}
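`joinRoom` does not check permissions, so the `room.join` handler has to. The sketch below shows one possible shape for the `handleRoomJoin` handler the dispatcher calls: it re-checks authorization on every subscribe and caps rooms per connection. `canJoinRoom`, the `maxRooms` default, the `'error'` payload shape, and `MinimalClient` (a cut-down `WsClient` so the example runs without the ws package) are all assumptions.

```typescript
// Hypothetical room.join handler with per-subscribe authorization.

interface MinimalClient {
  userId: string
  rooms: Set<string>
  send(data: string): void
}

interface JoinRequest {
  type: 'room.join'
  payload: { roomId: string }
  requestId?: string
}

function handleRoomJoin(
  client: MinimalClient,
  msg: JoinRequest,
  canJoinRoom: (userId: string, roomId: string) => boolean, // ACL / membership lookup
  join: (client: MinimalClient, roomId: string) => void, // stands in for joinRoom
  maxRooms = 50, // assumed per-connection cap
): void {
  const { roomId } = msg.payload
  const reject = (code: string, message: string): void =>
    client.send(JSON.stringify({ type: 'error', payload: { code, message }, requestId: msg.requestId }))

  if (!client.rooms.has(roomId) && client.rooms.size >= maxRooms) {
    return reject('too_many_rooms', `At most ${maxRooms} rooms per connection`)
  }
  // Check on every subscribe, not only at connect time: permissions can change mid-session
  if (!canJoinRoom(client.userId, roomId)) {
    return reject('forbidden', `Not allowed to join ${roomId}`)
  }
  join(client, roomId)
}
```

Replying with an error rather than closing keeps one forbidden room from ending the whole session. In practice `canJoinRoom` is often an async lookup, in which case the handler becomes `async` and awaits it.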
Connection and message flow
The diagram below covers the happy path and the main failure branches. API names differ between frameworks; what matters is where the authentication, subscribe-authorization, and backpressure checks sit.
[ Client starts WSS upgrade ]
│
▼
┌─────────────────┐ Validate token / session; on failure → reject upgrade (HTTP 401) or close with app code
│ Handshake auth │
└─────────────────┘
│ success
▼
┌─────────────────┐ Channel / room / topic: second auth; record subscriptions
│ Subscribe auth │
└─────────────────┘
│
▼
┌─────────────────┐ Inbound: schema, quota, rate
│ Message loop │◄──┐ Outbound: queue depth; over limit → drop / merge / signal
└─────────────────┘ │
│ │
├── Heartbeat / pong timeout ──► Graceful close; client enters reconnect backoff
│
└── Recoverable disconnect ───► Exponential backoff + jitter; resume with cursor
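The outbound "queue depth" branch can be implemented with the ws `bufferedAmount` property, which reports bytes queued by `send()` but not yet transmitted. A minimal sketch with assumed thresholds: droppable messages (for example typing indicators or progress ticks) are skipped above a soft limit, and a client whose backlog exceeds a hard limit is disconnected with close code 1013 (Try Again Later), which the reconnecting client on this page treats as recoverable. `OutboundSocket` describes only the subset of the ws `WebSocket` API the function uses.

```typescript
// Hypothetical outbound guard based on bufferedAmount (bytes queued, not yet sent).

interface OutboundSocket {
  readyState: number
  bufferedAmount: number
  send(data: string): void
  close(code?: number, reason?: string): void
}

const OPEN = 1 // same value as WebSocket.OPEN
const SOFT_LIMIT = 256 * 1024 // assumed: above this, skip droppable messages
const HARD_LIMIT = 4 * 1024 * 1024 // assumed: above this, the client is too slow

type SendResult = 'sent' | 'dropped' | 'closed'

function sendWithBackpressure(ws: OutboundSocket, data: string, droppable: boolean): SendResult {
  if (ws.readyState !== OPEN) return 'closed'
  if (ws.bufferedAmount > HARD_LIMIT) {
    // close() queues a close frame behind the backlog; ws also offers terminate() to drop at once
    ws.close(1013, 'Client too slow')
    return 'closed'
  }
  if (droppable && ws.bufferedAmount > SOFT_LIMIT) return 'dropped'
  ws.send(data)
  return 'sent'
}
```

Counting `'dropped'` and `'closed'` results per connection feeds directly into the queue-depth metrics mentioned under observability.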
Heartbeat interval estimator
If the server (or a proxy in front of it) closes a connection after T seconds without valid frames, the heartbeat interval must be well under T to leave room for RTT and timer jitter. Treat the usable idle budget as T − slack and multiply it by a duty ratio r (0 < r ≤ 1): interval ≈ r × (T − slack). The same estimate applies to ping frames and application-level pings.
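The estimate is simple enough to compute directly; the function name and its input checks are illustrative. For T = 60 s, slack = 5 s and r = 0.5 it suggests 27.5 s, and with r = 0.5 one lost heartbeat is tolerated: the following one still arrives 2 × 27.5 s = 55 s after the last delivered beat, inside the 60 s limit.

```typescript
// Suggested heartbeat interval: interval ≈ r × (T − slack), all times in ms.
function suggestHeartbeatInterval(idleTimeoutMs: number, slackMs: number, dutyRatio: number): number {
  if (!(dutyRatio > 0 && dutyRatio <= 1)) throw new RangeError('dutyRatio must be in (0, 1]')
  const budget = idleTimeoutMs - slackMs // usable idle budget T − slack
  if (budget <= 0) throw new RangeError('slack must be smaller than the idle timeout')
  return Math.floor(dutyRatio * budget)
}

// Worked example: proxy closes after 60 s of silence, 5 s slack, r = 0.5
console.log(suggestHeartbeatInterval(60_000, 5_000, 0.5)) // 27500
```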
Observability and scale-out
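Track connection count, per-connection queue depth, message rates, close-code distribution, and heartbeat timeouts. For horizontal scale, combine sticky sessions with a shared pub/sub layer: stickiness keeps a client's reconnects on one instance, while pub/sub lets a broadcast reach room members connected to other instances. Document the trade-off between session stickiness and broadcast consistency in the SKILL file.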
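The `clients` and `rooms` maps on this page are per process, so `broadcastToRoom` only reaches sockets on the same instance. The usual fix is to publish broadcasts to a shared broker and let every instance deliver to its own local members. The sketch below models this with a hypothetical `PubSub` interface; `InMemoryPubSub` only simulates a broker such as Redis pub/sub inside one process, and `ServerInstance.delivered` stands in for local socket writes.

```typescript
// Hypothetical cross-instance fan-out; InMemoryPubSub simulates a shared broker.

type Handler = (message: string) => void

interface PubSub {
  publish(channel: string, message: string): void
  subscribe(channel: string, handler: Handler): void
}

class InMemoryPubSub implements PubSub {
  private handlers = new Map<string, Handler[]>()

  publish(channel: string, message: string): void {
    for (const handler of this.handlers.get(channel) ?? []) handler(message)
  }

  subscribe(channel: string, handler: Handler): void {
    const list = this.handlers.get(channel) ?? []
    list.push(handler)
    this.handlers.set(channel, list)
  }
}

const CHANNEL = 'room-broadcast' // one shared channel; per-room channels avoid waking every instance

class ServerInstance {
  readonly delivered: string[] = [] // stands in for writes to local sockets

  constructor(private readonly bus: PubSub, private readonly localRooms: Set<string>) {
    // Every instance hears every broadcast and delivers only to its own members
    bus.subscribe(CHANNEL, (raw) => {
      const { roomId, msg } = JSON.parse(raw) as { roomId: string; msg: string }
      if (this.localRooms.has(roomId)) this.delivered.push(msg) // real code: local broadcastToRoom
    })
  }

  /** Publishes instead of writing to sockets directly, so other instances see it too. */
  broadcast(roomId: string, msg: string): void {
    this.bus.publish(CHANNEL, JSON.stringify({ roomId, msg }))
  }
}
```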
---
name: websocket-realtime-channel
description: Design or review a WebSocket realtime service
---
# Rules
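- Always use WSS; authenticate during the upgrade handshake (fall back to a first-message auth with a short deadline only when the client cannot send credentials in the handshake)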
- Message format: { type, payload, requestId?, seq?, timestamp? } — validate every frame
- Heartbeat: server pings every 30s; disconnect if no pong within 10s
- Close codes: distinguish fatal codes (1008 policy violation; 4001/4003 app-defined auth failures) from recoverable ones
- Room authorization: re-verify permissions on subscribe, not just at connection time
- Rate limiting: per-IP/per-user connection count and message rate quotas
# Steps
1. HTTP server 'upgrade' event: verify token, call wss.handleUpgrade on success, respond 401 and destroy the socket on failure
2. On connection: set userId/isAlive/rooms, register pong handler, send 'connected' confirmation
3. Start heartbeat interval: ping all clients, terminate those that don't answer within the timeout
4. handleMessage: parse JSON, validate the envelope and payload, dispatch by msg.type, log and ignore unknown types
5. Room management: joinRoom/leaveRoom/broadcastToRoom track userId sets per room; authorize every room.join and leave all rooms on close
6. Client reconnect: exponential backoff + jitter, skip retry for fatal close codes
7. On reconnect: send lastSeq as a query param so the server can replay missed messages; skip messages with seq <= lastSeq