Learn Agentic AI

Building Resilient AI Agents: Circuit Breakers, Retries, and Graceful Degradation

Production resilience patterns for AI agents: circuit breakers for LLM APIs, exponential backoff with jitter, fallback models, and graceful degradation strategies.

Why Resilience Matters for AI Agents

AI agents depend on external services that fail. LLM APIs experience rate limits, timeouts, and outages. Tool servers crash. Databases become unreachable. A production agent that lacks resilience patterns will fail catastrophically when any dependency hiccups — and in a system that chains multiple LLM calls and tool executions, the probability of at least one failure per request is significant.

Consider an agent that makes 5 tool calls per request, each with 99% reliability. The probability that all 5 succeed is 0.99 to the power of 5, which is 95.1%. That means roughly 1 in 20 requests will encounter at least one failure. Without resilience patterns, those requests fail completely. With proper retries, circuit breakers, and fallbacks, you can push the effective reliability back above 99.9%.
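The arithmetic above is easy to sanity-check in code. A quick sketch (the function names are illustrative; the retry model assumes attempts fail independently):

```python
def chain_reliability(per_call: float, n_calls: int) -> float:
    """Probability that all n independent calls in a request succeed."""
    return per_call ** n_calls

def with_retries(per_call: float, max_attempts: int) -> float:
    """Effective per-call reliability when each call may be retried.
    The call only fails outright if every attempt fails."""
    return 1 - (1 - per_call) ** max_attempts

# 5 tool calls at 99% each: only ~95.1% of requests see zero failures
baseline = chain_reliability(0.99, 5)

# With up to 3 attempts per call, each call becomes ~99.9999% reliable,
# pushing the whole 5-call chain back above 99.9%
improved = chain_reliability(with_retries(0.99, 3), 5)
```

The compounding works in your favor on retries for the same reason it works against you on chained calls: probabilities multiply.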

Pattern 1: Retry with Exponential Backoff and Jitter

The most fundamental resilience pattern. When a call fails, wait and try again — but do it intelligently.

# resilience/retry.py
import asyncio
import random
from functools import wraps
from typing import Type

class RetryConfig:
    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        retryable_exceptions: tuple[Type[Exception], ...] = (Exception,),
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.retryable_exceptions = retryable_exceptions

def calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Calculate delay with exponential backoff and optional jitter."""
    delay = config.base_delay * (config.exponential_base ** attempt)
    delay = min(delay, config.max_delay)

    if config.jitter:
        # Full jitter: random value between 0 and the calculated delay
        delay = random.uniform(0, delay)

    return delay

def retry_async(config: RetryConfig | None = None):
    """Decorator for async functions with retry logic."""
    if config is None:
        config = RetryConfig()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(config.max_attempts):
                try:
                    return await func(*args, **kwargs)
                except config.retryable_exceptions as e:
                    last_exception = e
                    if attempt < config.max_attempts - 1:
                        delay = calculate_delay(attempt, config)
                        print(
                            f"Attempt {attempt + 1} failed: {e}. "
                            f"Retrying in {delay:.2f}s..."
                        )
                        await asyncio.sleep(delay)
                    else:
                        print(f"All {config.max_attempts} attempts failed.")

            raise last_exception

        return wrapper
    return decorator

Why Jitter Matters

Without jitter, when a service recovers from an outage, all clients retry at exactly the same time — creating a thundering herd that immediately overloads the service again. Jitter spreads retries over time, giving the service room to recover.
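The effect is easy to see in isolation. A standalone sketch (mirroring calculate_delay above with hypothetical defaults) compares the delays 100 simultaneous clients would pick with and without full jitter:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter: bool = True) -> float:
    """Exponential backoff delay for one attempt, with optional full jitter."""
    delay = min(base * (2 ** attempt), cap)
    return random.uniform(0, delay) if jitter else delay

# 100 clients all retrying attempt 2 at the same moment
no_jitter = {backoff_delay(2, jitter=False) for _ in range(100)}
with_jitter = [backoff_delay(2) for _ in range(100)]

# Without jitter, every client waits exactly 4s and they all collide again;
# with full jitter, the 100 retries spread across the interval [0, 4]
```

The set `no_jitter` collapses to a single value, which is exactly the thundering herd: one synchronized wave of retries.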

# Applying retry to LLM calls
from resilience.retry import retry_async, RetryConfig
import openai

llm_retry_config = RetryConfig(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    retryable_exceptions=(
        openai.RateLimitError,
        openai.APITimeoutError,
        openai.InternalServerError,
        openai.APIConnectionError,
    ),
)

@retry_async(llm_retry_config)
async def call_llm(messages: list[dict], model: str = "gpt-4o") -> str:
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model=model,
        messages=messages,
        timeout=30.0,
    )
    return response.choices[0].message.content

Pattern 2: Circuit Breaker for LLM APIs

Circuit breakers prevent your system from hammering a failing service. When failures exceed a threshold, the circuit opens and immediately rejects requests without even attempting the call — giving the failing service time to recover.

# resilience/circuit_breaker.py
import time
import asyncio
from enum import Enum
from dataclasses import dataclass
from typing import Callable

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3
    success_threshold: int = 2  # Successes needed in half-open to close
    monitoring_window: float = 60.0  # Window for counting failures

class CircuitBreaker:
    def __init__(self, name: str, config: CircuitBreakerConfig | None = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.half_open_calls = 0
        self.last_failure_time = 0.0
        self.last_state_change = time.time()
        self._lock = asyncio.Lock()

    async def execute(self, func: Callable, *args, **kwargs):
        async with self._lock:
            if not self._can_execute():
                raise CircuitOpenError(
                    f"Circuit '{self.name}' is OPEN. "
                    f"Recovery in {self._time_until_recovery():.1f}s"
                )

        try:
            result = await func(*args, **kwargs)
            await self._record_success()
            return result
        except Exception:
            await self._record_failure()
            raise

    def _can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.config.recovery_timeout:
                self._transition(CircuitState.HALF_OPEN)
                self.half_open_calls += 1  # the first recovery probe
                return True
            return False

        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.config.half_open_max_calls:
                return False
            # Count probes when they are admitted, not when they succeed,
            # so the half-open call limit actually bounds in-flight probes
            self.half_open_calls += 1
            return True

        return False

    async def _record_success(self):
        async with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self._transition(CircuitState.CLOSED)
            else:
                self.failure_count = max(0, self.failure_count - 1)

    async def _record_failure(self):
        async with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.state == CircuitState.HALF_OPEN:
                self._transition(CircuitState.OPEN)
            elif self.failure_count >= self.config.failure_threshold:
                self._transition(CircuitState.OPEN)

    def _transition(self, new_state: CircuitState):
        old_state = self.state
        self.state = new_state
        self.last_state_change = time.time()

        if new_state == CircuitState.CLOSED:
            self.failure_count = 0
            self.success_count = 0
        elif new_state == CircuitState.HALF_OPEN:
            self.half_open_calls = 0
            self.success_count = 0

        print(f"Circuit '{self.name}': {old_state.value} -> {new_state.value}")

    def _time_until_recovery(self) -> float:
        if self.state != CircuitState.OPEN:
            return 0.0
        elapsed = time.time() - self.last_failure_time
        return max(0, self.config.recovery_timeout - elapsed)

class CircuitOpenError(Exception):
    pass
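The state machine is the part most worth unit-testing. Here is a compact, synchronous sketch of the same lifecycle with an injectable clock so the transitions can be verified deterministically (MiniBreaker and its simplified semantics are illustrative, not part of the class above):

```python
import time
from enum import Enum

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class MiniBreaker:
    """Minimal synchronous circuit-breaker state machine for testing."""
    def __init__(self, failure_threshold=3, recovery_timeout=1.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        if self.state == State.OPEN:
            if self.clock() - self.opened_at >= self.recovery_timeout:
                self.state = State.HALF_OPEN  # admit a recovery probe
                self.successes = 0
                return True
            return False
        return True

    def record(self, ok: bool):
        if ok:
            if self.state == State.HALF_OPEN:
                self.successes += 1
                if self.successes >= self.success_threshold:
                    self.state = State.CLOSED
                    self.failures = 0
            else:
                self.failures = max(0, self.failures - 1)
        else:
            self.failures += 1
            self.opened_at = self.clock()
            if self.state == State.HALF_OPEN or self.failures >= self.failure_threshold:
                self.state = State.OPEN

# Drive the full lifecycle with a fake clock: CLOSED -> OPEN -> HALF_OPEN -> CLOSED
trace = []
now = [0.0]
b = MiniBreaker(failure_threshold=3, recovery_timeout=1.0, clock=lambda: now[0])
for _ in range(3):
    b.record(ok=False)              # three failures trip the breaker
trace.append((b.state, b.allow()))  # OPEN, rejecting calls
now[0] += 1.5                       # wait past recovery_timeout
trace.append((b.allow(), b.state))  # probe admitted, HALF_OPEN
b.record(ok=True)
b.record(ok=True)                   # two successes close the circuit
trace.append(b.state)
```

Driving the clock by hand keeps the test fast and deterministic; the same trick (injecting `clock`) works for the async class above.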

Using the Circuit Breaker with an LLM Client

# resilience/llm_client.py
from resilience.circuit_breaker import CircuitBreaker, CircuitBreakerConfig, CircuitOpenError
from resilience.retry import retry_async, RetryConfig
import openai

class ResilientLLMClient:
    def __init__(self):
        self.client = openai.AsyncOpenAI()
        self.breakers = {
            "gpt-4o": CircuitBreaker("gpt-4o", CircuitBreakerConfig(
                failure_threshold=5,
                recovery_timeout=60.0,
            )),
            "gpt-4o-mini": CircuitBreaker("gpt-4o-mini", CircuitBreakerConfig(
                failure_threshold=5,
                recovery_timeout=30.0,
            )),
        }

    async def complete(self, messages: list[dict], model: str = "gpt-4o",
                       fallback_model: str = "gpt-4o-mini") -> str:
        # Try primary model
        try:
            breaker = self.breakers.get(model)
            if breaker:
                return await breaker.execute(
                    self._call, messages, model
                )
            return await self._call(messages, model)
        except CircuitOpenError:
            print(f"Primary model {model} circuit is open, trying fallback...")
        except Exception as e:
            print(f"Primary model {model} failed: {e}, trying fallback...")

        # Try fallback model
        if fallback_model and fallback_model != model:
            try:
                breaker = self.breakers.get(fallback_model)
                if breaker:
                    return await breaker.execute(
                        self._call, messages, fallback_model
                    )
                return await self._call(messages, fallback_model)
            except Exception as e:
                print(f"Fallback model {fallback_model} also failed: {e}")

        raise RuntimeError("All models unavailable")

    @retry_async(RetryConfig(max_attempts=2, base_delay=0.5))
    async def _call(self, messages: list[dict], model: str) -> str:
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            timeout=30.0,
        )
        return response.choices[0].message.content

Pattern 3: Fallback Chains for Tool Execution

When an agent's tool fails, it should not just report an error — it should try alternative approaches:

# resilience/tool_fallback.py
from typing import Callable, Any

class ToolFallbackChain:
    """Execute a chain of tool implementations, falling back to the
    next one if the current one fails."""

    def __init__(self, name: str):
        self.name = name
        self.implementations: list[tuple[str, Callable]] = []

    def add(self, label: str, func: Callable) -> "ToolFallbackChain":
        self.implementations.append((label, func))
        return self

    async def execute(self, *args, **kwargs) -> Any:
        errors = []
        for label, func in self.implementations:
            try:
                result = await func(*args, **kwargs)
                if result is not None:
                    return result
            except Exception as e:
                errors.append(f"{label}: {e}")
                continue

        raise RuntimeError(
            f"All implementations of '{self.name}' failed:\n"
            + "\n".join(errors)
        )

# Usage example
web_search = ToolFallbackChain("web_search") \
    .add("tavily", search_with_tavily) \
    .add("brave", search_with_brave) \
    .add("cached", search_from_cache)
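An end-to-end run of the same idea with stub providers (the search functions here are hypothetical stand-ins, and run_chain inlines the execute logic above so the snippet stands alone):

```python
import asyncio

# Hypothetical stub providers standing in for real search backends
async def search_with_tavily(q: str):
    raise ConnectionError("tavily: 503")       # primary: simulate an outage

async def search_with_brave(q: str):
    return [f"brave result for {q!r}"]         # secondary: succeeds

async def search_from_cache(q: str):
    return [f"cached result for {q!r}"]        # last resort: stale cache

async def run_chain(implementations, *args):
    """Minimal inline version of ToolFallbackChain.execute."""
    errors = []
    for label, func in implementations:
        try:
            result = await func(*args)
            if result is not None:
                return label, result
        except Exception as e:
            errors.append(f"{label}: {e}")
    raise RuntimeError("; ".join(errors))

label, results = asyncio.run(run_chain(
    [("tavily", search_with_tavily),
     ("brave", search_with_brave),
     ("cached", search_from_cache)],
    "llm outage",
))
# The failed primary is skipped and the secondary's result is returned
```

Returning the winning label alongside the result is a small but useful addition: it lets you track fallback activation rates in your metrics.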

Pattern 4: Graceful Degradation

When critical services are unavailable, the agent should degrade gracefully rather than failing completely:

# resilience/degradation.py
from dataclasses import dataclass
from enum import Enum

class ServiceLevel(Enum):
    FULL = "full"           # All capabilities available
    DEGRADED = "degraded"   # Some features unavailable
    MINIMAL = "minimal"     # Only basic responses
    OFFLINE = "offline"     # Cannot serve requests

@dataclass
class SystemHealth:
    llm_available: bool = True
    tools_available: bool = True
    database_available: bool = True

    @property
    def service_level(self) -> ServiceLevel:
        if self.llm_available and self.tools_available and self.database_available:
            return ServiceLevel.FULL
        if self.llm_available and not self.tools_available:
            return ServiceLevel.DEGRADED
        if not self.llm_available and self.database_available:
            return ServiceLevel.MINIMAL
        return ServiceLevel.OFFLINE

class DegradableAgent:
    def __init__(self):
        self.health = SystemHealth()
        self.canned_responses = {
            "greeting": "Hello! How can I help you today?",
            "error": "I apologize, but I am experiencing technical difficulties. Please try again in a few minutes.",
            "degraded": "I can help with basic questions, but some of my advanced features (like searching the web or checking databases) are temporarily unavailable.",
        }

    async def process(self, user_message: str) -> str:
        level = self.health.service_level

        if level == ServiceLevel.OFFLINE:
            return self.canned_responses["error"]

        if level == ServiceLevel.MINIMAL:
            # Use cached FAQ or rule-based responses
            return self._rule_based_response(user_message)

        if level == ServiceLevel.DEGRADED:
            # Use LLM but without tool access
            prefix = self.canned_responses["degraded"] + "\n\n"
            response = await self._llm_only_response(user_message)
            return prefix + response

        # Full service
        return await self._full_agent_response(user_message)

    def _rule_based_response(self, message: str) -> str:
        """Keyword-based matching when LLM is unavailable."""
        message_lower = message.lower()
        if any(w in message_lower for w in ["hours", "open", "close"]):
            return "Our business hours are Monday-Friday, 9am-5pm EST."
        if any(w in message_lower for w in ["price", "cost", "pricing"]):
            return "Please visit our pricing page at callsphere.com/pricing for current plans."
        return self.canned_responses["error"]

    async def _llm_only_response(self, message: str) -> str:
        """LLM response without tools: run the agent with an empty tools list."""
        raise NotImplementedError

    async def _full_agent_response(self, message: str) -> str:
        """Full agent with all tools and capabilities (overridden by subclasses)."""
        raise NotImplementedError

Pattern 5: Timeout Management

Different operations need different timeouts. A tool lookup should complete in seconds; an LLM generation might take 30 seconds for a complex response:

# resilience/timeouts.py
import asyncio
from typing import TypeVar, Callable

T = TypeVar("T")

class TimeoutConfig:
    LLM_CALL = 45.0        # LLM API calls
    TOOL_EXECUTION = 15.0   # Individual tool calls
    WEB_SEARCH = 10.0       # External search APIs
    DATABASE_QUERY = 5.0    # Database operations
    TOTAL_REQUEST = 120.0   # Total time for one user request

async def with_timeout(coro, timeout: float, fallback=None, label: str = ""):
    """Execute a coroutine with a timeout and optional fallback."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        if fallback is not None:
            print(f"Timeout after {timeout}s for {label}, using fallback")
            return fallback
        raise TimeoutError(f"{label} timed out after {timeout}s")

# Usage
result = await with_timeout(
    call_llm(messages),
    timeout=TimeoutConfig.LLM_CALL,
    fallback="I need a moment to think about this. Could you rephrase your question?",
    label="LLM completion",
)

Putting It All Together

Here is how these patterns compose in a production agent:

# resilience/resilient_agent.py
from resilience.llm_client import ResilientLLMClient
from resilience.circuit_breaker import CircuitBreaker
from resilience.degradation import DegradableAgent, SystemHealth
from resilience.timeouts import with_timeout, TimeoutConfig

class ProductionAgent(DegradableAgent):
    def __init__(self):
        super().__init__()
        self.llm = ResilientLLMClient()
        self.tool_breakers: dict[str, CircuitBreaker] = {}

    async def _full_agent_response(self, message: str) -> str:
        return await with_timeout(
            self._run_agent_loop(message),
            timeout=TimeoutConfig.TOTAL_REQUEST,
            fallback="I apologize for the delay. Let me try a simpler approach.",
            label="full agent response",
        )

    async def _run_agent_loop(self, message: str) -> str:
        # Resilient LLM call with circuit breakers and fallback models
        response = await self.llm.complete(
            [{"role": "user", "content": message}],
            model="gpt-4o",
            fallback_model="gpt-4o-mini",
        )
        return response

FAQ

How do I test resilience patterns?

Use chaos engineering techniques. Inject failures in your test environment: add a test wrapper that randomly fails LLM calls, simulate timeouts with asyncio.sleep, and kill tool services during integration tests. Libraries like toxiproxy can simulate network failures between services.
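The "test wrapper that randomly fails LLM calls" can be a few lines. A minimal sketch with an injectable random source so tests stay deterministic (the names here are illustrative):

```python
import asyncio
import random
from functools import wraps

def flaky(p_fail: float,
          exc_factory=lambda: ConnectionError("injected failure"),
          rng=random.random):
    """Wrap an async callable so each invocation fails with probability
    p_fail. Inject `rng` in tests to make the failure sequence deterministic."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if rng() < p_fail:
                raise exc_factory()
            return await func(*args, **kwargs)
        return wrapper
    return decorator

# Deterministic demo: fail the first two calls, then let the third through
seq = iter([0.0, 0.0, 1.0])   # rng values below p_fail mean "fail"
@flaky(p_fail=0.5, rng=lambda: next(seq))
async def fake_llm() -> str:
    return "ok"

async def main():
    outcomes = []
    for _ in range(3):
        try:
            outcomes.append(await fake_llm())
        except ConnectionError as e:
            outcomes.append(str(e))
    return outcomes

outcomes = asyncio.run(main())
```

Point this wrapper at your LLM client in integration tests and assert that retries, circuit breakers, and fallbacks fire as expected.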

What metrics should I monitor for agent resilience?

Track these key metrics: circuit breaker state changes per service, retry rate and success rate after retries, fallback activation rate, p50/p95/p99 latency for each operation (LLM calls, tool executions, total request time), and error rate by type (timeout, rate limit, server error). Set alerts when circuit breakers open or when fallback rates exceed 5%.

How do I handle rate limits from LLM providers?

Rate limits are the most common failure mode. Implement token-bucket rate limiting on your side to stay under provider limits. Use the Retry-After header from 429 responses to set your retry delay. Distribute requests across multiple API keys if you have them. Consider a request queue with priority levels for critical versus non-critical agent tasks.
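A minimal client-side token bucket of the kind described, driven by a fake clock so the refill math is testable (the class and parameters are a sketch; for the Retry-After side, read that header off the 429 response and sleep at least that long before retrying):

```python
import time

class TokenBucket:
    """Client-side token bucket: stay under a provider's requests-per-minute
    limit instead of discovering it through 429 responses."""
    def __init__(self, rate_per_sec: float, capacity: float,
                 clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock
        self.updated = clock()

    def _refill(self):
        now = self.clock()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Spend `cost` tokens if available; otherwise refuse (caller waits)."""
        self._refill()
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

# 60 requests/minute with a burst allowance of 5, driven by a fake clock
now = [0.0]
bucket = TokenBucket(rate_per_sec=1.0, capacity=5.0, clock=lambda: now[0])
burst = [bucket.try_acquire() for _ in range(6)]  # sixth call is refused
now[0] += 2.0                                     # 2s later: 2 tokens back
later = bucket.try_acquire()
```

For weighted limits (tokens-per-minute rather than requests-per-minute), pass the estimated prompt-plus-completion token count as `cost`.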

Should I use different resilience strategies for synchronous versus streaming responses?

Yes. For streaming responses, set a timeout on the time-to-first-token rather than the total response time. If you do not receive the first chunk within 10 seconds, abort and retry. For synchronous calls, set the timeout on the total response. Also, implement a heartbeat check for streaming — if no chunk arrives for 15 seconds mid-stream, the connection may be stalled.
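The two-deadline idea can be sketched around any async chunk iterator. The consumer below applies a tight time-to-first-token deadline, then switches to a per-gap heartbeat deadline (the function names and the stub stream are illustrative; the demo deadlines are shortened so it runs quickly):

```python
import asyncio

async def consume_stream(stream, first_token_timeout: float = 10.0,
                         heartbeat_timeout: float = 15.0) -> str:
    """Collect chunks from an async iterator, with one deadline for the
    first chunk and a separate deadline for each gap between chunks."""
    chunks = []
    it = stream.__aiter__()
    deadline = first_token_timeout
    while True:
        try:
            chunk = await asyncio.wait_for(it.__anext__(), timeout=deadline)
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            stage = "first token" if not chunks else "mid-stream heartbeat"
            raise TimeoutError(f"stream stalled waiting for {stage}")
        chunks.append(chunk)
        deadline = heartbeat_timeout  # after the first chunk, switch deadlines
    return "".join(chunks)

# Demo: a stub stream whose second chunk stalls past the heartbeat window
async def stalling_stream():
    yield "Hello, "
    await asyncio.sleep(1.0)  # longer than the 0.1s heartbeat below
    yield "world"

async def main() -> str:
    try:
        return await consume_stream(stalling_stream(),
                                    first_token_timeout=0.5,
                                    heartbeat_timeout=0.1)
    except TimeoutError as e:
        return str(e)

result = asyncio.run(main())
```

In production the caller would catch the TimeoutError, abort the stream, and route the request through the retry and fallback paths from Patterns 1 and 2.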


Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
