Database Reliability for AI Agents: Replication, Failover, and Backup Strategies
Ensure database reliability for AI agent systems with high-availability setups, automatic failover, backup testing, disaster recovery planning, and connection management strategies that keep agents running through database failures.
Why Database Reliability Is Critical for AI Agents
AI agents depend on databases for conversation history, tool state, user preferences, task queues, and retrieved context. Unlike stateless web APIs that can retry on a different server, an agent mid-conversation needs its state. A database failure during an agent task does not just drop a request — it can corrupt an entire workflow that took minutes of LLM inference to build.
The cost of database downtime for agents is measured not just in lost requests, but in lost LLM computation, which has a direct dollar cost.
High-Availability Database Architecture
from dataclasses import dataclass
from typing import List, Optional

import asyncpg


@dataclass
class DatabaseNode:
    host: str
    port: int
    role: str  # "primary", "replica", or "witness"
    region: str
    pool: Optional[asyncpg.Pool] = None


class AgentDatabaseCluster:
    def __init__(self, nodes: List[DatabaseNode]):
        self.nodes = nodes
        self.primary = next(n for n in nodes if n.role == "primary")
        self.replicas = [n for n in nodes if n.role == "replica"]
        self._current_primary = self.primary

    async def initialize_pools(self):
        for node in self.nodes:
            if node.role != "witness":
                node.pool = await asyncpg.create_pool(
                    host=node.host,
                    port=node.port,
                    database="agent_db",
                    min_size=5,
                    max_size=20,
                    command_timeout=10,
                    server_settings={
                        "application_name": "ai-agent",
                        "statement_timeout": "30000",  # milliseconds
                    },
                )

    async def execute_write(self, query: str, *args):
        """Route writes to the current primary."""
        try:
            async with self._current_primary.pool.acquire() as conn:
                return await conn.execute(query, *args)
        except asyncpg.ConnectionDoesNotExistError:
            await self._handle_primary_failure()
            async with self._current_primary.pool.acquire() as conn:
                return await conn.execute(query, *args)

    async def execute_read(self, query: str, *args,
                           consistency: str = "eventual"):
        """Route reads to replicas or the primary based on consistency needs."""
        if consistency == "strong":
            pool = self._current_primary.pool
        else:
            # First healthy replica; fall back to the primary if none is up
            replica = self._pick_healthy_replica()
            pool = replica.pool if replica else self._current_primary.pool
        async with pool.acquire() as conn:
            return await conn.fetch(query, *args)

    def _pick_healthy_replica(self) -> Optional[DatabaseNode]:
        for replica in self.replicas:
            if replica.pool and replica.pool.get_size() > 0:
                return replica
        return None

    async def _handle_primary_failure(self):
        """Redirect client traffic to the first reachable replica.

        This reroutes connections only; actual promotion of the replica
        is handled by the HA layer (e.g. Patroni, below).
        """
        for replica in self.replicas:
            try:
                async with replica.pool.acquire() as conn:
                    await conn.execute("SELECT 1")
                self._current_primary = replica
                return
            except Exception:
                continue
        raise Exception("All database nodes are unreachable")
The read/write split is critical for agent workloads. Agent conversation reads (loading history) can hit replicas, while state mutations (saving new messages) must go to the primary.
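The routing policy can be sketched apart from asyncpg as a pure function. The operation names below are illustrative assumptions, not a fixed schema:

```python
# Illustrative read-consistency policy per agent operation (assumed names).
READ_CONSISTENCY = {
    "load_conversation": "eventual",    # history reads tolerate replica lag
    "load_task_checkpoint": "strong",   # must observe the latest commit
    "load_user_preferences": "eventual",
}

def route(operation: str, is_write: bool) -> tuple:
    """Return (target, consistency) for an agent database operation."""
    if is_write:
        return ("primary", "strong")  # every mutation goes to the primary
    consistency = READ_CONSISTENCY.get(operation, "strong")  # default to safe
    target = "replica" if consistency == "eventual" else "primary"
    return (target, consistency)
```

Defaulting unknown reads to the primary is the conservative choice: stale reads of task checkpoints can make an agent repeat or skip work.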
Automatic Failover Configuration
# patroni-config.yaml (PostgreSQL HA with Patroni)
scope: agent-db-cluster
namespace: /agent-db/

restapi:
  listen: 0.0.0.0:8008
  connect_address: "${POD_IP}:8008"

bootstrap:
  dcs:
    ttl: 30
    loop_wait: 10
    retry_timeout: 10
    maximum_lag_on_failover: 1048576  # bytes (1 MB)
    postgresql:
      use_pg_rewind: true
      parameters:
        max_connections: 200
        shared_buffers: 2GB
        wal_level: replica
        hot_standby: "on"
        max_wal_senders: 10
        max_replication_slots: 10
        wal_keep_size: 1GB
        synchronous_commit: "on"  # data safety for agent state
  initdb:
    - encoding: UTF8
    - data-checksums

postgresql:
  listen: 0.0.0.0:5432
  connect_address: "${POD_IP}:5432"
  data_dir: /var/lib/postgresql/data
  pgpass: /tmp/pgpass
  authentication:
    replication:
      username: replicator
    superuser:
      username: postgres

tags:
  nofailover: false
  noloadbalance: false
  clonefrom: false
The maximum_lag_on_failover setting prevents promoting a replica that is too far behind. For AI agents, losing recent conversation turns is worse than brief downtime.
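The same guard can be expressed as a candidate-selection rule. This is a hedged sketch of the idea, not Patroni's actual election logic (Patroni derives lag from WAL positions it tracks via the DCS):

```python
MAX_LAG_BYTES = 1_048_576  # mirrors maximum_lag_on_failover above (1 MB)

def pick_promotion_candidate(replica_lag_bytes: dict) -> "str | None":
    """Return the least-lagged replica under the threshold, or None.

    A replica with unknown lag (None) is never promoted: for agent state,
    refusing a risky promotion beats silently losing conversation turns.
    """
    eligible = {name: lag for name, lag in replica_lag_bytes.items()
                if lag is not None and lag <= MAX_LAG_BYTES}
    if not eligible:
        return None
    return min(eligible, key=eligible.get)
```

Returning None signals that manual intervention (or waiting for a replica to catch up) is preferable to promoting a node missing committed agent state.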
Connection Resilience in Agent Code
import asyncio
import json


class ResilientDBConnection:
    def __init__(self, cluster: AgentDatabaseCluster, max_retries: int = 3):
        self.cluster = cluster
        self.max_retries = max_retries

    async def run_in_transaction(self, fn, *args):
        """Run `await fn(conn, *args)` in a transaction, retrying transient errors.

        The unit of retry is a coroutine rather than an @asynccontextmanager:
        a context manager cannot re-yield after its body raises, so a retry
        loop wrapped around `yield` fails on the second attempt.
        """
        last_error = None
        for attempt in range(self.max_retries):
            try:
                pool = self.cluster._current_primary.pool
                async with pool.acquire() as conn:
                    async with conn.transaction():
                        return await fn(conn, *args)
            except asyncpg.DeadlockDetectedError:
                last_error = "deadlock"
                await asyncio.sleep(0.1 * (2 ** attempt))
            except asyncpg.ConnectionDoesNotExistError:
                last_error = "connection_lost"
                await self.cluster._handle_primary_failure()
                await asyncio.sleep(0.5)
            except asyncpg.SerializationError:
                last_error = "serialization_conflict"
                await asyncio.sleep(0.1 * (2 ** attempt))
        raise Exception(f"Transaction failed after {self.max_retries} attempts: {last_error}")

    async def save_agent_state(self, agent_id: str, state: dict):
        """Upsert agent state; the dict is JSON-encoded for a jsonb column."""
        async def _save(conn):
            await conn.execute("""
                INSERT INTO agent_state (agent_id, state, updated_at)
                VALUES ($1, $2::jsonb, NOW())
                ON CONFLICT (agent_id)
                DO UPDATE SET state = EXCLUDED.state, updated_at = NOW()
            """, agent_id, json.dumps(state))
        await self.run_in_transaction(_save)
Deadlocks and serialization conflicts are common when multiple agents write to shared state tables. Retry with exponential backoff handles transient conflicts without failing the agent task.
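The fixed `0.1 * 2**attempt` delay above still lets many agents retry in lockstep after the same deadlock; full jitter spreads the retries out. A minimal sketch:

```python
import random

def backoff_delay(attempt: int, base: float = 0.1, cap: float = 5.0) -> float:
    """Exponential backoff with full jitter.

    Returns a delay drawn uniformly from [0, min(cap, base * 2**attempt)],
    so contending agents do not all retry at the same instant and collide
    on the same rows again.
    """
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))
```

Dropping this in for the `asyncio.sleep(0.1 * (2 ** attempt))` calls is a one-line change per handler.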
Backup Testing and Disaster Recovery
import subprocess
from datetime import datetime, timezone


class BackupManager:
    def __init__(self, primary_host: str, backup_path: str,
                 s3_bucket: str, notifier):
        self.primary_host = primary_host
        self.backup_path = backup_path
        self.s3_bucket = s3_bucket
        self.notifier = notifier

    def create_backup(self) -> dict:
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        # Custom-format archives are already compressed and pg_restore-able,
        # so use a .dump extension rather than .sql.gz
        backup_file = f"{self.backup_path}/agent_db_{timestamp}.dump"
        result = subprocess.run(
            ["pg_dump", "-h", self.primary_host, "-U", "postgres",
             "-d", "agent_db", "--format=custom",
             "--compress=9", f"--file={backup_file}"],
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            raise Exception(f"Backup failed: {result.stderr}")
        # Upload to S3
        subprocess.run(
            ["aws", "s3", "cp", backup_file,
             f"s3://{self.s3_bucket}/backups/{timestamp}/"],
            check=True,
        )
        return {"file": backup_file, "timestamp": timestamp}

    def test_backup_restore(self, backup_file: str) -> dict:
        """Restore a backup to a test database and verify integrity."""
        test_db = "agent_db_restore_test"
        # Create the scratch database
        subprocess.run(
            ["createdb", "-h", self.primary_host, "-U", "postgres", test_db],
            check=True,
        )
        try:
            # Restore the backup and time it
            start = datetime.now(timezone.utc)
            subprocess.run(
                ["pg_restore", "-h", self.primary_host, "-U", "postgres",
                 "-d", test_db, "--no-owner", backup_file],
                check=True,
            )
            restore_seconds = (datetime.now(timezone.utc) - start).total_seconds()
            # Verify data integrity
            result = subprocess.run(
                ["psql", "-h", self.primary_host, "-U", "postgres",
                 "-d", test_db, "-t", "-c",
                 "SELECT COUNT(*) FROM agent_conversations"],
                capture_output=True, text=True, check=True,
            )
            row_count = int(result.stdout.strip())
            return {
                "status": "success",
                "restore_time_seconds": restore_seconds,
                "conversation_count": row_count,
                "verified": row_count > 0,
            }
        finally:
            subprocess.run(
                ["dropdb", "-h", self.primary_host, "-U", "postgres", test_db],
            )
Test your backups regularly. A backup that has never been restored is a hypothesis, not a backup.
# backup-schedule.yaml
backup_policy:
  full_backup:
    schedule: "0 2 * * *"  # daily at 2 AM
    retention_days: 30
    storage: "s3://agent-backups/daily/"
  wal_archiving:
    enabled: true
    archive_command: "aws s3 cp %p s3://agent-backups/wal/%f"
    recovery_target_time: "point-in-time within 5 minutes"
  restore_testing:
    schedule: "0 6 * * 0"  # weekly, Sunday at 6 AM
    alert_on_failure: true
    max_restore_time_minutes: 30
FAQ
Should AI agents use synchronous or asynchronous replication?
Use synchronous replication for agent state that is expensive to recreate — conversation history, completed tool results, and task progress. Use asynchronous replication for data that can be regenerated — cached LLM responses, analytics events, and audit logs. Synchronous replication adds latency to writes but prevents data loss during failover.
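You do not need two clusters to mix these modes: PostgreSQL allows `synchronous_commit` to be set per transaction, so cheap writes can skip the standby wait. A sketch of the policy with illustrative table names; the commented `SET LOCAL` line shows how it would be applied with asyncpg:

```python
# Tables whose writes must survive a failover (illustrative names).
DURABLE_TABLES = {"agent_state", "agent_conversations", "tool_results"}

def commit_mode(table: str) -> str:
    """'on' waits for the synchronous standby to confirm; 'local' returns
    after the local WAL flush, accepting possible loss of this write if the
    primary fails before the standby catches up."""
    return "on" if table in DURABLE_TABLES else "local"

# Applied inside a transaction (asyncpg sketch, not executed here):
#   await conn.execute(f"SET LOCAL synchronous_commit TO {commit_mode(table)}")
```

`SET LOCAL` scopes the setting to the current transaction, so one connection can interleave durable agent-state writes with fire-and-forget analytics inserts.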
How do I handle database failover during an active agent conversation?
Implement connection retry at the application level with the conversation ID as the recovery key. When the database fails over, the agent should reconnect, reload the conversation state from the new primary, and resume from the last committed checkpoint. Design agent state saves as idempotent operations so partial writes during failover do not corrupt state.
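The resume logic reduces to tracking the last committed turn per conversation. A minimal sketch with illustrative field names:

```python
from dataclasses import dataclass

@dataclass
class ConversationCheckpoint:
    """Resume marker reloaded from the new primary after a failover."""
    conversation_id: str
    committed_turn: int = -1  # -1 means nothing committed yet

    def turns_to_replay(self, pending_turns: list) -> list:
        """Idempotent resume: drop turns at or before the committed index,
        since they were durably saved before the failover. Replaying only
        the tail means a retried save can never duplicate a turn."""
        return [t for t in pending_turns if t["index"] > self.committed_turn]
```

Pairing this with an upsert keyed on `(conversation_id, index)` makes the save path safe to retry blindly after any failover.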
What is the right backup frequency for AI agent databases?
Daily full backups plus continuous WAL archiving for point-in-time recovery. The key metric is Recovery Point Objective (RPO) — how much data you can afford to lose. For agent systems where each conversation represents significant LLM inference cost, target an RPO of under 5 minutes using WAL shipping. Test restores weekly and measure your Recovery Time Objective (RTO) to ensure it meets your SLA.
CallSphere Team
Expert insights on AI voice agents and customer communication automation.