Skip to content
Learn Agentic AI11 min read0 views

Text-to-SQL Error Correction: Self-Healing Queries That Fix Their Own Mistakes

Build a text-to-SQL system with automatic error detection, self-correction loops, and intelligent retry strategies that fix query mistakes without user intervention.

Why Error Correction Matters

Even the best text-to-SQL systems produce invalid queries 10-20% of the time. Column name mismatches, syntax errors, incorrect data type comparisons, and ambiguous GROUP BY clauses are common failure modes. Without error correction, these failures become dead ends for the user.

A self-healing query system catches execution errors, feeds them back to the LLM with the original context, and asks for a corrected version. This simple loop recovers from 60-80% of first-attempt failures, pushing overall system accuracy from 85% to 95%+.

Error Categories in AI-Generated SQL

Understanding error types helps you build targeted correction strategies:

Syntax errors — Missing parentheses, incorrect keyword order, invalid function names. These produce immediate database errors with clear messages.

Schema errors — Referencing non-existent tables or columns. The error message includes the problematic identifier, making correction straightforward.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

Type errors — Comparing incompatible types, like WHERE date_column = 42. These may produce errors or silently return wrong results.

Logic errors — Valid SQL that returns incorrect results. The query runs successfully but answers the wrong question. These are the hardest to detect.

Building the Self-Correction Loop

import openai
import psycopg2
from dataclasses import dataclass
from typing import Optional

@dataclass
class QueryAttempt:
    sql: str
    error: Optional[str]
    results: Optional[list[dict]]

class SelfHealingSQLAgent:
    def __init__(self, conn_string: str, schema: str):
        self.conn_string = conn_string
        self.schema = schema
        self.client = openai.OpenAI()
        self.max_retries = 3

    def _generate(self, question: str,
                  previous_attempts: list[QueryAttempt]) -> str:
        """Generate SQL, incorporating error context from previous attempts."""
        system_msg = f"""You are a PostgreSQL expert. Generate a SQL query for
the user's question using this schema:

{self.schema}

Return ONLY the SQL query."""

        # Build error context from previous attempts
        if previous_attempts:
            error_context = "\n\nPrevious attempts that failed:\n"
            for i, attempt in enumerate(previous_attempts, 1):
                error_context += f"\nAttempt {i}:\n"
                error_context += f"SQL: {attempt.sql}\n"
                error_context += f"Error: {attempt.error}\n"
            error_context += "\nFix the issues and generate a corrected query."
            system_msg += error_context

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_msg},
                {"role": "user", "content": question},
            ],
            temperature=0,
        )
        return response.choices[0].message.content.strip()

    def _execute(self, sql: str) -> QueryAttempt:
        """Execute SQL and return the attempt result."""
        try:
            conn = psycopg2.connect(self.conn_string)
            cur = conn.cursor()
            cur.execute(sql)
            columns = [desc[0] for desc in cur.description]
            rows = [dict(zip(columns, row)) for row in cur.fetchall()]
            conn.close()
            return QueryAttempt(sql=sql, error=None, results=rows)
        except Exception as e:
            return QueryAttempt(sql=sql, error=str(e), results=None)

    def ask(self, question: str) -> dict:
        """Ask a question with automatic error correction."""
        attempts = []

        for retry in range(self.max_retries):
            sql = self._generate(question, attempts)
            attempt = self._execute(sql)
            attempts.append(attempt)

            if attempt.error is None:
                return {
                    "sql": sql,
                    "results": attempt.results,
                    "total_attempts": retry + 1,
                    "corrections": [a.error for a in attempts[:-1]],
                }

        # All retries exhausted
        return {
            "error": "Failed after all retry attempts",
            "attempts": [
                {"sql": a.sql, "error": a.error} for a in attempts
            ],
        }

Detecting Logic Errors with Result Validation

Syntax and schema errors produce exceptions. Logic errors are harder because the query succeeds but returns wrong data. Use heuristic checks to catch common logic errors:

