事件驱动设计
本页给出可直接运行的事件驱动示例:CloudEvents 规范格式(type/source/subject/datacontenttype)、事件 schema 向前/向后兼容版本控制策略、Saga choreography vs orchestration 代码对比,以及分区键选择保证事件顺序;帮助 Agent 区分命令与事件、用 Outbox 对齐一致性、用 Saga 写清补偿流程。
领域事件宜过去式命名并携带最小必要载荷;集成事件与内部事件分层,避免把聚合内部细节泄漏给全局总线。
跨服务一致性优先写清「最终一致时间窗」与人工介入点;Saga 每步需可重试、可补偿或可追溯中止原因。
在 SKILL 中要求列出消费者清单与订阅版本策略,防止隐式耦合导致部署顺序地狱。
一页摘要
- 命令改状态、事件陈述事实;发布方对集成事件的 schema 与语义版本负责。
- 需要与本地事务同事务可见时,用 Outbox(或经审计的 CDC)避免双写竞态。
- 跨多边界的长事务用 Saga:显式状态机、补偿顺序、与消息投递语义(至少一次 + 幂等)对齐。
// CloudEvents 1.0 规范事件格式(JSON)
{
"specversion": "1.0",
"id": "b46d462a-4c6f-4c1e-9e8b-3f5d1c7a9b2c",
"type": "com.acme.ordering.order.placed.v1",
"source": "/ordering-service/orders",
"subject": "order/88421",
"time": "2024-03-15T10:30:00Z",
"datacontenttype": "application/json",
"dataschema": "https://schemas.acme.com/ordering/order-placed/v1.json",
"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
"data": {
"orderId": "88421",
"customerId": "cust-12345",
"totalAmount": 99.50,
"currency": "CNY",
"items": [{ "skuId": "sku-001", "qty": 2, "price": 49.75 }]
}
}
// 事件命名约定:{domain}.{aggregate}.{event}.{version}
// type: "com.acme.ordering.order.placed.v1"
// ───────── ──────── ───── ─────── ──
// 组织 领域 聚合 事件名 版本
//
// 分区键(Kafka/顺序保证):使用 subject 中的聚合 ID
// kafka_key = "order/88421" → 同一订单的事件落入同一分区,保证顺序
// 事件 schema 版本控制:向前/向后兼容策略
// 向后兼容(新版本消费者可读旧版本事件):
// ✅ 新增可选字段(带 default)
// ✅ 扩展枚举值
// ❌ 删除字段 / 修改字段类型 / 重命名字段
// v1 事件(旧)
{ "orderId": "88421", "status": "placed" }
// v2 事件(新增可选字段,向后兼容)
{ "orderId": "88421", "status": "placed", "customerId": "cust-123" }
// 向前兼容(旧版本消费者可读新版本事件,忽略未知字段):
// 消费者需用 @JsonIgnoreProperties(ignoreUnknown=true) 或等效
// 不兼容变更时:发布新 type(.v2)并维护双版本消费并行窗口
// Python 示例(pydantic 向前兼容)
from pydantic import BaseModel, ConfigDict
class OrderPlacedV1(BaseModel):
model_config = ConfigDict(extra="ignore") # 忽略未知字段
orderId: str
status: str
customerId: str | None = None # 新增可选字段有默认值
端到端流(skill-flow-block)
[ 命令进入:API / 调度 / 集成适配器 ]
│
▼
┌──────────────────────┐
│ 聚合事务:改状态 + │──── 领域内:领域事件(可内存收集)
│ 写 Outbox 行(同事务) │ 集成事件载荷在映射层裁剪
└──────────────────────┘
│
▼
┌──────────────────────┐
│ Outbox 投递器:轮询/ │──── 标记已发布 / 重试 / 死信;
│ CDC / 日志 tail │ 与 broker 的「至少一次」对齐
└──────────────────────┘
│
▼
┌──────────────────────┐
│ 消息总线:topic / │──── 顺序键、TTL、DLQ、schema registry
│ 分区键 / 订阅组 │
└──────────────────────┘
│
┌───────────┴───────────┐
▼ ▼
[ 消费者:幂等处理 ] [ Saga:编排器或协同 ]
│ │
└───────────┬───────────┘
▼
[ 观测:trace / 业务指标 / 审计 ]
若跳过 Outbox,须在 SKILL 中写明可接受的丢失窗口或替代(例如同步 API + 重试风暴风险),避免默认假设「发后即达」。
事务发件箱(Outbox)
在同一数据库事务内写入业务行与 outbox 表(或等价日志条目),由独立进程按序发布,保证「业务已提交」与「对外可见消息」单调一致。
- 表结构:事件类型、payload 版本、聚合 id、因果 id(correlation / causation)、发布状态与时间戳。
- 投递器:单写者或分区锁防重复发布;失败退避与 poison 策略与 DLQ 文档一致。
- 与 CDC 对比:Outbox 显式建模消息边界;CDC 需处理变更噪声与投影延迟,在 SKILL 中写清 SLI。
-- Outbox 表 DDL(PostgreSQL)
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(64) NOT NULL, -- 如 'Order'
aggregate_id VARCHAR(128) NOT NULL, -- 如 '88421'
event_type VARCHAR(128) NOT NULL, -- 如 'com.acme.ordering.order.placed.v1'
payload JSONB NOT NULL,
correlation_id UUID, -- 追踪因果链
causation_id UUID, -- 触发本事件的上游事件 ID
status VARCHAR(16) NOT NULL DEFAULT 'pending', -- pending/published/failed
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
retry_count INT NOT NULL DEFAULT 0
);
CREATE INDEX idx_outbox_pending ON outbox_events (created_at) WHERE status = 'pending';
-- 业务写入 + outbox 同事务(Python + psycopg2)
def place_order(conn, order_data: dict) -> str:
with conn.cursor() as cur:
# 1. 写业务数据
cur.execute("INSERT INTO orders (...) VALUES (...) RETURNING id", order_data)
order_id = cur.fetchone()[0]
# 2. 同事务写 outbox
cur.execute("""
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', %s, 'com.acme.ordering.order.placed.v1', %s)
""", (str(order_id), json.dumps({"orderId": str(order_id), **order_data})))
conn.commit() # 两者原子提交
return order_id
Saga:编排、协同与补偿
编排(orchestration):中央进程发命令并等待完成事件,心智集中但存在协调者可用性风险。协同(choreography):各服务订阅彼此事件,去中心化但需强契约与可观测性。
- 每步定义:输入事件、本地副作用、成功事件、失败时补偿或重试策略、超时与人工工单入口。
- 补偿宜语义化(撤销预订、退款)而非物理删除;无法补偿时写明冻结账户或运营流程。
- 与 Outbox 结合:Saga 日志与本地事务同事务或明确最终一致段落,避免「已发下一步、上步未落库」的幽灵状态。
## Saga 模式对比
### Choreography(协同)— 去中心化,各服务订阅彼此事件
# OrderService 发布 OrderPlaced
# InventoryService 监听 → 扣库存 → 发 InventoryReserved
# PaymentService 监听 → 扣款 → 发 PaymentProcessed
# 补偿:PaymentFailed → InventoryService 监听 → 释放库存 → 发 InventoryReleased
# 优:无单点,松耦合
# 缺:事件链追踪困难,测试复杂,增加服务时需修改多处订阅
### Orchestration(编排)— 中央协调器显式控制步骤
class OrderSaga:
def __init__(self, order_id: str):
self.order_id = order_id
self.state = "STARTED"
def execute(self):
try:
# Step 1: 预留库存
self.state = "RESERVING_INVENTORY"
inventory_svc.reserve(self.order_id)
# Step 2: 扣款
self.state = "PROCESSING_PAYMENT"
payment_svc.charge(self.order_id)
# Step 3: 发货
self.state = "SHIPPING"
shipping_svc.ship(self.order_id)
self.state = "COMPLETED"
except InventoryError:
self.compensate_inventory()
self.state = "FAILED"
raise
except PaymentError:
self.compensate_payment()
self.compensate_inventory() # 补偿顺序与正向相反
self.state = "FAILED"
raise
def compensate_inventory(self):
# 语义化补偿:释放预留,而非物理删除记录
inventory_svc.release(self.order_id)
def compensate_payment(self):
payment_svc.refund(self.order_id)
# 优:流程集中,易追踪,超时与重试集中管理
# 缺:协调者是单点,需高可用部署
事件契约与消费者
- 编排 vs 协同:中央编排器的单点与去中心化权衡。
- 顺序与重复:与消息基础设施的投递语义对齐;消费者幂等键与去重窗口写进契约。
- 演进:事件 schema 兼容 forward / backward 规则摘要;废弃字段与双写读新策略。
SKILL 前置元数据示例
---
name: event-driven-design
description: 梳理领域事件流、CloudEvents 格式与跨服务 Saga 协作
---
# 事件契约
格式: CloudEvents 1.0(specversion/id/type/source/subject/time/data)
type 命名: {org}.{domain}.{aggregate}.{event}.{version}
分区键: subject 中的聚合 ID,保证同一聚合事件顺序
# Schema 版本控制
向后兼容: 新增可选字段(有默认值) / 扩展枚举
不兼容: 发布新 type(.v2),消费者并行消费双版本窗口
消费者侧: extra="ignore"(忽略未知字段)保证向前兼容
# Outbox 模式
表: id/aggregate_type/aggregate_id/event_type/payload/status/retry_count
同事务写入业务行 + outbox;投递器轮询 status=pending 发布
# Saga 选择
Choreography: 简单流程/松耦合/少于 3 步 → 各服务订阅事件
Orchestration: 复杂流程/需统一追踪/超时管理 → 中央协调器
补偿顺序: 与正向步骤相反;语义化(refund/release)而非物理删除
# 输出结构
事件卡片: 名称/type/source/payload字段/消费者清单
Saga 图: 步骤/成功事件/失败补偿/超时入口
失败: 重试(at-least-once+幂等) / 补偿 / 人工工单
事件卡片草稿实验室(evt-page JS)
用于评审或粘贴进工单:根据勾选自动补上 Outbox / Saga 检查项。
实验室输出仅供沟通;真实契约须与 schema registry、消费者仓库及 Saga 状态机定义一致。