Production Text-to-SQL: Caching, Monitoring, and Scaling Natural Language Database Access
Learn how to take text-to-SQL from prototype to production with query caching, usage analytics, performance monitoring, cost optimization, and scaling strategies for high-traffic deployments.
The Gap Between Demo and Production
A text-to-SQL demo answers a question in 3 seconds and costs $0.02 per query. A production system serves 10,000 users, needs sub-second responses for repeated questions, must handle schema changes gracefully, and cannot let costs spiral. Bridging this gap requires caching, monitoring, and operational infrastructure.
Query Caching with Semantic Similarity
The most impactful optimization is caching. Many users ask the same questions in different words: "How many users signed up today?" and "What's today's signup count?" should return the same cached SQL.
import hashlib
import json
import time
import redis
import openai
import numpy as np
from typing import Optional
class SemanticQueryCache:
    """Cache text-to-SQL results keyed by semantic similarity of the question.

    Near-duplicate questions ("How many users signed up today?" vs
    "What's today's signup count?") hit the same entry as long as their
    embedding cosine similarity meets ``similarity_threshold``.
    """

    def __init__(self, redis_url: str, similarity_threshold: float = 0.92):
        self.redis = redis.from_url(redis_url)
        self.client = openai.OpenAI()
        # Minimum cosine similarity for a cached question to count as a hit.
        self.threshold = similarity_threshold
        self.ttl = 3600  # 1 hour cache TTL

    def _get_embedding(self, text: str) -> list[float]:
        """Embed `text` with OpenAI's small embedding model."""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Cosine similarity between two equal-length vectors."""
        a_np, b_np = np.array(a), np.array(b)
        return float(np.dot(a_np, b_np) / (np.linalg.norm(a_np) * np.linalg.norm(b_np)))

    def get(self, question: str) -> Optional[dict]:
        """Return the best cached entry semantically similar to `question`, or None."""
        embedding = self._get_embedding(question)
        best_match = None
        best_score = 0.0
        # SCAN instead of KEYS: KEYS blocks Redis while it walks the whole keyspace.
        for key in self.redis.scan_iter("sql_cache:*"):
            raw = self.redis.get(key)
            if raw is None:
                # Entry expired between SCAN and GET — skip instead of
                # crashing in json.loads(None).
                continue
            cached = json.loads(raw)
            score = self._cosine_similarity(embedding, cached["embedding"])
            if score > best_score:
                best_score = score
                best_match = cached
        if best_match and best_score >= self.threshold:
            return {
                "sql": best_match["sql"],
                # set() stores the results; surface them on a hit so callers
                # can skip the database round-trip entirely.
                "results": best_match.get("results"),
                "cached": True,
                "similarity": best_score,
            }
        return None

    def set(self, question: str, sql: str, results: list[dict]):
        """Cache a question-SQL-results triple (results truncated to 100 rows)."""
        embedding = self._get_embedding(question)
        cache_key = f"sql_cache:{hashlib.md5(question.encode()).hexdigest()}"
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps({
                "question": question,
                "sql": sql,
                "results": results[:100],
                "embedding": embedding,
                "timestamp": time.time(),
            }),
        )
Exact-Match Cache Layer
Before checking semantic similarity (which requires an embedding API call), check for exact question matches. This serves repeat queries with near-zero latency — a single Redis lookup instead of an embedding round-trip.
class TwoTierCache:
    """Fast exact-match cache with semantic similarity fallback."""

    TTL_SECONDS = 3600  # TTL for exact-match entries (matches SemanticQueryCache)

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.semantic_cache = SemanticQueryCache(redis_url)

    def _exact_key(self, question: str) -> str:
        """Stable Redis key for the normalized (trimmed, lowercased) question."""
        normalized = question.strip().lower()
        return f"sql_exact:{hashlib.md5(normalized.encode()).hexdigest()}"

    def get(self, question: str) -> Optional[dict]:
        """Look up `question`, trying the exact tier before the semantic tier."""
        # Tier 1: exact match (microseconds, no embedding API call).
        cached = self.redis.get(self._exact_key(question))
        if cached:
            result = json.loads(cached)
            result["cache_tier"] = "exact"
            return result
        # Tier 2: semantic match (hundreds of milliseconds — embedding call).
        result = self.semantic_cache.get(question)
        if result is not None:
            # Tag the tier so every hit carries a consistent "cache_tier"
            # field (previously only exact hits did).
            result["cache_tier"] = "semantic"
        return result

    def set(self, question: str, sql: str, results: list[dict]):
        """Store the entry in both tiers (results truncated to 100 rows)."""
        self.redis.setex(self._exact_key(question), self.TTL_SECONDS, json.dumps({
            "sql": sql, "results": results[:100],
        }))
        self.semantic_cache.set(question, sql, results)
Usage Analytics and Monitoring
Track every query for operational visibility and continuous improvement.
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
import logging
@dataclass
class QueryMetrics:
    """Per-query telemetry record for a single text-to-SQL request."""
    question: str               # natural-language question as asked
    generated_sql: str          # SQL produced by the LLM (or served from cache)
    execution_time_ms: float    # end-to-end wall time for the request
    llm_latency_ms: float       # time spent waiting on the LLM call
    db_latency_ms: float        # time spent executing SQL against the database
    row_count: int              # rows returned to the caller
    was_cached: bool            # True when served from any cache tier
    cache_tier: str             # which tier hit (e.g. "exact" / "semantic")
    had_error: bool             # True when the query failed
    error_message: str          # error text when had_error; presumably "" otherwise — TODO confirm
    retry_count: int            # SQL-generation retries before success/give-up
    model: str                  # LLM model used for generation
    token_count: int            # total tokens consumed by the request
    estimated_cost_usd: float   # estimated API spend for this query
    user_id: str                # requesting user, for per-user analytics
    timestamp: str              # request timestamp (string; format set by the caller — TODO confirm)
class QueryMonitor:
    """Collect and report text-to-SQL usage metrics via Redis counters."""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.logger = logging.getLogger("text_to_sql.monitor")

    def record(self, metrics: QueryMetrics):
        """Log one executed query and update the day's real-time counters."""
        # Structured log line for the observability pipeline.
        self.logger.info("query_executed", extra=asdict(metrics))
        # datetime.utcnow() is deprecated (returns a naive datetime); use an
        # aware UTC timestamp for the daily bucket key instead.
        date_key = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        pipe = self.redis.pipeline()
        pipe.incr(f"stats:{date_key}:total_queries")
        pipe.incrbyfloat(f"stats:{date_key}:total_cost", metrics.estimated_cost_usd)
        if metrics.was_cached:
            pipe.incr(f"stats:{date_key}:cache_hits")
        if metrics.had_error:
            pipe.incr(f"stats:{date_key}:errors")
        # NOTE(review): this list grows unbounded within a day; consider LTRIM
        # or a histogram if daily volume is very large.
        pipe.lpush(f"stats:{date_key}:latencies", metrics.execution_time_ms)
        pipe.execute()

    def get_daily_stats(self, date: str) -> dict:
        """Aggregate the counters for `date` ("YYYY-MM-DD") into a summary dict."""
        pipe = self.redis.pipeline()
        pipe.get(f"stats:{date}:total_queries")
        pipe.get(f"stats:{date}:total_cost")
        pipe.get(f"stats:{date}:cache_hits")
        pipe.get(f"stats:{date}:errors")
        pipe.lrange(f"stats:{date}:latencies", 0, -1)
        total, cost, cache_hits, errors, latencies = pipe.execute()
        total = int(total or 0)
        # Sort once; both the average and the p95 index use the same list.
        latency_list = sorted(float(v) for v in (latencies or []))
        return {
            "date": date,
            "total_queries": total,
            "total_cost_usd": float(cost or 0),
            "cache_hit_rate": int(cache_hits or 0) / total if total > 0 else 0,
            "error_rate": int(errors or 0) / total if total > 0 else 0,
            "avg_latency_ms": sum(latency_list) / len(latency_list) if latency_list else 0,
            # int(n * 0.95) < n for every n >= 1, so the index is always valid.
            "p95_latency_ms": latency_list[int(len(latency_list) * 0.95)] if latency_list else 0,
        }
