并发模式

从问题域选模型:消息通道、锁粒度与异步管线;把背压、超时与取消写进设计,而不是事后补洞。与「竞态排查」分工——本页偏结构选型,彼页偏交错证据。

SKILL 让 Agent 先写清瓶颈类别一致性边界,再谈并行度。CPU 并行、I/O 多路复用、扇出聚合、有状态 actor——各自配套不同的队列深度与取消语义。

文档化池大小、有界队列、丢弃策略与监控(队列长度、p95、任务年龄)。无背压的无界缓冲常把问题推迟成 OOM 与尾延迟爆炸。

通道与消息传递

用有界 channel / mailbox 把「谁写谁读」钉死:单消费者或多消费者要显式;关闭语义与「零值可读」陷阱(如 Go)写进评审清单。CSP 风格适合流水线;actor 适合封装可变状态的单点写入。

  • 优先消息传递;必须共享内存时,缩小为只读快照或 copy-on-write。
  • 扇出:为每一跳设独立缓冲与超时,避免一头慢拖死全局。

Go Worker Pool 与 Node.js worker_threads 实现:

// === Go Worker Pool(有界 goroutine 池)===
func workerPool(ctx context.Context, jobs <-chan Job, numWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case job, ok := <-jobs:
                    if !ok return  // channel 关闭,退出
                    process(job)
                case <-ctx.Done():
                    return  // 上下文取消,退出
                }
            }
        }()
    }
    wg.Wait()
}

// 背压:队列满时阻塞提交(有界 channel 天然背压)
jobs := make(chan Job, 100)  // 缓冲 100,超出则阻塞 send
go func() {
    for _, j := range allJobs {
        select {
        case jobs <- j:   // 队满则阻塞,产生背压
        case <-ctx.Done(): return
        }
    }
    close(jobs)
}()

// === Node.js worker_threads Pool(CPU 密集任务)===
const { Worker, workerData, parentPort, isMainThread } = require("worker_threads");
// worker.js
if (!isMainThread) {
  parentPort.on("message", (data) => {
    const result = heavyCompute(data);
    parentPort.postMessage(result);
  });
}
// main.js:创建 4 个 worker
const workers = Array.from({ length: 4 }, () =>
  new Worker(__filename)
);
let idx = 0;
function runOnWorker(data) {
  return new Promise((resolve, reject) => {
    const w = workers[idx++ % workers.length];
    w.once("message", resolve);
    w.once("error", reject);
    w.postMessage(data);
  });
}

锁与临界区

互斥锁保护不变量;读写锁适合读多写少但要警惕写饥饿。分段锁降低争用,却增加死锁面——锁顺序必须全局一致。分布式场景用租约、fence token 或版本列,而不是拉长本地临界区。

  • 默认拒绝「为并发而并发」:先证明热点再并行化。
  • 新模式上线前与 竞态条件排查 联动做压测矩阵。

Semaphore(限制并发数量)实现示例:

// Node.js Semaphore:限制同时并发的 async 操作数量
class Semaphore {
  constructor(max) {
    this.max = max;
    this.count = 0;
    this.queue = [];
  }
  acquire() {
    return new Promise((resolve) => {
      if (this.count < this.max) {
        this.count++;
        resolve();
      } else {
        this.queue.push(resolve);  // 背压:排队等待
      }
    });
  }
  release() {
    this.count--;
    if (this.queue.length > 0) {
      this.count++;
      const next = this.queue.shift();
      next();
    }
  }
  async run(fn) {
    await this.acquire();
    try { return await fn(); }
    finally { this.release(); }
  }
}

// 使用:最多 5 个并发 HTTP 请求
const sem = new Semaphore(5);
const results = await Promise.all(
  urls.map(url => sem.run(() => fetch(url)))
);

异步与管线

事件循环上避免阻塞;CPU 段交给池或子进程。async pipeline 用 async 生成器、队列或 Rx 式操作符时,要为每级设并发上限(如 map 并发度)。扇入收集需 errgroup 式错误聚合与短路取消。

  • 背压:阻塞、限速、丢样、降级或反馈上游;四选一写进 spec。
  • 取消:context / AbortSignal 贯穿;勿在关闭后仍往通道写。

