WebSocket Real-Time Channel
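Authenticate during the upgrade handshake; validate message frames against a schema; use heartbeats, reconnection, and backpressure to prevent resource exhaustion and zombie connections.
This page gives an Agent a complete implementation reference for WebSocket real-time services: server-side connection management with the ws library / Socket.io, a heartbeat mechanism (ping/pong plus timeout disconnect), client-side reconnection with exponential backoff, a standard message format (JSON: type/payload/requestId), and room/channel management.
Use WSS. Authenticate during the handshake (if a client cannot attach a token to the upgrade request, require authentication in the first message after the handshake and close the connection otherwise). Re-authorize room/channel access at subscription time. Tag Agent streaming output with sequence numbers so clients can deduplicate and resume after a disconnect.
- Anti-abuse: per-IP and per-user quotas on connection count and message rate.
- Testing: check that proxies and load balancers are configured to support WebSocket (sticky sessions).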
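The message-rate quota can be enforced with a token bucket per key. The sketch below assumes a single process and string keys such as `ip:203.0.113.7` or `user:42`; `TokenBucketLimiter`, its parameters, and the key format are illustrative, and a multi-node deployment would need the bucket state in a shared store.

```typescript
// Hypothetical in-memory token-bucket rate limiter, keyed per IP or per user.
// Each key holds up to `capacity` tokens, refilled continuously at `refillPerSec`.
class TokenBucketLimiter {
  private readonly buckets = new Map<string, { tokens: number; updatedAt: number }>()
  private readonly capacity: number
  private readonly refillPerSec: number

  constructor(capacity: number, refillPerSec: number) {
    this.capacity = capacity
    this.refillPerSec = refillPerSec
  }

  // Returns true if the message may be processed, false if the key is over quota.
  // `now` (milliseconds) is injectable so the limiter can be tested deterministically.
  // Idle buckets are never evicted here; a real deployment would expire them.
  tryConsume(key: string, now: number = Date.now()): boolean {
    const bucket = this.buckets.get(key) ?? { tokens: this.capacity, updatedAt: now }
    // Refill in proportion to elapsed time, never above capacity
    const elapsedSec = Math.max(0, now - bucket.updatedAt) / 1000
    bucket.tokens = Math.min(this.capacity, bucket.tokens + elapsedSec * this.refillPerSec)
    bucket.updatedAt = now
    const allowed = bucket.tokens >= 1
    if (allowed) bucket.tokens -= 1
    this.buckets.set(key, bucket)
    return allowed
  }
}

// Usage: bursts of up to 5 messages, sustained 2 messages/second per user
const limiter = new TokenBucketLimiter(5, 2)
const t0 = 1_000_000
const burst = Array.from({ length: 6 }, () => limiter.tryConsume('user:42', t0))
console.log(burst.join(',')) // true,true,true,true,true,false
console.log(limiter.tryConsume('user:42', t0 + 500)) // true: 0.5 s refilled one token
```

In the `'message'` handler, a frame whose key is over quota could be answered with an error message, or the connection closed if the client keeps exceeding the limit; which policy fits is an application decision.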
WebSocket Server Connection Management (ws library)
ws server: connection management and authentication:
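// server/websocket.ts: ws library implementation
import { createServer, IncomingMessage } from 'http'
import { WebSocketServer, WebSocket } from 'ws'
import { verifyToken } from './auth' // resolves a token to a user; assumed to throw if invalid
// handleMessage / sendMessage live in the message module, leaveRoom in server/rooms.ts

export interface WsClient extends WebSocket {
  userId: string
  isAlive: boolean
  rooms: Set<string>
}

export const httpServer = createServer() // or reuse an existing HTTP server
export const wss = new WebSocketServer({ noServer: true })
// Keyed by userId: one connection per user, so a newer connection replaces the older one
export const clients = new Map<string, WsClient>()

// Authenticate on the HTTP upgrade request, before the WebSocket exists
httpServer.on('upgrade', async (req: IncomingMessage, socket, head) => {
  try {
    // Prefer the subprotocol header: tokens in the URL query string end up in access logs.
    // The query-string fallback is only for clients that cannot set a subprotocol.
    const token = req.headers['sec-websocket-protocol']?.split(',')[0]?.trim()
      ?? new URL(req.url ?? '/', 'http://localhost').searchParams.get('token')
    if (!token) throw new Error('missing token')
    const user = await verifyToken(token)
    wss.handleUpgrade(req, socket, head, (ws) => {
      const client = ws as WsClient
      client.userId = user.id
      client.isAlive = true
      client.rooms = new Set()
      clients.set(user.id, client)
      wss.emit('connection', client, req)
    })
  } catch {
    // The handshake has not completed, so reject with plain HTTP instead of a close code
    socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n')
    socket.destroy()
  }
})

wss.on('connection', (socket) => {
  const ws = socket as WsClient
  ws.on('message', (data) => handleMessage(ws, data))
  ws.on('close', (code, reason) => {
    // Skip cleanup if a newer connection of the same user has already replaced this one
    if (clients.get(ws.userId) === ws) {
      clients.delete(ws.userId)
      for (const roomId of [...ws.rooms]) leaveRoom(ws, roomId)
    }
    console.log(`Client ${ws.userId} disconnected: ${code} ${reason}`)
  })
  ws.on('pong', () => { ws.isAlive = true })
  // Confirm the connection to the client
  sendMessage(ws, { type: 'connected', payload: { userId: ws.userId } })
})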
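The anti-abuse rule also caps concurrent connections per IP and per user. A minimal single-process sketch: take a slot before `wss.handleUpgrade` and give it back in the `'close'` handler. `ConnectionLimiter` and the limit of 2 are hypothetical.

```typescript
// Hypothetical per-key concurrent-connection limiter (single process, in memory).
class ConnectionLimiter {
  private readonly counts = new Map<string, number>()
  private readonly maxPerKey: number

  constructor(maxPerKey: number) {
    this.maxPerKey = maxPerKey
  }

  // Call during the upgrade, before handleUpgrade; reject the upgrade if this returns false.
  tryAcquire(key: string): boolean {
    const current = this.counts.get(key) ?? 0
    if (current >= this.maxPerKey) return false
    this.counts.set(key, current + 1)
    return true
  }

  // Call exactly once per acquired slot, e.g. from the socket's 'close' handler.
  release(key: string): void {
    const current = this.counts.get(key) ?? 0
    if (current <= 1) this.counts.delete(key) // drop zero entries so the map does not grow forever
    else this.counts.set(key, current - 1)
  }

  active(key: string): number {
    return this.counts.get(key) ?? 0
  }
}

const perIp = new ConnectionLimiter(2)
console.log(perIp.tryAcquire('203.0.113.7'), perIp.tryAcquire('203.0.113.7'), perIp.tryAcquire('203.0.113.7'))
// true true false
perIp.release('203.0.113.7')
console.log(perIp.active('203.0.113.7')) // 1
```

In the upgrade handler, the key might be `req.socket.remoteAddress`; behind a proxy or load balancer the client IP usually has to come from a forwarded header instead. If authentication fails after a slot was taken, release it before destroying the socket.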
Standard message format (JSON: type/payload/requestId):
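// types/ws-messages.ts
import { WebSocket, RawData } from 'ws'
// WsClient: see server/websocket.ts

export interface WsMessage<T = unknown> {
  type: string        // message type, e.g. 'chat.send' | 'room.join'
  payload: T          // business data
  requestId?: string  // generated by the client, correlates requests with responses
  seq?: number        // sequence number, for deduplication and resumption
  timestamp?: number  // injected by the server when sending
}

// Message dispatch
export function handleMessage(ws: WsClient, raw: RawData) {
  let msg: WsMessage
  try {
    msg = JSON.parse(raw.toString()) // RawData is a Buffer with ws's default binaryType
  } catch {
    return sendError(ws, 'invalid_json', 'Message must be valid JSON')
  }
  // Minimal envelope check; full schema validation belongs here
  if (typeof msg !== 'object' || msg === null || typeof msg.type !== 'string') {
    return sendError(ws, 'invalid_message', 'Message must be an object with a string "type"')
  }
  switch (msg.type) {
    case 'chat.send': return handleChatSend(ws, msg)   // business handlers, defined elsewhere
    case 'room.join': return handleRoomJoin(ws, msg)
    case 'room.leave': return handleRoomLeave(ws, msg)
    default:
      // Unknown type: drop without replying (forward compatibility as the protocol evolves)
      console.warn(`Unknown message type: ${msg.type}`)
  }
}

export function sendMessage(ws: WebSocket, msg: WsMessage) {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ ...msg, timestamp: msg.timestamp ?? Date.now() }))
  }
}

// 'error' is an application-defined message type
function sendError(ws: WebSocket, code: string, message: string) {
  sendMessage(ws, { type: 'error', payload: { code, message } })
}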
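The dispatcher parses JSON but does not yet validate the envelope, although the rules ask for schema validation of every frame. One dependency-free option is a hand-written validator that returns either a typed message or an error code. `parseWsMessage`, its error codes, and the size limit are assumptions; a schema library could play the same role.

```typescript
interface WsMessage<T = unknown> {
  type: string
  payload: T
  requestId?: string
  seq?: number
  timestamp?: number
}

type ParseResult = { ok: true; msg: WsMessage } | { ok: false; error: string }

// Hypothetical validator: checks the envelope before any handler sees the message.
function parseWsMessage(raw: string, maxLength = 64 * 1024): ParseResult {
  if (raw.length > maxLength) return { ok: false, error: 'too_large' } // UTF-16 units, approximate
  let value: unknown
  try {
    value = JSON.parse(raw)
  } catch {
    return { ok: false, error: 'invalid_json' }
  }
  if (typeof value !== 'object' || value === null || Array.isArray(value)) {
    return { ok: false, error: 'not_an_object' }
  }
  const m = value as Record<string, unknown>
  const { type, requestId, seq } = m
  if (typeof type !== 'string' || type.length === 0) return { ok: false, error: 'bad_type' }
  if (!('payload' in m)) return { ok: false, error: 'missing_payload' }
  if (requestId !== undefined && typeof requestId !== 'string') {
    return { ok: false, error: 'bad_requestId' }
  }
  if (seq !== undefined && (typeof seq !== 'number' || !Number.isInteger(seq) || seq < 0)) {
    return { ok: false, error: 'bad_seq' }
  }
  // timestamp is server-injected, so a client-supplied value is discarded
  const msg: WsMessage = { type, payload: m['payload'] }
  if (typeof requestId === 'string') msg.requestId = requestId
  if (typeof seq === 'number') msg.seq = seq
  return { ok: true, msg }
}

const good = parseWsMessage('{"type":"room.join","payload":{"roomId":"r1"},"timestamp":1}')
const bad = parseWsMessage('{"type":1,"payload":{}}')
console.log(good.ok, bad.ok) // true false
```

Dropping any client-supplied `timestamp` keeps that field under server control, matching its description in the interface.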
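Heartbeat Mechanism (ping/pong + Timeout Disconnect)
Server-side heartbeat: ping periodically and terminate zombie connections that stop answering: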
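// server/heartbeat.ts
import { WebSocketServer } from 'ws'
// WsClient: see server/websocket.ts

const HEARTBEAT_INTERVAL = 30_000 // send a ping every 30s
// With the isAlive flag the timeout is one interval, not a separate timer: a connection
// that has not answered the previous ping by the next tick is terminated, so a dead
// peer is dropped within at most two intervals (30-60s) of going silent.

export function startHeartbeat(wss: WebSocketServer) {
  // Register the pong listener here so that wss is in scope
  // (redundant if the connection handler in websocket.ts already does it)
  wss.on('connection', (socket) => {
    const ws = socket as WsClient
    ws.isAlive = true
    ws.on('pong', () => {
      ws.isAlive = true // pong received: mark as alive
    })
  })

  const interval = setInterval(() => {
    wss.clients.forEach((ws) => {
      const client = ws as WsClient
      if (!client.isAlive) {
        console.log(`Terminating stale connection: ${client.userId}`)
        client.terminate() // immediate close without a closing handshake; fires 'close'
        return
      }
      client.isAlive = false // must be reset by a pong before the next tick
      client.ping()          // send a WebSocket ping control frame
    })
  }, HEARTBEAT_INTERVAL)

  wss.on('close', () => clearInterval(interval))
  return interval
}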
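The isAlive sweep only notices a dead peer at the next tick, so a silent connection can live for up to two intervals. If a strict deadline is wanted (for example, terminate when no pong arrives within 10 s of a ping), one option is a per-connection timer armed on each ping. `startPongWatchdog` is a hypothetical helper; it needs only `ping()`, `terminate()`, and `on('pong')`, which ws's `WebSocket` provides. The trade-off is one extra timer per connection instead of a single sweep.

```typescript
// Minimal surface needed from a ws WebSocket.
interface Pingable {
  ping(): void
  terminate(): void
  on(event: 'pong', listener: () => void): unknown
}

// Hypothetical helper: ping every intervalMs; if no pong arrives within timeoutMs
// of a ping, terminate the connection. Returns a stop function for the 'close' handler.
function startPongWatchdog(ws: Pingable, intervalMs: number, timeoutMs: number): () => void {
  let deadline: ReturnType<typeof setTimeout> | undefined

  ws.on('pong', () => {
    clearTimeout(deadline) // answered in time: disarm the deadline
    deadline = undefined
  })

  const ticker = setInterval(() => {
    if (deadline !== undefined) return // previous ping is still waiting for its pong
    ws.ping()
    deadline = setTimeout(() => {
      clearInterval(ticker)
      ws.terminate()
    }, timeoutMs)
  }, intervalMs)

  return () => {
    clearInterval(ticker)
    clearTimeout(deadline)
  }
}
```

In a connection handler this could be wired as `const stop = startPongWatchdog(ws, 30_000, 10_000)` followed by `ws.on('close', stop)`, so the timers never outlive the socket.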
Client Reconnection Strategy (Exponential Backoff + Max Retries)
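// client/wsClient.ts: browser WebSocket client
// WsMessage: the shape defined in types/ws-messages.ts

interface WsClientOptions {
  url: string          // absolute URL, e.g. wss://example.com/ws
  protocols?: string[] // e.g. [token]; the server reads the first entry from Sec-WebSocket-Protocol
  maxRetries?: number
  baseDelayMs?: number
  maxDelayMs?: number
  onMessage?: (msg: WsMessage) => void
}

class ReconnectingWsClient {
  private ws: WebSocket | null = null
  private retryCount = 0
  private lastSeq = 0 // highest seq delivered so far; assumes the server's seq only increases
  private closedByUser = false
  // Close codes after which reconnecting is pointless (policy violation, app-defined auth errors)
  private static readonly FATAL_CODES = new Set([1008, 4001, 4003])

  constructor(private options: WsClientOptions) {}

  connect() {
    const { url, protocols, maxRetries = 5, baseDelayMs = 1000, maxDelayMs = 30_000 } = this.options
    const target = new URL(url)
    target.searchParams.set('lastSeq', String(this.lastSeq)) // resume cursor; safe if url already has a query
    this.ws = new WebSocket(target.toString(), protocols)
    this.ws.onopen = () => {
      this.retryCount = 0
      console.log('WebSocket connected')
    }
    this.ws.onmessage = (event) => {
      let msg: WsMessage
      try {
        msg = JSON.parse(event.data)
      } catch {
        return console.warn('Dropping non-JSON frame')
      }
      if (typeof msg.seq === 'number') {
        if (msg.seq <= this.lastSeq) return // already seen (e.g. replayed after a reconnect)
        this.lastSeq = msg.seq // record the last sequence number received
      }
      this.options.onMessage?.(msg)
    }
    this.ws.onclose = (event) => {
      if (this.closedByUser) return
      if (ReconnectingWsClient.FATAL_CODES.has(event.code)) {
        console.error(`Fatal close code ${event.code}: ${event.reason}`)
        return // do not reconnect (auth failure etc.)
      }
      if (this.retryCount >= maxRetries) {
        console.error('Max retries exceeded, giving up')
        return
      }
      // Exponential backoff with ±25% jitter, capped at maxDelayMs
      const delay = Math.min(
        2 ** this.retryCount * baseDelayMs * (0.75 + Math.random() * 0.5),
        maxDelayMs
      )
      this.retryCount++
      console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${this.retryCount}/${maxRetries})`)
      setTimeout(() => this.connect(), delay)
    }
  }

  send(msg: WsMessage) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(msg))
    }
  }

  // Intentional shutdown; without the flag, onclose would schedule a reconnect
  close() {
    this.closedByUser = true
    this.ws?.close(1000, 'client closing')
  }
}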
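The client sends `lastSeq` when reconnecting, but the server side of resumption is not shown. A minimal sketch, assuming one monotonically increasing sequence per stream and a bounded in-memory history: `ReplayBuffer` (hypothetical) assigns `seq`, keeps the last N messages, and returns what the client missed, or `null` when the gap is older than the buffer and the client has to resync from scratch.

```typescript
interface SeqMessage<T = unknown> {
  seq: number
  type: string
  payload: T
}

// Hypothetical bounded history for one stream (e.g. one Agent output stream or one room).
class ReplayBuffer<T = unknown> {
  private nextSeq = 1
  private readonly history: SeqMessage<T>[] = []
  private readonly capacity: number

  constructor(capacity: number) {
    this.capacity = capacity
  }

  // Assign the next sequence number and remember the message.
  append(type: string, payload: T): SeqMessage<T> {
    const msg: SeqMessage<T> = { seq: this.nextSeq++, type, payload }
    this.history.push(msg)
    if (this.history.length > this.capacity) this.history.shift() // evict the oldest
    return msg
  }

  // Messages with seq > lastSeq, or null if some of them were already evicted.
  since(lastSeq: number): SeqMessage<T>[] | null {
    const oldest = this.history[0]?.seq ?? this.nextSeq
    if (lastSeq + 1 < oldest) return null // gap too old: the client needs a full resync
    return this.history.filter((m) => m.seq > lastSeq)
  }
}

const stream = new ReplayBuffer<string>(3)
for (const chunk of ['a', 'b', 'c', 'd']) stream.append('agent.delta', chunk) // seq 1..4; seq 1 evicted
console.log(stream.since(2)?.map((m) => m.seq).join(',')) // 3,4
console.log(stream.since(0)) // null
```

On connection, the server would parse `lastSeq` from the request URL, send `since(lastSeq)` before any live messages, and ask the client to resync when it returns `null`. Because `seq` starts at 1, the client's initial `lastSeq = 0` means "nothing received yet".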
Room/channel management:
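// server/rooms.ts
// clients / WsClient: server/websocket.ts; sendMessage / WsMessage: the message module
declare function canJoinRoom(userId: string, roomId: string): Promise<boolean> // hypothetical authz check

const rooms = new Map<string, Set<string>>() // roomId -> Set<userId>

export async function joinRoom(ws: WsClient, roomId: string) {
  // Second authorization check at subscription time: the handshake only proved identity
  if (!(await canJoinRoom(ws.userId, roomId))) {
    return sendMessage(ws, { type: 'room.denied', payload: { roomId } }) // app-defined message type
  }
  if (!rooms.has(roomId)) rooms.set(roomId, new Set())
  rooms.get(roomId)!.add(ws.userId)
  ws.rooms.add(roomId)
  sendMessage(ws, { type: 'room.joined', payload: { roomId } })
}

export function leaveRoom(ws: WsClient, roomId: string) {
  const members = rooms.get(roomId)
  members?.delete(ws.userId)
  ws.rooms.delete(roomId)
  if (members?.size === 0) rooms.delete(roomId) // drop empty rooms so the map does not grow forever
}

export function broadcastToRoom(roomId: string, msg: WsMessage, excludeUserId?: string) {
  const userIds = rooms.get(roomId)
  if (!userIds) return
  for (const userId of userIds) {
    if (userId === excludeUserId) continue
    const client = clients.get(userId)
    if (client) sendMessage(client, msg)
  }
}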
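`clients` maps each user to a single socket, so a second tab or device replaces the first. If several concurrent connections per user are needed, one option is to index sockets per user as a set. `ConnectionRegistry` below is a hypothetical alternative, generic over the socket type so it can be exercised without a network.

```typescript
// Hypothetical registry allowing several live connections per user.
class ConnectionRegistry<S> {
  private readonly byUser = new Map<string, Set<S>>()

  add(userId: string, socket: S): void {
    let set = this.byUser.get(userId)
    if (!set) {
      set = new Set()
      this.byUser.set(userId, set)
    }
    set.add(socket)
  }

  // Call from the socket's 'close' handler; only this socket is removed.
  remove(userId: string, socket: S): void {
    const set = this.byUser.get(userId)
    if (!set) return
    set.delete(socket)
    if (set.size === 0) this.byUser.delete(userId)
  }

  // All sockets of a user, e.g. so broadcastToRoom can fan out to every tab.
  socketsOf(userId: string): S[] {
    return [...(this.byUser.get(userId) ?? [])]
  }
}

const registry = new ConnectionRegistry<string>()
registry.add('u1', 'tab-A')
registry.add('u1', 'tab-B')
registry.remove('u1', 'tab-A') // closing one tab leaves the other registered
console.log(registry.socketsOf('u1').join(',')) // tab-B
```

With this structure, room membership can stay keyed by `userId`, and `broadcastToRoom` would loop over `socketsOf(userId)` instead of a single `clients.get(userId)`.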
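Connection and Message Main Flow
The sketch below covers the path from connection setup to the failure branches. It does not depend on any framework's API names; the point is not to miss authentication, subscription authorization, or the backpressure branch.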
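[ Client initiates WSS upgrade ]
         │
         ▼
┌─────────────────┐  Verify token / session; on failure → reject the upgrade (HTTP 401) or close with an app-specific code
│ Handshake auth  │
└─────────────────┘
         │ success
         ▼
┌─────────────────┐  Channel / room / topic: second authorization check; record the subscription set
│ Subscribe+authz │
└─────────────────┘
         │
         ▼
┌─────────────────┐      Inbound: schema, quotas, rate
│  Message loop   │◄──┐  Outbound: queue depth; over the limit → drop / coalesce / backpressure
└─────────────────┘   │
         │            │
         ├── heartbeat / pong timeout ──► close the connection; client enters reconnect backoff
         │
         └── recoverable disconnect ────► exponential backoff + jitter; resume with a cursor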
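The outbound branch of the diagram (queue depth; drop, coalesce, or apply backpressure over a limit) can be approximated in ws with `WebSocket.bufferedAmount`, the number of bytes queued by `send()` but not yet written to the network. In the sketch below the thresholds are arbitrary, the caller decides which messages are droppable, and closing with 1013 (Try Again Later) is one reasonable choice for a consumer that stays stuck; `sendWithBackpressure` is a hypothetical name.

```typescript
// Minimal surface of a ws WebSocket used here.
interface OutboundSocket {
  readonly bufferedAmount: number
  send(data: string): void
  close(code?: number, reason?: string): void
}

type SendOutcome = 'sent' | 'dropped' | 'closed'

// Hypothetical thresholds, in bytes of not-yet-flushed data.
const SOFT_LIMIT = 256 * 1024      // above this, droppable messages (e.g. typing indicators) are skipped
const HARD_LIMIT = 4 * 1024 * 1024 // above this, the consumer is considered stuck

function sendWithBackpressure(ws: OutboundSocket, data: string, droppable: boolean): SendOutcome {
  const queued = ws.bufferedAmount
  if (queued > HARD_LIMIT) {
    ws.close(1013, 'consumer too slow') // 1013 = Try Again Later; the client reconnects with its cursor
    return 'closed'
  }
  if (queued > SOFT_LIMIT && droppable) return 'dropped'
  ws.send(data)
  return 'sent'
}

// Usage with a fake socket whose queue is already past the soft limit
const fake = { bufferedAmount: 300 * 1024, sent: [] as string[], send(d: string) { this.sent.push(d) }, close() {} }
console.log(sendWithBackpressure(fake, '{"type":"typing"}', true))    // dropped
console.log(sendWithBackpressure(fake, '{"type":"chat.msg"}', false)) // sent
```

Messages that carry a `seq` should not be marked droppable; otherwise the client sees a gap in the sequence that it cannot distinguish from data loss.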
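Estimating the Heartbeat Interval
If the server closes a connection after T seconds without a valid frame, the client's heartbeat period must be clearly shorter than T, with headroom for RTT and timer jitter. A practical estimate takes the usable silence budget (T minus that reserve) and multiplies it by a duty factor r: interval = (T − reserve) × r. The same estimate applies to WebSocket ping frames and to application-level ping messages.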
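The estimate can be written as a small function. `suggestHeartbeatIntervalSec` and its parameter names are illustrative; T, the reserve, and r are the quantities defined in the paragraph above.

```typescript
// Hypothetical helper for interval = (T - reserve) * r
// idleTimeoutSec (T): the server closes after this many seconds without a valid frame
// reserveSec: headroom for RTT and timer jitter
// dutyFactor (r): fraction of the remaining budget to use, 0 < r <= 1
function suggestHeartbeatIntervalSec(idleTimeoutSec: number, reserveSec: number, dutyFactor: number): number {
  if (!(dutyFactor > 0 && dutyFactor <= 1)) throw new RangeError('dutyFactor must be in (0, 1]')
  const budget = idleTimeoutSec - reserveSec // usable silence budget
  if (budget <= 0) throw new RangeError('reserve must be smaller than the idle timeout')
  return budget * dutyFactor
}

// Example: idle timeout of 60 s, 5 s reserve, r = 0.5
console.log(suggestHeartbeatIntervalSec(60, 5, 0.5)) // 27.5
```

Choosing r ≤ 0.5 means that if one heartbeat is lost, the next one still arrives within the budget, since two intervals add up to at most T − reserve.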
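Observability and Scaling
Recommended metrics: connection count, per-connection queue depth, message rate, close-code distribution, and heartbeat timeout count. When scaling horizontally, use sticky sessions or a shared pub/sub layer, and state in the SKILL which trade-off you chose between session stickiness and broadcast consistency.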
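With several server instances, members of one room may be connected to different nodes, so a broadcast has to go through a shared channel that every node subscribes to. The sketch below uses a hypothetical `Bus` interface with an in-memory implementation standing in for the real broker (a service such as Redis pub/sub would fill that role); `ServerNode` and all other names are illustrative.

```typescript
type BusHandler = (roomId: string, data: string) => void

// Hypothetical broker abstraction; production would back it with a shared pub/sub service.
interface Bus {
  publish(roomId: string, data: string): void
  subscribe(handler: BusHandler): void
}

class InMemoryBus implements Bus {
  private readonly handlers: BusHandler[] = []
  publish(roomId: string, data: string): void {
    for (const h of this.handlers) h(roomId, data)
  }
  subscribe(handler: BusHandler): void {
    this.handlers.push(handler)
  }
}

// Each node delivers only to members connected locally; the bus does the cross-node fan-out.
class ServerNode {
  readonly name: string
  readonly delivered: string[] = []
  private readonly bus: Bus
  private readonly localMembers = new Map<string, Set<string>>() // roomId -> local userIds

  constructor(name: string, bus: Bus) {
    this.name = name
    this.bus = bus
    bus.subscribe((roomId, data) => {
      for (const userId of this.localMembers.get(roomId) ?? []) {
        this.delivered.push(`${this.name}:${userId}:${data}`) // stand-in for sendMessage(client, ...)
      }
    })
  }

  join(roomId: string, userId: string): void {
    if (!this.localMembers.has(roomId)) this.localMembers.set(roomId, new Set())
    this.localMembers.get(roomId)!.add(userId)
  }

  broadcast(roomId: string, data: string): void {
    this.bus.publish(roomId, data) // never deliver directly, so every node follows the same path
  }
}

const bus = new InMemoryBus()
const a = new ServerNode('node-a', bus)
const b = new ServerNode('node-b', bus)
a.join('r1', 'alice')
b.join('r1', 'bob')
a.broadcast('r1', 'hi')
console.log([...a.delivered, ...b.delivered].join(' ')) // node-a:alice:hi node-b:bob:hi
```

Routing even local broadcasts through the bus keeps delivery uniform across nodes; ordering and delivery guarantees then depend on the broker, which is the consistency side of the trade-off to document.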
---
name: websocket-realtime-channel
description: Design or review a WebSocket real-time service
---
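# Rules
- Use WSS (TLS); pass the token in the subprotocol header, not in URL query parameters
- Message format: JSON { type, payload, requestId?, seq?, timestamp? }
- Heartbeat: the server pings every 30s; a connection that has not answered the previous ping by the next one is terminate()d (add a per-ping timer if a strict 10s deadline is required)
- Reconnect: exponential backoff (2^n * base) + jitter; distinguish fatal close codes (1008/4001/4003: do not reconnect)
- Reconnect with lastSeq to support resumption and deduplication
- Re-check room subscription authorization (a second check beyond handshake auth) before broadcasting to a room
# Steps
1. Implement the HTTP upgrade handler: verify the token and attach userId to the connection object
2. Register the message handler: dispatch on type; drop unknown types silently (log only)
3. Start the heartbeat timer: setInterval ping + pong listener
4. Implement room management: joinRoom / leaveRoom / broadcastToRoom
5. On the client, implement ReconnectingWsClient with backoff and lastSeq tracking
6. Add connection-count and message-rate limits (per IP / per user)
7. Monitoring metrics: connection count, close-code distribution, heartbeat timeout count, queue depth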