Building Resilient AI Agents: Circuit Breakers, Retries, and Graceful Degradation
Production resilience patterns for AI agents: circuit breakers for LLM APIs, exponential backoff with jitter, fallback models, and graceful degradation strategies.
Why Resilience Matters for AI Agents
AI agents depend on external services that fail. LLM APIs experience rate limits, timeouts, and outages. Tool servers crash. Databases become unreachable. A production agent that lacks resilience patterns will fail catastrophically when any dependency hiccups — and in a system that chains multiple LLM calls and tool executions, the probability of at least one failure per request is significant.
Consider an agent that makes 5 tool calls per request, each with 99% reliability. The probability that all 5 succeed is 0.99^5 ≈ 95.1%, so roughly 1 in 20 requests will encounter at least one failure. Without resilience patterns, those requests fail completely. With proper retries, circuit breakers, and fallbacks, you can push the effective reliability back above 99.9%.
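The arithmetic is easy to sanity-check; a two-line sketch (the function name is mine, not from any library):

```python
# Probability that a request succeeds when it chains n independent
# calls, each with per-call reliability p.
def request_success_rate(p: float, n: int) -> float:
    return p ** n

print(f"{request_success_rate(0.99, 5):.1%}")  # 95.1%
```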
Pattern 1: Retry with Exponential Backoff and Jitter
The most fundamental resilience pattern. When a call fails, wait and try again — but do it intelligently.
# resilience/retry.py
import asyncio
import random
from functools import wraps


class RetryConfig:
    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        retryable_exceptions: tuple[type[Exception], ...] = (Exception,),
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.retryable_exceptions = retryable_exceptions


def calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Calculate delay with exponential backoff and optional jitter."""
    delay = config.base_delay * (config.exponential_base ** attempt)
    delay = min(delay, config.max_delay)
    if config.jitter:
        # Full jitter: random value between 0 and the calculated delay
        delay = random.uniform(0, delay)
    return delay


def retry_async(config: RetryConfig | None = None):
    """Decorator for async functions with retry logic."""
    if config is None:
        config = RetryConfig()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(config.max_attempts):
                try:
                    return await func(*args, **kwargs)
                except config.retryable_exceptions as e:
                    last_exception = e
                    if attempt < config.max_attempts - 1:
                        delay = calculate_delay(attempt, config)
                        print(
                            f"Attempt {attempt + 1} failed: {e}. "
                            f"Retrying in {delay:.2f}s..."
                        )
                        await asyncio.sleep(delay)
                    else:
                        print(f"All {config.max_attempts} attempts failed.")
            raise last_exception
        return wrapper
    return decorator
Why Jitter Matters
Without jitter, when a service recovers from an outage, all clients retry at exactly the same time — creating a thundering herd that immediately overloads the service again. Jitter spreads retries over time, giving the service room to recover.
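A quick way to see the effect: sample the retry delay for a crowd of clients with and without jitter. The `delay` function below is a local sketch of the same backoff formula, not the article's `calculate_delay`:

```python
import random

random.seed(0)

def delay(attempt: int, jitter: bool) -> float:
    d = min(1.0 * 2 ** attempt, 60.0)  # exponential backoff, capped
    return random.uniform(0, d) if jitter else d

# 1,000 clients all on their 3rd attempt (attempt index 2)
no_jitter = {round(delay(2, False), 2) for _ in range(1000)}
with_jitter = {round(delay(2, True), 2) for _ in range(1000)}

print(len(no_jitter))    # 1 distinct retry time: the whole herd stampedes at 4.0s
print(len(with_jitter))  # hundreds of distinct times spread over [0, 4)
```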
# Applying retry to LLM calls
from resilience.retry import retry_async, RetryConfig
import openai

llm_retry_config = RetryConfig(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    retryable_exceptions=(
        openai.RateLimitError,
        openai.APITimeoutError,
        openai.InternalServerError,
        openai.APIConnectionError,
    ),
)

@retry_async(llm_retry_config)
async def call_llm(messages: list[dict], model: str = "gpt-4o") -> str:
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model=model,
        messages=messages,
        timeout=30.0,
    )
    return response.choices[0].message.content
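Under test, the decorator behaves like this compact standalone sketch, where `flaky_call` is a hypothetical dependency that fails twice and then succeeds:

```python
import asyncio

attempts = {"count": 0}

async def flaky_call() -> str:
    # Hypothetical dependency: fails on the first two attempts.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

async def retry(func, max_attempts: int = 5, base_delay: float = 0.01):
    # Inlined version of the retry loop, with tiny delays for the demo.
    for attempt in range(max_attempts):
        try:
            return await func()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(min(base_delay * 2 ** attempt, 0.1))

result = asyncio.run(retry(flaky_call))
print(result, attempts["count"])  # ok 3
```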
Pattern 2: Circuit Breaker for LLM APIs
Circuit breakers prevent your system from hammering a failing service. When failures exceed a threshold, the circuit opens and immediately rejects requests without even attempting the call — giving the failing service time to recover.
# resilience/circuit_breaker.py
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3
    success_threshold: int = 2  # Successes needed in half-open to close
    monitoring_window: float = 60.0  # Window for counting failures


class CircuitOpenError(Exception):
    pass


class CircuitBreaker:
    def __init__(self, name: str, config: CircuitBreakerConfig | None = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.half_open_calls = 0
        self.last_failure_time = 0.0
        self.last_state_change = time.time()
        self._lock = asyncio.Lock()

    async def execute(self, func: Callable, *args, **kwargs):
        # Hold the lock only for the state check. The call itself runs
        # outside it, so concurrent requests are not serialized and the
        # lock is free when _record_success/_record_failure re-acquire it
        # (asyncio.Lock is not reentrant).
        async with self._lock:
            if not self._can_execute():
                raise CircuitOpenError(
                    f"Circuit '{self.name}' is OPEN. "
                    f"Recovery in {self._time_until_recovery():.1f}s"
                )
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_calls += 1
        try:
            result = await func(*args, **kwargs)
        except Exception:
            await self._record_failure()
            raise
        await self._record_success()
        return result

    def _can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.config.recovery_timeout:
                self._transition(CircuitState.HALF_OPEN)
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.config.half_open_max_calls
        return False

    async def _record_success(self):
        async with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self._transition(CircuitState.CLOSED)
            else:
                self.failure_count = max(0, self.failure_count - 1)

    async def _record_failure(self):
        async with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                self._transition(CircuitState.OPEN)
            elif self.failure_count >= self.config.failure_threshold:
                self._transition(CircuitState.OPEN)

    def _transition(self, new_state: CircuitState):
        old_state = self.state
        self.state = new_state
        self.last_state_change = time.time()
        if new_state == CircuitState.CLOSED:
            self.failure_count = 0
            self.success_count = 0
        elif new_state == CircuitState.HALF_OPEN:
            self.half_open_calls = 0
            self.success_count = 0
        print(f"Circuit '{self.name}': {old_state.value} -> {new_state.value}")

    def _time_until_recovery(self) -> float:
        if self.state != CircuitState.OPEN:
            return 0.0
        elapsed = time.time() - self.last_failure_time
        return max(0, self.config.recovery_timeout - elapsed)
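The state machine is easiest to verify with a stripped-down standalone version. `MiniBreaker` below is a sketch that implements only the closed-to-open transition; the full class above adds half-open recovery on top:

```python
import asyncio

class MiniBreaker:
    """Toy breaker: opens after `threshold` consecutive failures."""
    def __init__(self, threshold: int = 3):
        self.failures = 0
        self.threshold = threshold
        self.state = "closed"

    async def call(self, func):
        if self.state == "open":
            raise RuntimeError("circuit open, call rejected")
        try:
            result = await func()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.state = "open"
            raise
        self.failures = 0  # any success resets the count
        return result

async def always_fails():
    raise ConnectionError("backend down")

async def main() -> str:
    breaker = MiniBreaker()
    for _ in range(3):
        try:
            await breaker.call(always_fails)
        except ConnectionError:
            pass
    return breaker.state

print(asyncio.run(main()))  # open
```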
Using the Circuit Breaker with an LLM Client
# resilience/llm_client.py
from resilience.circuit_breaker import CircuitBreaker, CircuitBreakerConfig, CircuitOpenError
from resilience.retry import retry_async, RetryConfig
import openai


class ResilientLLMClient:
    def __init__(self):
        self.client = openai.AsyncOpenAI()
        self.breakers = {
            "gpt-4o": CircuitBreaker("gpt-4o", CircuitBreakerConfig(
                failure_threshold=5,
                recovery_timeout=60.0,
            )),
            "gpt-4o-mini": CircuitBreaker("gpt-4o-mini", CircuitBreakerConfig(
                failure_threshold=5,
                recovery_timeout=30.0,
            )),
        }

    async def complete(self, messages: list[dict], model: str = "gpt-4o",
                       fallback_model: str = "gpt-4o-mini") -> str:
        # Try primary model
        try:
            breaker = self.breakers.get(model)
            if breaker:
                return await breaker.execute(self._call, messages, model)
            return await self._call(messages, model)
        except CircuitOpenError:
            print(f"Primary model {model} circuit is open, trying fallback...")
        except Exception as e:
            print(f"Primary model {model} failed: {e}, trying fallback...")

        # Try fallback model
        if fallback_model and fallback_model != model:
            try:
                breaker = self.breakers.get(fallback_model)
                if breaker:
                    return await breaker.execute(self._call, messages, fallback_model)
                return await self._call(messages, fallback_model)
            except Exception as e:
                print(f"Fallback model {fallback_model} also failed: {e}")

        raise Exception("All models unavailable")

    @retry_async(RetryConfig(max_attempts=2, base_delay=0.5))
    async def _call(self, messages: list[dict], model: str) -> str:
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            timeout=30.0,
        )
        return response.choices[0].message.content
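The routing logic reduces to "try primary, catch, try fallback"; a standalone sketch with hypothetical stub models (`call_model` simulates an outage of the primary):

```python
import asyncio

async def call_model(model: str, prompt: str) -> str:
    # Stand-in for a real API call; "primary" simulates an outage.
    if model == "primary":
        raise TimeoutError("primary model overloaded")
    return f"[{model}] answer to: {prompt}"

async def complete_with_fallback(prompt: str, model: str = "primary",
                                 fallback: str = "backup") -> str:
    try:
        return await call_model(model, prompt)
    except Exception:
        return await call_model(fallback, prompt)

print(asyncio.run(complete_with_fallback("hello")))  # [backup] answer to: hello
```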
Pattern 3: Fallback Chains for Tool Execution
When an agent's tool fails, it should not just report an error — it should try alternative approaches:
# resilience/tool_fallback.py
from typing import Any, Callable


class ToolFallbackChain:
    """Execute a chain of tool implementations, falling back to the
    next one if the current one fails."""

    def __init__(self, name: str):
        self.name = name
        self.implementations: list[tuple[str, Callable]] = []

    def add(self, label: str, func: Callable) -> "ToolFallbackChain":
        self.implementations.append((label, func))
        return self

    async def execute(self, *args, **kwargs) -> Any:
        errors = []
        for label, func in self.implementations:
            try:
                result = await func(*args, **kwargs)
                if result is not None:
                    return result
                errors.append(f"{label}: returned no result")
            except Exception as e:
                errors.append(f"{label}: {e}")
        raise Exception(
            f"All implementations of '{self.name}' failed:\n"
            + "\n".join(errors)
        )


# Usage example
web_search = (
    ToolFallbackChain("web_search")
    .add("tavily", search_with_tavily)
    .add("brave", search_with_brave)
    .add("cached", search_from_cache)
)
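With hypothetical stub backends (the first always times out), the chain's behavior looks like this standalone sketch:

```python
import asyncio

async def search_with_tavily(query: str):
    raise TimeoutError("tavily unreachable")  # simulated outage

async def search_with_brave(query: str):
    return [f"brave: {query}"]

async def fallback_search(query: str):
    # Inlined fallback loop, same shape as ToolFallbackChain.execute
    backends = [("tavily", search_with_tavily), ("brave", search_with_brave)]
    errors = []
    for label, func in backends:
        try:
            result = await func(query)
            if result is not None:
                return label, result
        except Exception as e:
            errors.append(f"{label}: {e}")
    raise RuntimeError("all backends failed: " + "; ".join(errors))

label, hits = asyncio.run(fallback_search("circuit breakers"))
print(label, hits)  # brave ['brave: circuit breakers']
```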
Pattern 4: Graceful Degradation
When critical services are unavailable, the agent should degrade gracefully rather than failing completely:
# resilience/degradation.py
from dataclasses import dataclass
from enum import Enum


class ServiceLevel(Enum):
    FULL = "full"          # All capabilities available
    DEGRADED = "degraded"  # Some features unavailable
    MINIMAL = "minimal"    # Only basic responses
    OFFLINE = "offline"    # Cannot serve requests


@dataclass
class SystemHealth:
    llm_available: bool = True
    tools_available: bool = True
    database_available: bool = True

    @property
    def service_level(self) -> ServiceLevel:
        if self.llm_available and self.tools_available and self.database_available:
            return ServiceLevel.FULL
        if self.llm_available:
            # LLM works but tools and/or the database are down
            return ServiceLevel.DEGRADED
        if self.database_available:
            return ServiceLevel.MINIMAL
        return ServiceLevel.OFFLINE


class DegradableAgent:
    def __init__(self):
        self.health = SystemHealth()
        self.canned_responses = {
            "greeting": "Hello! How can I help you today?",
            "error": "I apologize, but I am experiencing technical difficulties. Please try again in a few minutes.",
            "degraded": "I can help with basic questions, but some of my advanced features (like searching the web or checking databases) are temporarily unavailable.",
        }

    async def process(self, user_message: str) -> str:
        level = self.health.service_level
        if level == ServiceLevel.OFFLINE:
            return self.canned_responses["error"]
        if level == ServiceLevel.MINIMAL:
            # Use cached FAQ or rule-based responses
            return self._rule_based_response(user_message)
        if level == ServiceLevel.DEGRADED:
            # Use LLM but without tool access
            prefix = self.canned_responses["degraded"] + "\n\n"
            response = await self._llm_only_response(user_message)
            return prefix + response
        # Full service
        return await self._full_agent_response(user_message)

    def _rule_based_response(self, message: str) -> str:
        """Keyword-based matching when LLM is unavailable."""
        message_lower = message.lower()
        if any(w in message_lower for w in ["hours", "open", "close"]):
            return "Our business hours are Monday-Friday, 9am-5pm EST."
        if any(w in message_lower for w in ["price", "cost", "pricing"]):
            return "Please visit our pricing page at callsphere.com/pricing for current plans."
        return self.canned_responses["error"]

    async def _llm_only_response(self, message: str) -> str:
        """LLM response without tools (agent runs with an empty tools list)."""
        raise NotImplementedError

    async def _full_agent_response(self, message: str) -> str:
        """Full agent with all tools and capabilities."""
        raise NotImplementedError
Pattern 5: Timeout Management
Different operations need different timeouts. A tool lookup should complete in seconds; an LLM generation might take 30 seconds for a complex response:
# resilience/timeouts.py
import asyncio


class TimeoutConfig:
    LLM_CALL = 45.0        # LLM API calls
    TOOL_EXECUTION = 15.0  # Individual tool calls
    WEB_SEARCH = 10.0      # External search APIs
    DATABASE_QUERY = 5.0   # Database operations
    TOTAL_REQUEST = 120.0  # Total time for one user request


async def with_timeout(coro, timeout: float, fallback=None, label: str = ""):
    """Execute a coroutine with a timeout and optional fallback."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        if fallback is not None:
            print(f"Timeout after {timeout}s for {label}, using fallback")
            return fallback
        raise TimeoutError(f"{label} timed out after {timeout}s")


# Usage
result = await with_timeout(
    call_llm(messages),
    timeout=TimeoutConfig.LLM_CALL,
    fallback="I need a moment to think about this. Could you rephrase your question?",
    label="LLM completion",
)
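A self-contained run of the same idea (the helper is inlined here so the snippet executes on its own; `slow_llm_call` is a stand-in for a real model call):

```python
import asyncio

async def with_timeout(coro, timeout: float, fallback=None, label: str = ""):
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        if fallback is not None:
            return fallback
        raise TimeoutError(f"{label} timed out after {timeout}s")

async def slow_llm_call() -> str:
    await asyncio.sleep(1.0)  # pretend the model is thinking
    return "a thorough answer"

answer = asyncio.run(with_timeout(
    slow_llm_call(),
    timeout=0.05,
    fallback="Could you rephrase your question?",
))
print(answer)  # Could you rephrase your question?
```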
Putting It All Together
Here is how these patterns compose in a production agent:
# resilience/resilient_agent.py
from resilience.llm_client import ResilientLLMClient
from resilience.circuit_breaker import CircuitBreaker
from resilience.degradation import DegradableAgent
from resilience.timeouts import with_timeout, TimeoutConfig


class ProductionAgent(DegradableAgent):
    def __init__(self):
        super().__init__()
        self.llm = ResilientLLMClient()
        self.tool_breakers: dict[str, CircuitBreaker] = {}

    async def _full_agent_response(self, message: str) -> str:
        return await with_timeout(
            self._run_agent_loop(message),
            timeout=TimeoutConfig.TOTAL_REQUEST,
            fallback="I apologize for the delay. Let me try a simpler approach.",
            label="full agent response",
        )

    async def _run_agent_loop(self, message: str) -> str:
        # Resilient LLM call with circuit breakers and fallback models
        return await self.llm.complete(
            [{"role": "user", "content": message}],
            model="gpt-4o",
            fallback_model="gpt-4o-mini",
        )
FAQ
How do I test resilience patterns?
Use chaos engineering techniques. Inject failures in your test environment: add a test wrapper that randomly fails LLM calls, simulate timeouts with asyncio.sleep, and kill tool services during integration tests. Libraries like toxiproxy can simulate network failures between services.
What metrics should I monitor for agent resilience?
Track these key metrics: circuit breaker state changes per service, retry rate and success rate after retries, fallback activation rate, p50/p95/p99 latency for each operation (LLM calls, tool executions, total request time), and error rate by type (timeout, rate limit, server error). Set alerts when circuit breakers open or when fallback rates exceed 5%.
How do I handle rate limits from LLM providers?
Rate limits are the most common failure mode. Implement token-bucket rate limiting on your side to stay under provider limits. Use the Retry-After header from 429 responses to set your retry delay. Distribute requests across multiple API keys if you have them. Consider a request queue with priority levels for critical versus non-critical agent tasks.
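Client-side, a token bucket is a few lines (a sketch; map `rate` and `capacity` to your provider's requests-per-minute limits):

```python
import time

class TokenBucket:
    """Allow `rate` requests/second with bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def try_acquire(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(rate=1.0, capacity=2.0)
print(bucket.try_acquire(), bucket.try_acquire(), bucket.try_acquire())
# True True False -- the burst of 2 is spent; wait for refill
```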
Should I use different resilience strategies for synchronous versus streaming responses?
Yes. For streaming responses, set a timeout on the time-to-first-token rather than the total response time. If you do not receive the first chunk within 10 seconds, abort and retry. For synchronous calls, set the timeout on the total response. Also, implement a heartbeat check for streaming — if no chunk arrives for 15 seconds mid-stream, the connection may be stalled.
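One way to implement the split timeout, sketched over any async chunk iterator (the timeout values and `fake_stream` are illustrative):

```python
import asyncio

async def stream_with_timeouts(stream, first_chunk_timeout: float = 10.0,
                               heartbeat_timeout: float = 15.0):
    """Yield chunks, timing out on time-to-first-chunk, then on per-chunk gaps."""
    it = stream.__aiter__()
    timeout = first_chunk_timeout
    while True:
        try:
            chunk = await asyncio.wait_for(it.__anext__(), timeout=timeout)
        except StopAsyncIteration:
            return
        yield chunk
        timeout = heartbeat_timeout  # subsequent chunks get the heartbeat window

async def fake_stream():
    for token in ["Hel", "lo", "!"]:
        await asyncio.sleep(0.01)
        yield token

async def main() -> str:
    return "".join([c async for c in stream_with_timeouts(fake_stream(), 1.0, 1.0)])

print(asyncio.run(main()))  # Hello!
```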
Written by
CallSphere Team