
The Pipeline Pattern: Sequential Agent Stages for Complex Data Processing

Master the Pipeline pattern for AI agents — design sequential processing stages with intermediate results, error propagation, and checkpointing for resilient multi-step workflows.

What Is the Pipeline Pattern?

The Pipeline pattern structures work as a sequence of stages, where the output of one stage becomes the input of the next. In AI agent systems, each stage can be a distinct agent or function that performs a specific transformation — extracting data, enriching it, validating it, and finally producing a result.

This pattern shines when you have a complex task that can be decomposed into well-defined, ordered steps. Instead of one monolithic agent trying to do everything at once, you break the work into focused stages that are individually testable, replaceable, and observable.
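Stripped of any framework, the idea can be sketched as a list of functions folded over an input. The stage names below (`extract`, `enrich`, `validate`) and the `run_stages` helper are hypothetical, chosen only to illustrate the decomposition described above:

```python
from functools import reduce
from typing import Any, Callable


# Hypothetical stages: each one takes the previous stage's output.
def extract(raw: str) -> str:
    return raw.strip()


def enrich(text: str) -> dict:
    return {"text": text, "word_count": len(text.split())}


def validate(record: dict) -> dict:
    if record["word_count"] == 0:
        raise ValueError("empty document")
    return record


def run_stages(stages: list[Callable[[Any], Any]], initial: Any) -> Any:
    # Feed each stage's output into the next stage, in order.
    return reduce(lambda value, stage: stage(value), stages, initial)


record = run_stages([extract, enrich, validate], "  hello pipeline world  ")
```

Because each stage is an ordinary function, it can be unit-tested or swapped out without touching the others.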

Designing the Pipeline Framework

A good pipeline framework needs four capabilities: stage registration, intermediate result passing, error propagation, and checkpointing for recovery.

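from dataclasses import dataclass, field
from typing import Any, Callable
from datetime import datetime
from pathlib import Path
import json


@dataclass
class StageResult:
    """Outcome of a single stage run, including timing and any error."""
    stage_name: str
    output: Any
    started_at: datetime
    completed_at: datetime
    success: bool
    error: str | None = None


@dataclass
class PipelineContext:
    """Shared state passed to every stage: the input, results so far, metadata."""
    initial_input: Any
    results: list[StageResult] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

    @property
    def last_output(self) -> Any:
        # Before any stage has run, the "last output" is the pipeline input.
        if self.results:
            return self.results[-1].output
        return self.initial_input

    def checkpoint(self, path: str):
        """Write a JSON snapshot of progress so a failed run can be resumed.

        Assumes the input, outputs, and metadata are JSON-serializable.
        """
        data = {
            "initial_input": self.initial_input,
            "completed_stages": [r.stage_name for r in self.results
                                 if r.success],
            "last_output": self.last_output,
            "metadata": self.metadata,
        }
        # open() does not create missing directories, so create them first.
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as f:
            json.dump(data, f)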


class Pipeline:
    def __init__(self, name: str, checkpoint_dir: str | None = None):
        self.name = name
        self.stages: list[tuple[str, Callable]] = []
        self.checkpoint_dir = checkpoint_dir

    def add_stage(self, name: str, handler: Callable):
        self.stages.append((name, handler))
        return self  # fluent interface

    def run(self, initial_input: Any) -> PipelineContext:
        ctx = PipelineContext(initial_input=initial_input)

        for stage_name, handler in self.stages:
            started = datetime.now()
            try:
                output = handler(ctx.last_output, ctx)
            except Exception as e:
                # Record the failure, then surface it with the full context.
                ctx.results.append(StageResult(
                    stage_name=stage_name,
                    output=None,
                    started_at=started,
                    completed_at=datetime.now(),
                    success=False,
                    error=str(e),
                ))
                raise PipelineError(stage_name, e, ctx) from e

            # Only the handler call is guarded above, so a checkpoint write
            # error is not misreported as a failure of the stage itself.
            ctx.results.append(StageResult(
                stage_name=stage_name,
                output=output,
                started_at=started,
                completed_at=datetime.now(),
                success=True,
            ))
            if self.checkpoint_dir:
                ctx.checkpoint(
                    f"{self.checkpoint_dir}/{self.name}_{stage_name}.json"
                )

        return ctx


class PipelineError(Exception):
    def __init__(self, stage: str, cause: Exception,
                 context: PipelineContext):
        self.stage = stage
        self.cause = cause
        self.context = context
        super().__init__(f"Pipeline failed at stage '{stage}': {cause}")

Building a Real Pipeline

Here is a document-processing pipeline that extracts text, summarizes it, and classifies the sentiment of the summary:

import openai

client = openai.OpenAI()

def extract_text(raw_input: str, ctx: PipelineContext) -> str:
    # Simulate extraction from raw document
    return raw_input.strip()

def summarize(text: str, ctx: PipelineContext) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Summarize in 2-3 sentences."},
            {"role": "user", "content": text},
        ],
    )
    return response.choices[0].message.content

def classify_sentiment(summary: str, ctx: PipelineContext) -> dict:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system",
             "content": 'Return JSON: {"sentiment": "positive|negative|neutral", "score": 0.0-1.0}'},
            {"role": "user", "content": summary},
        ],
        response_format={"type": "json_object"},
    )
    import json
    result = json.loads(response.choices[0].message.content)
    ctx.metadata["sentiment"] = result["sentiment"]
    return result

pipeline = (
    Pipeline("doc_analysis", checkpoint_dir="/tmp/checkpoints")
    .add_stage("extract", extract_text)
    .add_stage("summarize", summarize)
    .add_stage("classify", classify_sentiment)
)

result = pipeline.run("Long document text here...")
print(result.last_output)  # e.g. {'sentiment': 'positive', 'score': 0.87}

Error Propagation and Recovery

When a stage fails, PipelineError captures the stage name, the original exception, and the full pipeline context, including the results of every stage that completed. If a checkpoint directory is configured, the checkpoint written after the last successful stage also holds that stage's output, so you can resume from it rather than re-running the entire pipeline. This matters most when early stages involve expensive API calls.
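As a rough sketch of how a caller might use that information, the helper below turns a failure into a small summary for logging or a retry decision. The name `describe_failure` is hypothetical; it assumes only that the exception exposes `stage`, `cause`, and `context.results`, as the PipelineError class above does.

```python
from typing import Any


def describe_failure(err: Any) -> dict:
    """Summarize a PipelineError-like exception (hypothetical helper).

    Assumes `err` has .stage, .cause and .context.results, matching the
    PipelineError class shown earlier in this article.
    """
    succeeded = [r for r in err.context.results if r.success]
    return {
        "failed_stage": err.stage,
        "cause": f"{type(err.cause).__name__}: {err.cause}",
        "completed_stages": [r.stage_name for r in succeeded],
        # Output of the last successful stage, or the original input
        # if the very first stage failed.
        "resume_input": (succeeded[-1].output if succeeded
                         else err.context.initial_input),
    }


# Intended usage with the Pipeline class above:
#
#     try:
#         pipeline.run(document)
#     except PipelineError as err:
#         print(describe_failure(err))
```

Keeping the summary separate from the exception class leaves PipelineError itself small while still giving operators the stage name and the point to resume from.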

FAQ

How do I resume a pipeline from a checkpoint after a failure?

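Load the checkpoint file written after the last successful stage (the framework above names it {checkpoint_dir}/{pipeline_name}_{stage_name}.json). Read its completed_stages list, skip every stage named there, and run the remaining stages with the saved last_output as the input to the first of them. The Pipeline class shown here does not do this for you; resuming is a small amount of extra code on top of it.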
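A minimal sketch of that resume logic follows. The function `resume_from_checkpoint` is hypothetical and not part of the framework above; it assumes the checkpoint JSON has the keys written by `PipelineContext.checkpoint` and stands in for the full context with a simple namespace that carries `initial_input` and `metadata`.

```python
import json
from types import SimpleNamespace
from typing import Any, Callable

# A stage is a (name, handler) pair; handlers take (input, ctx) as above.
Stage = tuple[str, Callable[[Any, Any], Any]]


def resume_from_checkpoint(stages: list[Stage], checkpoint_path: str) -> Any:
    """Run only the stages that the checkpoint does not list as completed."""
    with open(checkpoint_path) as f:
        saved = json.load(f)

    completed = set(saved["completed_stages"])
    value = saved["last_output"]
    # Stand-in for PipelineContext (an assumption of this sketch): enough
    # for handlers that only read or write ctx.metadata.
    ctx = SimpleNamespace(
        initial_input=saved["initial_input"],
        metadata=saved.get("metadata", {}),
    )

    for name, handler in stages:
        if name in completed:
            continue  # already done in the earlier run
        value = handler(value, ctx)
    return value
```

This sketch does not record StageResult entries or write new checkpoints for the resumed stages; a fuller version would rebuild a real PipelineContext and reuse the framework's run loop.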

Should each pipeline stage be a separate agent or a simple function?

Use simple functions for deterministic transformations (parsing, formatting, validation) and agents for stages that require reasoning or decision-making (summarization, classification). Mixing both keeps the pipeline fast where possible and intelligent where needed.

How do I handle stages that need to branch conditionally?

Add a conditional stage that inspects its input, sets a flag in ctx.metadata, and passes the input through unchanged. Subsequent stages check that flag to decide their behavior. For complex branching, consider combining the Pipeline pattern with the Router pattern.
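The flag approach might look like the sketch below. Both stages and the `needs_translation` flag are hypothetical, and a plain namespace stands in for PipelineContext so the example runs on its own.

```python
from types import SimpleNamespace
from typing import Any


def detect_language(text: str, ctx: Any) -> str:
    # Routing stage: record a flag and pass the text through unchanged.
    # Treating any non-ASCII text as "needs translation" is a toy heuristic.
    ctx.metadata["needs_translation"] = not text.isascii()
    return text


def translate(text: str, ctx: Any) -> str:
    # Downstream stage: becomes a no-op when the flag is not set.
    if not ctx.metadata.get("needs_translation"):
        return text
    return f"[translated] {text}"  # placeholder for a real translation call


ctx = SimpleNamespace(metadata={})  # stand-in for PipelineContext
out = translate(detect_language("hello", ctx), ctx)
```

With the framework above, the same two handlers could be registered with add_stage("detect", detect_language) and add_stage("translate", translate), since they already follow the (input, ctx) signature.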


#AgentDesignPatterns #PipelinePattern #Python #AgenticAI #DataProcessing #LearnAI #AIEngineering


CallSphere Team
