事件驱动设计

本页给出可直接运行的事件驱动示例:CloudEvents 规范格式(type/source/subject/datacontenttype)、事件 schema 向前/向后兼容版本控制策略、Saga choreography vs orchestration 代码对比,以及分区键选择保证事件顺序;帮助 Agent 区分命令与事件、用 Outbox 对齐一致性、用 Saga 写清补偿流程。

领域事件宜过去式命名并携带最小必要载荷;集成事件与内部事件分层,避免把聚合内部细节泄漏给全局总线。

跨服务一致性优先写清「最终一致时间窗」与人工介入点;Saga 每步需可重试、可补偿或可追溯中止原因。

在 SKILL 中要求列出消费者清单与订阅版本策略,防止隐式耦合导致部署顺序地狱。

一页摘要

  • 命令改状态、事件陈述事实;发布方对集成事件的 schema 与语义版本负责。
  • 需要与本地事务同事务可见时,用 Outbox(或经审计的 CDC)避免双写竞态。
  • 跨多边界的长事务用 Saga:显式状态机、补偿顺序、与消息投递语义(至少一次 + 幂等)对齐。
// CloudEvents 1.0 规范事件格式(JSON)
{
  "specversion": "1.0",
  "id": "b46d462a-4c6f-4c1e-9e8b-3f5d1c7a9b2c",
  "type": "com.acme.ordering.order.placed.v1",
  "source": "/ordering-service/orders",
  "subject": "order/88421",
  "time": "2024-03-15T10:30:00Z",
  "datacontenttype": "application/json",
  "dataschema": "https://schemas.acme.com/ordering/order-placed/v1.json",
  "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
  "data": {
    "orderId": "88421",
    "customerId": "cust-12345",
    "totalAmount": 99.50,
    "currency": "CNY",
    "items": [{ "skuId": "sku-001", "qty": 2, "price": 49.75 }]
  }
}

// 事件命名约定:{domain}.{aggregate}.{event}.{version}
// type: "com.acme.ordering.order.placed.v1"
//       ───────── ──────── ───── ─────── ──
//       组织       领域     聚合  事件名  版本
//
// 分区键(Kafka/顺序保证):使用 subject 中的聚合 ID
// kafka_key = "order/88421"  → 同一订单的事件落入同一分区,保证顺序
// 事件 schema 版本控制:向前/向后兼容策略
// 向后兼容(新版本消费者可读旧版本事件):
//   ✅ 新增可选字段(带 default)
//   ✅ 扩展枚举值
//   ❌ 删除字段 / 修改字段类型 / 重命名字段

// v1 事件(旧)
{ "orderId": "88421", "status": "placed" }

// v2 事件(新增可选字段,向后兼容)
{ "orderId": "88421", "status": "placed", "customerId": "cust-123" }

// 向前兼容(旧版本消费者可读新版本事件,忽略未知字段):
// 消费者需用 @JsonIgnoreProperties(ignoreUnknown=true) 或等效
// 不兼容变更时:发布新 type(.v2)并维护双版本消费并行窗口

// Python 示例(pydantic 向前兼容)
from pydantic import BaseModel, ConfigDict
class OrderPlacedV1(BaseModel):
    model_config = ConfigDict(extra="ignore")   # 忽略未知字段
    orderId: str
    status: str
    customerId: str | None = None               # 新增可选字段有默认值

端到端流(skill-flow-block)

  [ 命令进入:API / 调度 / 集成适配器 ]
                    │
                    ▼
         ┌──────────────────────┐
         │  聚合事务:改状态 +     │──── 领域内:领域事件(可内存收集)
         │  写 Outbox 行(同事务) │      集成事件载荷在映射层裁剪
         └──────────────────────┘
                    │
                    ▼
         ┌──────────────────────┐
         │  Outbox 投递器:轮询/    │──── 标记已发布 / 重试 / 死信;
         │  CDC / 日志 tail       │      与 broker 的「至少一次」对齐
         └──────────────────────┘
                    │
                    ▼
         ┌──────────────────────┐
         │  消息总线:topic /      │──── 顺序键、TTL、DLQ、schema registry
         │  分区键 / 订阅组       │
         └──────────────────────┘
                    │
        ┌───────────┴───────────┐
        ▼                       ▼
  [ 消费者:幂等处理 ]    [ Saga:编排器或协同 ]
        │                       │
        └───────────┬───────────┘
                    ▼
              [ 观测:trace / 业务指标 / 审计 ]

若跳过 Outbox,须在 SKILL 中写明可接受的丢失窗口或替代(例如同步 API + 重试风暴风险),避免默认假设「发后即达」。

事务发件箱(Outbox)

在同一数据库事务内写入业务行与 outbox 表(或等价日志条目),由独立进程按序发布,保证「业务已提交」与「对外可见消息」单调一致。

  • 表结构:事件类型、payload 版本、聚合 id、因果 id(correlation / causation)、发布状态与时间戳。
  • 投递器:单写者或分区锁防重复发布;失败退避与 poison 策略与 DLQ 文档一致。
  • 与 CDC 对比:Outbox 显式建模消息边界;CDC 需处理变更噪声与投影延迟,在 SKILL 中写清 SLI。
