
Batch Embedding and Ingestion: Processing Millions of Documents for Vector Search

Build a production-grade pipeline for embedding and ingesting millions of documents into a vector database, covering chunking strategies, parallel processing, rate limiting, and progress tracking.

The Ingestion Challenge

Generating a single embedding takes milliseconds. Generating embeddings for a million documents takes hours if you do it naively — one document at a time, waiting for each API response before sending the next request. A well-designed ingestion pipeline uses batching, parallelism, and fault tolerance to reduce that to minutes.

This guide walks through building a production-grade pipeline that chunks documents, generates embeddings in parallel batches, handles rate limits gracefully, and tracks progress so you can resume after failures.

Step 1: Document Chunking

Most documents exceed the token limit of embedding models (8191 tokens for OpenAI text-embedding-3-small). Split them into overlapping chunks to preserve context at boundaries:

from dataclasses import dataclass

@dataclass
class Chunk:
    doc_id: str
    chunk_index: int
    text: str
    metadata: dict

def chunk_document(
    doc_id: str,
    text: str,
    metadata: dict,
    chunk_size: int = 500,
    overlap: int = 50
) -> list[Chunk]:
    if overlap >= chunk_size:
        # Otherwise the window start never advances and the loop runs forever
        raise ValueError("overlap must be smaller than chunk_size")
    words = text.split()
    chunks = []
    start = 0
    index = 0

    while start < len(words):
        end = start + chunk_size
        chunk_text = " ".join(words[start:end])
        chunks.append(Chunk(
            doc_id=doc_id,
            chunk_index=index,
            text=chunk_text,
            metadata={**metadata, "chunk_index": index}
        ))
        start += chunk_size - overlap
        index += 1

    return chunks

Choose chunk size based on your retrieval needs. Smaller chunks (200-300 words) produce more precise results. Larger chunks (500-1000 words) preserve more context. An overlap of 10-15% reduces the chance that a sentence or idea is cut in half at a chunk boundary.
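To sanity-check these parameters, note that the chunk count follows directly from the sliding-window arithmetic in chunk_document: the window start advances by chunk_size minus overlap each iteration. A quick sketch:

```python
import math

def num_chunks(n_words: int, chunk_size: int = 500, overlap: int = 50) -> int:
    # The window start advances by (chunk_size - overlap) per chunk,
    # and chunking continues while the start is inside the document.
    step = chunk_size - overlap
    return math.ceil(n_words / step)

print(num_chunks(1200))  # 3 chunks: windows start at words 0, 450, 900
print(num_chunks(500))   # 2 chunks: the second holds only the last 50 words
```

Note the second case: a document barely over one window produces a trailing sliver chunk. That is usually harmless, but worth knowing when you tune chunk_size.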

Step 2: Batch Embedding with Rate Limit Handling

OpenAI's embedding API accepts batches of up to 2048 texts. Batching reduces API calls dramatically:


import time
import openai
from openai import OpenAI

client = OpenAI()

def embed_batch(
    texts: list[str],
    model: str = "text-embedding-3-small",
    max_retries: int = 3
) -> list[list[float]]:
    for attempt in range(max_retries):
        try:
            response = client.embeddings.create(
                model=model,
                input=texts
            )
            return [item.embedding for item in response.data]
        except openai.RateLimitError:
            wait_time = 2 ** attempt * 5  # 5s, 10s, 20s
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
        except openai.APIError as e:
            if attempt == max_retries - 1:
                raise
            print(f"API error: {e}. Retrying...")
            time.sleep(2)
    raise RuntimeError("Max retries exceeded")
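Besides the 2048-item cap, OpenAI also enforces a total-token limit per embeddings request (around 300,000 tokens at the time of writing; check current documentation). A batching helper can respect both constraints. The sketch below uses a rough 1.3-tokens-per-word heuristic rather than a real tokenizer, and the default budget is deliberately conservative:

```python
def make_token_batches(
    texts: list[str],
    max_items: int = 2048,
    max_tokens: int = 250_000
) -> list[list[str]]:
    """Group texts into batches under both the item cap and an
    approximate per-request token budget."""
    def approx_tokens(text: str) -> int:
        # Rough heuristic: ~1.3 tokens per whitespace-delimited word.
        return int(len(text.split()) * 1.3) + 1

    batches: list[list[str]] = []
    current: list[str] = []
    current_tokens = 0
    for text in texts:
        t = approx_tokens(text)
        if current and (len(current) >= max_items
                        or current_tokens + t > max_tokens):
            batches.append(current)
            current, current_tokens = [], 0
        current.append(text)
        current_tokens += t
    if current:
        batches.append(current)
    return batches
```

Feed each resulting batch to embed_batch. For exact counts, swap the heuristic for a real tokenizer such as tiktoken.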

Step 3: Parallel Processing Pipeline

Use a producer-consumer pattern with a thread pool. The producer chunks documents and fills a queue. Workers pull batches from the queue, embed them, and upsert to the vector database:

import concurrent.futures
from queue import Queue
from threading import Lock

class EmbeddingPipeline:
    def __init__(self, batch_size: int = 100, max_workers: int = 5):
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.progress = {"embedded": 0, "failed": 0}
        self.lock = Lock()

    def process_batch(self, chunks: list[Chunk]) -> list[dict]:
        texts = [c.text for c in chunks]
        embeddings = embed_batch(texts)

        results = []
        for chunk, embedding in zip(chunks, embeddings):
            results.append({
                "id": f"{chunk.doc_id}_{chunk.chunk_index}",
                "values": embedding,
                "metadata": {
                    **chunk.metadata,
                    "doc_id": chunk.doc_id,
                    "text": chunk.text[:500]  # store truncated text
                }
            })

        with self.lock:
            self.progress["embedded"] += len(results)

        return results

    def run(self, chunks: list[Chunk]):
        # Split into batches
        batches = [
            chunks[i:i + self.batch_size]
            for i in range(0, len(chunks), self.batch_size)
        ]

        all_results = []
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures = {
                executor.submit(self.process_batch, batch): (i, len(batch))
                for i, batch in enumerate(batches)
            }

            for future in concurrent.futures.as_completed(futures):
                batch_idx, batch_len = futures[future]
                try:
                    results = future.result()
                    all_results.extend(results)
                    print(
                        f"Batch {batch_idx + 1}/{len(batches)} done. "
                        f"Total: {self.progress['embedded']}"
                    )
                except Exception as e:
                    # Count the actual batch length; the final batch
                    # may be smaller than self.batch_size
                    with self.lock:
                        self.progress["failed"] += batch_len
                    print(f"Batch {batch_idx} failed: {e}")

        return all_results

Step 4: Progress Tracking and Resumability

For million-document pipelines, failures are inevitable. Track progress to enable resuming:

import json
from pathlib import Path

class ProgressTracker:
    def __init__(self, checkpoint_path: str = "ingestion_progress.json"):
        self.path = Path(checkpoint_path)
        self.processed_ids: set[str] = set()
        self._last_saved = 0
        self._load()

    def _load(self):
        if self.path.exists():
            data = json.loads(self.path.read_text())
            self.processed_ids = set(data.get("processed_ids", []))
            self._last_saved = len(self.processed_ids)
            print(f"Resumed: {len(self.processed_ids)} already processed")

    def mark_done(self, doc_ids: list[str]):
        self.processed_ids.update(doc_ids)
        # Checkpoint roughly every 1000 documents. A plain modulus check
        # can skip a save when update() adds several IDs at once and
        # jumps past the multiple.
        if len(self.processed_ids) - self._last_saved >= 1000:
            self._save()

    def _save(self):
        self.path.write_text(json.dumps({
            "processed_ids": list(self.processed_ids),
            "count": len(self.processed_ids)
        }))
        self._last_saved = len(self.processed_ids)

    def should_process(self, doc_id: str) -> bool:
        return doc_id not in self.processed_ids

Step 5: Putting It All Together

def ingest_documents(documents: list[dict]):
    tracker = ProgressTracker()
    pipeline = EmbeddingPipeline(batch_size=100, max_workers=5)

    # Filter already-processed documents
    pending = [
        doc for doc in documents
        if tracker.should_process(doc["id"])
    ]
    print(f"Processing {len(pending)} of {len(documents)} documents")

    # Chunk all pending documents into one flat list
    all_chunks = []
    for doc in pending:
        all_chunks.extend(chunk_document(
            doc_id=doc["id"],
            text=doc["content"],
            metadata={"source": doc.get("source", "unknown")}
        ))

    # Embed and collect results
    results = pipeline.run(all_chunks)

    # Upsert to the vector database in batches
    # (assumes `index` is an initialized Pinecone index)
    for i in range(0, len(results), 100):
        batch = results[i:i + 100]
        index.upsert(vectors=batch)
        doc_ids = list(set(r["metadata"]["doc_id"] for r in batch))
        tracker.mark_done(doc_ids)

    tracker._save()  # flush the final checkpoint

    print(f"Done. Embedded: {pipeline.progress['embedded']}, "
          f"Failed: {pipeline.progress['failed']}")

Performance Tips

  • Use the largest batch size your API allows. OpenAI supports up to 2048 inputs per request. Larger batches reduce HTTP overhead.
  • Set max_workers to 3-5. More workers hit rate limits faster. Fewer workers leave throughput on the table.
  • Pre-filter empty or duplicate documents before chunking to avoid wasting embedding API calls.
  • Monitor token usage. Embedding cost is per token, not per request. Long documents cost more.
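The pre-filtering tip can be sketched as a single hash-based pass. This is a minimal example; the "content" field name matches the ingest_documents input format above, and exact-match hashing is the simplest possible dedup criterion:

```python
import hashlib

def dedupe_documents(documents: list[dict]) -> list[dict]:
    """Drop empty documents and exact-content duplicates before chunking."""
    seen: set[str] = set()
    unique = []
    for doc in documents:
        content = doc.get("content", "").strip()
        if not content:
            continue  # nothing to embed
        digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
        if digest in seen:
            continue  # exact duplicate of an earlier document
        seen.add(digest)
        unique.append(doc)
    return unique
```

For near-duplicates (boilerplate variations, reposts), exact hashing will not catch them; techniques like MinHash exist for that, but exact dedup alone often removes a surprising share of wasted API calls.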

FAQ

How long does it take to embed one million documents?

With OpenAI text-embedding-3-small at 5 parallel workers and 100 documents per batch, expect roughly 2-4 hours for one million average-length documents (500 words each). The bottleneck is API rate limits, not compute. Local embedding models like sentence-transformers on a GPU can process the same volume in 30-60 minutes.
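The estimate follows from simple arithmetic on token volume against your rate limit. The tokens-per-minute figure below is an assumed example; substitute the actual limit for your account tier:

```python
# Back-of-envelope throughput estimate (assumed numbers)
docs = 1_000_000
tokens_per_doc = 650        # ~500 words at ~1.3 tokens per word
tpm_limit = 5_000_000       # example tokens-per-minute rate limit

minutes = docs * tokens_per_doc / tpm_limit
print(f"~{minutes / 60:.1f} hours at the rate-limited throughput")  # ~2.2 hours
```

Lower-tier accounts with smaller TPM limits scale this linearly, which is why real-world runs land in the multi-hour range.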

Should I embed entire documents or chunks?

Almost always chunks. Embedding models have token limits, and embedding a long document into a single vector loses fine-grained information. Chunking lets you retrieve the specific paragraph that answers a question rather than the entire document. Store the document ID in chunk metadata to reconstruct the full document when needed.

How do I handle documents that change after initial ingestion?

Track document versions or content hashes. When a document changes, re-chunk and re-embed it, then upsert the new chunks (overwriting by ID). Delete orphaned chunk IDs that no longer correspond to any chunk in the updated document. Most vector databases support upsert natively, making this straightforward.
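A minimal sketch of hash-based change detection. The `stored_hashes` mapping is a hypothetical stand-in for wherever you persist per-document hashes (a database column, the chunk metadata, or the checkpoint file):

```python
import hashlib

def content_hash(text: str) -> str:
    # Stable fingerprint of the document body
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def needs_reembedding(doc: dict, stored_hashes: dict[str, str]) -> bool:
    """Re-embed only when the content hash differs from the one
    recorded at last ingestion (new documents always qualify)."""
    return stored_hashes.get(doc["id"]) != content_hash(doc["content"])
```

After re-embedding, update the stored hash and delete any chunk IDs beyond the new chunk count (e.g. if the document shrank from five chunks to three).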


#BatchProcessing #Embeddings #DataIngestion #Pipeline #Python #AgenticAI #LearnAI #AIEngineering

CallSphere Team
