
Production RAG Architecture: Caching, Monitoring, and Scaling Retrieval Pipelines

Learn how to take a RAG pipeline from prototype to production with response caching, embedding caching, async retrieval, horizontal scaling, monitoring, and operational best practices.

The Gap Between Demo and Production RAG

A demo RAG pipeline answers questions from a Jupyter notebook. A production RAG system handles hundreds of concurrent users, stays fast under load, provides observability into failures, and costs a predictable amount per month. Bridging this gap requires architectural decisions around caching, async processing, scaling, and monitoring.

This post covers the patterns that separate production-grade RAG from prototype-grade RAG.

Layer 1: Response Caching

The most impactful optimization is caching complete responses. If 100 users ask "What is the refund policy?" on the same day, you should compute the answer once and serve it from cache the other 99 times.

import hashlib
import json
import redis
from datetime import timedelta

class RAGCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = timedelta(hours=4)

    def _cache_key(self, query: str, filters: dict | None = None) -> str:
        """Deterministic cache key from query + filters."""
        payload = {"query": query.strip().lower(), "filters": filters or {}}
        content = json.dumps(payload, sort_keys=True)
        return f"rag:response:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    def get(self, query: str, filters: dict | None = None) -> dict | None:
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(
        self, query: str, response: dict, filters: dict | None = None, ttl: timedelta | None = None
    ):
        key = self._cache_key(query, filters)
        self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(response),
        )

    def invalidate_all(self):
        """Invalidate all RAG cache entries (after re-indexing)."""
        # Use SCAN rather than KEYS so large caches don't block Redis
        keys = list(self.redis.scan_iter("rag:response:*"))
        if keys:
            self.redis.delete(*keys)

Integrate the cache into your RAG pipeline:

cache = RAGCache()

def ask(query: str, filters: dict | None = None) -> dict:
    # Check cache first
    cached = cache.get(query, filters)
    if cached:
        cached["from_cache"] = True
        return cached

    # Run the full pipeline
    docs = retriever.invoke(query)
    context = format_docs(docs)
    answer = llm.invoke(build_prompt(context, query)).content

    result = {
        "answer": answer,
        "sources": [d.metadata.get("source") for d in docs],
        "from_cache": False,
    }

    # Cache the result
    cache.set(query, result, filters)
    return result

Cache invalidation strategy: Invalidate all cached responses whenever you re-index documents. Set TTLs based on how frequently your knowledge base changes — 4 hours for frequently updated content, 24 hours for stable documentation.

Layer 2: Embedding Caching

Embedding API calls are often the second-largest cost center. Cache embeddings for repeated queries and for documents during re-indexing:

class EmbeddingCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/1"):
        self.redis = redis.from_url(redis_url)
        self.ttl = timedelta(days=30)

    def _key(self, text: str, model: str) -> str:
        content = f"{model}:{text.strip()}"
        return f"rag:embed:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    def get_embedding(self, text: str, model: str) -> list[float] | None:
        cached = self.redis.get(self._key(text, model))
        if cached:
            return json.loads(cached)
        return None

    def set_embedding(self, text: str, model: str, embedding: list[float]):
        self.redis.setex(
            self._key(text, model),
            self.ttl,
            json.dumps(embedding),
        )

class CachedEmbeddings:
    """Wrapper around any embedding model that adds caching."""

    def __init__(self, base_embeddings, cache: EmbeddingCache, model_name: str):
        self.base = base_embeddings
        self.cache = cache
        self.model_name = model_name

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        results = []
        uncached_texts = []
        uncached_indices = []

        for i, text in enumerate(texts):
            cached = self.cache.get_embedding(text, self.model_name)
            if cached:
                results.append(cached)
            else:
                results.append(None)
                uncached_texts.append(text)
                uncached_indices.append(i)

        # Embed only uncached texts
        if uncached_texts:
            new_embeddings = self.base.embed_documents(uncached_texts)
            for idx, emb in zip(uncached_indices, new_embeddings):
                results[idx] = emb
                self.cache.set_embedding(texts[idx], self.model_name, emb)

        return results

This pattern reduces embedding API costs by 60-80% in practice because queries repeat far more than you might expect.
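The wrapper above only covers `embed_documents`; query-time embeddings benefit just as much, since user queries are exactly what repeats. A query-side sketch, with an in-memory dict standing in for Redis so it runs standalone:

```python
class CachedQueryEmbedder:
    """Query-side analogue of CachedEmbeddings; a dict stands in for Redis here."""

    def __init__(self, base_embed_query, model_name: str):
        self.base_embed_query = base_embed_query  # e.g. embeddings.embed_query
        self.model_name = model_name
        self._cache: dict[str, list[float]] = {}

    def embed_query(self, text: str) -> list[float]:
        key = f"{self.model_name}:{text.strip()}"
        if key in self._cache:
            return self._cache[key]
        embedding = self.base_embed_query(text)
        self._cache[key] = embedding
        return embedding
```

In production you would back this with the same `EmbeddingCache` shown above rather than a dict.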

Layer 3: Async Retrieval with FastAPI

Wrap your RAG pipeline in an async API server for production serving:


from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
from contextlib import asynccontextmanager

# Global resources
rag_pipeline = None
cache = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global rag_pipeline, cache
    # Initialize on startup
    rag_pipeline = build_rag_pipeline()
    cache = RAGCache()
    yield
    # Cleanup on shutdown

app = FastAPI(lifespan=lifespan)

class QueryRequest(BaseModel):
    question: str
    filters: dict | None = None
    use_cache: bool = True

class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    from_cache: bool
    latency_ms: float

@app.post("/api/rag/query", response_model=QueryResponse)
async def query_rag(request: QueryRequest):
    import time
    start = time.monotonic()

    # Check cache
    if request.use_cache:
        cached = cache.get(request.question, request.filters)
        if cached:
            elapsed = (time.monotonic() - start) * 1000
            return QueryResponse(
                answer=cached["answer"],
                sources=cached["sources"],
                from_cache=True,
                latency_ms=round(elapsed, 1),
            )

    # Run retrieval in thread pool (blocking IO)
    result = await asyncio.to_thread(
        rag_pipeline.ask,
        request.question,
        request.filters,
    )

    elapsed = (time.monotonic() - start) * 1000

    # Cache the result
    cache.set(request.question, result, request.filters)

    return QueryResponse(
        answer=result["answer"],
        sources=result["sources"],
        from_cache=False,
        latency_ms=round(elapsed, 1),
    )

@app.post("/api/rag/invalidate-cache")
async def invalidate_cache():
    cache.invalidate_all()
    return {"status": "cache invalidated"}

Layer 4: Monitoring and Observability

You cannot improve what you do not measure. Instrument your RAG pipeline with metrics that reveal both performance and quality:

import time
import logging
from dataclasses import dataclass, field
from collections import defaultdict

logger = logging.getLogger("rag")

@dataclass
class RAGMetrics:
    query_count: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    retrieval_latencies: list = field(default_factory=list)
    generation_latencies: list = field(default_factory=list)
    empty_retrievals: int = 0
    error_count: int = 0

    @property
    def cache_hit_rate(self) -> float:
        total = self.cache_hits + self.cache_misses
        return self.cache_hits / total if total > 0 else 0

    @property
    def avg_retrieval_latency_ms(self) -> float:
        if not self.retrieval_latencies:
            return 0
        return sum(self.retrieval_latencies) / len(self.retrieval_latencies) * 1000

    @property
    def p95_retrieval_latency_ms(self) -> float:
        if not self.retrieval_latencies:
            return 0
        sorted_lats = sorted(self.retrieval_latencies)
        idx = int(len(sorted_lats) * 0.95)
        return sorted_lats[idx] * 1000

    def report(self) -> dict:
        return {
            "total_queries": self.query_count,
            "cache_hit_rate": f"{self.cache_hit_rate:.1%}",
            "avg_retrieval_ms": f"{self.avg_retrieval_latency_ms:.0f}",
            "p95_retrieval_ms": f"{self.p95_retrieval_latency_ms:.0f}",
            "empty_retrieval_rate": f"{self.empty_retrievals / max(self.query_count, 1):.1%}",
            "error_rate": f"{self.error_count / max(self.query_count, 1):.1%}",
        }

metrics = RAGMetrics()

Key metrics to track:

| Metric | Warning threshold | What it tells you |
| --- | --- | --- |
| Cache hit rate | Below 30% | Users are asking diverse questions, or the cache TTL is too short |
| Empty retrieval rate | Above 10% | Knowledge base gaps or poor embedding quality |
| P95 retrieval latency | Above 500 ms | Vector index needs optimization or scaling |
| Error rate | Above 1% | Infrastructure issues or API rate limits |
Layer 5: Horizontal Scaling

When a single instance cannot handle the load, scale the RAG pipeline horizontally:

# kubernetes deployment for RAG API
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-api
  template:
    metadata:
      labels:
        app: rag-api
    spec:
      containers:
      - name: rag-api
        image: your-registry/rag-api:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        env:
        - name: REDIS_URL
          value: "redis://redis-svc:6379/0"
        - name: VECTOR_DB_URL
          value: "postgresql://user:pass@pgvector-svc:5432/ragdb"

The key insight is that the RAG API layer is stateless — all state lives in Redis (cache) and the vector database. This means you can scale API replicas freely. The bottleneck shifts to the vector database, which you scale by adding read replicas (pgvector) or upgrading to a higher tier (Pinecone, Weaviate Cloud).

Ingestion Pipeline Separation

Separate the indexing pipeline from the query pipeline. They have different scaling, latency, and reliability requirements:

# ingestion_worker.py — runs as a background job or cron
async def ingest_documents(source_dir: str):
    """Offline ingestion: load, chunk, embed, store."""
    documents = load_documents(source_dir)
    chunks = chunk_documents(documents)

    # Batch embed for efficiency
    batch_size = 100
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        vectorstore.add_documents(batch)
        logger.info(f"Indexed batch {i//batch_size + 1}: {len(batch)} chunks")

    # Invalidate response cache after re-indexing
    cache.invalidate_all()
    logger.info(f"Ingestion complete: {len(chunks)} total chunks indexed")

Run ingestion on a schedule (hourly, nightly) or trigger it when source documents change via webhooks.
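One way to implement the change trigger is a content-hash manifest: hash every source file, compare against the previous run, and only re-ingest when something differs. A sketch (the manifest path and format are assumptions):

```python
import hashlib
import json
from pathlib import Path

def changed_files(source_dir: str, manifest_path: str = "ingest_manifest.json") -> list[str]:
    """Return source files whose content hash differs from the last run,
    updating the manifest. An empty list means ingestion can be skipped."""
    manifest_file = Path(manifest_path)
    previous = json.loads(manifest_file.read_text()) if manifest_file.exists() else {}
    current, changed = {}, []
    for path in sorted(Path(source_dir).rglob("*")):
        if not path.is_file():
            continue
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        current[str(path)] = digest
        if previous.get(str(path)) != digest:
            changed.append(str(path))
    manifest_file.write_text(json.dumps(current))
    return changed
```

A webhook handler or cron job can call this first and skip the expensive embed-and-store step when nothing changed.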

FAQ

How much does Redis caching reduce RAG costs?

In production systems with moderate query diversity, response caching typically reduces LLM generation costs by 40-70%. Embedding caching reduces embedding API costs by 60-80%. The combined effect is usually a 50-60% reduction in total API costs, with Redis adding minimal infrastructure cost (a small instance handles millions of cached entries).
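As a rough back-of-envelope model (all hit rates and per-query prices below are illustrative assumptions, not benchmarks), the combined effect of the two caches can be estimated like this:

```python
def estimated_monthly_cost(
    queries: int,
    llm_cost_per_query: float,
    embed_cost_per_query: float,
    response_hit_rate: float,
    embed_hit_rate: float,
) -> float:
    """Response-cache hits skip both API calls; embedding-cache hits skip the embed call."""
    misses = queries * (1 - response_hit_rate)
    llm_cost = misses * llm_cost_per_query
    embed_cost = misses * (1 - embed_hit_rate) * embed_cost_per_query
    return llm_cost + embed_cost
```

With 100k queries, $0.01 per generation, $0.0001 per query embedding, a 50% response hit rate, and a 70% embedding hit rate, this drops the bill from about $1,010 to about $501, roughly the 50% reduction described above.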

Should I cache at the response level or the retrieval level?

Cache at both levels. Response-level caching (query -> full answer) provides the biggest latency and cost savings for identical queries. Retrieval-level caching (query -> retrieved chunks) helps when the same chunks are relevant to different questions, and is useful if you update your generation prompt frequently but keep the same knowledge base.

How do I handle knowledge base updates without downtime?

Use a blue-green indexing strategy. Build the new index alongside the old one. Once the new index is fully populated and validated, atomically switch the query pipeline to use the new index. In pgvector, this means creating a new table, indexing into it, then swapping table names in a transaction. In Chroma, create a new collection and update the collection reference. Invalidate the response cache after the swap.
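The atomic switch can be as small as a thread-safe indirection that the query path reads and the ingestion job swaps. A sketch (the index names are hypothetical):

```python
import threading

class IndexRouter:
    """Holds the name of the live index; queries read it, ingestion swaps it."""

    def __init__(self, initial: str):
        self._lock = threading.Lock()
        self._active = initial

    @property
    def active(self) -> str:
        with self._lock:
            return self._active

    def swap(self, new_index: str) -> str:
        """Atomically promote new_index; returns the old name for cleanup."""
        with self._lock:
            old, self._active = self._active, new_index
            return old
```

The query pipeline resolves `router.active` on every request, so the cutover is instant and the old index can be dropped once in-flight queries drain.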


#RAG #ProductionArchitecture #Caching #Monitoring #Scaling #MLOps #AgenticAI #LearnAI #AIEngineering


CallSphere Team
