
Production Text-to-SQL: Caching, Monitoring, and Scaling Natural Language Database Access

Learn how to take text-to-SQL from prototype to production with query caching, usage analytics, performance monitoring, cost optimization, and scaling strategies for high-traffic deployments.

The Gap Between Demo and Production

A text-to-SQL demo answers a question in 3 seconds and costs $0.02 per query. A production system serves 10,000 users, needs sub-second responses for repeated questions, must handle schema changes gracefully, and cannot let costs spiral. Bridging this gap requires caching, monitoring, and operational infrastructure.

Query Caching with Semantic Similarity

The most impactful optimization is caching. Many users ask the same questions in different words: "How many users signed up today?" and "What's today's signup count?" should return the same cached SQL.

import hashlib
import json
import time
import redis
import openai
import numpy as np
from typing import Optional

class SemanticQueryCache:
    """Cache text-to-SQL results using semantic similarity."""

    def __init__(self, redis_url: str, similarity_threshold: float = 0.92):
        self.redis = redis.from_url(redis_url)
        self.client = openai.OpenAI()
        self.threshold = similarity_threshold
        self.ttl = 3600  # 1 hour cache TTL

    def _get_embedding(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        a_np, b_np = np.array(a), np.array(b)
        return float(np.dot(a_np, b_np) / (np.linalg.norm(a_np) * np.linalg.norm(b_np)))

    def get(self, question: str) -> Optional[dict]:
        """Check cache for a semantically similar question."""
        embedding = self._get_embedding(question)

        # Linear scan over cached embeddings; fine for small caches, but
        # switch to a vector index (e.g. RediSearch) once the cache grows large
        best_match = None
        best_score = 0.0

        for key in self.redis.scan_iter("sql_cache:*"):
            raw = self.redis.get(key)
            if raw is None:  # key expired between SCAN and GET
                continue
            cached = json.loads(raw)
            score = self._cosine_similarity(embedding, cached["embedding"])
            if score > best_score:
                best_score = score
                best_match = cached

        if best_match and best_score >= self.threshold:
            return {
                "sql": best_match["sql"],
                "cached": True,
                "similarity": best_score,
            }
        return None

    def set(self, question: str, sql: str, results: list[dict]):
        """Cache a question-SQL-results triple."""
        embedding = self._get_embedding(question)
        cache_key = f"sql_cache:{hashlib.md5(question.encode()).hexdigest()}"
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps({
                "question": question,
                "sql": sql,
                "results": results[:100],
                "embedding": embedding,
                "timestamp": time.time(),
            }),
        )

Exact-Match Cache Layer

Before checking semantic similarity (which requires an embedding API call), check for exact question matches. This handles repeat queries with zero latency.

class TwoTierCache:
    """Fast exact-match cache with semantic similarity fallback."""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.semantic_cache = SemanticQueryCache(redis_url)

    def get(self, question: str) -> Optional[dict]:
        # Tier 1: Exact match (microseconds)
        normalized = question.strip().lower()
        exact_key = f"sql_exact:{hashlib.md5(normalized.encode()).hexdigest()}"
        cached = self.redis.get(exact_key)
        if cached:
            result = json.loads(cached)
            result["cache_tier"] = "exact"
            return result

        # Tier 2: Semantic match (hundreds of milliseconds)
        return self.semantic_cache.get(question)

    def set(self, question: str, sql: str, results: list[dict]):
        # Store in both tiers
        normalized = question.strip().lower()
        exact_key = f"sql_exact:{hashlib.md5(normalized.encode()).hexdigest()}"
        self.redis.setex(exact_key, 3600, json.dumps({
            "sql": sql, "results": results[:100],
        }))
        self.semantic_cache.set(question, sql, results)
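The exact tier only matches after normalization (strip + lowercase), so casing and whitespace variants hit it while true paraphrases fall through to the semantic tier. The lookup flow can be exercised without Redis using a dict-backed stand-in (the class below is a test double, not production code):

```python
import hashlib

class InMemoryTwoTierCache:
    """Dict-backed stand-in for TwoTierCache; shows the key scheme without Redis."""

    def __init__(self, semantic_lookup=None):
        self.exact: dict[str, dict] = {}
        # Injectable semantic fallback; a real deployment uses SemanticQueryCache
        self.semantic_lookup = semantic_lookup or (lambda q: None)

    @staticmethod
    def _key(question: str) -> str:
        # Same normalization as TwoTierCache: strip + lowercase before hashing
        normalized = question.strip().lower()
        return hashlib.md5(normalized.encode()).hexdigest()

    def set(self, question: str, sql: str):
        self.exact[self._key(question)] = {"sql": sql}

    def get(self, question: str):
        hit = self.exact.get(self._key(question))
        if hit:
            return {**hit, "cache_tier": "exact"}
        return self.semantic_lookup(question)

cache = InMemoryTwoTierCache()
cache.set("How many users signed up today?", "SELECT COUNT(*) FROM users")
# Case/whitespace variants hit the exact tier; paraphrases fall through
print(cache.get("  HOW many users signed up today? "))
print(cache.get("What's today's signup count?"))  # None without a semantic tier
```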

Usage Analytics and Monitoring

Track every query for operational visibility and continuous improvement.

from datetime import datetime
from dataclasses import dataclass, asdict
import logging

@dataclass
class QueryMetrics:
    question: str
    generated_sql: str
    execution_time_ms: float
    llm_latency_ms: float
    db_latency_ms: float
    row_count: int
    was_cached: bool
    cache_tier: str
    had_error: bool
    error_message: str
    retry_count: int
    model: str
    token_count: int
    estimated_cost_usd: float
    user_id: str
    timestamp: str

class QueryMonitor:
    """Collect and report text-to-SQL usage metrics."""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.logger = logging.getLogger("text_to_sql.monitor")

    def record(self, metrics: QueryMetrics):
        # Log for observability
        self.logger.info("query_executed", extra=asdict(metrics))

        # Update real-time counters
        date_key = datetime.utcnow().strftime("%Y-%m-%d")
        pipe = self.redis.pipeline()
        pipe.incr(f"stats:{date_key}:total_queries")
        pipe.incrbyfloat(f"stats:{date_key}:total_cost", metrics.estimated_cost_usd)
        if metrics.was_cached:
            pipe.incr(f"stats:{date_key}:cache_hits")
        if metrics.had_error:
            pipe.incr(f"stats:{date_key}:errors")
        pipe.lpush(f"stats:{date_key}:latencies", metrics.execution_time_ms)
        pipe.ltrim(f"stats:{date_key}:latencies", 0, 9999)  # cap stored samples
        pipe.execute()

    def get_daily_stats(self, date: str) -> dict:
        pipe = self.redis.pipeline()
        pipe.get(f"stats:{date}:total_queries")
        pipe.get(f"stats:{date}:total_cost")
        pipe.get(f"stats:{date}:cache_hits")
        pipe.get(f"stats:{date}:errors")
        pipe.lrange(f"stats:{date}:latencies", 0, -1)
        total, cost, cache_hits, errors, latencies = pipe.execute()

        total = int(total or 0)
        latency_list = [float(l) for l in (latencies or [])]

        return {
            "date": date,
            "total_queries": total,
            "total_cost_usd": float(cost or 0),
            "cache_hit_rate": int(cache_hits or 0) / total if total > 0 else 0,
            "error_rate": int(errors or 0) / total if total > 0 else 0,
            "avg_latency_ms": sum(latency_list) / len(latency_list) if latency_list else 0,
            "p95_latency_ms": sorted(latency_list)[int(len(latency_list) * 0.95)] if latency_list else 0,
        }
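The rate and percentile arithmetic in `get_daily_stats` is easy to get wrong at the edges (empty days, single samples), so it is worth factoring into a pure function you can unit-test without Redis. A sketch, with the same output shape:

```python
def summarize(latencies_ms, total, cache_hits, errors, cost_usd):
    """Pure version of the aggregation in get_daily_stats, for unit testing."""
    lat = sorted(float(x) for x in latencies_ms)
    return {
        "total_queries": total,
        "total_cost_usd": cost_usd,
        "cache_hit_rate": cache_hits / total if total else 0,
        "error_rate": errors / total if total else 0,
        "avg_latency_ms": sum(lat) / len(lat) if lat else 0,
        "p95_latency_ms": lat[int(len(lat) * 0.95)] if lat else 0,
    }

# 95 fast queries and 5 slow ones: the p95 should land on the slow tail
stats = summarize([100] * 95 + [900] * 5, total=100, cache_hits=60, errors=2, cost_usd=1.5)
print(stats["p95_latency_ms"], stats["cache_hit_rate"])
```

Note that this (like the original) uses the nearest-rank method for p95; for low-volume days a proper interpolating percentile (e.g. `statistics.quantiles`) gives smoother numbers.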

Schema Change Detection

When your database schema changes, cached queries may become invalid. Detect schema changes and invalidate affected caches automatically.


class SchemaChangeDetector:
    """Detect database schema changes and invalidate stale caches."""

    def __init__(self, conn_string: str, redis_url: str):
        self.conn_string = conn_string
        self.redis = redis.from_url(redis_url)

    def get_schema_hash(self) -> str:
        import psycopg2  # imported lazily so cache-only callers don't need it
        conn = psycopg2.connect(self.conn_string)
        try:
            cur = conn.cursor()
            cur.execute("""
                SELECT table_name, column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = 'public'
                ORDER BY table_name, ordinal_position
            """)
            schema_str = str(cur.fetchall())
        finally:
            conn.close()
        return hashlib.sha256(schema_str.encode()).hexdigest()

    def check_and_invalidate(self):
        current_hash = self.get_schema_hash()
        stored_hash = self.redis.get("schema_hash")

        if stored_hash and stored_hash.decode() != current_hash:
            # Schema changed: flush both cache tiers
            keys = list(self.redis.scan_iter("sql_cache:*"))
            keys += list(self.redis.scan_iter("sql_exact:*"))
            if keys:
                self.redis.delete(*keys)
            logging.getLogger("text_to_sql.schema").warning(
                "Schema change detected; invalidated %d cached queries", len(keys))

        self.redis.set("schema_hash", current_hash)
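Because detection reduces to hashing the `information_schema` rows, the logic can be exercised without a database by feeding it representative `(table, column, type)` tuples. A sketch mirroring `get_schema_hash`:

```python
import hashlib

def hash_schema(rows: list[tuple[str, str, str]]) -> str:
    """Hash (table, column, type) rows the same way get_schema_hash does."""
    return hashlib.sha256(str(rows).encode()).hexdigest()

before     = [("users", "id", "integer"), ("users", "email", "text")]
after_add  = before + [("users", "deleted_at", "timestamp")]
after_type = [("users", "id", "bigint"), ("users", "email", "text")]

# Any column addition or type change flips the hash and triggers invalidation
print(hash_schema(before) != hash_schema(after_add))
print(hash_schema(before) != hash_schema(after_type))
```

Run `check_and_invalidate` on a schedule (a cron job or a background task every few minutes); catching a schema change a minute late is far cheaper than serving SQL against columns that no longer exist.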

Cost Optimization Strategies

At scale, LLM costs for text-to-SQL can add up. Here are the highest-impact optimizations:

  1. Cache aggressively — a 60% cache hit rate cuts LLM costs by 60%
  2. Use smaller models for simple queries — route single-table questions to GPT-4o-mini instead of GPT-4o
  3. Batch embeddings — when building the semantic cache, batch embedding requests to reduce API calls
  4. Trim the prompt context — include only the relevant tables' schema in the prompt, not the full database DDL
def estimate_monthly_cost(queries_per_day: int, cache_hit_rate: float,
                          avg_input_tokens: int, avg_output_tokens: int) -> dict:
    """Estimate monthly text-to-SQL API costs."""
    llm_queries = queries_per_day * (1 - cache_hit_rate) * 30
    embedding_queries = queries_per_day * 30  # One embedding per query for cache lookup

    # GPT-4o pricing: $2.50 per 1M input tokens, $10.00 per 1M output tokens
    llm_cost = llm_queries * (
        avg_input_tokens * 2.50 / 1_000_000 +
        avg_output_tokens * 10.00 / 1_000_000
    )

    # text-embedding-3-small pricing ($0.02 per 1M tokens). Only the question
    # text is embedded, so reusing avg_input_tokens here overestimates when
    # that figure includes the schema prompt; treat it as an upper bound.
    embedding_cost = embedding_queries * avg_input_tokens * 0.02 / 1_000_000

    return {
        "llm_queries_per_month": int(llm_queries),
        "llm_cost_usd": round(llm_cost, 2),
        "embedding_cost_usd": round(embedding_cost, 2),
        "total_cost_usd": round(llm_cost + embedding_cost, 2),
    }

# Example: 5000 queries/day, 60% cache hit rate
print(estimate_monthly_cost(5000, 0.60, 2000, 200))
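Optimization 2 above (model routing) does not need a learned classifier to pay off; a heuristic gate is often enough. A sketch, where `matched_tables` is assumed to come from your schema-linking step and the marker keywords and model names are illustrative, not tuned:

```python
def route_model(question: str, matched_tables: list[str]) -> str:
    """Hypothetical router: send simple questions to the cheaper model."""
    complex_markers = ("join", "cohort", "retention", "compare", "trend", "versus")
    q = question.lower()
    if len(matched_tables) > 1 or any(m in q for m in complex_markers):
        return "gpt-4o"       # multi-table or analytical: use the stronger model
    return "gpt-4o-mini"      # single-table lookups: roughly 15x cheaper per token

print(route_model("How many users signed up today?", ["users"]))
print(route_model("Compare retention across signup cohorts", ["users", "events"]))
```

Log the router's decision alongside the query metrics above so you can measure whether the cheap model's SQL error rate actually stays acceptable.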

FAQ

What cache TTL should I use for text-to-SQL results?

It depends on your data freshness requirements. For real-time dashboards, use 5-15 minutes. For analytical queries where data changes daily, 1-4 hours works well. For reference data that rarely changes (product catalog, employee directory), 24 hours is appropriate. Always invalidate on schema changes regardless of TTL.
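That guidance maps naturally to a lookup table keyed by query category. A minimal sketch, assuming you can classify questions into these (hypothetical) categories upstream:

```python
TTL_SECONDS = {
    "realtime_dashboard": 10 * 60,    # 5-15 minutes: fast-moving data
    "daily_analytics": 2 * 60 * 60,   # 1-4 hours: data refreshed daily
    "reference_data": 24 * 60 * 60,   # rarely-changing catalogs/directories
}

def ttl_for(category: str) -> int:
    # Default to the most conservative (shortest) tier when the category is unknown
    return TTL_SECONDS.get(category, TTL_SECONDS["realtime_dashboard"])

print(ttl_for("reference_data"))  # 86400
```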

How do I handle peak traffic without degrading quality?

Implement a tiered degradation strategy. Under normal load, use your best model (GPT-4o). When request queues exceed a threshold, switch to a faster model (GPT-4o-mini) or increase cache TTL. As a last resort, serve cached results even for non-matching questions with a disclaimer that results may be approximate.
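The tiered strategy above boils down to a queue-depth lookup. A sketch; the thresholds (50, 200) are placeholders to tune against your own latency SLOs and observed queue depths:

```python
def degradation_tier(queue_depth: int) -> dict:
    """Pick model, cache TTL, and fallback behavior from current load."""
    if queue_depth < 50:    # normal load: best model, normal caching
        return {"model": "gpt-4o", "cache_ttl": 3600, "serve_approximate": False}
    if queue_depth < 200:   # elevated load: faster model, longer TTL
        return {"model": "gpt-4o-mini", "cache_ttl": 4 * 3600, "serve_approximate": False}
    # Overloaded: serve best-effort cached answers with a disclaimer
    return {"model": "gpt-4o-mini", "cache_ttl": 4 * 3600, "serve_approximate": True}

print(degradation_tier(500))
```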

Should I use a separate database for text-to-SQL queries?

Yes, strongly recommended. Use a read replica or a dedicated analytics database for AI-generated queries. This prevents a runaway query from affecting your production database, provides isolation for setting aggressive query timeouts, and lets you tune the replica for analytical workloads (larger work_mem, different indexes) without impacting transactional performance.
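In practice this means routing by query origin at connection time. A sketch with placeholder DSNs; `options` is the standard libpq mechanism for per-session settings such as `statement_timeout`:

```python
def connection_kwargs(role: str) -> dict:
    """Hypothetical routing: AI-generated SQL goes to a locked-down replica."""
    if role == "text_to_sql":
        return {
            "dsn": "postgresql://readonly@replica.internal/analytics",  # placeholder
            # Kill any AI-generated query running longer than 5 seconds,
            # and refuse writes at the session level as a second safety net
            "options": "-c statement_timeout=5000 -c default_transaction_read_only=on",
        }
    # Application traffic keeps its normal primary connection
    return {"dsn": "postgresql://app@primary.internal/app", "options": ""}

# e.g. psycopg2.connect(**connection_kwargs("text_to_sql"))
print(connection_kwargs("text_to_sql")["options"])
```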


#ProductionAI #QueryCaching #Monitoring #TextToSQL #Scaling #MLOps #CostOptimization #AgenticAI

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
