智能体编排
把「一个提示做完所有事」改成显式 DAG + 契约:每步有类型校验的输入输出,失败从断点恢复,并行缩短墙钟,人在环可插拔。本页给出 sequential / parallel / conditional 三种拓扑的对比代码、可直接复用的 JSON handoff 格式与循环依赖检测实现。
编排 = 显式阶段 + 契约 + 状态。满足以下任意一条即应引入编排层:≥2 个外部写操作、步骤间存在结构化数据依赖、需从特定步骤恢复(非整段重跑)、或单次 token 预算超过上下文容量 30%。简单问答和单工具查询仍用单轮提示——编排有成本,别过度设计。
何时需要编排
用 structured output 让 Orchestrator 输出 subtask 列表,而非在系统提示里用自然语言描述步骤——这样可在执行前做静态校验(类型、循环依赖、预算):
from pydantic import BaseModel
from typing import Literal
class SubTask(BaseModel):
id: str # snake_case,全局唯一
title: str # 动词开头,一句话说清
depends_on: list[str] # 前置 subtask id 列表(空列表=无依赖)
tool: Literal["search", "code_exec", "write_file", "human_review"]
timeout_seconds: int = 60
max_retries: int = 3
is_write_op: bool = False # 写操作强制 idempotency_key 或人在环
class TaskPlan(BaseModel):
goal: str
subtasks: list[SubTask]
max_total_steps: int = 20 # 硬上限,防止失控循环
# 触发编排的判断(满足任意一条):
# ✓ ≥2 个外部写操作(数据库/文件/第三方 API)
# ✓ 预估 token > 上下文容量 30%(需分步压缩上下文)
# ✓ 存在明确的 A→B 数据依赖(B 的输入是 A 的结构化字段)
# ✓ 失败后需从特定步骤恢复,而非整段重跑
# ✗ 单轮问答、单工具查询 → 直接用单轮提示,无需编排
plan = await llm.structured_output(
f"将目标分解为 SubTask 列表:{goal}", schema=TaskPlan
)
拓扑与数据流
三种核心模式,根据依赖关系选择,可嵌套组合:
Sequential Chain(串行链)—— 强依赖、出错即中止
A ──▶ B ──▶ C ──▶ D
适用:A 的输出是 B 的必填输入;每步 schema 校验失败立即中止
Parallel Fan-out(并行扇出)—— 独立子任务,墙钟最短
┌──▶ B ──┐
A ──▶ Fan ├──▶ C ──┤──▶ Merge ──▶ E
└──▶ D ──┘
墙钟 ≈ max(B,C,D);Merge 点做 schema + 业务不变量校验
Conditional Branch(条件分支)—— 运行时路由
A ──▶ Router ──▶ confidence > 0.8 ──▶ B(自动执行)
└──▶ else ──▶ C(人工审核队列)
执行前用拓扑排序检测循环依赖,同批次无依赖的任务并行执行:
def topological_sort(subtasks: list[SubTask]) -> list[list[SubTask]]:
"""返回按依赖层级分组的执行批次;检测到循环依赖立即抛出"""
graph = {t.id: set(t.depends_on) for t in subtasks}
id_to_task = {t.id: t for t in subtasks}
batches = []
while graph:
ready = [id for id, deps in graph.items() if not deps]
if not ready:
raise ValueError(f"循环依赖,涉及节点: {list(graph.keys())}")
batches.append([id_to_task[id] for id in ready])
for id in ready:
del graph[id]
for deps in graph.values():
deps -= set(ready)
return batches # [[step1], [step2a, step2b], [step3], ...]
import asyncio
async def run_plan(plan: TaskPlan) -> dict:
batches = topological_sort(plan.subtasks)
results = {}
for batch in batches:
coros = [execute_subtask(t, {d: results[d] for d in t.depends_on})
for t in batch]
batch_out = await asyncio.gather(*coros, return_exceptions=True)
for t, r in zip(batch, batch_out):
if isinstance(r, Exception):
raise RuntimeError(f"step {t.id} failed: {r}")
results[t.id] = r
return results
子代理与交接
子代理间通过结构化 JSON handoff 传递上下文,避免自然语言长复述导致信息失真。以下是可直接复用的 Handoff 载荷格式,含幂等键、失败原因与重试计数:
{
"run_id": "run_20240115T103000Z_abc123",
"step_id": "step_02_code_review",
"parent_step_id": "step_01_code_fetch",
"payload": {
"files": [
{ "path": "src/auth.py", "content_hash": "sha256:aef9...", "diff": "..." }
],
"context": { "pr_number": 1234, "author": "alice", "base_branch": "main" }
},
"payload_checksum": "sha256:def0...",
"idempotency_key": "run_abc123:step_02",
"last_error": null,
"retry_count": 0,
"max_retries": 3,
"timeout_ms": 120000,
"created_at": "2024-01-15T10:30:05Z"
}
- 子代理只产出 patch 或建议,由 Orchestrator 统一执行写操作,避免多代理无锁写同一资源。
- 并行分支在合并点用
payload_checksum验证数据完整性,再进入副作用步骤。 - 接收方校验
payload_checksum;不匹配时拒绝执行并记录last_error。
可观测、成本与幂等
每步记录 trace id、耗时、token、工具错误码与重试次数,便于回放拓扑与压测。对检索与长上下文步骤设预算与早停;编排整体应幂等:同一 run_id 重复执行不产生重复副作用(用幂等键或去重表)。
测试侧:用固定 fixture 覆盖黄金路径与故障注入(工具超时、部分返回、schema 漂移),避免只在「全成功」路径上验收。
---
name: agent-orchestration-flow
description: 设计多步智能体工作流与恢复策略
---
# 步骤
1. 分解目标与依赖图
2. 每步契约、人在环与幂等
3. 观测、预算与失败恢复
落地检查清单
勾选会在本机浏览器中记住(localStorage),便于分多次对照。