智能体编排

把「一个提示做完所有事」改成显式 DAG + 契约:每步有类型校验的输入输出,失败从断点恢复,并行缩短墙钟,人在环可插拔。本页给出 sequential / parallel / conditional 三种拓扑的对比代码、可直接复用的 JSON handoff 格式与循环依赖检测实现。

编排 = 显式阶段 + 契约 + 状态。满足以下任意一条即应引入编排层:≥2 个外部写操作、步骤间存在结构化数据依赖、需从特定步骤恢复(非整段重跑)、或单次 token 预算超过上下文容量 30%。简单问答和单工具查询仍用单轮提示——编排有成本,别过度设计。

何时需要编排

用 structured output 让 Orchestrator 输出 subtask 列表,而非在系统提示里用自然语言描述步骤——这样可在执行前做静态校验(类型、循环依赖、预算):

from pydantic import BaseModel
from typing import Literal

class SubTask(BaseModel):
    id: str                    # snake_case,全局唯一
    title: str                 # 动词开头,一句话说清
    depends_on: list[str]      # 前置 subtask id 列表(空列表=无依赖)
    tool: Literal["search", "code_exec", "write_file", "human_review"]
    timeout_seconds: int = 60
    max_retries: int = 3
    is_write_op: bool = False  # 写操作强制 idempotency_key 或人在环

class TaskPlan(BaseModel):
    goal: str
    subtasks: list[SubTask]
    max_total_steps: int = 20  # 硬上限,防止失控循环

# 触发编排的判断(满足任意一条):
# ✓ ≥2 个外部写操作(数据库/文件/第三方 API)
# ✓ 预估 token > 上下文容量 30%(需分步压缩上下文)
# ✓ 存在明确的 A→B 数据依赖(B 的输入是 A 的结构化字段)
# ✓ 失败后需从特定步骤恢复,而非整段重跑
# ✗ 单轮问答、单工具查询 → 直接用单轮提示,无需编排

plan = await llm.structured_output(
    f"将目标分解为 SubTask 列表:{goal}", schema=TaskPlan
)
反模式:为每个子任务复制完整系统提示,导致上下文膨胀与行为漂移。正确做法:Orchestrator 只保留全局策略,每步子代理只注入该步的输入 schema + 工具白名单。

拓扑与数据流

三种核心模式,根据依赖关系选择,可嵌套组合:

Sequential Chain(串行链)—— 强依赖、出错即中止
  A ──▶ B ──▶ C ──▶ D
  适用:A 的输出是 B 的必填输入;每步 schema 校验失败立即中止

Parallel Fan-out(并行扇出)—— 独立子任务,墙钟最短
            ┌──▶ B ──┐
  A ──▶ Fan ├──▶ C ──┤──▶ Merge ──▶ E
            └──▶ D ──┘
  墙钟 ≈ max(B,C,D);Merge 点做 schema + 业务不变量校验

Conditional Branch(条件分支)—— 运行时路由
  A ──▶ Router ──▶ confidence > 0.8 ──▶ B(自动执行)
                └──▶ else            ──▶ C(人工审核队列)

执行前用拓扑排序检测循环依赖,同批次无依赖的任务并行执行:

def topological_sort(subtasks: list[SubTask]) -> list[list[SubTask]]:
    """返回按依赖层级分组的执行批次;检测到循环依赖立即抛出"""
    graph = {t.id: set(t.depends_on) for t in subtasks}
    id_to_task = {t.id: t for t in subtasks}
    batches = []
    while graph:
        ready = [id for id, deps in graph.items() if not deps]
        if not ready:
            raise ValueError(f"循环依赖,涉及节点: {list(graph.keys())}")
        batches.append([id_to_task[id] for id in ready])
        for id in ready:
            del graph[id]
        for deps in graph.values():
            deps -= set(ready)
    return batches  # [[step1], [step2a, step2b], [step3], ...]

import asyncio

async def run_plan(plan: TaskPlan) -> dict:
    batches = topological_sort(plan.subtasks)
    results = {}
    for batch in batches:
        coros = [execute_subtask(t, {d: results[d] for d in t.depends_on})
                 for t in batch]
        batch_out = await asyncio.gather(*coros, return_exceptions=True)
        for t, r in zip(batch, batch_out):
            if isinstance(r, Exception):
                raise RuntimeError(f"step {t.id} failed: {r}")
            results[t.id] = r
    return results

子代理与交接

子代理间通过结构化 JSON handoff 传递上下文,避免自然语言长复述导致信息失真。以下是可直接复用的 Handoff 载荷格式,含幂等键、失败原因与重试计数:

{
  "run_id":         "run_20240115T103000Z_abc123",
  "step_id":        "step_02_code_review",
  "parent_step_id": "step_01_code_fetch",
  "payload": {
    "files": [
      { "path": "src/auth.py", "content_hash": "sha256:aef9...", "diff": "..." }
    ],
    "context": { "pr_number": 1234, "author": "alice", "base_branch": "main" }
  },
  "payload_checksum":  "sha256:def0...",
  "idempotency_key":   "run_abc123:step_02",
  "last_error":        null,
  "retry_count":       0,
  "max_retries":       3,
  "timeout_ms":        120000,
  "created_at":        "2024-01-15T10:30:05Z"
}
  • 子代理只产出 patch 或建议,由 Orchestrator 统一执行写操作,避免多代理无锁写同一资源。
  • 并行分支在合并点用 payload_checksum 验证数据完整性,再进入副作用步骤。
  • 接收方校验 payload_checksum;不匹配时拒绝执行并记录 last_error

可观测、成本与幂等

每步记录 trace id、耗时、token、工具错误码与重试次数,便于回放拓扑与压测。对检索与长上下文步骤设预算与早停;编排整体应幂等:同一 run_id 重复执行不产生重复副作用(用幂等键或去重表)。

测试侧:用固定 fixture 覆盖黄金路径与故障注入(工具超时、部分返回、schema 漂移),避免只在「全成功」路径上验收。

---
name: agent-orchestration-flow
description: 设计多步智能体工作流与恢复策略
---
# 步骤
1. 分解目标与依赖图
2. 每步契约、人在环与幂等
3. 观测、预算与失败恢复

落地检查清单

勾选会在本机浏览器中记住(localStorage),便于分多次对照。

返回技能库 更多技能入口