Skip to content
Learn Agentic AI10 min read0 views

AI Agent for Database Operations: Automated Query Optimization and Index Management

Build an AI agent that monitors PostgreSQL performance, detects slow queries, recommends optimal indexes, schedules vacuum operations, and plans for capacity growth.

Why Database Operations Need an Agent

Database performance degrades gradually. A query that ran in 5ms at launch takes 500ms with 10 million rows. An index that was perfect for v1 of your schema becomes a liability after v5. Dead tuples pile up. Connection pools saturate. By the time a human notices, users are already complaining. An AI database operations agent monitors these signals continuously and acts before problems become outages.

Slow Query Detection

The agent reads from PostgreSQL's pg_stat_statements extension, which tracks query execution statistics without modifying your application.

import asyncpg
from dataclasses import dataclass
from typing import Optional

@dataclass
class SlowQuery:
    query: str
    calls: int
    total_time_ms: float
    mean_time_ms: float
    rows_returned: int
    shared_blks_hit: int
    shared_blks_read: int
    cache_hit_ratio: float

class QueryMonitor:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=5)

    async def get_slow_queries(
        self, min_mean_ms: float = 100, min_calls: int = 10, limit: int = 20
    ) -> list[SlowQuery]:
        rows = await self.pool.fetch("""
            SELECT
                query,
                calls,
                total_exec_time AS total_time_ms,
                mean_exec_time AS mean_time_ms,
                rows,
                shared_blks_hit,
                shared_blks_read,
                CASE WHEN shared_blks_hit + shared_blks_read = 0
                     THEN 1.0
                     ELSE shared_blks_hit::float /
                          (shared_blks_hit + shared_blks_read)
                END AS cache_hit_ratio
            FROM pg_stat_statements
            WHERE mean_exec_time > $1
              AND calls > $2
              AND query NOT LIKE '%pg_stat%'
            ORDER BY total_exec_time DESC
            LIMIT $3
        """, min_mean_ms, min_calls, limit)

        return [SlowQuery(
            query=r["query"],
            calls=r["calls"],
            total_time_ms=r["total_time_ms"],
            mean_time_ms=r["mean_time_ms"],
            rows_returned=r["rows"],
            shared_blks_hit=r["shared_blks_hit"],
            shared_blks_read=r["shared_blks_read"],
            cache_hit_ratio=r["cache_hit_ratio"],
        ) for r in rows]

Index Recommendation Engine

The agent analyzes query plans and existing indexes to recommend new indexes or identify unused ones.

import openai
import json

async def get_query_plan(pool: asyncpg.Pool, query: str) -> str:
    rows = await pool.fetch(f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}")
    return json.dumps(rows[0]["QUERY PLAN"], indent=2)

async def get_existing_indexes(pool: asyncpg.Pool, table: str) -> list[dict]:
    rows = await pool.fetch("""
        SELECT
            indexname, indexdef,
            idx_scan, idx_tup_read, idx_tup_fetch,
            pg_size_pretty(pg_relation_size(indexrelid)) AS size
        FROM pg_stat_user_indexes
        JOIN pg_indexes ON indexname = pg_stat_user_indexes.indexrelname
        WHERE schemaname = 'public' AND tablename = $1
        ORDER BY idx_scan DESC
    """, table)
    return [dict(r) for r in rows]

async def recommend_indexes(
    pool: asyncpg.Pool, slow_query: SlowQuery
) -> dict:
    plan = await get_query_plan(pool, slow_query.query)

    # Extract table names from the query for index lookup
    tables = await pool.fetch("""
        SELECT DISTINCT tablename FROM pg_tables
        WHERE schemaname = 'public'
    """)
    table_names = [t["tablename"] for t in tables]

    mentioned_tables = [
        t for t in table_names if t in slow_query.query.lower()
    ]

    existing = {}
    for table in mentioned_tables:
        existing[table] = await get_existing_indexes(pool, table)

    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"""Analyze this slow PostgreSQL query and recommend indexes.

Query: {slow_query.query}
Mean execution time: {slow_query.mean_time_ms:.1f}ms
Total calls: {slow_query.calls}
Cache hit ratio: {slow_query.cache_hit_ratio:.3f}

Query plan:
{plan}

Existing indexes on mentioned tables:
{json.dumps(existing, indent=2, default=str)}

Return JSON with:
- recommended_indexes: list of CREATE INDEX statements
- unused_indexes: list of indexes with zero scans that could be dropped
- explanation: why these indexes help
- estimated_improvement: percentage speed improvement estimate"""
        }],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    return json.loads(response.choices[0].message.content)

