消息队列使用
让 Agent 明确 at-least-once 下的幂等键、去重窗口与顺序分区规则,并把重试、死信(DLQ)与可观测性写进同一套契约。
消息体应版本化并含 trace / correlation id;避免在队列里传递不可序列化句柄或大 blob,必要时用对象存储引用。
消费者须声明:最大并发、预取、可见性超时与 poison 处理;重试退避策略与 DLQ 人工介入流程要可执行。
与事务发件箱(outbox)或 CDC 集成时,在 SKILL 中写清「何时认为消息已提交」与双写故障的恢复步骤。
- 顺序:同一实体 id 作为分区键;跨实体顺序不强保证时显式写出。
- 幂等:业务键 + 去重表或自然主键冲突处理。
- 背压:队列深度告警与消费者扩容/降级策略联动。
At-least-once 与幂等
多数托管队列默认为 at-least-once:同一条可能被投递多次(网络抖动、消费者崩溃在 ack 之前、分区再均衡等)。SKILL 必须假定「处理函数会重复执行」,用幂等键、去重表或自然主键冲突吞掉重复,而不是依赖「恰好一次」幻想。
在契约中写明:幂等键字段名、去重时间窗(与业务 SLA 一致)、以及「重复到达时」的语义(忽略 / 合并 / 报错进 DLQ)。
# 幂等键设计:Redis 去重 + DB 唯一键双重保障(Python)
import redis, json
from datetime import timedelta
r = redis.Redis(decode_responses=True)
DEDUP_WINDOW = timedelta(hours=24) # 去重窗口与业务 SLA 对齐
def process_order_event(message: dict) -> bool:
"""
消息结构:{ "idempotency_key": "evt-uuid-abc", "order_id": 123, ... }
返回 True=处理成功, False=重复忽略
"""
idem_key = message.get("idempotency_key")
if not idem_key:
raise ValueError("missing idempotency_key")
redis_key = f"dedup:order_event:{idem_key}"
# ① Redis SETNX:窗口内第一次到达时设置,已存在则为重复
is_new = r.set(redis_key, "1", ex=int(DEDUP_WINDOW.total_seconds()), nx=True)
if not is_new:
# 重复消息:忽略并 ack,不进 DLQ
return False
try:
# ② DB 唯一键兜底(Redis 重启或键过期后的二次保护)
db_insert_processed_event(
idempotency_key=idem_key,
payload=json.dumps(message)
)
# ③ 实际业务处理
execute_order_business_logic(message)
return True
except UniqueConstraintError:
# DB 唯一键冲突 → 也是重复,忽略
return False
except Exception:
# 处理失败 → 删除 Redis key,让消息可重试
r.delete(redis_key)
raise
# DB 唯一键 DDL(防止 Redis 键过期后重复)
# CREATE TABLE processed_events (
# idempotency_key VARCHAR(128) PRIMARY KEY,
# created_at TIMESTAMP DEFAULT NOW()
# );
# -- 定期清理过期记录(保留窗口内的)
# DELETE FROM processed_events WHERE created_at < NOW() - INTERVAL '24 hours';
重试、退避与 DLQ
瞬时故障用有限次重试 + 指数退避(含抖动),避免惊群与热点分区;永久错误(schema 不兼容、业务不变量违反)应尽快进入 DLQ(死信队列),并附带原始头、失败原因码与重试次数,便于人工或离线工具修复后回放。
- 定义 maxAttempts、退避上下界、以及「何种异常可重试 vs 直接 DLQ」。
- DLQ 消费权限、告警路由、回放审批(谁在什么条件下允许重新入队)。
- Poison message:单条反复失败时隔离,避免阻塞整组消费者。
# 指数退避 + 抖动(Python)
import time, random, math
def exponential_backoff_with_jitter(
attempt: int,
base_ms: int = 100,
max_ms: int = 30_000,
jitter_factor: float = 0.3,
) -> float:
"""
Full Jitter 策略:sleep = random(0, min(cap, base * 2^attempt))
避免多个消费者同时重试形成惊群
"""
cap = min(max_ms, base_ms * (2 ** attempt))
jitter = random.uniform(0, cap * jitter_factor)
delay_ms = cap + jitter
return delay_ms / 1000 # 返回秒
def process_with_retry(message: dict, handler, max_attempts: int = 5):
RETRYABLE = (ConnectionError, TimeoutError)
NON_RETRYABLE = (ValueError, KeyError) # schema 错误直接进 DLQ
for attempt in range(max_attempts):
try:
handler(message)
return # 成功
except NON_RETRYABLE as e:
# 永久错误 → 立即发往 DLQ,不重试
send_to_dlq(message, error=str(e), attempt=attempt)
return
except RETRYABLE as e:
if attempt == max_attempts - 1:
send_to_dlq(message, error=str(e), attempt=attempt)
return
delay = exponential_backoff_with_jitter(attempt)
time.sleep(delay)
# DLQ 消息标准格式
def build_dlq_message(original: dict, error: str, attempt: int) -> dict:
return {
"dlq_metadata": {
"original_topic": original.get("_topic", "unknown"),
"original_partition": original.get("_partition"),
"original_offset": original.get("_offset"),
"failed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"attempt_count": attempt + 1,
"failure_reason": error,
"idempotency_key": original.get("idempotency_key"),
},
"original_message": original, # 完整原始消息,便于重放
}
# Kafka vs RabbitMQ vs SQS 选择标准
# ┌─────────────────┬──────────────┬──────────────┬──────────────┐
# │ 维度 │ Kafka │ RabbitMQ │ AWS SQS │
# ├─────────────────┼──────────────┼──────────────┼──────────────┤
# │ 投递语义 │ at-least-once│ at-least-once│ at-least-once│
# │ │ (exactly-once│ (可配置) │ │
# │ │ 需事务) │ │ │
# │ 顺序保证 │ 分区内有序 │ 队列内有序 │ 无(FIFO 队 │
# │ │ │ │ 列除外) │
# │ 消息保留 │ 持久化可回放 │ ACK 后删除 │ ACK 后删除 │
# │ 吞吐量 │ 极高(百万/s)│ 高(万级/s) │ 高(弹性) │
# │ 延迟 │ 毫秒级 │ 亚毫秒级 │ 毫秒级 │
# │ 消费者模型 │ Consumer │ Push/Pull │ Long polling │
# │ │ Group Pull │ │ │
# │ 运维复杂度 │ 高(需 ZK/ │ 中 │ 低(托管) │
# │ │ KRaft) │ │ │
# │ 适合场景 │ 事件溯源、 │ 任务队列、 │ 简单任务队列 │
# │ │ 日志流处理 │ RPC-over-MQ │ 无状态微服务 │
# └─────────────────┴──────────────┴──────────────┴──────────────┘
# 选择规则:
# - 需要消息回放/事件溯源 → Kafka
# - 需要灵活路由(exchange/binding)→ RabbitMQ
# - 全托管、AWS 生态、简单场景 → SQS
# 并发配置推导:目标队列延迟 ÷ 单消息处理时间 = 所需并发数
# 例:目标延迟 2s,单消息处理 200ms → 并发 = 2000ms / 200ms = 10
Topic、载荷与观测
Topic / 队列命名、分区键与消费者组 id 写进 SKILL;消息头含版本、trace、租户(若适用)。监控队列深度、龄期、DLQ 速率与消费者 lag,并与扩容或降级 playbook 挂钩。
消费者声明:并发度、预取、可见性/锁超时;与下游 DB 或 API 的超时对齐,避免「消息已再次可见而业务侧仍以为在处理中」的双写歧义。
发件箱与提交边界
与数据库同事务写入 outbox 表,再由 relay 投递到 MQ,可缩小「库已提交但消息未发出」与反过来的窗口。若用 CDC,需说明 ordering、初始快照与 schema 变更策略。SKILL 中明确:业务上何时视为「事件已发布」。
处理链路示意
将下列路径与你们栈中的 ack/nack、可见性超时或 delay queue 术语一一对应,避免文档与实现两套说法。
[ Producer / Outbox relay ]
│
▼
[ Topic / 队列 ]
│
▼
[ Consumer ]
│
┌────┴────┐
▼ ▼
处理成功 处理失败
│ │
ack / 提交 重试次数 < max?
│ │ yes ──► 退避后再次投递(可见性 / delay)
│ │ no ──► [ DLQ ]
│ │ │
│ │ └──► 告警 / 人工修复 / 受控回放
▼
(at-least-once:成功路径也可能重复投递 → 依赖幂等)
---
name: message-queue-usage
description: 定义 topic、幂等键、重试退避与 DLQ 契约
---
# 幂等设计
幂等键字段:idempotency_key(UUID,生产者生成,随消息头携带)
Redis SETNX 去重窗口 = 业务 SLA(通常 24h)
DB 唯一键兜底(PRIMARY KEY on processed_events.idempotency_key)
重复到达语义:忽略并 ack,不进 DLQ
# 重试退避(Full Jitter)
sleep = random(0, min(30000ms, 100ms * 2^attempt))
maxAttempts = 5;可重试:ConnectionError/TimeoutError
不可重试(直接 DLQ):ValueError/KeyError/schema 不兼容
# DLQ 消息格式
dlq_metadata: original_topic, partition, offset, failed_at, attempt_count
dlq_metadata: failure_reason, idempotency_key
original_message: 完整原始消息(便于修复后回放)
# 选型速查
Kafka: 事件溯源/日志流/消息回放;分区内有序;高吞吐
RabbitMQ: 任务队列/灵活路由(exchange)/RPC-over-MQ
SQS: 全托管/AWS生态/简单场景/无顺序需求
# 并发配置推导
所需并发 = 目标队列延迟目标(ms) / 单消息处理时间(ms)
示例:目标延迟 2s,处理 200ms → 并发 = 2000/200 = 10
落地检查清单
勾选会在本机浏览器中记住(localStorage),便于分多次对照。