Circuit Breaker 状态机实现(三态:Closed / Open / Half-Open):

// Circuit Breaker 状态机(Node.js)
const STATE = { CLOSED: "CLOSED", OPEN: "OPEN", HALF_OPEN: "HALF_OPEN" };

class CircuitBreaker {
  constructor(opts = {}) {
    this.failureThreshold = opts.failureThreshold ?? 5;
    this.recoveryTimeout = opts.recoveryTimeout ?? 30_000;
    this.state = STATE.CLOSED;
    this.failures = 0;
    this.nextAttempt = 0;
  }

  async call(fn) {
    if (this.state === STATE.OPEN) {
      if (Date.now() < this.nextAttempt)
        throw new Error("Circuit OPEN: 拒绝请求");
      this.state = STATE.HALF_OPEN;  // 进入探测状态
    }
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (err) {
      this.onFailure();
      throw err;
    }
  }

  onSuccess() {
    this.failures = 0;
    this.state = STATE.CLOSED;
  }

  onFailure() {
    this.failures++;
    if (this.failures >= this.failureThreshold || this.state === STATE.HALF_OPEN) {
      this.state = STATE.OPEN;
      this.nextAttempt = Date.now() + this.recoveryTimeout;
    }
  }
}

// 使用
const cb = new CircuitBreaker({ failureThreshold: 3, recoveryTimeout: 10_000 });
const data = await cb.call(() => fetch("https://api.example.com/data"));

选型主流程(skill-flow-block)

把下面流程当作 PR 描述或 Agent 计划骨架:先定资源与 SLA,再定结构,最后才调池宽与队列。

  [ 工作负载:CPU / I/O / 混合 / 强一致协调 ]
                    │
                    ▼
         ┌──────────────────────┐
         │  状态落点在哪里?      │
         │  单点 / 分片 / 无共享   │
         └──────────────────────┘
                    │
        ┌───────────┼───────────┐
        ▼           ▼           ▼
  [ 消息通道    [ 锁/分段    [ 纯异步管线
     actor ]      临界区 ]     + 池 offload ]
        │           │           │
        └───────────┼───────────┘
                    ▼
         [ 有界队列 + 背压策略(阻塞/限速/丢样/降级)]
                    │
                    ▼
         [ 取消/超时沿调用链传播 + 指标:队列深 / 年龄 / p95 ]
                    │
                    ▼
              [ 压测与竞态矩阵 ]

模式建议器

下列组合仅作设计速写,不替代 profiling。拖动「并行度意图」可粗调建议文案(纯前端计算,不落盘)。

并行度意图 标度 3 / 5

              

与竞态页不同:此处不涉及时序取证,只输出结构倾向与检查项。

---
name: concurrency-patterns
description: 选择并发模型、背压策略与安全验证
---
# 并发模型选型
1. 判定 CPU / I/O / 协调成本与 SLA
2. CPU 密集:Worker Pool(Go goroutine pool / Node worker_threads)
3. I/O 密集:事件循环 + async/await,Semaphore 限并发
4. 强一致协调:单 writer 队列 / 分布式锁 / 租约

# 背压策略(四选一写进 spec)
- 阻塞:有界 channel 满时 send 阻塞(Go 天然支持)
- 限速:令牌桶 / 漏桶(rate limiter)
- 丢样:超出阈值直接拒绝并 metrics
- 降级:返回 cached / default 值

# Worker Pool 关键参数
- 池大小 ≤ CPU 核心数(CPU 密集)或 连接池上限(I/O)
- 任务队列有界(防 OOM),队满告警
- 支持 ctx/AbortSignal 取消

# Semaphore 使用场景
- 限制同时并发的外部 API 调用数
- 控制数据库连接复用数
- 防止突发流量打垮下游

# Circuit Breaker 状态转换
- CLOSED  →(连续 N 次失败)→ OPEN
- OPEN    →(等待 recovery timeout)→ HALF_OPEN
- HALF_OPEN →(探测成功)→ CLOSED
- HALF_OPEN →(探测失败)→ OPEN

# 并发安全验证
- Go:go test -race ./...
- Node.js:Promise.allSettled 并发断言
- 观测:队列深度 / p95 延迟 / 任务年龄

返回技能库 更多技能入口