## 🎭 Beyond Single Agents
Single-purpose AI agents are useful, but the real power emerges when multiple agents collaborate. Multi-agent orchestration is one of the most exciting — and most challenging — patterns in enterprise AI today.
I’ve helped teams implement several orchestration patterns. Here’s what works in production.
## The Core Patterns

### 1. Supervisor Pattern (Hierarchical)
A central supervisor agent receives tasks and delegates to specialized workers:
```python
class SupervisorAgent:
    def __init__(self):
        self.workers = {
            "infrastructure": InfraAgent(tools=["ansible", "terraform", "kubectl"]),
            "data": DataAgent(tools=["sql", "api_client", "spark"]),
            "security": SecurityAgent(tools=["scanner", "policy_engine"]),
        }

    async def handle(self, task: str) -> str:
        # Classify and route
        classification = await self.classify(task)
        worker = self.workers[classification.domain]

        # Delegate with context
        result = await worker.execute(
            task=task,
            context=classification.context,
            max_steps=15,
            timeout=300,
        )

        # Validate result before returning
        if not await self.validate(result):
            return await self.handle_failure(task, result)
        return result
```
Best for: Well-defined domains with clear boundaries. Most enterprises should start here.
Pitfall: The supervisor becomes a bottleneck. Keep classification logic simple — if you need an LLM call just to route, your domains aren’t well-defined enough.
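One cheap way to follow that advice is rule-based routing: a keyword table consulted before any model call. The `DOMAIN_KEYWORDS` map and the fallback domain below are illustrative assumptions, not part of the supervisor above — a sketch of the idea, not a production classifier:

```python
# Lightweight rule-based router: resolve the domain from keywords so that
# no LLM call is needed just to pick a worker.
DOMAIN_KEYWORDS = {
    "infrastructure": ("terraform", "kubectl", "node", "deploy"),
    "data": ("sql", "query", "pipeline", "etl"),
    "security": ("cve", "vulnerability", "policy", "scan"),
}

def route(task: str, default: str = "infrastructure") -> str:
    lowered = task.lower()
    for domain, keywords in DOMAIN_KEYWORDS.items():
        if any(word in lowered for word in keywords):
            return domain
    # No keyword hit: fall back to a default (or escalate to a classifier).
    return default
```

If tasks regularly miss every keyword, that is a signal the domain boundaries need rethinking rather than a smarter router.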
### 2. Pipeline Pattern (Sequential)
Agents process tasks in sequence, each enriching the output:
```
Intake Agent → Analysis Agent → Action Agent → Review Agent
     ↓               ↓                ↓              ↓
 "Parse and      "Diagnose       "Generate      "Validate
  categorize      root cause      remediation    and approve
  the alert"      analysis"       plan"          actions"
```
```yaml
# Kubernetes implementation using Argo Workflows
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: incident-pipeline
spec:
  entrypoint: incident-response
  templates:
    - name: incident-response
      dag:
        tasks:
          - name: intake
            template: agent-step
            arguments:
              parameters:
                - name: agent-type
                  value: "intake"
          - name: analysis
            template: agent-step
            dependencies: [intake]
            arguments:
              parameters:
                - name: agent-type
                  value: "analysis"
          - name: remediation
            template: agent-step
            dependencies: [analysis]
            arguments:
              parameters:
                - name: agent-type
                  value: "action"
```
Best for: Processes with clear stages where each step needs different expertise.
### 3. Debate Pattern (Adversarial)
Two or more agents analyze the same problem independently, then a judge agent synthesizes the best answer:
```python
async def debate_analysis(problem: str) -> str:
    # Get independent analyses
    analysis_a = await agent_a.analyze(problem)
    analysis_b = await agent_b.analyze(problem)

    # Judge synthesizes
    result = await judge.synthesize(
        problem=problem,
        perspectives=[analysis_a, analysis_b],
        criteria="accuracy, completeness, actionability",
    )
    return result
```
Best for: High-stakes decisions where errors are costly (security assessments, architecture reviews, compliance checks).
### 4. Swarm Pattern (Emergent)
Multiple identical agents work on sub-tasks in parallel, with a coordinator aggregating results:
```python
async def swarm_analyze(targets: list[str]) -> dict:
    tasks = [
        agent_pool.submit(
            task=f"Analyze security posture of {target}",
            timeout=120,
        )
        for target in targets
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return coordinator.aggregate(
        results=[r for r in results if not isinstance(r, Exception)],
        failures=[r for r in results if isinstance(r, Exception)],
    )
```
Best for: Embarrassingly parallel tasks — scanning multiple systems, analyzing multiple documents, testing multiple configurations.
## 🔌 Inter-Agent Communication
Agents need to share context. Here are the three approaches I’ve seen work:
### Shared Memory (Redis)
```python
# Agent A writes context
await redis.hset(f"task:{task_id}", "finding", json.dumps(finding))

# Agent B reads context
context = json.loads(await redis.hget(f"task:{task_id}", "finding"))
```
### Message Queue (NATS/Kafka)
Better for loose coupling and audit trails:
```python
# Agent publishes
await nats.publish(f"agent.findings.{task_id}", finding.to_json())

# Downstream agent subscribes
async for msg in nats.subscribe("agent.findings.*"):
    await process_finding(msg)
```
### Direct Handoff (Function Calls)
Simplest for tightly coupled pipelines. The supervisor passes output from one agent as input to the next.
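A minimal sketch of such a handoff, assuming each agent exposes an async `run` method. The `EchoAgent` stand-in is hypothetical, used only to make the chaining visible:

```python
import asyncio

class EchoAgent:
    """Stand-in agent: tags its input so the handoff chain is visible."""

    def __init__(self, name: str):
        self.name = name

    async def run(self, payload: str) -> str:
        return f"{self.name}({payload})"

async def handoff(task: str) -> str:
    # Each agent's output becomes the next agent's input.
    intake, analysis = EchoAgent("intake"), EchoAgent("analysis")
    return await analysis.run(await intake.run(task))

# asyncio.run(handoff("alert-42")) -> "analysis(intake(alert-42))"
```

The trade-off versus a queue: no audit trail and no retry buffer, but also no extra infrastructure to operate.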
## ⚙️ Production Considerations

### Resource Management
Each agent type may need different resources. Use Kubernetes node affinity:
```yaml
# GPU agents for inference
nodeSelector:
  node-type: gpu

# CPU agents for tool execution
nodeSelector:
  node-type: compute
```
### Failure Handling
Multi-agent systems have more failure modes. Implement:
- Per-agent timeouts: Don’t let one slow agent block the pipeline
- Dead letter queues: Failed tasks go to DLQ for human review
- Graceful degradation: If the security agent is down, the pipeline continues with a flag for manual security review
- Idempotent operations: Agents may retry; ensure tool actions are safe to repeat
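The first and third items can be combined in one small wrapper: `asyncio.wait_for` bounds the step, and on timeout the pipeline degrades to a flag instead of blocking. The fallback string and agent names here are illustrative:

```python
import asyncio

async def run_with_timeout(coro, timeout: float, fallback: str) -> str:
    """Bound one agent step; degrade gracefully instead of blocking the pipeline."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # Graceful degradation: return a flag for manual review.
        return fallback

async def slow_agent() -> str:
    await asyncio.sleep(10)  # stands in for a hung agent
    return "full security report"

# asyncio.run(run_with_timeout(slow_agent(), 0.1, "NEEDS_MANUAL_REVIEW"))
# -> "NEEDS_MANUAL_REVIEW"
```

In a real system the fallback would also emit the task to a dead letter queue rather than just returning a sentinel value.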
### Cost Tracking
Track token usage per agent, per task:
```python
@trace_cost
async def agent_step(agent, task):
    result = await agent.execute(task)
    metrics.record(
        agent=agent.name,
        tokens_in=result.usage.input_tokens,
        tokens_out=result.usage.output_tokens,
        model=agent.model,
        cost=calculate_cost(result.usage),
    )
    return result
```
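`trace_cost` and `calculate_cost` are left undefined above; one minimal sketch, assuming illustrative per-1K-token prices (real rates depend on the model and provider) and a usage object with `input_tokens`/`output_tokens` fields:

```python
import asyncio
import functools
import time
from types import SimpleNamespace

# Illustrative per-1K-token prices; real rates depend on model and provider.
PRICE_PER_1K = {"in": 0.003, "out": 0.015}

def calculate_cost(usage) -> float:
    """Dollar cost of one call, given an object with token counts."""
    return (usage.input_tokens * PRICE_PER_1K["in"]
            + usage.output_tokens * PRICE_PER_1K["out"]) / 1000

def trace_cost(fn):
    """Record wall-clock latency of each agent step on the wrapper itself."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await fn(*args, **kwargs)
        finally:
            wrapper.last_latency = time.perf_counter() - start
    return wrapper

# Example: calculate_cost(SimpleNamespace(input_tokens=1000, output_tokens=1000))
# -> 0.018
```

Latency lands on the wrapper here for brevity; in production it would go to the same metrics sink as the token counts.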
## Choosing the Right Pattern
| Pattern | Complexity | Latency | Use Case |
|---|---|---|---|
| Supervisor | Low | Medium | Most enterprise workflows |
| Pipeline | Medium | High | Multi-stage processes |
| Debate | Medium | High | High-stakes decisions |
| Swarm | High | Low | Parallel processing |
My recommendation: Start with the Supervisor pattern. It’s the simplest to debug, monitor, and explain to stakeholders. Move to more complex patterns only when you have evidence that they’d improve outcomes.
## Getting Started
- Map your workflow — draw the current human process before automating it
- Identify natural agent boundaries — where does expertise or tooling change?
- Start with 2 agents — a supervisor and one worker. Add agents incrementally
- Instrument everything — you can’t improve what you can’t measure
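The two-agent starting point in step 3 can be as small as this; `Worker` and `Supervisor` are placeholders (real workers would call tools or an LLM), shown only to fix the shape you grow from:

```python
import asyncio

class Worker:
    """Single illustrative worker; a real one would call tools or an LLM."""

    async def execute(self, task: str) -> str:
        return f"done: {task}"

class Supervisor:
    def __init__(self):
        self.worker = Worker()

    async def handle(self, task: str) -> str:
        # With one worker there is nothing to classify yet; routing
        # logic arrives when the second worker does.
        return await self.worker.execute(task)

# asyncio.run(Supervisor().handle("restart api pod")) -> "done: restart api pod"
```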
Building a multi-agent system? I help teams design agent architectures that actually work in production. Let’s talk.