Skip to content
Learn Agentic AI11 min read0 views

Building an Embedding Pipeline: Batch Processing Millions of Documents for Vector Search

Learn how to build a scalable embedding pipeline that processes millions of documents with parallelization, rate limiting, progress tracking, and incremental updates for production vector search.

The Challenge of Embedding at Scale

Generating embeddings for a hundred documents is trivial. Generating embeddings for a million documents introduces a different class of problems: API rate limits, network failures mid-batch, cost optimization, memory management, and the need to incrementally update without re-processing everything.

A naive loop that sends one document at a time to the embedding API would take days for a million documents. A production pipeline parallelizes requests, batches efficiently, tracks progress for resumability, and only re-embeds documents that have actually changed.

Pipeline Architecture

The pipeline has four components: a document source that yields unprocessed records, a batcher that groups documents for efficient API calls, an embedder that handles rate limiting and retries, and a writer that stores results in the vector database.

from dataclasses import dataclass, field
from typing import List, Optional, AsyncIterator
from datetime import datetime
import hashlib

@dataclass
class Document:
    id: str
    text: str
    metadata: dict
    content_hash: str = ""

    def __post_init__(self):
        if not self.content_hash:
            self.content_hash = hashlib.sha256(
                self.text.encode()
            ).hexdigest()

@dataclass
class EmbeddedDocument:
    id: str
    text: str
    embedding: List[float]
    metadata: dict
    content_hash: str

@dataclass
class PipelineStats:
    total: int = 0
    processed: int = 0
    skipped: int = 0
    failed: int = 0
    started_at: Optional[datetime] = None

    @property
    def progress_pct(self) -> float:
        if self.total == 0:
            return 0.0
        return (self.processed + self.skipped) / self.total * 100

    @property
    def rate(self) -> float:
        if not self.started_at:
            return 0.0
        elapsed = (datetime.utcnow() - self.started_at).total_seconds()
        return self.processed / max(elapsed, 1)

Incremental Processing with Content Hashing

The single biggest optimization is skipping documents that have not changed. Store a content hash alongside each embedding and compare before re-processing.

class IncrementalSource:
    def __init__(self, db_pool, vector_store):
        self.db_pool = db_pool
        self.vector_store = vector_store

    async def get_documents(self) -> AsyncIterator[Document]:
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id, content, metadata FROM documents"
            )

        existing_hashes = await self.vector_store.get_hashes(
            [row["id"] for row in rows]
        )

        for row in rows:
            doc = Document(
                id=row["id"],
                text=row["content"],
                metadata=dict(row["metadata"]),
            )
            if existing_hashes.get(doc.id) == doc.content_hash:
                continue  # content unchanged, skip
            yield doc

Rate-Limited Parallel Embedder

The embedder sends batched requests with concurrency control and exponential backoff on rate limit errors.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

import asyncio
from openai import AsyncOpenAI, RateLimitError
import logging

logger = logging.getLogger(__name__)

class BatchEmbedder:
    def __init__(
        self,
        model: str = "text-embedding-3-small",
        batch_size: int = 100,
        max_concurrent: int = 5,
        max_retries: int = 5,
    ):
        self.client = AsyncOpenAI()
        self.model = model
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.max_retries = max_retries

    async def embed_batch(
        self, docs: List[Document]
    ) -> List[EmbeddedDocument]:
        async with self.semaphore:
            for attempt in range(self.max_retries):
                try:
                    response = await self.client.embeddings.create(
                        model=self.model,
                        input=[d.text[:8191] for d in docs],
                    )
                    return [
                        EmbeddedDocument(
                            id=docs[i].id,
                            text=docs[i].text,
                            embedding=response.data[i].embedding,
                            metadata=docs[i].metadata,
                            content_hash=docs[i].content_hash,
                        )
                        for i in range(len(docs))
                    ]
                except RateLimitError:
                    wait = 2 ** attempt
                    logger.warning(
                        f"Rate limited, retrying in {wait}s "
                        f"(attempt {attempt + 1})"
                    )
                    await asyncio.sleep(wait)
                except Exception as e:
                    logger.error(f"Embedding failed: {e}")
                    raise
            raise RuntimeError(
                f"Failed after {self.max_retries} retries"
            )

Progress Tracking and Resumability

For million-document pipelines, crashes are inevitable. A checkpoint system lets you resume from where you left off.

import json
from pathlib import Path

class CheckpointManager:
    def __init__(self, checkpoint_path: str = "embed_checkpoint.json"):
        self.path = Path(checkpoint_path)
        self.state = self._load()

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {"processed_ids": [], "stats": {}}

    def save(self, stats: PipelineStats, batch_ids: List[str]):
        self.state["processed_ids"].extend(batch_ids)
        self.state["stats"] = {
            "total": stats.total,
            "processed": stats.processed,
            "skipped": stats.skipped,
            "failed": stats.failed,
        }
        self.path.write_text(json.dumps(self.state))

    def is_processed(self, doc_id: str) -> bool:
        return doc_id in set(self.state["processed_ids"])

Orchestrating the Full Pipeline

Tie all components together with an orchestrator that coordinates batching, embedding, and writing.

async def run_pipeline(source, embedder, vector_store, checkpoint):
    stats = PipelineStats(started_at=datetime.utcnow())
    batch = []

    async for doc in source.get_documents():
        stats.total += 1
        if checkpoint.is_processed(doc.id):
            stats.skipped += 1
            continue

        batch.append(doc)
        if len(batch) >= embedder.batch_size:
            results = await embedder.embed_batch(batch)
            await vector_store.upsert_batch(results)
            checkpoint.save(stats, [d.id for d in batch])
            stats.processed += len(results)
            batch = []

            if stats.processed % 1000 == 0:
                logger.info(
                    f"Progress: {stats.progress_pct:.1f}% "
                    f"({stats.processed}/{stats.total}) "
                    f"Rate: {stats.rate:.1f} docs/sec"
                )

    # Process remaining
    if batch:
        results = await embedder.embed_batch(batch)
        await vector_store.upsert_batch(results)
        checkpoint.save(stats, [d.id for d in batch])
        stats.processed += len(results)

    logger.info(f"Pipeline complete: {stats.processed} embedded, "
                f"{stats.skipped} skipped, {stats.failed} failed")

FAQ

How much does it cost to embed a million documents?

With OpenAI's text-embedding-3-small at approximately $0.02 per million tokens, a million documents averaging 500 tokens each costs around $10. The larger text-embedding-3-large model costs roughly $0.13 per million tokens. These costs make re-embedding feasible when you upgrade models, but incremental processing still saves significant time and API calls.

Should I use a local embedding model instead of an API?

For datasets under 100,000 documents, API-based embeddings are simpler and produce excellent quality. For larger datasets or when you need to avoid sending data to external services, local models like sentence-transformers running on GPU are more cost-effective. A single A100 GPU can embed roughly 10,000 documents per second with a local model.

How do I handle documents that exceed the embedding model's token limit?

Truncation is the simplest approach — the code above clips to 8191 tokens. A better approach is chunking long documents before embedding and storing multiple vectors per document with shared metadata. At query time, retrieve chunks and group them by document ID to reconstruct context.


#Embeddings #VectorSearch #BatchProcessing #DataPipelines #Scalability #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.