Vacuum Scheduling

The agent monitors dead tuple counts and schedules vacuum operations during low-traffic windows.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

from datetime import datetime

@dataclass
class TableBloat:
    table_name: str
    live_tuples: int
    dead_tuples: int
    dead_ratio: float
    last_vacuum: Optional[datetime]
    last_autovacuum: Optional[datetime]
    table_size: str

async def check_vacuum_needs(pool: asyncpg.Pool) -> list[TableBloat]:
    rows = await pool.fetch("""
        SELECT
            relname AS table_name,
            n_live_tup AS live_tuples,
            n_dead_tup AS dead_tuples,
            CASE WHEN n_live_tup = 0 THEN 0
                 ELSE n_dead_tup::float / n_live_tup
            END AS dead_ratio,
            last_vacuum,
            last_autovacuum,
            pg_size_pretty(pg_total_relation_size(relid)) AS table_size
        FROM pg_stat_user_tables
        WHERE n_dead_tup > 1000
        ORDER BY n_dead_tup DESC
    """)
    return [TableBloat(**dict(r)) for r in rows]

async def schedule_vacuum(
    pool: asyncpg.Pool, table: str, is_low_traffic: bool
) -> str:
    if not is_low_traffic:
        return f"Skipping vacuum for {table}: not in low-traffic window"

    await pool.execute(f"VACUUM (VERBOSE, ANALYZE) {table}")
    return f"Vacuum completed for {table}"

def is_low_traffic_window() -> bool:
    hour = datetime.utcnow().hour
    return 2 <= hour <= 6  # UTC 2-6 AM

Capacity Planning

The agent tracks database growth trends and projects when you will hit capacity limits.

import numpy as np

async def analyze_growth_trend(
    pool: asyncpg.Pool, table: str, days_history: int = 90
) -> dict:
    """Analyze table growth and project when limits will be reached."""
    rows = await pool.fetch("""
        SELECT
            pg_total_relation_size($1) AS current_bytes,
            (SELECT setting::bigint * pg_size_bytes('1kB')
             FROM pg_settings WHERE name = 'max_wal_size') AS max_wal,
            (SELECT setting FROM pg_settings
             WHERE name = 'max_connections') AS max_conn
    """, table)

    current_size = rows[0]["current_bytes"]

    # Simulate historical growth (in production, store daily snapshots)
    daily_growth_rate = current_size * 0.02  # 2% daily growth estimate
    days_to_100gb = (100 * 1024**3 - current_size) / daily_growth_rate

    return {
        "table": table,
        "current_size_gb": current_size / (1024**3),
        "daily_growth_gb": daily_growth_rate / (1024**3),
        "days_to_100gb": max(0, int(days_to_100gb)),
        "recommendation": (
            "Consider partitioning" if days_to_100gb < 90
            else "Growth is manageable"
        ),
    }

FAQ

How do I safely apply index recommendations in production?

Always create indexes with CREATE INDEX CONCURRENTLY to avoid locking writes. The agent should generate the concurrent version of the DDL. Test the index on a replica first by running the slow query before and after index creation. Only apply to production after confirming improvement on the replica.

Should the agent run VACUUM FULL automatically?

Never. VACUUM FULL rewrites the entire table and takes an exclusive lock, blocking all reads and writes. The regular VACUUM (which the agent schedules) is safe for online operation. If VACUUM FULL is needed, the agent should flag it as a manual action requiring a maintenance window.

How does the agent handle parameterized queries in pg_stat_statements?

PostgreSQL normalizes queries in pg_stat_statements by replacing literal values with $1, $2, etc. The agent works with these normalized forms since that is what matters for index recommendations. When generating EXPLAIN plans, it substitutes reasonable sample values for the parameters.


#Database #PostgreSQL #QueryOptimization #DevOps #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.