消息队列使用

让 Agent 明确 at-least-once 下的幂等键、去重窗口与顺序分区规则,并把重试、死信(DLQ)与可观测性写进同一套契约。

消息体应版本化并含 trace / correlation id;避免在队列里传递不可序列化句柄或大 blob,必要时用对象存储引用。

消费者须声明:最大并发、预取、可见性超时与 poison 处理;重试退避策略与 DLQ 人工介入流程要可执行。

与事务发件箱(outbox)或 CDC 集成时,在 SKILL 中写清「何时认为消息已提交」与双写故障的恢复步骤。

  • 顺序:同一实体 id 作为分区键;跨实体顺序不强保证时显式写出。
  • 幂等:业务键 + 去重表或自然主键冲突处理。
  • 背压:队列深度告警与消费者扩容/降级策略联动。

At-least-once 与幂等

多数托管队列默认为 at-least-once:同一条可能被投递多次(网络抖动、消费者崩溃在 ack 之前、分区再均衡等)。SKILL 必须假定「处理函数会重复执行」,用幂等键、去重表或自然主键冲突吞掉重复,而不是依赖「恰好一次」幻想。

在契约中写明:幂等键字段名、去重时间窗(与业务 SLA 一致)、以及「重复到达时」的语义(忽略 / 合并 / 报错进 DLQ)。

对照:exactly-once 端到端需生产者与消费者协同(事务日志、幂等消费、去重存储),成本与运维复杂度显著升高;除非有强约束,否则在 SKILL 中优先文档化 at-least-once + 幂等。
# 幂等键设计:Redis 去重 + DB 唯一键双重保障(Python)
import redis, json
from datetime import timedelta

r = redis.Redis(decode_responses=True)

DEDUP_WINDOW = timedelta(hours=24)   # 去重窗口与业务 SLA 对齐

def process_order_event(message: dict) -> bool:
    """
    消息结构:{ "idempotency_key": "evt-uuid-abc", "order_id": 123, ... }
    返回 True=处理成功, False=重复忽略
    """
    idem_key = message.get("idempotency_key")
    if not idem_key:
        raise ValueError("missing idempotency_key")

    redis_key = f"dedup:order_event:{idem_key}"

    # ① Redis SETNX:窗口内第一次到达时设置,已存在则为重复
    is_new = r.set(redis_key, "1", ex=int(DEDUP_WINDOW.total_seconds()), nx=True)
    if not is_new:
        # 重复消息:忽略并 ack,不进 DLQ
        return False

    try:
        # ② DB 唯一键兜底(Redis 重启或键过期后的二次保护)
        db_insert_processed_event(
            idempotency_key=idem_key,
            payload=json.dumps(message)
        )
        # ③ 实际业务处理
        execute_order_business_logic(message)
        return True
    except UniqueConstraintError:
        # DB 唯一键冲突 → 也是重复,忽略
        return False
    except Exception:
        # 处理失败 → 删除 Redis key,让消息可重试
        r.delete(redis_key)
        raise

# DB 唯一键 DDL(防止 Redis 键过期后重复)
# CREATE TABLE processed_events (
#     idempotency_key VARCHAR(128) PRIMARY KEY,
#     created_at      TIMESTAMP DEFAULT NOW()
# );
# -- 定期清理过期记录(保留窗口内的)
# DELETE FROM processed_events WHERE created_at < NOW() - INTERVAL '24 hours';

重试、退避与 DLQ

瞬时故障用有限次重试 + 指数退避(含抖动),避免惊群与热点分区;永久错误(schema 不兼容、业务不变量违反)应尽快进入 DLQ(死信队列),并附带原始头、失败原因码与重试次数,便于人工或离线工具修复后回放。

  • 定义 maxAttempts、退避上下界、以及「何种异常可重试 vs 直接 DLQ」。
  • DLQ 消费权限、告警路由、回放审批(谁在什么条件下允许重新入队)。
  • Poison message:单条反复失败时隔离,避免阻塞整组消费者。
# 指数退避 + 抖动(Python)
import time, random, math

def exponential_backoff_with_jitter(
    attempt: int,
    base_ms: int = 100,
    max_ms: int = 30_000,
    jitter_factor: float = 0.3,
) -> float:
    """
    Full Jitter 策略:sleep = random(0, min(cap, base * 2^attempt))
    避免多个消费者同时重试形成惊群
    """
    cap = min(max_ms, base_ms * (2 ** attempt))
    jitter = random.uniform(0, cap * jitter_factor)
    delay_ms = cap + jitter
    return delay_ms / 1000   # 返回秒

def process_with_retry(message: dict, handler, max_attempts: int = 5):
    RETRYABLE = (ConnectionError, TimeoutError)
    NON_RETRYABLE = (ValueError, KeyError)  # schema 错误直接进 DLQ

    for attempt in range(max_attempts):
        try:
            handler(message)
            return  # 成功
        except NON_RETRYABLE as e:
            # 永久错误 → 立即发往 DLQ,不重试
            send_to_dlq(message, error=str(e), attempt=attempt)
            return
        except RETRYABLE as e:
            if attempt == max_attempts - 1:
                send_to_dlq(message, error=str(e), attempt=attempt)
                return
            delay = exponential_backoff_with_jitter(attempt)
            time.sleep(delay)

