
Retry and Compensation Patterns for Agent Workflows: Handling Partial Failures

Master retry strategies, compensation logic, idempotency keys, and dead letter handling to build resilient agent workflows that recover gracefully from partial failures.

Partial Failures Are the Norm

In any multi-step agent workflow, partial failures are inevitable. An agent that books a flight, reserves a hotel, and rents a car will sometimes succeed on the flight but fail on the hotel. Without proper handling, you end up with a booked flight and no hotel — an inconsistent state that frustrates users.

Resilient agent workflows need three capabilities: retry (try again when transient errors occur), compensation (undo completed steps when a later step fails permanently), and idempotency (ensure retries do not create duplicate side effects).

Retry Strategies

Not all retries are equal. The right strategy depends on the failure type:

import asyncio
import random
from typing import Callable, Awaitable, TypeVar
from dataclasses import dataclass

T = TypeVar("T")

@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_exceptions: tuple = (TimeoutError, ConnectionError)

async def retry_with_backoff(
    fn: Callable[..., Awaitable[T]],
    config: RetryConfig,
    *args,
    **kwargs,
) -> T:
    """Execute a function with exponential backoff and jitter."""
    last_exception = None

    for attempt in range(1, config.max_attempts + 1):
        try:
            return await fn(*args, **kwargs)
        except config.retryable_exceptions as e:
            last_exception = e
            if attempt == config.max_attempts:
                break

            # Calculate delay with exponential backoff
            delay = min(
                config.base_delay * (config.exponential_base ** (attempt - 1)),
                config.max_delay,
            )

            # Add jitter to prevent thundering herd
            if config.jitter:
                delay = delay * (0.5 + random.random())

            print(f"Attempt {attempt} failed: {e}. Retrying in {delay:.1f}s")
            await asyncio.sleep(delay)

    raise last_exception

Exponential backoff with jitter is the gold standard. The delay grows exponentially (1s, 2s, 4s, 8s...) to give the failing system time to recover, and the random jitter prevents multiple agents from retrying at the exact same moment.
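
As a quick sanity check, `retry_with_backoff` can wrap a flaky tool call. The sketch below uses a hypothetical `flaky_fetch` that fails twice with a transient error before succeeding (`RetryConfig` and `retry_with_backoff` are repeated in compact form so the snippet runs on its own):

```python
import asyncio
import random
from dataclasses import dataclass

@dataclass
class RetryConfig:  # as defined above
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_exceptions: tuple = (TimeoutError, ConnectionError)

async def retry_with_backoff(fn, config, *args, **kwargs):  # as defined above
    last_exception = None
    for attempt in range(1, config.max_attempts + 1):
        try:
            return await fn(*args, **kwargs)
        except config.retryable_exceptions as e:
            last_exception = e
            if attempt == config.max_attempts:
                break
            delay = min(
                config.base_delay * config.exponential_base ** (attempt - 1),
                config.max_delay,
            )
            if config.jitter:
                delay *= 0.5 + random.random()
            await asyncio.sleep(delay)
    raise last_exception

attempts = 0

async def flaky_fetch() -> str:
    """Hypothetical tool call: fails twice with a transient error, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("connection reset")
    return "payload"

result = asyncio.run(
    retry_with_backoff(flaky_fetch, RetryConfig(max_attempts=5, base_delay=0.01))
)
print(result)  # "payload", reached on the third attempt
```

With `max_attempts=5`, the two transient failures are absorbed; only if all five attempts failed would the final `ConnectionError` propagate to the caller.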

Idempotency Keys

Retries are dangerous when actions have side effects. Sending a payment twice charges the customer double. Idempotency keys solve this by letting the receiver detect and deduplicate repeated requests:

import hashlib
import json
from datetime import datetime, timezone
from typing import Awaitable, Callable

class IdempotencyStore:
    """Track completed operations to prevent duplicate execution."""

    def __init__(self):
        self._completed: dict[str, dict] = {}

    def generate_key(self, operation: str, params: dict) -> str:
        """Deterministic key from operation and parameters."""
        payload = json.dumps(
            {"op": operation, "params": params}, sort_keys=True
        )
        return hashlib.sha256(payload.encode()).hexdigest()[:20]

    async def execute_once(
        self,
        key: str,
        fn: Callable[..., Awaitable],
        *args,
        **kwargs,
    ):
        """Execute only if this key has not been completed before."""
        if key in self._completed:
            print(f"Idempotent skip: {key}")
            return self._completed[key]["result"]

        result = await fn(*args, **kwargs)
        self._completed[key] = {
            "result": result,
            "completed_at": datetime.now(timezone.utc).isoformat(),
        }
        return result

When an agent retries a tool call, it passes the same idempotency key. If the store recognizes the key, it returns the cached result instead of executing again.
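
To see the deduplication in action, here is a minimal sketch with a hypothetical `charge_card` side effect (the `IdempotencyStore` from above is repeated in abbreviated form so the snippet is self-contained):

```python
import asyncio
import hashlib
import json
from datetime import datetime, timezone

class IdempotencyStore:  # as defined above, abbreviated
    def __init__(self):
        self._completed: dict[str, dict] = {}

    def generate_key(self, operation: str, params: dict) -> str:
        payload = json.dumps({"op": operation, "params": params}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()[:20]

    async def execute_once(self, key, fn, *args, **kwargs):
        if key in self._completed:
            return self._completed[key]["result"]
        result = await fn(*args, **kwargs)
        self._completed[key] = {
            "result": result,
            "completed_at": datetime.now(timezone.utc).isoformat(),
        }
        return result

charges = []

async def charge_card(amount: float) -> str:
    """Hypothetical payment call with a real side effect."""
    charges.append(amount)
    return f"charge-{len(charges)}"

store = IdempotencyStore()
key = store.generate_key("charge_card", {"amount": 49.99})
first = asyncio.run(store.execute_once(key, charge_card, 49.99))
second = asyncio.run(store.execute_once(key, charge_card, 49.99))  # retried call
print(first, second, len(charges))  # charge-1 charge-1 1
```

The retried call returns the cached confirmation, and the customer is charged exactly once.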


Compensation Logic

When a step fails permanently (after all retries are exhausted), you must undo the effects of previously completed steps. This is the compensation pattern:

class WorkflowFailedError(Exception):
    """Raised when a step fails permanently; carries the failed step's name."""

    def __init__(self, step_name: str, cause: Exception):
        super().__init__(f"step '{step_name}' failed: {cause}")
        self.step_name = step_name

@dataclass
class WorkflowStep:
    name: str
    execute: Callable[..., Awaitable]
    compensate: Callable[..., Awaitable] | None = None

class CompensatingWorkflow:
    """Execute steps with automatic rollback on failure."""

    def __init__(self, idempotency_store: IdempotencyStore):
        self.store = idempotency_store
        self.completed: list[tuple[WorkflowStep, dict]] = []

    async def run(self, steps: list[WorkflowStep], context: dict) -> dict:
        for step in steps:
            try:
                key = self.store.generate_key(step.name, context)
                result = await retry_with_backoff(
                    lambda: self.store.execute_once(
                        key, step.execute, context
                    ),
                    RetryConfig(max_attempts=3),
                )
                context[f"{step.name}_result"] = result
                self.completed.append((step, context.copy()))
            except Exception as e:
                print(f"Step '{step.name}' failed permanently: {e}")
                await self._compensate_all()
                raise WorkflowFailedError(step.name, e)

        return context

    async def _compensate_all(self):
        """Run compensations in reverse order."""
        for step, ctx in reversed(self.completed):
            if step.compensate:
                try:
                    await step.compensate(ctx)
                    print(f"Compensated: {step.name}")
                except Exception as ce:
                    print(f"Compensation failed for {step.name}: {ce}")
                    # Log and continue — do not stop other compensations

Dead Letter Handling

When both execution and compensation fail, the operation enters a "dead letter" state for manual intervention:

@dataclass
class DeadLetterEntry:
    workflow_id: str
    failed_step: str
    error: str
    context: dict
    timestamp: str
    retry_count: int

class DeadLetterQueue:
    def __init__(self):
        self.entries: list[DeadLetterEntry] = []

    def add(self, entry: DeadLetterEntry):
        self.entries.append(entry)
        # Alert operations team
        self._notify_ops(entry)

    def _notify_ops(self, entry: DeadLetterEntry):
        print(
            f"DEAD LETTER: workflow={entry.workflow_id} "
            f"step={entry.failed_step} error={entry.error}"
        )

    def get_pending(self) -> list[DeadLetterEntry]:
        return list(self.entries)

    def resolve(self, workflow_id: str):
        self.entries = [
            e for e in self.entries if e.workflow_id != workflow_id
        ]

Putting It All Together

Here is a complete travel booking workflow with retries, compensation, and dead letter handling:

async def book_flight(ctx):
    return {"confirmation": "FL-12345"}

async def cancel_flight(ctx):
    conf = ctx.get("book_flight_result", {}).get("confirmation")
    print(f"Cancelling flight {conf}")

async def reserve_hotel(ctx):
    raise ConnectionError("Hotel API temporarily unavailable")

async def cancel_hotel(ctx):
    print("Cancelling hotel reservation")

steps = [
    WorkflowStep("book_flight", book_flight, cancel_flight),
    WorkflowStep("reserve_hotel", reserve_hotel, cancel_hotel),
]

dead_letter = DeadLetterQueue()
workflow = CompensatingWorkflow(IdempotencyStore())
try:
    await workflow.run(steps, {"trip_id": "TRIP-001"})  # inside an async function
except WorkflowFailedError as e:
    dead_letter.add(DeadLetterEntry(
        workflow_id="TRIP-001",
        failed_step=e.step_name,
        error=str(e),
        context={},
        timestamp=datetime.now(timezone.utc).isoformat(),
        retry_count=3,
    ))

FAQ

When should I retry versus compensate and give up?

Retry on transient errors — network timeouts, rate limits (429), temporary service unavailability (503). Compensate on permanent errors — invalid input (400), authorization failures (403), or business logic violations. A good heuristic: if the same request would succeed if you tried again in 30 seconds, retry. If it would fail forever regardless, compensate.
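
If your tools surface HTTP status codes, this heuristic can be captured in a small classifier. A minimal sketch (the status sets are a reasonable starting point, not an exhaustive list):

```python
TRANSIENT_STATUS = {408, 429, 500, 502, 503, 504}   # retry these
PERMANENT_STATUS = {400, 401, 403, 404, 409, 422}   # compensate and give up

def should_retry(status_code: int) -> bool:
    """True for transient HTTP failures, False for permanent ones."""
    if status_code in TRANSIENT_STATUS:
        return True
    if status_code in PERMANENT_STATUS:
        return False
    # Unknown codes: default to retrying only server-side (5xx) errors
    return status_code >= 500

print(should_retry(503), should_retry(403))  # True False
```

Wire this into `RetryConfig` by raising distinct exception types for transient versus permanent failures, so only the transient ones appear in `retryable_exceptions`.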

How do I implement idempotency for LLM calls specifically?

Hash the full prompt (system message, user message, temperature, and model) to create a cache key. Store the LLM response against this key. On retry, check the cache first. This not only prevents duplicate work but also saves money on API costs. Set a reasonable TTL on the cache (1 to 24 hours) since the same prompt may need a fresh response in different contexts.

What if compensation itself is not possible — like an email that was already sent?

Some actions are inherently irreversible. For these, use a "forward recovery" strategy instead of compensation. If the hotel booking fails after the email confirmation was sent, do not try to "unsend" the email. Instead, send a correction email, or complete the workflow by finding an alternative hotel. Design your workflow so that irreversible steps execute last, after all reversible steps have succeeded.
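
The "irreversible steps last" rule can be enforced at planning time. A small sketch, assuming each planned step records whether it has a compensation (`PlannedStep` and `order_steps` are hypothetical helpers, not part of the workflow classes above):

```python
from dataclasses import dataclass

@dataclass
class PlannedStep:
    """Planning record: has_compensation=False marks an irreversible step."""
    name: str
    has_compensation: bool

def order_steps(steps: list[PlannedStep]) -> list[PlannedStep]:
    """Stable sort: reversible steps first, irreversible ones (e.g. emails) last."""
    return sorted(steps, key=lambda s: not s.has_compensation)

plan = [
    PlannedStep("send_confirmation_email", has_compensation=False),
    PlannedStep("book_flight", has_compensation=True),
    PlannedStep("reserve_hotel", has_compensation=True),
]
print([s.name for s in order_steps(plan)])
# ['book_flight', 'reserve_hotel', 'send_confirmation_email']
```

Because the sort is stable, the relative order of the reversible steps is preserved; only the irreversible email is pushed to the end, where a failure before it means nothing needs unsending.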


#RetryPatterns #ErrorHandling #Resilience #Idempotency #Python #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
