WebSocket Real-Time Channel

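Authenticate during the upgrade handshake, validate message frames against a schema, and use heartbeats, reconnection, and backpressure to prevent resource exhaustion and zombie connections.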

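This page is an implementation reference for an Agent building a WebSocket real-time service. It covers server-side connection management (the examples use the ws library; Socket.io provides rooms, heartbeats, and reconnection out of the box), a heartbeat mechanism (ping/pong plus timeout disconnect), client reconnection with exponential backoff, a standard message format (JSON: type/payload/requestId), and room/channel management.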

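Use WSS. Authenticate during the upgrade or, at the latest, with the first message after the handshake. Re-authorize room/channel access when the client subscribes. Attach sequence numbers to the Agent's streamed output so the client can deduplicate and resume after a disconnect.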

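  • Abuse prevention: per-IP and per-user quotas on connection count and message rate.
  • Testing: check that proxies and load balancers are configured for WebSocket traffic (sticky sessions).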
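A minimal sketch of the per-IP / per-user quotas, assuming an in-memory token bucket per key for message rate and a counter per key for open connections. State lives in one process only; a multi-instance deployment would need shared state (for example Redis). `TokenBucketLimiter`, `ConnectionQuota`, and their parameters are hypothetical names, not part of ws or Socket.io.

```typescript
// Hypothetical in-memory message-rate limiter: one token bucket per key (userId or IP).
class TokenBucketLimiter {
  private buckets = new Map<string, { tokens: number; updatedAt: number }>()

  // capacity: allowed burst size; refillPerSec: sustained messages per second
  constructor(private capacity: number, private refillPerSec: number) {}

  // Returns true if one message from `key` may proceed at time `nowMs`.
  tryConsume(key: string, nowMs: number = Date.now()): boolean {
    const b = this.buckets.get(key) ?? { tokens: this.capacity, updatedAt: nowMs }
    const elapsedSec = Math.max(0, nowMs - b.updatedAt) / 1000
    b.tokens = Math.min(this.capacity, b.tokens + elapsedSec * this.refillPerSec)
    b.updatedAt = nowMs
    const allowed = b.tokens >= 1
    if (allowed) b.tokens -= 1
    this.buckets.set(key, b) // note: entries are never evicted in this sketch
    return allowed
  }
}

// Hypothetical per-key cap on concurrent connections.
class ConnectionQuota {
  private counts = new Map<string, number>()

  constructor(private maxPerKey: number) {}

  // Call during the upgrade; reject the handshake when it returns false.
  acquire(key: string): boolean {
    const n = this.counts.get(key) ?? 0
    if (n >= this.maxPerKey) return false
    this.counts.set(key, n + 1)
    return true
  }

  // Call from the 'close' handler of every connection that acquired a slot.
  release(key: string): void {
    const n = (this.counts.get(key) ?? 0) - 1
    if (n <= 0) this.counts.delete(key)
    else this.counts.set(key, n)
  }

  count(key: string): number {
    return this.counts.get(key) ?? 0
  }
}
```

In the `message` handler, a failed `tryConsume(ws.userId)` could be answered with `sendError(ws, 'rate_limited', ...)` instead of dispatching the message.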

WebSocket Server Connection Management (ws library)

ws server: connection management and authentication

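// server/websocket.ts: ws library implementation
import { createServer, IncomingMessage } from 'http'
import { WebSocketServer, WebSocket } from 'ws'
import { verifyToken } from './auth'

interface WsClient extends WebSocket {
  userId: string
  isAlive: boolean
  rooms: Set<string>
}

const httpServer = createServer()   // or reuse an existing HTTP / Express server; call httpServer.listen(...)
const wss = new WebSocketServer({ noServer: true })
const clients = new Map<string, WsClient>()   // one active connection per userId

// Authenticate during the HTTP upgrade handshake
httpServer.on('upgrade', async (req: IncomingMessage, socket, head) => {
  try {
    // Prefer the first Sec-WebSocket-Protocol value; the query-string fallback
    // ends up in proxy and access logs, so avoid it in production
    const token = req.headers['sec-websocket-protocol']?.split(',')[0]?.trim()
      || new URL(req.url!, 'http://x').searchParams.get('token')
    if (!token) {
      socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n')
      return socket.destroy()
    }

    const user = await verifyToken(token)
    wss.handleUpgrade(req, socket, head, (ws) => {
      const client = ws as WsClient
      client.userId = user.id
      client.isAlive = true
      client.rooms = new Set()
      clients.set(user.id, client)   // a newer connection replaces an older one in the map
      wss.emit('connection', client, req)
    })
  } catch {
    socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n')
    socket.destroy()
  }
})

wss.on('connection', (ws: WsClient) => {
  ws.on('message', (data) => handleMessage(ws, data))
  ws.on('close', (code, reason) => {
    // Clean up only if the map still points at this socket, so closing an
    // old, replaced connection does not evict the user's newer one
    if (clients.get(ws.userId) === ws) {
      clients.delete(ws.userId)
      ws.rooms.forEach((roomId) => leaveRoom(ws, roomId))   // leaveRoom: server/rooms.ts
    }
    console.log(`Client ${ws.userId} disconnected: ${code} ${reason.toString()}`)
  })
  ws.on('pong', () => { ws.isAlive = true })

  // Send the connection acknowledgement
  sendMessage(ws, { type: 'connected', payload: { userId: ws.userId } })
})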
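The `clients` map keeps one socket per `userId`, so a second tab replaces the first in the map and stops the first from receiving broadcasts. If several concurrent connections per user are needed, a registry that maps each user to a set of sockets is one option. `ConnectionRegistry` is a hypothetical helper. It is generic over the socket type, so it accepts ws objects or test doubles.

```typescript
// Hypothetical registry: userId -> every open connection of that user.
class ConnectionRegistry<Conn> {
  private byUser = new Map<string, Set<Conn>>()

  add(userId: string, conn: Conn): void {
    let set = this.byUser.get(userId)
    if (!set) {
      set = new Set<Conn>()
      this.byUser.set(userId, set)
    }
    set.add(conn)
  }

  // Remove exactly this connection; the user's other tabs stay registered.
  remove(userId: string, conn: Conn): void {
    const set = this.byUser.get(userId)
    if (!set) return
    set.delete(conn)
    if (set.size === 0) this.byUser.delete(userId)
  }

  // All live connections of a user (empty array if none), e.g. for fan-out.
  get(userId: string): Conn[] {
    return Array.from(this.byUser.get(userId) ?? [])
  }

  userCount(): number {
    return this.byUser.size
  }
}
```

With this in place, a broadcast loops over `registry.get(userId)` instead of a single `clients.get(userId)`.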

Standard Message Format (JSON: type/payload/requestId)

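// types/ws-messages.ts
import { WebSocket, RawData } from 'ws'

interface WsMessage<T = unknown> {
  type: string          // message type, e.g. 'chat.send' | 'room.join'
  payload: T            // business data
  requestId?: string    // generated by the client; correlates request and response
  seq?: number          // sequence number, for deduplication and resumption
  timestamp?: number    // timestamp injected by the server
}

// Dispatch incoming messages by type
function handleMessage(ws: WsClient, raw: RawData) {
  let msg: WsMessage
  try {
    msg = JSON.parse(raw.toString())
  } catch {
    return sendError(ws, 'invalid_json', 'Message must be valid JSON')
  }
  // JSON.parse also accepts null, numbers and arrays: check the envelope before reading .type
  if (typeof msg !== 'object' || msg === null || typeof msg.type !== 'string') {
    return sendError(ws, 'invalid_message', 'Message must be an object with a string "type"')
  }

  switch (msg.type) {
    case 'chat.send':   return handleChatSend(ws, msg)    // application handlers, not shown
    case 'room.join':   return handleRoomJoin(ws, msg)
    case 'room.leave':  return handleRoomLeave(ws, msg)
    default:
      // Unknown type: drop without replying to the client (forward compatibility as the protocol evolves)
      console.warn(`Unknown message type: ${msg.type}`)
  }
}

function sendMessage(ws: WebSocket, msg: WsMessage) {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ ...msg, timestamp: msg.timestamp ?? Date.now() }))
  }
}

// Assumed error reply shape; sendError is called above but was not defined in the original
function sendError(ws: WebSocket, code: string, message: string, requestId?: string) {
  sendMessage(ws, { type: 'error', payload: { code, message }, requestId })
}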
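The page calls for schema validation of frames, but `handleMessage` only checks the JSON syntax and the envelope. The following is a dependency-free sketch of per-type payload validation. The payload shapes (`roomId`, `text`) and the length limits are illustrative assumptions. In practice a schema library such as zod or ajv usually fills this role.

```typescript
// Hypothetical payload shapes for the three message types dispatched in handleMessage.
type ClientMessage =
  | { type: 'chat.send'; payload: { roomId: string; text: string }; requestId?: string }
  | { type: 'room.join' | 'room.leave'; payload: { roomId: string }; requestId?: string }

type ParseResult = { ok: true; msg: ClientMessage } | { ok: false; error: string }

const MAX_ID_LENGTH = 128    // assumed limit for roomId / requestId
const MAX_TEXT_LENGTH = 4000 // assumed limit for chat text

function isRecord(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null && !Array.isArray(v)
}

function isBoundedString(v: unknown, max: number): v is string {
  return typeof v === 'string' && v.length > 0 && v.length <= max
}

const fail = (error: string): ParseResult => ({ ok: false, error })

// Validate a value that has already passed JSON.parse; never throws.
function parseClientMessage(value: unknown): ParseResult {
  if (!isRecord(value)) return fail('invalid_envelope')
  const type = value.type
  const payload = value.payload
  const requestId = value.requestId
  if (typeof type !== 'string' || !isRecord(payload)) return fail('invalid_envelope')
  if (requestId !== undefined && !isBoundedString(requestId, MAX_ID_LENGTH)) {
    return fail('invalid_request_id')
  }
  const rid = typeof requestId === 'string' ? requestId : undefined
  const roomId = payload.roomId
  const text = payload.text

  if (type === 'room.join' || type === 'room.leave') {
    if (!isBoundedString(roomId, MAX_ID_LENGTH)) return fail('invalid_payload')
    const kind = type === 'room.join' ? 'room.join' : 'room.leave'
    return { ok: true, msg: { type: kind, payload: { roomId }, requestId: rid } }
  }
  if (type === 'chat.send') {
    if (!isBoundedString(roomId, MAX_ID_LENGTH) || !isBoundedString(text, MAX_TEXT_LENGTH)) {
      return fail('invalid_payload')
    }
    return { ok: true, msg: { type: 'chat.send', payload: { roomId, text }, requestId: rid } }
  }
  return fail('unknown_type')
}
```

To keep the "drop unknown types" behavior, a caller can ignore `unknown_type` and reply with `sendError` only for the other error codes.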

Heartbeat (ping/pong + timeout disconnect)

Server heartbeat: send pings periodically and terminate zombie connections on timeout

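// server/heartbeat.ts
import { WebSocketServer } from 'ws'
// WsClient: the extended socket type declared in server/websocket.ts

const HEARTBEAT_INTERVAL = 30_000  // send a ping every 30s
const HEARTBEAT_TIMEOUT  = 10_000  // terminate if no pong arrives within 10s of the ping

function startHeartbeat(wss: WebSocketServer) {
  let pendingCheck: ReturnType<typeof setTimeout> | undefined

  const interval = setInterval(() => {
    wss.clients.forEach((ws) => {
      const client = ws as WsClient
      client.isAlive = false  // mark as unverified until a pong arrives
      client.ping()           // send a WebSocket ping frame
    })
    // Enforce HEARTBEAT_TIMEOUT after this ping instead of waiting for the next tick
    pendingCheck = setTimeout(() => {
      wss.clients.forEach((ws) => {
        const client = ws as WsClient
        if (!client.isAlive) {
          console.log(`Terminating stale connection: ${client.userId}`)
          client.terminate()
        }
      })
    }, HEARTBEAT_TIMEOUT)
  }, HEARTBEAT_INTERVAL)

  wss.on('close', () => {
    clearInterval(interval)
    clearTimeout(pendingCheck)
  })
  return interval
}

// Register the pong handler on connection (same as the ws.on('pong') line in
// server/websocket.ts; register it in one place only). wss: the server created there
wss.on('connection', (ws: WsClient) => {
  ws.isAlive = true
  ws.on('pong', () => {
    ws.isAlive = true  // pong received: mark as alive
  })
})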
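Browsers answer server ping frames automatically, but they do not expose ping or pong to page scripts. A browser client therefore cannot observe the heartbeat above. A common complement is an application-level idle watchdog on the client: if no message arrives within a deadline, the client treats the connection as half-open and closes it, which triggers reconnection. This sketch assumes the server sends some message (data or a hypothetical application-level ping type) more often than `idleTimeoutMs`. `IdleWatchdog` and `watchSocket` are hypothetical names. Time is passed in explicitly so the core logic is testable.

```typescript
// Hypothetical client-side idle watchdog; callers supply timestamps in ms.
class IdleWatchdog {
  private lastSeenAt: number

  constructor(private idleTimeoutMs: number, startedAt: number) {
    this.lastSeenAt = startedAt
  }

  // Call on every received message (including application-level pings).
  touch(now: number): void {
    this.lastSeenAt = now
  }

  // True once nothing has been received for longer than the timeout.
  isStale(now: number): boolean {
    return now - this.lastSeenAt > this.idleTimeoutMs
  }
}

// Wiring sketch: poll the watchdog and close the socket once it goes stale.
// 4000 is not a fatal code for ReconnectingWsClient, so its onclose retries.
// On a truly dead TCP connection the 'close' event may take a while to fire.
function watchSocket(
  socket: { close(code?: number, reason?: string): void },
  idleTimeoutMs: number,
  checkEveryMs = 5_000,
): { touch(): void; stop(): void } {
  const dog = new IdleWatchdog(idleTimeoutMs, Date.now())
  const timer = setInterval(() => {
    if (dog.isStale(Date.now())) {
      clearInterval(timer)
      socket.close(4000, 'idle timeout')
    }
  }, checkEveryMs)
  return { touch: () => dog.touch(Date.now()), stop: () => clearInterval(timer) }
}
```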

Client Reconnection Strategy (exponential backoff + max retries)

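// client/wsClient.ts (browser WebSocket API)
interface WsClientOptions {
  url: string                      // absolute URL, e.g. wss://example.com/ws
  protocols?: string | string[]    // e.g. [token]: the server reads the first Sec-WebSocket-Protocol value
  maxRetries?: number
  baseDelayMs?: number
  maxDelayMs?: number
  onMessage?: (msg: WsMessage) => void
}

class ReconnectingWsClient {
  private ws: WebSocket | null = null
  private retryCount = 0
  private lastSeq = 0
  private closedByUser = false

  // Close codes that must not trigger a reconnect (policy violation, auth failures)
  private static readonly FATAL_CODES = new Set([1008, 4001, 4003])

  constructor(private options: WsClientOptions) {}

  connect() {
    const { url, protocols, maxRetries = 5, baseDelayMs = 1000, maxDelayMs = 30_000 } = this.options
    this.closedByUser = false
    // Append lastSeq without breaking URLs that already carry a query string
    const target = new URL(url)
    target.searchParams.set('lastSeq', String(this.lastSeq))
    this.ws = new WebSocket(target.toString(), protocols)

    this.ws.onopen = () => {
      this.retryCount = 0
      console.log('WebSocket connected')
    }

    this.ws.onmessage = (event) => {
      let msg: WsMessage
      try {
        msg = JSON.parse(event.data)
      } catch {
        return console.warn('Dropping non-JSON frame')
      }
      // Assumes the server's seq increases monotonically per user across reconnects
      if (typeof msg.seq === 'number') {
        if (msg.seq <= this.lastSeq) return   // duplicate delivered by a replay: skip it
        this.lastSeq = msg.seq                 // remember the last sequence number received
      }
      this.options.onMessage?.(msg)
    }

    this.ws.onclose = (event) => {
      if (this.closedByUser) return
      if (ReconnectingWsClient.FATAL_CODES.has(event.code)) {
        console.error(`Fatal close code ${event.code}: ${event.reason}`)
        return  // do not reconnect (authentication failure, etc.)
      }

      if (this.retryCount >= maxRetries) {
        console.error('Max retries exceeded, giving up')
        return
      }

      // Exponential backoff with ±25% jitter, capped at maxDelayMs
      const delay = Math.min(
        Math.pow(2, this.retryCount) * baseDelayMs * (0.75 + Math.random() * 0.5),
        maxDelayMs
      )
      this.retryCount++
      console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${this.retryCount}/${maxRetries})`)
      setTimeout(() => { if (!this.closedByUser) this.connect() }, delay)
    }
  }

  send(msg: WsMessage) {
    // Dropped while disconnected; queue here if messages must survive a reconnect
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(msg))
    }
  }

  // Close intentionally without triggering a reconnect
  close() {
    this.closedByUser = true
    this.ws?.close(1000, 'client closing')
  }
}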
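The client sends `lastSeq` on reconnect, but the page does not show the server side of resumption. One way to implement it is a bounded per-user history, assuming `seq` is a per-user counter that increases monotonically across reconnects. The server replays everything after `lastSeq`. If `lastSeq` has already fallen out of the buffer, or was never issued, the server tells the client to resync its full state instead. `ReplayBuffer` and the `resync` outcome are hypothetical. A multi-instance deployment would typically use a durable message log rather than process memory.

```typescript
interface OutboundMessage {
  type: string
  payload: unknown
}

type Stamped = OutboundMessage & { seq: number }

type ResumeResult =
  | { kind: 'replay'; messages: Stamped[] }
  | { kind: 'resync' } // gap cannot be filled: client must reload full state

// Hypothetical bounded history of one user's outbound messages.
class ReplayBuffer {
  private nextSeq = 1
  private history: Stamped[] = []

  constructor(private capacity: number) {}

  // Assign the next seq, keep the message for replay, and return it for sending.
  record(msg: OutboundMessage): Stamped {
    const stamped: Stamped = { ...msg, seq: this.nextSeq++ }
    this.history.push(stamped)
    if (this.history.length > this.capacity) this.history.shift() // evict the oldest
    return stamped
  }

  // Called on reconnect with the lastSeq the client reported.
  resumeFrom(lastSeq: number): ResumeResult {
    // The client claims a seq this buffer never issued (e.g. after a server restart).
    if (lastSeq >= this.nextSeq) return { kind: 'resync' }
    const oldest = this.history.length > 0 ? this.history[0].seq : this.nextSeq
    // lastSeq + 1 is the first message the client is missing; it must still be buffered.
    if (lastSeq + 1 < oldest) return { kind: 'resync' }
    return { kind: 'replay', messages: this.history.filter((m) => m.seq > lastSeq) }
  }
}
```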

Room/Channel Management

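// server/rooms.ts (clients, sendMessage and WsClient come from the modules above)
const rooms = new Map<string, Set<string>>()  // roomId -> Set<userId>

// Authorize userId for roomId before calling: subscription is where the second authorization check happens
function joinRoom(ws: WsClient, roomId: string) {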
  if (!rooms.has(roomId)) rooms.set(roomId, new Set())
  rooms.get(roomId)!.add(ws.userId)
  ws.rooms.add(roomId)
  sendMessage(ws, { type: 'room.joined', payload: { roomId } })
}

function leaveRoom(ws: WsClient, roomId: string) {
  rooms.get(roomId)?.delete(ws.userId)
  ws.rooms.delete(roomId)
  if (rooms.get(roomId)?.size === 0) rooms.delete(roomId)
}

function broadcastToRoom(roomId: string, msg: WsMessage, excludeUserId?: string) {
  const userIds = rooms.get(roomId)
  if (!userIds) return
  for (const userId of userIds) {
    if (userId === excludeUserId) continue
    const client = clients.get(userId)
    if (client) sendMessage(client, msg)
  }
}
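`joinRoom` adds the user unconditionally, while the rules call for authorization at subscription time and a re-check before broadcasting. The following self-contained sketch performs both checks. It assumes an application-supplied `authorize(userId, roomId)` callback, which is hypothetical and in practice would look up membership or ACL data. A `deliver` callback stands in for `sendMessage(clients.get(userId), msg)`. Because the re-check runs once per member per broadcast, busy rooms may need a short-lived cache in front of `authorize`.

```typescript
type RoomMessage = { type: string; payload: unknown }
type Authorize = (userId: string, roomId: string) => boolean
type Deliver = (userId: string, msg: RoomMessage) => void

// Hypothetical room manager with subscribe-time and broadcast-time authorization.
class AuthorizedRooms {
  private members = new Map<string, Set<string>>() // roomId -> userIds

  constructor(private authorize: Authorize, private deliver: Deliver) {}

  // Returns false when access is denied (reply with an error message in that case).
  join(userId: string, roomId: string): boolean {
    if (!this.authorize(userId, roomId)) return false
    let set = this.members.get(roomId)
    if (!set) {
      set = new Set<string>()
      this.members.set(roomId, set)
    }
    set.add(userId)
    return true
  }

  leave(userId: string, roomId: string): void {
    const set = this.members.get(roomId)
    if (!set) return
    set.delete(userId)
    if (set.size === 0) this.members.delete(roomId)
  }

  // Re-check at send time so members whose access was revoked stop receiving.
  broadcast(roomId: string, msg: RoomMessage, excludeUserId?: string): number {
    const set = this.members.get(roomId)
    if (!set) return 0
    let delivered = 0
    // Copy first: leave() may mutate the set while we iterate.
    Array.from(set).forEach((userId) => {
      if (userId === excludeUserId) return
      if (!this.authorize(userId, roomId)) {
        this.leave(userId, roomId) // drop the stale membership
        return
      }
      this.deliver(userId, msg)
      delivered++
    })
    return delivered
  }
}
```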

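Main Connection and Message Flow

The diagram below covers the path from connection setup through the failure paths. It does not depend on any framework's API names. The point is not to miss the authentication, subscription-authorization, and backpressure branches.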

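  [ Client initiates WSS upgrade ]
        │
        ▼
  ┌─────────────────┐     Verify token / session; on failure → close with an application-level code
  │  Handshake auth │
  └─────────────────┘
        │ success
        ▼
  ┌─────────────────┐     Channel / room / topic: second authorization check; record the subscription set
  │  Subscribe+auth │
  └─────────────────┘
        │
        ▼
  ┌─────────────────┐     Inbound: schema, quota, rate
  │  Message loop   │◄──┐ Outbound: queue depth; over limit → drop / coalesce / backpressure
  └─────────────────┘   │
        │               │
        ├── heartbeat / pong timeout ──► graceful close; client enters reconnect backoff
        │
        └── recoverable disconnect ────► exponential backoff + jitter; resume from cursor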
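The outbound branch of the message loop tracks queue depth. Both ws sockets and browser WebSocket objects expose `bufferedAmount`, the number of bytes queued but not yet sent. The following is a minimal sketch of one drop-or-disconnect policy, assuming two hypothetical thresholds. Above a soft limit, messages marked droppable are skipped. Above a hard limit, the slow consumer is closed with a hypothetical application code 4008. That code is not in the client's fatal set, so the client reconnects and resumes with `lastSeq`. Coalescing is left out.

```typescript
// Structural type satisfied by both ws sockets and browser WebSocket objects.
interface SendableSocket {
  readonly readyState: number
  readonly bufferedAmount: number
  send(data: string): void
  close(code?: number, reason?: string): void
}

const OPEN = 1                        // WebSocket.OPEN in both ws and the browser API
const SOFT_LIMIT_BYTES = 256 * 1024   // assumed: start shedding optional traffic
const HARD_LIMIT_BYTES = 1024 * 1024  // assumed: consumer cannot keep up, disconnect

type SendOutcome = 'sent' | 'dropped' | 'closed' | 'not_open'

// Send `msg` unless the socket's send buffer shows the client falling behind.
function sendWithBackpressure(
  ws: SendableSocket,
  msg: object,
  droppable = false, // e.g. typing indicators or progress ticks
): SendOutcome {
  if (ws.readyState !== OPEN) return 'not_open'
  if (ws.bufferedAmount > HARD_LIMIT_BYTES) {
    ws.close(4008, 'slow consumer')
    return 'closed'
  }
  if (droppable && ws.bufferedAmount > SOFT_LIMIT_BYTES) return 'dropped'
  ws.send(JSON.stringify(msg))
  return 'sent'
}
```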

Heartbeat Interval Estimation

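If the server (or a proxy in between) closes a connection after T seconds without a valid frame, the client's heartbeat period should be clearly shorter than T, with a reserve left for RTT and timer jitter. The estimate below takes the usable silent budget (T − reserve) and multiplies it by a duty ratio r. It applies equally to WebSocket ping frames and to application-level ping messages.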

Suggested heartbeat interval = r × (T − reserve), with 0 < r < 1.
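The formula can be turned into a small helper. The inputs in the example are illustrative: nginx's `proxy_read_timeout`, for instance, defaults to 60 s, and the 5 s reserve and r = 0.5 are assumptions. `suggestHeartbeatInterval` is a hypothetical name.

```typescript
// interval = r * (T - reserve); all durations in milliseconds.
function suggestHeartbeatInterval(idleCloseMs: number, reserveMs: number, ratio: number): number {
  if (!(ratio > 0 && ratio < 1)) throw new RangeError('ratio must be in (0, 1)')
  const budgetMs = idleCloseMs - reserveMs // usable silent budget
  if (budgetMs <= 0) throw new RangeError('reserve must be smaller than the idle timeout')
  return Math.floor(budgetMs * ratio)
}

// Intermediary drops connections after 60 s of silence, 5 s reserve, r = 0.5.
console.log(suggestHeartbeatInterval(60_000, 5_000, 0.5)) // 27500
```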

Observability and Scaling

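Recommended metrics: connection count, per-connection queue depth, message rate, close-code distribution, and heartbeat-timeout count. When scaling horizontally, use sticky sessions or a shared pub/sub layer, and state in the SKILL which trade-off you made between session stickiness and broadcast consistency.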
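With several instances behind a load balancer, a room's members may be connected to different nodes, so `broadcastToRoom` on one node reaches only its local sockets. A common pattern is to publish room events to a shared bus and have every node deliver them to its local members. The following sketch hides the bus behind a hypothetical `Bus` interface with an in-memory implementation. A real deployment would back the interface with something like Redis pub/sub or NATS; those adapters are assumptions and are not shown. Such pub/sub is typically at-most-once, which is why the seq-based resume still matters.

```typescript
type RoomMsg = { type: string; payload: unknown }
type RoomEvent = { roomId: string; msg: RoomMsg }

// Hypothetical bus abstraction; a Redis or NATS adapter would implement the same interface.
interface Bus {
  publish(channel: string, event: RoomEvent): void
  subscribe(channel: string, handler: (event: RoomEvent) => void): void
}

// In-process stand-in, for tests and single-node development only.
class InMemoryBus implements Bus {
  private handlers = new Map<string, Array<(event: RoomEvent) => void>>()

  publish(channel: string, event: RoomEvent): void {
    const list = this.handlers.get(channel) ?? []
    list.forEach((handler) => handler(event))
  }

  subscribe(channel: string, handler: (event: RoomEvent) => void): void {
    const list = this.handlers.get(channel) ?? []
    list.push(handler)
    this.handlers.set(channel, list)
  }
}

const ROOM_CHANNEL = 'ws:rooms' // assumed channel name

// One server instance: tracks only its own members and delivers bus events to them.
class RoomNode {
  private localMembers = new Map<string, Set<string>>() // roomId -> userIds connected here

  constructor(
    private bus: Bus,
    private deliverLocal: (userId: string, msg: RoomMsg) => void,
  ) {
    bus.subscribe(ROOM_CHANNEL, (event) => this.fanOutLocally(event))
  }

  join(userId: string, roomId: string): void {
    const set = this.localMembers.get(roomId) ?? new Set<string>()
    set.add(userId)
    this.localMembers.set(roomId, set)
  }

  // Publish instead of sending directly; every node, this one included, delivers locally.
  broadcast(roomId: string, msg: RoomMsg): void {
    this.bus.publish(ROOM_CHANNEL, { roomId, msg })
  }

  private fanOutLocally(event: RoomEvent): void {
    this.localMembers.get(event.roomId)?.forEach((userId) => this.deliverLocal(userId, event.msg))
  }
}
```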

---
name: websocket-realtime-channel
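description: Design or review a WebSocket real-time service
---
# Rules
- Use WSS (TLS); pass the token in the subprotocol header, not in URL query parameters
- Message format: JSON { type, payload, requestId?, seq?, timestamp? }
- Heartbeat: the server pings every 30s and calls terminate() if no pong arrives within 10s
- Reconnect: exponential backoff (2^n * base) + jitter; recognize fatal close codes (1008/4001/4003: do not reconnect)
- Reconnect with lastSeq to support resumption and deduplication
- Re-check subscription permissions before broadcasting to a room

# Steps
1. Implement the HTTP upgrade handler: verify the token and attach userId to the connection object
2. Register the message handler: dispatch on type, silently drop unknown types
3. Start the heartbeat timer: setInterval ping + pong listener
4. Implement room management: joinRoom / leaveRoom / broadcastToRoom
5. Implement ReconnectingWsClient on the client, with backoff and lastSeq tracking
6. Add connection-count / message-rate limits (per IP and per user)
7. Monitor metrics: connection count, close-code distribution, heartbeat-timeout count, queue depth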
