并发模式
从问题域选模型:消息通道、锁粒度与异步管线;把背压、超时与取消写进设计,而不是事后补洞。与「竞态排查」分工——本页偏结构选型,彼页偏交错证据。
SKILL 让 Agent 先写清瓶颈类别与一致性边界,再谈并行度。CPU 并行、I/O 多路复用、扇出聚合、有状态 actor——各自配套不同的队列深度与取消语义。
文档化池大小、有界队列、丢弃策略与监控(队列长度、p95、任务年龄)。无背压的无界缓冲常把问题推迟成 OOM 与尾延迟爆炸。
通道与消息传递
用有界 channel / mailbox 把「谁写谁读」钉死:单消费者或多消费者要显式;关闭语义与「零值可读」陷阱(如 Go)写进评审清单。CSP 风格适合流水线;actor 适合封装可变状态的单点写入。
- 优先消息传递;必须共享内存时,缩小为只读快照或 copy-on-write。
- 扇出:为每一跳设独立缓冲与超时,避免一头慢拖死全局。
Go Worker Pool 与 Node.js worker_threads 实现:
// === Go Worker Pool(有界 goroutine 池)===
func workerPool(ctx context.Context, jobs <-chan Job, numWorkers int) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok return // channel 关闭,退出
process(job)
case <-ctx.Done():
return // 上下文取消,退出
}
}
}()
}
wg.Wait()
}
// 背压:队列满时阻塞提交(有界 channel 天然背压)
jobs := make(chan Job, 100) // 缓冲 100,超出则阻塞 send
go func() {
for _, j := range allJobs {
select {
case jobs <- j: // 队满则阻塞,产生背压
case <-ctx.Done(): return
}
}
close(jobs)
}()
// === Node.js worker_threads Pool(CPU 密集任务)===
const { Worker, workerData, parentPort, isMainThread } = require("worker_threads");
// worker.js
if (!isMainThread) {
parentPort.on("message", (data) => {
const result = heavyCompute(data);
parentPort.postMessage(result);
});
}
// main.js:创建 4 个 worker
const workers = Array.from({ length: 4 }, () =>
new Worker(__filename)
);
let idx = 0;
function runOnWorker(data) {
return new Promise((resolve, reject) => {
const w = workers[idx++ % workers.length];
w.once("message", resolve);
w.once("error", reject);
w.postMessage(data);
});
}
锁与临界区
互斥锁保护不变量;读写锁适合读多写少但要警惕写饥饿。分段锁降低争用,却增加死锁面——锁顺序必须全局一致。分布式场景用租约、fence token 或版本列,而不是拉长本地临界区。
- 默认拒绝「为并发而并发」:先证明热点再并行化。
- 新模式上线前与 竞态条件排查 联动做压测矩阵。
Semaphore(限制并发数量)实现示例:
// Node.js Semaphore:限制同时并发的 async 操作数量
class Semaphore {
constructor(max) {
this.max = max;
this.count = 0;
this.queue = [];
}
acquire() {
return new Promise((resolve) => {
if (this.count < this.max) {
this.count++;
resolve();
} else {
this.queue.push(resolve); // 背压:排队等待
}
});
}
release() {
this.count--;
if (this.queue.length > 0) {
this.count++;
const next = this.queue.shift();
next();
}
}
async run(fn) {
await this.acquire();
try { return await fn(); }
finally { this.release(); }
}
}
// 使用:最多 5 个并发 HTTP 请求
const sem = new Semaphore(5);
const results = await Promise.all(
urls.map(url => sem.run(() => fetch(url)))
);
异步与管线
事件循环上避免阻塞;CPU 段交给池或子进程。async pipeline 用 async 生成器、队列或 Rx 式操作符时,要为每级设并发上限(如 map 并发度)。扇入收集需 errgroup 式错误聚合与短路取消。
- 背压:阻塞、限速、丢样、降级或反馈上游;四选一写进 spec。
- 取消:context / AbortSignal 贯穿;勿在关闭后仍往通道写。
Circuit Breaker 状态机实现(三态:Closed / Open / Half-Open):
// Circuit Breaker 状态机(Node.js)
const STATE = { CLOSED: "CLOSED", OPEN: "OPEN", HALF_OPEN: "HALF_OPEN" };
class CircuitBreaker {
constructor(opts = {}) {
this.failureThreshold = opts.failureThreshold ?? 5;
this.recoveryTimeout = opts.recoveryTimeout ?? 30_000;
this.state = STATE.CLOSED;
this.failures = 0;
this.nextAttempt = 0;
}
async call(fn) {
if (this.state === STATE.OPEN) {
if (Date.now() < this.nextAttempt)
throw new Error("Circuit OPEN: 拒绝请求");
this.state = STATE.HALF_OPEN; // 进入探测状态
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (err) {
this.onFailure();
throw err;
}
}
onSuccess() {
this.failures = 0;
this.state = STATE.CLOSED;
}
onFailure() {
this.failures++;
if (this.failures >= this.failureThreshold || this.state === STATE.HALF_OPEN) {
this.state = STATE.OPEN;
this.nextAttempt = Date.now() + this.recoveryTimeout;
}
}
}
// 使用
const cb = new CircuitBreaker({ failureThreshold: 3, recoveryTimeout: 10_000 });
const data = await cb.call(() => fetch("https://api.example.com/data"));
选型主流程(skill-flow-block)
把下面流程当作 PR 描述或 Agent 计划骨架:先定资源与 SLA,再定结构,最后才调池宽与队列。
[ 工作负载:CPU / I/O / 混合 / 强一致协调 ]
│
▼
┌──────────────────────┐
│ 状态落点在哪里? │
│ 单点 / 分片 / 无共享 │
└──────────────────────┘
│
┌───────────┼───────────┐
▼ ▼ ▼
[ 消息通道 [ 锁/分段 [ 纯异步管线
actor ] 临界区 ] + 池 offload ]
│ │ │
└───────────┼───────────┘
▼
[ 有界队列 + 背压策略(阻塞/限速/丢样/降级)]
│
▼
[ 取消/超时沿调用链传播 + 指标:队列深 / 年龄 / p95 ]
│
▼
[ 压测与竞态矩阵 ]
模式建议器
下列组合仅作设计速写,不替代 profiling。拖动「并行度意图」可粗调建议文案(纯前端计算,不落盘)。
与竞态页不同:此处不涉及时序取证,只输出结构倾向与检查项。
---
name: concurrency-patterns
description: 选择并发模型、背压策略与安全验证
---
# 并发模型选型
1. 判定 CPU / I/O / 协调成本与 SLA
2. CPU 密集:Worker Pool(Go goroutine pool / Node worker_threads)
3. I/O 密集:事件循环 + async/await,Semaphore 限并发
4. 强一致协调:单 writer 队列 / 分布式锁 / 租约
# 背压策略(四选一写进 spec)
- 阻塞:有界 channel 满时 send 阻塞(Go 天然支持)
- 限速:令牌桶 / 漏桶(rate limiter)
- 丢样:超出阈值直接拒绝并 metrics
- 降级:返回 cached / default 值
# Worker Pool 关键参数
- 池大小 ≤ CPU 核心数(CPU 密集)或 连接池上限(I/O)
- 任务队列有界(防 OOM),队满告警
- 支持 ctx/AbortSignal 取消
# Semaphore 使用场景
- 限制同时并发的外部 API 调用数
- 控制数据库连接复用数
- 防止突发流量打垮下游
# Circuit Breaker 状态转换
- CLOSED →(连续 N 次失败)→ OPEN
- OPEN →(等待 recovery timeout)→ HALF_OPEN
- HALF_OPEN →(探测成功)→ CLOSED
- HALF_OPEN →(探测失败)→ OPEN
# 并发安全验证
- Go:go test -race ./...
- Node.js:Promise.allSettled 并发断言
- 观测:队列深度 / p95 延迟 / 任务年龄