Real-Time Semantic Search: Streaming Updates and Incremental Indexing
Build a semantic search system that handles live document updates with queue-based ingestion, incremental vector indexing, and near-real-time search freshness without rebuilding the entire index.
The Freshness Challenge
Most semantic search tutorials assume a static corpus: embed all documents once, build an index, and search. But real-world applications deal with continuously changing data — new articles published every minute, products added and removed, user-generated content streaming in. Rebuilding the entire index on every change is not feasible at scale. You need incremental indexing that processes updates in near-real-time while keeping search results fresh.
Architecture for Real-Time Semantic Search
The architecture has three layers:
- Change capture — detect document creates, updates, and deletes.
- Embedding queue — buffer changes and process embeddings asynchronously.
- Live index — update the vector index incrementally without downtime.
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class ChangeType(Enum):
    """Kinds of document mutations the ingestion pipeline can process."""

    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"


@dataclass
class DocumentChange:
    """A single change event for one document.

    For DELETE events only ``doc_id`` is required; ``title`` and
    ``body`` stay ``None``.
    """

    doc_id: str
    change_type: ChangeType
    title: Optional[str] = None
    body: Optional[str] = None
    # Event time (seconds since the epoch), stamped at construction.
    # A default_factory replaces the old 0.0-sentinel-plus-__post_init__
    # pattern, which conflated "unset" with a legitimate epoch-zero
    # timestamp.
    timestamp: float = field(default_factory=time.time)
Queue-Based Ingestion Pipeline
We use an async queue to decouple document changes from the embedding computation. This ensures the write path (document updates) is never blocked by the slow embedding step.
import asyncio
from collections import deque
from sentence_transformers import SentenceTransformer
import numpy as np
import logging
logger = logging.getLogger(__name__)
class EmbeddingQueue:
    """Buffers document changes and applies them to a LiveVectorIndex in batches.

    Changes are pulled from an asyncio queue and flushed either when
    ``batch_size`` items have accumulated or ``flush_interval`` seconds
    have elapsed, whichever comes first — so a slow trickle of updates
    still reaches the index promptly.
    """

    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        batch_size: int = 32,
        flush_interval: float = 1.0,
    ):
        self.model = SentenceTransformer(model_name)
        self.queue: asyncio.Queue = asyncio.Queue()
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._running = False

    async def enqueue(self, change: DocumentChange):
        """Add a document change to the processing queue."""
        await self.queue.put(change)

    async def start_worker(self, index: "LiveVectorIndex"):
        """Process queued changes in batches until stop() is called."""
        self._running = True
        while self._running:
            batch = []
            try:
                # Collect items until the batch is full or the flush
                # deadline passes.
                deadline = time.time() + self.flush_interval
                while len(batch) < self.batch_size:
                    timeout = max(0, deadline - time.time())
                    try:
                        change = await asyncio.wait_for(
                            self.queue.get(), timeout=timeout
                        )
                        batch.append(change)
                    except asyncio.TimeoutError:
                        break
                if batch:
                    await self._process_batch(batch, index)
            except Exception:
                # Keep the worker alive on unexpected errors; back off
                # briefly so a persistent failure cannot spin the loop.
                logger.exception("Worker error")
                await asyncio.sleep(0.5)

    async def _process_batch(
        self, batch: list, index: "LiveVectorIndex"
    ):
        """Embed and apply a batch of changes (upserts first, then deletes)."""
        creates_updates = [
            c for c in batch
            if c.change_type in (ChangeType.CREATE, ChangeType.UPDATE)
        ]
        deletes = [
            c for c in batch if c.change_type == ChangeType.DELETE
        ]
        if creates_updates:
            texts = [
                f"{c.title or ''}. {c.body or ''}" for c in creates_updates
            ]
            # encode() is synchronous and CPU/GPU-bound; run it in the
            # default executor so the event loop — and therefore
            # enqueue() callers — stays responsive during embedding.
            loop = asyncio.get_running_loop()
            embeddings = await loop.run_in_executor(
                None,
                lambda: self.model.encode(texts, normalize_embeddings=True),
            )
            for change, embedding in zip(creates_updates, embeddings):
                index.upsert(change.doc_id, embedding, {
                    "title": change.title,
                    "body": change.body,
                })
        for change in deletes:
            index.delete(change.doc_id)
        logger.info(
            "Processed batch: %d upserts, %d deletes",
            len(creates_updates),
            len(deletes),
        )

    def stop(self):
        """Signal the worker loop to exit after its current iteration."""
        self._running = False
