Building Data Extraction Pipelines: Turning Unstructured Text into Structured Data

Design and implement multi-step data extraction pipelines that transform unstructured text into clean structured data using LLMs. Covers entity extraction, relation extraction, and pipeline orchestration.

Why Build Extraction Pipelines?

Every organization sits on mountains of unstructured text: emails, contracts, support tickets, research papers, meeting notes. The data locked inside these documents is valuable, but only if you can transform it into structured formats that databases, dashboards, and downstream systems can consume.

A single LLM call can extract a few fields from a short paragraph. But real-world extraction requires pipelines: multiple stages that chunk documents, extract entities, resolve references, validate results, and handle failures gracefully.

Pipeline Architecture

A robust extraction pipeline has four stages:

  1. Preprocessing — split documents into manageable chunks
  2. Entity Extraction — pull out named entities and their attributes
  3. Relation Extraction — identify relationships between entities
  4. Postprocessing — deduplicate, validate, and format output
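
The schemas below define the structured output that the extraction stages produce:

from pydantic import BaseModel, Field
from typing import List
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# Output schemas shared by the extraction stages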
class Entity(BaseModel):
    name: str
    entity_type: str = Field(description="person, organization, location, date, money")
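    # Caveat: strict structured outputs require fixed object keys, so a free-form
    # dict may be rejected or returned empty; a list of key/value pairs is safer.
    attributes: dict = Field(default_factory=dict)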

class Relation(BaseModel):
    subject: str
    predicate: str = Field(description="e.g., works_at, located_in, acquired")
    obj: str  # named 'obj' to avoid shadowing Python's built-in 'object'
    confidence: float = Field(ge=0.0, le=1.0)

class ExtractionOutput(BaseModel):
    entities: List[Entity]
    relations: List[Relation]

Stage 1: Document Chunking

Long documents exceed context windows and reduce extraction accuracy. Split them into overlapping chunks:

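def chunk_text(text: str, chunk_size: int = 2000, overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks of at most chunk_size characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break  # last chunk reached; avoid a trailing chunk that is pure overlap
        start = end - overlap  # step back so boundary text appears in both chunks
    return chunks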

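The overlap keeps short entity mentions from being cut in half at a chunk boundary. With these defaults, any mention up to 200 characters long appears whole in at least one chunk, which covers most names, dates, and amounts. Increase the overlap if your documents contain longer mentions, such as full legal entity descriptions.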
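
Fixed-width chunks can still cut a sentence in half, which strips context from the entities inside it. As a rough sketch of one alternative, the hypothetical helper below (`chunk_by_sentences` and its parameters are illustrative names, not part of the pipeline above) groups whole sentences into chunks and carries the last sentence forward as overlap. The regex split is a naive assumption; a real sentence tokenizer would handle abbreviations such as "Inc." better.

```python
import re
from typing import List


def chunk_by_sentences(
    text: str, max_chars: int = 2000, overlap_sentences: int = 1
) -> List[str]:
    """Group whole sentences into chunks of roughly max_chars characters.

    Hypothetical helper: the regex split is a naive stand-in for a real
    sentence tokenizer and will mis-split abbreviations such as "Inc.".
    """
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks: List[str] = []
    current: List[str] = []
    for sentence in sentences:
        candidate = " ".join(current + [sentence])
        if current and len(candidate) > max_chars:
            # Close the current chunk before it grows past the limit
            chunks.append(" ".join(current))
            # Carry the last sentence(s) forward as overlap
            current = current[-overlap_sentences:] if overlap_sentences > 0 else []
        current.append(sentence)
    if current:
        chunks.append(" ".join(current))
    return chunks
```

A chunk can exceed `max_chars` when a single sentence plus its carried-over overlap is longer than the limit, so treat the limit as a target rather than a guarantee.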

Stage 2: Entity Extraction

Process each chunk through the LLM with a focused extraction prompt:

class EntityList(BaseModel):
    """Wrapper model: structured outputs need an object at the top level."""
    entities: List[Entity]

def extract_entities(text_chunk: str) -> List[Entity]:
    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Extract all named entities from the text. "
                    "For each entity, identify its type and any attributes mentioned."
                )
            },
            {"role": "user", "content": text_chunk}
        ],
        response_format=EntityList,
    )
    parsed = response.choices[0].message.parsed
    # parsed is None when the model refuses; treat that as "no entities found"
    return parsed.entities if parsed else []

Stage 3: Relation Extraction

Once you have entities, extract the relationships between them. Passing the entity list as context improves accuracy:

class RelationList(BaseModel):
    """Wrapper model: structured outputs need an object at the top level."""
    relations: List[Relation]

def extract_relations(text_chunk: str, entities: List[Entity]) -> List[Relation]:
    if not entities:
        return []  # nothing to relate, so skip the API call
    entity_names = [e.name for e in entities]

    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Given these entities and the original text, "
                    "identify all relationships between the entities. "
                    "Use the entity names exactly as listed. "
                    f"Known entities: {entity_names}"
                )
            },
            {"role": "user", "content": text_chunk}
        ],
        response_format=RelationList,
    )
    parsed = response.choices[0].message.parsed
    # parsed is None when the model refuses; treat that as "no relations found"
    return parsed.relations if parsed else []

Stage 4: Deduplication and Merging

When a document is split into multiple chunks, the same entity often appears in more than one of them. Merge the duplicates by normalized name:

def deduplicate_entities(all_entities: List[Entity]) -> List[Entity]:
    """Merge entities by normalized name."""
    merged = {}
    for entity in all_entities:
        key = entity.name.lower().strip()
        if key in merged:
            # Merge attributes from duplicate
            merged[key].attributes.update(entity.attributes)
        else:
            # Deep copy so later merges do not mutate the caller's entities
            merged[key] = entity.model_copy(deep=True)
    return list(merged.values())
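
Relations repeat across overlapping chunks in the same way. The sketch below shows one possible way to deduplicate them: keep a single copy of each normalized (subject, predicate, obj) triple and prefer the copy with the highest confidence. `RelationRecord` is a dataclass stand-in with the same fields as the `Relation` model so the example runs without Pydantic, and `deduplicate_relations` is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class RelationRecord:
    """Stand-in with the same fields as the article's Relation model."""
    subject: str
    predicate: str
    obj: str
    confidence: float


def deduplicate_relations(relations: List[RelationRecord]) -> List[RelationRecord]:
    """Keep one relation per normalized (subject, predicate, obj) triple."""
    best: Dict[Tuple[str, str, str], RelationRecord] = {}
    for rel in relations:
        key = (
            rel.subject.lower().strip(),
            rel.predicate.lower().strip(),
            rel.obj.lower().strip(),
        )
        # Prefer the highest-confidence copy of each triple
        if key not in best or rel.confidence > best[key].confidence:
            best[key] = rel
    return list(best.values())
```

Keeping the highest-confidence copy is a design choice; averaging the confidences or counting how many chunks produced the triple are reasonable alternatives.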

Putting It All Together

def run_extraction_pipeline(document: str) -> ExtractionOutput:
    chunks = chunk_text(document)

    all_entities = []
    all_relations = []

    for chunk in chunks:
        entities = extract_entities(chunk)
        all_entities.extend(entities)

        relations = extract_relations(chunk, entities)
        all_relations.extend(relations)

    deduped_entities = deduplicate_entities(all_entities)

    return ExtractionOutput(
        entities=deduped_entities,
        relations=all_relations,
    )

# Usage
document = """Acme Corp, founded by Jane Smith in 2019 in San Francisco,
acquired DataFlow Inc for $50M in January 2025. DataFlow's CTO,
Bob Johnson, joined Acme as VP of Engineering..."""

result = run_extraction_pipeline(document)
for entity in result.entities:
    print(f"{entity.entity_type}: {entity.name}")
for rel in result.relations:
    print(f"{rel.subject} --{rel.predicate}--> {rel.obj}")
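
The pipeline above returns relations exactly as the model produced them, so a relation can point at a name that never made it into the entity list. As a hedged sketch of the validation part of postprocessing, the hypothetical helper below (`filter_relations` and the `Triple` stand-in are illustrative names) separates relations whose endpoints are both known entities from those that are not.

```python
from typing import Iterable, List, NamedTuple, Tuple


class Triple(NamedTuple):
    """Stand-in for the subject/predicate/obj fields of a relation."""
    subject: str
    predicate: str
    obj: str


def filter_relations(
    relations: Iterable[Triple], entity_names: Iterable[str]
) -> Tuple[List[Triple], List[Triple]]:
    """Split relations into (kept, dropped) based on known entity names."""
    known = {name.lower().strip() for name in entity_names}
    kept: List[Triple] = []
    dropped: List[Triple] = []
    for rel in relations:
        endpoints_known = (
            rel.subject.lower().strip() in known
            and rel.obj.lower().strip() in known
        )
        (kept if endpoints_known else dropped).append(rel)
    return kept, dropped
```

Returning the dropped relations instead of discarding them makes it easier to log them and spot systematic naming mismatches.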

Adding Retry Logic

LLM calls fail and network errors happen. Wrap each extraction call in a retry with exponential backoff, for example extract_with_retry(extract_entities, chunk):

import time

def extract_with_retry(func, *args, max_retries: int = 3, **kwargs):
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait}s...")
            time.sleep(wait)
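
Catching every exception also retries programming errors that will never succeed. One possible refinement, sketched below under stated assumptions, retries only exception types you consider transient and adds random jitter so parallel workers do not retry in lockstep. `retry_transient` and its parameters are hypothetical; the default `retry_on` tuple uses built-in exceptions, and with the OpenAI SDK you would likely pass classes such as `openai.APIConnectionError` and `openai.RateLimitError` instead.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_transient(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[Exception], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying only on the exception types listed in retry_on."""
    for attempt in range(max_retries):
        try:
            return func()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff plus up to 50% random jitter
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))
    raise ValueError("max_retries must be at least 1")
```

The `sleep` parameter exists so tests can inject a fake and run without waiting. A call might look like `retry_transient(lambda: extract_entities(chunk))`.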

FAQ

How do I handle documents longer than the model's context window?

Chunking is the standard approach. For very long documents (100+ pages), use a hierarchical strategy: first extract a summary of each section, then do detailed extraction on sections that contain relevant information. This reduces both cost and processing time.
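
The hierarchical strategy can be sketched as a two-pass loop. In the example below, `summarize`, `is_relevant`, and `extract` are hypothetical callables that you supply; in practice the first and last would probably wrap LLM calls, but they are plain functions here so the sketch runs on its own. Splitting sections on blank lines is also an assumption that real documents may not satisfy.

```python
from typing import Callable, List, Tuple


def split_sections(document: str) -> List[str]:
    """Naive section splitter: treats blank lines as section boundaries."""
    return [s.strip() for s in document.split("\n\n") if s.strip()]


def hierarchical_extract(
    document: str,
    summarize: Callable[[str], str],
    is_relevant: Callable[[str], bool],
    extract: Callable[[str], List[str]],
) -> Tuple[List[str], int]:
    """Return (extracted items, number of sections skipped)."""
    items: List[str] = []
    skipped = 0
    for section in split_sections(document):
        # Pass 1: a cheap summary decides whether the section is worth the cost
        summary = summarize(section)
        if not is_relevant(summary):
            skipped += 1
            continue
        # Pass 2: detailed extraction only on relevant sections
        items.extend(extract(section))
    return items, skipped
```

Tracking the skipped count gives a quick estimate of how much detailed extraction the first pass saved.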

Should I use one LLM call per stage or combine extraction into a single call?

Separate calls per stage generally produce better results. A single call that asks for entities, relations, and summaries at once tends to degrade quality on each individual task. The extra latency and cost of multiple calls is usually justified by the higher extraction accuracy.

How do I evaluate extraction quality?

Build a test set of 50-100 manually annotated documents. Measure precision (how many extracted items are correct), recall (how many real items were found), and F1 score. Aim for at least 85% F1 on entity extraction and 75% on relation extraction for production use.
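
As a minimal sketch of that measurement, the function below scores one document by comparing the set of extracted items against the manually annotated set. Representing each entity as a `(name, entity_type)` tuple and requiring an exact match are assumptions; `score_extraction` is a hypothetical helper name.

```python
from typing import Dict, Hashable, Set


def score_extraction(predicted: Set[Hashable], gold: Set[Hashable]) -> Dict[str, float]:
    """Compute precision, recall, and F1 from exact-match set overlap."""
    true_positives = len(predicted & gold)
    precision = true_positives / len(predicted) if predicted else 0.0
    recall = true_positives / len(gold) if gold else 0.0
    if precision + recall == 0:
        f1 = 0.0
    else:
        # F1 is the harmonic mean of precision and recall
        f1 = 2 * precision * recall / (precision + recall)
    return {"precision": precision, "recall": recall, "f1": f1}


gold = {
    ("Acme Corp", "organization"),
    ("Jane Smith", "person"),
    ("San Francisco", "location"),
    ("DataFlow Inc", "organization"),
}
predicted = {
    ("Acme Corp", "organization"),
    ("Jane Smith", "person"),
    ("Bob Johnson", "person"),
}
scores = score_extraction(predicted, gold)
```

Here two of the three predictions are correct and two of the four annotated entities were found, so precision is 2/3, recall is 1/2, and F1 is 4/7 (about 0.57). Exact matching is strict; normalizing names before comparison would give partial variants such as "Acme" versus "Acme Corp" a fairer treatment.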



CallSphere Team
