Incident Response for AI Agent Breaches: Detection, Containment, and Recovery
Build a comprehensive incident response plan for AI agent security breaches, including detection signals, automated containment, investigation procedures, recovery steps, and post-mortem processes for continuous improvement.
Agent Incidents Are Different
Traditional incident response plans assume compromised servers, stolen credentials, or network intrusions. AI agent incidents introduce new failure modes: prompt injection causing unauthorized actions, hallucination-driven data corruption, tool misuse that leads to privilege escalation, and adversarial manipulation of agent reasoning.
The unique challenge is that agent incidents can be subtle. A prompt-injected agent may appear to function normally while quietly exfiltrating data through its legitimate tool calls. Detection requires monitoring not just infrastructure metrics but agent behavior patterns.
Detection Signals
Build a detection system that monitors agent behavior across multiple dimensions: tool call patterns, output characteristics, resource usage, and interaction anomalies:
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict


@dataclass
class AgentEvent:
    timestamp: datetime
    agent_id: str
    event_type: str  # "tool_call", "output", "error", "auth"
    details: dict


@dataclass
class Alert:
    severity: str  # "low", "medium", "high", "critical"
    signal: str
    agent_id: str
    description: str
    events: list[AgentEvent]
    timestamp: datetime = field(default_factory=datetime.utcnow)


class AgentAnomalyDetector:
    """Detects suspicious agent behavior patterns."""

    def __init__(self):
        self.event_history: dict[str, list[AgentEvent]] = defaultdict(list)
        self.alerts: list[Alert] = []
        self.baselines: dict[str, dict] = {}

    def ingest_event(self, event: AgentEvent) -> Alert | None:
        """Process an agent event and check for anomalies."""
        self.event_history[event.agent_id].append(event)
        checks = [
            self._check_tool_call_burst,
            self._check_unusual_tool_sequence,
            self._check_data_exfiltration_pattern,
            self._check_error_spike,
        ]
        for check in checks:
            alert = check(event)
            if alert:
                self.alerts.append(alert)
                return alert
        return None

    def _check_tool_call_burst(self, event: AgentEvent) -> Alert | None:
        """Detect unusually high tool call frequency."""
        if event.event_type != "tool_call":
            return None
        recent = [
            e for e in self.event_history[event.agent_id]
            if e.event_type == "tool_call"
            and e.timestamp > datetime.utcnow() - timedelta(minutes=1)
        ]
        baseline = self.baselines.get(event.agent_id, {})
        normal_rate = baseline.get("tool_calls_per_minute", 10)
        if len(recent) > normal_rate * 3:
            return Alert(
                severity="high",
                signal="tool_call_burst",
                agent_id=event.agent_id,
                description=(
                    f"Agent made {len(recent)} tool calls in 1 minute "
                    f"(baseline: {normal_rate})"
                ),
                events=recent[-10:],
            )
        return None

    def _check_unusual_tool_sequence(self, event: AgentEvent) -> Alert | None:
        """Detect tool call sequences that suggest compromise."""
        if event.event_type != "tool_call":
            return None
        history = self.event_history[event.agent_id]
        recent_tools = [
            e.details.get("tool_name")
            for e in history[-5:]
            if e.event_type == "tool_call"
        ]
        # Flag: reading secrets then making network calls
        suspicious_sequences = [
            ["read_secret", "http_request"],
            ["database_query", "send_email"],
            ["file_read", "http_request"],
        ]
        for seq in suspicious_sequences:
            if self._is_subsequence(seq, recent_tools):
                return Alert(
                    severity="critical",
                    signal="suspicious_tool_sequence",
                    agent_id=event.agent_id,
                    description=f"Suspicious tool sequence: {recent_tools}",
                    events=history[-5:],
                )
        return None

    def _check_data_exfiltration_pattern(self, event: AgentEvent) -> Alert | None:
        """Detect patterns suggesting data exfiltration."""
        if event.event_type != "tool_call":
            return None
        tool = event.details.get("tool_name", "")
        params = event.details.get("parameters", {})
        if tool in ("http_request", "send_email", "webhook_call"):
            payload_size = len(str(params.get("body", "")))
            if payload_size > 10000:
                return Alert(
                    severity="critical",
                    signal="data_exfiltration",
                    agent_id=event.agent_id,
                    description=(
                        f"Large outbound payload ({payload_size} bytes) "
                        f"via {tool}"
                    ),
                    events=[event],
                )
        return None

    def _check_error_spike(self, event: AgentEvent) -> Alert | None:
        """Detect sudden increase in agent errors (may indicate attack probing)."""
        if event.event_type != "error":
            return None
        recent_errors = [
            e for e in self.event_history[event.agent_id]
            if e.event_type == "error"
            and e.timestamp > datetime.utcnow() - timedelta(minutes=5)
        ]
        if len(recent_errors) > 20:
            return Alert(
                severity="medium",
                signal="error_spike",
                agent_id=event.agent_id,
                description=f"{len(recent_errors)} errors in 5 minutes",
                events=recent_errors[-5:],
            )
        return None

    @staticmethod
    def _is_subsequence(subseq: list, seq: list) -> bool:
        it = iter(seq)
        return all(item in it for item in subseq)
Automated Containment
When a critical alert fires, automatically isolate the compromised agent before a human responder arrives. Speed is essential — a compromised agent can cause significant damage in minutes:
import asyncio
import logging

logger = logging.getLogger("incident_response")


class AgentContainment:
    """Automated containment actions for compromised agents."""

    def __init__(self, agent_registry, tool_registry, network_controller):
        self.agent_registry = agent_registry
        self.tool_registry = tool_registry
        self.network_controller = network_controller

    async def contain(self, agent_id: str, alert: Alert) -> dict:
        """Execute containment based on alert severity."""
        actions_taken = []
        if alert.severity in ("critical", "high"):
            # Revoke all tool permissions immediately
            await self.tool_registry.revoke_all(agent_id)
            actions_taken.append("revoked_tool_permissions")
            # Block network access
            await self.network_controller.isolate(agent_id)
            actions_taken.append("network_isolated")
            # Pause agent execution
            await self.agent_registry.pause(agent_id)
            actions_taken.append("agent_paused")
            # Rotate any credentials the agent had access to
            creds = await self.agent_registry.get_credentials(agent_id)
            for cred in creds:
                await cred.rotate()
            actions_taken.append(f"rotated_{len(creds)}_credentials")
        elif alert.severity == "medium":
            # Restrict to read-only tools
            await self.tool_registry.restrict_to_readonly(agent_id)
            actions_taken.append("restricted_to_readonly")
        # Log containment actions
        logger.critical(
            "CONTAINMENT EXECUTED",
            extra={
                "agent_id": agent_id,
                "alert": alert.signal,
                "actions": actions_taken,
            },
        )
        return {
            "agent_id": agent_id,
            "contained": True,
            "actions": actions_taken,
        }
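The registries injected into `AgentContainment` are assumed interfaces, not a real library; a smoke test against in-memory stubs shows the minimum surface they need to provide for the critical path. All class and method names here are hypothetical:

```python
import asyncio


# In-memory stubs for the injected dependencies (hypothetical interfaces;
# real implementations would call your IAM, network gateway, and orchestrator).
class StubToolRegistry:
    def __init__(self):
        self.revoked: set[str] = set()

    async def revoke_all(self, agent_id: str) -> None:
        self.revoked.add(agent_id)


class StubNetworkController:
    def __init__(self):
        self.isolated: set[str] = set()

    async def isolate(self, agent_id: str) -> None:
        self.isolated.add(agent_id)


class StubAgentRegistry:
    def __init__(self):
        self.paused: set[str] = set()

    async def pause(self, agent_id: str) -> None:
        self.paused.add(agent_id)


async def contain_critical(agent_id, tools, network, agents) -> list[str]:
    # Mirrors the critical-severity branch: revoke, isolate, pause.
    await tools.revoke_all(agent_id)
    await network.isolate(agent_id)
    await agents.pause(agent_id)
    return ["revoked_tool_permissions", "network_isolated", "agent_paused"]


tools, network, agents = StubToolRegistry(), StubNetworkController(), StubAgentRegistry()
actions = asyncio.run(contain_critical("agent-7", tools, network, agents))
print(actions)
```

Testing containment against stubs like these in CI is cheap insurance: the one time the pipeline runs for real, it has to work on the first try.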
Investigation and Recovery
After containment, investigate the root cause by analyzing the agent's full event history, tool call logs, and LLM conversation traces:
from datetime import timedelta


class IncidentInvestigator:
    """Gathers evidence and produces an investigation report."""

    def __init__(self, event_store, log_store):
        self.event_store = event_store
        self.log_store = log_store

    async def investigate(self, agent_id: str, alert: Alert) -> dict:
        """Compile a comprehensive incident report."""
        # Gather all events in the time window around the alert
        start = alert.timestamp - timedelta(hours=1)
        end = alert.timestamp + timedelta(minutes=5)
        events = await self.event_store.query(
            agent_id=agent_id, start=start, end=end
        )
        logs = await self.log_store.query(
            agent_id=agent_id, start=start, end=end
        )
        # Identify the initial compromise point
        tool_calls = [e for e in events if e.event_type == "tool_call"]
        first_suspicious = self._find_first_anomaly(tool_calls)
        return {
            "incident_id": f"INC-{alert.timestamp.strftime('%Y%m%d%H%M%S')}",
            "agent_id": agent_id,
            "alert": alert.signal,
            "timeline": {
                "first_anomaly": first_suspicious,
                "alert_triggered": alert.timestamp.isoformat(),
                "total_events": len(events),
            },
            "affected_tools": list({
                e.details.get("tool_name") for e in tool_calls
            }),
            "recommended_actions": self._recommend_actions(alert),
        }

    def _find_first_anomaly(self, events: list[AgentEvent]):
        # Simplified: return the earliest event in the window
        return events[0].timestamp.isoformat() if events else None

    def _recommend_actions(self, alert: Alert) -> list[str]:
        actions = ["Review agent system prompt for injection vulnerabilities"]
        if alert.signal == "data_exfiltration":
            actions.append("Audit all data accessed by agent in last 24 hours")
            actions.append("Notify affected data owners")
        if alert.signal == "suspicious_tool_sequence":
            actions.append("Review tool permission policies")
            actions.append("Conduct post-mortem within 48 hours")
        return actions
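The one-hour lookback and five-minute lookahead define the evidence window. A self-contained sketch of the filtering semantics `event_store.query` is expected to implement (the timestamps and agent IDs here are made up):

```python
from datetime import datetime, timedelta

alert_time = datetime(2025, 3, 14, 9, 26)
start = alert_time - timedelta(hours=1)   # look back for the initial compromise
end = alert_time + timedelta(minutes=5)   # capture the immediate aftermath

events = [
    ("agent-7", datetime(2025, 3, 14, 8, 10)),  # before the window, excluded
    ("agent-7", datetime(2025, 3, 14, 8, 40)),
    ("agent-7", datetime(2025, 3, 14, 9, 10)),
    ("agent-8", datetime(2025, 3, 14, 9, 15)),  # different agent, excluded
]

# Inclusive window filter scoped to the agent under investigation.
window = [e for e in events if e[0] == "agent-7" and start <= e[1] <= end]
print(len(window))  # 2
```

If the compromise predates the window (for example, a poisoned document ingested days earlier), widen the lookback iteratively rather than defaulting to an expensive full-history scan.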
FAQ
How quickly should automated containment activate?
Target under 30 seconds from detection to containment for critical alerts. The containment pipeline should be fully automated with no human approval required for the initial response. Pre-stage containment actions (network rules, permission revocations) so they execute as API calls rather than manual configuration changes.
Should I shut down all agents when one is compromised?
Not automatically. Isolate the compromised agent and assess whether the attack vector could affect other agents. If the breach exploited a shared vulnerability (such as a common tool or a system prompt weakness), temporarily restrict similar agents while you patch the vulnerability. Full system shutdown should be a manual decision made during investigation.
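One way to scope that assessment is to treat agents sharing tools with the compromised agent as the candidate blast radius. A sketch with a hypothetical in-memory agent-to-tool mapping:

```python
def blast_radius(compromised: str, agent_tools: dict[str, set[str]]) -> list[str]:
    # Agents that share any tool with the compromised agent are candidates
    # for temporary restriction while the shared vulnerability is patched.
    shared = agent_tools[compromised]
    return sorted(
        agent_id
        for agent_id, tools in agent_tools.items()
        if agent_id != compromised and tools & shared
    )

registry = {
    "agent-7": {"read_secret", "http_request"},  # the compromised agent
    "agent-8": {"http_request", "send_email"},   # shares http_request
    "agent-9": {"database_query"},               # no overlap
}
print(blast_radius("agent-7", registry))  # ['agent-8']
```

Shared system prompts, shared retrieval sources, and shared credentials deserve the same overlap check; tools are just the easiest dimension to query.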
What should be included in the post-mortem?
Every post-mortem should cover: the timeline of events from initial compromise to full recovery, the root cause analysis, the effectiveness of detection and containment, what data or systems were affected, what changes will prevent recurrence, and action items with owners and deadlines. Publish post-mortems internally to build organizational knowledge about agent security threats.
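Those items can be captured as a structured template so no section gets skipped under deadline pressure. The field names below are one possible shape, not a standard:

```python
from dataclasses import dataclass, field


@dataclass
class PostMortem:
    incident_id: str
    timeline: list[str]           # compromise -> detection -> containment -> recovery
    root_cause: str
    detection_effectiveness: str  # time-to-detect, signals that fired or were missed
    affected_assets: list[str]    # data and systems the agent touched
    preventive_changes: list[str]
    action_items: list[dict] = field(default_factory=list)  # each: item, owner, deadline


# Example entries are invented for illustration.
pm = PostMortem(
    incident_id="INC-20250314092653",
    timeline=["09:12 prompt injection", "09:26 alert fired", "09:27 contained"],
    root_cause="Injected instructions in a retrieved document",
    detection_effectiveness="Detected in 14 minutes via tool_call_burst",
    affected_assets=["customer_db (read-only access)"],
    preventive_changes=["Sanitize retrieved content before it reaches the prompt"],
    action_items=[{"item": "Add retrieval filter", "owner": "platform", "deadline": "T+7d"}],
)
print(pm.incident_id)
```

Requiring an owner and deadline on every action item is what separates post-mortems that change behavior from ones that get filed and forgotten.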
CallSphere Team