Building Agentic AI Data Pipelines: When ETL Meets LLM Extraction
Explore how to build agentic AI data pipelines that combine traditional ETL with LLM-powered extraction, classification, and validation loops.
The Convergence of Data Engineering and Agentic AI
Traditional ETL pipelines follow rigid rules: extract data from source A, transform it according to schema B, load it into warehouse C. When the data is structured and predictable — CSV exports, database replications, API responses with fixed schemas — this works well.
But modern organizations drown in unstructured data: contracts, invoices, support emails, meeting transcripts, regulatory filings, medical records, and research papers. Traditional ETL chokes on these because the transformation rules cannot be written as deterministic code. The schema varies across documents, the language is ambiguous, and edge cases multiply faster than engineers can write parsers.
This is where agentic AI data pipelines come in. Instead of hardcoded transformation logic, you deploy LLM-powered agents at each stage of the pipeline — agents that classify documents, extract structured fields from free text, validate extracted data against business rules, and route exceptions for human review.
At CallSphere, we process thousands of call transcripts daily through agentic data pipelines that extract customer intent, sentiment, action items, and compliance flags — data that would be impossible to extract with regex or rule-based systems.
Architecture of an Agentic Data Pipeline
An agentic data pipeline has five core stages:
Stage 1: Intelligent Document Ingestion
The ingestion layer must handle diverse formats — PDF, DOCX, email, images, audio transcripts — and normalize them into a processable form.
```python
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class DocumentType(Enum):
    PDF = "pdf"
    DOCX = "docx"
    EMAIL = "email"
    IMAGE = "image"
    TRANSCRIPT = "transcript"
    HTML = "html"

@dataclass
class IngestedDocument:
    id: str
    source: str
    doc_type: DocumentType
    raw_text: str
    metadata: dict
    page_count: Optional[int] = None
    confidence: float = 1.0

class IngestionAgent:
    """Agent that handles document ingestion and normalization."""

    async def ingest(self, file_path: str) -> IngestedDocument:
        doc_type = self.detect_type(file_path)
        raw_text = await self.extract_text(file_path, doc_type)
        metadata = await self.extract_metadata(file_path, doc_type)

        # OCR fallback for scanned PDFs with little selectable text
        if len(raw_text.strip()) < 50 and doc_type == DocumentType.PDF:
            raw_text = await self.ocr_extract(file_path)
            metadata["ocr_applied"] = True

        return IngestedDocument(
            id=str(uuid.uuid4()),
            source=file_path,
            doc_type=doc_type,
            raw_text=raw_text,
            metadata=metadata,
        )
```
The ingestion agent is not just a file reader. It makes decisions: should it apply OCR? Is this document corrupted? Should it split a 200-page PDF into logical sections before passing downstream?
Stage 2: LLM-Powered Classification
Once documents are ingested, a classification agent determines what each document is and how it should be processed.
```python
import json

CLASSIFICATION_PROMPT = """Analyze the following document and classify it.

Document text (first 2000 chars):
{document_text}

Respond with JSON:
{{
  "document_class": "invoice | contract | support_ticket | meeting_notes | regulatory_filing | other",
  "confidence": 0.0 to 1.0,
  "language": "ISO 639-1 code",
  "contains_pii": true/false,
  "summary": "One sentence summary"
}}"""

class ClassificationAgent:
    def __init__(self, llm_client, confidence_threshold: float = 0.85):
        self.llm = llm_client
        self.threshold = confidence_threshold

    async def classify(self, doc: IngestedDocument) -> ClassificationResult:
        result = await self.llm.complete(
            CLASSIFICATION_PROMPT.format(document_text=doc.raw_text[:2000])
        )
        parsed = json.loads(result)

        if parsed["confidence"] < self.threshold:
            # Route to human review rather than propagating a bad label
            await self.send_to_review(doc, parsed)
            return ClassificationResult(status="pending_review", **parsed)

        return ClassificationResult(status="classified", **parsed)
```
The confidence threshold is critical. Documents that the LLM cannot confidently classify get routed to a human review queue rather than polluting downstream stages with bad labels.
Stage 3: Structured Extraction
This is where agentic AI pipelines deliver the most value. The extraction agent pulls structured fields from unstructured text, adapting to each document class.
```python
import json

INVOICE_EXTRACTION_PROMPT = """Extract the following fields from this invoice.
Return JSON with these exact keys. Use null for missing fields.

Fields: vendor_name, vendor_address, invoice_number, invoice_date,
due_date, line_items (array of {{description, quantity, unit_price, total}}),
subtotal, tax_amount, total_amount, currency, payment_terms

Invoice text:
{document_text}"""

class ExtractionAgent:
    def __init__(self, llm_client):
        self.llm = llm_client
        self.prompts = {
            "invoice": INVOICE_EXTRACTION_PROMPT,
            "contract": CONTRACT_EXTRACTION_PROMPT,
            "support_ticket": TICKET_EXTRACTION_PROMPT,
        }

    async def extract(
        self, doc: IngestedDocument, classification: ClassificationResult
    ) -> ExtractionResult:
        prompt_template = self.prompts.get(classification.document_class)
        if not prompt_template:
            return ExtractionResult(status="unsupported_class")

        result = await self.llm.complete(
            prompt_template.format(document_text=doc.raw_text)
        )
        extracted = json.loads(result)

        return ExtractionResult(
            status="extracted",
            fields=extracted,
            source_doc_id=doc.id,
        )
```
Stage 4: Validation Loops
Extracted data must be validated before loading. The validation agent checks business rules, cross-references external systems, and can re-extract when validation fails.
```python
class ValidationAgent:
    def __init__(self, llm_client, max_retries: int = 2):
        self.llm = llm_client
        self.max_retries = max_retries

    async def validate(
        self,
        extraction: ExtractionResult,
        doc: IngestedDocument,
        retry_count: int = 0,
    ) -> ValidationResult:
        errors = []

        # Rule-based validation: line items must sum to the stated total
        if extraction.fields.get("total_amount"):
            line_sum = sum(
                item["total"]
                for item in extraction.fields.get("line_items", [])
                if item.get("total")
            )
            if abs(line_sum - extraction.fields["total_amount"]) > 0.01:
                errors.append("Line items do not sum to total")

        # LLM-powered validation for ambiguous cases
        if extraction.fields.get("invoice_date"):
            date_check = await self.llm.complete(
                f"Is '{extraction.fields['invoice_date']}' a valid date? "
                f"Context: {doc.raw_text[:500]}. Reply YES or NO with corrected date."
            )
            if "NO" in date_check.upper():
                errors.append(f"Invalid date: {date_check}")

        if errors and retry_count < self.max_retries:
            # Re-extract with the validation errors as added context
            return await self.re_extract_with_feedback(
                doc, extraction, errors, retry_count + 1
            )

        return ValidationResult(
            valid=len(errors) == 0,
            errors=errors,
            extraction=extraction,
        )
```
The validation loop is what makes this agentic rather than just an LLM call. When extraction fails validation, the system feeds the errors back to the extraction agent as additional context and retries. This self-correcting behavior dramatically improves accuracy.
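One way that feedback step can be sketched is a helper that rebuilds the extraction prompt with the validation errors appended. The function name and the exact wording of the error block are illustrative, not a fixed API:

```python
def build_feedback_prompt(base_prompt: str, document_text: str, errors: list[str]) -> str:
    """Append validation errors to the original extraction prompt so the
    model can self-correct on the retry attempt."""
    error_block = "\n".join(f"- {e}" for e in errors)
    return (
        base_prompt.format(document_text=document_text)
        + "\n\nYour previous answer failed validation:\n"
        + error_block
        + "\nReturn corrected JSON with the same keys."
    )
```

The retry call then sends this enriched prompt instead of the original, so the model sees exactly which fields to fix.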
Stage 5: Pipeline Orchestration
The orchestrator ties all stages together. You can use Airflow, Prefect, Temporal, or a simple task queue.
```python
from prefect import flow, task

@task(retries=2, retry_delay_seconds=30)
async def ingest_document(file_path: str) -> IngestedDocument:
    agent = IngestionAgent()
    return await agent.ingest(file_path)

@task(retries=1)
async def classify_document(doc: IngestedDocument) -> ClassificationResult:
    agent = ClassificationAgent(llm_client, confidence_threshold=0.85)
    return await agent.classify(doc)

@task(retries=2, retry_delay_seconds=10)
async def extract_fields(
    doc: IngestedDocument, classification: ClassificationResult
) -> ExtractionResult:
    agent = ExtractionAgent(llm_client)
    return await agent.extract(doc, classification)

@task(retries=1)
async def validate_extraction(
    extraction: ExtractionResult, doc: IngestedDocument
) -> ValidationResult:
    agent = ValidationAgent(llm_client, max_retries=2)
    return await agent.validate(extraction, doc)

@flow(name="agentic-etl-pipeline")
async def process_document(file_path: str):
    doc = await ingest_document(file_path)
    classification = await classify_document(doc)

    if classification.status == "pending_review":
        await send_to_review_queue(doc, classification)
        return

    extraction = await extract_fields(doc, classification)
    validation = await validate_extraction(extraction, doc)

    if validation.valid:
        await load_to_warehouse(validation.extraction)
    else:
        await send_to_exception_queue(doc, validation)
```
Performance Considerations
Batching LLM Calls
Processing documents one at a time is expensive. Batch classification calls by grouping documents and sending them in a single prompt with instructions to return an array of results. This reduces per-document latency and cost.
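A minimal sketch of the batching idea, assuming the classification categories from Stage 2; the delimiter format and snippet length are arbitrary choices:

```python
def build_batch_prompt(snippets: list[str], max_chars: int = 2000) -> str:
    """Pack several document snippets into one classification prompt and
    ask the model for a JSON array, one object per document, in order."""
    parts = [
        f"--- Document {i} ---\n{text[:max_chars]}"
        for i, text in enumerate(snippets, start=1)
    ]
    return (
        "Classify each document below. Respond with a JSON array of "
        "{document_class, confidence} objects, in the same order.\n\n"
        + "\n\n".join(parts)
    )
```

Batch sizes of 5-10 documents tend to be a reasonable starting point; beyond that, per-document accuracy can degrade as the prompt grows.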
Caching Extraction Templates
If you process many documents of the same type (e.g., invoices from the same vendor), cache the extraction results and use them as few-shot examples for subsequent documents from that vendor. This improves consistency and reduces token usage.
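The few-shot cache can be as simple as a per-vendor ring of recent validated extractions; the class name, bucket size, and rendering format below are illustrative:

```python
from collections import defaultdict

class FewShotCache:
    """Keep the most recent validated extractions per vendor and replay
    them as few-shot examples in the extraction prompt."""

    def __init__(self, examples_per_vendor: int = 3):
        self.examples_per_vendor = examples_per_vendor
        self._cache: dict[str, list[tuple[str, str]]] = defaultdict(list)

    def add(self, vendor: str, document_text: str, extracted_json: str) -> None:
        bucket = self._cache[vendor]
        bucket.append((document_text, extracted_json))
        # Keep only the newest N examples for this vendor
        del bucket[:-self.examples_per_vendor]

    def render(self, vendor: str) -> str:
        """Format cached examples for inclusion in a prompt."""
        return "\n\n".join(
            f"Example input:\n{doc}\nExample output:\n{out}"
            for doc, out in self._cache.get(vendor, [])
        )
```

Only extractions that passed validation should be added, so the cache reinforces correct behavior rather than repeating mistakes.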
Parallel Stage Execution
Stages within the pipeline are sequential per document, but documents should flow through the pipeline in parallel. Use a task queue like Celery, NATS JetStream, or Prefect's concurrent task runner to process multiple documents simultaneously.
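With asyncio alone, a semaphore is enough to fan documents out while capping in-flight LLM calls. A minimal sketch, where `process_document` stands in for the per-document flow above:

```python
import asyncio

async def process_all(file_paths, process_document, max_concurrency: int = 8):
    """Run the per-document pipeline concurrently, with a cap on in-flight
    documents so LLM rate limits are not exhausted."""
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(path):
        async with sem:
            return await process_document(path)

    # gather preserves input order in its results
    return await asyncio.gather(*(bounded(p) for p in file_paths))
```

The right cap depends on your provider's rate limits; start low and raise it while watching for 429 responses.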
| Optimization | Impact | Complexity |
|---|---|---|
| Batch classification | 40-60% cost reduction | Low |
| Extraction caching | 20-30% faster, more consistent | Medium |
| Parallel document flow | 5-10x throughput increase | Medium |
| Model routing (small/large) | 50-70% cost reduction | High |
| Prompt caching (Anthropic) | 90% reduction on cache hits | Low |
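The model-routing row in the table can be sketched as a simple decision function: cheap, high-confidence, short documents go to a small model, everything else to a large one. The model names and thresholds here are placeholders, not recommendations:

```python
from typing import Optional

def choose_model(doc_class: str, confidence: float, page_count: Optional[int]) -> str:
    """Route simple documents to a small model; reserve the large model
    for ambiguous classes, low confidence, or long documents."""
    simple_classes = {"invoice", "support_ticket"}
    if confidence >= 0.9 and (page_count or 1) <= 3 and doc_class in simple_classes:
        return "small-model"
    return "large-model"
```

In practice the routing signals come from the classification stage, which is exactly why classification runs before extraction.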
Error Handling and Dead Letter Queues
Every stage in the pipeline can fail. Design for failure with dead letter queues at each stage:
- Ingestion failures: Corrupted files, unsupported formats, encoding issues
- Classification failures: LLM timeouts, ambiguous documents, low confidence
- Extraction failures: JSON parse errors, missing required fields, hallucinated data
- Validation failures: Business rule violations, impossible values, cross-reference mismatches
Each dead letter queue should capture the original document, the stage that failed, the error details, and the number of retry attempts. A human review dashboard pulls from these queues, and corrected results are fed back as training examples to improve future extraction accuracy.
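The record each stage writes can be captured in a small dataclass; the field names below are illustrative:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class DeadLetterRecord:
    """One failed document, as written to a stage's dead letter queue."""
    doc_id: str
    stage: str          # "ingestion" | "classification" | "extraction" | "validation"
    error: str
    retry_count: int
    raw_payload: dict   # original document or intermediate result for replay
    failed_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
```

Keeping the raw payload alongside the error makes replays trivial: after a fix, the record can be re-enqueued at the stage that failed rather than restarting the whole pipeline.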
Frequently Asked Questions
How accurate is LLM-based data extraction compared to traditional parsers?
For well-structured documents like invoices from known vendors, LLM extraction achieves 92-97% field-level accuracy. Traditional template-based parsers can hit 99% for known formats but fail completely on unseen layouts. The advantage of LLM extraction is generalization — it handles new document formats without code changes.
How do I handle PII in agentic data pipelines?
Flag PII at the classification stage and route those documents through a separate pipeline path with additional safeguards: encrypted storage, audit logging, and redaction before loading into general-purpose warehouses. Use the LLM to identify and mask PII fields before they reach downstream systems.
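As a baseline before any LLM-based detection, a regex pass can catch the most obvious patterns. This is a minimal sketch covering only emails and US-style SSNs; a production redactor needs far broader coverage:

```python
import re

def mask_basic_pii(text: str) -> str:
    """Replace obvious email addresses and US-style SSNs with placeholders."""
    text = re.sub(r"[\w.+-]+@[\w-]+\.[\w.]+", "[EMAIL]", text)
    text = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]", text)
    return text
```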
What is the cost per document for an agentic ETL pipeline?
Costs vary by document complexity, but a typical invoice processing pipeline costs USD 0.02-0.08 per document using Claude or GPT-4o for extraction. Classification with a smaller model can reduce that to USD 0.005 per document. At scale (100k+ documents/month), model routing and prompt caching can cut total costs by 50-70%.
Should I fine-tune a model for extraction or use prompt engineering?
Start with prompt engineering and few-shot examples. Fine-tuning makes sense when you process millions of documents of the same type and need to reduce per-document latency and cost. For most organizations processing diverse document types, prompt engineering with good examples outperforms fine-tuning.
How do I monitor extraction quality over time?
Implement a sampling-based review process: randomly select 2-5% of processed documents for human review. Track accuracy metrics per document class and per extraction field over time. Set alerts when accuracy drops below your threshold — this often indicates a change in document format from a source system.
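The sampling step itself is a few lines; seeding the RNG (optional here) makes the selection reproducible for audits:

```python
import random
from typing import Optional

def sample_for_review(doc_ids: list[str], rate: float = 0.03,
                      seed: Optional[int] = None) -> list[str]:
    """Randomly select approximately `rate` of documents for human review."""
    rng = random.Random(seed)
    return [d for d in doc_ids if rng.random() < rate]
```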
CallSphere Team
Expert insights on AI voice agents and customer communication automation.