AI Agent for Database Operations: Automated Query Optimization and Index Management
Build an AI agent that monitors PostgreSQL performance, detects slow queries, recommends optimal indexes, schedules vacuum operations, and plans for capacity growth.
Why Database Operations Need an Agent
Database performance degrades gradually. A query that ran in 5ms at launch takes 500ms with 10 million rows. An index that was perfect for v1 of your schema becomes a liability after v5. Dead tuples pile up. Connection pools saturate. By the time a human notices, users are already complaining. An AI database operations agent monitors these signals continuously and acts before problems become outages.
Slow Query Detection
The agent reads from PostgreSQL's pg_stat_statements extension, which tracks query execution statistics without modifying your application.
import asyncpg
from dataclasses import dataclass
from typing import Optional

@dataclass
class SlowQuery:
    query: str
    calls: int
    total_time_ms: float
    mean_time_ms: float
    rows_returned: int
    shared_blks_hit: int
    shared_blks_read: int
    cache_hit_ratio: float

class QueryMonitor:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=5)

    async def get_slow_queries(
        self, min_mean_ms: float = 100, min_calls: int = 10, limit: int = 20
    ) -> list[SlowQuery]:
        rows = await self.pool.fetch("""
            SELECT
                query,
                calls,
                total_exec_time AS total_time_ms,
                mean_exec_time AS mean_time_ms,
                rows,
                shared_blks_hit,
                shared_blks_read,
                CASE WHEN shared_blks_hit + shared_blks_read = 0
                     THEN 1.0
                     ELSE shared_blks_hit::float /
                          (shared_blks_hit + shared_blks_read)
                END AS cache_hit_ratio
            FROM pg_stat_statements
            WHERE mean_exec_time > $1
              AND calls > $2
              AND query NOT LIKE '%pg_stat%'
            ORDER BY total_exec_time DESC
            LIMIT $3
        """, min_mean_ms, min_calls, limit)
        return [SlowQuery(
            query=r["query"],
            calls=r["calls"],
            total_time_ms=r["total_time_ms"],
            mean_time_ms=r["mean_time_ms"],
            rows_returned=r["rows"],
            shared_blks_hit=r["shared_blks_hit"],
            shared_blks_read=r["shared_blks_read"],
            cache_hit_ratio=r["cache_hit_ratio"],
        ) for r in rows]
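With the slow queries in hand, the agent needs to decide which ones to tackle first. One simple heuristic (an assumption of this sketch, not something pg_stat_statements provides) is to weight total execution time by cache misses, since disk-bound queries tend to gain the most from new indexes:

```python
def impact_score(total_time_ms: float, cache_hit_ratio: float) -> float:
    """Rank slow queries by total time spent, weighted up when the cache
    hit ratio is poor (disk-bound queries benefit most from indexing)."""
    # miss_penalty is 1.0 at a perfect cache hit ratio, 2.0 at zero
    miss_penalty = 2.0 - cache_hit_ratio
    return total_time_ms * miss_penalty
```

Sorting the results of get_slow_queries with `key=lambda q: impact_score(q.total_time_ms, q.cache_hit_ratio), reverse=True` then gives a triage order for the recommendation step below.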
Index Recommendation Engine
The agent analyzes query plans and existing indexes to recommend new indexes or identify unused ones.
import openai
import json

async def get_query_plan(pool: asyncpg.Pool, query: str) -> str:
    # EXPLAIN ANALYZE actually executes the query, so only pass
    # read-only statements through this function.
    rows = await pool.fetch(f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}")
    return json.dumps(rows[0]["QUERY PLAN"], indent=2)
async def get_existing_indexes(pool: asyncpg.Pool, table: str) -> list[dict]:
    # Both views expose a schemaname column, so the join and filter
    # must qualify it explicitly to avoid an ambiguous-column error.
    rows = await pool.fetch("""
        SELECT
            i.indexname, i.indexdef,
            s.idx_scan, s.idx_tup_read, s.idx_tup_fetch,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS size
        FROM pg_stat_user_indexes s
        JOIN pg_indexes i
          ON i.schemaname = s.schemaname
         AND i.indexname = s.indexrelname
        WHERE s.schemaname = 'public' AND s.relname = $1
        ORDER BY s.idx_scan DESC
    """, table)
    return [dict(r) for r in rows]
async def recommend_indexes(
    pool: asyncpg.Pool, slow_query: SlowQuery
) -> dict:
    plan = await get_query_plan(pool, slow_query.query)
    # Extract table names from the query for index lookup
    tables = await pool.fetch("""
        SELECT DISTINCT tablename FROM pg_tables
        WHERE schemaname = 'public'
    """)
    table_names = [t["tablename"] for t in tables]
    mentioned_tables = [
        t for t in table_names if t in slow_query.query.lower()
    ]
    existing = {}
    for table in mentioned_tables:
        existing[table] = await get_existing_indexes(pool, table)
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"""Analyze this slow PostgreSQL query and recommend indexes.
Query: {slow_query.query}
Mean execution time: {slow_query.mean_time_ms:.1f}ms
Total calls: {slow_query.calls}
Cache hit ratio: {slow_query.cache_hit_ratio:.3f}
Query plan:
{plan}
Existing indexes on mentioned tables:
{json.dumps(existing, indent=2, default=str)}
Return JSON with:
- recommended_indexes: list of CREATE INDEX statements
- unused_indexes: list of indexes with zero scans that could be dropped
- explanation: why these indexes help
- estimated_improvement: percentage speed improvement estimate"""
        }],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    return json.loads(response.choices[0].message.content)
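The model's JSON should never be executed blindly. A minimal guardrail (a hypothetical helper, not part of the code above) keeps only statements that are plainly single-statement index DDL before anything reaches the database:

```python
def safe_index_ddl(recommendations: dict) -> list[str]:
    """Filter LLM-recommended statements down to plain CREATE INDEX DDL,
    rejecting anything else (DROP, multi-statement strings, etc.)."""
    allowed_prefixes = ("CREATE INDEX", "CREATE UNIQUE INDEX")
    safe = []
    for stmt in recommendations.get("recommended_indexes", []):
        cleaned = " ".join(stmt.split())          # normalize whitespace
        body = cleaned.rstrip(";")
        # Reject embedded semicolons to block piggybacked statements
        if cleaned.upper().startswith(allowed_prefixes) and ";" not in body:
            safe.append(body)
    return safe
```

Anything the filter rejects can still be surfaced to a human for review rather than silently dropped.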
Vacuum Scheduling
The agent monitors dead tuple counts and schedules vacuum operations during low-traffic windows.
from datetime import datetime, timezone

@dataclass
class TableBloat:
    table_name: str
    live_tuples: int
    dead_tuples: int
    dead_ratio: float
    last_vacuum: Optional[datetime]
    last_autovacuum: Optional[datetime]
    table_size: str

async def check_vacuum_needs(pool: asyncpg.Pool) -> list[TableBloat]:
    rows = await pool.fetch("""
        SELECT
            relname AS table_name,
            n_live_tup AS live_tuples,
            n_dead_tup AS dead_tuples,
            CASE WHEN n_live_tup = 0 THEN 0
                 ELSE n_dead_tup::float / n_live_tup
            END AS dead_ratio,
            last_vacuum,
            last_autovacuum,
            pg_size_pretty(pg_total_relation_size(relid)) AS table_size
        FROM pg_stat_user_tables
        WHERE n_dead_tup > 1000
        ORDER BY n_dead_tup DESC
    """)
    return [TableBloat(**dict(r)) for r in rows]

async def schedule_vacuum(
    pool: asyncpg.Pool, table: str, is_low_traffic: bool
) -> str:
    if not is_low_traffic:
        return f"Skipping vacuum for {table}: not in low-traffic window"
    # Table names come from pg_stat_user_tables, not user input,
    # so interpolating them here is acceptable.
    await pool.execute(f"VACUUM (VERBOSE, ANALYZE) {table}")
    return f"Vacuum completed for {table}"

def is_low_traffic_window() -> bool:
    # datetime.utcnow() is deprecated; use an aware UTC timestamp
    hour = datetime.now(timezone.utc).hour
    return 2 <= hour <= 6  # UTC 2-6 AM
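To decide which of the bloated tables actually warrant a manual pass, the agent can mirror autovacuum's own trigger formula: a fixed threshold plus a scale factor times the live tuple count. PostgreSQL's defaults for these settings (50 and 0.2) are assumed here:

```python
def needs_vacuum(
    live_tuples: int,
    dead_tuples: int,
    threshold: int = 50,        # autovacuum_vacuum_threshold default
    scale_factor: float = 0.2,  # autovacuum_vacuum_scale_factor default
) -> bool:
    """True when dead tuples exceed autovacuum's trigger point:
    n_dead_tup > threshold + scale_factor * n_live_tup."""
    return dead_tuples > threshold + scale_factor * live_tuples
```

Tables that pass this check but show a stale last_autovacuum timestamp are the ones worth scheduling explicitly, since autovacuum is evidently not keeping up with them.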
Capacity Planning
The agent tracks database growth trends and projects when you will hit capacity limits.
async def analyze_growth_trend(
    pool: asyncpg.Pool, table: str
) -> dict:
    """Analyze table growth and project when limits will be reached."""
    rows = await pool.fetch("""
        SELECT pg_total_relation_size($1::text::regclass) AS current_bytes
    """, table)
    current_size = rows[0]["current_bytes"]
    # Placeholder growth model: in production, store daily size snapshots
    # and fit a trend line instead of assuming a flat rate
    daily_growth_rate = current_size * 0.02  # 2% of current size per day
    days_to_100gb = (100 * 1024**3 - current_size) / daily_growth_rate
    return {
        "table": table,
        "current_size_gb": current_size / (1024**3),
        "daily_growth_gb": daily_growth_rate / (1024**3),
        "days_to_100gb": max(0, int(days_to_100gb)),
        "recommendation": (
            "Consider partitioning" if days_to_100gb < 90
            else "Growth is manageable"
        ),
    }
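The flat 2% daily rate above is a stand-in. Once the agent stores daily size snapshots (e.g. in a metrics table of its own, which this sketch assumes exists), a least-squares line over the samples gives a sturdier projection:

```python
import numpy as np

def days_until_limit(daily_sizes_bytes: list[float], limit_bytes: float) -> int:
    """Fit a linear trend to daily size snapshots (oldest first, one per day)
    and return the projected days until limit_bytes is reached,
    or -1 if the table is not growing."""
    days = np.arange(len(daily_sizes_bytes), dtype=float)
    slope, _intercept = np.polyfit(days, np.asarray(daily_sizes_bytes), 1)
    if slope <= 0:
        return -1  # shrinking or flat: no projected limit
    remaining = limit_bytes - daily_sizes_bytes[-1]
    return max(0, int(round(remaining / slope)))
```

A linear fit is deliberately conservative; if growth is clearly super-linear (e.g. an append-heavy events table), fitting in log space would flag the limit sooner.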
FAQ
How do I safely apply index recommendations in production?
Always create indexes with CREATE INDEX CONCURRENTLY to avoid locking writes. The agent should generate the concurrent version of the DDL. Test the index on a replica first by running the slow query before and after index creation. Only apply to production after confirming improvement on the replica.
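A small helper (hypothetical, matching this FAQ answer) can rewrite the agent's generated DDL into the non-locking form before it is ever applied; it assumes the DDL keywords are uppercase, as the prompt requests:

```python
def make_concurrent(ddl: str) -> str:
    """Rewrite CREATE [UNIQUE] INDEX into the CONCURRENTLY form,
    which builds the index without blocking writes."""
    normalized = " ".join(ddl.split())
    if "CONCURRENTLY" in normalized.upper():
        return normalized  # already non-locking
    return (normalized
            .replace("CREATE UNIQUE INDEX", "CREATE UNIQUE INDEX CONCURRENTLY")
            .replace("CREATE INDEX", "CREATE INDEX CONCURRENTLY"))
```

Note that CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so the agent must execute it as a standalone statement and check for INVALID indexes afterward if the build fails.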
Should the agent run VACUUM FULL automatically?
Never. VACUUM FULL rewrites the entire table and takes an exclusive lock, blocking all reads and writes. The regular VACUUM (which the agent schedules) is safe for online operation. If VACUUM FULL is needed, the agent should flag it as a manual action requiring a maintenance window.
How does the agent handle parameterized queries in pg_stat_statements?
PostgreSQL normalizes queries in pg_stat_statements by replacing literal values with $1, $2, etc. The agent works with these normalized forms since that is what matters for index recommendations. When generating EXPLAIN plans, it substitutes reasonable sample values for the parameters.
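The substitution described here can be as simple as a regex pass over the normalized query text. This is a sketch with an invented helper name; robust parameter typing would need the query's parse tree rather than string replacement:

```python
import re

def fill_params(normalized_query: str, sample_values: list[str]) -> str:
    """Replace $1, $2, ... placeholders with quoted sample literals so a
    normalized query from pg_stat_statements can be fed to EXPLAIN."""
    for i, value in enumerate(sample_values, start=1):
        escaped = value.replace("'", "''")  # basic SQL string escaping
        # \b stops $1 from also matching inside $10, $11, ...
        normalized_query = re.sub(rf"\${i}\b", f"'{escaped}'", normalized_query)
    return normalized_query
```

Quoting every sample as a string literal works because PostgreSQL coerces unknown-typed literals in most comparison contexts, but type-sensitive queries may still need hand-picked samples.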
#Database #PostgreSQL #QueryOptimization #DevOps #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team