Agent orchestration
Replace "do everything in one chat" with an explicit DAG + contracts: each step has typed inputs/outputs, failures resume from the last checkpoint, parallelism reduces wall-clock time, and human-in-the-loop gates are pluggable. This page provides comparison code for sequential / parallel / conditional topologies, a reusable JSON handoff format, and a cycle-detection implementation.
Orchestration = explicit stages + contracts + state. Add an orchestration layer when any one of these conditions is met: ≥2 external write operations; a structural data dependency between steps (B's input is A's structured output); a need to resume from a specific step on failure rather than re-running the whole sequence; or an estimated token count above 30% of the context cap. Simple Q&A and single-tool queries stay single-turn: orchestration has a cost, so don't over-engineer.
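The four triggers can be encoded as a small pre-flight check. The following is only a sketch: `WorkloadProfile`, its field names, and the 128,000-token context cap in the examples are illustrative assumptions, not part of any library.

```python
from dataclasses import dataclass


@dataclass
class WorkloadProfile:
    """Hypothetical summary of a request, filled in by the caller."""
    external_writes: int             # DB / file / third-party API writes
    has_structured_dependency: bool  # some step consumes another's structured output
    needs_step_resume: bool          # failures must resume mid-sequence
    estimated_tokens: int
    context_cap_tokens: int


def needs_orchestration(p: WorkloadProfile) -> bool:
    """Return True if any one of the four triggers applies."""
    over_budget = p.estimated_tokens > 0.30 * p.context_cap_tokens
    return (
        p.external_writes >= 2
        or p.has_structured_dependency
        or p.needs_step_resume
        or over_budget
    )


# Example profiles (all numbers are made up for illustration).
simple_qa = WorkloadProfile(0, False, False, 2_000, 128_000)
pr_review = WorkloadProfile(2, True, True, 50_000, 128_000)
```

Here `needs_orchestration(simple_qa)` is false and `needs_orchestration(pr_review)` is true; a request with no writes and no dependencies still qualifies once its estimated tokens pass 30% of the cap.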
When orchestration helps
Use structured output to have the Orchestrator emit a subtask list rather than describing steps in natural language inside the system prompt—this allows static validation (types, cycle detection, budget) before execution:
from typing import Literal

from pydantic import BaseModel


class SubTask(BaseModel):
    id: str                     # snake_case, globally unique
    title: str                  # verb-first, one sentence
    depends_on: list[str]       # ids of prerequisite subtasks (empty = no deps)
    tool: Literal["search", "code_exec", "write_file", "human_review"]
    timeout_seconds: int = 60
    max_retries: int = 3
    is_write_op: bool = False   # write ops require idempotency_key or human-in-loop


class TaskPlan(BaseModel):
    goal: str
    subtasks: list[SubTask]
    max_total_steps: int = 20   # hard cap to prevent runaway loops


# Triggers for adding an orchestration layer (any one condition is sufficient):
# ✓ ≥2 external write operations (DB / file / third-party API)
# ✓ Estimated tokens > 30% of context cap (needs per-step context compression)
# ✓ Clear A→B data dependency (B's input is a structured field from A)
# ✓ Failures must resume from a specific step, not rerun from scratch
# ✗ Single-turn Q&A, single-tool query → use a single-turn prompt instead


async def make_plan(llm, goal: str) -> TaskPlan:
    # `llm` is a placeholder client; `structured_output` stands for whatever
    # schema-constrained generation call your SDK provides.
    return await llm.structured_output(
        f"Decompose the goal into a SubTask list: {goal}", schema=TaskPlan
    )
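The static validation mentioned above can run before any step executes. The sketch below covers the structural checks other than cycle detection (duplicate ids, dangling dependencies, the step cap). To stay runnable with the standard library alone it uses a plain dataclass, `Step`, as a stand-in for the `SubTask` model; `Step` and `validate_steps` are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class Step:
    """Stdlib stand-in for SubTask (only the fields checked here)."""
    id: str
    depends_on: list[str] = field(default_factory=list)


def validate_steps(steps: list[Step], max_total_steps: int = 20) -> list[str]:
    """Return a list of human-readable problems; empty means the plan passed."""
    problems = []
    ids = [s.id for s in steps]
    if len(steps) > max_total_steps:
        problems.append(f"{len(steps)} steps exceeds cap of {max_total_steps}")
    for dup in sorted({i for i in ids if ids.count(i) > 1}):
        problems.append(f"duplicate id: {dup}")
    known = set(ids)
    for s in steps:
        for dep in s.depends_on:
            if dep not in known:
                problems.append(f"{s.id} depends on unknown id: {dep}")
            if dep == s.id:
                problems.append(f"{s.id} depends on itself")
    return problems
```

Returning every problem at once, rather than raising on the first, makes it possible to feed the full list back to the model and ask for a corrected plan in a single round trip.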
Topology and data flow
Three core patterns—choose based on dependencies; they can be nested and combined:
Sequential Chain — strong dependency, abort on error
A ──▶ B ──▶ C ──▶ D
Use when: A's output is a required input for B; fail immediately on schema validation failure
Parallel Fan-out — independent subtasks, shortest wall-clock time
          ┌──▶ B ──┐
A ──▶ Fan ├──▶ C ──┤──▶ Merge ──▶ E
          └──▶ D ──┘
Wall-clock ≈ max(B,C,D); Merge point runs schema + business invariant checks
Conditional Branch — runtime routing
A ──▶ Router ──▶ confidence > 0.8 ──▶ B (auto-execute)
             └──▶ else ──▶ C (human review queue)
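The conditional branch reduces to a small routing function. This is a sketch under assumptions: `RouterDecision` and `route` are hypothetical names, and sending out-of-range scores to human review is a design choice made here, not something the diagram specifies.

```python
from dataclasses import dataclass

AUTO_EXECUTE_THRESHOLD = 0.8  # same cut-off as the diagram above


@dataclass
class RouterDecision:
    branch: str   # "auto_execute" or "human_review"
    reason: str


def route(confidence: float,
          threshold: float = AUTO_EXECUTE_THRESHOLD) -> RouterDecision:
    """Pick the branch for step A's output based on its confidence score."""
    if not 0.0 <= confidence <= 1.0:
        # Treat out-of-range (or NaN) scores as untrusted rather than guessing.
        return RouterDecision("human_review", f"invalid confidence {confidence!r}")
    if confidence > threshold:
        return RouterDecision("auto_execute", f"{confidence:.2f} > {threshold:.2f}")
    return RouterDecision("human_review", f"{confidence:.2f} <= {threshold:.2f}")
```

Because the comparison is strict, a score of exactly 0.8 goes to the human review queue. Recording the `reason` alongside the branch keeps routing decisions auditable.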
Run a topological sort before execution to detect cyclic dependencies; tasks whose dependencies are all satisfied land in the same batch and can run in parallel:
def topological_sort(subtasks: list[SubTask]) -> list[list[SubTask]]:
    """Return execution batches grouped by dependency level; raise on cycles."""
    graph = {t.id: set(t.depends_on) for t in subtasks}
    id_to_task = {t.id: t for t in subtasks}
    # Reject dangling references up front; otherwise they would never become
    # ready and would be misreported as a cycle.
    unknown = {d for deps in graph.values() for d in deps} - set(graph)
    if unknown:
        raise ValueError(f"Unknown dependency ids: {sorted(unknown)}")
    batches = []
    while graph:
        ready = [task_id for task_id, deps in graph.items() if not deps]
        if not ready:
            raise ValueError(f"Circular dependency detected among: {list(graph)}")
        batches.append([id_to_task[task_id] for task_id in ready])
        for task_id in ready:
            del graph[task_id]
        for deps in graph.values():
            deps -= set(ready)
    return batches  # [[step1], [step2a, step2b], [step3], ...]
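For comparison, Python 3.9+ ships `graphlib.TopologicalSorter` in the standard library, which provides the same level-by-level batching and its own cycle detection. The sketch below works on bare step ids; `batches_from_deps` is a hypothetical helper name. One difference to keep in mind: `graphlib` silently adds ids that appear only as dependencies, so dangling references still need a separate check.

```python
from graphlib import CycleError, TopologicalSorter


def batches_from_deps(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group step ids into parallel batches; raise ValueError on cycles.

    `deps` maps each step id to the ids it depends on.
    """
    sorter = TopologicalSorter(deps)
    try:
        sorter.prepare()  # raises CycleError if the graph has a cycle
    except CycleError as exc:
        # args[1] holds the list of nodes forming the detected cycle.
        raise ValueError(f"Circular dependency detected: {exc.args[1]}") from exc
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every node whose deps are done
        batches.append(ready)
        sorter.done(*ready)                 # unlock the next level
    return batches


diamond = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
print(batches_from_deps(diamond))  # [['a'], ['b', 'c'], ['d']]
```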
import asyncio


async def run_plan(plan: TaskPlan) -> dict:
    # execute_subtask(task, dep_results) is the per-step runner, defined elsewhere.
    batches = topological_sort(plan.subtasks)
    results = {}
    for batch in batches:
        coros = [execute_subtask(t, {d: results[d] for d in t.depends_on})
                 for t in batch]
        # return_exceptions=True lets every task in the batch finish before we abort.
        batch_out = await asyncio.gather(*coros, return_exceptions=True)
        for t, r in zip(batch, batch_out):
            if isinstance(r, BaseException):  # also catches CancelledError
                raise RuntimeError(f"step {t.id} failed: {r}") from r
            results[t.id] = r
    return results
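`run_plan` does not itself apply the `timeout_seconds` and `max_retries` fields declared on `SubTask`. One way a per-step runner might honor them is sketched below. The helper name `run_with_retry`, the exponential backoff, and the choice of which exceptions count as transient are all assumptions.

```python
import asyncio


async def run_with_retry(make_call, timeout_seconds: float, max_retries: int,
                         base_delay: float = 0.5):
    """Await make_call() with a per-attempt timeout and exponential backoff.

    make_call is a zero-argument function returning a fresh coroutine,
    because a coroutine object cannot be awaited twice.
    """
    last_error = None
    for attempt in range(max_retries + 1):  # first try + max_retries retries
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout_seconds)
        except (asyncio.TimeoutError, ConnectionError) as exc:  # assumed transient
            last_error = exc
            if attempt < max_retries:
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"gave up after {max_retries + 1} attempts") from last_error


calls = []  # records each invocation of the fake tool below


async def flaky_tool():
    """Fake tool that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient")
    return {"status": "ok"}


result = asyncio.run(
    run_with_retry(flaky_tool, timeout_seconds=1, max_retries=3, base_delay=0.01)
)
```

Automatic retries are only safe for steps without side effects or with an idempotency key; a step flagged `is_write_op` would otherwise risk writing twice.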
Sub-agents and handoff
Sub-agents pass context via structured JSON handoffs instead of long natural-language recaps—this prevents information distortion. Below is a reusable handoff payload format including idempotency key, failure reason, and retry count:
{
  "run_id": "run_20240115T103000Z_abc123",
  "step_id": "step_02_code_review",
  "parent_step_id": "step_01_code_fetch",
  "payload": {
    "files": [
      { "path": "src/auth.py", "content_hash": "sha256:aef9...", "diff": "..." }
    ],
    "context": { "pr_number": 1234, "author": "alice", "base_branch": "main" }
  },
  "payload_checksum": "sha256:def0...",
  "idempotency_key": "run_abc123:step_02",
  "last_error": null,
  "retry_count": 0,
  "max_retries": 3,
  "timeout_ms": 120000,
  "created_at": "2024-01-15T10:30:05Z"
}
- Sub-agents only emit patches or proposals; the Orchestrator performs write operations centrally to avoid uncoordinated writes to the same resource.
- Parallel branches verify payload_checksum at the merge point before entering any side-effect step.
- The receiver validates payload_checksum; on mismatch, refuse execution and log to last_error.
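The checksum rule can be sketched as follows. The page does not say how `payload_checksum` is computed, so the canonicalization used here (JSON with sorted keys and compact separators, hashed with SHA-256) is an assumption, as are the function names.

```python
import hashlib
import json


def payload_checksum(payload: dict) -> str:
    """Hash a canonical JSON encoding so key order and spacing do not matter."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"),
                           ensure_ascii=False)
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def verify_handoff(handoff: dict) -> tuple[bool, dict]:
    """Return (ok, handoff); on mismatch, a copy with last_error filled in."""
    expected = handoff.get("payload_checksum")
    actual = payload_checksum(handoff["payload"])
    if actual == expected:
        return True, handoff
    error = f"checksum mismatch: expected {expected}, got {actual}"
    return False, {**handoff, "last_error": error}


payload = {"context": {"pr_number": 1234}, "files": []}
handoff = {
    "payload": payload,
    "payload_checksum": payload_checksum(payload),
    "last_error": None,
}
# Simulate corruption in transit: payload changed, checksum left stale.
tampered = {**handoff, "payload": {"context": {"pr_number": 9999}, "files": []}}
```

The receiver calls `verify_handoff` first and refuses to run the step when the flag is false; the returned copy carries the reason in `last_error` without mutating the sender's object.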
Observability, cost, and idempotency
For each step, log the trace id, duration, token usage, tool error codes, and retry count, so that a run's topology can be replayed and load-tested. Give retrieval and long-context steps an explicit budget with early stopping. The orchestration as a whole should be idempotent: replaying the same run_id must not duplicate side effects (use idempotency keys or dedupe tables).
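A dedupe table keyed by `(run_id, step_id)` gives both idempotent replay and resume-from-step. The sketch below uses an in-memory dict as a stand-in for a durable store; `run_step_once` and `send_email` are hypothetical names.

```python
import asyncio


async def run_step_once(store: dict, run_id: str, step_id: str, action):
    """Run action() unless this (run_id, step_id) has already completed."""
    key = (run_id, step_id)
    if key in store:
        return store[key]   # replay: reuse the recorded result, skip the side effect
    result = await action()
    store[key] = result     # record only after the step succeeded
    return result


side_effects = []  # stands in for the outside world


async def send_email():
    side_effects.append("sent")
    return {"message_id": "m-1"}


async def demo():
    store = {}
    first = await run_step_once(store, "run_abc123", "step_02", send_email)
    replay = await run_step_once(store, "run_abc123", "step_02", send_email)
    return first, replay


first, replay = asyncio.run(demo())
```

After the replay, `side_effects` still holds a single entry. A crash between the side effect and the store write can still cause one duplicate, which is why the downstream system should also honor the idempotency key.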
Testing: cover golden paths and fault injection (tool timeout, partial responses, schema drift), not only the all-success path.
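A fault-injection test can be as simple as swapping the real tool for fakes that each exhibit one failure mode. The sketch below is illustrative only: the output contract `REQUIRED_FIELDS`, the 0.05 s timeout, and all function names are assumptions.

```python
import asyncio

REQUIRED_FIELDS = {"status", "items"}  # hypothetical output contract


def check_schema(output: dict) -> dict:
    """Reject outputs that are missing contract fields."""
    missing = REQUIRED_FIELDS - output.keys()
    if missing:
        raise ValueError(f"schema drift: missing {sorted(missing)}")
    return output


async def call_tool(tool, timeout: float = 0.05) -> dict:
    """Step under test: a bounded call followed by schema validation."""
    return check_schema(await asyncio.wait_for(tool(), timeout))


# Fault-injecting fakes, one per failure mode.
async def ok_tool():
    return {"status": "ok", "items": [1, 2]}


async def slow_tool():
    await asyncio.sleep(1)   # exceeds the timeout
    return {"status": "ok", "items": []}


async def partial_tool():
    return {"status": "ok"}  # "items" was dropped


async def outcome(tool) -> str:
    """Classify how the step ended for a given fake tool."""
    try:
        await call_tool(tool)
        return "success"
    except asyncio.TimeoutError:
        return "timeout"
    except ValueError:
        return "schema_error"


results = [asyncio.run(outcome(t)) for t in (ok_tool, slow_tool, partial_tool)]
```

Each fake maps to exactly one expected outcome, so a test suite can assert on the classification rather than on log text.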
---
name: agent-orchestration-flow
description: Design multi-step agent workflows and recovery
triggers:
  - pattern: "needs? (agent )?orchestrat"
  - pattern: "multi.?step (agent|workflow|pipeline)"
  - pattern: "parallel (sub)?tasks?"
steps:
  - id: decompose
    action: Decompose goal using structured output; produce SubTask list with deps
    output: TaskPlan with topological ordering
  - id: validate
    action: Run topological sort; fail fast on cycles; check token budget
  - id: execute
    action: Run each batch with asyncio.gather; write results to state store
  - id: handoff
    action: Pass structured JSON handoff with checksum; sub-agents emit patches only
  - id: observe
    action: Log trace_id, tokens, errors per step; assert idempotency on retry
constraints:
  - Do NOT duplicate the system prompt per subtask
  - Write operations must go through the orchestrator, not sub-agents directly
  - Single-turn Q&A stays single-turn; orchestration only when criteria are met
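To sanity-check the trigger patterns, they can be exercised with Python's `re` module. How the skill runtime actually applies them is not stated on this page; the sketch assumes an unanchored, case-insensitive search, and `matches_trigger` is a hypothetical helper.

```python
import re

TRIGGER_PATTERNS = [
    r"needs? (agent )?orchestrat",
    r"multi.?step (agent|workflow|pipeline)",
    r"parallel (sub)?tasks?",
]
# Assumption: patterns are matched case-insensitively, anywhere in the text.
_COMPILED = [re.compile(p, re.IGNORECASE) for p in TRIGGER_PATTERNS]


def matches_trigger(request: str) -> bool:
    """True if any trigger pattern occurs anywhere in the request."""
    return any(rx.search(request) for rx in _COMPILED)
```

Under these assumptions, "This needs agent orchestration" and "Build a multi-step workflow" both match, while a plain factual question does not.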
Implementation checklist