Retry and Compensation Patterns for Agent Workflows: Handling Partial Failures
Master retry strategies, compensation logic, idempotency keys, and dead letter handling to build resilient agent workflows that recover gracefully from partial failures.
Partial Failures Are the Norm
In any multi-step agent workflow, partial failures are inevitable. An agent that books a flight, reserves a hotel, and rents a car will sometimes succeed on the flight but fail on the hotel. Without proper handling, you end up with a booked flight and no hotel — an inconsistent state that frustrates users.
Resilient agent workflows need three capabilities: retry (try again when transient errors occur), compensation (undo completed steps when a later step fails permanently), and idempotency (ensure retries do not create duplicate side effects).
Retry Strategies
Not all retries are equal. The right strategy depends on the failure type:
import asyncio
import random
from typing import Callable, Awaitable, TypeVar
from dataclasses import dataclass

T = TypeVar("T")


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_exceptions: tuple = (TimeoutError, ConnectionError)


async def retry_with_backoff(
    fn: Callable[..., Awaitable[T]],
    config: RetryConfig,
    *args,
    **kwargs,
) -> T:
    """Execute a function with exponential backoff and jitter."""
    last_exception = None
    for attempt in range(1, config.max_attempts + 1):
        try:
            return await fn(*args, **kwargs)
        except config.retryable_exceptions as e:
            last_exception = e
            if attempt == config.max_attempts:
                break
            # Calculate delay with exponential backoff
            delay = min(
                config.base_delay * (config.exponential_base ** (attempt - 1)),
                config.max_delay,
            )
            # Add jitter to prevent thundering herd
            if config.jitter:
                delay = delay * (0.5 + random.random())
            print(f"Attempt {attempt} failed: {e}. Retrying in {delay:.1f}s")
            await asyncio.sleep(delay)
    raise last_exception
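Exponential backoff with jitter is a widely used default. Before jitter, the delay doubles after each failed attempt (1s, 2s, 4s, 8s, ...) up to max_delay, which gives the failing system time to recover. Jitter then scales each delay by a random factor between 0.5 and 1.5 so that multiple agents do not retry at the same moment. Because the jitter is applied after the cap, a single sleep can exceed max_delay by up to 50%. Exceptions that are not listed in retryable_exceptions are not retried and propagate to the caller immediately.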
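To make the schedule concrete, the sketch below computes the un-jittered delays for a given configuration and the range that jitter spreads each delay over. Both helpers, backoff_schedule and jitter_bounds, are hypothetical names introduced here for illustration; they mirror the arithmetic in retry_with_backoff but are not part of it.

```python
def backoff_schedule(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
) -> list[float]:
    """Delays in seconds slept between attempts, before jitter is applied."""
    # One sleep happens between each pair of consecutive attempts,
    # so max_attempts attempts produce max_attempts - 1 delays.
    return [
        min(base_delay * exponential_base ** (attempt - 1), max_delay)
        for attempt in range(1, max_attempts)
    ]


def jitter_bounds(delay: float) -> tuple[float, float]:
    """Range covered by delay * (0.5 + random.random()); upper end is exclusive."""
    return (delay * 0.5, delay * 1.5)


if __name__ == "__main__":
    print(backoff_schedule(max_attempts=5))  # [1.0, 2.0, 4.0, 8.0]
    print(jitter_bounds(4.0))                # (2.0, 6.0)
```

Note that the default of three attempts only ever sleeps twice (roughly 1s and 2s), so max_delay starts to matter only when max_attempts is raised.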
Idempotency Keys
Retries are dangerous when actions have side effects. Sending a payment twice charges the customer double. Idempotency keys solve this by letting the receiver detect and deduplicate repeated requests:
import hashlib
import json
from datetime import datetime, timezone


class IdempotencyStore:
    """Track completed operations to prevent duplicate execution."""

    def __init__(self):
        self._completed: dict[str, dict] = {}

    def generate_key(self, operation: str, params: dict) -> str:
        """Deterministic key from operation and parameters."""
        payload = json.dumps(
            {"op": operation, "params": params}, sort_keys=True
        )
        return hashlib.sha256(payload.encode()).hexdigest()[:20]

    async def execute_once(
        self,
        key: str,
        fn: Callable[..., Awaitable],
        *args,
        **kwargs,
    ):
        """Execute only if this key has not been completed before."""
        if key in self._completed:
            print(f"Idempotent skip: {key}")
            return self._completed[key]["result"]
        result = await fn(*args, **kwargs)
        self._completed[key] = {
            "result": result,
            "completed_at": datetime.now(timezone.utc).isoformat(),
        }
        return result
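When an agent retries a tool call, it passes the same idempotency key. If the store recognizes the key, it returns the cached result instead of executing again. The key is recorded only after the call succeeds, so a failed attempt can still be retried. Note that this store lives in process memory: it does not survive a restart, and two concurrent calls with the same key can both execute, so a production system would typically back it with a shared, durable store.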
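The effect is easiest to see with a side effect that must not repeat. The following is a minimal sketch, assuming a single process and a trimmed-down stand-in for the store above; InMemoryIdempotencyStore, charge_card, and the key "charge:order-42" are hypothetical names used only for this demonstration.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InMemoryIdempotencyStore:
    """Minimal stand-in for the store above; single process, no persistence."""

    def __init__(self) -> None:
        self._completed: dict[str, Any] = {}

    async def execute_once(
        self, key: str, fn: Callable[..., Awaitable[Any]], *args: Any
    ) -> Any:
        if key in self._completed:
            return self._completed[key]  # replay the stored result
        result = await fn(*args)
        self._completed[key] = result  # recorded only after success
        return result


async def demo() -> tuple[dict, dict, int]:
    charges: list[int] = []  # stands in for the payment provider's ledger

    async def charge_card(amount_cents: int) -> dict:
        charges.append(amount_cents)
        return {"charge_id": f"ch_{len(charges)}", "amount_cents": amount_cents}

    store = InMemoryIdempotencyStore()
    key = "charge:order-42"  # hypothetical key, stable across retries
    first = await store.execute_once(key, charge_card, 1999)
    second = await store.execute_once(key, charge_card, 1999)  # simulated retry
    return first, second, len(charges)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second call returns the same charge record as the first, and the ledger contains a single charge.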
Compensation Logic
When a step fails permanently (after all retries are exhausted), you must undo the effects of previously completed steps. This is the compensation pattern:
class WorkflowFailedError(Exception):
    """Raised after a step fails permanently and compensation has run."""

    def __init__(self, step_name: str, cause: Exception):
        super().__init__(f"Step '{step_name}' failed: {cause}")
        self.step_name = step_name
        self.cause = cause


@dataclass
class WorkflowStep:
    name: str
    execute: Callable[..., Awaitable]
    compensate: Callable[..., Awaitable] | None = None


class CompensatingWorkflow:
    """Execute steps with automatic rollback on failure."""

    def __init__(self, idempotency_store: IdempotencyStore):
        self.store = idempotency_store
        self.completed: list[tuple[WorkflowStep, dict]] = []

    async def run(self, steps: list[WorkflowStep], context: dict) -> dict:
        for step in steps:
            try:
                key = self.store.generate_key(step.name, context)
                result = await retry_with_backoff(
                    lambda: self.store.execute_once(
                        key, step.execute, context
                    ),
                    RetryConfig(max_attempts=3),
                )
                context[f"{step.name}_result"] = result
                # Snapshot the context so compensation sees this step's result
                self.completed.append((step, context.copy()))
            except Exception as e:
                print(f"Step '{step.name}' failed permanently: {e}")
                await self._compensate_all()
                raise WorkflowFailedError(step.name, e) from e
        return context

    async def _compensate_all(self):
        """Run compensations in reverse order."""
        for step, ctx in reversed(self.completed):
            if step.compensate:
                try:
                    await step.compensate(ctx)
                    print(f"Compensated: {step.name}")
                except Exception as ce:
                    print(f"Compensation failed for {step.name}: {ce}")
                    # Log and continue; do not stop other compensations
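The ordering is the important part: completed steps are undone newest-first, and the step that failed is not compensated because it never completed. The sketch below isolates that behavior without retries or idempotency. It is a simplified illustration, not the CompensatingWorkflow class above; run_with_rollback and the Step alias are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable

# (name, action, undo) triple; both callables take no arguments in this sketch.
Step = tuple[str, Callable[[], Awaitable[None]], Callable[[], Awaitable[None]]]


async def run_with_rollback(steps: list[Step], log: list[str]) -> None:
    """Run actions in order; on failure, undo completed steps in reverse."""
    completed = []
    for name, action, undo in steps:
        try:
            await action()
        except Exception:
            log.append(f"failed:{name}")
            for done_name, done_undo in reversed(completed):
                await done_undo()
                log.append(f"undone:{done_name}")
            raise
        completed.append((name, undo))
        log.append(f"done:{name}")


async def demo() -> list[str]:
    log: list[str] = []

    async def ok() -> None:
        return None

    async def fail() -> None:
        raise RuntimeError("car rental rejected")

    steps: list[Step] = [("flight", ok, ok), ("hotel", ok, ok), ("car", fail, ok)]
    try:
        await run_with_rollback(steps, log)
    except RuntimeError:
        pass  # the failure is expected in this demonstration
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the demo the car rental fails, so the hotel is undone first and the flight second.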
Dead Letter Handling
When both execution and compensation fail, the operation enters a "dead letter" state for manual intervention:
@dataclass
class DeadLetterEntry:
    workflow_id: str
    failed_step: str
    error: str
    context: dict
    timestamp: str
    retry_count: int


class DeadLetterQueue:
    def __init__(self):
        self.entries: list[DeadLetterEntry] = []

    def add(self, entry: DeadLetterEntry):
        self.entries.append(entry)
        # Alert operations team
        self._notify_ops(entry)

    def _notify_ops(self, entry: DeadLetterEntry):
        print(
            f"DEAD LETTER: workflow={entry.workflow_id} "
            f"step={entry.failed_step} error={entry.error}"
        )

    def get_pending(self) -> list[DeadLetterEntry]:
        return list(self.entries)

    def resolve(self, workflow_id: str):
        self.entries = [
            e for e in self.entries if e.workflow_id != workflow_id
        ]
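One way to connect this queue to the compensation loop is to record every compensation that raises, rather than only printing it. The following is a rough sketch under that assumption; compensate_or_dead_letter and FailedCompensation are hypothetical names, and a plain list stands in for the DeadLetterQueue above.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Awaitable, Callable


@dataclass
class FailedCompensation:
    workflow_id: str
    step: str
    error: str
    timestamp: str


async def compensate_or_dead_letter(
    workflow_id: str,
    completed: list[tuple[str, Callable[[], Awaitable[None]]]],
    dead_letters: list[FailedCompensation],
) -> None:
    """Undo completed steps newest-first; record any undo that raises."""
    for name, undo in reversed(completed):
        try:
            await undo()
        except Exception as exc:
            # Keep going so one failed undo does not block the others.
            dead_letters.append(FailedCompensation(
                workflow_id=workflow_id,
                step=name,
                error=str(exc),
                timestamp=datetime.now(timezone.utc).isoformat(),
            ))


async def demo() -> tuple[list[str], list[FailedCompensation]]:
    undone: list[str] = []
    dead_letters: list[FailedCompensation] = []

    async def cancel_flight() -> None:
        undone.append("flight")

    async def cancel_hotel() -> None:
        raise ConnectionError("hotel API down")

    completed = [("flight", cancel_flight), ("hotel", cancel_hotel)]
    await compensate_or_dead_letter("TRIP-001", completed, dead_letters)
    return undone, dead_letters


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here the hotel cancellation fails and is queued for manual follow-up, while the flight cancellation still runs.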
Putting It All Together
Here is a complete travel booking workflow with retries, compensation, and dead letter handling:
async def book_flight(ctx):
    return {"confirmation": "FL-12345"}


async def cancel_flight(ctx):
    conf = ctx.get("book_flight_result", {}).get("confirmation")
    print(f"Cancelling flight {conf}")


async def reserve_hotel(ctx):
    # Always fails, to simulate an outage that outlasts every retry
    raise ConnectionError("Hotel API temporarily unavailable")


async def cancel_hotel(ctx):
    print("Cancelling hotel reservation")


steps = [
    WorkflowStep("book_flight", book_flight, cancel_flight),
    WorkflowStep("reserve_hotel", reserve_hotel, cancel_hotel),
]


async def main():
    dead_letter = DeadLetterQueue()
    workflow = CompensatingWorkflow(IdempotencyStore())
    context = {"trip_id": "TRIP-001"}
    try:
        await workflow.run(steps, context)
    except WorkflowFailedError as e:
        # By now _compensate_all has already cancelled the flight
        dead_letter.add(DeadLetterEntry(
            workflow_id="TRIP-001",
            failed_step=e.step_name,
            error=str(e),
            context=context,
            timestamp=datetime.now(timezone.utc).isoformat(),
            retry_count=3,
        ))


# Takes a few seconds: the hotel step sleeps between its three attempts
asyncio.run(main())
FAQ
When should I retry versus compensate and give up?
Retry on transient errors — network timeouts, rate limits (429), temporary service unavailability (503). Compensate on permanent errors — invalid input (400), authorization failures (403), or business logic violations. A good heuristic: if the same request would succeed if you tried again in 30 seconds, retry. If it would fail forever regardless, compensate.
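For HTTP-based tools, this heuristic can be written as a small classification function. The sketch below is one possible encoding, not a complete policy: 429 and 503 come from the answer above, while 408, 502, and 504 are added as an assumption about which other statuses are usually transient. The names Action, TRANSIENT_STATUSES, and classify are hypothetical.

```python
from enum import Enum


class Action(Enum):
    RETRY = "retry"
    COMPENSATE = "compensate"


# 429 and 503 are named in the text; 408, 502 and 504 are assumptions
# about other statuses that tend to clear up on their own.
TRANSIENT_STATUSES = frozenset({408, 429, 502, 503, 504})


def classify(status_code: int, attempts_left: int) -> Action:
    """Retry transient failures while attempts remain; otherwise compensate."""
    if status_code in TRANSIENT_STATUSES and attempts_left > 0:
        return Action.RETRY
    return Action.COMPENSATE


if __name__ == "__main__":
    for code in (400, 403, 429, 503):
        print(code, classify(code, attempts_left=2).value)
```

A transient status with no attempts left is treated as permanent, which matches the point at which the workflow above starts compensating.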
How do I implement idempotency for LLM calls specifically?
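Hash the full request (model, system message, user message, and sampling parameters such as temperature) to create a cache key. Store the LLM response against this key. On retry, check the cache first. This prevents duplicate work and also reduces API costs. Set a TTL on the cache (for example, 1 to 24 hours), since the same prompt may need a fresh response in a different context. Keep in mind that with a temperature above zero the model would normally produce a different response on each call, so a cache hit replays the first response rather than sampling a new one.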
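A minimal sketch of that cache follows, assuming an in-memory dictionary and string responses. The names llm_cache_key and TTLCache are hypothetical, and no real LLM client is called; the injectable clock exists only so that expiry can be exercised without waiting.

```python
import hashlib
import json
import time
from typing import Callable, Optional


def llm_cache_key(model: str, system: str, user: str, temperature: float) -> str:
    """Deterministic key over everything that influences the response."""
    payload = json.dumps(
        {"model": model, "system": system, "user": user, "temperature": temperature},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class TTLCache:
    """In-memory cache whose entries expire after ttl_seconds."""

    def __init__(
        self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, str]] = {}

    def get(self, key: str) -> Optional[str]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at >= self._ttl:
            del self._entries[key]  # expired: force a fresh call
            return None
        return value

    def set(self, key: str, value: str) -> None:
        self._entries[key] = (self._clock(), value)
```

On a retry, the caller would compute the key, call get, and only invoke the model when get returns None, storing the new response with set afterwards.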
What if compensation itself is not possible — like an email that was already sent?
Some actions are inherently irreversible. For these, use a "forward recovery" strategy instead of compensation. If the hotel booking fails after the email confirmation was sent, do not try to "unsend" the email. Instead, send a correction email, or complete the workflow by finding an alternative hotel. Design your workflow so that irreversible steps execute last, after all reversible steps have succeeded.
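The ordering rule can be checked before a workflow runs. The sketch below assumes each step is tagged with a reversible flag, which the WorkflowStep class above does not have; PlannedStep and first_misordered_step are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PlannedStep:
    name: str
    reversible: bool  # assumption: the workflow author sets this per step


def first_misordered_step(steps: list[PlannedStep]) -> Optional[str]:
    """Name of the first reversible step placed after an irreversible one."""
    seen_irreversible = False
    for step in steps:
        if not step.reversible:
            seen_irreversible = True
        elif seen_irreversible:
            return step.name
    return None


if __name__ == "__main__":
    plan = [
        PlannedStep("book_flight", reversible=True),
        PlannedStep("send_confirmation_email", reversible=False),
        PlannedStep("reserve_hotel", reversible=True),
    ]
    print(first_misordered_step(plan))  # reserve_hotel
```

A check like this only flags the problem; whether the steps can actually be reordered depends on the data each step needs from the ones before it.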
CallSphere Team