Agent orchestration

Replace "do everything in one chat" with an explicit DAG + contracts: each step has typed inputs/outputs, failures resume from a breakpoint, parallelism reduces wall-clock time, and human-in-the-loop gates are pluggable. This page provides comparison code for sequential / parallel / conditional topologies, a reusable JSON handoff format, and a cycle-detection implementation.

Orchestration = explicit stages + contracts + state. Add an orchestration layer when any of these conditions is met: ≥2 external write operations, structural data dependencies between steps (B's input is A's structured output), a need to resume from a specific step on failure (not re-run the whole sequence), or a single token budget exceeding 30% of the context cap. Simple Q&A and single-tool queries stay single-turn—orchestration has a cost, don't over-engineer.

When orchestration helps

Use structured output to have the Orchestrator emit a subtask list rather than describing steps in natural language inside the system prompt—this allows static validation (types, cycle detection, budget) before execution:

from pydantic import BaseModel
from typing import Literal

class SubTask(BaseModel):
    id: str                    # snake_case, globally unique
    title: str                 # verb-first, one sentence
    depends_on: list[str]      # list of prerequisite subtask ids (empty = no deps)
    tool: Literal["search", "code_exec", "write_file", "human_review"]
    timeout_seconds: int = 60
    max_retries: int = 3
    is_write_op: bool = False  # write ops require idempotency_key or human-in-loop

class TaskPlan(BaseModel):
    goal: str
    subtasks: list[SubTask]
    max_total_steps: int = 20  # hard cap to prevent runaway loops

# Triggers for adding an orchestration layer (any one condition is sufficient):
# ✓ ≥2 external write operations (DB / file / third-party API)
# ✓ Estimated tokens > 30% of context cap (needs per-step context compression)
# ✓ Clear A→B data dependency (B's input is a structured field from A)
# ✓ Failures must resume from a specific step, not rerun from scratch
# ✗ Single-turn Q&A, single-tool query → use a single-turn prompt instead

plan = await llm.structured_output(
    f"Decompose the goal into a SubTask list: {goal}", schema=TaskPlan
)
Anti-pattern: duplicating the full system prompt per subtask balloons context and drifts behavior. Keep global policy in the orchestrator; inject only the per-step input schema + tool allowlist into each sub-agent.

Topology and data flow

Three core patterns—choose based on dependencies; they can be nested and combined:

Sequential Chain — strong dependency, abort on error
  A ──▶ B ──▶ C ──▶ D
  Use when: A's output is a required input for B; fail immediately on schema validation failure

Parallel Fan-out — independent subtasks, shortest wall-clock time
            ┌──▶ B ──┐
  A ──▶ Fan ├──▶ C ──┤──▶ Merge ──▶ E
            └──▶ D ──┘
  Wall-clock ≈ max(B,C,D); Merge point runs schema + business invariant checks

Conditional Branch — runtime routing
  A ──▶ Router ──▶ confidence > 0.8 ──▶ B (auto-execute)
                └──▶ else            ──▶ C (human review queue)

Run a topological sort to detect cycle dependencies before execution; schedule tasks with no dependencies in the same batch in parallel:

def topological_sort(subtasks: list[SubTask]) -> list[list[SubTask]]:
    """Return execution batches grouped by dependency level; raise on cycles."""
    graph = {t.id: set(t.depends_on) for t in subtasks}
    id_to_task = {t.id: t for t in subtasks}
    batches = []
    while graph:
        ready = [id for id, deps in graph.items() if not deps]
        if not ready:
            raise ValueError(f"Circular dependency detected among: {list(graph.keys())}")
        batches.append([id_to_task[id] for id in ready])
        for id in ready:
            del graph[id]
        for deps in graph.values():
            deps -= set(ready)
    return batches  # [[step1], [step2a, step2b], [step3], ...]

import asyncio

async def run_plan(plan: TaskPlan) -> dict:
    batches = topological_sort(plan.subtasks)
    results = {}
    for batch in batches:
        coros = [execute_subtask(t, {d: results[d] for d in t.depends_on})
                 for t in batch]
        batch_out = await asyncio.gather(*coros, return_exceptions=True)
        for t, r in zip(batch, batch_out):
            if isinstance(r, Exception):
                raise RuntimeError(f"step {t.id} failed: {r}")
            results[t.id] = r
    return results

Sub-agents and handoff

Sub-agents pass context via structured JSON handoffs instead of long natural-language recaps—this prevents information distortion. Below is a reusable handoff payload format including idempotency key, failure reason, and retry count:

{
  "run_id":         "run_20240115T103000Z_abc123",
  "step_id":        "step_02_code_review",
  "parent_step_id": "step_01_code_fetch",
  "payload": {
    "files": [
      { "path": "src/auth.py", "content_hash": "sha256:aef9...", "diff": "..." }
    ],
    "context": { "pr_number": 1234, "author": "alice", "base_branch": "main" }
  },
  "payload_checksum":  "sha256:def0...",
  "idempotency_key":   "run_abc123:step_02",
  "last_error":        null,
  "retry_count":       0,
  "max_retries":       3,
  "timeout_ms":        120000,
  "created_at":        "2024-01-15T10:30:05Z"
}
  • Sub-agents only emit patches or proposals; the Orchestrator performs write operations centrally to avoid uncoordinated writes to the same resource.
  • Parallel branches verify payload_checksum at the merge point before entering any side-effect step.
  • The receiver validates payload_checksum; on mismatch, refuse execution and log to last_error.

Observability, cost, and idempotency

Per step, log trace id, duration, tokens, tool error codes, and retry counts to replay topology and load-test. Budget retrieval and long-context steps with early stop; the overall orchestration should be idempotent—replaying the same run_id must not duplicate side effects (idempotency keys or dedupe tables).

Testing: cover golden paths and fault injection (tool timeout, partial responses, schema drift), not only the all-success path.

---
name: agent-orchestration-flow
description: Design multi-step agent workflows and recovery
triggers:
  - pattern: "needs? (agent )?orchestrat"
  - pattern: "multi.?step (agent|workflow|pipeline)"
  - pattern: "parallel (sub)?tasks?"
steps:
  - id: decompose
    action: Decompose goal using structured output; produce SubTask list with deps
    output: TaskPlan with topological ordering
  - id: validate
    action: Run topological sort; fail fast on cycles; check token budget
  - id: execute
    action: Run each batch with asyncio.gather; write results to state store
  - id: handoff
    action: Pass structured JSON handoff with checksum; sub-agents emit patches only
  - id: observe
    action: Log trace_id, tokens, errors per step; assert idempotency on retry
constraints:
  - Do NOT duplicate the system prompt per subtask
  - Write operations must go through the orchestrator, not sub-agents directly
  - Single-turn Q&A stays single-turn; orchestration only when criteria are met

Implementation checklist

Selections persist in this browser (localStorage) so you can work through the list over multiple sessions.

Back to skills More skills