AI

Multi-Agent Orchestration Patterns for Production Systems

Luca Berton 2 min read
#ai #agents #orchestration #architecture #production

🎭 Beyond Single Agents

Single-purpose AI agents are useful, but the real power emerges when multiple agents collaborate. Multi-agent orchestration is one of the most exciting — and most challenging — patterns in enterprise AI today.

I’ve helped teams implement several orchestration patterns. Here’s what works in production.

The Core Patterns

1. Supervisor Pattern (Hierarchical)

A central supervisor agent receives tasks and delegates to specialized workers:

class SupervisorAgent:
    def __init__(self):
        self.workers = {
            "infrastructure": InfraAgent(tools=["ansible", "terraform", "kubectl"]),
            "data": DataAgent(tools=["sql", "api_client", "spark"]),
            "security": SecurityAgent(tools=["scanner", "policy_engine"]),
        }
    
    async def handle(self, task: str) -> str:
        # Classify and route
        classification = await self.classify(task)
        worker = self.workers.get(classification.domain)
        if worker is None:
            # Unknown domain: route to failure handling instead of raising KeyError
            return await self.handle_failure(task, None)
        
        # Delegate with context
        result = await worker.execute(
            task=task,
            context=classification.context,
            max_steps=15,
            timeout=300,
        )
        
        # Validate result before returning
        if not await self.validate(result):
            return await self.handle_failure(task, result)
        
        return result

Best for: Well-defined domains with clear boundaries. Most enterprises should start here.

Pitfall: The supervisor becomes a bottleneck. Keep classification logic simple — if you need an LLM call just to route, your domains aren’t well-defined enough.
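If your domains really are well-defined, routing can be plain keyword matching with no LLM call at all. A minimal sketch (the keyword sets are illustrative placeholders, not from a real deployment):

```python
# Cheap, deterministic routing: if this doesn't work for your tasks,
# that is a signal the domain boundaries need rethinking.
DOMAIN_KEYWORDS = {
    "infrastructure": {"deploy", "terraform", "kubectl", "ansible", "node"},
    "data": {"sql", "query", "spark", "etl", "dataset"},
    "security": {"cve", "vulnerability", "scan", "policy", "audit"},
}

def classify_domain(task: str, default: str = "infrastructure") -> str:
    words = set(task.lower().split())
    scores = {
        domain: len(words & keywords)
        for domain, keywords in DOMAIN_KEYWORDS.items()
    }
    best = max(scores, key=scores.get)
    # Fall back to a default worker when nothing matches
    return best if scores[best] > 0 else default
```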

2. Pipeline Pattern (Sequential)

Agents process tasks in sequence, each enriching the output:

Intake Agent → Analysis Agent → Action Agent → Review Agent
     ↓              ↓              ↓              ↓
  "Parse and     "Diagnose     "Generate       "Validate
   categorize     root cause    remediation     and approve
   the alert"     analysis"     plan"           actions"
# Kubernetes implementation using Argo Workflows
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: incident-pipeline
spec:
  entrypoint: incident-response
  templates:
  - name: incident-response
    dag:
      tasks:
      - name: intake
        template: agent-step
        arguments:
          parameters:
          - name: agent-type
            value: "intake"
      - name: analysis
        template: agent-step
        dependencies: [intake]
        arguments:
          parameters:
          - name: agent-type
            value: "analysis"
      - name: remediation
        template: agent-step
        dependencies: [analysis]
        arguments:
          parameters:
          - name: agent-type
            value: "action"
      - name: review
        template: agent-step
        dependencies: [remediation]
        arguments:
          parameters:
          - name: agent-type
            value: "review"

Best for: Processes with clear stages where each step needs different expertise.

3. Debate Pattern (Adversarial)

Two or more agents analyze the same problem independently, then a judge agent synthesizes the best answer:

async def debate_analysis(problem: str) -> str:
    # Get independent analyses concurrently; neither agent sees the other's output
    analysis_a, analysis_b = await asyncio.gather(
        agent_a.analyze(problem),
        agent_b.analyze(problem),
    )
    
    # Judge synthesizes
    result = await judge.synthesize(
        problem=problem,
        perspectives=[analysis_a, analysis_b],
        criteria="accuracy, completeness, actionability",
    )
    
    return result

Best for: High-stakes decisions where errors are costly (security assessments, architecture reviews, compliance checks).

4. Swarm Pattern (Emergent)

Multiple identical agents work on sub-tasks in parallel, with a coordinator aggregating results:

async def swarm_analyze(targets: list[str]) -> dict:
    tasks = [
        agent_pool.submit(
            task=f"Analyze security posture of {target}",
            timeout=120,
        )
        for target in targets
    ]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    return coordinator.aggregate(
        results=[r for r in results if not isinstance(r, Exception)],
        failures=[r for r in results if isinstance(r, Exception)],
    )

Best for: Embarrassingly parallel tasks — scanning multiple systems, analyzing multiple documents, testing multiple configurations.

🔌 Inter-Agent Communication

Agents need to share context. Here are the three approaches I’ve seen work:

Shared Memory (Redis)

# Agent A writes context
await redis.hset(f"task:{task_id}", "finding", json.dumps(finding))

# Agent B reads context
context = json.loads(await redis.hget(f"task:{task_id}", "finding"))

Message Queue (NATS/Kafka)

Better for loose coupling and audit trails:

# Agent publishes (nats-py payloads are bytes)
await nc.publish(f"agent.findings.{task_id}", finding.to_json().encode())

# Downstream agent subscribes with a wildcard
sub = await nc.subscribe("agent.findings.*")
async for msg in sub.messages:
    await process_finding(msg)

Direct Handoff (Function Calls)

Simplest for tightly coupled pipelines. The supervisor passes output from one agent as input to the next.
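A minimal sketch of that handoff, with hypothetical stand-in agents for the pipeline stages from the diagram above:

```python
import asyncio

# Hypothetical stand-ins for real agents: each step's output becomes
# the next step's input, so context travels as plain function arguments.
async def intake(alert: str) -> dict:
    return {"alert": alert, "category": "disk"}

async def analyze(parsed: dict) -> dict:
    return {**parsed, "root_cause": "volume full"}

async def remediate(analysis: dict) -> dict:
    return {**analysis, "plan": "expand PVC"}

async def run_pipeline(alert: str) -> dict:
    # Direct handoff: no broker, no shared store, just awaited calls
    result = await intake(alert)
    result = await analyze(result)
    return await remediate(result)
```

The trade-off: no audit trail and no independent scaling, but also nothing extra to operate.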

⚙️ Production Considerations

Resource Management

Each agent type may need different resources. Use Kubernetes node affinity:

# GPU agents for inference
nodeSelector:
  node-type: gpu
  
# CPU agents for tool execution
nodeSelector:
  node-type: compute

Failure Handling

Multi-agent systems have more failure modes. Implement:

  • Per-agent timeouts: Don’t let one slow agent block the pipeline
  • Dead letter queues: Failed tasks go to DLQ for human review
  • Graceful degradation: If the security agent is down, the pipeline continues with a flag for manual security review
  • Idempotent operations: Agents may retry; ensure tool actions are safe to repeat
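The first three points can be combined in one wrapper. A sketch, assuming in-process state for the DLQ (in production this would be a real queue), with illustrative names throughout:

```python
import asyncio

# Failed or timed-out tasks land in a dead letter queue for human review
# instead of blocking the pipeline. In-memory list used here for brevity.
dead_letter_queue: list[dict] = []

async def run_with_timeout(agent_name, coro_factory, task, timeout=120, retries=1):
    for attempt in range(retries + 1):
        try:
            # Per-agent timeout: a slow agent can't stall the whole pipeline
            return await asyncio.wait_for(coro_factory(task), timeout=timeout)
        except (asyncio.TimeoutError, RuntimeError) as exc:
            if attempt == retries:
                dead_letter_queue.append(
                    {"agent": agent_name, "task": task, "error": repr(exc)}
                )
                # Graceful degradation: caller sees None and flags for manual review
                return None
```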

Cost Tracking

Track token usage per agent, per task:

@trace_cost
async def agent_step(agent, task):
    result = await agent.execute(task)
    metrics.record(
        agent=agent.name,
        tokens_in=result.usage.input_tokens,
        tokens_out=result.usage.output_tokens,
        model=agent.model,
        cost=calculate_cost(result.usage),
    )
    return result
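One possible shape for the `calculate_cost` helper referenced above. The prices are placeholders, not real provider rates; substitute your own:

```python
# Price per million tokens. Illustrative numbers only; check your
# provider's current pricing before relying on these.
PRICES_PER_MTOK = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "claude-sonnet": {"input": 3.00, "output": 15.00},
}

def calculate_cost(model: str, tokens_in: int, tokens_out: int) -> float:
    rates = PRICES_PER_MTOK[model]
    return (tokens_in * rates["input"] + tokens_out * rates["output"]) / 1_000_000
```

Aggregating this per agent is what lets you spot the pattern that matters: one over-eager agent quietly accounting for most of your bill.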

Choosing the Right Pattern

| Pattern    | Complexity | Latency | Use Case                  |
|------------|------------|---------|---------------------------|
| Supervisor | Low        | Medium  | Most enterprise workflows |
| Pipeline   | Medium     | High    | Multi-stage processes     |
| Debate     | Medium     | High    | High-stakes decisions     |
| Swarm      | High       | Low     | Parallel processing       |

My recommendation: Start with the Supervisor pattern. It’s the simplest to debug, monitor, and explain to stakeholders. Move to more complex patterns only when you have evidence that they’d improve outcomes.

Getting Started

  1. Map your workflow — draw the current human process before automating it
  2. Identify natural agent boundaries — where does expertise or tooling change?
  3. Start with 2 agents — a supervisor and one worker. Add agents incrementally
  4. Instrument everything — you can’t improve what you can’t measure

Building a multi-agent system? I help teams design agent architectures that actually work in production. Let’s talk.


Luca Berton

AI & Cloud Advisor with 18+ years experience. Author of 8 technical books, creator of Ansible Pilot, and instructor at CopyPasteLearn Academy. Speaker at KubeCon EU & Red Hat Summit 2026.