# DLQ 消息标准格式
def build_dlq_message(original: dict, error: str, attempt: int) -> dict:
    return {
        "dlq_metadata": {
            "original_topic": original.get("_topic", "unknown"),
            "original_partition": original.get("_partition"),
            "original_offset": original.get("_offset"),
            "failed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "attempt_count": attempt + 1,
            "failure_reason": error,
            "idempotency_key": original.get("idempotency_key"),
        },
        "original_message": original,  # 完整原始消息,便于重放
    }
# Kafka vs RabbitMQ vs SQS 选择标准
# ┌─────────────────┬──────────────┬──────────────┬──────────────┐
# │ 维度            │ Kafka        │ RabbitMQ     │ AWS SQS      │
# ├─────────────────┼──────────────┼──────────────┼──────────────┤
# │ 投递语义        │ at-least-once│ at-least-once│ at-least-once│
# │                 │ (exactly-once│ (可配置)     │              │
# │                 │  需事务)     │              │              │
# │ 顺序保证        │ 分区内有序   │ 队列内有序   │ 无(FIFO 队  │
# │                 │              │              │ 列除外)      │
# │ 消息保留        │ 持久化可回放 │ ACK 后删除   │ ACK 后删除   │
# │ 吞吐量          │ 极高(百万/s)│ 高(万级/s) │ 高(弹性)   │
# │ 延迟            │ 毫秒级       │ 亚毫秒级     │ 毫秒级       │
# │ 消费者模型      │ Consumer     │ Push/Pull    │ Long polling │
# │                 │ Group Pull   │              │              │
# │ 运维复杂度      │ 高(需 ZK/   │ 中           │ 低(托管)   │
# │                 │ KRaft)      │              │              │
# │ 适合场景        │ 事件溯源、   │ 任务队列、   │ 简单任务队列 │
# │                 │ 日志流处理   │ RPC-over-MQ  │ 无状态微服务 │
# └─────────────────┴──────────────┴──────────────┴──────────────┘
# 选择规则:
# - 需要消息回放/事件溯源 → Kafka
# - 需要灵活路由(exchange/binding)→ RabbitMQ
# - 全托管、AWS 生态、简单场景 → SQS
# 并发配置推导:目标队列延迟 ÷ 单消息处理时间 = 所需并发数
# 例:目标延迟 2s,单消息处理 200ms → 并发 = 2000ms / 200ms = 10

Topic、载荷与观测

Topic / 队列命名、分区键与消费者组 id 写进 SKILL;消息头含版本、trace、租户(若适用)。监控队列深度、龄期、DLQ 速率与消费者 lag,并与扩容或降级 playbook 挂钩。

消费者声明:并发度、预取、可见性/锁超时;与下游 DB 或 API 的超时对齐,避免「消息已再次可见而业务侧仍以为在处理中」的双写歧义。

发件箱与提交边界

与数据库同事务写入 outbox 表,再由 relay 投递到 MQ,可缩小「库已提交但消息未发出」与反过来的窗口。若用 CDC,需说明 ordering、初始快照与 schema 变更策略。SKILL 中明确:业务上何时视为「事件已发布」。

处理链路示意

将下列路径与你们栈中的 ack/nack、可见性超时或 delay queue 术语一一对应,避免文档与实现两套说法。

    [ Producer / Outbox relay ]
              │
              ▼
         [ Topic / 队列 ]
              │
              ▼
         [ Consumer ]
              │
         ┌────┴────┐
         ▼         ▼
      处理成功    处理失败
         │            │
    ack / 提交     重试次数 < max?
         │            │ yes ──► 退避后再次投递(可见性 / delay)
         │            │ no  ──► [ DLQ ]
         │            │              │
         │            │              └──► 告警 / 人工修复 / 受控回放
         ▼
    (at-least-once:成功路径也可能重复投递 → 依赖幂等)
---
name: message-queue-usage
description: 定义 topic、幂等键、重试退避与 DLQ 契约
---
# 幂等设计
幂等键字段:idempotency_key(UUID,生产者生成,随消息头携带)
Redis SETNX 去重窗口 = 业务 SLA(通常 24h)
DB 唯一键兜底(PRIMARY KEY on processed_events.idempotency_key)
重复到达语义:忽略并 ack,不进 DLQ
# 重试退避(Full Jitter)
sleep = random(0, min(30000ms, 100ms * 2^attempt))
maxAttempts = 5;可重试:ConnectionError/TimeoutError
不可重试(直接 DLQ):ValueError/KeyError/schema 不兼容
# DLQ 消息格式
dlq_metadata: original_topic, partition, offset, failed_at, attempt_count
dlq_metadata: failure_reason, idempotency_key
original_message: 完整原始消息(便于修复后回放)
# 选型速查
Kafka: 事件溯源/日志流/消息回放;分区内有序;高吞吐
RabbitMQ: 任务队列/灵活路由(exchange)/RPC-over-MQ
SQS: 全托管/AWS生态/简单场景/无顺序需求
# 并发配置推导
所需并发 = 目标队列延迟目标(ms) / 单消息处理时间(ms)
示例:目标延迟 2s,处理 200ms → 并发 = 2000/200 = 10

落地检查清单

勾选会在本机浏览器中记住(localStorage),便于分多次对照。

返回技能库 更多技能入口