Building a Document Ingestion Pipeline for RAG: PDF, DOCX, HTML, and CSV Processing
Learn how to build a production document ingestion pipeline that detects file formats, extracts text, chunks content intelligently, generates embeddings, and stores everything for retrieval-augmented generation.
Why Document Ingestion Is the Foundation of RAG
Retrieval-augmented generation only works if the retrieval layer has clean, well-structured data to search. Most RAG failures are not prompt engineering problems — they are data ingestion problems. If your pipeline silently drops tables from PDFs, strips formatting from DOCX headers, or produces overlapping chunks with no context, your agent will hallucinate confidently from incomplete information.
A production ingestion pipeline must handle four concerns: format detection and extraction, intelligent chunking, embedding generation, and indexed storage. Each stage has pitfalls that compound downstream.
Format Detection and Text Extraction
The first challenge is reliably extracting text from heterogeneous file types. Never rely on file extensions alone — a renamed .txt file might contain HTML.
import magic
from pathlib import Path
from dataclasses import dataclass
from typing import List
@dataclass
class ExtractedDocument:
    """Normalized result of text extraction from any supported file format."""
    # Path of the file the text was extracted from.
    source: str
    # Full document text (pages/paragraphs/rows joined together).
    content: str
    # Format-specific details, e.g. {"format": "pdf", "page_count": 3}.
    metadata: dict
    # Per-unit text: PDF pages, DOCX paragraphs, CSV rows, or a single
    # entry holding the whole document for HTML.
    pages: List[str]
class FormatDetector:
    """Maps a file's sniffed MIME type to a short internal format label."""

    # Content-type -> format name. Detection is content-based (libmagic),
    # so a renamed file is still classified by what it actually contains.
    MIME_MAP = {
        "application/pdf": "pdf",
        "application/vnd.openxmlformats-officedocument"
        ".wordprocessingml.document": "docx",
        "text/html": "html",
        "text/csv": "csv",
        "text/plain": "text",
    }

    def detect(self, file_path: str) -> str:
        """Return the format label for *file_path*.

        Raises ValueError when the sniffed MIME type is not supported.
        """
        detected = magic.from_file(file_path, mime=True)
        if detected not in self.MIME_MAP:
            raise ValueError(
                f"Unsupported format: {detected} for {file_path}"
            )
        return self.MIME_MAP[detected]
