SQL Query Validation and Safety: Preventing Dangerous Queries from AI Agents
Learn how to build a robust SQL validation layer that prevents injection attacks, enforces read-only access, limits query complexity, and ensures AI-generated queries cannot damage your database.
The Safety Problem with AI-Generated SQL
When an LLM generates SQL queries, you face a unique security challenge. Unlike traditional SQL injection where attackers craft malicious input, here the LLM itself might produce destructive queries. A user asking "Clean up old records" could cause the model to generate a DELETE statement. A prompt injection hidden in a user message could trick the LLM into running DROP TABLE.
You cannot rely on the LLM to self-police. Safety must be enforced at the execution layer, not the generation layer.
Layer 1: Statement Type Enforcement
The most fundamental check is ensuring only SELECT statements are executed. Use SQL parsing, not string matching, because string matching is easily bypassed.
import sqlparse
class SQLValidator:
"""Multi-layer SQL validation for AI-generated queries."""
ALLOWED_STATEMENT_TYPES = {"SELECT"}
FORBIDDEN_KEYWORDS = {
"DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "TRUNCATE",
"CREATE", "GRANT", "REVOKE", "EXEC", "EXECUTE",
"INTO OUTFILE", "INTO DUMPFILE", "LOAD_FILE",
}
def validate(self, sql: str) -> tuple[bool, str]:
"""Run all validation checks. Returns (is_valid, reason)."""
checks = [
self._check_statement_type,
self._check_forbidden_keywords,
self._check_multiple_statements,
self._check_comments_and_unions,
self._check_complexity,
]
for check in checks:
is_valid, reason = check(sql)
if not is_valid:
return False, reason
return True, "Query passed all checks"
def _check_statement_type(self, sql: str) -> tuple[bool, str]:
parsed = sqlparse.parse(sql)
if not parsed:
return False, "Empty or unparseable query"
stmt_type = parsed[0].get_type()
if stmt_type not in self.ALLOWED_STATEMENT_TYPES:
return False, f"Statement type '{stmt_type}' is not allowed"
return True, "OK"
def _check_forbidden_keywords(self, sql: str) -> tuple[bool, str]:
upper = sql.upper()
for keyword in self.FORBIDDEN_KEYWORDS:
if keyword in upper:
return False, f"Forbidden keyword found: {keyword}"
return True, "OK"
def _check_multiple_statements(self, sql: str) -> tuple[bool, str]:
statements = [s for s in sqlparse.parse(sql) if str(s).strip()]
if len(statements) > 1:
return False, "Multiple statements detected — only single queries allowed"
return True, "OK"
def _check_comments_and_unions(self, sql: str) -> tuple[bool, str]:
# Block SQL comment-based injection patterns
if "--" in sql or "/*" in sql:
return False, "SQL comments are not allowed in generated queries"
return True, "OK"
def _check_complexity(self, sql: str) -> tuple[bool, str]:
upper = sql.upper()
join_count = upper.count(" JOIN ")
subquery_count = upper.count("(SELECT")
if join_count > 5:
return False, f"Too many JOINs ({join_count}). Maximum is 5"
if subquery_count > 3:
return False, f"Too many subqueries ({subquery_count}). Maximum is 3"
return True, "OK"
Layer 2: Database-Level Read-Only Enforcement
Application-level validation is your first line of defense, but database-level permissions are your safety net. Create a dedicated read-only user for text-to-SQL queries.
-- PostgreSQL: Create a read-only role
CREATE ROLE text_to_sql_reader WITH LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE analytics TO text_to_sql_reader;
GRANT USAGE ON SCHEMA public TO text_to_sql_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO text_to_sql_reader;
-- Ensure future tables are also readable
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO text_to_sql_reader;
Even if your application validation has a bug, the database will reject any write operations from this user.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
Layer 3: Query Timeout and Result Limits
AI-generated queries can accidentally create expensive operations — Cartesian products from missing JOIN conditions, full table scans without WHERE clauses, or recursive CTEs that never terminate.
import psycopg2
from contextlib import contextmanager
@contextmanager
def safe_query_connection(conn_string: str, timeout_ms: int = 10000,
max_rows: int = 1000):
"""Connection context manager with timeout and row limits."""
conn = psycopg2.connect(conn_string)
try:
cur = conn.cursor()
# Set statement timeout (PostgreSQL specific)
cur.execute(f"SET statement_timeout = {timeout_ms}")
# Set a work_mem limit to prevent memory-intensive operations
cur.execute("SET work_mem = '50MB'")
yield cur
finally:
conn.close()
def execute_safe(conn_string: str, sql: str, validator: SQLValidator) -> dict:
"""Execute a validated query with safety constraints."""
# Step 1: Validate
is_valid, reason = validator.validate(sql)
if not is_valid:
return {"error": reason, "sql": sql}
# Step 2: Force a LIMIT if none exists
if "LIMIT" not in sql.upper():
sql = f"{sql.rstrip(';')} LIMIT 1000"
# Step 3: Execute with timeout
try:
with safe_query_connection(conn_string) as cur:
cur.execute(sql)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
return {
"columns": columns,
"rows": [dict(zip(columns, row)) for row in rows],
"row_count": len(rows),
}
except psycopg2.extensions.QueryCanceledError:
return {"error": "Query timed out after 10 seconds", "sql": sql}
except Exception as e:
return {"error": str(e), "sql": sql}
Layer 4: Query Logging and Auditing
Every AI-generated query should be logged for auditing. This helps you detect patterns of abuse and debug accuracy issues.
import logging
import hashlib
from datetime import datetime
query_logger = logging.getLogger("text_to_sql.queries")
def log_query(user_id: str, question: str, sql: str,
result: dict, duration_ms: float):
query_logger.info(
"text_to_sql_execution",
extra={
"user_id": user_id,
"question": question,
"sql_hash": hashlib.sha256(sql.encode()).hexdigest()[:12],
"sql": sql,
"row_count": result.get("row_count", 0),
"had_error": "error" in result,
"duration_ms": duration_ms,
"timestamp": datetime.utcnow().isoformat(),
},
)
Putting It All Together
validator = SQLValidator()
# This passes
ok, msg = validator.validate("SELECT name, COUNT(*) FROM orders GROUP BY name")
print(ok, msg) # True, "Query passed all checks"
# This is blocked
ok, msg = validator.validate("DELETE FROM orders WHERE id = 1")
print(ok, msg) # False, "Statement type 'DELETE' is not allowed"
# This is blocked (injection attempt)
ok, msg = validator.validate(
"SELECT * FROM users; DROP TABLE users; --"
)
print(ok, msg) # False, "Multiple statements detected"
FAQ
Can an LLM bypass these validations?
Application-level validation can theoretically be bypassed by sufficiently creative SQL. For example, a query might use SELECT ... INTO to write data in some databases. This is why database-level read-only enforcement is essential — it is the ultimate safety net that no SQL manipulation can bypass.
Should I block all subqueries?
No. Subqueries are necessary for many legitimate analytical questions like "Show me customers whose order total exceeds the average." Instead of blocking them entirely, limit the nesting depth (typically 2-3 levels is sufficient) and enforce the statement timeout to catch runaway recursive patterns.
How do I handle database-specific syntax differences?
Build dialect-specific validators. PostgreSQL allows ILIKE and ::type casting syntax that would be invalid in MySQL. Maintain a set of allowed functions and syntax patterns for each supported database engine, and validate against the appropriate set based on your target database.
#SQLSecurity #QueryValidation #TextToSQL #DatabaseSafety #SQLInjection #AgenticAI #SecureCoding #PythonSecurity
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.