-- Outbox 表 DDL(PostgreSQL)
CREATE TABLE outbox_events (
    id              UUID          PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(64)   NOT NULL,   -- 如 'Order'
    aggregate_id    VARCHAR(128)  NOT NULL,   -- 如 '88421'
    event_type      VARCHAR(128)  NOT NULL,   -- 如 'com.acme.ordering.order.placed.v1'
    payload         JSONB         NOT NULL,
    correlation_id  UUID,                      -- 追踪因果链
    causation_id    UUID,                      -- 触发本事件的上游事件 ID
    status          VARCHAR(16)   NOT NULL DEFAULT 'pending',  -- pending/published/failed
    created_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,
    retry_count     INT           NOT NULL DEFAULT 0
);

CREATE INDEX idx_outbox_pending ON outbox_events (created_at) WHERE status = 'pending';

-- 业务写入 + outbox 同事务(Python + psycopg2)
def place_order(conn, order_data: dict) -> str:
    with conn.cursor() as cur:
        # 1. 写业务数据
        cur.execute("INSERT INTO orders (...) VALUES (...) RETURNING id", order_data)
        order_id = cur.fetchone()[0]
        # 2. 同事务写 outbox
        cur.execute("""
            INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
            VALUES ('Order', %s, 'com.acme.ordering.order.placed.v1', %s)
        """, (str(order_id), json.dumps({"orderId": str(order_id), **order_data})))
    conn.commit()   # 两者原子提交
    return order_id

Saga:编排、协同与补偿

编排(orchestration):中央进程发命令并等待完成事件,心智集中但存在协调者可用性风险。协同(choreography):各服务订阅彼此事件,去中心化但需强契约与可观测性。

  • 每步定义:输入事件、本地副作用、成功事件、失败时补偿或重试策略、超时与人工工单入口。
  • 补偿宜语义化(撤销预订、退款)而非物理删除;无法补偿时写明冻结账户或运营流程。
  • 与 Outbox 结合:Saga 日志与本地事务同事务或明确最终一致段落,避免「已发下一步、上步未落库」的幽灵状态。
## Saga 模式对比

### Choreography(协同)— 去中心化,各服务订阅彼此事件
# OrderService 发布 OrderPlaced
# InventoryService 监听 → 扣库存 → 发 InventoryReserved
# PaymentService   监听 → 扣款   → 发 PaymentProcessed
# 补偿:PaymentFailed → InventoryService 监听 → 释放库存 → 发 InventoryReleased

# 优:无单点,松耦合
# 缺:事件链追踪困难,测试复杂,增加服务时需修改多处订阅

### Orchestration(编排)— 中央协调器显式控制步骤
class OrderSaga:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.state = "STARTED"

    def execute(self):
        try:
            # Step 1: 预留库存
            self.state = "RESERVING_INVENTORY"
            inventory_svc.reserve(self.order_id)

            # Step 2: 扣款
            self.state = "PROCESSING_PAYMENT"
            payment_svc.charge(self.order_id)

            # Step 3: 发货
            self.state = "SHIPPING"
            shipping_svc.ship(self.order_id)

            self.state = "COMPLETED"
        except InventoryError:
            self.compensate_inventory()
            self.state = "FAILED"
            raise
        except PaymentError:
            self.compensate_payment()
            self.compensate_inventory()  # 补偿顺序与正向相反
            self.state = "FAILED"
            raise

    def compensate_inventory(self):
        # 语义化补偿:释放预留,而非物理删除记录
        inventory_svc.release(self.order_id)

    def compensate_payment(self):
        payment_svc.refund(self.order_id)

# 优:流程集中,易追踪,超时与重试集中管理
# 缺:协调者是单点,需高可用部署

事件契约与消费者

  • 编排 vs 协同:中央编排器的单点与去中心化权衡。
  • 顺序与重复:与消息基础设施的投递语义对齐;消费者幂等键与去重窗口写进契约。
  • 演进:事件 schema 兼容 forward / backward 规则摘要;废弃字段与双写读新策略。

SKILL 前置元数据示例

---
name: event-driven-design
description: 梳理领域事件流、CloudEvents 格式与跨服务 Saga 协作
---
# 事件契约
格式: CloudEvents 1.0(specversion/id/type/source/subject/time/data)
type 命名: {org}.{domain}.{aggregate}.{event}.{version}
分区键: subject 中的聚合 ID,保证同一聚合事件顺序
# Schema 版本控制
向后兼容: 新增可选字段(有默认值) / 扩展枚举
不兼容: 发布新 type(.v2),消费者并行消费双版本窗口
消费者侧: extra="ignore"(忽略未知字段)保证向前兼容
# Outbox 模式
表: id/aggregate_type/aggregate_id/event_type/payload/status/retry_count
同事务写入业务行 + outbox;投递器轮询 status=pending 发布
# Saga 选择
Choreography: 简单流程/松耦合/少于 3 步 → 各服务订阅事件
Orchestration: 复杂流程/需统一追踪/超时管理 → 中央协调器
补偿顺序: 与正向步骤相反;语义化(refund/release)而非物理删除
# 输出结构
事件卡片: 名称/type/source/payload字段/消费者清单
Saga 图: 步骤/成功事件/失败补偿/超时入口
失败: 重试(at-least-once+幂等) / 补偿 / 人工工单

事件卡片草稿实验室(evt-page JS)

用于评审或粘贴进工单:根据勾选自动补上 Outbox / Saga 检查项。

模式勾选

              

实验室输出仅供沟通;真实契约须与 schema registry、消费者仓库及 Saga 状态机定义一致。

返回技能库 更多技能入口