# Building a Multi-Agent Data Pipeline: Ingestion, Transformation, and Analysis Agents

Build a three-agent data pipeline with ingestion, transformation, and analysis agents that process data from APIs, CSV files, and databases using Python.
## Why Multi-Agent Data Pipelines?
Traditional ETL pipelines are rigid. They break when source schemas change, when data quality degrades, or when new data sources appear. An agentic approach makes each pipeline stage intelligent: the ingestion agent adapts to different data formats, the transformation agent handles messy data gracefully, and the analysis agent generates insights without predefined queries.
In this tutorial, you will build a three-agent data pipeline where each agent is specialized for its role, communicates with the others through a shared data store, and can reason about problems independently.
## Pipeline Architecture
```
┌─────────────────┐     ┌─────────────────────┐     ┌──────────────────┐
│ Ingestion       │────▶│ Transformation      │────▶│ Analysis         │
│ Agent           │     │ Agent               │     │ Agent            │
│                 │     │                     │     │                  │
│ - API fetch     │     │ - Null handling     │     │ - Statistics     │
│ - CSV parse     │     │ - Type casting      │     │ - Correlations   │
│ - DB query      │     │ - Deduplication     │     │ - Visualization  │
│ - Schema detect │     │ - Enrichment        │     │ - Report gen     │
└────────┬────────┘     └──────────┬──────────┘     └──────────┬───────┘
         │                         │                           │
         └─────────────────────────┴───────────────────────────┘
                           Shared Data Store
                        (SQLite / Parquet files)
```
## Prerequisites

- Python 3.11+
- An OpenAI API key

```bash
pip install openai-agents pandas sqlalchemy requests openpyxl matplotlib seaborn
```
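The Agents SDK reads your API key from the environment. A typical setup (the key value is a placeholder):

```bash
export OPENAI_API_KEY="sk-..."
```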
## Step 1: Build the Shared Data Store
The agents communicate through a shared SQLite database and a directory of intermediate files:
```python
# pipeline/data_store.py
import sqlite3
import pandas as pd
import json
import os
from datetime import datetime

DATA_DIR = "./pipeline_data"
DB_PATH = os.path.join(DATA_DIR, "pipeline.db")


def init_store():
    os.makedirs(DATA_DIR, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            stage TEXT NOT NULL,
            status TEXT DEFAULT 'started',
            input_path TEXT,
            output_path TEXT,
            row_count INTEGER,
            metadata TEXT,
            started_at TEXT DEFAULT CURRENT_TIMESTAMP,
            completed_at TEXT
        )
    """)
    conn.commit()
    conn.close()


def log_stage(stage: str, status: str, input_path: str = "",
              output_path: str = "", row_count: int = 0,
              metadata: dict = None) -> int:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.execute(
        """INSERT INTO pipeline_runs
           (stage, status, input_path, output_path, row_count, metadata, completed_at)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        (stage, status, input_path, output_path, row_count,
         json.dumps(metadata or {}),
         datetime.now().isoformat() if status == "completed" else None)
    )
    conn.commit()
    run_id = cur.lastrowid
    conn.close()
    return run_id


def save_dataframe(df: pd.DataFrame, name: str) -> str:
    path = os.path.join(DATA_DIR, f"{name}.parquet")
    df.to_parquet(path, index=False)
    return path


def load_dataframe(name: str) -> pd.DataFrame:
    path = os.path.join(DATA_DIR, f"{name}.parquet")
    return pd.read_parquet(path)
```
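The logging helper can be exercised in isolation before wiring up any agents. A minimal sketch of the same insert-and-read-back pattern, using an in-memory SQLite database (and a trimmed-down table) so it leaves no files behind:

```python
import json
import sqlite3

# Stand-in for init_store() + log_stage() against an in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS pipeline_runs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        stage TEXT NOT NULL,
        status TEXT DEFAULT 'started',
        metadata TEXT
    )
""")

# Log one stage, capturing the autoincremented run id
cur = conn.execute(
    "INSERT INTO pipeline_runs (stage, status, metadata) VALUES (?, ?, ?)",
    ("ingestion", "completed", json.dumps({"source_type": "csv"})),
)
run_id = cur.lastrowid

# Read the row back the way a monitoring query would
row = conn.execute(
    "SELECT stage, status, metadata FROM pipeline_runs WHERE id = ?", (run_id,)
).fetchone()
print(row)
```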
## Step 2: Build the Ingestion Agent
The ingestion agent handles three data source types: REST APIs, CSV files, and databases.
```python
# pipeline/agents/ingestion.py
from agents import Agent, function_tool
import pandas as pd
import requests
import sqlalchemy
from pipeline.data_store import save_dataframe, log_stage


@function_tool
def fetch_from_api(url: str, headers: str = "{}", params: str = "{}") -> str:
    """Fetch data from a REST API endpoint. The headers and params should
    be JSON strings. Returns a summary of the fetched data."""
    import json
    try:
        resp = requests.get(
            url,
            headers=json.loads(headers),
            params=json.loads(params),
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        if isinstance(data, list):
            df = pd.DataFrame(data)
        elif isinstance(data, dict):
            # Try common wrapper keys
            for key in ("results", "data", "items", "records"):
                if key in data and isinstance(data[key], list):
                    df = pd.DataFrame(data[key])
                    break
            else:
                df = pd.DataFrame([data])
        else:
            return f"Unexpected response type: {type(data)}"
        path = save_dataframe(df, "ingested_api")
        log_stage("ingestion", "completed", url, path, len(df),
                  {"source_type": "api", "columns": list(df.columns)})
        return f"Fetched {len(df)} rows with columns: {list(df.columns)}. Saved to {path}"
    except Exception as e:
        log_stage("ingestion", "failed", url, metadata={"error": str(e)})
        return f"API fetch failed: {str(e)}"


@function_tool
def parse_csv(file_path: str, delimiter: str = ",", encoding: str = "utf-8") -> str:
    """Parse a CSV file and save it to the data store. Automatically
    detects column types and handles common encoding issues."""
    try:
        df = pd.read_csv(file_path, delimiter=delimiter, encoding=encoding)
        # Detect and report schema
        schema = {col: str(dtype) for col, dtype in df.dtypes.items()}
        null_counts = df.isnull().sum().to_dict()
        path = save_dataframe(df, "ingested_csv")
        log_stage("ingestion", "completed", file_path, path, len(df),
                  {"source_type": "csv", "schema": schema, "nulls": null_counts})
        return (
            f"Parsed {len(df)} rows, {len(df.columns)} columns.\n"
            f"Schema: {schema}\n"
            f"Null counts: {null_counts}\n"
            f"Saved to {path}"
        )
    except Exception as e:
        log_stage("ingestion", "failed", file_path, metadata={"error": str(e)})
        return f"CSV parse failed: {str(e)}"


@function_tool
def query_database(connection_string: str, query: str) -> str:
    """Execute a SQL query against a database and ingest the results.
    Supports PostgreSQL, MySQL, and SQLite via SQLAlchemy."""
    try:
        engine = sqlalchemy.create_engine(connection_string)
        df = pd.read_sql(query, engine)
        engine.dispose()
        path = save_dataframe(df, "ingested_db")
        log_stage("ingestion", "completed", f"db:{query[:50]}...", path, len(df),
                  {"source_type": "database", "columns": list(df.columns)})
        return f"Query returned {len(df)} rows with columns: {list(df.columns)}. Saved to {path}"
    except Exception as e:
        log_stage("ingestion", "failed", metadata={"error": str(e)})
        return f"Database query failed: {str(e)}"


@function_tool
def detect_schema(dataset_name: str) -> str:
    """Analyze the schema of an ingested dataset. Returns column names,
    types, null percentages, and sample values."""
    from pipeline.data_store import load_dataframe
    try:
        df = load_dataframe(dataset_name)
        analysis = []
        for col in df.columns:
            null_pct = (df[col].isnull().sum() / len(df)) * 100
            sample = df[col].dropna().head(3).tolist()
            analysis.append(
                f"  {col}: {df[col].dtype} | {null_pct:.1f}% null | samples: {sample}"
            )
        return f"Schema for {dataset_name} ({len(df)} rows):\n" + "\n".join(analysis)
    except Exception as e:
        return f"Schema detection failed: {str(e)}"


ingestion_agent = Agent(
    name="Ingestion Agent",
    instructions="""You are a data ingestion specialist. Your job is to:
1. Accept data source specifications (API URLs, file paths, or DB connections)
2. Fetch/parse the data using the appropriate tool
3. Detect and report the schema
4. Flag any immediate data quality issues (high null rates, unexpected types)
5. Save the data to the shared store for the transformation agent

Always detect the schema after ingestion and include it in your summary.""",
    tools=[fetch_from_api, parse_csv, query_database, detect_schema],
    model="gpt-4o",
)
```
## Step 3: Build the Transformation Agent
The transformation agent cleans, validates, and enriches data:
```python
# pipeline/agents/transformation.py
from agents import Agent, function_tool
import pandas as pd
from pipeline.data_store import load_dataframe, save_dataframe, log_stage


@function_tool
def handle_nulls(dataset_name: str, strategy: str = "{}") -> str:
    """Handle null values in a dataset. Strategy is a JSON dict mapping
    column names to strategies: 'drop', 'mean', 'median', 'mode',
    'zero', 'forward_fill', or a literal fill value string."""
    import json
    try:
        df = load_dataframe(dataset_name)
        strategies = json.loads(strategy) if strategy != "{}" else {}
        original_nulls = df.isnull().sum().sum()
        for col, strat in strategies.items():
            if col not in df.columns:
                continue
            if strat == "drop":
                df = df.dropna(subset=[col])
            elif strat == "mean":
                df[col] = df[col].fillna(df[col].mean())
            elif strat == "median":
                df[col] = df[col].fillna(df[col].median())
            elif strat == "mode":
                df[col] = df[col].fillna(df[col].mode()[0])
            elif strat == "zero":
                df[col] = df[col].fillna(0)
            elif strat == "forward_fill":
                df[col] = df[col].ffill()
            else:
                df[col] = df[col].fillna(strat)
        # Drop remaining nulls if no strategy specified
        if not strategies:
            df = df.dropna()
        remaining_nulls = df.isnull().sum().sum()
        path = save_dataframe(df, f"{dataset_name}_clean")
        log_stage("transformation", "completed", dataset_name, path, len(df),
                  {"nulls_before": int(original_nulls), "nulls_after": int(remaining_nulls)})
        return f"Null handling complete. Before: {original_nulls} nulls, After: {remaining_nulls}. Rows: {len(df)}. Saved to {path}"
    except Exception as e:
        return f"Null handling failed: {str(e)}"


@function_tool
def deduplicate(dataset_name: str, subset_columns: str = "[]") -> str:
    """Remove duplicate rows from a dataset. If subset_columns (JSON list)
    is provided, duplicates are determined by those columns only."""
    import json
    try:
        df = load_dataframe(dataset_name)
        original_count = len(df)
        cols = json.loads(subset_columns) if subset_columns != "[]" else None
        df = df.drop_duplicates(subset=cols, keep="first")
        removed = original_count - len(df)
        path = save_dataframe(df, f"{dataset_name}_dedup")
        log_stage("transformation", "completed", dataset_name, path, len(df),
                  {"duplicates_removed": removed})
        return f"Deduplication complete. Removed {removed} duplicates. {len(df)} rows remaining. Saved to {path}"
    except Exception as e:
        return f"Deduplication failed: {str(e)}"


@function_tool
def cast_types(dataset_name: str, type_map: str = "{}") -> str:
    """Cast column types in a dataset. Type map is a JSON dict mapping
    column names to target types: 'int', 'float', 'str', 'datetime', 'bool'."""
    import json
    try:
        df = load_dataframe(dataset_name)
        types = json.loads(type_map)
        changes = []
        for col, target in types.items():
            if col not in df.columns:
                continue
            old_type = str(df[col].dtype)
            if target == "datetime":
                df[col] = pd.to_datetime(df[col], errors="coerce")
            elif target == "int":
                df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
            elif target == "float":
                df[col] = pd.to_numeric(df[col], errors="coerce")
            elif target == "str":
                df[col] = df[col].astype(str)
            elif target == "bool":
                df[col] = df[col].astype(bool)
            changes.append(f"  {col}: {old_type} -> {target}")
        path = save_dataframe(df, f"{dataset_name}_typed")
        log_stage("transformation", "completed", dataset_name, path, len(df),
                  {"type_changes": changes})
        return f"Type casting complete:\n" + "\n".join(changes) + f"\nSaved to {path}"
    except Exception as e:
        return f"Type casting failed: {str(e)}"


@function_tool
def add_computed_column(dataset_name: str, column_name: str, expression: str) -> str:
    """Add a computed column to a dataset using a pandas eval expression.
    Example expression: 'price * quantity' or 'col1 + col2'."""
    try:
        df = load_dataframe(dataset_name)
        df[column_name] = df.eval(expression)
        path = save_dataframe(df, f"{dataset_name}_enriched")
        log_stage("transformation", "completed", dataset_name, path, len(df),
                  {"new_column": column_name, "expression": expression})
        return f"Added column '{column_name}' = {expression}. Sample values: {df[column_name].head(5).tolist()}"
    except Exception as e:
        return f"Computed column failed: {str(e)}"


transformation_agent = Agent(
    name="Transformation Agent",
    instructions="""You are a data transformation specialist. Your job is to:
1. Load ingested data from the shared store
2. Handle null values with appropriate strategies per column
3. Remove duplicates
4. Cast columns to correct types
5. Add computed columns for enrichment when useful
6. Save the clean dataset for the analysis agent

Always explain your transformation choices and report before/after statistics.""",
    tools=[handle_nulls, deduplicate, cast_types, add_computed_column],
    model="gpt-4o",
)
```
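The per-column strategies map directly onto pandas calls. A toy sketch of what `handle_nulls` does with a `{"price": "mean", "qty": "zero"}` strategy (the column names are illustrative):

```python
import pandas as pd

df = pd.DataFrame({"price": [10.0, None, 30.0], "qty": [2, None, 4]})

# 'mean' strategy: fill missing values with the column average
df["price"] = df["price"].fillna(df["price"].mean())
# 'zero' strategy: fill missing values with 0
df["qty"] = df["qty"].fillna(0)

print(df["price"].tolist(), df["qty"].tolist())
```

The same pattern extends to `median`, `mode`, and literal fill values; `drop` is the only strategy that removes rows instead of filling them.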
## Step 4: Build the Analysis Agent
The analysis agent generates statistics, finds correlations, and creates visualizations:
```python
# pipeline/agents/analysis.py
from agents import Agent, function_tool
import pandas as pd
from pipeline.data_store import load_dataframe, log_stage, DATA_DIR
import os


@function_tool
def compute_statistics(dataset_name: str) -> str:
    """Compute descriptive statistics for all numeric columns in a dataset.
    Returns count, mean, std, min, quartiles, max, skewness, and kurtosis."""
    try:
        df = load_dataframe(dataset_name)
        numeric = df.select_dtypes(include="number")
        if numeric.empty:
            return "No numeric columns found in this dataset."
        stats = numeric.describe().T
        stats["skew"] = numeric.skew()
        stats["kurtosis"] = numeric.kurtosis()
        return f"Statistics for {dataset_name} ({len(df)} rows):\n{stats.to_string()}"
    except Exception as e:
        return f"Statistics failed: {str(e)}"


@function_tool
def find_correlations(dataset_name: str, threshold: float = 0.5) -> str:
    """Find correlations between numeric columns. Returns pairs with
    absolute correlation above the threshold."""
    try:
        df = load_dataframe(dataset_name)
        numeric = df.select_dtypes(include="number")
        corr = numeric.corr()
        strong = []
        for i in range(len(corr.columns)):
            for j in range(i + 1, len(corr.columns)):
                val = corr.iloc[i, j]
                if abs(val) >= threshold:
                    strong.append(
                        f"  {corr.columns[i]} <-> {corr.columns[j]}: {val:.3f}"
                    )
        if not strong:
            return f"No correlations above {threshold} threshold found."
        return f"Strong correlations (|r| >= {threshold}):\n" + "\n".join(strong)
    except Exception as e:
        return f"Correlation analysis failed: {str(e)}"


@function_tool
def create_visualization(dataset_name: str, chart_type: str,
                         x_column: str, y_column: str = "",
                         title: str = "Chart") -> str:
    """Create a chart and save it as a PNG file. Supported chart types:
    histogram, scatter, bar, line, box. For histogram and box, only
    x_column is required."""
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt
    import seaborn as sns
    try:
        df = load_dataframe(dataset_name)
        fig, ax = plt.subplots(figsize=(10, 6))
        if chart_type == "histogram":
            sns.histplot(data=df, x=x_column, ax=ax, kde=True)
        elif chart_type == "scatter":
            sns.scatterplot(data=df, x=x_column, y=y_column, ax=ax)
        elif chart_type == "bar":
            top = df[x_column].value_counts().head(20)
            sns.barplot(x=top.index, y=top.values, ax=ax)
            plt.xticks(rotation=45, ha="right")
        elif chart_type == "line":
            df_sorted = df.sort_values(x_column)
            ax.plot(df_sorted[x_column], df_sorted[y_column])
        elif chart_type == "box":
            sns.boxplot(data=df, y=x_column, ax=ax)
        else:
            return f"Unknown chart type: {chart_type}"
        ax.set_title(title)
        plt.tight_layout()
        filename = f"{chart_type}_{x_column}_{y_column}.png".replace(" ", "_")
        path = os.path.join(DATA_DIR, filename)
        plt.savefig(path, dpi=150)
        plt.close()
        return f"Chart saved to {path}"
    except Exception as e:
        return f"Visualization failed: {str(e)}"


@function_tool
def generate_summary_report(dataset_name: str, findings: str) -> str:
    """Generate a text summary report of the analysis findings and save
    it to the data store."""
    try:
        df = load_dataframe(dataset_name)
        report = f"""# Data Analysis Report

Dataset: {dataset_name}
Rows: {len(df)}
Columns: {len(df.columns)}
Generated: {pd.Timestamp.now().isoformat()}

## Dataset Overview
Columns: {', '.join(df.columns.tolist())}
Numeric columns: {', '.join(df.select_dtypes(include='number').columns.tolist())}

## Findings
{findings}
"""
        path = os.path.join(DATA_DIR, f"{dataset_name}_report.md")
        with open(path, "w") as f:
            f.write(report)
        log_stage("analysis", "completed", dataset_name, path, len(df))
        return f"Report saved to {path}"
    except Exception as e:
        return f"Report generation failed: {str(e)}"


analysis_agent = Agent(
    name="Analysis Agent",
    instructions="""You are a data analysis specialist. Your job is to:
1. Load the cleaned data from the shared store
2. Compute descriptive statistics for all numeric columns
3. Find correlations and patterns
4. Create appropriate visualizations
5. Generate a summary report with key findings

ANALYSIS APPROACH:
- Start with descriptive statistics to understand distributions
- Look for correlations between numeric columns
- Create at least 2-3 visualizations
- Highlight anomalies, outliers, and unexpected patterns
- Provide actionable insights in the summary report""",
    tools=[compute_statistics, find_correlations, create_visualization,
           generate_summary_report],
    model="gpt-4o",
)
```
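`find_correlations` is just a thresholded scan of the pandas correlation matrix. On a toy frame where one column is an exact multiple of another (column names are illustrative), the scan reports a single pair with r = 1:

```python
import pandas as pd

df = pd.DataFrame({"units_sold": [1, 2, 3, 4], "revenue": [20, 40, 60, 80]})
corr = df.corr()

# Scan the upper triangle of the correlation matrix, as the tool does
pairs = []
for i in range(len(corr.columns)):
    for j in range(i + 1, len(corr.columns)):
        val = corr.iloc[i, j]
        if abs(val) >= 0.5:
            pairs.append((corr.columns[i], corr.columns[j], round(val, 3)))

print(pairs)
```

Scanning only the upper triangle avoids reporting each pair twice and skips the trivial diagonal (every column correlates perfectly with itself).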
## Step 5: Orchestrate the Pipeline
```python
# pipeline/orchestrator.py
import asyncio
from agents import Runner
from pipeline.data_store import init_store
from pipeline.agents.ingestion import ingestion_agent
from pipeline.agents.transformation import transformation_agent
from pipeline.agents.analysis import analysis_agent


async def run_pipeline(source_description: str):
    init_store()

    print("Phase 1: Ingestion")
    print("=" * 50)
    ingest_result = await Runner.run(
        ingestion_agent,
        f"Ingest data from: {source_description}"
    )
    print(ingest_result.final_output)

    print("\nPhase 2: Transformation")
    print("=" * 50)
    transform_result = await Runner.run(
        transformation_agent,
        f"Transform the ingested data. Previous stage output: {ingest_result.final_output}"
    )
    print(transform_result.final_output)

    print("\nPhase 3: Analysis")
    print("=" * 50)
    analysis_result = await Runner.run(
        analysis_agent,
        f"Analyze the transformed data. Previous stage output: {transform_result.final_output}"
    )
    print(analysis_result.final_output)


if __name__ == "__main__":
    asyncio.run(run_pipeline(
        "CSV file at ./sample_data/sales_2026.csv containing "
        "columns for date, product, region, units_sold, revenue, and cost"
    ))
```
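To try the pipeline end to end, you need the sample CSV the orchestrator references. A small generator for a plausible `./sample_data/sales_2026.csv` (the row values are made up for illustration):

```python
import csv
import os

os.makedirs("./sample_data", exist_ok=True)

# Fabricated sample rows matching the columns the orchestrator describes
rows = [
    ("2026-01-05", "widget", "north", 12, 240.0, 150.0),
    ("2026-01-06", "gadget", "south", 7, 350.0, 210.0),
    ("2026-01-07", "widget", "east", 9, 180.0, 110.0),
]

with open("./sample_data/sales_2026.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["date", "product", "region", "units_sold", "revenue", "cost"])
    writer.writerows(rows)

print(f"Wrote {len(rows)} rows")
```

Run this once, then `python -m pipeline.orchestrator` to watch the three phases execute.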
## FAQ
### How do the agents communicate with each other?
The agents communicate indirectly through the shared data store. Each agent reads data saved by the previous stage using Parquet files. The orchestrator passes a text summary from each stage to the next, giving downstream agents context about what happened upstream. This pattern is simpler and more debuggable than direct agent-to-agent messaging.
### Can I run the pipeline stages in parallel?
The three stages in this pipeline are sequential by design — transformation depends on ingestion, and analysis depends on transformation. However, you can parallelize within stages. For example, the ingestion agent could fetch from multiple APIs concurrently, and the analysis agent could generate multiple visualizations in parallel.
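A sketch of within-stage concurrency using `asyncio.gather`, where `fetch_source` is a stand-in for a real async API fetch:

```python
import asyncio

async def fetch_source(name: str) -> str:
    # Stand-in for a real async API call
    await asyncio.sleep(0.01)
    return f"{name}: ok"

async def ingest_all(sources: list[str]) -> list[str]:
    # Fetch every source concurrently; gather preserves input order
    return await asyncio.gather(*(fetch_source(s) for s in sources))

results = asyncio.run(ingest_all(["orders_api", "customers_api", "inventory_api"]))
print(results)
```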
### What happens if the transformation agent makes a wrong decision?
Each transformation step saves to a new file rather than modifying the original. This means you can always reload the ingested data and retry. The pipeline log in SQLite tracks every action with before/after statistics, making it easy to identify where things went wrong.
### How would I add a fourth agent for data loading?
Create a new agent with tools for writing to your target database (e.g., PostgreSQL COPY, BigQuery load, S3 upload). Add it as a fourth phase in the orchestrator. The pattern is the same — the loading agent reads the analyzed data from the shared store and writes it to the destination.
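A minimal sketch of what one loading tool might look like, using SQLite as a stand-in for the real destination (the function name, table, and columns are hypothetical, and the `@function_tool` decorator is omitted so the sketch runs standalone):

```python
import sqlite3

def load_to_destination(rows: list[tuple], db_path: str = ":memory:") -> int:
    """Hypothetical loading step: write analyzed rows to a destination table."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS sales_final (product TEXT, revenue REAL)")
    conn.executemany("INSERT INTO sales_final VALUES (?, ?)", rows)
    conn.commit()
    # Report how many rows the destination now holds
    count = conn.execute("SELECT COUNT(*) FROM sales_final").fetchone()[0]
    conn.close()
    return count

loaded = load_to_destination([("widget", 240.0), ("gadget", 350.0)])
print(loaded)
```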