Production RAG Architecture: Caching, Monitoring, and Scaling Retrieval Pipelines
Learn how to take a RAG pipeline from prototype to production with response caching, embedding caching, async retrieval, horizontal scaling, monitoring, and operational best practices.
The Gap Between Demo and Production RAG
A demo RAG pipeline answers questions from a Jupyter notebook. A production RAG system handles hundreds of concurrent users, stays fast under load, provides observability into failures, and costs a predictable amount per month. Bridging this gap requires architectural decisions around caching, async processing, scaling, and monitoring.
This post covers the patterns that separate production-grade RAG from prototype-grade RAG.
Layer 1: Response Caching
The most impactful optimization is caching complete responses. If 100 users ask "What is the refund policy?" in the same day, you should compute the answer once and serve it from cache 99 times.
```python
import hashlib
import json
from datetime import timedelta

import redis


class RAGCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = timedelta(hours=4)

    def _cache_key(self, query: str, filters: dict | None = None) -> str:
        """Deterministic cache key from query + filters."""
        payload = {"query": query.strip().lower(), "filters": filters or {}}
        content = json.dumps(payload, sort_keys=True)
        return f"rag:response:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    def get(self, query: str, filters: dict | None = None) -> dict | None:
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(
        self,
        query: str,
        response: dict,
        filters: dict | None = None,
        ttl: timedelta | None = None,
    ):
        key = self._cache_key(query, filters)
        self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(response),
        )

    def invalidate_all(self):
        """Invalidate all RAG cache entries (after re-indexing)."""
        # SCAN avoids the blocking KEYS command on large keyspaces
        keys = list(self.redis.scan_iter("rag:response:*"))
        if keys:
            self.redis.delete(*keys)
```
Integrate the cache into your RAG pipeline:
```python
cache = RAGCache()

def ask(query: str, filters: dict | None = None) -> dict:
    # Check cache first
    cached = cache.get(query, filters)
    if cached:
        cached["from_cache"] = True
        return cached

    # Run the full pipeline
    docs = retriever.invoke(query)
    context = format_docs(docs)
    answer = llm.invoke(build_prompt(context, query)).content

    result = {
        "answer": answer,
        "sources": [d.metadata.get("source") for d in docs],
        "from_cache": False,
    }

    # Cache the result
    cache.set(query, result, filters)
    return result
```
Cache invalidation strategy: Invalidate all cached responses whenever you re-index documents. Set TTLs based on how frequently your knowledge base changes — 4 hours for frequently updated content, 24 hours for stable documentation.
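That TTL policy can be made explicit in code. A minimal sketch, keyed off a content source label; the source names and durations below are illustrative assumptions, not part of the pipeline above:

```python
from datetime import timedelta

# Illustrative TTL policy: the categories and durations are assumptions,
# tune them to how often each part of your knowledge base actually changes.
TTL_BY_SOURCE = {
    "changelog": timedelta(hours=4),   # frequently updated content
    "docs": timedelta(hours=24),       # stable documentation
}

def ttl_for(source: str) -> timedelta:
    """Pick a cache TTL for a response based on its dominant source."""
    return TTL_BY_SOURCE.get(source, timedelta(hours=4))
```

The result of `ttl_for(source)` can then be passed as the `ttl` argument to `RAGCache.set`.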
Layer 2: Embedding Caching
Embedding API calls are the second most common cost center. Cache embeddings for repeated queries and for documents during re-indexing:
```python
class EmbeddingCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/1"):
        self.redis = redis.from_url(redis_url)
        self.ttl = timedelta(days=30)

    def _key(self, text: str, model: str) -> str:
        content = f"{model}:{text.strip()}"
        return f"rag:embed:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    def get_embedding(self, text: str, model: str) -> list[float] | None:
        cached = self.redis.get(self._key(text, model))
        if cached:
            return json.loads(cached)
        return None

    def set_embedding(self, text: str, model: str, embedding: list[float]):
        self.redis.setex(
            self._key(text, model),
            self.ttl,
            json.dumps(embedding),
        )


class CachedEmbeddings:
    """Wrapper around any embedding model that adds caching."""

    def __init__(self, base_embeddings, cache: EmbeddingCache, model_name: str):
        self.base = base_embeddings
        self.cache = cache
        self.model_name = model_name

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        results = []
        uncached_texts = []
        uncached_indices = []

        for i, text in enumerate(texts):
            cached = self.cache.get_embedding(text, self.model_name)
            if cached:
                results.append(cached)
            else:
                results.append(None)
                uncached_texts.append(text)
                uncached_indices.append(i)

        # Embed only uncached texts
        if uncached_texts:
            new_embeddings = self.base.embed_documents(uncached_texts)
            for idx, emb in zip(uncached_indices, new_embeddings):
                results[idx] = emb
                self.cache.set_embedding(texts[idx], self.model_name, emb)

        return results

    def embed_query(self, text: str) -> list[float]:
        # Retrievers embed user questions via embed_query, so cache those too
        return self.embed_documents([text])[0]
```
This pattern reduces embedding API costs by 60-80% in practice because queries repeat far more than you might expect.
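The effect is easy to demonstrate with an in-memory stand-in for the Redis-backed version above. Everything below is illustrative, not part of the production pipeline: a fake embedding model that counts "API" calls, and a dict-backed cache wrapper following the same pattern:

```python
class CountingEmbeddings:
    """Stand-in embedding model that counts how often the 'API' is called."""
    def __init__(self):
        self.calls = 0

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        self.calls += 1
        return [[float(len(t))] for t in texts]  # fake one-dimensional vectors

class DictCachedEmbeddings:
    """Same caching pattern as CachedEmbeddings, with a plain dict instead of Redis."""
    def __init__(self, base):
        self.base = base
        self.cache: dict[str, list[float]] = {}

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        missing = [t for t in texts if t not in self.cache]
        if missing:
            for t, emb in zip(missing, self.base.embed_documents(missing)):
                self.cache[t] = emb
        return [self.cache[t] for t in texts]

base = CountingEmbeddings()
embedder = DictCachedEmbeddings(base)
embedder.embed_documents(["refund policy", "shipping times"])
embedder.embed_documents(["refund policy", "shipping times"])  # served from cache
print(base.calls)  # → 1: the base model was only called once
```

The same two-call sequence against an uncached model would hit the API twice; with real traffic, every repeated query is an avoided embedding call.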
Layer 3: Async Retrieval with FastAPI
Wrap your RAG pipeline in an async API server for production serving:
```python
import asyncio
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Global resources
rag_pipeline = None
cache = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global rag_pipeline, cache
    # Initialize on startup
    rag_pipeline = build_rag_pipeline()
    cache = RAGCache()
    yield
    # Cleanup on shutdown


app = FastAPI(lifespan=lifespan)


class QueryRequest(BaseModel):
    question: str
    filters: dict | None = None
    use_cache: bool = True


class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    from_cache: bool
    latency_ms: float


@app.post("/api/rag/query", response_model=QueryResponse)
async def query_rag(request: QueryRequest):
    start = time.monotonic()

    # Check cache
    if request.use_cache:
        cached = cache.get(request.question, request.filters)
        if cached:
            elapsed = (time.monotonic() - start) * 1000
            return QueryResponse(
                answer=cached["answer"],
                sources=cached["sources"],
                from_cache=True,
                latency_ms=round(elapsed, 1),
            )

    # Run retrieval in a thread pool (the pipeline does blocking IO)
    result = await asyncio.to_thread(
        rag_pipeline.ask,
        request.question,
        request.filters,
    )
    elapsed = (time.monotonic() - start) * 1000

    # Cache the result
    cache.set(request.question, result, request.filters)

    return QueryResponse(
        answer=result["answer"],
        sources=result["sources"],
        from_cache=False,
        latency_ms=round(elapsed, 1),
    )


@app.post("/api/rag/invalidate-cache")
async def invalidate_cache():
    cache.invalidate_all()
    return {"status": "cache invalidated"}
```
Layer 4: Monitoring and Observability
You cannot improve what you do not measure. Instrument your RAG pipeline with metrics that reveal both performance and quality:
```python
import time
import logging
from dataclasses import dataclass, field
from collections import defaultdict

logger = logging.getLogger("rag")


@dataclass
class RAGMetrics:
    query_count: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    retrieval_latencies: list = field(default_factory=list)
    generation_latencies: list = field(default_factory=list)
    empty_retrievals: int = 0
    error_count: int = 0

    @property
    def cache_hit_rate(self) -> float:
        total = self.cache_hits + self.cache_misses
        return self.cache_hits / total if total > 0 else 0

    @property
    def avg_retrieval_latency_ms(self) -> float:
        if not self.retrieval_latencies:
            return 0
        return sum(self.retrieval_latencies) / len(self.retrieval_latencies) * 1000

    @property
    def p95_retrieval_latency_ms(self) -> float:
        if not self.retrieval_latencies:
            return 0
        sorted_lats = sorted(self.retrieval_latencies)
        idx = int(len(sorted_lats) * 0.95)
        return sorted_lats[idx] * 1000

    def report(self) -> dict:
        return {
            "total_queries": self.query_count,
            "cache_hit_rate": f"{self.cache_hit_rate:.1%}",
            "avg_retrieval_ms": f"{self.avg_retrieval_latency_ms:.0f}",
            "p95_retrieval_ms": f"{self.p95_retrieval_latency_ms:.0f}",
            "empty_retrieval_rate": f"{self.empty_retrievals / max(self.query_count, 1):.1%}",
            "error_rate": f"{self.error_count / max(self.query_count, 1):.1%}",
        }


metrics = RAGMetrics()
```
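One way to feed the latency lists in `RAGMetrics` is a small timing helper wrapped around each pipeline stage. A minimal sketch; the usage comment assumes the `retriever` and `metrics` names used elsewhere in this post:

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(latencies: list):
    """Append the elapsed wall-clock seconds of the wrapped block to `latencies`."""
    start = time.monotonic()
    try:
        yield
    finally:
        latencies.append(time.monotonic() - start)

# Usage inside the query path:
# with timed(metrics.retrieval_latencies):
#     docs = retriever.invoke(query)
# with timed(metrics.generation_latencies):
#     answer = llm.invoke(build_prompt(context, query)).content
```

Because the `finally` block always runs, failed stages are timed too, which keeps the percentiles honest under errors.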
Key metrics to track:
| Metric | Warning Threshold | What It Tells You |
|---|---|---|
| Cache hit rate | Below 30% | Users are asking diverse questions or cache TTL is too short |
| Empty retrieval rate | Above 10% | Knowledge base gaps or poor embedding quality |
| P95 retrieval latency | Above 500ms | Vector index needs optimization or scaling |
| Error rate | Above 1% | Infrastructure issues or API rate limits |
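The thresholds in the table translate directly into an alerting check. A minimal sketch over raw counter values (hook it to your alerting however you like; the warning wording is illustrative):

```python
def threshold_warnings(query_count: int, cache_hits: int, cache_misses: int,
                       empty_retrievals: int, errors: int,
                       p95_retrieval_ms: float) -> list[str]:
    """Return a warning string for each threshold in the table that is breached."""
    warnings = []
    total = cache_hits + cache_misses
    if total and cache_hits / total < 0.30:
        warnings.append("cache hit rate below 30%")
    queries = max(query_count, 1)
    if empty_retrievals / queries > 0.10:
        warnings.append("empty retrieval rate above 10%")
    if p95_retrieval_ms > 500:
        warnings.append("p95 retrieval latency above 500ms")
    if errors / queries > 0.01:
        warnings.append("error rate above 1%")
    return warnings
```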
Layer 5: Horizontal Scaling
When a single instance cannot handle the load, scale the RAG pipeline horizontally:
```yaml
# kubernetes deployment for RAG API
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-api
  template:
    metadata:
      labels:
        app: rag-api
    spec:
      containers:
        - name: rag-api
          image: your-registry/rag-api:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          env:
            - name: REDIS_URL
              value: "redis://redis-svc:6379/0"
            - name: VECTOR_DB_URL
              value: "postgresql://user:pass@pgvector-svc:5432/ragdb"
```
The key insight is that the RAG API layer is stateless — all state lives in Redis (cache) and the vector database. This means you can scale API replicas freely. The bottleneck shifts to the vector database, which you scale by adding read replicas (pgvector) or upgrading to a higher tier (Pinecone, Weaviate Cloud).
Ingestion Pipeline Separation
Separate the indexing pipeline from the query pipeline. They have different scaling, latency, and reliability requirements:
```python
# ingestion_worker.py — runs as a background job or cron
def ingest_documents(source_dir: str):
    """Offline ingestion: load, chunk, embed, store."""
    documents = load_documents(source_dir)
    chunks = chunk_documents(documents)

    # Batch embed for efficiency
    batch_size = 100
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        vectorstore.add_documents(batch)
        logger.info(f"Indexed batch {i // batch_size + 1}: {len(batch)} chunks")

    # Invalidate response cache after re-indexing
    cache.invalidate_all()
    logger.info(f"Ingestion complete: {len(chunks)} total chunks indexed")
```
Run ingestion on a schedule (hourly, nightly) or trigger it when source documents change via webhooks.
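For the webhook case, it helps to coalesce bursts of change notifications so ingestion runs at most once per scheduler tick rather than once per changed file. A minimal sketch of that debouncing; the class and method names are illustrative:

```python
class IngestionTrigger:
    """Coalesce change notifications into at most one ingestion per tick."""

    def __init__(self, ingest_fn):
        self.ingest_fn = ingest_fn
        self._dirty = False

    def notify(self):
        # Called by the webhook handler on every source-document change
        self._dirty = True

    def run_pending(self) -> bool:
        # Called on a schedule (e.g. every few minutes); runs ingestion once
        # no matter how many notifications arrived since the last tick
        if self._dirty:
            self._dirty = False
            self.ingest_fn()
            return True
        return False
```

A production version would add locking (or a Redis flag) so multiple workers do not trigger concurrent re-indexing runs.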
FAQ
How much does Redis caching reduce RAG costs?
In production systems with moderate query diversity, response caching typically reduces LLM generation costs by 40-70%. Embedding caching reduces embedding API costs by 60-80%. The combined effect is usually a 50-60% reduction in total API costs, with Redis adding minimal infrastructure cost (a small instance handles millions of cached entries).
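A back-of-envelope model makes that arithmetic concrete. All inputs below are illustrative assumptions (query volume and per-query prices are made up, not measured), but the hit rates match the ranges above:

```python
def monthly_api_cost(queries: int, response_hit_rate: float,
                     gen_cost_per_query: float,
                     embed_hit_rate: float, embed_cost_per_query: float) -> float:
    """Estimated monthly API spend with both cache layers active.

    Generation runs only on response-cache misses; query embedding runs on
    response-cache misses that also miss the embedding cache.
    """
    generation = queries * (1 - response_hit_rate) * gen_cost_per_query
    embedding = queries * (1 - response_hit_rate) * (1 - embed_hit_rate) * embed_cost_per_query
    return generation + embedding

# Hypothetical numbers: 100k queries/month, $0.002/generation, $0.0001/embedding
baseline = monthly_api_cost(100_000, 0.0, 0.002, 0.0, 0.0001)   # no caching: $210
cached = monthly_api_cost(100_000, 0.5, 0.002, 0.7, 0.0001)     # both caches: $101.50
print(f"{1 - cached / baseline:.0%}")  # → 52% reduction under these assumptions
```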
Should I cache at the response level or the retrieval level?
Cache at both levels. Response-level caching (query -> full answer) provides the biggest latency and cost savings for identical queries. Retrieval-level caching (query -> retrieved chunks) helps when the same chunks are relevant to different questions, and is useful if you update your generation prompt frequently but keep the same knowledge base.
How do I handle knowledge base updates without downtime?
Use a blue-green indexing strategy. Build the new index alongside the old one. Once the new index is fully populated and validated, atomically switch the query pipeline to use the new index. In pgvector, this means creating a new table, indexing into it, then swapping table names in a transaction. In Chroma, create a new collection and update the collection reference. Invalidate the response cache after the swap.
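For the pgvector case, the swap reduces to three statements executed in a single transaction. A sketch that builds them; the `chunks`/`chunks_new` table names are assumptions for illustration:

```python
def blue_green_swap_statements(live: str = "chunks", staging: str = "chunks_new") -> list[str]:
    """SQL to atomically promote the staging table; run inside one transaction."""
    return [
        f"ALTER TABLE {live} RENAME TO {live}_old;",
        f"ALTER TABLE {staging} RENAME TO {live};",
        f"DROP TABLE {live}_old;",
    ]

# Execute with your Postgres client inside one transaction, e.g. with psycopg 3:
# with conn.transaction():
#     for stmt in blue_green_swap_statements():
#         conn.execute(stmt)
```

Readers mid-query keep seeing the old table until the transaction commits, so the cutover has no downtime; remember to invalidate the response cache immediately after the commit.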
#RAG #ProductionArchitecture #Caching #Monitoring #Scaling #MLOps #AgenticAI #LearnAI #AIEngineering
CallSphere Team