Monitoring AI Agent Behavior: Detecting Anomalies and Preventing Misuse
Build a behavioral monitoring system for AI agents that establishes baselines, detects anomalies in tool usage and output patterns, triggers alerts, and implements automated shutdown for runaway agents.
Why Agent Monitoring Differs from API Monitoring
Traditional API monitoring tracks latency, error rates, and throughput. AI agent monitoring must go deeper because agents make autonomous decisions. A compromised or misbehaving agent might have perfect latency and zero HTTP errors while systematically leaking data through legitimate tool calls. Behavioral monitoring watches what the agent does, not just whether it responds.
This post builds a monitoring system that establishes behavioral baselines, detects anomalies in real time, and automatically shuts down agents exhibiting dangerous behavior.
Defining Behavioral Metrics
Start by defining the signals you need to track:
```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class AgentEvent:
    timestamp: datetime
    session_id: str
    user_id: str
    agent_name: str
    event_type: str  # "tool_call", "llm_call", "handoff", "response"
    tool_name: Optional[str] = None
    tokens_used: int = 0
    latency_ms: int = 0
    success: bool = True
    metadata: dict = field(default_factory=dict)


@dataclass
class BehavioralBaseline:
    """Expected behavior ranges computed from historical data."""
    avg_tools_per_session: float = 3.0
    max_tools_per_session: int = 15
    avg_llm_calls_per_session: float = 4.0
    max_llm_calls_per_session: int = 20
    avg_session_duration_seconds: float = 120.0
    max_session_duration_seconds: int = 600
    avg_tokens_per_session: int = 3000
    max_tokens_per_session: int = 20000
    common_tool_sequences: list[list[str]] = field(default_factory=list)
    sensitive_tools: set[str] = field(default_factory=lambda: {
        "get_customer_pii", "process_refund", "modify_account",
        "delete_record", "send_email", "execute_query",
    })
```
Real-Time Anomaly Detector
The anomaly detector tracks session-level metrics and compares them against baselines:
```python
from collections import defaultdict


class AnomalyDetector:
    def __init__(self, baseline: BehavioralBaseline):
        self.baseline = baseline
        self.active_sessions: dict[str, list[AgentEvent]] = defaultdict(list)
        self.alerts: list[dict] = []

    def record_event(self, event: AgentEvent) -> list[dict]:
        """Record an event and return any triggered alerts."""
        self.active_sessions[event.session_id].append(event)
        new_alerts = self._check_anomalies(event.session_id, event)
        self.alerts.extend(new_alerts)
        return new_alerts

    def _check_anomalies(
        self,
        session_id: str,
        latest_event: AgentEvent,
    ) -> list[dict]:
        alerts = []
        events = self.active_sessions[session_id]

        # Check 1: Excessive tool calls
        tool_calls = [e for e in events if e.event_type == "tool_call"]
        if len(tool_calls) > self.baseline.max_tools_per_session:
            alerts.append(self._create_alert(
                severity=AlertSeverity.WARNING,
                session_id=session_id,
                rule="excessive_tool_calls",
                message=f"Session has {len(tool_calls)} tool calls "
                        f"(max: {self.baseline.max_tools_per_session})",
            ))

        # Check 2: Excessive LLM calls (possible infinite loop)
        llm_calls = [e for e in events if e.event_type == "llm_call"]
        if len(llm_calls) > self.baseline.max_llm_calls_per_session:
            alerts.append(self._create_alert(
                severity=AlertSeverity.CRITICAL,
                session_id=session_id,
                rule="possible_infinite_loop",
                message=f"Session has {len(llm_calls)} LLM calls — possible loop",
            ))

        # Check 3: Rapid sensitive tool access
        alerts.extend(self._check_sensitive_tool_burst(session_id, events))

        # Check 4: Token consumption spike
        total_tokens = sum(e.tokens_used for e in events)
        if total_tokens > self.baseline.max_tokens_per_session:
            alerts.append(self._create_alert(
                severity=AlertSeverity.WARNING,
                session_id=session_id,
                rule="token_budget_exceeded",
                message=f"Session consumed {total_tokens} tokens "
                        f"(max: {self.baseline.max_tokens_per_session})",
            ))

        return alerts

    def _check_sensitive_tool_burst(
        self,
        session_id: str,
        events: list[AgentEvent],
    ) -> list[dict]:
        """Detect rapid succession of sensitive tool calls."""
        sensitive_calls = [
            e for e in events
            if e.event_type == "tool_call"
            and e.tool_name in self.baseline.sensitive_tools
        ]
        if len(sensitive_calls) < 3:
            return []

        # Check if 3+ sensitive calls happened within 10 seconds
        for i in range(len(sensitive_calls) - 2):
            window = sensitive_calls[i:i + 3]
            time_span = (window[-1].timestamp - window[0].timestamp).total_seconds()
            if time_span < 10:
                return [self._create_alert(
                    severity=AlertSeverity.CRITICAL,
                    session_id=session_id,
                    rule="sensitive_tool_burst",
                    message=f"3 sensitive tool calls in {time_span:.1f}s: "
                            f"{[e.tool_name for e in window]}",
                )]
        return []

    def _create_alert(
        self,
        severity: AlertSeverity,
        session_id: str,
        rule: str,
        message: str,
    ) -> dict:
        return {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "severity": severity.value,
            "session_id": session_id,
            "rule": rule,
            "message": message,
        }
```
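The heart of Check 3 is a sliding window over timestamps. Here is the same rule isolated into a standalone sketch so you can reason about (and unit-test) the window logic on its own; the timestamps and thresholds are illustrative.

```python
from datetime import datetime, timedelta, timezone

# Standalone sketch of the sliding-window rule from _check_sensitive_tool_burst:
# flag a session once `count` sensitive calls land inside a `window_s` span.
def burst_detected(
    timestamps: list[datetime],
    count: int = 3,
    window_s: float = 10.0,
) -> bool:
    stamps = sorted(timestamps)
    for i in range(len(stamps) - count + 1):
        span = (stamps[i + count - 1] - stamps[i]).total_seconds()
        if span < window_s:
            return True
    return False

t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
fast_calls = [t0, t0 + timedelta(seconds=4), t0 + timedelta(seconds=8)]
slow_calls = [t0, t0 + timedelta(seconds=30), t0 + timedelta(seconds=60)]
print(burst_detected(fast_calls), burst_detected(slow_calls))  # True False
```

Sorting first means the check is order-independent, which matters if events arrive out of order from an async pipeline.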
Automated Circuit Breaker
When critical anomalies are detected, the circuit breaker automatically stops the agent:
```python
class AgentCircuitBreaker:
    """Automatically shut down agent sessions exhibiting dangerous behavior."""

    def __init__(self, detector: AnomalyDetector):
        self.detector = detector
        self.killed_sessions: set[str] = set()
        self.kill_rules = {
            "possible_infinite_loop",
            "sensitive_tool_burst",
        }

    def should_continue(self, session_id: str) -> bool:
        """Check if a session should be allowed to continue."""
        if session_id in self.killed_sessions:
            return False
        session_alerts = [
            a for a in self.detector.alerts
            if a["session_id"] == session_id
            and a["severity"] == "critical"
            and a["rule"] in self.kill_rules
        ]
        if session_alerts:
            self.killed_sessions.add(session_id)
            self._notify_operators(session_id, session_alerts)
            return False
        return True

    def _notify_operators(self, session_id: str, alerts: list[dict]) -> None:
        """Send notification to on-call team about killed session."""
        print(f"CIRCUIT BREAKER: Session {session_id} terminated")
        for alert in alerts:
            print(f"  Reason: {alert['message']}")
```
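Two properties of the breaker are worth verifying: a session is denied as soon as a critical alert matches a kill rule, and it stays denied even if later checks find nothing. This standalone sketch isolates that decision rule (the alert dicts mirror the shape produced by `_create_alert`; session IDs are made up).

```python
# Kill rules mirror AgentCircuitBreaker.kill_rules above.
KILL_RULES = {"possible_infinite_loop", "sensitive_tool_burst"}

def should_continue(session_id: str, alerts: list[dict], killed: set[str]) -> bool:
    # Once killed, always killed — even if the alert list is later empty.
    if session_id in killed:
        return False
    fatal = [
        a for a in alerts
        if a["session_id"] == session_id
        and a["severity"] == "critical"
        and a["rule"] in KILL_RULES
    ]
    if fatal:
        killed.add(session_id)
        return False
    return True

killed: set[str] = set()
alerts = [{"session_id": "s1", "severity": "critical", "rule": "possible_infinite_loop"}]
print(should_continue("s1", alerts, killed))  # False: kill rule matched
print(should_continue("s1", [], killed))      # False: stays killed
print(should_continue("s2", alerts, killed))  # True: no matching alerts
```

The sticky `killed` set is deliberate: a runaway session should require explicit operator action to resume, not simply wait out the alert window.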
Integrating Monitoring Into the Agent Loop
```python
from agents import Runner  # OpenAI Agents SDK


class MonitoredAgentRunner:
    """Wrap agent execution with behavioral monitoring."""

    def __init__(self, agent, detector: AnomalyDetector, breaker: AgentCircuitBreaker):
        self.agent = agent
        self.detector = detector
        self.breaker = breaker

    async def run(self, session_id: str, user_id: str, user_input: str) -> str:
        if not self.breaker.should_continue(session_id):
            return "This session has been terminated due to unusual activity."

        event = AgentEvent(
            timestamp=datetime.now(timezone.utc),
            session_id=session_id,
            user_id=user_id,
            agent_name=self.agent.name,
            event_type="llm_call",
        )
        # Alerts are stored on the detector, where the breaker inspects them.
        self.detector.record_event(event)
        if not self.breaker.should_continue(session_id):
            return "This session has been terminated due to unusual activity."

        # Run the actual agent (simplified)
        result = await Runner.run(self.agent, user_input)
        return result.final_output


# Setup
baseline = BehavioralBaseline()
detector = AnomalyDetector(baseline)
breaker = AgentCircuitBreaker(detector)
```
Dashboard Metrics to Track
Beyond real-time alerting, track these metrics on your observability dashboard for trend analysis:
```python
class MetricsCollector:
    """Collect aggregate metrics for dashboard visualization."""

    def compute_session_metrics(self, events: list[AgentEvent]) -> dict:
        tool_calls = [e for e in events if e.event_type == "tool_call"]
        llm_calls = [e for e in events if e.event_type == "llm_call"]
        return {
            "total_tool_calls": len(tool_calls),
            "total_llm_calls": len(llm_calls),
            "total_tokens": sum(e.tokens_used for e in events),
            "avg_latency_ms": (
                sum(e.latency_ms for e in events) / len(events)
                if events else 0
            ),
            "unique_tools_used": len({e.tool_name for e in tool_calls if e.tool_name}),
            "error_rate": (
                sum(1 for e in events if not e.success) / len(events)
                if events else 0
            ),
            "sensitive_tool_calls": len([
                e for e in tool_calls
                if e.tool_name in BehavioralBaseline().sensitive_tools
            ]),
        }
```
FAQ
How do I establish behavioral baselines for a new agent?
Run the agent in a controlled environment with representative test queries for at least one week to collect baseline data. Use the 95th percentile of each metric as your initial max thresholds. After deploying to production, refine baselines using real traffic data. Recalculate baselines monthly to account for usage pattern changes as features evolve and the user base grows.
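As a minimal sketch of the percentile approach, here is one way to turn historical per-session counts into initial max thresholds. The counts are made up for illustration; a production system would compute this over real telemetry (e.g. with `numpy.percentile`) and far more sessions.

```python
# Nearest-rank style 95th percentile over a small sample (illustrative only).
def p95(values: list[float]) -> float:
    ordered = sorted(values)
    idx = int(0.95 * (len(ordered) - 1))
    return ordered[idx]

# Hypothetical per-session counts collected during a baselining week.
tools_per_session = [2, 3, 3, 4, 2, 5, 3, 6, 4, 12]
llm_calls_per_session = [3, 4, 4, 5, 3, 6, 4, 7, 5, 15]

thresholds = {
    "max_tools_per_session": p95(tools_per_session),
    "max_llm_calls_per_session": p95(llm_calls_per_session),
}
print(thresholds)  # {'max_tools_per_session': 6, 'max_llm_calls_per_session': 7}
```

Note how the one outlier session (12 tool calls, 15 LLM calls) barely moves the 95th percentile, which is exactly why percentiles beat the max as a threshold source.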
What is the false positive rate for anomaly detection?
It depends on how tight your thresholds are. Starting with 95th percentile thresholds typically yields a 2-5% false positive rate on individual alerts. The circuit breaker pattern reduces the impact of false positives by requiring multiple critical alerts before killing a session. Monitor your false positive rate weekly and adjust thresholds. It is better to start with loose thresholds and tighten them gradually than to start tight and overwhelm operators with false alerts.
Should monitoring run in-process or as a separate service?
For low-latency requirements, run lightweight checks (event counting, threshold comparisons) in-process. For expensive checks (ML-based anomaly detection, pattern analysis across sessions), offload to a separate service via an event queue. The pattern shown in this post works in-process for fast checks, but the notification and dashboard components should be decoupled from the agent process.
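To make the decoupling concrete, here is a minimal sketch of the queue pattern using a background thread as a stand-in for a separate service. In production the queue would typically be Kafka, SQS, or similar rather than an in-process `queue.Queue`; the event dicts are illustrative.

```python
import queue
import threading

# Fast checks stay in the agent loop; raw events are pushed here for
# heavier offline analysis by a consumer running outside the hot path.
event_queue: "queue.Queue[dict | None]" = queue.Queue()
processed: list[dict] = []

def worker() -> None:
    while True:
        item = event_queue.get()
        if item is None:  # shutdown sentinel
            break
        processed.append(item)  # expensive analysis would happen here
        event_queue.task_done()

t = threading.Thread(target=worker, daemon=True)
t.start()

# The agent loop only pays the cost of an enqueue.
for i in range(3):
    event_queue.put({"session_id": "s1", "event_type": "tool_call", "seq": i})
event_queue.put(None)
t.join()
print(len(processed))  # 3
```

The key property is that a slow or failing analysis consumer cannot add latency to, or crash, the agent loop itself.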
#Monitoring #AnomalyDetection #AISafety #Observability #Python #AgenticAI #LearnAI #AIEngineering
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.