
Incident Response for AI Agent Breaches: Detection, Containment, and Recovery

Build a comprehensive incident response plan for AI agent security breaches, including detection signals, automated containment, investigation procedures, recovery steps, and post-mortem processes for continuous improvement.

Agent Incidents Are Different

Traditional incident response plans assume compromised servers, stolen credentials, or network intrusions. AI agent incidents introduce new failure modes: prompt injection causing unauthorized actions, hallucination-driven data corruption, tool misuse escalating to privilege escalation, and adversarial manipulation of agent reasoning.

The unique challenge is that agent incidents can be subtle. A prompt-injected agent may appear to function normally while quietly exfiltrating data through its legitimate tool calls. Detection requires monitoring not just infrastructure metrics but agent behavior patterns.

Detection Signals

Build a detection system that monitors agent behavior across multiple dimensions: tool call patterns, output characteristics, resource usage, and interaction anomalies:

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict


@dataclass
class AgentEvent:
    timestamp: datetime
    agent_id: str
    event_type: str  # "tool_call", "output", "error", "auth"
    details: dict


@dataclass
class Alert:
    severity: str  # "low", "medium", "high", "critical"
    signal: str
    agent_id: str
    description: str
    events: list[AgentEvent]
    timestamp: datetime = field(default_factory=datetime.utcnow)


class AgentAnomalyDetector:
    """Detects suspicious agent behavior patterns."""

    def __init__(self):
        self.event_history: dict[str, list[AgentEvent]] = defaultdict(list)
        self.alerts: list[Alert] = []
        self.baselines: dict[str, dict] = {}

    def ingest_event(self, event: AgentEvent) -> Alert | None:
        """Process an agent event and check for anomalies."""
        self.event_history[event.agent_id].append(event)

        checks = [
            self._check_tool_call_burst,
            self._check_unusual_tool_sequence,
            self._check_data_exfiltration_pattern,
            self._check_error_spike,
        ]

        for check in checks:
            alert = check(event)
            if alert:
                self.alerts.append(alert)
                return alert
        return None

    def _check_tool_call_burst(self, event: AgentEvent) -> Alert | None:
        """Detect unusually high tool call frequency."""
        if event.event_type != "tool_call":
            return None

        recent = [
            e for e in self.event_history[event.agent_id]
            if e.event_type == "tool_call"
            and e.timestamp > datetime.utcnow() - timedelta(minutes=1)
        ]

        baseline = self.baselines.get(event.agent_id, {})
        normal_rate = baseline.get("tool_calls_per_minute", 10)

        if len(recent) > normal_rate * 3:
            return Alert(
                severity="high",
                signal="tool_call_burst",
                agent_id=event.agent_id,
                description=(
                    f"Agent made {len(recent)} tool calls in 1 minute "
                    f"(baseline: {normal_rate})"
                ),
                events=recent[-10:],
            )
        return None

    def _check_unusual_tool_sequence(self, event: AgentEvent) -> Alert | None:
        """Detect tool call sequences that suggest compromise."""
        if event.event_type != "tool_call":
            return None

        history = self.event_history[event.agent_id]
        recent_tools = [
            e.details.get("tool_name")
            for e in history[-5:]
            if e.event_type == "tool_call"
        ]

        # Flag: reading secrets then making network calls
        suspicious_sequences = [
            ["read_secret", "http_request"],
            ["database_query", "send_email"],
            ["file_read", "http_request"],
        ]

        for seq in suspicious_sequences:
            if self._is_subsequence(seq, recent_tools):
                return Alert(
                    severity="critical",
                    signal="suspicious_tool_sequence",
                    agent_id=event.agent_id,
                    description=f"Suspicious tool sequence: {recent_tools}",
                    events=history[-5:],
                )
        return None

    def _check_data_exfiltration_pattern(self, event: AgentEvent) -> Alert | None:
        """Detect patterns suggesting data exfiltration."""
        if event.event_type != "tool_call":
            return None

        tool = event.details.get("tool_name", "")
        params = event.details.get("parameters", {})

        if tool in ("http_request", "send_email", "webhook_call"):
            payload_size = len(str(params.get("body", "")))
            if payload_size > 10000:
                return Alert(
                    severity="critical",
                    signal="data_exfiltration",
                    agent_id=event.agent_id,
                    description=(
                        f"Large outbound payload ({payload_size} bytes) "
                        f"via {tool}"
                    ),
                    events=[event],
                )
        return None

    def _check_error_spike(self, event: AgentEvent) -> Alert | None:
        """Detect sudden increase in agent errors (may indicate attack probing)."""
        if event.event_type != "error":
            return None

        recent_errors = [
            e for e in self.event_history[event.agent_id]
            if e.event_type == "error"
            and e.timestamp > datetime.utcnow() - timedelta(minutes=5)
        ]

        if len(recent_errors) > 20:
            return Alert(
                severity="medium",
                signal="error_spike",
                agent_id=event.agent_id,
                description=f"{len(recent_errors)} errors in 5 minutes",
                events=recent_errors[-5:],
            )
        return None

    @staticmethod
    def _is_subsequence(subseq: list, seq: list) -> bool:
        it = iter(seq)
        return all(item in it for item in subseq)
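The iterator-sharing idiom in `_is_subsequence` is compact but easy to misread. A standalone copy (for illustration only) shows why it enforces ordering, not just membership:

```python
def is_subsequence(subseq: list, seq: list) -> bool:
    # A single shared iterator is consumed left to right, so each item in
    # subseq must be found in seq *after* the previous match.
    it = iter(seq)
    return all(item in it for item in subseq)


# Ordered match: read_secret occurs before http_request.
print(is_subsequence(
    ["read_secret", "http_request"],
    ["list_files", "read_secret", "parse", "http_request"],
))  # True

# Same tools, wrong order: not flagged.
print(is_subsequence(
    ["read_secret", "http_request"],
    ["http_request", "read_secret"],
))  # False
```

This matters for the detector: "read a secret, then make a network call" is suspicious, while the reverse order is ordinary tool use.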

Automated Containment

When a critical alert fires, automatically isolate the compromised agent before a human responder arrives. Speed is essential — a compromised agent can cause significant damage in minutes:


import logging

logger = logging.getLogger("incident_response")


class AgentContainment:
    """Automated containment actions for compromised agents."""

    def __init__(self, agent_registry, tool_registry, network_controller):
        self.agent_registry = agent_registry
        self.tool_registry = tool_registry
        self.network_controller = network_controller

    async def contain(self, agent_id: str, alert: Alert) -> dict:
        """Execute containment based on alert severity."""
        actions_taken = []

        if alert.severity in ("critical", "high"):
            # Revoke all tool permissions immediately
            await self.tool_registry.revoke_all(agent_id)
            actions_taken.append("revoked_tool_permissions")

            # Block network access
            await self.network_controller.isolate(agent_id)
            actions_taken.append("network_isolated")

            # Pause agent execution
            await self.agent_registry.pause(agent_id)
            actions_taken.append("agent_paused")

            # Rotate any credentials the agent had access to
            creds = await self.agent_registry.get_credentials(agent_id)
            for cred in creds:
                await cred.rotate()
            actions_taken.append(f"rotated_{len(creds)}_credentials")

        elif alert.severity == "medium":
            # Restrict to read-only tools
            await self.tool_registry.restrict_to_readonly(agent_id)
            actions_taken.append("restricted_to_readonly")

        # Log containment actions
        logger.critical(
            "CONTAINMENT EXECUTED",
            extra={
                "agent_id": agent_id,
                "alert": alert.signal,
                "actions": actions_taken,
            },
        )

        return {
            "agent_id": agent_id,
            "contained": True,
            "actions": actions_taken,
        }
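One latency note on `contain`: the loop above rotates credentials one at a time, so containment time grows with the number of credentials the agent held. A sketch of concurrent rotation, using a simulated `Credential` stand-in rather than the real registry object:

```python
import asyncio
import time


class Credential:
    """Illustrative stand-in; a real credential would call a provider API."""

    def __init__(self, name: str):
        self.name = name

    async def rotate(self) -> str:
        await asyncio.sleep(0.05)  # simulated provider round-trip
        return self.name


async def rotate_all(creds: list[Credential]) -> list[str]:
    # gather() bounds wall-clock time at the slowest single rotation
    # instead of the sum of all rotations.
    return await asyncio.gather(*(c.rotate() for c in creds))


start = time.monotonic()
rotated = asyncio.run(rotate_all([Credential(f"cred-{i}") for i in range(10)]))
elapsed = time.monotonic() - start
print(f"rotated {len(rotated)} credentials in {elapsed:.2f}s")
```

With ten credentials, the sequential version would take roughly ten provider round-trips; the concurrent version takes about one.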

Investigation and Recovery

After containment, investigate the root cause by analyzing the agent's full event history, tool call logs, and LLM conversation traces:

from datetime import timedelta


class IncidentInvestigator:
    """Gathers evidence and produces an investigation report."""

    def __init__(self, event_store, log_store):
        self.event_store = event_store
        self.log_store = log_store

    async def investigate(self, agent_id: str, alert: Alert) -> dict:
        """Compile a comprehensive incident report."""
        # Gather all events in the time window around the alert
        start = alert.timestamp - timedelta(hours=1)
        end = alert.timestamp + timedelta(minutes=5)

        events = await self.event_store.query(
            agent_id=agent_id, start=start, end=end
        )
        logs = await self.log_store.query(
            agent_id=agent_id, start=start, end=end
        )

        # Identify the initial compromise point
        tool_calls = [e for e in events if e.event_type == "tool_call"]
        first_suspicious = self._find_first_anomaly(tool_calls)

        return {
            "incident_id": f"INC-{alert.timestamp.strftime('%Y%m%d%H%M%S')}",
            "agent_id": agent_id,
            "alert": alert.signal,
            "timeline": {
                "first_anomaly": first_suspicious,
                "alert_triggered": alert.timestamp.isoformat(),
                "total_events": len(events),
            },
            "affected_tools": sorted({
                e.details["tool_name"]
                for e in tool_calls
                if "tool_name" in e.details  # skip malformed events
            }),
            "recommended_actions": self._recommend_actions(alert),
        }

    def _find_first_anomaly(self, events: list[AgentEvent]):
        # Simplified: return the earliest event in the window
        return events[0].timestamp.isoformat() if events else None

    def _recommend_actions(self, alert: Alert) -> list[str]:
        actions = ["Review agent system prompt for injection vulnerabilities"]
        if alert.signal == "data_exfiltration":
            actions.append("Audit all data accessed by agent in last 24 hours")
            actions.append("Notify affected data owners")
        if alert.signal == "suspicious_tool_sequence":
            actions.append("Review tool permission policies")
        actions.append("Conduct post-mortem within 48 hours")
        return actions

FAQ

How quickly should automated containment activate?

Target under 30 seconds from detection to containment for critical alerts. The containment pipeline should be fully automated with no human approval required for the initial response. Pre-stage containment actions (network rules, permission revocations) so they execute as API calls rather than manual configuration changes.
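Pre-staging can be as simple as a severity-to-actions table of async callables built at startup, so the responder path is pure execution with no approval step. A sketch (the action names are hypothetical placeholders, not a real API):

```python
import asyncio


# Hypothetical pre-staged actions; in production each would be a thin
# wrapper around an already-authorized API call.
async def revoke_permissions(agent_id: str) -> str:
    return f"revoked:{agent_id}"


async def isolate_network(agent_id: str) -> str:
    return f"isolated:{agent_id}"


async def pause_agent(agent_id: str) -> str:
    return f"paused:{agent_id}"


PLAYBOOK: dict[str, list] = {
    "critical": [revoke_permissions, isolate_network, pause_agent],
    "high": [revoke_permissions, isolate_network],
    "medium": [],  # handled by read-only restriction instead
}


async def execute_playbook(agent_id: str, severity: str) -> list[str]:
    # Fire all pre-staged actions concurrently; no human in the loop.
    actions = PLAYBOOK.get(severity, [])
    return await asyncio.gather(*(a(agent_id) for a in actions))


results = asyncio.run(execute_playbook("agent-42", "critical"))
print(results)
```

Because every action is a ready-to-fire coroutine, the time budget is dominated by API latency rather than by a human reading a runbook.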

Should I shut down all agents when one is compromised?

Not automatically. Isolate the compromised agent and assess whether the attack vector could affect other agents. If the breach exploited a shared vulnerability (such as a common tool or a system prompt weakness), temporarily restrict similar agents while you patch the vulnerability. Full system shutdown should be a manual decision made during investigation.
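Assessing whether the vector could reach other agents can start from tool overlap: agents sharing tools with the compromised one are the first candidates for temporary restriction. A minimal sketch (agent and tool names are illustrative):

```python
# Illustrative agent -> granted-tools mapping; in production this would
# come from the tool registry.
AGENT_TOOLS: dict[str, set[str]] = {
    "billing-agent": {"database_query", "send_email"},
    "support-agent": {"database_query", "file_read"},
    "ops-agent": {"http_request", "read_secret"},
}


def shared_blast_radius(compromised: str) -> dict[str, set[str]]:
    """Return other agents sharing at least one tool, with the overlap."""
    tools = AGENT_TOOLS[compromised]
    return {
        agent: granted & tools
        for agent, granted in AGENT_TOOLS.items()
        if agent != compromised and granted & tools
    }


print(shared_blast_radius("billing-agent"))
# {'support-agent': {'database_query'}}
```

Here only `support-agent` shares a tool with the compromised `billing-agent`, so it would be restricted while `ops-agent` keeps running.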

What should be included in the post-mortem?

Every post-mortem should cover: the timeline of events from initial compromise to full recovery, the root cause analysis, the effectiveness of detection and containment, what data or systems were affected, what changes will prevent recurrence, and action items with owners and deadlines. Publish post-mortems internally to build organizational knowledge about agent security threats.
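The checklist above can be encoded as a template so no post-mortem ships incomplete. A sketch using a dataclass (the field breakdown is one possible choice, not a standard):

```python
from dataclasses import dataclass, field


@dataclass
class PostMortem:
    incident_id: str
    timeline: list[str]            # compromise -> detection -> containment -> recovery
    root_cause: str
    detection_effectiveness: str   # what fired, and what should have fired sooner
    affected_assets: list[str]     # data and systems touched
    preventive_changes: list[str]
    action_items: list[dict] = field(default_factory=list)

    def is_complete(self) -> bool:
        # Require an owner and a deadline on every action item before publishing.
        return bool(self.action_items) and all(
            "owner" in item and "deadline" in item for item in self.action_items
        )


pm = PostMortem(
    incident_id="INC-20250101120000",
    timeline=["12:00 prompt injection", "12:03 alert", "12:03 contained"],
    root_cause="Injected instructions in a retrieved document",
    detection_effectiveness="suspicious_tool_sequence fired within 3 minutes",
    affected_assets=["customer-db (read)"],
    preventive_changes=["Sanitize retrieved content before prompting"],
    action_items=[{"task": "Add content filter", "owner": "sec-team", "deadline": "48h"}],
)
print(pm.is_complete())  # True
```

A simple completeness gate like `is_complete` is what turns "action items with owners and deadlines" from a norm into an enforced requirement.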


#IncidentResponse #AISecurity #BreachDetection #AgentMonitoring #SecurityOperations #AgenticAI #LearnAI #AIEngineering

CallSphere Team