Batch Embedding and Ingestion: Processing Millions of Documents for Vector Search
Build a production-grade pipeline for embedding and ingesting millions of documents into a vector database, covering chunking strategies, parallel processing, rate limiting, and progress tracking.
The Ingestion Challenge
Generating a single embedding takes milliseconds. Generating embeddings for a million documents takes hours if you do it naively — one document at a time, waiting for each API response before sending the next request. A well-designed ingestion pipeline uses batching, parallelism, and fault tolerance to reduce that to minutes.
This guide walks through building a production-grade pipeline that chunks documents, generates embeddings in parallel batches, handles rate limits gracefully, and tracks progress so you can resume after failures.
Step 1: Document Chunking
Most documents exceed the token limit of embedding models (8191 tokens for OpenAI text-embedding-3-small). Split them into overlapping chunks to preserve context at boundaries:
from dataclasses import dataclass

@dataclass
class Chunk:
    doc_id: str
    chunk_index: int
    text: str
    metadata: dict

def chunk_document(
    doc_id: str,
    text: str,
    metadata: dict,
    chunk_size: int = 500,
    overlap: int = 50
) -> list[Chunk]:
    words = text.split()
    chunks = []
    start = 0
    index = 0
    while start < len(words):
        end = start + chunk_size
        chunk_text = " ".join(words[start:end])
        chunks.append(Chunk(
            doc_id=doc_id,
            chunk_index=index,
            text=chunk_text,
            metadata={**metadata, "chunk_index": index}
        ))
        start += chunk_size - overlap
        index += 1
    return chunks
Choose chunk size based on your retrieval needs. Smaller chunks (200-300 words) produce more precise results. Larger chunks (500-1000 words) preserve more context. Overlap of 10-15% prevents information from being split across boundaries.
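To see how the defaults behave at boundaries, here is a quick sanity check of the chunk windows chunk_document would produce for a hypothetical 1,200-word document (the word count is illustrative):

```python
# Chunk boundaries for a 1,200-word document with the defaults above
chunk_size, overlap, n_words = 500, 50, 1200
starts = list(range(0, n_words, chunk_size - overlap))
chunks = [(s, min(s + chunk_size, n_words)) for s in starts]
print(chunks)  # [(0, 500), (450, 950), (900, 1200)]
```

Each chunk starts 450 words after the previous one, so words 450-500 and 900-950 appear in two chunks, which is exactly the overlap that protects sentences straddling a boundary.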
Step 2: Batch Embedding with Rate Limit Handling
OpenAI's embedding API accepts batches of up to 2048 texts. Batching reduces API calls dramatically:
import time
import openai
from openai import OpenAI

client = OpenAI()

def embed_batch(
    texts: list[str],
    model: str = "text-embedding-3-small",
    max_retries: int = 3
) -> list[list[float]]:
    for attempt in range(max_retries):
        try:
            response = client.embeddings.create(
                model=model,
                input=texts
            )
            return [item.embedding for item in response.data]
        except openai.RateLimitError:
            wait_time = 2 ** attempt * 5  # 5s, 10s, 20s
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
        except openai.APIError as e:
            if attempt == max_retries - 1:
                raise
            print(f"API error: {e}. Retrying...")
            time.sleep(2)
    raise RuntimeError("Max retries exceeded")
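Because a single request cannot exceed the 2048-input cap, the chunk list needs to be split before calling embed_batch. A minimal sketch (the helper name is ours, not part of the OpenAI SDK):

```python
def split_into_batches(texts: list[str], max_batch: int = 2048):
    """Yield sub-lists no longer than the API's per-request input cap."""
    for i in range(0, len(texts), max_batch):
        yield texts[i:i + max_batch]

# 5,000 chunks become three requests instead of 5,000
batches = list(split_into_batches([f"chunk {i}" for i in range(5000)]))
print([len(b) for b in batches])  # [2048, 2048, 904]
```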
Step 3: Parallel Processing Pipeline
Use a producer-consumer pattern backed by a thread pool. The main thread chunks documents and submits batches to the executor, whose internal work queue feeds the workers. Each worker embeds its batch and returns records ready to upsert to the vector database:
import concurrent.futures
from threading import Lock

class EmbeddingPipeline:
    def __init__(self, batch_size: int = 100, max_workers: int = 5):
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.progress = {"embedded": 0, "failed": 0}
        self.lock = Lock()

    def process_batch(self, chunks: list[Chunk]) -> list[dict]:
        texts = [c.text for c in chunks]
        embeddings = embed_batch(texts)
        results = []
        for chunk, embedding in zip(chunks, embeddings):
            results.append({
                "id": f"{chunk.doc_id}_{chunk.chunk_index}",
                "values": embedding,
                "metadata": {
                    **chunk.metadata,
                    "doc_id": chunk.doc_id,
                    "text": chunk.text[:500]  # store truncated text
                }
            })
        with self.lock:
            self.progress["embedded"] += len(results)
        return results

    def run(self, chunks: list[Chunk]):
        # Split into batches
        batches = [
            chunks[i:i + self.batch_size]
            for i in range(0, len(chunks), self.batch_size)
        ]
        all_results = []
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures = {
                executor.submit(self.process_batch, batch): i
                for i, batch in enumerate(batches)
            }
            for future in concurrent.futures.as_completed(futures):
                batch_idx = futures[future]
                try:
                    results = future.result()
                    all_results.extend(results)
                    print(
                        f"Batch {batch_idx + 1}/{len(batches)} done. "
                        f"Total: {self.progress['embedded']}"
                    )
                except Exception as e:
                    with self.lock:
                        # Count the actual batch size; the last batch may be short
                        self.progress["failed"] += len(batches[batch_idx])
                    print(f"Batch {batch_idx + 1} failed: {e}")
        return all_results
Step 4: Progress Tracking and Resumability
For million-document pipelines, failures are inevitable. Track progress to enable resuming:
import json
from pathlib import Path

class ProgressTracker:
    def __init__(self, checkpoint_path: str = "ingestion_progress.json"):
        self.path = Path(checkpoint_path)
        self.processed_ids: set[str] = set()
        self._last_saved = 0
        self._load()

    def _load(self):
        if self.path.exists():
            data = json.loads(self.path.read_text())
            self.processed_ids = set(data.get("processed_ids", []))
            self._last_saved = len(self.processed_ids)
            print(f"Resumed: {len(self.processed_ids)} already processed")

    def mark_done(self, doc_ids: list[str]):
        self.processed_ids.update(doc_ids)
        # Checkpoint once at least 1000 new documents have accumulated.
        # (An exact modulo check could skip saves, since ids arrive in batches.)
        if len(self.processed_ids) - self._last_saved >= 1000:
            self._save()

    def _save(self):
        self.path.write_text(json.dumps({
            "processed_ids": list(self.processed_ids),
            "count": len(self.processed_ids)
        }))
        self._last_saved = len(self.processed_ids)

    def should_process(self, doc_id: str) -> bool:
        return doc_id not in self.processed_ids
Step 5: Putting It All Together
def ingest_documents(documents: list[dict]):
    tracker = ProgressTracker()
    pipeline = EmbeddingPipeline(batch_size=100, max_workers=5)

    # Filter already-processed documents
    pending = [
        doc for doc in documents
        if tracker.should_process(doc["id"])
    ]
    print(f"Processing {len(pending)} of {len(documents)} documents")

    # Chunk all pending documents
    all_chunks = []
    for doc in pending:
        chunks = chunk_document(
            doc_id=doc["id"],
            text=doc["content"],
            metadata={"source": doc.get("source", "unknown")}
        )
        all_chunks.append((doc["id"], chunks))

    # Flatten chunks for pipeline
    flat_chunks = [c for _, chunks in all_chunks for c in chunks]

    # Embed and collect results
    results = pipeline.run(flat_chunks)

    # Upsert to vector database in batches
    for i in range(0, len(results), 100):
        batch = results[i:i + 100]
        index.upsert(vectors=batch)  # Pinecone example; `index` is your connected index
        doc_ids = list(set(r["metadata"]["doc_id"] for r in batch))
        tracker.mark_done(doc_ids)

    tracker._save()  # final checkpoint so the last partial batch is recorded

    print(f"Done. Embedded: {pipeline.progress['embedded']}, "
          f"Failed: {pipeline.progress['failed']}")
Performance Tips
- Use the largest batch size your API allows. OpenAI supports up to 2048 inputs per request. Larger batches reduce HTTP overhead.
- Set max_workers to 3-5. More workers hit rate limits faster. Fewer workers leave throughput on the table.
- Pre-filter empty or duplicate documents before chunking to avoid wasting embedding API calls.
- Monitor token usage. Embedding cost is per token, not per request. Long documents cost more.
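The third tip, pre-filtering, can be done with a content hash before any chunking happens. A minimal sketch (the helper name is ours):

```python
import hashlib

def dedupe_documents(documents: list[dict]) -> list[dict]:
    """Drop empty documents and exact-content duplicates before chunking."""
    seen, unique = set(), []
    for doc in documents:
        content = doc.get("content", "").strip()
        if not content:
            continue  # empty document: nothing to embed
        digest = hashlib.sha256(content.encode()).hexdigest()
        if digest in seen:
            continue  # exact duplicate: already queued
        seen.add(digest)
        unique.append(doc)
    return unique

docs = [
    {"id": "a", "content": "hello world"},
    {"id": "b", "content": "hello world"},  # duplicate of "a"
    {"id": "c", "content": "   "},          # effectively empty
]
print([d["id"] for d in dedupe_documents(docs)])  # ['a']
```

Hashing catches only exact duplicates; near-duplicate detection (e.g. MinHash) is a separate problem and usually not worth the complexity at ingestion time.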
FAQ
How long does it take to embed one million documents?
With OpenAI text-embedding-3-small at 5 parallel workers and 100 documents per batch, expect roughly 2-4 hours for one million average-length documents (500 words each). The bottleneck is API rate limits, not compute. Local embedding models like sentence-transformers on a GPU can process the same volume in 30-60 minutes.
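A back-of-envelope check of that estimate, assuming roughly 6 seconds of latency per embedding request (the latency figure is an assumption; measure your own):

```python
# Requests needed and wall-clock time at 5 concurrent workers
docs, batch_size, workers, secs_per_request = 1_000_000, 100, 5, 6
requests = docs // batch_size
hours = requests * secs_per_request / workers / 3600
print(requests, round(hours, 1))  # 10000 3.3
```

Ten thousand requests at five in flight lands in the 2-4 hour range; larger batch sizes cut the request count proportionally.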
Should I embed entire documents or chunks?
Almost always chunks. Embedding models have token limits, and embedding a long document into a single vector loses fine-grained information. Chunking lets you retrieve the specific paragraph that answers a question rather than the entire document. Store the document ID in chunk metadata to reconstruct the full document when needed.
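Because the pipeline above stores doc_id and chunk_index in metadata, reconstruction is a simple group-and-sort over retrieved matches. A sketch over the record dicts the pipeline produces (the function name is ours):

```python
from collections import defaultdict

def group_by_document(matches: list[dict]) -> dict:
    """Group retrieved chunk matches by doc_id, ordered by chunk_index."""
    docs = defaultdict(list)
    for m in matches:
        docs[m["metadata"]["doc_id"]].append(m)
    for chunks in docs.values():
        chunks.sort(key=lambda m: m["metadata"]["chunk_index"])
    return dict(docs)

matches = [
    {"metadata": {"doc_id": "a", "chunk_index": 1, "text": "world"}},
    {"metadata": {"doc_id": "a", "chunk_index": 0, "text": "hello"}},
]
grouped = group_by_document(matches)
print([m["metadata"]["text"] for m in grouped["a"]])  # ['hello', 'world']
```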
How do I handle documents that change after initial ingestion?
Track document versions or content hashes. When a document changes, re-chunk and re-embed it, then upsert the new chunks (overwriting by ID). Delete orphaned chunk IDs that no longer correspond to any chunk in the updated document. Most vector databases support upsert natively, making this straightforward.
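The content-hash approach can be sketched in a few lines; the stored_hashes mapping stands in for wherever you persist per-document hashes (a database column, the checkpoint file, etc.):

```python
import hashlib

def content_hash(text: str) -> str:
    """Stable fingerprint of a document's content."""
    return hashlib.sha256(text.encode()).hexdigest()

def needs_reembedding(doc: dict, stored_hashes: dict) -> bool:
    """True if the document is new or its content changed since last ingestion."""
    return stored_hashes.get(doc["id"]) != content_hash(doc["content"])

stored = {"doc1": content_hash("original text")}
print(needs_reembedding({"id": "doc1", "content": "original text"}, stored))  # False
print(needs_reembedding({"id": "doc1", "content": "edited text"}, stored))    # True
```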
#BatchProcessing #Embeddings #DataIngestion #Pipeline #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.