RAG with Structured Data: Querying Databases and APIs Alongside Document Search
Learn how to build hybrid RAG systems that combine document retrieval with SQL database queries and API calls, unifying structured and unstructured data in a single pipeline.
Beyond Documents: The Structured Data Gap
Most RAG tutorials focus exclusively on unstructured text — PDFs, documentation, web pages. But in enterprise environments, the most authoritative answers often live in structured data: relational databases, APIs, spreadsheets, and data warehouses.
When a user asks "How many customers churned last quarter?", the answer is not in a document — it is in a database. When they ask "What is the current status of order 12345?", the answer comes from an API. And when they ask "Why are enterprise customers churning and what does our retention playbook recommend?", the answer requires both a database query and a document retrieval.
A truly useful RAG system must unify these data sources into a single retrieval layer.
Architecture for Hybrid Retrieval
The hybrid system has three retrieval paths that run in parallel:
- Document retrieval — Vector similarity search over unstructured text
- SQL retrieval — Text-to-SQL conversion for database queries
- API retrieval — Function calling for live data from external services
A router decides which paths to activate based on the query, and a merger combines results into a unified context for the LLM.
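Before reaching for an LLM classifier (as the pipeline below does), the routing idea can be sketched with a simple keyword heuristic. The keyword lists here are illustrative assumptions, not an exhaustive taxonomy:

```python
import re

# Hypothetical hint patterns; a production router would learn or
# classify these rather than hard-code them.
DB_HINTS = re.compile(r"\b(how many|count|average|total|churn\w*|revenue)\b", re.I)
API_HINTS = re.compile(r"\b(order\s+\d+|current status|right now)\b", re.I)
DOC_HINTS = re.compile(r"\b(why|recommend\w*|policy|playbook|procedure)\b", re.I)

def route(query: str) -> dict:
    """Decide which retrieval paths a query should activate."""
    needs_db = bool(DB_HINTS.search(query))
    needs_api = bool(API_HINTS.search(query))
    # Documents are the default path, and also activate for
    # explanatory questions even when structured data is needed.
    needs_docs = (not needs_db and not needs_api) or bool(DOC_HINTS.search(query))
    return {"documents": needs_docs, "database": needs_db, "api": needs_api}
```

The three example questions from the introduction route to the database, the API, and both paths respectively.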
Implementing Text-to-SQL Retrieval
```python
from openai import OpenAI
import psycopg2

client = OpenAI()

# Database schema context for the LLM
DB_SCHEMA = """
Tables:
- customers(id, name, plan, mrr, created_at, churned_at)
- orders(id, customer_id, total, status, created_at)
- support_tickets(id, customer_id, subject, priority,
  status, created_at, resolved_at)
"""

def text_to_sql(query: str) -> str:
    """Convert natural language to a SQL query."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "system",
            "content": f"""Convert the user's question to a
PostgreSQL query. Schema:
{DB_SCHEMA}
Rules:
- Return ONLY the SQL query, no explanation
- Always use LIMIT 100 to prevent large results
- Use date functions for time-based questions
- Never use DELETE, UPDATE, INSERT, or DROP""",
        }, {
            "role": "user",
            "content": query,
        }],
    )
    return response.choices[0].message.content.strip()

def execute_sql_safely(sql: str) -> list[dict]:
    """Execute SQL with safety checks."""
    # Block dangerous operations
    forbidden = ["DELETE", "UPDATE", "INSERT", "DROP",
                 "ALTER", "TRUNCATE"]
    sql_upper = sql.upper()
    for keyword in forbidden:
        if keyword in sql_upper:
            raise ValueError(
                f"Forbidden SQL operation: {keyword}"
            )

    conn = psycopg2.connect(
        host="localhost", database="app",
        user="readonly_user", password="password"
    )
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            columns = [desc[0] for desc in cur.description]
            rows = cur.fetchall()
            return [dict(zip(columns, row)) for row in rows]
    finally:
        conn.close()
```
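In practice, models often wrap the generated SQL in a markdown code fence despite the "ONLY the SQL" instruction. A small post-processing helper (an assumption, not part of the original snippet) can strip the fence before execution:

```python
import re

def strip_sql_fences(text: str) -> str:
    """Remove a surrounding ```sql ... ``` fence, if present."""
    match = re.match(r"^```(?:sql)?\s*(.*?)\s*```$", text.strip(), re.DOTALL)
    return match.group(1).strip() if match else text.strip()
```

Calling this on the output of `text_to_sql` before `execute_sql_safely` makes the pipeline robust to either response style.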
Adding API Retrieval Tools
```python
import requests

class APIRetriever:
    """Retrieve live data from external APIs."""

    def __init__(self, api_configs: dict):
        self.apis = api_configs

    def get_order_status(self, order_id: str) -> dict:
        """Fetch current order status from the order service."""
        response = requests.get(
            f"{self.apis['orders_url']}/orders/{order_id}",
            headers={"Authorization": f"Bearer {self.apis['token']}"},
            timeout=5,
        )
        response.raise_for_status()
        return response.json()

    def get_customer_health(self, customer_id: str) -> dict:
        """Fetch customer health score from the analytics API."""
        response = requests.get(
            f"{self.apis['analytics_url']}/health/{customer_id}",
            headers={"Authorization": f"Bearer {self.apis['token']}"},
            timeout=5,
        )
        response.raise_for_status()
        return response.json()
```
The Unified Hybrid Pipeline
```python
import json

class HybridRAG:
    def __init__(self, vectorstore, api_retriever):
        self.vectorstore = vectorstore
        self.api_retriever = api_retriever

    def classify_query(self, query: str) -> dict:
        """Determine which retrieval paths to activate."""
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": """Classify the query for retrieval
routing. Return JSON:
{
  "needs_documents": true/false,
  "needs_database": true/false,
  "needs_api": true/false,
  "sql_query_hint": "what to query if DB needed",
  "api_action": "which API if needed",
  "entity_id": "any ID mentioned in the query, e.g. an order number"
}""",
            }, {
                "role": "user",
                "content": query,
            }],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)

    def retrieve(self, query: str) -> str:
        """Unified retrieval across all data sources."""
        routing = self.classify_query(query)
        context_parts = []

        # Path 1: Document retrieval
        if routing.get("needs_documents"):
            docs = self.vectorstore.similarity_search(query, k=5)
            doc_context = "\n".join(d.page_content for d in docs)
            context_parts.append(
                f"## Document Results\n{doc_context}"
            )

        # Path 2: Database retrieval
        if routing.get("needs_database"):
            try:
                sql = text_to_sql(query)
                results = execute_sql_safely(sql)
                db_context = json.dumps(results, indent=2, default=str)
                context_parts.append(
                    f"## Database Results\n"
                    f"Query: {sql}\n"
                    f"Results:\n{db_context}"
                )
            except Exception as e:
                context_parts.append(f"## Database Error\n{e}")

        # Path 3: API retrieval
        if routing.get("needs_api"):
            action = routing.get("api_action", "")
            try:
                if "order" in action.lower():
                    # Use the entity ID extracted during classification
                    api_data = self.api_retriever.get_order_status(
                        routing.get("entity_id", "")
                    )
                    context_parts.append(
                        f"## Live API Data\n"
                        f"{json.dumps(api_data, indent=2)}"
                    )
            except Exception as e:
                context_parts.append(f"## API Error\n{e}")

        return "\n\n".join(context_parts)

    def answer(self, query: str) -> str:
        """Full hybrid RAG pipeline."""
        context = self.retrieve(query)
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Answer using the provided context, "
                           "which may include document excerpts, "
                           "database query results, and live API "
                           "data. Cite which source type supports "
                           "each part of your answer.",
            }, {
                "role": "user",
                "content": f"Context:\n{context}\n\n"
                           f"Question: {query}",
            }],
        )
        return response.choices[0].message.content
```
Security Considerations
Text-to-SQL introduces an injection-style risk: untrusted natural-language input shapes a query that runs against your database. Always use a read-only database user, validate generated SQL against an allow-list of operations, run queries with statement timeouts, and log all generated SQL for audit. Never let the LLM compose SQL that gets executed with write permissions.
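The read-only user and statement timeout can be enforced at the database level rather than in application code, so the guarantees hold even if an application-side check is bypassed. A Postgres role along these lines (names and the 5-second limit are illustrative) is one way to do it:

```sql
-- Illustrative setup for a "readonly_user" role.
CREATE ROLE readonly_user LOGIN PASSWORD 'change-me';
GRANT CONNECT ON DATABASE app TO readonly_user;
GRANT USAGE ON SCHEMA public TO readonly_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly_user;
-- Kill runaway queries and reject writes even if application checks fail.
ALTER ROLE readonly_user SET statement_timeout = '5s';
ALTER ROLE readonly_user SET default_transaction_read_only = on;
```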
FAQ
How do I prevent the LLM from generating dangerous SQL?
Use three layers of defense: a read-only database user that physically cannot modify data, keyword filtering that rejects queries with DDL or DML statements, and a statement timeout (5-10 seconds) that kills runaway queries. Additionally, log all generated SQL so you can audit patterns and refine your prompt.
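One refinement to the keyword-filtering layer: a plain substring check rejects harmless identifiers, e.g. a column named `last_updated` contains `UPDATE`. A word-boundary variant (a sketch reusing the same keyword list) avoids those false positives while also enforcing a single read-only statement:

```python
import re

FORBIDDEN = ("DELETE", "UPDATE", "INSERT", "DROP", "ALTER", "TRUNCATE")
# \b matches keywords only as whole words, so identifiers such as
# "last_updated" no longer trip the filter the way a substring check would.
FORBIDDEN_RE = re.compile(r"\b(" + "|".join(FORBIDDEN) + r")\b", re.IGNORECASE)

def is_safe_sql(sql: str) -> bool:
    """Allow only a single SELECT (or WITH ... SELECT) statement."""
    if FORBIDDEN_RE.search(sql):
        return False
    head = sql.lstrip().upper()
    # Reject multi-statement input (ignoring one trailing semicolon).
    single_statement = ";" not in sql.rstrip().rstrip(";")
    return (head.startswith("SELECT") or head.startswith("WITH")) and single_statement
```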
Should I use text-to-SQL or pre-built SQL templates?
For narrow, well-defined question patterns, pre-built templates with parameter extraction are more reliable and faster. For open-ended analytical questions where users explore freely, text-to-SQL is necessary. Many production systems use templates for common queries and fall back to text-to-SQL for novel questions.
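The template approach can be as simple as a list of parameterized queries keyed by the pattern that identifies them. The patterns and queries below are illustrative assumptions against the schema shown earlier:

```python
import re

# Hypothetical templates: (recognizer pattern, parameterized SQL).
TEMPLATES = [
    (re.compile(r"status of order (\d+)", re.I),
     "SELECT status FROM orders WHERE id = %s LIMIT 1"),
    (re.compile(r"how many customers churned", re.I),
     "SELECT count(*) FROM customers WHERE churned_at IS NOT NULL"),
]

def match_template(question: str):
    """Return (sql, params) for a known pattern, or None to signal
    a fallback to text-to-SQL."""
    for pattern, sql in TEMPLATES:
        m = pattern.search(question)
        if m:
            return sql, m.groups()
    return None
```

When `match_template` returns `None`, the pipeline falls back to `text_to_sql` for the novel question.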
How do I handle conflicting information between documents and database results?
Always prioritize structured database results for quantitative facts (numbers, dates, statuses) because they represent the system of record. Use documents for qualitative context (explanations, recommendations, procedures). When presenting the answer, clearly attribute which source each piece of information comes from.
CallSphere Team
Expert insights on AI voice agents and customer communication automation.