The Live Vector Index
FAISS flat indexes identify vectors only by their insertion position, not by an application-level document ID, so in-place updates and targeted deletions are not directly supported. We solve this with an ID-mapped wrapper around FAISS that tracks document-to-position mappings and marks removed vectors as tombstones.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
import faiss
import threading
class LiveVectorIndex:
    """A FAISS flat index with upsert/delete by application document ID.

    FAISS flat indexes identify vectors only by insertion position, so
    this wrapper keeps bidirectional doc_id <-> position maps and
    implements update/delete as tombstones (the ``deleted`` position
    set), which are filtered out at search time and reclaimed by
    :meth:`compact`. All public methods are guarded by a lock so the
    index can be shared between the worker and search threads.
    """

    def __init__(self, dimension: int = 384):
        self.dimension = dimension
        # Inner-product index; embeddings are normalized upstream, so
        # inner product is equivalent to cosine similarity.
        self.index = faiss.IndexFlatIP(dimension)
        self.id_to_position: dict = {}  # doc_id -> index position
        self.position_to_id: dict = {}  # index position -> doc_id
        self.metadata: dict = {}        # doc_id -> metadata
        self.deleted: set = set()       # positions marked as deleted
        self._lock = threading.Lock()
        self._total_added = 0

    def upsert(self, doc_id: str, embedding: np.ndarray, meta: dict):
        """Insert or update a document in the index.

        Updates are append-only: the old vector's position is
        tombstoned and the new vector is appended at the end.
        """
        with self._lock:
            if doc_id in self.id_to_position:
                old_pos = self.id_to_position[doc_id]
                self.deleted.add(old_pos)
                # Remove the stale reverse mapping so the tombstoned
                # position no longer resolves to this doc (the original
                # left it behind, leaking one entry per update).
                self.position_to_id.pop(old_pos, None)
            position = self._total_added
            self.index.add(embedding.reshape(1, -1))
            self.id_to_position[doc_id] = position
            self.position_to_id[position] = doc_id
            self.metadata[doc_id] = meta
            self._total_added += 1

    def delete(self, doc_id: str):
        """Mark a document as deleted (no-op for unknown IDs)."""
        with self._lock:
            if doc_id in self.id_to_position:
                position = self.id_to_position[doc_id]
                self.deleted.add(position)
                del self.id_to_position[doc_id]
                del self.position_to_id[position]
                self.metadata.pop(doc_id, None)

    def search(
        self, query_embedding: np.ndarray, top_k: int = 10
    ) -> list:
        """Return up to ``top_k`` live documents ranked by inner product."""
        with self._lock:
            # Guard the empty index: faiss cannot search with k == 0.
            if self.index.ntotal == 0:
                return []
            # Over-fetch by the tombstone count so that filtering out
            # deleted positions can still yield top_k live results.
            fetch_k = min(top_k + len(self.deleted), self.index.ntotal)
            scores, positions = self.index.search(
                query_embedding.reshape(1, -1), fetch_k
            )
            results = []
            for score, pos in zip(scores[0], positions[0]):
                # -1 means faiss had fewer than fetch_k vectors to return.
                if pos == -1 or pos in self.deleted:
                    continue
                doc_id = self.position_to_id.get(pos)
                if doc_id is None:
                    continue
                results.append({
                    "doc_id": doc_id,
                    "score": float(score),
                    **self.metadata.get(doc_id, {}),
                })
                if len(results) >= top_k:
                    break
            return results

    def compact(self):
        """Rebuild the index without tombstoned entries to reclaim space."""
        with self._lock:
            active_ids = list(self.id_to_position.keys())
            # Copy each live vector out before replacing the backing index.
            vectors = [
                self.index.reconstruct(self.id_to_position[did])
                for did in active_ids
            ]
            self.index = faiss.IndexFlatIP(self.dimension)
            if vectors:
                # Guard the all-deleted case: np.array([]) is 0-d and
                # faiss rejects it.
                self.index.add(np.array(vectors))
            self.id_to_position = {}
            self.position_to_id = {}
            self.deleted = set()
            for i, doc_id in enumerate(active_ids):
                self.id_to_position[doc_id] = i
                self.position_to_id[i] = doc_id
            self._total_added = len(active_ids)
            logger.info("Compacted index to %d vectors", len(active_ids))
