Skip to content
Learn Agentic AI11 min read0 views

Building a Data Pipeline Agent: ETL Workflows with AI-Powered Transformation

Build an intelligent ETL pipeline agent that uses LLMs to infer schemas from messy data, transform records with natural language instructions, and validate data quality at each stage.

Why AI Transforms ETL

Traditional ETL (Extract, Transform, Load) pipelines are brittle. When a source system changes its data format, column names, or encoding, the pipeline breaks. Data engineers spend weeks writing transformation rules that handle every edge case.

AI-powered ETL agents flip this model. Instead of hand-coding transformation rules, you describe the desired output schema in natural language, and the LLM figures out the mapping. The agent can infer schemas from sample data, handle messy formats, and flag quality issues — all tasks that previously required human intervention.

The ETL Agent Architecture

The pipeline has three stages, each managed by the agent:

from dataclasses import dataclass, field
from typing import Any
from enum import Enum

class PipelineStage(Enum):
    EXTRACT = "extract"
    TRANSFORM = "transform"
    LOAD = "load"

@dataclass
class PipelineContext:
    """Shared state across pipeline stages."""
    source_config: dict[str, Any] = field(default_factory=dict)
    raw_records: list[dict] = field(default_factory=list)
    transformed_records: list[dict] = field(default_factory=list)
    inferred_schema: dict = field(default_factory=dict)
    quality_report: dict = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)

class ETLAgent:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def run_pipeline(self, source_config: dict, target_schema: dict) -> PipelineContext:
        ctx = PipelineContext(source_config=source_config)

        # Stage 1: Extract
        ctx = await self.extract(ctx)
        if ctx.errors:
            return ctx

        # Stage 2: Transform (AI-powered)
        ctx = await self.transform(ctx, target_schema)

        # Stage 3: Load
        ctx = await self.load(ctx)

        return ctx

Stage 1: Extraction with Format Detection

The extraction stage pulls raw data from various sources. The agent detects the format automatically:

import csv
import json
import io

async def extract(self, ctx: PipelineContext) -> PipelineContext:
    """Extract data from the configured source."""
    source_type = ctx.source_config.get("type", "file")
    raw_data = await self._fetch_raw(ctx.source_config)

    # Auto-detect format
    fmt = await self._detect_format(raw_data)
    if fmt == "json":
        ctx.raw_records = json.loads(raw_data)
    elif fmt == "csv":
        reader = csv.DictReader(io.StringIO(raw_data))
        ctx.raw_records = list(reader)
    elif fmt == "jsonl":
        ctx.raw_records = [
            json.loads(line) for line in raw_data.strip().split("\n") if line
        ]
    else:
        ctx.errors.append(f"Unsupported format: {fmt}")

    return ctx

async def _detect_format(self, raw_data: str) -> str:
    """Use heuristics to detect data format."""
    stripped = raw_data.strip()
    if stripped.startswith("[") or stripped.startswith("{"):
        return "json"
    if "\n{" in stripped and stripped.startswith("{"):
        return "jsonl"
    return "csv"

Stage 2: AI-Powered Schema Inference and Transformation

This is where the LLM shines. Given sample records and a target schema, it generates the mapping:

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

async def transform(
    self, ctx: PipelineContext, target_schema: dict
) -> PipelineContext:
    """Use LLM to infer mapping and transform records."""

    # Step 1: Infer the source schema from sample data
    sample = ctx.raw_records[:5]
    ctx.inferred_schema = await self._infer_schema(sample)

    # Step 2: Generate transformation instructions
    mapping = await self._generate_mapping(
        source_schema=ctx.inferred_schema,
        target_schema=target_schema,
        sample_records=sample,
    )

    # Step 3: Apply transformation to all records
    ctx.transformed_records = []
    for record in ctx.raw_records:
        transformed = await self._apply_mapping(record, mapping)
        ctx.transformed_records.append(transformed)

    # Step 4: Validate data quality
    ctx.quality_report = self._validate_quality(
        ctx.transformed_records, target_schema
    )

    return ctx

