
Building Async Agent Pipelines: Chaining Asynchronous Steps with Dependencies

Design and implement async agent pipelines that chain dependent steps, resolve execution order automatically, and handle backpressure in production AI systems.

Why Pipelines Matter for AI Agents

Real-world AI agents rarely execute a single LLM call. They run multi-step workflows: retrieve context, classify intent, generate a response, validate the output, then format it. Some steps depend on others. Some can run in parallel. A well-designed async pipeline maximizes concurrency while respecting dependencies.

The key insight is modeling your agent workflow as a directed acyclic graph (DAG) of steps, where edges represent data dependencies. Steps without mutual dependencies execute concurrently. Steps that consume another step's output wait automatically.
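
Python's standard library can already compute a valid execution order for such a DAG. A minimal sketch using `graphlib.TopologicalSorter`, with the step names from the workflow above (the graph literal here is illustrative):

```python
from graphlib import TopologicalSorter

# Each key maps a step to the set of steps it depends on.
graph = {
    "classify": {"retrieve"},
    "entities": {"retrieve"},
    "generate": {"classify", "entities"},
    "validate": {"generate"},
}

# static_order() yields every node with all of its
# dependencies listed before it.
order = list(TopologicalSorter(graph).static_order())
print(order)
```

"retrieve" always comes first and "validate" last; "classify" and "entities" may appear in either order because nothing orders them relative to each other — exactly the pairs an async pipeline can run concurrently.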

Simple Linear Pipeline

The simplest pipeline chains steps sequentially, each consuming the previous step's output.

import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class PipelineContext:
    """Shared context flowing through the pipeline."""
    user_query: str
    retrieved_docs: list[str] | None = None
    intent: str | None = None
    entities: list[str] | None = None
    response: str | None = None
    validated: bool = False

async def retrieve_context(ctx: PipelineContext) -> PipelineContext:
    """Step 1: Retrieve relevant documents."""
    print("Retrieving context...")
    await asyncio.sleep(0.5)  # Simulate vector search
    ctx.retrieved_docs = [
        "Doc about async Python patterns",
        "Doc about agent architectures",
    ]
    return ctx

async def classify_intent(ctx: PipelineContext) -> PipelineContext:
    """Step 2: Classify user intent."""
    print("Classifying intent...")
    await asyncio.sleep(0.3)  # Simulate LLM call
    ctx.intent = "technical_question"
    return ctx

async def generate_response(ctx: PipelineContext) -> PipelineContext:
    """Step 3: Generate response using context and intent."""
    print(f"Generating response for intent: {ctx.intent}")
    await asyncio.sleep(1.0)  # Simulate LLM call
    ctx.response = f"Based on {len(ctx.retrieved_docs)} docs: ..."
    return ctx

async def validate_output(ctx: PipelineContext) -> PipelineContext:
    """Step 4: Validate the response."""
    print("Validating output...")
    await asyncio.sleep(0.2)  # Simulate validation
    ctx.validated = True
    return ctx

async def run_linear_pipeline(query: str) -> PipelineContext:
    ctx = PipelineContext(user_query=query)
    ctx = await retrieve_context(ctx)
    ctx = await classify_intent(ctx)
    ctx = await generate_response(ctx)
    ctx = await validate_output(ctx)
    return ctx

Diamond Pipeline: Parallel Steps with Shared Dependencies

In many agent workflows, some steps are independent and can run concurrently. A diamond pattern runs parallel branches that converge at a later step.

async def run_diamond_pipeline(query: str) -> PipelineContext:
    """
    Pipeline with parallel branches:

        retrieve_context
           /        \
    classify_intent  extract_entities
           \        /
        generate_response
              |
        validate_output
    """
    ctx = PipelineContext(user_query=query)

    # Step 1: Retrieve context (both branches depend on this)
    ctx = await retrieve_context(ctx)

    # Step 2: Run classification and entity extraction in parallel.
    # Both closures mutate the shared ctx in place, so the values
    # returned by gather can be safely ignored.
    async def classify(ctx: PipelineContext) -> PipelineContext:
        await asyncio.sleep(0.3)  # Simulate LLM call
        ctx.intent = "technical_question"
        return ctx

    async def extract_entities(ctx: PipelineContext) -> PipelineContext:
        await asyncio.sleep(0.4)  # Simulate LLM call
        ctx.entities = ["asyncio", "Python", "pipelines"]
        return ctx

    await asyncio.gather(classify(ctx), extract_entities(ctx))

    # Step 3: Generate response (depends on both parallel steps)
    ctx = await generate_response(ctx)

    # Step 4: Validate
    ctx = await validate_output(ctx)
    return ctx
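
The payoff of the diamond is that the middle stage costs roughly max(0.3, 0.4) ≈ 0.4s instead of 0.7s. A self-contained timing check of just that stage (the step bodies are stand-ins for real LLM calls):

```python
import asyncio
import time

async def classify() -> str:
    await asyncio.sleep(0.3)  # Simulated LLM call
    return "technical_question"

async def extract_entities() -> list[str]:
    await asyncio.sleep(0.4)  # Simulated LLM call
    return ["asyncio", "Python"]

async def middle_stage():
    # gather schedules both coroutines on the event loop at once
    return await asyncio.gather(classify(), extract_entities())

start = time.perf_counter()
intent, entities = asyncio.run(middle_stage())
elapsed = time.perf_counter() - start
# Concurrent: elapsed tracks the slower branch (~0.4s), not the sum
```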

Generic DAG-Based Pipeline Engine

For complex agents, build a reusable pipeline engine that resolves dependencies automatically. One caveat: the graph must stay acyclic. In the event-based design below, every step in a cycle would wait on an event that is never set, deadlocking the pipeline.

from collections import defaultdict

class AsyncPipeline:
    """DAG-based async pipeline with automatic dependency resolution."""

    def __init__(self):
        self.steps: dict[str, Any] = {}
        self.dependencies: dict[str, list[str]] = defaultdict(list)

    def add_step(self, name: str, func, depends_on: list[str] | None = None):
        """Register a pipeline step with its dependencies."""
        self.steps[name] = func
        if depends_on:
            self.dependencies[name] = depends_on

    async def execute(self, ctx: dict) -> dict:
        """Execute the pipeline respecting dependency order."""
        completed: dict[str, asyncio.Event] = {
            name: asyncio.Event() for name in self.steps
        }
        results: dict[str, Any] = {}

        async def run_step(name: str):
            # Wait for all dependencies to complete
            for dep in self.dependencies[name]:
                await completed[dep].wait()

            # Execute the step
            result = await self.steps[name](ctx, results)
            results[name] = result
            completed[name].set()

        # Launch all steps concurrently — they self-schedule
        # via dependency waits
        await asyncio.gather(
            *[run_step(name) for name in self.steps]
        )
        return results

# Usage — each registered step is an async function with the
# signature (ctx: dict, results: dict) -> Any
pipeline = AsyncPipeline()
pipeline.add_step("retrieve", retrieve_docs_step)
pipeline.add_step("classify", classify_step, depends_on=["retrieve"])
pipeline.add_step("entities", extract_entities_step, depends_on=["retrieve"])
pipeline.add_step("generate", generate_step,
                   depends_on=["classify", "entities"])
pipeline.add_step("validate", validate_step, depends_on=["generate"])

results = asyncio.run(pipeline.execute({"query": "How does asyncio work?"}))

Backpressure: Preventing Pipeline Overload

When a fast producer feeds a slow consumer, work accumulates unboundedly. Use asyncio.Queue with a max size to apply backpressure.

async def pipeline_with_backpressure(
    queries: list[str],
    max_in_flight: int = 10,
):
    """Pipeline with bounded queues for backpressure."""
    retrieval_queue = asyncio.Queue(maxsize=max_in_flight)
    generation_queue = asyncio.Queue(maxsize=max_in_flight)
    results = []

    async def retriever():
        while True:
            query = await retrieval_queue.get()
            retrieval_queue.task_done()
            if query is None:
                await generation_queue.put(None)  # Propagate sentinel
                break
            docs = await fetch_documents(query)  # Placeholder retrieval call
            await generation_queue.put((query, docs))

    async def generator():
        while True:
            item = await generation_queue.get()
            generation_queue.task_done()
            if item is None:
                break
            query, docs = item
            response = await generate_llm_response(query, docs)  # Placeholder LLM call
            results.append(response)

    # Start workers
    ret_task = asyncio.create_task(retriever())
    gen_task = asyncio.create_task(generator())

    # Feed queries — this blocks when queue is full
    for query in queries:
        await retrieval_queue.put(query)
    await retrieval_queue.put(None)  # Sentinel

    await asyncio.gather(ret_task, gen_task)
    return results

The maxsize parameter on the queue ensures the producer blocks when the consumer cannot keep up, preventing unbounded memory growth.
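
The blocking behavior is easy to observe directly. In this self-contained sketch, a bounded queue of size 1 forces a fast producer down to the pace of a slow consumer (the `demo` coroutine and sentinel value are illustrative):

```python
import asyncio

async def demo() -> list[float]:
    loop = asyncio.get_running_loop()
    q: asyncio.Queue = asyncio.Queue(maxsize=1)
    put_times: list[float] = []

    async def producer():
        for i in range(3):
            await q.put(i)  # Blocks while the queue is full
            put_times.append(loop.time())
        await q.put(None)  # Sentinel

    async def consumer():
        while True:
            item = await q.get()
            if item is None:
                break
            await asyncio.sleep(0.1)  # Slow consumer

    await asyncio.gather(producer(), consumer())
    return put_times

times = asyncio.run(demo())
# Gaps between puts track the consumer's 0.1s pace, not the
# producer's speed — the bounded queue throttles the producer.
```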

FAQ

How do I handle a step failure in a multi-step pipeline?

Wrap each step in a try/except and decide on a failure strategy: skip the step and continue with degraded results, retry the step with backoff, or abort the entire pipeline. The DAG engine approach makes this clean — each step checks its dependencies' results and can short-circuit if a required dependency failed.
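
A minimal sketch of the retry-with-backoff strategy described above (`run_with_retry` and `flaky_step` are illustrative names, not part of the pipeline engine):

```python
import asyncio

async def run_with_retry(step, ctx, retries: int = 3, base_delay: float = 0.05):
    """Retry a failing step with exponential backoff; re-raise after the last attempt."""
    for attempt in range(retries):
        try:
            return await step(ctx)
        except Exception:
            if attempt == retries - 1:
                raise
            # Backoff doubles each attempt: 0.05s, 0.1s, ...
            await asyncio.sleep(base_delay * 2 ** attempt)

# A step that fails twice before succeeding, to exercise the wrapper
attempts = 0

async def flaky_step(ctx: dict) -> str:
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(run_with_retry(flaky_step, {}))
```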

Can I add conditional branching to async pipelines?

Yes. In the DAG engine, make the step function inspect the context or prior results and return early if the branch should not execute. For example, a moderation step might set a flag that causes the generation step to return a canned safety response instead of calling the LLM.
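
The moderation example can be sketched in a few lines. The guard inside the generation step short-circuits before the expensive call ever happens (all names here are illustrative, and the moderation check is a trivial stand-in):

```python
import asyncio

CANNED_SAFETY_RESPONSE = "I can't help with that request."

async def moderate(ctx: dict) -> dict:
    # Stand-in moderation: flag queries containing a blocked word
    ctx["blocked"] = "forbidden" in ctx["query"]
    return ctx

async def generate(ctx: dict) -> dict:
    if ctx.get("blocked"):
        # Short-circuit: skip the (expensive) LLM call entirely
        ctx["response"] = CANNED_SAFETY_RESPONSE
        return ctx
    await asyncio.sleep(0.05)  # Simulated LLM call
    ctx["response"] = f"Answer to: {ctx['query']}"
    return ctx

async def run(query: str) -> dict:
    ctx = {"query": query}
    ctx = await moderate(ctx)
    return await generate(ctx)

blocked = asyncio.run(run("something forbidden"))
allowed = asyncio.run(run("how does asyncio work"))
```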

What is the performance overhead of the DAG pipeline approach compared to hand-coded async?

The overhead is negligible. The asyncio.Event-based synchronization adds microseconds of latency per step. The dominant cost is always the LLM API calls, which take hundreds of milliseconds. The DAG approach pays for itself in maintainability and correctness.


#Python #Asyncio #Pipeline #AIAgents #Architecture #AgenticAI #LearnAI #AIEngineering
