# Building a Data Pipeline Agent: ETL Workflows with AI-Powered Transformation
Build an intelligent ETL pipeline agent that uses LLMs to infer schemas from messy data, transform records with natural language instructions, and validate data quality at each stage.
## Why AI Transforms ETL
Traditional ETL (Extract, Transform, Load) pipelines are brittle. When a source system changes its data format, column names, or encoding, the pipeline breaks. Data engineers spend weeks writing transformation rules that handle every edge case.
AI-powered ETL agents flip this model. Instead of hand-coding transformation rules, you describe the desired output schema in natural language, and the LLM figures out the mapping. The agent can infer schemas from sample data, handle messy formats, and flag quality issues — all tasks that previously required human intervention.
## The ETL Agent Architecture
The pipeline has three stages, each managed by the agent:
```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

class PipelineStage(Enum):
    EXTRACT = "extract"
    TRANSFORM = "transform"
    LOAD = "load"

@dataclass
class PipelineContext:
    """Shared state across pipeline stages."""
    source_config: dict[str, Any] = field(default_factory=dict)
    raw_records: list[dict] = field(default_factory=list)
    transformed_records: list[dict] = field(default_factory=list)
    inferred_schema: dict = field(default_factory=dict)
    quality_report: dict = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)

class ETLAgent:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def run_pipeline(
        self, source_config: dict, target_schema: dict
    ) -> PipelineContext:
        ctx = PipelineContext(source_config=source_config)

        # Stage 1: Extract
        ctx = await self.extract(ctx)
        if ctx.errors:
            return ctx

        # Stage 2: Transform (AI-powered)
        ctx = await self.transform(ctx, target_schema)

        # Stage 3: Load
        ctx = await self.load(ctx)
        return ctx
```
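One detail worth flagging in `PipelineContext`: `field(default_factory=...)` gives every instance its own fresh lists and dicts. A plain mutable default would be shared across instances and could leak records between pipeline runs. A quick standalone check, using a trimmed stand-in for the full dataclass:

```python
from dataclasses import dataclass, field

@dataclass
class Ctx:
    # Trimmed stand-in for PipelineContext, just to show the pattern
    raw_records: list = field(default_factory=list)
    errors: list = field(default_factory=list)

a = Ctx()
b = Ctx()
a.errors.append("boom")
# b is unaffected: each instance got a fresh list from the factory
assert a.errors == ["boom"]
assert b.errors == []
```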
## Stage 1: Extraction with Format Detection
The extraction stage pulls raw data from various sources. The agent detects the format automatically:
```python
import csv
import io
import json

async def extract(self, ctx: PipelineContext) -> PipelineContext:
    """Extract data from the configured source."""
    # _fetch_raw reads from the configured file, API, or database
    # (implementation omitted here).
    raw_data = await self._fetch_raw(ctx.source_config)

    # Auto-detect format
    fmt = await self._detect_format(raw_data)
    if fmt == "json":
        ctx.raw_records = json.loads(raw_data)
    elif fmt == "csv":
        reader = csv.DictReader(io.StringIO(raw_data))
        ctx.raw_records = list(reader)
    elif fmt == "jsonl":
        ctx.raw_records = [
            json.loads(line) for line in raw_data.strip().split("\n") if line
        ]
    else:
        ctx.errors.append(f"Unsupported format: {fmt}")
    return ctx

async def _detect_format(self, raw_data: str) -> str:
    """Use heuristics to detect data format."""
    stripped = raw_data.strip()
    # Check JSONL before JSON: a JSONL payload also starts with "{",
    # so the more specific test must come first.
    if stripped.startswith("{") and "\n{" in stripped:
        return "jsonl"
    if stripped.startswith("[") or stripped.startswith("{"):
        return "json"
    return "csv"
```
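The detection heuristics are easy to exercise in isolation. A standalone sketch of the same logic (note that the JSONL test must run before the JSON test, since a JSONL payload also starts with `{`):

```python
def detect_format(raw_data: str) -> str:
    """Heuristic format detection, mirroring the agent's _detect_format."""
    stripped = raw_data.strip()
    # JSONL: starts with an object and has another object on a later line
    if stripped.startswith("{") and "\n{" in stripped:
        return "jsonl"
    if stripped.startswith("[") or stripped.startswith("{"):
        return "json"
    return "csv"

assert detect_format('[{"id": 1}]') == "json"
assert detect_format('{"id": 1}') == "json"
assert detect_format('{"id": 1}\n{"id": 2}') == "jsonl"
assert detect_format("id,name\n1,Ada") == "csv"
```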
## Stage 2: AI-Powered Schema Inference and Transformation
This is where the LLM shines. Given sample records and a target schema, it generates the mapping:
```python
async def transform(
    self, ctx: PipelineContext, target_schema: dict
) -> PipelineContext:
    """Use the LLM to infer a mapping, then transform records."""
    # Step 1: Infer the source schema from sample data
    sample = ctx.raw_records[:5]
    ctx.inferred_schema = await self._infer_schema(sample)

    # Step 2: Generate transformation instructions
    mapping = await self._generate_mapping(
        source_schema=ctx.inferred_schema,
        target_schema=target_schema,
        sample_records=sample,
    )

    # Step 3: Apply the mapping deterministically to all records
    # (plain Python, no LLM call per record)
    ctx.transformed_records = [
        self._apply_mapping(record, mapping) for record in ctx.raw_records
    ]

    # Step 4: Validate data quality
    ctx.quality_report = self._validate_quality(
        ctx.transformed_records, target_schema
    )
    return ctx

async def _infer_schema(self, sample_records: list[dict]) -> dict:
    """Ask the LLM to infer a schema from sample data."""
    prompt = f"""Analyze these sample records and infer the schema.
Return a JSON object where keys are field names and values are objects
with "type" (string, integer, float, boolean, date, email) and "nullable" (bool).

Sample records:
{json.dumps(sample_records[:3], indent=2)}"""
    response = await self.llm.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
    )
    return json.loads(response.choices[0].message.content)
```
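LLM JSON output can drift from the requested shape, so it pays to validate the inferred schema before trusting it. A minimal guard (the allowed type set mirrors the prompt above; `validate_inferred_schema` is an illustrative helper, not part of any library):

```python
ALLOWED_TYPES = {"string", "integer", "float", "boolean", "date", "email"}

def validate_inferred_schema(schema: dict) -> list[str]:
    """Return a list of problems; an empty list means the schema looks usable."""
    problems = []
    for name, spec in schema.items():
        if not isinstance(spec, dict):
            problems.append(f"{name}: spec is not an object")
            continue
        if spec.get("type") not in ALLOWED_TYPES:
            problems.append(f"{name}: unknown type {spec.get('type')!r}")
        if not isinstance(spec.get("nullable"), bool):
            problems.append(f"{name}: nullable must be a boolean")
    return problems

assert validate_inferred_schema(
    {"email": {"type": "email", "nullable": False}}
) == []
assert validate_inferred_schema(
    {"age": {"type": "years", "nullable": True}}
) != []
```

If validation fails, retry the inference call or fall back to human review rather than feeding a malformed schema into the mapping step.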
## Generating and Applying Mappings
The LLM produces a mapping specification that the agent applies deterministically:
```python
async def _generate_mapping(
    self, source_schema: dict, target_schema: dict, sample_records: list
) -> dict:
    """Ask the LLM to generate a field mapping from source to target."""
    prompt = f"""Map source fields to target fields.

Source schema: {json.dumps(source_schema)}
Target schema: {json.dumps(target_schema)}
Sample source record: {json.dumps(sample_records[0])}

Return JSON with this structure:
{{
  "mappings": [
    {{
      "target_field": "name",
      "source_expression": "first_name + ' ' + last_name",
      "transform": "concatenate"
    }}
  ]
}}"""
    response = await self.llm.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
    )
    return json.loads(response.choices[0].message.content)

def _apply_mapping(self, record: dict, mapping: dict) -> dict:
    """Build one output record by applying every mapping rule."""
    return {
        rule["target_field"]: self._apply_single_mapping(record, rule)
        for rule in mapping.get("mappings", [])
    }

def _apply_single_mapping(self, record: dict, mapping_rule: dict) -> Any:
    """Apply a single mapping rule to a record."""
    transform = mapping_rule.get("transform", "direct")
    source = mapping_rule.get("source_expression", "")
    if transform == "direct":
        return record.get(source)
    elif transform == "concatenate":
        # Literal separators (e.g. ' ') are not record keys, so they
        # resolve to "" and drop out; parts are re-joined with a space.
        parts = [record.get(f.strip(), "") for f in source.split("+")]
        return " ".join(str(p) for p in parts if p).strip()
    elif transform == "lowercase":
        return str(record.get(source, "")).lower()
    elif transform == "to_integer":
        raw = record.get(source, 0)
        return int(float(str(raw).replace(",", "")))
    return record.get(source)
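Because the mapping is applied with plain Python, it can be unit-tested without an LLM in the loop. A standalone run of the same transform logic on a made-up record and mapping:

```python
from typing import Any

def apply_single_mapping(record: dict, rule: dict) -> Any:
    """Standalone version of the agent's per-rule transform logic."""
    transform = rule.get("transform", "direct")
    source = rule.get("source_expression", "")
    if transform == "concatenate":
        # Literal separators like ' ' are absent from the record, so they
        # resolve to "" and drop out; parts are re-joined with a space.
        parts = [record.get(f.strip(), "") for f in source.split("+")]
        return " ".join(str(p) for p in parts if p).strip()
    if transform == "lowercase":
        return str(record.get(source, "")).lower()
    if transform == "to_integer":
        return int(float(str(record.get(source, 0)).replace(",", "")))
    return record.get(source)

record = {
    "first_name": "Ada", "last_name": "Lovelace",
    "EMAIL": "ADA@EXAMPLE.COM", "order_total": "1,204",
}
mapping = {"mappings": [
    {"target_field": "name",
     "source_expression": "first_name + ' ' + last_name",
     "transform": "concatenate"},
    {"target_field": "email", "source_expression": "EMAIL",
     "transform": "lowercase"},
    {"target_field": "total", "source_expression": "order_total",
     "transform": "to_integer"},
]}
row = {r["target_field"]: apply_single_mapping(record, r)
       for r in mapping["mappings"]}
assert row == {"name": "Ada Lovelace", "email": "ada@example.com", "total": 1204}
```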
## Data Quality Validation
After transformation, validate that the output meets quality standards:
```python
def _validate_quality(
    self, records: list[dict], target_schema: dict
) -> dict:
    """Run quality checks on transformed records."""
    total = len(records)
    # type_mismatches and empty_values are placeholders for further checks
    issues = {"missing_fields": 0, "type_mismatches": 0, "empty_values": 0}
    field_completeness = {}

    for field_name, field_spec in target_schema.items():
        present_count = sum(
            1 for r in records if r.get(field_name) is not None
        )
        field_completeness[field_name] = present_count / total if total else 0
        if not field_spec.get("nullable", True):
            issues["missing_fields"] += total - present_count

    # Score = 1 minus the fraction of problem cells; the "or 1" fallback
    # guards against division by zero on an empty batch.
    total_cells = (total * len(target_schema)) or 1
    return {
        "total_records": total,
        "field_completeness": field_completeness,
        "issues": issues,
        "quality_score": 1.0 - sum(issues.values()) / total_cells,
    }
```
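The same scoring logic, run outside the class on a toy batch (records and schema invented for illustration):

```python
def validate_quality(records: list[dict], target_schema: dict) -> dict:
    """Standalone version of the agent's quality check."""
    total = len(records)
    issues = {"missing_fields": 0}
    field_completeness = {}
    for field_name, field_spec in target_schema.items():
        present = sum(1 for r in records if r.get(field_name) is not None)
        field_completeness[field_name] = present / total if total else 0
        if not field_spec.get("nullable", True):
            issues["missing_fields"] += total - present
    total_cells = (total * len(target_schema)) or 1
    return {
        "total_records": total,
        "field_completeness": field_completeness,
        "quality_score": 1.0 - sum(issues.values()) / total_cells,
    }

schema = {"name": {"nullable": False}, "email": {"nullable": True}}
records = [
    {"name": "Ada", "email": None},
    {"name": None, "email": "x@y.z"},
]
report = validate_quality(records, schema)
# "name" is required but missing in one of two records: 1 bad cell of 4
assert report["field_completeness"]["name"] == 0.5
assert report["quality_score"] == 0.75
```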
## FAQ
### How do I prevent the LLM from generating incorrect field mappings?
Use a two-pass approach. First, let the LLM propose the mapping. Then, apply it to your sample records and present the results back to the LLM for verification. Ask it to confirm each mapping makes semantic sense. Additionally, run the quality validation on the sample output — if completeness drops below a threshold, flag the mapping for human review.
### Should I use the LLM to transform every record, or just to generate the transformation rules?
Generate rules once, apply them deterministically to all records. Sending every record through an LLM is prohibitively slow and expensive for large datasets. The LLM's role is to understand the semantic relationship between source and target schemas and produce a mapping specification. The actual transformation uses simple Python code driven by that specification.
### How do I handle schema drift when the source data changes over time?
Run schema inference on each batch and compare against the previously inferred schema. If new fields appear or types change, trigger a re-mapping step. Store the mapping version alongside the loaded data so you can trace which transformation rules produced each batch. Alert the data team when drift exceeds a configurable threshold.
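The drift check described above can be sketched as a diff between the previous and current inferred schemas (field names and types here are illustrative):

```python
def schema_drift(previous: dict, current: dict) -> dict:
    """Compare two inferred schemas and report what changed."""
    added = sorted(set(current) - set(previous))
    removed = sorted(set(previous) - set(current))
    type_changes = {
        name: (previous[name]["type"], current[name]["type"])
        for name in set(previous) & set(current)
        if previous[name]["type"] != current[name]["type"]
    }
    return {"added": added, "removed": removed, "type_changes": type_changes}

old = {"id": {"type": "integer"}, "email": {"type": "email"}}
new = {"id": {"type": "string"},
       "email": {"type": "email"},
       "phone": {"type": "string"}}
drift = schema_drift(old, new)
assert drift["added"] == ["phone"]
assert drift["removed"] == []
assert drift["type_changes"] == {"id": ("integer", "string")}
```

Any non-empty drift report would trigger the re-mapping step; how much drift is tolerable before alerting the data team is the configurable threshold.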
CallSphere Team