Building Data Extraction Pipelines: Turning Unstructured Text into Structured Data
Design and implement multi-step data extraction pipelines that transform unstructured text into clean structured data using LLMs. Covers entity extraction, relation extraction, and pipeline orchestration.
Why Build Extraction Pipelines?
Every organization sits on mountains of unstructured text: emails, contracts, support tickets, research papers, meeting notes. The data locked inside these documents is valuable, but only if you can transform it into structured formats that databases, dashboards, and downstream systems can consume.
A single LLM call can extract a few fields from a short paragraph. But real-world extraction requires pipelines: multiple stages that chunk documents, extract entities, resolve references, validate results, and handle failures gracefully.
Pipeline Architecture
A robust extraction pipeline has four stages:
- Preprocessing — split documents into manageable chunks
- Entity Extraction — pull out named entities and their attributes
- Relation Extraction — identify relationships between entities
- Postprocessing — deduplicate, validate, and format output
from pydantic import BaseModel, Field
from typing import List, Optional
from openai import OpenAI

client = OpenAI()

# Stage 1: Define your output schemas
class Entity(BaseModel):
    name: str
    entity_type: str = Field(description="person, organization, location, date, money")
    attributes: dict = Field(default_factory=dict)

class Relation(BaseModel):
    subject: str
    predicate: str = Field(description="e.g., works_at, located_in, acquired")
    obj: str  # 'object' shadows a Python builtin, so use 'obj'
    confidence: float = Field(ge=0.0, le=1.0)

class ExtractionOutput(BaseModel):
    entities: List[Entity]
    relations: List[Relation]
Stage 1: Document Chunking
Long documents exceed context windows and reduce extraction accuracy. Split them into overlapping chunks:
def chunk_text(text: str, chunk_size: int = 2000, overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break  # last chunk reached; avoid emitting a duplicate tail
        start = end - overlap
    return chunks
The overlap ensures that entities spanning a chunk boundary appear whole in at least one chunk. A 200-character overlap is sufficient for most document types.
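With the defaults, each chunk therefore starts chunk_size - overlap = 1800 characters after the previous one, so the chunk count for a document is easy to predict. A quick sanity check of that arithmetic:

```python
chunk_size, overlap = 2000, 200
stride = chunk_size - overlap  # each chunk starts this far after the previous one

doc_len = 10_000
starts = list(range(0, doc_len, stride))
print(len(starts))   # 6 chunks for a 10,000-character document
print(starts[:3])    # [0, 1800, 3600]
```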
Stage 2: Entity Extraction
Process each chunk through the LLM with a focused extraction prompt:
# A named wrapper model is clearer and safer than building one
# dynamically with type(), and works the same with structured outputs.
class EntityList(BaseModel):
    entities: List[Entity]

def extract_entities(text_chunk: str) -> List[Entity]:
    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Extract all named entities from the text. "
                    "For each entity, identify its type and any attributes mentioned."
                ),
            },
            {"role": "user", "content": text_chunk},
        ],
        response_format=EntityList,
    )
    return response.choices[0].message.parsed.entities
Stage 3: Relation Extraction
Once you have entities, extract the relationships between them. Passing the entity list as context improves accuracy:
class RelationList(BaseModel):
    relations: List[Relation]

def extract_relations(text_chunk: str, entities: List[Entity]) -> List[Relation]:
    entity_names = [e.name for e in entities]
    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Given these entities and the original text, "
                    "identify all relationships between the entities. "
                    f"Known entities: {entity_names}"
                ),
            },
            {"role": "user", "content": text_chunk},
        ],
        response_format=RelationList,
    )
    return response.choices[0].message.parsed.relations
Stage 4: Deduplication and Merging
When you process multiple chunks, the same entity often appears in several of them. Merge the duplicates:
def deduplicate_entities(all_entities: List[Entity]) -> List[Entity]:
    """Merge entities by normalized name."""
    merged = {}
    for entity in all_entities:
        key = entity.name.lower().strip()
        if key in merged:
            # Merge attributes from duplicate
            merged[key].attributes.update(entity.attributes)
        else:
            merged[key] = entity.model_copy()
    return list(merged.values())
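Relations collected across chunks benefit from the same treatment. A sketch of a deduplicate_relations helper (not part of the pipeline above) that keys on the (subject, predicate, object) triple and keeps the highest-confidence copy; a dataclass stands in for the pydantic Relation model so the example runs on its own:

```python
from dataclasses import dataclass

# Stand-in for the pydantic Relation model, so this sketch is self-contained.
@dataclass
class Relation:
    subject: str
    predicate: str
    obj: str
    confidence: float

def deduplicate_relations(all_relations):
    """Keep one relation per (subject, predicate, object) triple,
    preferring the highest-confidence extraction."""
    merged = {}
    for rel in all_relations:
        key = (rel.subject.lower().strip(), rel.predicate, rel.obj.lower().strip())
        if key not in merged or rel.confidence > merged[key].confidence:
            merged[key] = rel
    return list(merged.values())

rels = [
    Relation("Jane Smith", "works_at", "Acme Corp", 0.9),
    Relation("jane smith", "works_at", "Acme Corp", 0.7),  # duplicate, lower confidence
]
deduped = deduplicate_relations(rels)
print(len(deduped))           # 1
print(deduped[0].confidence)  # 0.9
```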
Putting It All Together
def run_extraction_pipeline(document: str) -> ExtractionOutput:
    chunks = chunk_text(document)
    all_entities = []
    all_relations = []
    for chunk in chunks:
        entities = extract_entities(chunk)
        all_entities.extend(entities)
        relations = extract_relations(chunk, entities)
        all_relations.extend(relations)
    deduped_entities = deduplicate_entities(all_entities)
    return ExtractionOutput(
        entities=deduped_entities,
        relations=all_relations,
    )
# Usage
document = """Acme Corp, founded by Jane Smith in 2019 in San Francisco,
acquired DataFlow Inc for $50M in January 2025. DataFlow's CTO,
Bob Johnson, joined Acme as VP of Engineering..."""

result = run_extraction_pipeline(document)
for entity in result.entities:
    print(f"{entity.entity_type}: {entity.name}")
for rel in result.relations:
    print(f"{rel.subject} --{rel.predicate}--> {rel.obj}")
Adding Retry Logic
LLM calls fail. Network errors happen. Add retries at the extraction stage:
import time

def extract_with_retry(func, *args, max_retries: int = 3, **kwargs):
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s...
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait}s...")
            time.sleep(wait)
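The wrapper works with any stage function. Here is a self-contained run (the helper is repeated so the example stands alone, and flaky_call is a stand-in for an LLM call that fails twice before succeeding):

```python
import time

def extract_with_retry(func, *args, max_retries: int = 3, **kwargs):
    # Same helper as above, repeated so this example runs on its own.
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait}s...")
            time.sleep(wait)

calls = {"count": 0}

def flaky_call(chunk: str):
    # Stand-in for extract_entities: fails twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network error")
    return f"entities from {chunk!r}"

result = extract_with_retry(flaky_call, "chunk-1")
print(result)          # entities from 'chunk-1'
print(calls["count"])  # 3
```

In the pipeline above, you would call extract_with_retry(extract_entities, chunk) instead of extract_entities(chunk) directly.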
FAQ
How do I handle documents longer than the model's context window?
Chunking is the standard approach. For very long documents (100+ pages), use a hierarchical strategy: first extract a summary of each section, then do detailed extraction on sections that contain relevant information. This reduces both cost and processing time.
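A minimal sketch of that triage step, assuming sections are separated by blank lines and using a keyword filter as a stand-in for the LLM summary pass (real documents need a proper section parser, and the keyword list is illustrative):

```python
def split_into_sections(document: str) -> list:
    """Naive section splitter: assumes sections are separated by blank lines."""
    return [s.strip() for s in document.split("\n\n") if s.strip()]

def is_relevant(section: str, keywords: list) -> bool:
    """Cheap relevance filter standing in for an LLM summary/triage call."""
    lowered = section.lower()
    return any(k in lowered for k in keywords)

doc = "Intro boilerplate.\n\nAcme acquired DataFlow for $50M.\n\nUnrelated appendix."
relevant = [s for s in split_into_sections(doc) if is_relevant(s, ["acquired", "acme"])]
print(len(relevant))  # 1
print(relevant[0])    # Acme acquired DataFlow for $50M.
```

Only the sections that survive the triage pass go on to full entity and relation extraction.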
Should I use one LLM call per stage or combine extraction into a single call?
Separate calls per stage produce better results. A single call asking for entities, relations, and summaries simultaneously tends to degrade quality on each individual task. The overhead of multiple calls is offset by higher extraction accuracy.
How do I evaluate extraction quality?
Build a test set of 50-100 manually annotated documents. Measure precision (how many extracted items are correct), recall (how many real items were found), and F1 score. Aim for at least 85% F1 on entity extraction and 75% on relation extraction for production use.
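Those metrics are straightforward to compute once you have gold and predicted sets. Exact-match scoring on (name, type) pairs is shown here; fuzzy name matching is common in practice:

```python
def precision_recall_f1(predicted: set, gold: set):
    tp = len(predicted & gold)  # extractions that match the gold annotations
    precision = tp / len(predicted) if predicted else 0.0
    recall = tp / len(gold) if gold else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)
    return precision, recall, f1

gold = {("Acme Corp", "organization"), ("Jane Smith", "person"), ("2019", "date")}
pred = {("Acme Corp", "organization"), ("Jane Smith", "person"), ("SF", "location")}
p, r, f1 = precision_recall_f1(pred, gold)
print(round(p, 2), round(r, 2), round(f1, 2))  # 0.67 0.67 0.67
```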