Database Integration Patterns for AI Agents: Read-Only, Write-Through, and Event-Driven
How AI agents interact with databases safely using read-only tools for queries, write-through validation layers, and event-driven updates via message queues.
The Database Access Problem for AI Agents
Giving an AI agent access to a database is one of the most powerful things you can do — and one of the most dangerous. A well-designed database tool lets the agent answer questions like "what were our top 10 customers by revenue last quarter?" without requiring a human analyst to write the query. A poorly designed one lets the agent accidentally run DROP TABLE customers because the user said "remove the customer data from my view."
The core tension is between capability and safety. Agents need enough database access to be useful, but every write operation is a potential irreversible mistake. The solution is not to avoid database access entirely — it is to design the access patterns carefully, with appropriate safeguards at each layer.
This post covers three database integration patterns, ordered from safest to most powerful: read-only access, write-through with validation, and event-driven updates.
Pattern 1: Read-Only Database Tools
The simplest and safest pattern gives the agent read-only access to the database. The agent can query data but cannot modify it. This covers a surprisingly large portion of use cases: data analysis, report generation, customer lookup, inventory checking, and troubleshooting.
# Read-only database tool with parameterized queries
import re
from typing import Any

import asyncpg

class ReadOnlyDBTool:
    """Database tool that only allows SELECT queries."""

    def __init__(self, dsn: str, max_rows: int = 100):
        self.dsn = dsn
        self.max_rows = max_rows
        self._pool: asyncpg.Pool | None = None

    async def connect(self):
        # Use a read-only database user
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=2,
            max_size=10,
            # Set statement timeout to prevent long-running queries
            server_settings={"statement_timeout": "10000"},  # 10 seconds
        )

    async def execute_query(self, sql: str, params: list[Any] | None = None) -> dict:
        """
        Execute a read-only SQL query with safety checks.

        Args:
            sql: A SELECT query. Mutations are rejected.
            params: Parameterized query values (prevents SQL injection).

        Returns:
            Dictionary with columns and rows.
        """
        if self._pool is None:
            return {"error": "Tool is not connected. Call connect() first."}

        # Safety check: reject non-SELECT statements
        normalized = sql.strip().upper()
        if not normalized.startswith(("SELECT", "WITH")):
            return {
                "error": "Only SELECT queries are allowed. "
                         "This tool cannot modify data.",
                "suggestion": "Rephrase your query as a SELECT statement.",
            }

        # Additional safety: reject known dangerous keywords. Match on word
        # boundaries so column names like "created_at" do not trip CREATE.
        dangerous_keywords = [
            "INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE",
            "CREATE", "GRANT", "REVOKE", "EXEC", "EXECUTE",
        ]
        for keyword in dangerous_keywords:
            if re.search(rf"\b{keyword}\b", normalized):
                return {
                    "error": f"Query contains forbidden keyword: {keyword}",
                    "suggestion": "This is a read-only tool. Use only SELECT statements.",
                }

        # Enforce row limit (strip any trailing semicolon first so the
        # appended LIMIT clause stays valid)
        sql = sql.strip().rstrip(";")
        if not re.search(r"\bLIMIT\b", normalized):
            sql = f"{sql} LIMIT {self.max_rows}"

        async with self._pool.acquire() as conn:
            try:
                rows = await conn.fetch(sql, *(params or []))
                columns = list(rows[0].keys()) if rows else []
                return {
                    "columns": columns,
                    "rows": [dict(row) for row in rows],
                    "row_count": len(rows),
                    "truncated": len(rows) == self.max_rows,
                }
            except asyncpg.PostgresError as e:
                return {"error": f"Query failed: {e}", "sql": sql}

# Register as an agent tool
read_db = ReadOnlyDBTool(dsn="postgresql://readonly_user:***@db:5432/app")

TOOL_DEFINITION = {
    "type": "function",
    "function": {
        "name": "query_database",
        "description": (
            "Execute a read-only SQL query against the application database. "
            "Only SELECT queries are allowed. Results are limited to 100 rows. "
            "Use parameterized queries with $1, $2 placeholders for user-provided values. "
            "Available tables: customers, orders, products, support_tickets."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "A SELECT SQL query",
                },
                "params": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Values for parameterized query placeholders ($1, $2, etc.)",
                },
            },
            "required": ["sql"],
        },
    },
}
The read-only pattern uses multiple safety layers: a database user with only SELECT permissions, application-level SQL parsing to reject mutations, query timeouts to prevent resource exhaustion, and row limits to prevent the agent from dumping entire tables.
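The first of those layers, the database-side read-only user, is worth setting up explicitly rather than relying on application checks alone. A minimal sketch of the one-time role setup, assuming Postgres and illustrative names (`readonly_user`, database `app`, schema `public`):

```python
# One-time setup statements, run once by an administrator with an
# admin connection (e.g. via asyncpg or psql). Role, database, and
# schema names here are illustrative.
READONLY_ROLE_SETUP = [
    "CREATE ROLE readonly_user LOGIN PASSWORD 'change-me'",
    "GRANT CONNECT ON DATABASE app TO readonly_user",
    "GRANT USAGE ON SCHEMA public TO readonly_user",
    # Read access only: no INSERT, UPDATE, DELETE, or DDL privileges
    "GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly_user",
    # Cover tables created after this grant as well
    "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO readonly_user",
]
```

With this role in place, even a mutation that slips past the application-level keyword check fails at the database with a permission error.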
Pattern 2: Write-Through with Validation
Some agent use cases require write access: creating support tickets, updating order statuses, modifying user preferences. The write-through pattern allows mutations but routes them through a validation layer that checks every write against a set of business rules before executing it.
# Write-through database tool with validation layer
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable

class WriteAction(Enum):
    CREATE_TICKET = "create_ticket"
    UPDATE_ORDER_STATUS = "update_order_status"
    ADD_NOTE = "add_note"

@dataclass
class WriteRequest:
    action: WriteAction
    table: str
    data: dict[str, Any]
    conditions: dict[str, Any] | None = None  # WHERE clause for updates

@dataclass
class ValidationResult:
    approved: bool
    reason: str
    modified_data: dict[str, Any] | None = None  # Sanitized version

# Validation rules per write action. Each rule takes the request data
# and returns a (passed, reason) tuple.
VALIDATION_RULES: dict[WriteAction, list[Callable]] = {
    WriteAction.CREATE_TICKET: [
        lambda data: (True, "") if "customer_id" in data else (False, "customer_id is required"),
        lambda data: (True, "") if "summary" in data and len(data["summary"]) < 500
            else (False, "summary is required and must be under 500 chars"),
        lambda data: (True, "") if data.get("priority") in ["low", "medium", "high", "critical"]
            else (False, "priority must be low, medium, high, or critical"),
    ],
    WriteAction.UPDATE_ORDER_STATUS: [
        lambda data: (True, "") if "order_id" in data else (False, "order_id is required"),
        lambda data: (True, "")
            if data.get("new_status") in ["processing", "shipped", "delivered", "cancelled"]
            else (False, "invalid status transition"),
        # Prevent status rollback (application-provided helper, also
        # expected to return a (passed, reason) tuple)
        lambda data: validate_status_transition(data.get("current_status"), data.get("new_status")),
    ],
}

async def validate_write(request: WriteRequest) -> ValidationResult:
    """Validate a write request against business rules."""
    rules = VALIDATION_RULES.get(request.action, [])
    for rule in rules:
        passed, reason = rule(request.data)
        if not passed:
            return ValidationResult(approved=False, reason=reason)
    return ValidationResult(approved=True, reason="All validations passed")

async def execute_write(request: WriteRequest) -> dict[str, Any]:
    """Execute a validated write operation.

    `audit_log` and `db` are assumed to be application-provided
    audit-logging and data-access helpers.
    """
    validation = await validate_write(request)
    if not validation.approved:
        return {"error": validation.reason, "action": "rejected"}

    # Log the write for audit
    await audit_log.record(
        action=request.action.value,
        table=request.table,
        data=request.data,
        timestamp=datetime.now(timezone.utc),
    )

    # Execute the actual write
    if request.action == WriteAction.CREATE_TICKET:
        ticket_id = await db.insert("support_tickets", request.data)
        return {"success": True, "ticket_id": ticket_id}
    elif request.action == WriteAction.UPDATE_ORDER_STATUS:
        await db.update(
            "orders",
            {"status": request.data["new_status"]},
            {"order_id": request.data["order_id"]},
        )
        return {"success": True, "order_id": request.data["order_id"]}
    return {"error": "Unknown action"}
The write-through pattern constrains the agent to a predefined set of write actions with explicit validation. The agent cannot construct arbitrary INSERT or UPDATE statements — it must use the defined actions, and each action has its own validation rules.
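To make the rejection path concrete, here is a compressed, self-contained version of the rule check with two of the ticket rules (names are illustrative, mirroring the example above):

```python
from typing import Any, Callable

# Each rule returns a (passed, reason) tuple, as in the validation layer above
RuleFn = Callable[[dict[str, Any]], tuple[bool, str]]

TICKET_RULES: list[RuleFn] = [
    lambda d: (True, "") if "customer_id" in d else (False, "customer_id is required"),
    lambda d: (True, "") if d.get("priority") in ["low", "medium", "high", "critical"]
        else (False, "priority must be low, medium, high, or critical"),
]

def check(data: dict[str, Any], rules: list[RuleFn]) -> tuple[bool, str]:
    """Run rules in order; the first failure wins."""
    for rule in rules:
        passed, reason = rule(data)
        if not passed:
            return False, reason
    return True, "All validations passed"

# A well-formed request passes...
print(check({"customer_id": 42, "priority": "high"}, TICKET_RULES))
# ...while an out-of-range priority is rejected before any SQL runs
print(check({"customer_id": 42, "priority": "urgent"}, TICKET_RULES))
```

The key property is that rejection happens entirely in application code: a bad write never reaches the database, and the reason string can be returned to the agent so it can correct its request.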
Pattern 3: Event-Driven Updates via Message Queues
The most decoupled pattern separates the agent from the database entirely. Instead of writing directly, the agent publishes events to a message queue. Downstream consumers process these events, validate them against the current database state, and apply the changes.
# Event-driven agent database interaction
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from uuid import uuid4

import aio_pika

@dataclass
class AgentEvent:
    event_id: str
    event_type: str
    agent_id: str
    session_id: str
    payload: dict[str, Any]
    timestamp: str
    requires_approval: bool = False

class AgentEventPublisher:
    """Publish agent actions as events to a message queue."""

    def __init__(self, amqp_url: str, exchange_name: str = "agent-events"):
        self.amqp_url = amqp_url
        self.exchange_name = exchange_name

    async def connect(self):
        self.connection = await aio_pika.connect_robust(self.amqp_url)
        self.channel = await self.connection.channel()
        self.exchange = await self.channel.declare_exchange(
            self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
        )

    async def publish(self, event: AgentEvent) -> str:
        """Publish an agent event and return the event ID for tracking."""
        message = aio_pika.Message(
            body=json.dumps({
                "event_id": event.event_id,
                "event_type": event.event_type,
                "agent_id": event.agent_id,
                "session_id": event.session_id,
                "payload": event.payload,
                "timestamp": event.timestamp,
                "requires_approval": event.requires_approval,
            }).encode(),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            message_id=event.event_id,
        )
        routing_key = f"agent.{event.event_type}"
        await self.exchange.publish(message, routing_key=routing_key)
        return event.event_id

# A publisher instance, connected once at application startup
# (the AMQP URL is a placeholder)
publisher = AgentEventPublisher(amqp_url="amqp://guest:guest@mq:5672/")

# Agent tool that publishes events instead of writing directly
async def request_order_cancellation(
    order_id: str,
    reason: str,
    agent_id: str,
    session_id: str,
) -> dict:
    """Request an order cancellation. The request is queued for processing."""
    event = AgentEvent(
        event_id=str(uuid4()),
        event_type="order.cancellation_requested",
        agent_id=agent_id,
        session_id=session_id,
        payload={
            "order_id": order_id,
            "reason": reason,
            "requested_at": datetime.now(timezone.utc).isoformat(),
        },
        timestamp=datetime.now(timezone.utc).isoformat(),
        requires_approval=True,  # Cancellations require human approval
    )
    event_id = await publisher.publish(event)
    return {
        "status": "queued",
        "event_id": event_id,
        "message": "Your cancellation request has been submitted and "
                   "will be processed within 5 minutes.",
    }
The event-driven pattern has three advantages. First, it provides natural rate limiting — the queue consumer processes events at a controlled pace regardless of how many requests the agent generates. Second, it enables event sourcing — every agent action is recorded as an immutable event, providing a complete audit trail. Third, it decouples the agent from the database schema — the consumer handles the mapping from events to database operations, so the agent does not need to know table structures.
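The example above shows only the publisher side. A consumer has one requirement the publisher does not: because message queues typically deliver at least once, it must deduplicate by event ID before applying any database change. A transport-agnostic sketch of that logic (the queue subscription wiring and handler names are illustrative):

```python
import json
from typing import Any, Callable

class EventConsumer:
    """Apply agent events to the database, skipping duplicates.

    Transport wiring (e.g. an aio_pika queue subscription) is omitted;
    handle_message would be called once per delivered message body.
    """

    def __init__(self, handlers: dict[str, Callable[[dict[str, Any]], None]]):
        self.handlers = handlers
        # In-memory for illustration; use a durable store in production
        self.processed_ids: set[str] = set()

    def handle_message(self, body: bytes) -> str:
        event = json.loads(body)
        event_id = event["event_id"]
        if event_id in self.processed_ids:
            return "duplicate"    # at-least-once delivery: skip redeliveries
        handler = self.handlers.get(event["event_type"])
        if handler is None:
            return "unhandled"    # route to a dead-letter queue in practice
        handler(event["payload"])  # the only place database writes happen
        self.processed_ids.add(event_id)
        return "processed"
```

Because the consumer owns all database writes, schema changes and validation rules live in one place, and a redelivered cancellation request cannot cancel the same order twice.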
Choosing the Right Pattern
Use read-only when the agent's primary job is answering questions, generating reports, or looking up information. This covers most customer support, analytics, and research agent use cases.
Use write-through when the agent needs to take actions that directly modify application state but the set of possible actions is well-defined and bounded. Support ticket creation, status updates, and preference changes fit this pattern.
Use event-driven when the agent's actions have downstream consequences that require coordination across multiple systems, when actions may need human approval, or when you need a complete, immutable audit trail of every agent action.
Many production agents combine all three patterns: read-only tools for data retrieval, write-through tools for simple mutations, and event publishing for complex or high-risk actions.
FAQ
How do you prevent SQL injection when giving an AI agent database access?
Always use parameterized queries. The agent provides the query structure and the parameter values separately, and the database driver handles escaping. Never concatenate user-provided values into SQL strings. The read-only tool example above uses asyncpg's parameterized query syntax ($1, $2) which prevents injection at the driver level.
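The difference is easy to see with a hostile input (table and value are illustrative):

```python
user_input = "x'; DROP TABLE customers; --"

# Unsafe: the value is spliced into the query text, so it becomes
# part of the query structure
unsafe_sql = f"SELECT * FROM customers WHERE name = '{user_input}'"

# Safe: the structure is fixed; the value travels separately and is
# escaped by the driver (asyncpg-style $1 placeholder)
safe_sql = "SELECT * FROM customers WHERE name = $1"
params = [user_input]

print("DROP TABLE" in unsafe_sql)  # True: the query now contains a mutation
print("DROP TABLE" in safe_sql)    # False: the SQL text never changes
```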
What happens if the event consumer is down when the agent publishes an event?
That is the advantage of a durable message queue. Events are persisted to disk and survive consumer restarts. When the consumer comes back online, it processes the backlog in order. The agent receives immediate confirmation that the event was queued (not processed), so the user knows their request was received even if processing is delayed.
Should agents generate SQL directly or use predefined query templates?
It depends on the use case. For analytical agents that need to answer ad-hoc questions, letting the agent generate SQL (within read-only constraints) provides maximum flexibility. For operational agents that perform specific actions, predefined templates are safer and more predictable. A common hybrid approach uses agent-generated SQL for reads and predefined templates for writes.
How do you handle database schema changes when agents have learned the old schema?
Include the current schema in the agent's system prompt or tool description, and update it whenever the schema changes. For agents that generate SQL, provide a dynamic schema description that is generated from the database's information_schema at startup. This ensures the agent always has an accurate view of available tables and columns.
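The dynamic description is just a formatting step over rows from information_schema. A sketch, assuming the rows are (table_name, column_name, data_type) tuples as returned by a query against information_schema.columns:

```python
def describe_schema(rows: list[tuple[str, str, str]]) -> str:
    """Format (table, column, type) rows into a prompt-ready schema summary."""
    tables: dict[str, list[str]] = {}
    for table, column, dtype in rows:
        tables.setdefault(table, []).append(f"{column} {dtype}")
    lines = [f"- {t}({', '.join(cols)})" for t, cols in sorted(tables.items())]
    return "Available tables:\n" + "\n".join(lines)

# Sample rows as they might come back from information_schema.columns
rows = [
    ("customers", "id", "integer"),
    ("customers", "name", "text"),
    ("orders", "id", "integer"),
]
print(describe_schema(rows))
```

The resulting text can be injected into the tool description (or system prompt) at startup, so a schema migration only requires restarting the agent, not editing prompts by hand.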
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.