Building Async Agent Pipelines: Chaining Asynchronous Steps with Dependencies
Design and implement async agent pipelines that chain dependent steps, resolve execution order automatically, and handle backpressure in production AI systems.
Why Pipelines Matter for AI Agents
Real-world AI agents rarely execute a single LLM call. They run multi-step workflows: retrieve context, classify intent, generate a response, validate the output, then format it. Some steps depend on others. Some can run in parallel. A well-designed async pipeline maximizes concurrency while respecting dependencies.
The key insight is modeling your agent workflow as a directed acyclic graph (DAG) of steps, where edges represent data dependencies. Steps without mutual dependencies execute concurrently. Steps that consume another step's output wait automatically.
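The execution-order part of this idea is available directly in Python's standard library. A minimal sketch using `graphlib` (Python 3.9+), with illustrative step names matching the workflow described above:

```python
from graphlib import TopologicalSorter

# Toy dependency graph: each key lists the steps it depends on
deps = {
    "classify": {"retrieve"},
    "entities": {"retrieve"},
    "generate": {"classify", "entities"},
    "validate": {"generate"},
}

# static_order() yields a valid execution order for the DAG
order = list(TopologicalSorter(deps).static_order())
```

Steps that appear only as dependencies (here `retrieve`) are included automatically, and any node whose dependencies are all satisfied could be launched concurrently.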
Simple Linear Pipeline
The simplest pipeline chains steps sequentially, each consuming the previous step's output.
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class PipelineContext:
    """Shared context flowing through the pipeline."""

    user_query: str
    retrieved_docs: list[str] | None = None
    intent: str | None = None
    response: str | None = None
    validated: bool = False


async def retrieve_context(ctx: PipelineContext) -> PipelineContext:
    """Step 1: Retrieve relevant documents."""
    print("Retrieving context...")
    await asyncio.sleep(0.5)  # Simulate vector search
    ctx.retrieved_docs = [
        "Doc about async Python patterns",
        "Doc about agent architectures",
    ]
    return ctx


async def classify_intent(ctx: PipelineContext) -> PipelineContext:
    """Step 2: Classify user intent."""
    print("Classifying intent...")
    await asyncio.sleep(0.3)  # Simulate LLM call
    ctx.intent = "technical_question"
    return ctx


async def generate_response(ctx: PipelineContext) -> PipelineContext:
    """Step 3: Generate response using context and intent."""
    print(f"Generating response for intent: {ctx.intent}")
    await asyncio.sleep(1.0)  # Simulate LLM call
    ctx.response = f"Based on {len(ctx.retrieved_docs)} docs: ..."
    return ctx


async def validate_output(ctx: PipelineContext) -> PipelineContext:
    """Step 4: Validate the response."""
    print("Validating output...")
    await asyncio.sleep(0.2)  # Simulate validation
    ctx.validated = True
    return ctx


async def run_linear_pipeline(query: str) -> PipelineContext:
    ctx = PipelineContext(user_query=query)
    ctx = await retrieve_context(ctx)
    ctx = await classify_intent(ctx)
    ctx = await generate_response(ctx)
    ctx = await validate_output(ctx)
    return ctx
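The chaining in `run_linear_pipeline` generalizes to any list of steps. A minimal sketch of a reusable runner, with toy steps standing in for the agent steps above (`run_steps`, `double`, and `add_one` are illustrative names):

```python
import asyncio


async def run_steps(ctx, steps):
    """Chain async steps sequentially; each consumes the prior output."""
    for step in steps:
        ctx = await step(ctx)
    return ctx


# Toy steps standing in for retrieve/classify/generate/validate
async def double(x: int) -> int:
    return x * 2


async def add_one(x: int) -> int:
    return x + 1


result = asyncio.run(run_steps(5, [double, add_one]))
# result == 11
```

The trade-off of the linear form is latency: total time is the sum of every step's latency, even when some steps don't actually depend on each other.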
Diamond Pipeline: Parallel Steps with Shared Dependencies
In many agent workflows, some steps are independent and can run concurrently. A diamond pattern runs parallel branches that converge at a later step.
async def run_diamond_pipeline(query: str) -> PipelineContext:
    """
    Pipeline with parallel branches:

              retrieve_context
               /            \\
      classify_intent   extract_entities
               \\            /
             generate_response
                     |
              validate_output
    """
    ctx = PipelineContext(user_query=query)

    # Step 1: Retrieve context (both branches depend on this)
    ctx = await retrieve_context(ctx)

    # Step 2: Run classification and entity extraction in parallel.
    # Both branches mutate the shared context in place, so the return
    # values from gather() can safely be ignored.
    async def classify(ctx: PipelineContext) -> PipelineContext:
        await asyncio.sleep(0.3)  # Simulate LLM call
        ctx.intent = "technical_question"
        return ctx

    async def extract_entities(ctx: PipelineContext) -> PipelineContext:
        await asyncio.sleep(0.4)  # Simulate entity extraction
        # Set dynamically here; could also be declared on PipelineContext
        ctx.entities = ["asyncio", "Python", "pipelines"]
        return ctx

    await asyncio.gather(classify(ctx), extract_entities(ctx))

    # Step 3: Generate response (depends on both parallel steps)
    ctx = await generate_response(ctx)

    # Step 4: Validate
    ctx = await validate_output(ctx)
    return ctx
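The payoff of the parallel section is wall-clock time: concurrent branches cost roughly as much as the slowest branch, not the sum of both. A self-contained sketch of the same pattern with toy delays (`branch` is an illustrative name):

```python
import asyncio
import time


async def branch(delay: float, out: dict, key: str) -> None:
    """Simulated pipeline branch that mutates shared state in place."""
    await asyncio.sleep(delay)
    out[key] = key


async def main():
    out: dict = {}
    start = time.perf_counter()
    # Both branches run concurrently, as in the diamond pipeline above
    await asyncio.gather(
        branch(0.3, out, "intent"),
        branch(0.4, out, "entities"),
    )
    return out, time.perf_counter() - start


out, elapsed = asyncio.run(main())
# elapsed is roughly max(0.3, 0.4) seconds, not their sum
```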
Generic DAG-Based Pipeline Engine
For complex agents, build a reusable pipeline engine that resolves dependencies automatically.
import asyncio
from collections import defaultdict
from typing import Any


class AsyncPipeline:
    """DAG-based async pipeline with automatic dependency resolution."""

    def __init__(self):
        self.steps: dict[str, Any] = {}
        self.dependencies: dict[str, list[str]] = defaultdict(list)

    def add_step(self, name: str, func, depends_on: list[str] | None = None):
        """Register a pipeline step with its dependencies."""
        self.steps[name] = func
        if depends_on:
            self.dependencies[name] = depends_on

    async def execute(self, ctx: dict) -> dict:
        """Execute the pipeline respecting dependency order."""
        completed: dict[str, asyncio.Event] = {
            name: asyncio.Event() for name in self.steps
        }
        results: dict[str, Any] = {}

        async def run_step(name: str):
            # Wait for all dependencies to complete
            for dep in self.dependencies[name]:
                await completed[dep].wait()
            # Execute the step
            result = await self.steps[name](ctx, results)
            results[name] = result
            completed[name].set()

        # Launch all steps concurrently — they self-schedule
        # via dependency waits
        await asyncio.gather(
            *[run_step(name) for name in self.steps]
        )
        return results


# Usage: each step is an async function taking (ctx, results),
# defined elsewhere in your codebase
pipeline = AsyncPipeline()
pipeline.add_step("retrieve", retrieve_docs_step)
pipeline.add_step("classify", classify_step, depends_on=["retrieve"])
pipeline.add_step("entities", extract_entities_step, depends_on=["retrieve"])
pipeline.add_step("generate", generate_step,
                  depends_on=["classify", "entities"])
pipeline.add_step("validate", validate_step, depends_on=["generate"])

results = asyncio.run(pipeline.execute({"query": "How does asyncio work?"}))
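One caveat with an Event-based engine like this: a dependency cycle makes the gather hang forever, and a dependency name that was never registered as a step raises a KeyError at runtime. A fail-fast validation sketch using the stdlib `graphlib` module (`validate_dag` is an illustrative helper, not part of the engine above):

```python
from graphlib import CycleError, TopologicalSorter


def validate_dag(steps: set[str], dependencies: dict[str, list[str]]) -> None:
    """Fail fast on unknown dependencies or cycles before executing."""
    for name, deps in dependencies.items():
        missing = set(deps) - steps
        if missing:
            raise ValueError(f"step {name!r} depends on unknown steps: {missing}")
    graph = {name: dependencies.get(name, []) for name in steps}
    try:
        # prepare() raises CycleError if the graph contains a cycle
        TopologicalSorter(graph).prepare()
    except CycleError as exc:
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc
```

Calling this at the top of `execute()` turns a silent deadlock into an immediate, debuggable error.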
Backpressure: Preventing Pipeline Overload
When a fast producer feeds a slow consumer, pending work accumulates without bound. Use an asyncio.Queue with a maxsize to apply backpressure.
async def pipeline_with_backpressure(
    queries: list[str],
    max_in_flight: int = 10,
):
    """Pipeline with bounded queues for backpressure."""
    retrieval_queue = asyncio.Queue(maxsize=max_in_flight)
    generation_queue = asyncio.Queue(maxsize=max_in_flight)
    results = []

    async def retriever():
        while True:
            query = await retrieval_queue.get()
            if query is None:  # Sentinel: propagate shutdown downstream
                await generation_queue.put(None)
                break
            docs = await fetch_documents(query)  # Your retrieval call
            await generation_queue.put((query, docs))
            retrieval_queue.task_done()

    async def generator():
        while True:
            item = await generation_queue.get()
            if item is None:
                break
            query, docs = item
            response = await generate_llm_response(query, docs)  # Your LLM call
            results.append(response)
            generation_queue.task_done()

    # Start workers
    ret_task = asyncio.create_task(retriever())
    gen_task = asyncio.create_task(generator())

    # Feed queries — this blocks when queue is full
    for query in queries:
        await retrieval_queue.put(query)
    await retrieval_queue.put(None)  # Sentinel

    await asyncio.gather(ret_task, gen_task)
    return results
The maxsize parameter on the queue ensures the producer blocks when the consumer cannot keep up, preventing unbounded memory growth.
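When you only need to cap concurrency rather than stage work through multiple queues, an asyncio.Semaphore is a lighter alternative. A minimal sketch; `bounded_map` and `fake_llm` are illustrative names, not library functions:

```python
import asyncio


async def bounded_map(func, items, max_in_flight: int = 10):
    """Run func over items with at most max_in_flight concurrent calls."""
    sem = asyncio.Semaphore(max_in_flight)

    async def worker(item):
        async with sem:
            return await func(item)

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(worker(i) for i in items))


async def fake_llm(query: str) -> str:  # Stand-in for a real LLM call
    await asyncio.sleep(0.01)
    return query.upper()


responses = asyncio.run(bounded_map(fake_llm, ["a", "b", "c"], max_in_flight=2))
# responses == ["A", "B", "C"]
```

The queue version shines when stages have different throughputs and you want each stage independently bounded; the semaphore version is simpler when one global concurrency cap is enough.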
FAQ
How do I handle a step failure in a multi-step pipeline?
Wrap each step in a try/except and decide on a failure strategy: skip the step and continue with degraded results, retry the step with backoff, or abort the entire pipeline. The DAG engine approach makes this clean — each step checks its dependencies' results and can short-circuit if a required dependency failed.
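The retry strategy can be sketched as a wrapper around any step. `with_retry` and `flaky_step` are illustrative names, not part of the engine above:

```python
import asyncio


async def with_retry(step, ctx, attempts: int = 3, base_delay: float = 0.05):
    """Retry a failing step with exponential backoff; re-raise when exhausted."""
    for attempt in range(attempts):
        try:
            return await step(ctx)
        except Exception:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)


calls = 0


async def flaky_step(ctx):
    """Simulated step that fails twice before succeeding."""
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient failure")
    return ctx


result = asyncio.run(with_retry(flaky_step, "ok"))
# Succeeds on the third attempt
```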
Can I add conditional branching to async pipelines?
Yes. In the DAG engine, make the step function inspect the context or prior results and return early if the branch should not execute. For example, a moderation step might set a flag that causes the generation step to return a canned safety response instead of calling the LLM.
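That moderation example can be sketched as follows; `moderate`, `generate`, and the blocking rule are all illustrative stand-ins for real moderation and generation calls:

```python
import asyncio


async def moderate(ctx: dict) -> dict:
    """Hypothetical moderation step: flags queries containing blocked terms."""
    return {"blocked": "forbidden" in ctx["query"]}


async def generate(ctx: dict, moderation: dict) -> str:
    """Generation step that short-circuits when moderation flagged the query."""
    if moderation["blocked"]:
        return "I can't help with that."  # Canned safety response, no LLM call
    return f"Answer to: {ctx['query']}"


async def run(query: str) -> str:
    ctx = {"query": query}
    moderation = await moderate(ctx)
    return await generate(ctx, moderation)


blocked = asyncio.run(run("a forbidden topic"))
allowed = asyncio.run(run("how does asyncio work?"))
```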
What is the performance overhead of the DAG pipeline approach compared to hand-coded async?
The overhead is negligible. The asyncio.Event-based synchronization adds microseconds of latency per step. The dominant cost is always the LLM API calls, which take hundreds of milliseconds. The DAG approach pays for itself in maintainability and correctness.
#Python #Asyncio #Pipeline #AIAgents #Architecture #AgenticAI #LearnAI #AIEngineering
CallSphere Team