Learn Agentic AI · 10 min read

AI Agent for Capacity Planning: Predicting Resource Needs Before They Become Critical

Build an AI agent that analyzes infrastructure usage trends, forecasts resource exhaustion, sets dynamic threshold alerts, and generates scaling recommendations before outages occur.

The Capacity Planning Problem

Capacity planning fails in two directions. Over-provision and you waste money. Under-provision and you face outages. Static thresholds like "alert at 80% disk" are better than nothing but they do not account for growth rate. A disk at 80% that grows 0.1% per day gives you months. A disk at 60% that grows 5% per day gives you a week. An AI capacity planning agent focuses on trajectories rather than snapshots.

Collecting Historical Resource Data

The agent needs time-series data for compute, memory, disk, network, and application-specific metrics. It stores daily snapshots for trend analysis.

import asyncpg
import httpx
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class ResourceSnapshot:
    resource_id: str
    resource_type: str  # "cpu", "memory", "disk", "connections"
    current_value: float
    max_value: float
    utilization_pct: float
    timestamp: datetime

class CapacityCollector:
    def __init__(self, prometheus_url: str, db_dsn: str):
        self.prom_url = prometheus_url
        self.db_dsn = db_dsn
        self.http = httpx.AsyncClient(timeout=30)

    async def collect_snapshots(self) -> list[ResourceSnapshot]:
        queries = {
            "cpu": (
                'avg(rate(container_cpu_usage_seconds_total[5m])) by (pod)',
                'avg(kube_pod_container_resource_limits{resource="cpu"}) by (pod)',
            ),
            "memory": (
                'avg(container_memory_usage_bytes) by (pod)',
                'avg(kube_pod_container_resource_limits{resource="memory"}) by (pod)',
            ),
            "disk": (
                'node_filesystem_size_bytes - node_filesystem_avail_bytes',
                'node_filesystem_size_bytes',
            ),
        }

        snapshots = []
        for rtype, (usage_q, limit_q) in queries.items():
            usage = await self._query_prometheus(usage_q)
            limits = await self._query_prometheus(limit_q)

            for metric in usage:
                pod = metric["metric"].get("pod", "node")
                value = float(metric["value"][1])
                limit = self._find_limit(limits, pod)
                if limit and limit > 0:
                    snapshots.append(ResourceSnapshot(
                        resource_id=pod,
                        resource_type=rtype,
                        current_value=value,
                        max_value=limit,
                        utilization_pct=(value / limit) * 100,
                        timestamp=datetime.utcnow(),
                    ))
        return snapshots

    async def _query_prometheus(self, query: str) -> list:
        resp = await self.http.get(
            f"{self.prom_url}/api/v1/query",
            params={"query": query},
        )
        # Fail loudly on HTTP errors instead of a confusing KeyError below
        resp.raise_for_status()
        return resp.json()["data"]["result"]

    def _find_limit(self, limits: list, pod: str) -> Optional[float]:
        for m in limits:
            if m["metric"].get("pod") == pod:
                return float(m["value"][1])
        return None

    async def store_snapshot(self, snapshot: ResourceSnapshot):
        # One-off connection per write; creating (and closing) a whole pool
        # for a single INSERT is wasteful. For frequent writes, create a
        # pool once at startup and reuse it.
        conn = await asyncpg.connect(self.db_dsn)
        try:
            await conn.execute("""
                INSERT INTO capacity_snapshots
                (resource_id, resource_type, current_value, max_value,
                 utilization_pct, timestamp)
                VALUES ($1, $2, $3, $4, $5, $6)
            """, snapshot.resource_id, snapshot.resource_type,
                snapshot.current_value, snapshot.max_value,
                snapshot.utilization_pct, snapshot.timestamp)
        finally:
            await conn.close()
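The collector assumes a capacity_snapshots table already exists. The original does not give the schema, but a plausible DDL inferred from the ResourceSnapshot fields would be:

```sql
-- Hypothetical schema inferred from the ResourceSnapshot dataclass;
-- adjust types and indexes to your retention and query patterns.
CREATE TABLE IF NOT EXISTS capacity_snapshots (
    resource_id     TEXT             NOT NULL,
    resource_type   TEXT             NOT NULL,
    current_value   DOUBLE PRECISION NOT NULL,
    max_value       DOUBLE PRECISION NOT NULL,
    utilization_pct DOUBLE PRECISION NOT NULL,
    timestamp       TIMESTAMPTZ      NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_capacity_snapshots_lookup
    ON capacity_snapshots (resource_id, resource_type, timestamp);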

Trend Analysis and Forecasting

The agent uses linear regression on historical snapshots to project when resources will be exhausted.

import numpy as np
from scipy.stats import linregress

@dataclass
class CapacityForecast:
    resource_id: str
    resource_type: str
    current_pct: float
    growth_rate_per_day: float
    days_to_80_pct: Optional[int]
    days_to_90_pct: Optional[int]
    days_to_100_pct: Optional[int]
    confidence: float
    trend: str  # "growing", "stable", "shrinking"

class TrendAnalyzer:
    def __init__(self, warning_days: int = 14, critical_days: int = 7):
        self.warning_days = warning_days
        self.critical_days = critical_days

    def forecast(
        self, snapshots: list[ResourceSnapshot]
    ) -> CapacityForecast:
        if not snapshots:
            raise ValueError("forecast requires at least one snapshot")
        if len(snapshots) < 7:
            return self._insufficient_data(snapshots[-1])

        timestamps = np.array([
            s.timestamp.timestamp() for s in snapshots
        ])
        values = np.array([s.utilization_pct for s in snapshots])

        # Convert to days from first observation
        days = (timestamps - timestamps[0]) / 86400.0
        slope, intercept, r_value, p_value, std_err = linregress(days, values)

        current = values[-1]
        daily_growth = slope  # percentage points per day

        def days_to_threshold(threshold: float) -> Optional[int]:
            if daily_growth <= 0:
                return None
            remaining = threshold - current
            if remaining <= 0:
                return 0
            return int(remaining / daily_growth)

        if abs(daily_growth) < 0.1:
            trend = "stable"
        elif daily_growth > 0:
            trend = "growing"
        else:
            trend = "shrinking"

        return CapacityForecast(
            resource_id=snapshots[-1].resource_id,
            resource_type=snapshots[-1].resource_type,
            current_pct=current,
            growth_rate_per_day=daily_growth,
            days_to_80_pct=days_to_threshold(80),
            days_to_90_pct=days_to_threshold(90),
            days_to_100_pct=days_to_threshold(100),
            confidence=r_value ** 2,
            trend=trend,
        )

    def _insufficient_data(self, latest: ResourceSnapshot) -> CapacityForecast:
        return CapacityForecast(
            resource_id=latest.resource_id,
            resource_type=latest.resource_type,
            current_pct=latest.utilization_pct,
            growth_rate_per_day=0.0,
            days_to_80_pct=None,
            days_to_90_pct=None,
            days_to_100_pct=None,
            confidence=0.0,
            trend="unknown",
        )
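The forecast math can be sanity-checked standalone on synthetic data. This sketch uses np.polyfit in place of scipy's linregress; both compute the same least-squares slope:

```python
import numpy as np

# 14 daily snapshots growing exactly 1 percentage point per day from 50%.
days = np.arange(14, dtype=float)
values = 50.0 + 1.0 * days              # day 13 ends at 63%

slope, intercept = np.polyfit(days, values, 1)
current = values[-1]
# round() rather than int() here to avoid float jitter on exact data
days_to_80 = round((80 - current) / slope)

print(round(slope, 3), days_to_80)      # 1.0 17
```

On perfectly linear input the fit recovers the slope exactly, and 17 percentage points of headroom at 1 point/day projects to 17 days, matching what days_to_threshold would report.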

Scaling Recommendations with LLM Reasoning

The agent uses an LLM to turn raw forecasts into actionable scaling recommendations.


import openai
import json

async def generate_scaling_plan(
    forecasts: list[CapacityForecast],
) -> list[dict]:
    critical = [
        f for f in forecasts
        if f.days_to_90_pct is not None and f.days_to_90_pct < 14
    ]
    if not critical:
        return []

    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"""Generate scaling recommendations for these resources
that will hit capacity limits within 14 days.

Resources approaching limits:
{json.dumps([{
    "resource": f.resource_id,
    "type": f.resource_type,
    "current": f"{f.current_pct:.1f}%",
    "daily_growth": f"{f.growth_rate_per_day:.2f}%/day",
    "days_to_90": f.days_to_90_pct,
    "days_to_100": f.days_to_100_pct,
} for f in critical], indent=2)}

Return a JSON object with a "recommendations" array. Each item must have:
- resource_id, action (scale_up, add_node, increase_limit, archive_data),
  urgency (immediate, this_week, next_sprint), specific_steps (list),
  estimated_cost_impact"""
        }],
        response_format={"type": "json_object"},
        temperature=0.1,
    )
    return json.loads(response.choices[0].message.content).get("recommendations", [])
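LLM output should be validated before anything acts on it. A minimal sketch, where the field names mirror what the prompt requests and the allowed-value sets are assumptions rather than an established schema:

```python
# Guard against malformed or hallucinated recommendations before they
# reach a ticketing system or an autoscaler. The enum sets below mirror
# the prompt; adjust them if you change the prompt.
ALLOWED_ACTIONS = {"scale_up", "add_node", "increase_limit", "archive_data"}
ALLOWED_URGENCIES = {"immediate", "this_week", "next_sprint"}

def is_valid_recommendation(rec: dict) -> bool:
    return (
        isinstance(rec.get("resource_id"), str)
        and rec.get("action") in ALLOWED_ACTIONS
        and rec.get("urgency") in ALLOWED_URGENCIES
        and isinstance(rec.get("specific_steps"), list)
    )

plan = [r for r in [
    {"resource_id": "db-1", "action": "increase_limit",
     "urgency": "this_week", "specific_steps": ["raise PVC to 200Gi"]},
    {"resource_id": "db-2", "action": "delete_everything",  # invalid action
     "urgency": "now", "specific_steps": []},
] if is_valid_recommendation(r)]
print(len(plan))  # 1
```

Dropping invalid items (and optionally re-prompting) is cheaper than debugging an autoscaler that followed a hallucinated instruction.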

Dynamic Threshold Alerts

Instead of static thresholds, the agent sets alerts based on how fast a resource is approaching its limit.

async def evaluate_alerts(forecasts: list[CapacityForecast]) -> list[dict]:
    alerts = []
    for f in forecasts:
        if f.days_to_100_pct is not None and f.days_to_100_pct <= 3:
            alerts.append({
                "severity": "critical",
                "resource": f.resource_id,
                "message": (
                    f"{f.resource_type} at {f.current_pct:.1f}% and growing "
                    f"{f.growth_rate_per_day:.1f}%/day. Exhaustion in "
                    f"{f.days_to_100_pct} days."
                ),
            })
        elif f.days_to_90_pct is not None and f.days_to_90_pct <= 7:
            alerts.append({
                "severity": "warning",
                "resource": f.resource_id,
                "message": (
                    f"{f.resource_type} at {f.current_pct:.1f}%, "
                    f"reaching 90% in {f.days_to_90_pct} days."
                ),
            })
    return alerts
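The tier logic above distills to a pure function that is easy to unit-test in isolation. `severity_for` is a hypothetical helper mirroring evaluate_alerts, not part of the original agent:

```python
from typing import Optional

def severity_for(
    days_to_100: Optional[int], days_to_90: Optional[int]
) -> Optional[str]:
    """Exhaustion within 3 days is critical; reaching 90% within 7 days
    is a warning; anything slower raises no alert at all."""
    if days_to_100 is not None and days_to_100 <= 3:
        return "critical"
    if days_to_90 is not None and days_to_90 <= 7:
        return "warning"
    return None

print(severity_for(2, 1))        # critical
print(severity_for(20, 5))       # warning
print(severity_for(None, None))  # None
```

Keeping the severity decision separate from alert formatting makes the thresholds trivial to tune and regression-test.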

FAQ

How do I account for seasonal traffic patterns like Black Friday or month-end processing?

Augment linear regression with seasonal decomposition. Store at least one full cycle of historical data (one year for annual patterns, one month for monthly). Use the seasonal component to adjust forecasts. The agent should flag upcoming high-traffic events from a calendar and factor in the expected multiplier.
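One lightweight way to fold in seasonality without a decomposition library is a multiplicative seasonal index. This sketch assumes daily snapshots and a weekly cycle; extend the period for monthly or annual patterns:

```python
import numpy as np

def weekly_seasonal_index(values: np.ndarray) -> np.ndarray:
    """Multiplicative index: mean of each weekday / overall mean.
    Assumes daily samples starting on weekday 0 and several full weeks
    of history."""
    overall = values.mean()
    return np.array([values[d::7].mean() / overall for d in range(7)])

# Synthetic history: flat 50% baseline, weekends (days 5-6) run hotter.
history = np.tile([50, 50, 50, 50, 50, 60, 60], 4).astype(float)
index = weekly_seasonal_index(history)

# Divide out the seasonal component before fitting the trend line, so
# the regression sees growth rather than the weekly sawtooth.
deseasonalized = history / index[np.arange(len(history)) % 7]
```

Fit the linear trend on the deseasonalized series, then multiply the index back in when projecting threshold-crossing dates.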

What if the growth rate changes suddenly due to a new feature launch?

Use a weighted regression that gives more importance to recent data points. A 7-day exponentially weighted average reacts faster to trend changes than a flat 90-day average. The agent should also watch for change points where the growth rate itself shifts and alert when the slope increases significantly.
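A weighted fit can be sketched with np.polyfit's weight parameter; note polyfit weights multiply the residuals, so the square root of the desired weight is passed. The 7-day half-life is an assumption to tune, not a magic number:

```python
import numpy as np

def weighted_slope(days: np.ndarray, values: np.ndarray,
                   half_life: float = 7.0) -> float:
    """Exponentially weighted trend: recent days count more, so a slope
    change after a feature launch shows up faster."""
    age = days[-1] - days
    weights = 0.5 ** (age / half_life)   # newest point gets weight 1.0
    # polyfit minimizes sum(w^2 * residual^2), hence sqrt(weights)
    slope, _ = np.polyfit(days, values, 1, w=np.sqrt(weights))
    return slope

# 30 days of history: 1 pt/day for 20 days, then 3 pts/day after a launch.
days = np.arange(30, dtype=float)
values = np.where(days < 20, days, 20 + 3 * (days - 20))
flat_slope, _ = np.polyfit(days, values, 1)
print(weighted_slope(days, values) > flat_slope)  # True: sees the new rate
```

The unweighted fit averages the old and new regimes into one misleadingly gentle slope; the weighted fit tracks the post-launch rate, so threshold-crossing estimates tighten within days of the change.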

How do I handle resources that have hard limits that cannot be scaled (like database connections)?

For hard-limited resources, the agent must recommend architectural changes rather than simple scaling. If PostgreSQL max_connections is at 80% and growing, the recommendation might be to add PgBouncer for connection pooling or to implement connection sharing in the application layer. The LLM reasoning step should know about these architectural options.


#CapacityPlanning #Forecasting #SRE #DevOps #Python #AgenticAI #LearnAI #AIEngineering

CallSphere Team