Building an Embedding Pipeline: Batch Processing Millions of Documents for Vector Search
Learn how to build a scalable embedding pipeline that processes millions of documents with parallelization, rate limiting, progress tracking, and incremental updates for production vector search.
The Challenge of Embedding at Scale
Generating embeddings for a hundred documents is trivial. Generating embeddings for a million documents introduces a different class of problems: API rate limits, network failures mid-batch, cost optimization, memory management, and the need to incrementally update without re-processing everything.
A naive loop that sends one document at a time to the embedding API would take days for a million documents. A production pipeline parallelizes requests, batches efficiently, tracks progress for resumability, and only re-embeds documents that have actually changed.
Pipeline Architecture
The pipeline has four components: a document source that yields unprocessed records, a batcher that groups documents for efficient API calls, an embedder that handles rate limiting and retries, and a writer that stores results in the vector database.
```python
from dataclasses import dataclass
from typing import List, Optional, AsyncIterator
from datetime import datetime
import hashlib


@dataclass
class Document:
    id: str
    text: str
    metadata: dict
    content_hash: str = ""

    def __post_init__(self):
        if not self.content_hash:
            self.content_hash = hashlib.sha256(
                self.text.encode()
            ).hexdigest()


@dataclass
class EmbeddedDocument:
    id: str
    text: str
    embedding: List[float]
    metadata: dict
    content_hash: str


@dataclass
class PipelineStats:
    total: int = 0
    processed: int = 0
    skipped: int = 0
    failed: int = 0
    started_at: Optional[datetime] = None

    @property
    def progress_pct(self) -> float:
        if self.total == 0:
            return 0.0
        return (self.processed + self.skipped) / self.total * 100

    @property
    def rate(self) -> float:
        if not self.started_at:
            return 0.0
        elapsed = (datetime.utcnow() - self.started_at).total_seconds()
        return self.processed / max(elapsed, 1)
```
Incremental Processing with Content Hashing
The single biggest optimization is skipping documents that have not changed. Store a content hash alongside each embedding and compare before re-processing.
```python
class IncrementalSource:
    def __init__(self, db_pool, vector_store):
        self.db_pool = db_pool
        self.vector_store = vector_store

    async def get_documents(self) -> AsyncIterator[Document]:
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id, content, metadata FROM documents"
            )
        existing_hashes = await self.vector_store.get_hashes(
            [row["id"] for row in rows]
        )
        for row in rows:
            doc = Document(
                id=row["id"],
                text=row["content"],
                metadata=dict(row["metadata"]),
            )
            if existing_hashes.get(doc.id) == doc.content_hash:
                continue  # content unchanged, skip
            yield doc
```
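The `get_hashes` call above is assumed on the vector store rather than being a built-in of any particular client; most stores can support it by keeping `content_hash` in each point's metadata or payload. A minimal in-memory stand-in, useful for unit-testing the pipeline without an external service (the class name and shape are illustrative, not from a real library):

```python
import asyncio


class InMemoryVectorStore:
    # Hypothetical dict-backed stand-in for the vector_store used above.
    # Real implementations would persist vectors and filter on a
    # content_hash payload field instead.
    def __init__(self):
        self._points: dict[str, dict] = {}

    async def upsert_batch(self, docs) -> None:
        # docs are EmbeddedDocument-like objects (id, embedding, content_hash)
        for d in docs:
            self._points[d.id] = {
                "embedding": d.embedding,
                "content_hash": d.content_hash,
            }

    async def get_hashes(self, ids) -> dict[str, str]:
        # Only return hashes for ids the store has actually seen
        return {
            i: self._points[i]["content_hash"]
            for i in ids
            if i in self._points
        }


# Quick demo with a minimal document-like object
from types import SimpleNamespace

store = InMemoryVectorStore()
_doc = SimpleNamespace(id="d1", embedding=[0.1, 0.2], content_hash="abc")
asyncio.run(store.upsert_batch([_doc]))
hashes = asyncio.run(store.get_hashes(["d1", "missing"]))
print(hashes)  # {'d1': 'abc'}
```

Unknown ids are simply absent from the result, which is exactly what `existing_hashes.get(doc.id)` expects: a brand-new document compares as not-equal and gets embedded.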
Rate-Limited Parallel Embedder
The embedder sends batched requests with concurrency control and exponential backoff on rate limit errors.
```python
import asyncio
import logging

from openai import AsyncOpenAI, RateLimitError

logger = logging.getLogger(__name__)


class BatchEmbedder:
    def __init__(
        self,
        model: str = "text-embedding-3-small",
        batch_size: int = 100,
        max_concurrent: int = 5,
        max_retries: int = 5,
    ):
        self.client = AsyncOpenAI()
        self.model = model
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.max_retries = max_retries

    async def embed_batch(
        self, docs: List[Document]
    ) -> List[EmbeddedDocument]:
        async with self.semaphore:
            for attempt in range(self.max_retries):
                try:
                    response = await self.client.embeddings.create(
                        model=self.model,
                        # Crude character-level truncation: the model's
                        # real limit is 8,191 tokens, and a string can
                        # never have more tokens than characters, so
                        # this is safe but conservative.
                        input=[d.text[:8191] for d in docs],
                    )
                    return [
                        EmbeddedDocument(
                            id=docs[i].id,
                            text=docs[i].text,
                            embedding=response.data[i].embedding,
                            metadata=docs[i].metadata,
                            content_hash=docs[i].content_hash,
                        )
                        for i in range(len(docs))
                    ]
                except RateLimitError:
                    wait = 2 ** attempt
                    logger.warning(
                        f"Rate limited, retrying in {wait}s "
                        f"(attempt {attempt + 1})"
                    )
                    await asyncio.sleep(wait)
                except Exception as e:
                    logger.error(f"Embedding failed: {e}")
                    raise
        raise RuntimeError(
            f"Failed after {self.max_retries} retries"
        )
```
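One refinement worth considering: a plain `2 ** attempt` backoff makes every rate-limited worker wake up at the same moment and hammer the API in lockstep. A common fix is "full jitter," sleeping a uniformly random amount between zero and the exponential ceiling. A sketch (the helper name is mine, not part of the pipeline above):

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    # Full-jitter exponential backoff: uniform in
    # [0, min(cap, base * 2^attempt)]. The cap keeps late retries
    # from sleeping for minutes at a time.
    return random.uniform(0, min(cap, base * 2 ** attempt))
```

In `embed_batch`, replacing `wait = 2 ** attempt` with `wait = backoff_delay(attempt)` decorrelates retries across the concurrent workers sharing the semaphore.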
Progress Tracking and Resumability
For million-document pipelines, crashes are inevitable. A checkpoint system lets you resume from where you left off.
```python
import json
from pathlib import Path


class CheckpointManager:
    def __init__(self, checkpoint_path: str = "embed_checkpoint.json"):
        self.path = Path(checkpoint_path)
        self.state = self._load()
        # Keep a set alongside the JSON list so is_processed is O(1)
        # instead of rebuilding a set on every lookup.
        self._processed = set(self.state["processed_ids"])

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {"processed_ids": [], "stats": {}}

    def save(self, stats: PipelineStats, batch_ids: List[str]):
        self.state["processed_ids"].extend(batch_ids)
        self._processed.update(batch_ids)
        self.state["stats"] = {
            "total": stats.total,
            "processed": stats.processed,
            "skipped": stats.skipped,
            "failed": stats.failed,
        }
        self.path.write_text(json.dumps(self.state))

    def is_processed(self, doc_id: str) -> bool:
        return doc_id in self._processed
```
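One hardening the manager above skips: `write_text` rewrites the checkpoint in place, so a crash mid-write can leave a truncated JSON file that fails to parse on restart. The standard fix is to write to a temporary file and atomically rename it over the old one. A sketch (the function name is mine):

```python
import json
import os
import tempfile
from pathlib import Path


def write_checkpoint_atomically(path: Path, state: dict) -> None:
    # Write to a temp file in the same directory, then rename it over
    # the old checkpoint. os.replace is atomic, so readers see either
    # the old file or the new one, never a half-written mix.
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
        os.replace(tmp, path)
    except BaseException:
        os.unlink(tmp)
        raise


# Demo: round-trip a checkpoint through a temporary directory
with tempfile.TemporaryDirectory() as d:
    ckpt = Path(d) / "embed_checkpoint.json"
    write_checkpoint_atomically(ckpt, {"processed_ids": ["a", "b"]})
    restored = json.loads(ckpt.read_text())
print(restored)  # {'processed_ids': ['a', 'b']}
```

Swapping this into `CheckpointManager.save` in place of `write_text` costs one extra file operation per batch and removes a whole class of resume-time failures.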
Orchestrating the Full Pipeline
Tie all components together with an orchestrator that coordinates batching, embedding, and writing.
```python
async def run_pipeline(source, embedder, vector_store, checkpoint):
    stats = PipelineStats(started_at=datetime.utcnow())
    batch = []

    async def flush(docs):
        # Embed, write, and checkpoint one batch. Count failures
        # instead of aborting the whole run.
        try:
            results = await embedder.embed_batch(docs)
            await vector_store.upsert_batch(results)
            checkpoint.save(stats, [d.id for d in docs])
            stats.processed += len(results)
        except Exception:
            logger.exception("Batch of %d documents failed", len(docs))
            stats.failed += len(docs)

    async for doc in source.get_documents():
        stats.total += 1
        if checkpoint.is_processed(doc.id):
            stats.skipped += 1
            continue
        batch.append(doc)
        if len(batch) >= embedder.batch_size:
            await flush(batch)
            batch = []
            if stats.processed % 1000 == 0:
                logger.info(
                    f"Progress: {stats.progress_pct:.1f}% "
                    f"({stats.processed}/{stats.total}) "
                    f"Rate: {stats.rate:.1f} docs/sec"
                )

    # Process the final partial batch
    if batch:
        await flush(batch)

    logger.info(
        f"Pipeline complete: {stats.processed} embedded, "
        f"{stats.skipped} skipped, {stats.failed} failed"
    )
```
FAQ
How much does it cost to embed a million documents?
With OpenAI's text-embedding-3-small at approximately $0.02 per million tokens, a million documents averaging 500 tokens each costs around $10. The larger text-embedding-3-large model costs roughly $0.13 per million tokens. These costs make re-embedding feasible when you upgrade models, but incremental processing still saves significant time and API calls.
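The arithmetic behind those figures is worth making explicit, since it is the input to any re-embedding decision:

```python
def embedding_cost_usd(
    num_docs: int, avg_tokens_per_doc: int, usd_per_million_tokens: float
) -> float:
    # Cost = total tokens / 1M * price per 1M tokens
    total_tokens = num_docs * avg_tokens_per_doc
    return total_tokens / 1_000_000 * usd_per_million_tokens


# 1M documents x 500 tokens at $0.02 per million tokens (3-small)
small = embedding_cost_usd(1_000_000, 500, 0.02)
# Same corpus at $0.13 per million tokens (3-large)
large = embedding_cost_usd(1_000_000, 500, 0.13)
print(f"${small:.2f} vs ${large:.2f}")  # $10.00 vs $65.00
```

Prices shift, so treat the dollar figures as illustrative; the formula itself is the durable part.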
Should I use a local embedding model instead of an API?
For datasets under 100,000 documents, API-based embeddings are simpler and produce excellent quality. For larger datasets, or when data cannot leave your infrastructure, local models such as sentence-transformers running on a GPU are more cost-effective; a single A100 can embed on the order of thousands of short documents per second with a compact model, depending on model size and document length.
How do I handle documents that exceed the embedding model's token limit?
Truncation is the simplest approach: the code above clips each document to its first 8,191 characters, a conservative stand-in for the model's 8,191-token limit (a string can never have more tokens than characters). A better approach is chunking long documents before embedding and storing multiple vectors per document with shared metadata. At query time, retrieve chunks and group them by document ID to reconstruct context.
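A minimal character-based chunker along those lines (a sketch; production pipelines often split on sentence or token boundaries instead):

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    # Sliding window with overlap, so a sentence cut at one chunk's
    # boundary still appears intact at the start of the next chunk.
    # Swap in a tokenizer-based splitter for token-accurate limits.
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]


chunks = chunk_text("a" * 2500, chunk_size=1000, overlap=200)
print([len(c) for c in chunks])  # [1000, 1000, 900, 100]
```

Each chunk would then be wrapped in its own `Document` (for example with ids like `"{doc_id}#0"`, `"{doc_id}#1"`) and flow through the pipeline unchanged.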
#Embeddings #VectorSearch #BatchProcessing #DataPipelines #Scalability #AgenticAI #LearnAI #AIEngineering