WebSocket realtime channel

Authenticate during the upgrade handshake; validate message frames with a schema; use heartbeats, reconnect logic, and backpressure to avoid resource exhaustion and zombie connections.

This page gives Agents a complete WebSocket real-time service reference: ws library/Socket.io server-side connection management, heartbeat mechanism (ping/pong + timeout disconnect), client-side exponential backoff reconnect, standard message format (JSON: type/payload/requestId), and room/channel management.

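Use WSS. Authenticate during the upgrade handshake or, when the token cannot travel with the upgrade request, in the first message after it. Re-check room/channel authorization on every subscribe. For Agent streaming, attach sequence numbers so clients can deduplicate messages and resume after a reconnect.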

  • Abuse prevention: per-IP/per-user connection count and message rate quotas.
  • Testing: verify that proxies and load balancers are configured for WebSocket compatibility (sticky sessions).
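
The message-rate quota from the abuse-prevention bullet can be sketched as a per-user token bucket. This is a minimal illustration, not part of the original reference: `TokenBucket`, `allowMessage`, and the limits (burst of 20, 10 messages per second) are assumptions chosen for the example.

```typescript
// Hypothetical per-user token bucket; class name and limits are illustrative.
class TokenBucket {
  private tokens: number
  private lastRefill: number
  private readonly capacity: number      // maximum burst size
  private readonly refillPerSec: number  // sustained messages per second

  constructor(capacity: number, refillPerSec: number, now: number = Date.now()) {
    this.capacity = capacity
    this.refillPerSec = refillPerSec
    this.tokens = capacity
    this.lastRefill = now
  }

  // Returns true if the message may proceed, false if it should be rejected.
  tryConsume(now: number = Date.now()): boolean {
    const elapsedSec = Math.max(0, now - this.lastRefill) / 1000
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSec)
    this.lastRefill = now
    if (this.tokens < 1) return false
    this.tokens -= 1
    return true
  }
}

const buckets = new Map<string, TokenBucket>()  // userId -> bucket

// Call once per inbound frame, before dispatching it.
function allowMessage(userId: string, now: number = Date.now()): boolean {
  let bucket = buckets.get(userId)
  if (!bucket) {
    bucket = new TokenBucket(20, 10, now)  // assumed limits: burst 20, 10 msg/s
    buckets.set(userId, bucket)
  }
  return bucket.tryConsume(now)
}
```

A message handler would call `allowMessage(ws.userId)` first and answer with an error frame (or close the connection on repeated violations) when it returns false. Remove the bucket when the user's last connection closes so the map does not grow without bound.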

WebSocket server connection management (ws library)

Server built on the ws library, covering connection management and authentication:

// server/websocket.ts — ws library implementation
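import { createServer, IncomingMessage } from 'http'
import { WebSocketServer, WebSocket } from 'ws'
import { verifyToken } from './auth'

interface WsClient extends WebSocket {
  userId: string
  isAlive: boolean
  rooms: Set<string>
}

// Assumption: a standalone HTTP server; reuse the app's existing server if it has one
const httpServer = createServer()
const wss = new WebSocketServer({ noServer: true })
const clients = new Map<string, WsClient>()  // userId -> most recent connection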

// HTTP server upgrade handshake
httpServer.on('upgrade', async (req: IncomingMessage, socket, head) => {
  try {
    // Read the token from the subprotocol header first; the query-param fallback can leak tokens into access logs, so prefer the header
    const token = req.headers['sec-websocket-protocol']?.split(',')[0]?.trim()
      ?? new URL(req.url!, 'http://x').searchParams.get('token')
    if (!token) return socket.destroy()

    const user = await verifyToken(token)
    wss.handleUpgrade(req, socket, head, (ws) => {
      const client = ws as WsClient
      client.userId = user.id
      client.isAlive = true
      client.rooms = new Set()
      clients.set(user.id, client)
      wss.emit('connection', client, req)
    })
  } catch {
    socket.destroy()
  }
})

wss.on('connection', (ws: WsClient) => {
  ws.on('message', (data) => handleMessage(ws, data))
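  ws.on('close', (code, reason) => {
    // Only clean up if the map still points at this socket; a newer
    // connection for the same user may already have replaced it
    if (clients.get(ws.userId) === ws) {
      clients.delete(ws.userId)
      // Release room memberships (leaveRoom is defined in server/rooms.ts)
      for (const roomId of [...ws.rooms]) leaveRoom(ws, roomId)
    }
    console.log(`Client ${ws.userId} disconnected: ${code} ${reason}`)
  })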
  ws.on('pong', () => { ws.isAlive = true })

  // Send connection confirmation
  sendMessage(ws, { type: 'connected', payload: { userId: ws.userId } })
})

Standard message format (JSON: type/payload/requestId):

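// types/ws-messages.ts
import { WebSocket } from 'ws'
import type { RawData } from 'ws'

interface WsMessage<T = unknown> {
  type: string          // message type, e.g. 'chat.send' | 'room.join'
  payload: T            // business data
  requestId?: string    // client-generated, for request-response correlation
  seq?: number          // sequence number, for deduplication and resumption
  timestamp?: number    // server-injected timestamp
}

// Minimal shape check; swap in a schema library for per-type payload validation
function isWsMessage(value: unknown): value is WsMessage {
  return typeof value === 'object' && value !== null
    && typeof (value as WsMessage).type === 'string'
    && 'payload' in value
}

// Message handler dispatcher
// WsClient is the interface from server/websocket.ts; handleChatSend,
// handleRoomJoin and handleRoomLeave are application handlers defined elsewhere
function handleMessage(ws: WsClient, raw: RawData) {
  let msg: unknown
  try {
    // With the default binaryType ('nodebuffer'), raw arrives as a Buffer
    msg = JSON.parse(raw.toString())
  } catch {
    return sendError(ws, 'invalid_json', 'Message must be valid JSON')
  }
  if (!isWsMessage(msg)) {
    return sendError(ws, 'invalid_message', 'Message must be an object with a string "type" and a "payload"')
  }

  switch (msg.type) {
    case 'chat.send':   return handleChatSend(ws, msg)
    case 'room.join':   return handleRoomJoin(ws, msg)
    case 'room.leave':  return handleRoomLeave(ws, msg)
    default:
      // Unknown type: log and discard (protocol evolution compatibility)
      console.warn(`Unknown message type: ${msg.type}`)
  }
}

function sendMessage(ws: WebSocket, msg: WsMessage) {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify(msg))
  }
}

// Report a protocol error to the client without closing the connection
function sendError(ws: WebSocket, code: string, message: string) {
  sendMessage(ws, { type: 'error', payload: { code, message } })
}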

Heartbeat mechanism (ping/pong + timeout disconnect)

Server heartbeat: ping periodically and disconnect zombie connections that miss the pong deadline:

// server/heartbeat.ts
import { WebSocket, WebSocketServer } from 'ws'

const HEARTBEAT_INTERVAL = 30_000  // send ping every 30s
const HEARTBEAT_TIMEOUT  = 10_000  // disconnect if no pong within 10s (keep below the interval)

function startHeartbeat(wss: WebSocketServer) {
  const interval = setInterval(() => {
    wss.clients.forEach((ws) => {
      const client = ws as WsClient
      if (client.readyState !== WebSocket.OPEN) return
      client.isAlive = false  // mark as pending verification
      client.ping()           // send WebSocket ping frame

      // Enforce the pong deadline instead of waiting for the next ping cycle
      setTimeout(() => {
        if (!client.isAlive) {
          console.log(`Terminating stale connection: ${client.userId}`)
          client.terminate()
        }
      }, HEARTBEAT_TIMEOUT)
    })
  }, HEARTBEAT_INTERVAL)

  wss.on('close', () => clearInterval(interval))
  return interval
}

// Register the pong handler in the connection event (register it only once;
// skip this block if server/websocket.ts already does it)
wss.on('connection', (ws: WsClient) => {
  ws.isAlive = true
  ws.on('pong', () => {
    ws.isAlive = true  // received pong, mark as alive
  })
})

Client reconnect strategy (exponential backoff + max retries)

// client/wsClient.ts
interface WsClientOptions {
  url: string
  maxRetries?: number
  baseDelayMs?: number
  maxDelayMs?: number
  onMessage?: (msg: WsMessage) => void
}

class ReconnectingWsClient {
  private ws: WebSocket | null = null
  private retryCount = 0
  private lastSeq = 0

  // Non-recoverable close codes (do not reconnect)
  private static FATAL_CODES = new Set([1008, 4001, 4003])

  constructor(private options: WsClientOptions) {}

  connect() {
    const { url, maxRetries = 5, baseDelayMs = 1000, maxDelayMs = 30_000 } = this.options
    // Build the URL with the URL API so an existing query string is preserved
    const target = new URL(url)
    target.searchParams.set('lastSeq', String(this.lastSeq))
    this.ws = new WebSocket(target.toString())

    this.ws.onopen = () => {
      this.retryCount = 0
      console.log('WebSocket connected')
    }

    this.ws.onmessage = (event) => {
      let msg: WsMessage
      try {
        msg = JSON.parse(event.data)
      } catch {
        console.warn('Ignoring frame that is not valid JSON')
        return
      }
      // Record the highest sequence number seen (typeof check so seq 0 is not skipped)
      if (typeof msg.seq === 'number') this.lastSeq = Math.max(this.lastSeq, msg.seq)
      this.options.onMessage?.(msg)
    }

    this.ws.onclose = (event) => {
      if (ReconnectingWsClient.FATAL_CODES.has(event.code)) {
        console.error(`Fatal close code ${event.code}: ${event.reason}`)
        return  // do not reconnect (auth failure, etc.)
      }

      if (this.retryCount >= maxRetries) {
        console.error('Max retries exceeded, giving up')
        return
      }

      // Exponential backoff + jitter (factor between 0.75 and 1.25), capped at maxDelayMs
      const delay = Math.min(
        Math.pow(2, this.retryCount) * baseDelayMs * (0.75 + Math.random() * 0.5),
        maxDelayMs
      )
      this.retryCount += 1
      console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${this.retryCount}/${maxRetries})`)
      setTimeout(() => this.connect(), delay)
    }
  }

  send(msg: WsMessage) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(msg))
    }
  }
}
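
The client above reports `lastSeq` when it reconnects, but the server side of resumption is not shown. One possible shape, sketched under assumptions: a bounded in-memory buffer that assigns sequence numbers and replays everything after the client's cursor. `ReplayBuffer`, `SeqMessage`, and the default capacity are hypothetical names and values, not part of the original reference.

```typescript
// Hypothetical server-side replay buffer for one stream or room.
interface SeqMessage {
  seq: number
  type: string
  payload: unknown
}

class ReplayBuffer {
  private buffer: SeqMessage[] = []
  private nextSeq = 1
  private readonly capacity: number

  constructor(capacity: number = 1000) {
    this.capacity = capacity
  }

  // Assign the next sequence number and remember the message for later replay.
  append(type: string, payload: unknown): SeqMessage {
    const msg: SeqMessage = { seq: this.nextSeq++, type, payload }
    this.buffer.push(msg)
    if (this.buffer.length > this.capacity) this.buffer.shift()  // evict the oldest entry
    return msg
  }

  // Messages with seq > lastSeq, or null when some of them were already
  // evicted and the client needs a full resync instead of a replay.
  since(lastSeq: number): SeqMessage[] | null {
    const oldest = this.buffer.length > 0 ? this.buffer[0].seq : this.nextSeq
    if (lastSeq + 1 < oldest) return null
    return this.buffer.filter((m) => m.seq > lastSeq)
  }
}
```

On reconnect the server would read `lastSeq` from the query string, call `since(lastSeq)`, and either resend the returned messages or tell the client to reload state when the result is null. On the client, discarding any message whose `seq` is not greater than the stored `lastSeq` gives deduplication.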

Room/channel management implementation:

// server/rooms.ts
const rooms = new Map<string, Set<string>>()  // roomId -> Set<userId>

function joinRoom(ws: WsClient, roomId: string) {
  if (!rooms.has(roomId)) rooms.set(roomId, new Set())
  rooms.get(roomId)!.add(ws.userId)
  ws.rooms.add(roomId)
  sendMessage(ws, { type: 'room.joined', payload: { roomId } })
}

function leaveRoom(ws: WsClient, roomId: string) {
  rooms.get(roomId)?.delete(ws.userId)
  ws.rooms.delete(roomId)
  if (rooms.get(roomId)?.size === 0) rooms.delete(roomId)
}

function broadcastToRoom(roomId: string, msg: WsMessage, excludeUserId?: string) {
  const userIds = rooms.get(roomId)
  if (!userIds) return
  for (const userId of userIds) {
    if (userId === excludeUserId) continue
    const client = clients.get(userId)
    if (client) sendMessage(client, msg)
  }
}
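
`joinRoom` above adds the user unconditionally, so the "re-check authorization on subscribe" rule needs a gate in front of it. The sketch below is one way to do that, assuming a synchronous in-memory ACL; `roomAcl`, `authorizeJoin`, `ROOM_ID_PATTERN`, and the sample room ID are hypothetical, and a real check would more likely query a database or permission service asynchronously.

```typescript
// Hypothetical in-memory ACL: roomId -> users allowed to subscribe.
const roomAcl = new Map<string, Set<string>>([
  ['project:42', new Set(['alice', 'bob'])],
])

type JoinDecision =
  | { ok: true; roomId: string }
  | { ok: false; code: 'invalid_room' | 'forbidden' }

// Assumed room ID format: 1 to 64 letters, digits, ':', '_' or '-'.
const ROOM_ID_PATTERN = /^[A-Za-z0-9:_-]{1,64}$/

// Validate the untrusted payload, then check the ACL on every join request.
function authorizeJoin(userId: string, payload: unknown): JoinDecision {
  const roomId = (payload as { roomId?: unknown } | null)?.roomId
  if (typeof roomId !== 'string' || !ROOM_ID_PATTERN.test(roomId)) {
    return { ok: false, code: 'invalid_room' }
  }
  if (!roomAcl.get(roomId)?.has(userId)) {
    return { ok: false, code: 'forbidden' }
  }
  return { ok: true, roomId }
}
```

A `room.join` handler would call `authorizeJoin(ws.userId, msg.payload)` and only invoke `joinRoom` when `ok` is true; otherwise it would reply with an error frame carrying the returned `code`.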

Connection and message flow

The sketch below covers the happy path and the failure branches. Framework API names differ; what matters is the auth, subscribe-authorization, and backpressure branches.

  [ Client starts WSS upgrade ]
        │
        ▼
  ┌─────────────────┐     Validate token / session; on failure → close with app code
  │ Handshake auth  │
  └─────────────────┘
        │ success
        ▼
  ┌─────────────────┐     Channel / room / topic: second auth; record subscriptions
  │ Subscribe auth  │
  └─────────────────┘
        │
        ▼
  ┌─────────────────┐     Inbound: schema, quota, rate
  │  Message loop   │◄──┐ Outbound: queue depth; over limit → drop / merge / signal
  └─────────────────┘   │
        │               │
        ├── Heartbeat / pong timeout ──► Graceful close; client enters reconnect backoff
        │
        └── Recoverable disconnect ───► Exponential backoff + jitter; resume with cursor

Heartbeat interval estimator

If the server closes after T seconds with no valid frames, the client heartbeat should be well under T, leaving room for RTT and timer jitter. Below, treat usable idle budget as T − slack, then multiply by duty ratio r to estimate a suggested interval (works for ping frames or app-level pings).
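
The estimate described above is just (T − slack) × r. A minimal sketch, assuming seconds as the unit; the function name, parameter names, and the sample values are illustrative, not part of the original reference.

```typescript
// Suggested interval = (idle timeout - slack) * duty ratio, all in seconds.
function suggestHeartbeatIntervalSec(
  idleTimeoutSec: number,  // T: server closes after this long without valid frames
  slackSec: number,        // allowance for RTT and timer jitter
  dutyRatio: number,       // r: fraction of the usable budget to spend, in (0, 1]
): number {
  if (!(idleTimeoutSec > 0)) throw new RangeError('idleTimeoutSec must be positive')
  if (!(slackSec >= 0 && slackSec < idleTimeoutSec)) {
    throw new RangeError('slackSec must be in [0, idleTimeoutSec)')
  }
  if (!(dutyRatio > 0 && dutyRatio <= 1)) throw new RangeError('dutyRatio must be in (0, 1]')
  return (idleTimeoutSec - slackSec) * dutyRatio
}

// Example: T = 60s, slack = 5s, r = 0.5 gives (60 - 5) * 0.5 = 27.5s
const suggestedIntervalSec = suggestHeartbeatIntervalSec(60, 5, 0.5)
console.log(suggestedIntervalSec)  // 27.5
```

A duty ratio of 0.5 means roughly two heartbeats fit inside the usable idle budget, so a single lost ping does not trip the server's idle timeout.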


Observability and scale-out

Track connection count, per-connection queue depth, message rates, close-code distribution, and heartbeat timeouts. For horizontal scale, use sticky sessions, shared pub/sub, or both, and document the trade-off between session stickiness and broadcast consistency in the SKILL.

---
name: websocket-realtime-channel
description: Design or review a WebSocket realtime service
---
# Rules
- Always use WSS; authenticate during the upgrade handshake, or in the first message after it when the token cannot be sent with the upgrade request
- Message format: { type, payload, requestId?, seq?, timestamp? } — validate every frame
- Heartbeat: server pings every 30s; disconnect if no pong within 10s
- Close codes: distinguish fatal (4001/4003 = auth failure) from recoverable codes
- Room authorization: re-verify permissions on subscribe, not just at connection time
- Rate limiting: per-user connection count and message rate quotas

# Steps
1. HTTP server 'upgrade' event: verify token, call wss.handleUpgrade on success
2. On connection: set userId/isAlive/rooms, register pong handler, send 'connected' confirmation
3. Start heartbeat interval: ping all clients, terminate those that don't respond
4. handleMessage: parse JSON, validate the message shape, dispatch by msg.type, log and ignore unknown types
5. Room management: joinRoom/leaveRoom/broadcastToRoom — track userId sets per room
6. Client reconnect: exponential backoff + jitter, skip retry for fatal close codes
7. On reconnect: send lastSeq in query param for server-side incremental sync