Schema Change Detection
When your database schema changes, cached queries may become invalid. Detect schema changes and invalidate affected caches automatically.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
class SchemaChangeDetector:
    """Detect database schema changes and invalidate stale SQL caches."""

    def __init__(self, conn_string: str, redis_url: str):
        self.conn_string = conn_string
        self.redis = redis.from_url(redis_url)

    def get_schema_hash(self) -> str:
        """Return a SHA-256 fingerprint of all public-schema columns."""
        import psycopg2
        conn = psycopg2.connect(self.conn_string)
        try:
            # Cursor context manager ensures the cursor is closed too.
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT table_name, column_name, data_type
                    FROM information_schema.columns
                    WHERE table_schema = 'public'
                    ORDER BY table_name, ordinal_position
                """)
                schema_str = str(cur.fetchall())
        finally:
            # Always release the connection, even when the query raises
            # (the original leaked it on any exception).
            conn.close()
        return hashlib.sha256(schema_str.encode()).hexdigest()

    def check_and_invalidate(self):
        """Flush every SQL cache entry if the schema fingerprint changed."""
        current_hash = self.get_schema_hash()
        stored_hash = self.redis.get("schema_hash")
        if stored_hash and stored_hash.decode() != current_hash:
            # Schema changed — any cached SQL may reference dropped or
            # renamed columns, so drop both cache tiers.
            keys = self.redis.keys("sql_cache:*") + self.redis.keys("sql_exact:*")
            if keys:
                self.redis.delete(*keys)
            # Log (not print) for consistency with QueryMonitor's logging.
            logging.getLogger("text_to_sql.schema").info(
                "Schema change detected. Invalidated %d cached queries.", len(keys)
            )
        self.redis.set("schema_hash", current_hash)
Cost Optimization Strategies
At scale, LLM costs for text-to-SQL can add up. Here are the highest-impact optimizations:
- Cache aggressively — a 60% cache hit rate cuts LLM generation costs by 60% (embedding lookups still run on every query, but they cost orders of magnitude less)
- Use smaller models for simple queries — route single-table questions to GPT-4o-mini instead of GPT-4o
- Batch embeddings — when building the semantic cache, batch embedding requests to reduce API calls
- Set short context — only include relevant tables in the schema, not the full database DDL
def estimate_monthly_cost(queries_per_day: int, cache_hit_rate: float,
                          avg_input_tokens: int, avg_output_tokens: int,
                          days_per_month: int = 30) -> dict:
    """Estimate monthly text-to-SQL API costs.

    Args:
        queries_per_day: Total user questions per day.
        cache_hit_rate: Fraction in [0, 1] of questions served from cache.
            Cached questions skip the LLM call but still pay for one
            embedding to perform the semantic cache lookup.
        avg_input_tokens: Average prompt size; also used as the embedding
            input size for the cache lookup.
        avg_output_tokens: Average generated-SQL size.
        days_per_month: Billing-period length in days (default 30; was
            previously hard-coded).

    Returns:
        Dict with monthly LLM query volume and USD cost breakdown,
        costs rounded to cents.
    """
    llm_queries = queries_per_day * (1 - cache_hit_rate) * days_per_month
    # One embedding per query for the cache lookup, hit or miss.
    embedding_queries = queries_per_day * days_per_month
    # GPT-4o pricing (per 1M tokens): $2.50 input, $10.00 output.
    llm_cost = llm_queries * (
        avg_input_tokens * 2.50 / 1_000_000 +
        avg_output_tokens * 10.00 / 1_000_000
    )
    # Embedding pricing: $0.02 per 1M tokens.
    embedding_cost = embedding_queries * avg_input_tokens * 0.02 / 1_000_000
    return {
        "llm_queries_per_month": int(llm_queries),
        "llm_cost_usd": round(llm_cost, 2),
        "embedding_cost_usd": round(embedding_cost, 2),
        "total_cost_usd": round(llm_cost + embedding_cost, 2),
    }
# Example: 5000 queries/day, 60% cache hit rate, ~2000 prompt / 200 output tokens.
print(estimate_monthly_cost(5000, 0.60, 2000, 200))
FAQ
What cache TTL should I use for text-to-SQL results?
It depends on your data freshness requirements. For real-time dashboards, use 5-15 minutes. For analytical queries where data changes daily, 1-4 hours works well. For reference data that rarely changes (product catalog, employee directory), 24 hours is appropriate. Always invalidate on schema changes regardless of TTL.
How do I handle peak traffic without degrading quality?
Implement a tiered degradation strategy. Under normal load, use your best model (GPT-4o). When request queues exceed a threshold, switch to a faster model (GPT-4o-mini) or increase cache TTL. As a last resort, serve cached results even for non-matching questions with a disclaimer that results may be approximate.
Should I use a separate database for text-to-SQL queries?
Yes, strongly recommended. Use a read replica or a dedicated analytics database for AI-generated queries. This prevents a runaway query from affecting your production database, provides isolation for setting aggressive query timeouts, and lets you tune the replica for analytical workloads (larger work_mem, different indexes) without impacting transactional performance.
#ProductionAI #QueryCaching #Monitoring #TextToSQL #Scaling #MLOps #CostOptimization #AgenticAI
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available — no signup required.