Building a Chaos Engineering Agent: AI-Driven Resilience Testing
Learn how to build an AI agent that designs chaos experiments, controls blast radius, injects faults into production systems, observes behavior, and verifies automated recovery.
Why Chaos Engineering Needs Intelligence
Traditional chaos engineering tools inject random faults. Kill a pod. Add latency. Fill a disk. But randomness wastes time testing failures your system already handles well. An AI chaos engineering agent is strategic. It analyzes your architecture, identifies the weakest points, designs experiments that test specific hypotheses, and verifies that recovery actually works.
Experiment Design
The agent designs chaos experiments based on the system architecture and past incident history.
```python
from dataclasses import dataclass, field
from enum import Enum


class FaultType(Enum):
    POD_KILL = "pod_kill"
    NETWORK_LATENCY = "network_latency"
    NETWORK_PARTITION = "network_partition"
    CPU_STRESS = "cpu_stress"
    MEMORY_STRESS = "memory_stress"
    DISK_FILL = "disk_fill"
    DNS_FAILURE = "dns_failure"


@dataclass
class ChaosExperiment:
    experiment_id: str
    name: str
    hypothesis: str
    fault_type: FaultType
    target_service: str
    target_namespace: str
    blast_radius: str  # "single_pod", "service", or "namespace"
    duration_seconds: int
    parameters: dict = field(default_factory=dict)
    steady_state_checks: list[dict] = field(default_factory=list)
    abort_conditions: list[dict] = field(default_factory=list)
```
```python
import json
from datetime import datetime, timezone

import openai


async def design_experiment(
    architecture: dict, past_incidents: list[dict]
) -> ChaosExperiment:
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"""Design a chaos engineering experiment.

Architecture:
{json.dumps(architecture, indent=2)}

Past incidents (to avoid re-testing known weaknesses already fixed):
{json.dumps(past_incidents[:5], indent=2)}

Design an experiment that:
1. Tests a realistic failure mode
2. Has a clear hypothesis about expected behavior
3. Minimizes blast radius
4. Includes abort conditions to prevent real outages

Return JSON with: name, hypothesis, fault_type (one of: pod_kill,
network_latency, network_partition, cpu_stress, memory_stress,
disk_fill, dns_failure), target_service, blast_radius, duration_seconds,
parameters, steady_state_checks (list of PromQL queries with expected ranges),
abort_conditions (list of PromQL queries that trigger immediate halt)."""
        }],
        response_format={"type": "json_object"},
        temperature=0.3,
    )
    data = json.loads(response.choices[0].message.content)
    return ChaosExperiment(
        experiment_id=f"chaos-{int(datetime.now(timezone.utc).timestamp())}",
        name=data["name"],
        hypothesis=data["hypothesis"],
        fault_type=FaultType(data["fault_type"]),
        target_service=data["target_service"],
        target_namespace=architecture.get("namespace", "default"),
        blast_radius=data["blast_radius"],
        duration_seconds=data["duration_seconds"],
        parameters=data.get("parameters", {}),
        steady_state_checks=data.get("steady_state_checks", []),
        abort_conditions=data.get("abort_conditions", []),
    )
```
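Because the model's JSON must match the dataclass exactly, it is worth validating the payload before constructing the experiment. A minimal sketch of that check; the sample payload here is hypothetical, shaped like what the prompt requests:

```python
import json

# Hypothetical model response, shaped like the prompt above requests.
sample = json.loads("""{
  "name": "checkout-pod-kill",
  "hypothesis": "Killing one checkout pod does not raise the 5xx rate above 1%",
  "fault_type": "pod_kill",
  "target_service": "checkout",
  "blast_radius": "single_pod",
  "duration_seconds": 120,
  "parameters": {},
  "steady_state_checks": [
    {"query": "sum(rate(http_requests_total{code=~\\"5..\\"}[1m]))", "max": 0.01}
  ],
  "abort_conditions": [
    {"query": "up{job=\\"checkout\\"}", "min": 1}
  ]
}""")

ALLOWED_FAULTS = {
    "pod_kill", "network_latency", "network_partition",
    "cpu_stress", "memory_stress", "disk_fill", "dns_failure",
}
REQUIRED_KEYS = {
    "name", "hypothesis", "fault_type", "target_service",
    "blast_radius", "duration_seconds",
}


def validate_payload(data: dict) -> list[str]:
    """Return a list of problems; an empty list means the payload is usable."""
    problems = [f"missing key: {k}" for k in REQUIRED_KEYS - data.keys()]
    if data.get("fault_type") not in ALLOWED_FAULTS:
        problems.append(f"unknown fault_type: {data.get('fault_type')}")
    return problems


print(validate_payload(sample))  # → []
```

Rejecting a malformed payload here is far cheaper than discovering the problem when `FaultType(data["fault_type"])` raises mid-run.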
Blast Radius Control
The most critical part of chaos engineering is preventing experiments from becoming real incidents. The agent enforces strict blast radius limits.
```python
class BlastRadiusController:
    def __init__(
        self,
        max_affected_pods: int = 1,
        excluded_namespaces: list[str] | None = None,
    ):
        self.max_affected_pods = max_affected_pods
        self.excluded_namespaces = excluded_namespaces or [
            "kube-system", "monitoring", "istio-system"
        ]
        self.active_experiments: list[str] = []

    def validate_experiment(self, experiment: ChaosExperiment) -> tuple[bool, str]:
        # Never touch system namespaces
        if experiment.target_namespace in self.excluded_namespaces:
            return False, f"Namespace {experiment.target_namespace} is protected"
        # Only one experiment at a time
        if self.active_experiments:
            return False, "Another experiment is already running"
        # Duration limits
        if experiment.duration_seconds > 300:
            return False, "Experiment duration exceeds 5-minute maximum"
        # Blast radius check
        if experiment.blast_radius == "namespace":
            return False, "Namespace-wide fault not permitted in production"
        return True, "Approved"

    def register_experiment(self, experiment_id: str):
        self.active_experiments.append(experiment_id)

    def deregister_experiment(self, experiment_id: str):
        self.active_experiments.remove(experiment_id)
```
Fault Injection Engine
The agent generates Kubernetes-native fault injection manifests using Chaos Mesh or LitmusChaos CRDs; the implementation below emits Chaos Mesh manifests.
```python
import subprocess

import yaml


class FaultInjector:
    def generate_chaos_mesh_manifest(
        self, experiment: ChaosExperiment
    ) -> dict:
        if experiment.fault_type == FaultType.POD_KILL:
            return {
                "apiVersion": "chaos-mesh.org/v1alpha1",
                "kind": "PodChaos",
                "metadata": {
                    "name": experiment.experiment_id,
                    "namespace": experiment.target_namespace,
                },
                "spec": {
                    "action": "pod-kill",
                    "mode": "one",
                    "selector": {
                        "labelSelectors": {
                            "app": experiment.target_service,
                        },
                    },
                    "duration": f"{experiment.duration_seconds}s",
                },
            }
        elif experiment.fault_type == FaultType.NETWORK_LATENCY:
            latency_ms = experiment.parameters.get("latency_ms", 500)
            return {
                "apiVersion": "chaos-mesh.org/v1alpha1",
                "kind": "NetworkChaos",
                "metadata": {
                    "name": experiment.experiment_id,
                    "namespace": experiment.target_namespace,
                },
                "spec": {
                    "action": "delay",
                    "mode": "one",
                    "selector": {
                        "labelSelectors": {
                            "app": experiment.target_service,
                        },
                    },
                    "delay": {
                        "latency": f"{latency_ms}ms",
                        "jitter": "50ms",
                        "correlation": "50",
                    },
                    "duration": f"{experiment.duration_seconds}s",
                },
            }
        elif experiment.fault_type == FaultType.CPU_STRESS:
            return {
                "apiVersion": "chaos-mesh.org/v1alpha1",
                "kind": "StressChaos",
                "metadata": {
                    "name": experiment.experiment_id,
                    "namespace": experiment.target_namespace,
                },
                "spec": {
                    "mode": "one",
                    "selector": {
                        "labelSelectors": {
                            "app": experiment.target_service,
                        },
                    },
                    "stressors": {
                        "cpu": {
                            "workers": experiment.parameters.get("workers", 2),
                            "load": experiment.parameters.get("load", 80),
                        },
                    },
                    "duration": f"{experiment.duration_seconds}s",
                },
            }
        # Fail loudly instead of returning None for unsupported fault types
        raise NotImplementedError(
            f"No Chaos Mesh manifest defined for {experiment.fault_type}"
        )

    async def inject(self, experiment: ChaosExperiment) -> bool:
        manifest_yaml = yaml.dump(self.generate_chaos_mesh_manifest(experiment))
        # kubectl reads the manifest from stdin via "-f -"
        result = subprocess.run(
            ["kubectl", "apply", "-f", "-"],
            input=manifest_yaml, capture_output=True, text=True,
        )
        return result.returncode == 0
```
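For a hypothetical pod-kill experiment against a `checkout` service in a `prod` namespace, the PodChaos branch above produces a manifest shaped like this; the assertions mirror the invariants the blast radius controller cares about:

```python
experiment_id = "chaos-1700000000"  # hypothetical ID

# What generate_chaos_mesh_manifest emits for the pod-kill case.
manifest = {
    "apiVersion": "chaos-mesh.org/v1alpha1",
    "kind": "PodChaos",
    "metadata": {"name": experiment_id, "namespace": "prod"},
    "spec": {
        "action": "pod-kill",
        "mode": "one",  # exactly one matching pod: the smallest blast radius
        "selector": {"labelSelectors": {"app": "checkout"}},
        "duration": "60s",  # Chaos Mesh removes the fault itself after this elapses
    },
}

# Sanity checks an agent could run before kubectl apply.
assert manifest["spec"]["mode"] == "one"
assert manifest["spec"]["duration"].endswith("s")
assert manifest["metadata"]["namespace"] not in ("kube-system", "monitoring")
print("manifest ok")
```

The `duration` field doubles as a safety net: the CRD expires on its own even if the agent that applied it dies.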
Observation and Recovery Verification
The agent monitors steady-state metrics during the experiment and verifies the system recovers after the fault is removed.
```python
import asyncio
import subprocess

import httpx


class ExperimentObserver:
    def __init__(self, prometheus_url: str):
        self.prom_url = prometheus_url
        self.http = httpx.AsyncClient(timeout=10)

    async def check_steady_state(
        self, checks: list[dict]
    ) -> tuple[bool, list[str]]:
        violations = []
        for check in checks:
            query = check["query"]
            min_val = check.get("min")
            max_val = check.get("max")
            resp = await self.http.get(
                f"{self.prom_url}/api/v1/query",
                params={"query": query},
            )
            result = resp.json()["data"]["result"]
            if not result:
                violations.append(f"No data for: {query}")
                continue
            value = float(result[0]["value"][1])
            if min_val is not None and value < min_val:
                violations.append(f"{query} = {value} (below min {min_val})")
            if max_val is not None and value > max_val:
                violations.append(f"{query} = {value} (above max {max_val})")
        return len(violations) == 0, violations

    async def run_experiment_loop(
        self, experiment: ChaosExperiment, check_interval: int = 10
    ) -> dict:
        results = {"violations": [], "aborted": False, "recovered": False}
        elapsed = 0
        while elapsed < experiment.duration_seconds:
            _, violations = await self.check_steady_state(
                experiment.steady_state_checks
            )
            if violations:
                results["violations"].extend(violations)
            _, abort_violations = await self.check_steady_state(
                experiment.abort_conditions
            )
            if abort_violations:
                results["aborted"] = True
                await self._abort_experiment(experiment)
                break
            await asyncio.sleep(check_interval)
            elapsed += check_interval
        # Post-experiment: verify recovery within 60 seconds
        for _ in range(6):
            await asyncio.sleep(10)
            healthy, _ = await self.check_steady_state(
                experiment.steady_state_checks
            )
            if healthy:
                results["recovered"] = True
                break
        return results

    async def _abort_experiment(self, experiment: ChaosExperiment):
        subprocess.run([
            "kubectl", "delete", "podchaos,networkchaos,stresschaos",
            experiment.experiment_id,
            "-n", experiment.target_namespace,
            "--ignore-not-found",
        ], capture_output=True)
```
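End to end, the pieces compose into a single run loop: design, validate, inject, observe, clean up. A synchronous skeleton of that flow, with every phase stubbed out so the control logic can be exercised on its own (all stub names are illustrative):

```python
def run_chaos_cycle(design, validate, inject, observe, cleanup) -> dict:
    """Design -> validate -> inject -> observe -> cleanup, stopping early on rejection."""
    experiment = design()
    approved, reason = validate(experiment)
    if not approved:
        return {"status": "rejected", "reason": reason}
    if not inject(experiment):
        return {"status": "injection_failed"}
    try:
        results = observe(experiment)
    finally:
        cleanup(experiment)  # always remove the fault, even if observation crashes
    results["status"] = "aborted" if results.get("aborted") else "completed"
    return results


# Stubbed phases standing in for the components built above.
outcome = run_chaos_cycle(
    design=lambda: {"name": "demo"},
    validate=lambda e: (True, "Approved"),
    inject=lambda e: True,
    observe=lambda e: {"aborted": False, "recovered": True},
    cleanup=lambda e: None,
)
print(outcome)  # → {'aborted': False, 'recovered': True, 'status': 'completed'}
```

The `try`/`finally` around observation matters most: cleanup must run even when the observer itself raises, otherwise an agent crash leaves the fault active.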
FAQ
How do I convince my team to run chaos experiments in production?
Start in staging with the agent in "report-only" mode where it designs experiments but only simulates results. Once the team sees the value of the insights, move to production with strict blast radius controls: single-pod only, 60-second maximum duration, and automatic abort on any user-facing degradation. The agent builds confidence gradually.
What if the abort mechanism itself fails?
Build in defense in depth rather than trusting the agent. The Chaos Mesh duration field ensures the CRD expires automatically even if the agent crashes. Additionally, run a separate watchdog process, independent of the agent's control loop, that queries for active chaos experiments older than the maximum allowed duration and deletes them unconditionally.
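The staleness check at the heart of such a watchdog is a pure function over each CRD's `creationTimestamp`, which makes it easy to test in isolation. A sketch, assuming the resource list was parsed from a `kubectl get ... -o json` payload (the item shapes below are hypothetical):

```python
from datetime import datetime, timedelta, timezone

MAX_AGE = timedelta(minutes=5)  # matches the 5-minute experiment cap


def stale_experiments(items: list[dict], now: datetime,
                      max_age: timedelta = MAX_AGE) -> list[str]:
    """Return names of chaos resources older than max_age, for unconditional deletion."""
    stale = []
    for item in items:
        created = datetime.fromisoformat(
            item["metadata"]["creationTimestamp"].replace("Z", "+00:00")
        )
        if now - created > max_age:
            stale.append(item["metadata"]["name"])
    return stale


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
items = [
    {"metadata": {"name": "chaos-old", "creationTimestamp": "2025-01-01T11:50:00Z"}},
    {"metadata": {"name": "chaos-new", "creationTimestamp": "2025-01-01T11:58:00Z"}},
]
print(stale_experiments(items, now))  # → ['chaos-old']
```

The watchdog then pipes each returned name into `kubectl delete`; because it shares no code with the agent, a bug in the agent cannot disable it.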
How does the agent decide which experiments to run next?
The agent maintains a coverage map of tested failure modes per service. It prioritizes untested combinations: if the payment service has been tested for pod-kill but never for network latency, network latency gets priority. It also weighs services by business criticality and recent change frequency since recently modified code is more likely to have resilience gaps.
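That prioritization can be sketched as a scoring function over (service, fault) pairs: untested combinations outrank tested ones, weighted by criticality and recent change rate. The weights and service data below are illustrative:

```python
from itertools import product

FAULTS = ["pod_kill", "network_latency", "cpu_stress"]


def prioritize(services: dict[str, dict],
               tested: set[tuple[str, str]]) -> list[tuple[str, str]]:
    """Rank (service, fault) pairs: untested first, then by criticality and churn."""
    def score(pair):
        svc, fault = pair
        meta = services[svc]
        base = meta["criticality"] * (1 + meta["changes_per_week"])
        return base * (10 if pair not in tested else 1)  # big boost for untested pairs
    return sorted(product(services, FAULTS), key=score, reverse=True)


services = {
    "payments": {"criticality": 5, "changes_per_week": 3},
    "search": {"criticality": 2, "changes_per_week": 1},
}
tested = {("payments", "pod_kill")}
print(prioritize(services, tested)[0])  # → ('payments', 'network_latency')
```

With these numbers, the already-tested payments pod-kill drops below even the untested search experiments, which is exactly the coverage-seeking behavior described above.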
#ChaosEngineering #Resilience #SRE #FaultInjection #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team