
API Monitoring and Analytics for AI Agent Services: Tracking Usage, Errors, and Performance

Build comprehensive monitoring and analytics for AI agent APIs, covering structured request logging, error tracking with context, usage analytics dashboards, and alerting systems that catch agent failures before users notice.

Why Standard Monitoring Falls Short for Agent APIs

Standard API monitoring tracks request counts, latency percentiles, and error rates. These metrics matter, but AI agent APIs have additional dimensions that generic monitoring misses. You need to track token consumption and costs per agent, tool call success rates, conversation completion rates, and the chain of requests that form a single agent interaction.

A single user conversation might generate 15 API requests — a session creation, five message exchanges, four tool calls, and five tool result submissions. Traditional per-request monitoring shows 15 independent events. Agent-aware monitoring connects them into one conversation trace with aggregate cost, latency, and outcome metrics.
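
As a rough illustration of that difference, the sketch below folds per-request log entries into one trace per conversation. The `ConversationTrace` class, the `build_traces` helper, and the sample entries are hypothetical; the field names (`session_id`, `duration_ms`, `status_code`) are assumed to match whatever your request logger emits.

```python
from dataclasses import dataclass


@dataclass
class ConversationTrace:
    """Aggregate view of all requests that share one session ID (hypothetical)."""
    session_id: str
    request_count: int = 0
    total_duration_ms: float = 0.0
    error_count: int = 0


def build_traces(log_entries: list[dict]) -> dict[str, ConversationTrace]:
    """Group per-request log entries into one trace per session_id."""
    traces: dict[str, ConversationTrace] = {}
    for entry in log_entries:
        sid = entry["session_id"]
        trace = traces.setdefault(sid, ConversationTrace(session_id=sid))
        trace.request_count += 1
        trace.total_duration_ms += entry["duration_ms"]
        if entry["status_code"] >= 400:
            trace.error_count += 1
    return traces


# Made-up sample: three requests from one conversation, one from another
sample_logs = [
    {"session_id": "conv-1", "duration_ms": 120.0, "status_code": 200},
    {"session_id": "conv-1", "duration_ms": 950.5, "status_code": 200},
    {"session_id": "conv-1", "duration_ms": 40.0, "status_code": 502},
    {"session_id": "conv-2", "duration_ms": 300.0, "status_code": 200},
]
traces = build_traces(sample_logs)
```

Per-request monitoring would report four events here; the trace view reports two conversations, one of which contains a failed request.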

Structured Request Logging

Build a logging middleware that captures agent-specific context alongside standard HTTP metrics:

from fastapi import FastAPI, Request, Response
import time
import json
import logging
from uuid import uuid4

app = FastAPI()
logger = logging.getLogger("agent_api")
logger.setLevel(logging.INFO)

@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
    request_id = str(uuid4())
    start_time = time.perf_counter()

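    # Extract agent context from headers
    agent_id = request.headers.get("X-Agent-ID", "unknown")
    # Log only a short prefix of the key, never the full secret
    api_key = request.headers.get("X-API-Key", "")
    api_key_prefix = f"{api_key[:12]}..." if api_key else "none"

    # Store request_id for use in route handlers
    request.state.request_id = request_id

    response: Response = await call_next(request)

    # Path parameters are only filled in once routing has run, so read the
    # conversation ID after call_next (assumes routes declare {conv_id})
    session_id = request.path_params.get("conv_id", "none")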

    duration_ms = (time.perf_counter() - start_time) * 1000

    log_entry = {
        "request_id": request_id,
        "timestamp": time.time(),
        "method": request.method,
        "path": request.url.path,
        "status_code": response.status_code,
        "duration_ms": round(duration_ms, 2),
        "agent_id": agent_id,
        "session_id": session_id,
        "api_key_prefix": api_key_prefix,
        # Header values are strings; cast so the field is always an integer
        "content_length": int(response.headers.get("content-length", 0)),
    }

    if response.status_code >= 400:
        logger.warning(json.dumps(log_entry))
    else:
        logger.info(json.dumps(log_entry))

    response.headers["X-Request-ID"] = request_id
    response.headers["X-Response-Time"] = f"{duration_ms:.0f}ms"
    return response

Every log entry includes the agent ID, session ID, and API key prefix (never the full key). This lets you filter logs by agent, conversation, or consumer without exposing secrets.

Token Usage and Cost Tracking

AI agent APIs need cost-aware monitoring. Track token consumption at both the request and conversation level:

from dataclasses import dataclass
from collections import defaultdict
import asyncio
import time

@dataclass
class UsageRecord:
    agent_id: str
    session_id: str
    prompt_tokens: int
    completion_tokens: int
    model: str
    cost_usd: float
    timestamp: float

class UsageTracker:
    def __init__(self):
        self.records: list[UsageRecord] = []
        self._lock = asyncio.Lock()

    # Cost per 1K tokens (example rates)
    MODEL_COSTS = {
        "gpt-4o": {"prompt": 0.005, "completion": 0.015},
        "gpt-4o-mini": {"prompt": 0.00015, "completion": 0.0006},
        "claude-sonnet-4-20250514": {"prompt": 0.003, "completion": 0.015},
    }

    def calculate_cost(
        self, model: str, prompt_tokens: int, completion_tokens: int
    ) -> float:
        rates = self.MODEL_COSTS.get(model, {"prompt": 0.01, "completion": 0.03})
        return (
            (prompt_tokens / 1000) * rates["prompt"]
            + (completion_tokens / 1000) * rates["completion"]
        )

    async def record(
        self, agent_id: str, session_id: str,
        prompt_tokens: int, completion_tokens: int, model: str,
    ):
        cost = self.calculate_cost(model, prompt_tokens, completion_tokens)
        async with self._lock:
            self.records.append(UsageRecord(
                agent_id=agent_id,
                session_id=session_id,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                model=model,
                cost_usd=cost,
                timestamp=time.time(),
            ))

    async def get_agent_usage(self, agent_id: str, since: float) -> dict:
        records = [
            r for r in self.records
            if r.agent_id == agent_id and r.timestamp >= since
        ]
        return {
            "total_requests": len(records),
            "total_prompt_tokens": sum(r.prompt_tokens for r in records),
            "total_completion_tokens": sum(r.completion_tokens for r in records),
            "total_cost_usd": round(sum(r.cost_usd for r in records), 4),
        }

usage_tracker = UsageTracker()
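
The tracker above aggregates per agent. A conversation-level view could follow the same pattern, keyed by session instead. This is a minimal standalone sketch: `request_cost`, `cost_by_session`, and the sample calls are hypothetical, and the rates simply repeat the example gpt-4o figures from `MODEL_COSTS`, not current pricing.

```python
from collections import defaultdict

# Illustrative per-1K-token rates, copied from the example table above
RATES = {"gpt-4o": {"prompt": 0.005, "completion": 0.015}}


def request_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Cost of one LLM call in USD, using per-1K-token rates."""
    rates = RATES[model]
    return (
        (prompt_tokens / 1000) * rates["prompt"]
        + (completion_tokens / 1000) * rates["completion"]
    )


def cost_by_session(calls: list[dict]) -> dict[str, float]:
    """Sum the cost of every LLM call that belongs to the same session."""
    totals: dict[str, float] = defaultdict(float)
    for call in calls:
        totals[call["session_id"]] += request_cost(
            call["model"], call["prompt_tokens"], call["completion_tokens"]
        )
    return {sid: round(total, 4) for sid, total in totals.items()}


# Made-up sample: two calls in conv-1, one call in conv-2
calls = [
    {"session_id": "conv-1", "model": "gpt-4o", "prompt_tokens": 1200, "completion_tokens": 300},
    {"session_id": "conv-1", "model": "gpt-4o", "prompt_tokens": 2000, "completion_tokens": 500},
    {"session_id": "conv-2", "model": "gpt-4o", "prompt_tokens": 3000, "completion_tokens": 200},
]
session_costs = cost_by_session(calls)
```

With these rates the first call costs 1.2 × 0.005 + 0.3 × 0.015 = 0.0105 USD, so conv-1 totals 0.028 USD and conv-2 totals 0.018 USD.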

Error Tracking with Agent Context

When an agent API call fails, you need more context than a stack trace. Capture the conversation state, the agent configuration, and the specific step that failed:

from pydantic import BaseModel
from typing import Optional
import traceback

class AgentError(BaseModel):
    request_id: str
    agent_id: str
    session_id: str
    error_type: str
    error_message: str
    step: str  # "preprocessing", "llm_call", "tool_execution", "postprocessing"
    context: dict
    stack_trace: Optional[str] = None

error_log: list[AgentError] = []

async def track_error(
    request_id: str, agent_id: str, session_id: str,
    error: Exception, step: str, context: dict,
):
    error_entry = AgentError(
        request_id=request_id,
        agent_id=agent_id,
        session_id=session_id,
        error_type=type(error).__name__,
        error_message=str(error),
        step=step,
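        # Drop secrets and stamp the entry so it can be filtered by time later
        context={
            **{k: v for k, v in context.items() if k != "api_key"},
            "timestamp": time.time(),
        },
        # format_exc() relies on being called while the exception is being handled
        stack_trace=traceback.format_exc(),
    )
    error_log.append(error_entry)
    # Serialize through Pydantic rather than calling json.dumps on the dict
    logger.error(error_entry.model_dump_json())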

# Usage in agent execution (call_llm stands in for your own LLM client call)
async def execute_agent_with_tracking(request_id: str, agent_id: str, session_id: str, message: str):
    try:
        result = await call_llm(agent_id, message)
    except Exception as e:
        await track_error(
            request_id=request_id,
            agent_id=agent_id,
            session_id=session_id,
            error=e,
            step="llm_call",
            context={"message_length": len(message), "agent_id": agent_id},
        )
        raise
    return result

The step field is critical — it tells you whether the failure happened during input preprocessing, LLM inference, tool execution, or response postprocessing. This narrows debugging from "something failed" to "the tool execution step failed for agent X in session Y."
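
One way to keep the step label accurate without passing strings around by hand is to wrap each stage in a context manager that tags any exception with the stage name. This is only a sketch: `StepError`, `agent_step`, and `run_pipeline` are hypothetical names, and the pipeline body is a toy stand-in for real agent logic.

```python
from contextlib import contextmanager


class StepError(Exception):
    """Wraps an exception with the pipeline step in which it was raised."""

    def __init__(self, step: str, original: Exception):
        super().__init__(f"{step}: {original}")
        self.step = step
        self.original = original


@contextmanager
def agent_step(name: str):
    """Tag any exception raised inside the block with the step name."""
    try:
        yield
    except StepError:
        raise  # already tagged by an inner step
    except Exception as exc:
        raise StepError(name, exc) from exc


def run_pipeline(message: str) -> str:
    with agent_step("preprocessing"):
        cleaned = message.strip()
    with agent_step("tool_execution"):
        if not cleaned:
            raise ValueError("empty input reached the tool")
    with agent_step("postprocessing"):
        return cleaned.upper()


try:
    run_pipeline("   ")
    failed_step = None
except StepError as err:
    failed_step = err.step  # "tool_execution"
```

A handler that catches `StepError` can then pass `err.step` and `err.original` straight to `track_error`.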

Analytics Endpoints

Expose analytics through dedicated API endpoints for dashboards:

@app.get("/v1/analytics/usage", tags=["Analytics"])
async def get_usage_analytics(
    agent_id: str | None = None,
    hours: int = 24,
):
    since = time.time() - (hours * 3600)
    if agent_id:
        return await usage_tracker.get_agent_usage(agent_id, since)

    # Aggregate across all agents
    all_records = [r for r in usage_tracker.records if r.timestamp >= since]
    by_agent = defaultdict(list)
    for r in all_records:
        by_agent[r.agent_id].append(r)

    return {
        "period_hours": hours,
        "total_requests": len(all_records),
        "total_cost_usd": round(sum(r.cost_usd for r in all_records), 4),
        "by_agent": {
            aid: {
                "requests": len(records),
                "cost_usd": round(sum(r.cost_usd for r in records), 4),
                "avg_latency_ms": 0,  # Compute from request logs
            }
            for aid, records in by_agent.items()
        },
    }

@app.get("/v1/analytics/errors", tags=["Analytics"])
async def get_error_analytics(hours: int = 24):
    since = time.time() - (hours * 3600)
    # Entries without a recorded timestamp are kept rather than silently dropped
    recent_errors = [
        e for e in error_log
        if e.context.get("timestamp", since) >= since
    ]
    by_step = defaultdict(int)
    by_type = defaultdict(int)
    for e in recent_errors[-1000:]:
        by_step[e.step] += 1
        by_type[e.error_type] += 1

    return {
        "total_errors": len(recent_errors),
        "by_step": dict(by_step),
        "by_error_type": dict(by_type),
        "recent": [e.model_dump() for e in recent_errors[-10:]],
    }
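
The `avg_latency_ms` placeholder in the usage endpoint has to be filled from request logs. A possible approach, assuming log entries carry `agent_id` and `duration_ms` as in the logging middleware, is sketched below. The `percentile` and `latency_by_agent` helpers are hypothetical, and the nearest-rank method is one of several common percentile definitions.

```python
import math
from collections import defaultdict


def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile for pct in (0, 100]."""
    if not values:
        raise ValueError("percentile() needs at least one value")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def latency_by_agent(log_entries: list[dict]) -> dict[str, dict]:
    """Compute average and p95 latency per agent from request log entries."""
    durations: dict[str, list[float]] = defaultdict(list)
    for entry in log_entries:
        durations[entry["agent_id"]].append(entry["duration_ms"])
    return {
        agent_id: {
            "avg_latency_ms": round(sum(vals) / len(vals), 2),
            "p95_latency_ms": percentile(vals, 95),
        }
        for agent_id, vals in durations.items()
    }


# Made-up sample: 20 requests with durations of 100, 200, ..., 2000 ms
logs = [
    {"agent_id": "support-bot", "duration_ms": float(ms)}
    for ms in range(100, 2100, 100)
]
latency = latency_by_agent(logs)
```

For the sample data this yields an average of 1050.0 ms and a p95 of 1900.0 ms. The p95 value is the kind of figure a latency alert would compare against its threshold.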

Alerting Rules

Define alert conditions that catch agent-specific failure patterns:

ALERT_RULES = [
    {
        "name": "high_error_rate",
        "condition": "error_rate > 5%",
        "window_minutes": 5,
        "severity": "critical",
    },
    {
        "name": "cost_spike",
        "condition": "hourly_cost > 2x rolling_avg",
        "window_minutes": 60,
        "severity": "warning",
    },
    {
        "name": "tool_failure_streak",
        "condition": "consecutive_tool_failures > 10",
        "window_minutes": 10,
        "severity": "critical",
    },
    {
        "name": "latency_degradation",
        "condition": "p95_latency > 10000ms",
        "window_minutes": 5,
        "severity": "warning",
    },
]

async def check_alerts():
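    """Run on a schedule (e.g., every minute).

    evaluate_condition and send_alert are placeholders for your own
    metric queries and notification client.
    """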
    for rule in ALERT_RULES:
        if await evaluate_condition(rule):
            await send_alert(
                channel="slack",
                message=f"[{rule['severity'].upper()}] {rule['name']}: "
                        f"{rule['condition']}",
            )

The cost spike alert is particularly important for AI agent APIs. A single misconfigured agent loop can burn through your LLM budget in minutes. Alerting when hourly cost exceeds twice the rolling average catches these runaway scenarios quickly.
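
The `cost_spike` condition could be evaluated along these lines. The `is_cost_spike` function, its `min_history` guard, and the sample figures are assumptions made for illustration; the 2x multiplier comes from the rule above.

```python
def is_cost_spike(
    hourly_costs: list[float],
    multiplier: float = 2.0,
    min_history: int = 3,
) -> bool:
    """Return True when the latest hour costs more than multiplier x the prior average.

    hourly_costs is ordered oldest to newest; the last entry is the current hour.
    """
    # Without enough history the rolling average is too noisy to alert on
    if len(hourly_costs) <= min_history:
        return False
    *history, current = hourly_costs
    rolling_avg = sum(history) / len(history)
    return current > multiplier * rolling_avg


# Made-up sample: three quiet hours followed by a runaway agent loop
spike = is_cost_spike([1.0, 1.2, 0.8, 4.5])
normal = is_cost_spike([1.0, 1.2, 0.8, 1.9])
```

The `min_history` guard is a design choice: it avoids false alarms right after deployment, at the price of being blind to spikes during the first few hours.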

FAQ

What is the most important metric to monitor for AI agent APIs?

Cost per conversation is the single most actionable metric. It combines token usage, tool call frequency, and conversation length into one number that directly impacts your bottom line. Track it per agent and set alerts when it exceeds expected thresholds. High cost per conversation often indicates prompt inefficiency or unnecessary tool call loops.

How do I trace a full agent conversation across multiple API requests?

Use the session or conversation ID as a correlation key across all log entries, metrics, and traces. Every request within a conversation carries this ID. Your logging middleware captures it, your analytics aggregates by it, and your dashboards filter on it. This turns 15 independent log entries into one coherent conversation timeline.

Should I store monitoring data in the same database as application data?

No. Use a separate time-series database or logging service for monitoring data. Application databases are optimized for transactional reads and writes. Monitoring data is append-heavy with time-range queries. Mixing them risks monitoring writes degrading application performance during traffic spikes — exactly when monitoring matters most.


#APIMonitoring #AIAgents #Analytics #FastAPI #Observability #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
