Concurrency patterns
Choose the model from the problem domain: message channels, lock granularity, async pipelines; bake in backpressure, timeouts, and cancellation up front—not as afterthoughts. This page is structural selection; race condition debugging is for interleaving evidence.
The SKILL has the agent write bottleneck class and consistency boundary before talking parallelism. CPU parallelism, I/O multiplexing, fan-out aggregation, stateful actors—each needs different queue depth and cancellation semantics.
Document pool sizes, bounded queues, drop policies, and metrics (queue depth, p95, task age). Unbounded buffers without backpressure often defer pain into OOM and tail-latency explosions.
Channels and message passing
Use bounded channels / mailboxes to pin “who writes, who reads”: single vs multi-consumer must be explicit; document close semantics and “zero value readable” traps (e.g. Go). CSP style fits pipelines; actors fit single-writer encapsulation of mutable state.
- Prefer message passing; when memory must be shared, shrink to read-only snapshots or copy-on-write.
- Fan-out: give each hop its own buffer and timeout so one slow stage does not stall the world.
Go Worker Pool and Node.js worker_threads implementation:
// === Go Worker Pool (bounded goroutine pool) ===
func workerPool(ctx context.Context, jobs <-chan Job, numWorkers int) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok return // channel closed, exit
process(job)
case <-ctx.Done():
return // context cancelled, exit
}
}
}()
}
wg.Wait()
}
// Backpressure: block on full queue (bounded channel is naturally backpressured)
jobs := make(chan Job, 100) // buffer 100; send blocks when full
go func() {
for _, j := range allJobs {
select {
case jobs <- j: // blocks on full queue, producing backpressure
case <-ctx.Done(): return
}
}
close(jobs)
}()
// === Node.js worker_threads Pool (CPU-bound tasks) ===
const { Worker, workerData, parentPort, isMainThread } = require("worker_threads");
// worker.js
if (!isMainThread) {
parentPort.on("message", (data) => {
const result = heavyCompute(data);
parentPort.postMessage(result);
});
}
// main.js: create 4 workers
const workers = Array.from({ length: 4 }, () =>
new Worker(__filename)
);
let idx = 0;
function runOnWorker(data) {
return new Promise((resolve, reject) => {
const w = workers[idx++ % workers.length];
w.once("message", resolve);
w.once("error", reject);
w.postMessage(data);
});
}
Locks and critical sections
Mutexes protect invariants; RW locks suit read-heavy workloads but watch writer starvation. Striped locks reduce contention but widen deadlock surface—lock order must be global. In distributed settings use leases, fence tokens, or version columns instead of stretching local critical sections.
- Default to rejecting “concurrency for concurrency’s sake”: prove a hotspot before parallelizing.
- Before shipping a new pattern, pair with race condition debugging on a stress matrix.
Semaphore (limit concurrency count) implementation:
// Node.js Semaphore: limit the number of concurrent async operations
class Semaphore {
constructor(max) {
this.max = max;
this.count = 0;
this.queue = [];
}
acquire() {
return new Promise((resolve) => {
if (this.count < this.max) {
this.count++;
resolve();
} else {
this.queue.push(resolve); // backpressure: queue and wait
}
});
}
release() {
this.count--;
if (this.queue.length > 0) {
this.count++;
const next = this.queue.shift();
next();
}
}
async run(fn) {
await this.acquire();
try { return await fn(); }
finally { this.release(); }
}
}
// Usage: max 5 concurrent HTTP requests
const sem = new Semaphore(5);
const results = await Promise.all(
urls.map(url => sem.run(() => fetch(url)))
);
Async and pipelines
Avoid blocking the event loop; offload CPU segments to pools or child processes. For async pipelines (async generators, queues, Rx-style operators), cap concurrency per stage (e.g. map parallelism). Fan-in collection needs errgroup-style error aggregation and short-circuit cancellation.
- Backpressure: block, rate-limit, sample drop, degrade, or upstream feedback—pick one and write it in the spec.
- Cancellation: thread context / AbortSignal end-to-end; do not keep writing to a channel after shutdown.
Circuit Breaker state machine (three states: Closed / Open / Half-Open):
// Circuit Breaker state machine (Node.js)
const STATE = { CLOSED: "CLOSED", OPEN: "OPEN", HALF_OPEN: "HALF_OPEN" };
class CircuitBreaker {
constructor(opts = {}) {
this.failureThreshold = opts.failureThreshold ?? 5;
this.recoveryTimeout = opts.recoveryTimeout ?? 30_000;
this.state = STATE.CLOSED;
this.failures = 0;
this.nextAttempt = 0;
}
async call(fn) {
if (this.state === STATE.OPEN) {
if (Date.now() < this.nextAttempt)
throw new Error("Circuit OPEN: request rejected");
this.state = STATE.HALF_OPEN; // enter probe state
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (err) {
this.onFailure();
throw err;
}
}
onSuccess() {
this.failures = 0;
this.state = STATE.CLOSED;
}
onFailure() {
this.failures++;
if (this.failures >= this.failureThreshold || this.state === STATE.HALF_OPEN) {
this.state = STATE.OPEN;
this.nextAttempt = Date.now() + this.recoveryTimeout;
}
}
}
// Usage
const cb = new CircuitBreaker({ failureThreshold: 3, recoveryTimeout: 10_000 });
const data = await cb.call(() => fetch("https://api.example.com/data"));
Selection flow (skill-flow-block)
Treat the flow below as a PR description or agent plan skeleton: fix resources and SLA first, structure second, then tune pool width and queues.
[ Workload: CPU / I/O / mixed / strong consistency coordination ]
│
▼
┌──────────────────────┐
│ Where does state live? │
│ single point / sharded / share-nothing │
└──────────────────────┘
│
┌───────────┼───────────┐
▼ ▼ ▼
[ Message [ Locks / [ Pure async pipeline
channel striped + pool offload ]
actor ] critical ]
│ │ │
└───────────┼───────────┘
▼
[ Bounded queue + backpressure (block / rate-limit / sample / degrade) ]
│
▼
[ Cancel/timeout propagate + metrics: queue depth / age / p95 ]
│
▼
[ Stress + race matrix ]
Pattern suggester
Combinations below are design sketches only—not a substitute for profiling. Drag “parallelism intent” to tune copy (pure front-end, not persisted).
Unlike the race page: no timing forensics here—only structural bias and checklist items.
---
name: concurrency-patterns
description: Choose concurrency models and backpressure strategy
---
# Steps
1. Judge CPU / I/O / coordination cost and SLA
2. Pick channels, lock granularity, or async pipeline + bounded queues
3. Design backpressure, cancellation propagation, and observability