async def _infer_schema(self, sample_records: list[dict]) -> dict:
    """Ask the LLM to infer a schema from sample data."""
    prompt = f"""Analyze these sample records and infer the schema.
Return a JSON object where keys are field names and values are objects
with "type" (string, integer, float, boolean, date, email) and "nullable" (bool).

Sample records:
{json.dumps(sample_records[:3], indent=2)}"""

    response = await self.llm.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
    )
    return json.loads(response.choices[0].message.content)

Generating and Applying Mappings

The LLM produces a mapping specification that the agent applies deterministically:

async def _generate_mapping(
    self, source_schema: dict, target_schema: dict, sample_records: list
) -> dict:
    """LLM generates a field mapping from source to target."""
    prompt = f"""Map source fields to target fields.

Source schema: {json.dumps(source_schema)}
Target schema: {json.dumps(target_schema)}
Sample source record: {json.dumps(sample_records[0])}

Return JSON with this structure:
{{
    "mappings": [
        {{
            "target_field": "name",
            "source_expression": "first_name + ' ' + last_name",
            "transform": "concatenate"
        }}
    ]
}}"""

    response = await self.llm.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
    )
    return json.loads(response.choices[0].message.content)

def _apply_single_mapping(self, record: dict, mapping_rule: dict) -> Any:
    """Apply a single mapping rule to a record."""
    transform = mapping_rule.get("transform", "direct")
    source = mapping_rule.get("source_expression", "")

    if transform == "direct":
        return record.get(source)
    elif transform == "concatenate":
        parts = [record.get(f.strip(), "") for f in source.split("+")]
        return " ".join(str(p) for p in parts if p).strip()
    elif transform == "lowercase":
        return str(record.get(source, "")).lower()
    elif transform == "to_integer":
        raw = record.get(source, 0)
        return int(float(str(raw).replace(",", "")))
    return record.get(source)

Data Quality Validation

After transformation, validate that the output meets quality standards:

def _validate_quality(
    self, records: list[dict], target_schema: dict
) -> dict:
    """Run quality checks on transformed records."""
    total = len(records)
    issues = {"missing_fields": 0, "type_mismatches": 0, "empty_values": 0}
    field_completeness = {}

    for field_name, field_spec in target_schema.items():
        present_count = sum(
            1 for r in records if r.get(field_name) is not None
        )
        field_completeness[field_name] = present_count / total if total else 0

        if not field_spec.get("nullable", True):
            missing = total - present_count
            issues["missing_fields"] += missing

    return {
        "total_records": total,
        "field_completeness": field_completeness,
        "issues": issues,
        "quality_score": 1.0 - (sum(issues.values()) / (total * len(target_schema) or 1)),
    }

FAQ

How do I prevent the LLM from generating incorrect field mappings?

Use a two-pass approach. First, let the LLM propose the mapping. Then, apply it to your sample records and present the results back to the LLM for verification. Ask it to confirm each mapping makes semantic sense. Additionally, run the quality validation on the sample output — if completeness drops below a threshold, flag the mapping for human review.

Should I use the LLM to transform every record, or just to generate the transformation rules?

Generate rules once, apply them deterministically to all records. Sending every record through an LLM is prohibitively slow and expensive for large datasets. The LLM's role is to understand the semantic relationship between source and target schemas and produce a mapping specification. The actual transformation uses simple Python code driven by that specification.

How do I handle schema drift when the source data changes over time?

Run schema inference on each batch and compare against the previously inferred schema. If new fields appear or types change, trigger a re-mapping step. Store the mapping version alongside the loaded data so you can trace which transformation rules produced each batch. Alert the data team when drift exceeds a configurable threshold.


#ETL #DataPipeline #SchemaInference #DataQuality #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.