class ResultValidator:
    """Detect potential logic errors in query results."""

    def validate(self, question: str, sql: str,
                 results: list[dict]) -> tuple[bool, str]:
        checks = [
            self._check_empty_results,
            self._check_unreasonable_counts,
            self._check_null_heavy_results,
            self._check_aggregation_mismatch,
        ]
        for check in checks:
            is_valid, reason = check(question, sql, results)
            if not is_valid:
                return False, reason
        return True, "Results look reasonable"

    def _check_empty_results(self, question: str, sql: str,
                             results: list[dict]) -> tuple[bool, str]:
        if not results:
            # Empty results might be correct, but flag for review
            return False, ("Query returned 0 rows. This may indicate an "
                          "overly restrictive WHERE clause or a wrong table.")
        return True, "OK"

    def _check_unreasonable_counts(self, question: str, sql: str,
                                    results: list[dict]) -> tuple[bool, str]:
        upper_q = question.upper()
        if any(word in upper_q for word in ["HOW MANY", "COUNT", "TOTAL"]):
            # For count questions, check if result is a single row
            if len(results) > 1 and "count" not in str(results[0].keys()).lower():
                return False, ("Question asks for a count but query returned "
                              "multiple rows without aggregation.")
        return True, "OK"

    def _check_null_heavy_results(self, question: str, sql: str,
                                   results: list[dict]) -> tuple[bool, str]:
        if results:
            null_count = sum(
                1 for row in results
                for v in row.values() if v is None
            )
            total_values = len(results) * len(results[0])
            if total_values > 0 and null_count / total_values > 0.5:
                return False, ("Over 50% of result values are NULL, "
                              "suggesting a wrong JOIN or missing data.")
        return True, "OK"

    def _check_aggregation_mismatch(self, question: str, sql: str,
                                     results: list[dict]) -> tuple[bool, str]:
        upper_q = question.upper()
        has_group_word = any(w in upper_q for w in ["EACH", "PER", "BY", "EVERY"])
        if has_group_word and len(results) <= 1:
            return False, ("Question implies grouping ('each', 'per', 'by') "
                          "but only 1 row returned. Missing GROUP BY?")
        return True, "OK"

Integrating Result Validation into the Agent

class SelfHealingSQLAgentV2(SelfHealingSQLAgent):
    def __init__(self, conn_string: str, schema: str):
        super().__init__(conn_string, schema)
        self.result_validator = ResultValidator()

    def ask(self, question: str) -> dict:
        attempts = []

        for retry in range(self.max_retries):
            sql = self._generate(question, attempts)
            attempt = self._execute(sql)

            if attempt.error:
                attempts.append(attempt)
                continue

            # Validate results for logic errors
            is_valid, reason = self.result_validator.validate(
                question, sql, attempt.results
            )
            if not is_valid:
                attempt.error = f"Result validation: {reason}"
                attempts.append(attempt)
                continue

            attempts.append(attempt)
            return {
                "sql": sql,
                "results": attempt.results,
                "total_attempts": retry + 1,
            }

        return {"error": "All attempts failed", "attempts": len(attempts)}

FAQ

How many retries should I allow?

Three retries is the sweet spot. Most recoverable errors are fixed on the second attempt when the LLM sees the error message. A third attempt handles edge cases where the second fix introduces a new issue. Beyond three retries, the success rate drops sharply — if the model cannot fix it in three tries, it likely needs human intervention or a clearer question.

Does error correction increase latency significantly?

On average, 85% of queries succeed on the first attempt, so the typical user experiences no added latency. For the 15% that need correction, each retry adds 1-2 seconds (one LLM call plus one database query). The total worst-case for three retries is about 6 seconds, which is acceptable for analytical queries where users expect some processing time.

Can I use the error correction history to improve the model over time?

Yes. Log all correction events — the original question, the failed SQL, the error message, and the corrected SQL. This creates a training dataset of common mistakes. You can use it for few-shot examples in the prompt or for fine-tuning to reduce first-attempt errors over time.


#ErrorCorrection #SelfHealing #TextToSQL #QueryRetry #AgenticAI #LLMDebugging #SQLFixing #AIReliability

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.