Putting It Together
async def main():
    """End-to-end demo: ingest two documents, then search the live index."""
    index = LiveVectorIndex(dimension=384)
    queue = EmbeddingQueue(batch_size=16, flush_interval=0.5)
    worker_task = asyncio.create_task(queue.start_worker(index))
    # Simulate incoming document changes
    await queue.enqueue(DocumentChange(
        doc_id="article-1",
        change_type=ChangeType.CREATE,
        title="Introduction to Vector Databases",
        body="Vector databases store high-dimensional embeddings...",
    ))
    await queue.enqueue(DocumentChange(
        doc_id="article-2",
        change_type=ChangeType.CREATE,
        title="FAISS Performance Tuning",
        body="Optimize FAISS indexes for production workloads...",
    ))
    await asyncio.sleep(2)  # give the worker time to flush the batch
    # Search the live index, reusing the queue's already-loaded model
    # instead of paying for a second SentenceTransformer load.
    query_emb = queue.model.encode(
        ["how to optimize vector search"], normalize_embeddings=True
    )
    results = index.search(query_emb)
    for r in results:
        print(f"{r['score']:.3f} — {r.get('title', 'N/A')}")
    # Signal the worker, then wait for it to finish its current
    # iteration instead of leaving a dangling task at loop shutdown.
    queue.stop()
    await worker_task
When to Compact
The deleted set grows over time as documents are updated or removed. Schedule compaction during low-traffic periods or when the deleted ratio exceeds a threshold.
def should_compact(index: LiveVectorIndex, threshold: float = 0.3) -> bool:
    """Return True once tombstoned entries exceed `threshold` of all vectors."""
    vector_count = index.index.ntotal
    if not vector_count:
        # An empty index has nothing to reclaim.
        return False
    return len(index.deleted) / vector_count > threshold
FAQ
What is the typical delay between a document being added and it appearing in search results?
With the queue-based architecture shown here, the delay is roughly the flush interval (default 1 second) plus embedding computation time (50-200ms per document depending on hardware). For most applications, a 1-2 second delay between write and searchability is acceptable. If you need sub-second freshness, reduce the flush interval and use GPU-accelerated embedding.
How do I handle consistency between the source database and the search index?
Use a write-ahead pattern: write the document to your primary database first, then enqueue the change for indexing. If the embedding worker crashes, replay unprocessed changes from the database changelog. For stronger guarantees, use a change data capture (CDC) tool like Debezium that streams database changes to your embedding queue.
When should I rebuild the entire index vs using incremental updates?
Rebuild when changing embedding models (old vectors are incompatible with new ones), after a major data migration, or when the deleted ratio exceeds 50% and compaction alone is insufficient. For day-to-day operations, incremental updates with periodic compaction are more efficient and avoid downtime.
#RealTimeSearch #IncrementalIndexing #Streaming #QueueProcessing #VectorSearch #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.