
Building Autonomous Database Management with AI Agents

How to build AI agents that monitor, optimize, and manage databases autonomously. Covers query optimization, index recommendation, anomaly detection, automated migration generation, and safety guardrails for database operations.

The Case for AI-Driven Database Management

Database administration is one of the most repetitive, tool-intensive, and high-stakes domains in backend engineering. DBAs spend much of their time on tasks that follow well-defined patterns: analyzing slow queries, recommending indexes, reviewing schema changes, and responding to performance alerts. These patterns make database management an excellent candidate for AI agent automation.

The key constraint is safety. A bad query can lock a table. A wrong index can slow writes. A botched migration can corrupt data. Any AI database agent must operate within strict guardrails that prevent destructive actions without human approval.

Architecture: The Database Agent

The database agent connects to your database through read-only and carefully scoped write tools. It ingests performance metrics, slow query logs, and schema information to provide recommendations and execute approved changes.

import json
import os
import re
import time
from datetime import datetime, timezone
from uuid import uuid4

import anthropic
import psycopg2

client = anthropic.Anthropic()

# Credentials come from the environment; never hard-code them
AGENT_DB_PASSWORD = os.environ["AGENT_DB_PASSWORD"]
AGENT_WRITE_PASSWORD = os.environ["AGENT_WRITE_PASSWORD"]

# Read-only database connection for analysis
read_conn = psycopg2.connect(
    host="db-replica.internal",  # Always use a replica for reads
    dbname="production",
    user="ai_agent_readonly",
    password=AGENT_DB_PASSWORD,
    options="-c statement_timeout=30000"  # 30s timeout
)

# Scoped write connection (only for approved operations)
write_conn = psycopg2.connect(
    host="db-primary.internal",
    dbname="production",
    user="ai_agent_writer",
    password=AGENT_WRITE_PASSWORD,
    options="-c statement_timeout=60000"
)

db_tools = [
    {
        "name": "query_slow_log",
        "description": "Retrieve the slowest queries from pg_stat_statements.",
        "input_schema": {
            "type": "object",
            "properties": {
                "min_duration_ms": {
                    "type": "integer",
                    "description": "Minimum average execution time in milliseconds"
                },
                "limit": {
                    "type": "integer",
                    "description": "Number of queries to return",
                    "default": 20
                }
            },
            "required": ["min_duration_ms"]
        }
    },
    {
        "name": "explain_query",
        "description": "Run EXPLAIN ANALYZE on a query and return the execution plan.",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The SQL query to analyze"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "get_table_stats",
        "description": "Get table statistics including row count, dead tuples, last vacuum, and index usage.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table_name": {"type": "string"}
            },
            "required": ["table_name"]
        }
    },
    {
        "name": "get_index_usage",
        "description": "Get index usage statistics for a table.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table_name": {"type": "string"}
            },
            "required": ["table_name"]
        }
    },
    {
        "name": "recommend_index",
        "description": "Generate an index recommendation. Does NOT execute it.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table_name": {"type": "string"},
                "columns": {
                    "type": "array",
                    "items": {"type": "string"}
                },
                "index_type": {
                    "type": "string",
                    "enum": ["btree", "hash", "gin", "gist", "brin"]
                },
                "reasoning": {
                    "type": "string",
                    "description": "Explanation of why this index would help"
                }
            },
            "required": ["table_name", "columns", "index_type", "reasoning"]
        }
    }
]

Tool Implementation with Safety Guardrails

Every tool that interacts with the database must be wrapped in safety checks.

FORBIDDEN_KEYWORDS = {
    "DROP", "DELETE", "TRUNCATE", "ALTER", "UPDATE", "INSERT",
    "CREATE", "GRANT", "REVOKE"
}

class SecurityError(Exception):
    """Raised when a query fails safety validation."""

def execute_read_query(query: str) -> str:
    """Execute a read-only query with safety validation."""
    query_upper = query.upper().strip()

    # Only allow SELECT and EXPLAIN
    if not (query_upper.startswith("SELECT") or
            query_upper.startswith("EXPLAIN")):
        raise SecurityError(f"Only SELECT/EXPLAIN allowed: {query}")

    # Defense in depth: reject mutation keywords anywhere in the statement.
    # Word-boundary matching avoids false positives on column names
    # like deleted_at or created_at.
    for keyword in FORBIDDEN_KEYWORDS:
        if re.search(rf"\b{keyword}\b", query_upper):
            raise SecurityError(
                f"Forbidden keyword '{keyword}' in read query: {query}"
            )

    with read_conn.cursor() as cur:
        cur.execute(query)
        columns = [desc[0] for desc in cur.description]
        rows = cur.fetchall()
        return format_results(columns, rows)
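
The snippet above relies on a format_results helper that isn't defined in the article; the name and shape are assumptions, but a minimal sketch might render rows as an aligned plain-text table for the model:

```python
def format_results(columns: list[str], rows: list[tuple]) -> str:
    """Render query results as a plain-text table for the model."""
    if not rows:
        return "(no rows)"
    # Stringify every value, then size each column to its widest cell
    str_rows = [[str(v) for v in row] for row in rows]
    widths = [
        max(len(col), *(len(r[i]) for r in str_rows))
        for i, col in enumerate(columns)
    ]
    header = " | ".join(col.ljust(w) for col, w in zip(columns, widths))
    sep = "-+-".join("-" * w for w in widths)
    body = "\n".join(
        " | ".join(val.ljust(w) for val, w in zip(row, widths))
        for row in str_rows
    )
    return f"{header}\n{sep}\n{body}"
```

Plain text keeps the tool output compact and easy for the model to scan; truncating very large result sets before formatting is also worth considering to control token usage.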


def handle_tool_call(name: str, input_data: dict) -> str:
    """Route tool calls to their implementations."""
    if name == "query_slow_log":
        return execute_read_query(f"""
            SELECT query, calls, mean_exec_time, total_exec_time
            FROM pg_stat_statements
            WHERE mean_exec_time > {int(input_data['min_duration_ms'])}
            ORDER BY mean_exec_time DESC
            LIMIT {int(input_data.get('limit', 20))}
        """)

    elif name == "explain_query":
        # Note: EXPLAIN ANALYZE actually executes the statement, which is
        # why it goes through execute_read_query (keyword filter) and
        # runs against the replica with a statement timeout
        query = input_data["query"]
        return execute_read_query(f"EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) {query}")

    elif name == "get_table_stats":
        table = input_data["table_name"]
        # Whitelist the identifier before interpolating it into SQL;
        # table names cannot be parameterized, so validate them instead
        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
            raise SecurityError(f"Invalid table name: {table}")
        return execute_read_query(f"""
            SELECT
                n_live_tup AS live_rows,
                n_dead_tup AS dead_rows,
                last_vacuum,
                last_autovacuum,
                last_analyze,
                seq_scan,
                idx_scan
            FROM pg_stat_user_tables
            WHERE relname = '{table}'
        """)

    elif name == "get_index_usage":
        table = input_data["table_name"]
        # Same identifier whitelist as get_table_stats
        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
            raise SecurityError(f"Invalid table name: {table}")
        return execute_read_query(f"""
            SELECT indexrelname, idx_scan, idx_tup_read, idx_tup_fetch
            FROM pg_stat_user_indexes
            WHERE relname = '{table}'
        """)

    elif name == "recommend_index":
        # This tool generates a recommendation, not an execution
        return json.dumps({
            "status": "recommendation_saved",
            "ddl": f"CREATE INDEX CONCURRENTLY idx_{input_data['table_name']}_"
                   f"{'_'.join(input_data['columns'])} "
                   f"ON {input_data['table_name']} "
                   f"USING {input_data['index_type']} "
                   f"({', '.join(input_data['columns'])})",
            "reasoning": input_data["reasoning"],
            "requires_approval": True
        })

    raise ValueError(f"Unknown tool: {name}")

Automated Slow Query Analysis

The agent can continuously monitor slow queries and generate optimization recommendations.

async def analyze_slow_queries():
    """Run periodic slow query analysis."""
    messages = [{
        "role": "user",
        "content": """Analyze the current slow queries in the database.
For each slow query:
1. Use query_slow_log to find queries over 500ms average
2. For each slow query, use explain_query to get the execution plan
3. Check table statistics and index usage for involved tables
4. Recommend specific optimizations (indexes, query rewrites, etc.)

Provide a prioritized list of recommendations with estimated impact."""
    }]

    # Run the agent loop
    result = await run_agent_loop(
        system_prompt=DB_AGENT_SYSTEM_PROMPT,
        tools=db_tools,
        messages=messages,
        max_steps=30
    )

    return result
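
The snippets above call a run_agent_loop helper without defining it. A minimal sketch of the standard tool-use loop might look like this; the signature, model choice, and the injectable client and tool_executor parameters are our assumptions, with tool_executor standing in for the handle_tool_call function defined earlier:

```python
async def run_agent_loop(system_prompt, tools, messages, max_steps=30,
                         client=None, tool_executor=None):
    """Drive the Claude tool-use loop until no more tools are requested."""
    if client is None:
        import anthropic
        client = anthropic.AsyncAnthropic()
    for _ in range(max_steps):
        response = await client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system=system_prompt,
            tools=tools,
            messages=messages,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            # Finished: return the concatenated text blocks
            return "\n".join(
                b.text for b in response.content if b.type == "text"
            )
        # Execute each requested tool and feed the results back
        results = []
        for block in response.content:
            if block.type == "tool_use":
                try:
                    output = tool_executor(block.name, block.input)
                except Exception as e:
                    # Surface failures to the model instead of crashing
                    output = f"Tool error: {e}"
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})
    return "Agent stopped: step limit reached"
```

The max_steps cap matters for a database agent: it bounds how many queries a runaway investigation can issue against the replica.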

Schema Change Review

The agent can review proposed migration files and flag potential issues.

async def review_migration(migration_sql: str) -> dict:
    """AI review of a database migration before execution."""
    response = await async_client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=8192,  # must exceed the thinking budget below
        thinking={"type": "enabled", "budget_tokens": 5000},
        messages=[{
            "role": "user",
            "content": f"""Review this database migration for safety and performance.

Migration SQL:
{migration_sql}

Check for:
1. Missing CONCURRENTLY on index creation (locks table)
2. NOT NULL additions without defaults (table rewrite)
3. Missing transactions around multi-statement migrations
4. Column type changes that require table rewrites
5. Foreign key additions without indexes on the referencing column
6. Potential for long-running locks on large tables
7. Missing rollback strategy

Rate the migration: SAFE / NEEDS_REVIEW / DANGEROUS

Provide specific concerns and suggested improvements."""
        }]
    )

    # With extended thinking enabled, thinking blocks precede the text
    # block, so select content by type rather than by position
    return {
        "review": next(
            (b.text for b in response.content if b.type == "text"), ""
        ),
        "thinking": next(
            (b.thinking for b in response.content if b.type == "thinking"),
            None
        )
    }
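
Automation downstream of this review needs the SAFE / NEEDS_REVIEW / DANGEROUS verdict as a structured value rather than free text. A small helper (the function name and fallback policy are our own) can extract it conservatively, preferring the most severe rating mentioned and defaulting to NEEDS_REVIEW when parsing fails:

```python
import re

def parse_migration_rating(review_text: str) -> str:
    """Extract the risk rating from the model's free-text review.

    Checks ratings from most to least severe so a review that mentions
    several levels is treated at its worst; an unparseable review is
    never treated as safe.
    """
    for rating in ("DANGEROUS", "NEEDS_REVIEW", "SAFE"):
        if re.search(rf"\b{rating}\b", review_text):
            return rating
    return "NEEDS_REVIEW"
```

A CI gate could then block any migration whose parsed rating is not SAFE until a human signs off.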

Anomaly Detection Pipeline

Connect the agent to your monitoring system to detect and diagnose database anomalies.

async def investigate_anomaly(alert: dict) -> dict:
    """Investigate a database performance anomaly."""
    messages = [{
        "role": "user",
        "content": f"""A database performance anomaly has been detected:

Alert type: {alert['type']}
Metric: {alert['metric']}
Current value: {alert['current_value']}
Normal range: {alert['normal_range']}
Affected tables: {alert.get('tables', 'unknown')}
Time: {alert['timestamp']}

Investigate this anomaly:
1. Check slow query log for recent changes
2. Examine table statistics for the affected tables
3. Look for missing indexes or sequential scans
4. Check for lock contention or dead tuples
5. Provide a root cause hypothesis and recommended fix"""
    }]

    result = await run_agent_loop(
        system_prompt=DB_AGENT_SYSTEM_PROMPT,
        tools=db_tools,
        messages=messages,
        max_steps=20
    )

    return {
        "investigation": result,
        "alert": alert,
        "timestamp": datetime.utcnow().isoformat()
    }

Safety Architecture: The Approval Pipeline

All write operations go through an approval pipeline. The agent can recommend, but humans approve.

class ApprovalPipeline:
    """Approval pipeline for database changes."""

    def __init__(self, db_conn, notification_service):
        self.db_conn = db_conn
        self.notifications = notification_service

    async def submit_recommendation(self, recommendation: dict) -> str:
        """Submit a recommendation for human approval."""
        ticket_id = str(uuid4())

        await self.db_conn.execute("""
            INSERT INTO db_change_requests
            (id, ddl, reasoning, risk_level, status, created_at)
            VALUES ($1, $2, $3, $4, 'pending', NOW())
        """, ticket_id, recommendation["ddl"],
           recommendation["reasoning"],
           recommendation.get("risk_level", "medium"))

        await self.notifications.send(
            channel="db-changes",
            message=f"New DB change request #{ticket_id}: "
                    f"{recommendation['reasoning']}"
        )

        return ticket_id

    async def execute_approved(self, ticket_id: str):
        """Execute a previously approved change."""
        request = await self.db_conn.fetchrow(
            "SELECT * FROM db_change_requests WHERE id = $1", ticket_id
        )

        if request["status"] != "approved":
            raise PermissionError("Change not yet approved")

        # Execute with monitoring
        start = time.time()
        try:
            await self.db_conn.execute(request["ddl"])
            duration = time.time() - start
            await self.db_conn.execute("""
                UPDATE db_change_requests
                SET status = 'executed', executed_at = NOW(),
                    execution_time_ms = $1
                WHERE id = $2
            """, int(duration * 1000), ticket_id)
        except Exception as e:
            await self.db_conn.execute("""
                UPDATE db_change_requests
                SET status = 'failed', error = $1
                WHERE id = $2
            """, str(e), ticket_id)
            raise

Summary

AI-driven database management is one of the most impactful applications of agentic AI in backend engineering. The pattern is consistent: read-only tools for analysis, structured recommendations for changes, and human approval for execution. By connecting Claude to your database's statistics views, slow query logs, and monitoring metrics, you can build an agent that continuously identifies optimization opportunities, reviews schema changes for safety, and investigates anomalies faster than any manual process. The critical requirement is the safety architecture: strict read-only defaults, forbidden keyword filtering, approval pipelines, and comprehensive audit logging.


NYC News

Expert insights on AI voice agents and customer communication automation.
