
Real-Time Semantic Search: Streaming Updates and Incremental Indexing

Build a semantic search system that handles live document updates with queue-based ingestion, incremental vector indexing, and near-real-time search freshness without rebuilding the entire index.

The Freshness Challenge

Most semantic search tutorials assume a static corpus: embed all documents once, build an index, and search. But real-world applications deal with continuously changing data — new articles published every minute, products added and removed, user-generated content streaming in. Rebuilding the entire index on every change is not feasible at scale. You need incremental indexing that processes updates in near-real-time while keeping search results fresh.

The architecture has three layers:

  1. Change capture — detect document creates, updates, and deletes.
  2. Embedding queue — buffer changes and process embeddings asynchronously.
  3. Live index — update the vector index incrementally without downtime.

Each change event is captured as a typed record:
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import time

class ChangeType(Enum):
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"

@dataclass
class DocumentChange:
    doc_id: str
    change_type: ChangeType
    title: Optional[str] = None
    body: Optional[str] = None
    timestamp: float = 0.0

    def __post_init__(self):
        if self.timestamp == 0.0:
            self.timestamp = time.time()

Queue-Based Ingestion Pipeline

We use an async queue to decouple document changes from the embedding computation. This ensures the write path (document updates) is never blocked by the slow embedding step.

import asyncio
from sentence_transformers import SentenceTransformer
import numpy as np
import logging

logger = logging.getLogger(__name__)

class EmbeddingQueue:
    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        batch_size: int = 32,
        flush_interval: float = 1.0,
    ):
        self.model = SentenceTransformer(model_name)
        self.queue: asyncio.Queue = asyncio.Queue()
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._running = False

    async def enqueue(self, change: DocumentChange):
        """Add a document change to the processing queue."""
        await self.queue.put(change)

    async def start_worker(self, index: "LiveVectorIndex"):
        """Process queued changes in batches."""
        self._running = True
        while self._running:
            batch = []
            try:
                # Collect items up to batch_size or flush_interval
                deadline = time.time() + self.flush_interval
                while len(batch) < self.batch_size:
                    timeout = max(0, deadline - time.time())
                    try:
                        change = await asyncio.wait_for(
                            self.queue.get(), timeout=timeout
                        )
                        batch.append(change)
                    except asyncio.TimeoutError:
                        break

                if batch:
                    await self._process_batch(batch, index)

            except Exception as e:
                logger.error(f"Worker error: {e}")
                await asyncio.sleep(0.5)

    async def _process_batch(
        self, batch: list, index: "LiveVectorIndex"
    ):
        """Embed and apply a batch of changes."""
        creates_updates = [
            c for c in batch
            if c.change_type in (ChangeType.CREATE, ChangeType.UPDATE)
        ]
        deletes = [
            c for c in batch if c.change_type == ChangeType.DELETE
        ]

        if creates_updates:
            texts = [
                f"{c.title or ''}. {c.body or ''}" for c in creates_updates
            ]
            # Run the blocking encode in a worker thread (Python 3.9+)
            # so the event loop stays free to accept new changes
            embeddings = await asyncio.to_thread(
                self.model.encode, texts, normalize_embeddings=True
            )
            for change, embedding in zip(creates_updates, embeddings):
                index.upsert(change.doc_id, embedding, {
                    "title": change.title,
                    "body": change.body,
                })

        for change in deletes:
            index.delete(change.doc_id)

        logger.info(
            f"Processed batch: {len(creates_updates)} upserts, "
            f"{len(deletes)} deletes"
        )

    def stop(self):
        self._running = False
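One gap worth noting: stop() only flips a flag, so any changes still sitting in the queue when you stop are silently dropped. A small drain helper (my own addition, not part of the pipeline above) waits for the worker to empty the queue before stopping; it polls queue depth because the worker never calls task_done(), which rules out Queue.join().

```python
import asyncio

async def drain_then_stop(queue: asyncio.Queue, stop, poll: float = 0.05) -> None:
    """Wait until the queue is empty, then invoke the stop callback.

    Polls queue depth instead of Queue.join(), because the worker
    loop above never calls task_done() on consumed items. Assumes
    the worker task keeps running while we wait.
    """
    while not queue.empty():
        await asyncio.sleep(poll)
    stop()
```

Call it as `await drain_then_stop(queue.queue, queue.stop)` before cancelling the worker task. An empty queue does not guarantee the final batch has finished embedding, so in practice you would also wait one extra flush interval before shutting down.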

The Live Vector Index

A plain FAISS index identifies vectors by insertion position rather than by document ID, and removing vectors renumbers the remaining positions, which breaks any external ID mapping. We solve this with an ID-mapped wrapper that tracks document-to-position mappings and marks deletions with tombstones instead of removing vectors.


import faiss
import threading

class LiveVectorIndex:
    def __init__(self, dimension: int = 384):
        self.dimension = dimension
        self.index = faiss.IndexFlatIP(dimension)
        self.id_to_position: dict = {}  # doc_id -> index position
        self.position_to_id: dict = {}  # index position -> doc_id
        self.metadata: dict = {}  # doc_id -> metadata
        self.deleted: set = set()  # positions marked as deleted
        self._lock = threading.Lock()
        self._total_added = 0

    def upsert(self, doc_id: str, embedding: np.ndarray, meta: dict):
        """Insert or update a document in the index."""
        with self._lock:
            if doc_id in self.id_to_position:
                # Tombstone the old vector; FAISS only appends, never
                # overwrites. Drop the stale position mapping too.
                old_pos = self.id_to_position[doc_id]
                self.deleted.add(old_pos)
                del self.position_to_id[old_pos]

            position = self._total_added
            self.index.add(embedding.reshape(1, -1))
            self.id_to_position[doc_id] = position
            self.position_to_id[position] = doc_id
            self.metadata[doc_id] = meta
            self._total_added += 1

    def delete(self, doc_id: str):
        """Mark a document as deleted."""
        with self._lock:
            if doc_id in self.id_to_position:
                position = self.id_to_position[doc_id]
                self.deleted.add(position)
                del self.id_to_position[doc_id]
                del self.position_to_id[position]
                self.metadata.pop(doc_id, None)

    def search(
        self, query_embedding: np.ndarray, top_k: int = 10
    ) -> list:
        """Search with deleted document filtering."""
        with self._lock:
            if self.index.ntotal == 0:
                return []
            # Over-fetch to compensate for tombstoned entries
            fetch_k = min(top_k + len(self.deleted), self.index.ntotal)
            scores, positions = self.index.search(
                query_embedding.reshape(1, -1), fetch_k
            )

            results = []
            for score, pos in zip(scores[0], positions[0]):
                if pos == -1 or pos in self.deleted:
                    continue
                doc_id = self.position_to_id.get(pos)
                if doc_id is None:
                    continue
                results.append({
                    "doc_id": doc_id,
                    "score": float(score),
                    **self.metadata.get(doc_id, {}),
                })
                if len(results) >= top_k:
                    break
            return results

    def compact(self):
        """Rebuild index without deleted entries to reclaim space."""
        with self._lock:
            active_ids = list(self.id_to_position.keys())
            active_positions = [
                self.id_to_position[did] for did in active_ids
            ]

            if active_positions:
                vectors = np.array([
                    self.index.reconstruct(pos) for pos in active_positions
                ], dtype=np.float32)
            else:
                # No live documents: rebuild to an empty index
                vectors = np.empty((0, self.dimension), dtype=np.float32)

            self.index = faiss.IndexFlatIP(self.dimension)
            self.index.add(vectors)

            self.id_to_position = {}
            self.position_to_id = {}
            self.deleted = set()

            for i, doc_id in enumerate(active_ids):
                self.id_to_position[doc_id] = i
                self.position_to_id[i] = doc_id

            self._total_added = len(active_ids)
            logger.info(f"Compacted index to {len(active_ids)} vectors")

Putting It Together

async def main():
    index = LiveVectorIndex(dimension=384)
    queue = EmbeddingQueue(batch_size=16, flush_interval=0.5)

    worker_task = asyncio.create_task(queue.start_worker(index))

    # Simulate incoming document changes
    await queue.enqueue(DocumentChange(
        doc_id="article-1",
        change_type=ChangeType.CREATE,
        title="Introduction to Vector Databases",
        body="Vector databases store high-dimensional embeddings...",
    ))
    await queue.enqueue(DocumentChange(
        doc_id="article-2",
        change_type=ChangeType.CREATE,
        title="FAISS Performance Tuning",
        body="Optimize FAISS indexes for production workloads...",
    ))

    await asyncio.sleep(2)  # wait for processing

    # Search the live index
    model = SentenceTransformer("all-MiniLM-L6-v2")
    query_emb = model.encode(
        ["how to optimize vector search"], normalize_embeddings=True
    )
    results = index.search(query_emb)
    for r in results:
        print(f"{r['score']:.3f} — {r.get('title', 'N/A')}")

    queue.stop()
    worker_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())

When to Compact

The deleted set grows over time as documents are updated or removed. Schedule compaction during low-traffic periods or when the deleted ratio exceeds a threshold.

def should_compact(index: LiveVectorIndex, threshold: float = 0.3) -> bool:
    """Compact when deleted entries exceed threshold of total."""
    total = index.index.ntotal
    if total == 0:
        return False
    deleted_ratio = len(index.deleted) / total
    return deleted_ratio > threshold
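To schedule the check, a background task can poll periodically and trigger compaction when the ratio crosses the threshold. The sketch below is one way to wire it up; it takes the predicate and the compaction as plain callables so it stays independent of the index class, and the 60-second interval is an arbitrary placeholder.

```python
import asyncio
from typing import Callable

async def compaction_loop(
    needs_compaction: Callable[[], bool],
    compact: Callable[[], None],
    check_interval: float = 60.0,
) -> None:
    """Periodically check the deleted ratio and compact when needed.

    Runs until cancelled. compact() holds the index lock for the
    whole rebuild, so keep check_interval generous on busy systems.
    """
    while True:
        if needs_compaction():
            compact()
        await asyncio.sleep(check_interval)
```

Run it alongside the embedding worker with `asyncio.create_task(compaction_loop(lambda: should_compact(index), index.compact))`.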

FAQ

What is the typical delay between a document being added and it appearing in search results?

With the queue-based architecture shown here, the delay is roughly the flush interval (default 1 second) plus embedding computation time (50-200ms per document depending on hardware). For most applications, a 1-2 second delay between write and searchability is acceptable. If you need sub-second freshness, reduce the flush interval and use GPU-accelerated embedding.
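The arithmetic in that answer can be made explicit. The helper below (the names are mine, not part of the pipeline) gives a conservative worst case: a change waits up to one flush interval before its batch is collected, then the whole batch must be embedded; throughput is bounded by one batch per collect-plus-embed cycle.

```python
def freshness_estimates(
    flush_interval_s: float,
    batch_embed_s: float,
    batch_size: int,
) -> tuple:
    """Back-of-envelope worst-case delay and sustainable throughput.

    worst_case_delay_s: one full flush interval of queueing plus
    one batch embedding before the upsert lands in the index.
    max_docs_per_s: conservative; under load the worker fills a
    batch before the interval elapses, so real throughput is higher.
    """
    worst_case_delay_s = flush_interval_s + batch_embed_s
    max_docs_per_s = batch_size / (flush_interval_s + batch_embed_s)
    return worst_case_delay_s, max_docs_per_s
```

With a 1-second flush interval and very roughly half a second to embed a batch of 32 on CPU, that works out to about a 1.5-second worst-case delay and around 21 documents per second sustained.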

How do I handle consistency between the source database and the search index?

Use a write-ahead pattern: write the document to your primary database first, then enqueue the change for indexing. If the embedding worker crashes, replay unprocessed changes from the database changelog. For stronger guarantees, use a change data capture (CDC) tool like Debezium that streams database changes to your embedding queue.
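A minimal sketch of the write-ahead side, with in-memory dicts standing in for the primary database and its changelog (all names here are placeholders): every write lands in the primary store first and gets a monotonic sequence number, so an indexing worker that crashes can replay everything past its last acknowledged checkpoint.

```python
import itertools

class WriteAheadStore:
    """Primary store first, search-index enqueue second.

    The changelog keeps every change with a monotonic sequence
    number so a crashed indexing worker can replay from its last
    checkpoint. In-memory dicts stand in for a real database.
    """

    def __init__(self):
        self._seq = itertools.count(1)
        self.documents: dict = {}  # doc_id -> body (primary store)
        self.changelog: list = []  # (seq, doc_id, body) entries

    def write(self, doc_id: str, body: str) -> int:
        seq = next(self._seq)
        self.documents[doc_id] = body               # 1. durable write first
        self.changelog.append((seq, doc_id, body))  # 2. then log for indexing
        return seq

    def replay_since(self, checkpoint: int) -> list:
        """Changes the indexing worker has not acknowledged yet."""
        return [c for c in self.changelog if c[0] > checkpoint]
```

After each indexed batch, the worker persists the highest sequence number it processed; on restart it re-enqueues everything from `replay_since(checkpoint)`.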

When should I rebuild the entire index vs using incremental updates?

Rebuild when changing embedding models (old vectors are incompatible with new ones), after a major data migration, or when the deleted ratio exceeds 50% and compaction alone is insufficient. For day-to-day operations, incremental updates with periodic compaction are more efficient and avoid downtime.
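For the full-rebuild case, the common pattern is blue-green: build the replacement index in the background against the new model, then swap the live reference atomically so searches never observe a half-built index. A minimal holder for that swap (my own sketch; the index objects are opaque here):

```python
import threading

class SwappableIndex:
    """Atomic blue-green swap for full index rebuilds.

    Readers grab the current index under a short lock; a rebuild
    constructs the replacement off to the side and swaps it in with
    one assignment, so queries never see a partially built index.
    """

    def __init__(self, index):
        self._index = index
        self._lock = threading.Lock()

    def current(self):
        with self._lock:
            return self._index

    def swap(self, new_index):
        with self._lock:
            old, self._index = self._index, new_index
        return old  # caller may archive or free the old index
```

Search traffic keeps hitting `current()` while the new index is built and backfilled; only once it has caught up do you call `swap()`.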


#RealTimeSearch #IncrementalIndexing #Streaming #QueueProcessing #VectorSearch #AgenticAI #LearnAI #AIEngineering