class DocumentExtractor:
    """Dispatches a file to the matching format-specific text extractor.

    Format detection is content-based (via FormatDetector); each extractor
    returns a normalized ExtractedDocument. Parsing libraries are imported
    lazily so only the formats actually used incur the dependency.
    """

    def __init__(self):
        self.detector = FormatDetector()

    def extract(self, file_path: str) -> "ExtractedDocument":
        """Detect the file's format and run the matching _extract_* method."""
        fmt = self.detector.detect(file_path)
        extractor = getattr(self, f"_extract_{fmt}")
        return extractor(file_path)

    def _extract_pdf(self, path: str) -> "ExtractedDocument":
        """Extract text page by page, appending tables as pipe-joined rows."""
        import pdfplumber
        pages = []
        with pdfplumber.open(path) as pdf:
            for page in pdf.pages:
                # extract_text() returns None for image-only pages; fall
                # back to "" so scanned pages don't crash the pipeline.
                text = page.extract_text() or ""
                for table in page.extract_tables():
                    rows = [
                        " | ".join(str(cell or "") for cell in row)
                        for row in table
                    ]
                    text += "\n" + "\n".join(rows)
                pages.append(text)
        return ExtractedDocument(
            source=path,
            content="\n\n".join(pages),
            metadata={"format": "pdf", "page_count": len(pages)},
            pages=pages,
        )

    def _extract_docx(self, path: str) -> "ExtractedDocument":
        """Extract non-empty paragraphs from a Word document."""
        from docx import Document
        doc = Document(path)
        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
        return ExtractedDocument(
            source=path,
            content="\n\n".join(paragraphs),
            metadata={"format": "docx", "paragraph_count": len(paragraphs)},
            pages=paragraphs,
        )

    def _extract_html(self, path: str) -> "ExtractedDocument":
        """Strip boilerplate tags and return the page's visible text."""
        from bs4 import BeautifulSoup
        with open(path, "r", encoding="utf-8") as f:
            soup = BeautifulSoup(f.read(), "html.parser")
        # Remove non-content elements before extracting text.
        for tag in soup(["script", "style", "nav", "footer"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
        # Fix: soup.title.string is None when <title> is empty or contains
        # nested markup, which leaked None into metadata; get_text() always
        # yields a str.
        title = soup.title.get_text(strip=True) if soup.title else ""
        return ExtractedDocument(
            source=path,
            content=text,
            metadata={"format": "html", "title": title},
            pages=[text],
        )

    def _extract_csv(self, path: str) -> "ExtractedDocument":
        """Render each CSV row as a "col: value | col: value" line."""
        import csv
        rows = []
        # newline="" is required by the csv module so quoted fields that
        # contain embedded line breaks are parsed correctly.
        with open(path, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                line = " | ".join(
                    f"{k}: {v}" for k, v in row.items()
                )
                rows.append(line)
        return ExtractedDocument(
            source=path,
            content="\n".join(rows),
            metadata={"format": "csv", "row_count": len(rows)},
            pages=rows,
        )

    def _extract_text(self, path: str) -> "ExtractedDocument":
        """Read a plain-text file as a single-page document.

        Fix: FormatDetector maps text/plain to "text", but no matching
        extractor existed, so plain-text files raised AttributeError in
        extract()'s getattr dispatch.
        """
        with open(path, "r", encoding="utf-8") as f:
            text = f.read()
        return ExtractedDocument(
            source=path,
            content=text,
            metadata={"format": "text"},
            pages=[text],
        )
The key design decision here is using pdfplumber over PyPDF2 because it handles table extraction natively. Tables are a major source of lost information in PDF pipelines.
Intelligent Chunking
Naive fixed-size chunking breaks sentences mid-thought and loses section context. A better approach uses recursive splitting with overlap and respects document structure.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
from typing import List
from dataclasses import dataclass
@dataclass
class Chunk:
    """A single retrieval unit produced by chunking an ExtractedDocument."""
    # Chunk text, including any overlap words carried over from the
    # previous chunk.
    text: str
    # Copy of the source document's metadata plus source, chunk_index,
    # and total_chunks.
    metadata: dict
    # Zero-based position of this chunk within its document.
    index: int
class RecursiveChunker:
    """Recursively splits text on progressively finer separators.

    Splitting tries the coarsest separator first (paragraphs), falling back
    to lines, sentences, then words; segments are merged greedily up to
    max_tokens, and neighboring chunks share overlap_tokens words of
    context. Token counts are approximated by whitespace word counts.
    """

    def __init__(
        self,
        max_tokens: int = 512,
        overlap_tokens: int = 64,
        separators: list = None,
    ):
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        # Coarse-to-fine split order: paragraphs, lines, sentences, words.
        self.separators = separators or [
            "\n\n", "\n", ". ", " "
        ]

    def chunk(
        self, doc: "ExtractedDocument"
    ) -> "List[Chunk]":
        """Split *doc* into overlapping Chunk objects with provenance metadata."""
        raw_chunks = self._split(
            doc.content, self.separators
        )
        # Drop whitespace-only fragments so empty chunks never reach the index.
        raw_chunks = [t for t in raw_chunks if t.strip()]
        # Fix: apply overlap exactly once, at the top level. _split used to
        # call _add_overlap itself, so chunks produced by a recursive split
        # had overlap text prepended at every recursion depth.
        raw_chunks = self._add_overlap(raw_chunks)
        chunks = []
        for i, text in enumerate(raw_chunks):
            chunks.append(Chunk(
                text=text.strip(),
                metadata={
                    **doc.metadata,
                    "source": doc.source,
                    "chunk_index": i,
                    "total_chunks": len(raw_chunks),
                },
                index=i,
            ))
        return chunks

    def _split(self, text: str, seps: list) -> "List[str]":
        """Greedily merge sep-delimited parts under max_tokens, recursing on
        oversize parts with the next, finer separator. Returns chunks
        without overlap."""
        if not seps:
            # No separators left: hard-split on word boundaries.
            return self._fixed_split(text)
        sep = seps[0]
        parts = text.split(sep)
        merged = []
        current = ""
        for part in parts:
            candidate = current + sep + part if current else part
            if self._token_count(candidate) <= self.max_tokens:
                # Still under budget: keep accumulating.
                current = candidate
                continue
            if current:
                merged.append(current)
            if self._token_count(part) > self.max_tokens:
                # This single part is itself too big: recurse with the
                # next separator in the hierarchy.
                merged.extend(self._split(part, seps[1:]))
                current = ""
            else:
                current = part
        if current:
            merged.append(current)
        return merged

    def _add_overlap(self, chunks: "List[str]") -> "List[str]":
        """Prepend the last overlap_tokens words of each chunk to its successor."""
        if len(chunks) <= 1 or self.overlap_tokens <= 0:
            # Fix: with overlap_tokens == 0 the old [-0:] slice copied the
            # ENTIRE previous chunk instead of nothing.
            return chunks
        result = [chunks[0]]
        for i in range(1, len(chunks)):
            prev_words = chunks[i - 1].split()
            overlap = " ".join(prev_words[-self.overlap_tokens:])
            result.append(overlap + " " + chunks[i])
        return result

    def _fixed_split(self, text: str) -> "List[str]":
        """Hard-split into max_tokens-word windows (last resort)."""
        words = text.split()
        return [
            " ".join(words[i:i + self.max_tokens])
            for i in range(0, len(words), self.max_tokens)
        ]

    def _token_count(self, text: str) -> int:
        # Whitespace word count as a cheap token proxy; real tokenizer
        # counts will differ somewhat, so max_tokens is approximate.
        return len(text.split())
Embedding and Storage
Once chunks are ready, generate embeddings and store them in a vector database. Batch processing with rate limiting prevents API throttling.
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI()

async def embed_and_store(chunks: "List[Chunk]", collection):
    """Embed chunks in batches and upsert them into a vector collection.

    Batches of 100 keep each embeddings request small, and a short pause
    between batches avoids API rate limiting. IDs are derived from the
    source path plus chunk index, so re-ingesting the same document
    upserts over its previous vectors rather than duplicating them.
    """
    batch_size = 100
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start:start + batch_size]
        response = await client.embeddings.create(
            model="text-embedding-3-small",
            input=[c.text for c in batch],
        )
        # response.data is returned in input order, so batch and
        # embeddings stay aligned element-for-element.
        collection.upsert(
            ids=[f"{c.metadata['source']}_{c.index}" for c in batch],
            embeddings=[e.embedding for e in response.data],
            metadatas=[c.metadata for c in batch],
            documents=[c.text for c in batch],
        )
        # Rate-limit between batches only — the old code also slept after
        # the final batch, adding a pointless 0.5 s to every ingest.
        if start + batch_size < len(chunks):
            await asyncio.sleep(0.5)
FAQ
How should I handle scanned PDFs with no extractable text?
Use OCR as a fallback. Check if pdfplumber returns empty text for a page, then run that page through pytesseract or a cloud OCR service like AWS Textract. Add an ocr_applied: true flag to chunk metadata so downstream consumers know the text quality may be lower.
What chunk size works best for RAG?
Start with 512 tokens with 64-token overlap. Smaller chunks (256 tokens) improve precision for factual Q&A but lose context for summarization tasks. Larger chunks (1024 tokens) work better for complex reasoning. Test with your actual queries and measure retrieval recall to find the right size for your use case.
Should I re-embed everything when the embedding model changes?
Yes. Embedding spaces are model-specific and not interchangeable. When you upgrade models, re-process all documents and rebuild your vector index. Use a versioned collection naming scheme like docs_v2_embedding3small so you can run both indexes in parallel during migration.
#RAG #DocumentProcessing #DataPipelines #Embeddings #VectorSearch #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available — no signup required.