The Saga Pattern: Managing Long-Running Multi-Step Agent Transactions
Implement the Saga pattern for AI agent systems to manage multi-step transactions with compensating actions, rollback support, and saga orchestration for reliable distributed workflows.
The Transaction Problem in Multi-Agent Systems
When an AI agent workflow spans multiple steps — booking a flight, reserving a hotel, and renting a car — each step may call a different external service. If the car rental fails after the flight and hotel are already booked, you need to cancel the hotel reservation and the flight booking. Traditional database transactions cannot span these external services. The Saga pattern solves this by defining a compensating action for each step that undoes its effect if a later step fails.
A saga is a sequence of steps where each step has both an action (the forward operation) and a compensation (the rollback operation). If any step fails, the saga executes compensations for all previously completed steps, in reverse order.
Core Saga Framework
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Any
from datetime import datetime

class StepStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"


@dataclass
class SagaStep:
    name: str
    action: Callable[[dict], Any]
    compensation: Callable[[dict, Any], None]
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None


class SagaStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    ROLLED_BACK = "rolled_back"
    FAILED = "failed"


@dataclass
class SagaLog:
    saga_id: str
    status: SagaStatus
    steps: list[dict]
    started_at: datetime
    completed_at: datetime | None = None


class SagaOrchestrator:
    def __init__(self, saga_id: str):
        self.saga_id = saga_id
        self.steps: list[SagaStep] = []
        self.context: dict = {}
        self.status = SagaStatus.RUNNING

    def add_step(
        self,
        name: str,
        action: Callable[[dict], Any],
        compensation: Callable[[dict, Any], None],
    ) -> "SagaOrchestrator":
        self.steps.append(SagaStep(
            name=name, action=action, compensation=compensation
        ))
        return self

    def execute(self, initial_context: dict | None = None) -> SagaLog:
        if initial_context:
            self.context.update(initial_context)

        started = datetime.now()
        completed_steps: list[SagaStep] = []

        for step in self.steps:
            print(f"[Saga {self.saga_id}] Executing: {step.name}")
            try:
                result = step.action(self.context)
                step.result = result
                step.status = StepStatus.COMPLETED
                completed_steps.append(step)
                # Store result in context for subsequent steps
                self.context[f"{step.name}_result"] = result
                print(f"[Saga {self.saga_id}] "
                      f"Completed: {step.name}")
            except Exception as e:
                step.status = StepStatus.FAILED
                step.error = str(e)
                print(f"[Saga {self.saga_id}] "
                      f"Failed at {step.name}: {e}")
                # Compensate in reverse order
                self._compensate(completed_steps)
                return self._build_log(started)

        self.status = SagaStatus.COMPLETED
        return self._build_log(started)

    def _compensate(self, completed_steps: list[SagaStep]):
        self.status = SagaStatus.COMPENSATING
        print(f"[Saga {self.saga_id}] Starting compensation "
              f"for {len(completed_steps)} steps")

        for step in reversed(completed_steps):
            try:
                print(f"[Saga {self.saga_id}] "
                      f"Compensating: {step.name}")
                step.compensation(self.context, step.result)
                step.status = StepStatus.COMPENSATED
            except Exception as e:
                print(f"[Saga {self.saga_id}] Compensation "
                      f"FAILED for {step.name}: {e}")
                self.status = SagaStatus.FAILED
                return

        self.status = SagaStatus.ROLLED_BACK

    def _build_log(self, started: datetime) -> SagaLog:
        return SagaLog(
            saga_id=self.saga_id,
            status=self.status,
            steps=[
                {
                    "name": s.name,
                    "status": s.status.value,
                    "error": s.error,
                }
                for s in self.steps
            ],
            started_at=started,
            completed_at=datetime.now(),
        )
Applying the Saga to a Travel Booking
import uuid

# Simulated external service calls
def book_flight(ctx: dict) -> dict:
    print(f"  Booking flight to {ctx['destination']}")
    booking_id = str(uuid.uuid4())[:8]
    # Simulate API call to airline
    return {"booking_id": booking_id, "airline": "SkyAir",
            "price": 450.00}


def cancel_flight(ctx: dict, result: dict) -> None:
    print(f"  Cancelling flight {result['booking_id']}")
    # Simulate cancellation API call


def reserve_hotel(ctx: dict) -> dict:
    print(f"  Reserving hotel in {ctx['destination']}")
    reservation_id = str(uuid.uuid4())[:8]
    return {"reservation_id": reservation_id,
            "hotel": "Grand Plaza", "price": 200.00}


def cancel_hotel(ctx: dict, result: dict) -> None:
    print(f"  Cancelling hotel {result['reservation_id']}")


def rent_car(ctx: dict) -> dict:
    print(f"  Renting car in {ctx['destination']}")
    # Simulate a failure
    if ctx.get("simulate_failure"):
        raise Exception("No cars available at destination")
    rental_id = str(uuid.uuid4())[:8]
    return {"rental_id": rental_id, "price": 75.00}


def cancel_car(ctx: dict, result: dict) -> None:
    print(f"  Cancelling car rental {result['rental_id']}")


# Build and execute the saga
saga = (
    SagaOrchestrator("travel-001")
    .add_step("book_flight", book_flight, cancel_flight)
    .add_step("reserve_hotel", reserve_hotel, cancel_hotel)
    .add_step("rent_car", rent_car, cancel_car)
)

# This will fail at rent_car and roll back hotel + flight
log = saga.execute({
    "destination": "Tokyo",
    "simulate_failure": True,
})

print(f"\nSaga status: {log.status.value}")
for step in log.steps:
    print(f"  {step['name']}: {step['status']}")
Running this produces output like the following (booking IDs are random; the ones shown are placeholders):
[Saga travel-001] Executing: book_flight
  Booking flight to Tokyo
[Saga travel-001] Completed: book_flight
[Saga travel-001] Executing: reserve_hotel
  Reserving hotel in Tokyo
[Saga travel-001] Completed: reserve_hotel
[Saga travel-001] Executing: rent_car
  Renting car in Tokyo
[Saga travel-001] Failed at rent_car: No cars available at destination
[Saga travel-001] Starting compensation for 2 steps
[Saga travel-001] Compensating: reserve_hotel
  Cancelling hotel abc123
[Saga travel-001] Compensating: book_flight
  Cancelling flight def456

Saga status: rolled_back
  book_flight: compensated
  reserve_hotel: compensated
  rent_car: failed
Handling Compensation Failures
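The hardest part of the Saga pattern is handling a compensation that itself fails. If you cannot cancel the flight, the system is left in an inconsistent state. The orchestrator above stops at the first failed compensation and marks the saga as FAILED, so any earlier steps that have not yet been compensated stay in place. Common strategies include retrying the compensation with exponential backoff, logging the failure for manual intervention, and designing compensations to be idempotent so that retries are safe.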
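As a rough illustration of the first two strategies, a compensation can be wrapped so that it is retried with exponential backoff and, once the retries run out, recorded for manual follow-up. This is a minimal sketch, not part of the framework above: `with_retry`, its parameters, and the `manual_review` context key are hypothetical names chosen for the example.

```python
import time
from typing import Any, Callable


def with_retry(
    compensation: Callable[[dict, Any], None],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Callable[[dict, Any], None]:
    """Wrap a compensation so it is retried with exponential backoff.

    Hypothetical helper: delays are base_delay, 2*base_delay, 4*base_delay, ...
    """
    def wrapped(ctx: dict, result: Any) -> None:
        for attempt in range(1, max_attempts + 1):
            try:
                compensation(ctx, result)
                return
            except Exception as exc:
                if attempt == max_attempts:
                    # Out of retries: record the failure for manual
                    # intervention, then let the orchestrator see the error.
                    name = getattr(compensation, "__name__", repr(compensation))
                    ctx.setdefault("manual_review", []).append(
                        {"compensation": name, "error": str(exc)}
                    )
                    raise
                # Wait longer after each failed attempt.
                sleep(base_delay * 2 ** (attempt - 1))

    return wrapped
```

With the travel example, the wrapper would be applied when registering a step, for instance `.add_step("book_flight", book_flight, with_retry(cancel_flight))`. Retrying is only safe if the compensation can be called more than once without side effects, which is why it pairs naturally with idempotent compensations.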
FAQ
What is the difference between the Saga pattern and the Pipeline pattern?
The Pipeline pattern focuses on data transformation through sequential stages — if a stage fails, you stop or retry that stage. The Saga pattern focuses on distributed transactions — if a step fails, you must undo the side effects of all previous steps. Use Pipeline for data processing and Saga for operations that create external state that needs cleanup on failure.
How do I make compensations idempotent?
Store the result of each step (booking IDs, reservation IDs) and check whether the resource has already been cancelled before attempting cancellation. If the resource no longer exists, the compensation is a no-op rather than an error. This makes it safe to retry compensations multiple times.
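The check-before-cancel idea can be sketched as follows. The `BOOKINGS` dictionary is a hypothetical in-memory stand-in for the airline's records, and `cancel_flight_idempotent` is an illustrative name; a real service would expose its own lookup and cancellation calls.

```python
# Hypothetical in-memory stand-in for the external service's booking records.
BOOKINGS: dict[str, str] = {"ab12cd34": "confirmed"}


def cancel_flight_idempotent(ctx: dict, result: dict) -> None:
    """Cancel a booking; calling it again, or for a missing booking, is a no-op."""
    booking_id = result["booking_id"]
    state = BOOKINGS.get(booking_id)
    if state is None or state == "cancelled":
        # Already cancelled or never existed: nothing to undo, not an error.
        return
    BOOKINGS[booking_id] = "cancelled"


# Calling the compensation twice leaves the booking in the same state.
cancel_flight_idempotent({}, {"booking_id": "ab12cd34"})
cancel_flight_idempotent({}, {"booking_id": "ab12cd34"})
```

Because the second call finds the booking already cancelled, it returns without raising, so a retry after a timeout or a crash does not turn into a spurious failure.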
Can I run saga steps in parallel instead of sequentially?
Yes, but parallel sagas are significantly more complex. You need to track which parallel branches completed, compensate only the completed branches on failure, and handle the case where a compensation races with a still-running step. Start with sequential sagas and only introduce parallelism when the performance gain justifies the added complexity.
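For a sense of what the simplest parallel variant involves, here is a sketch that runs independent steps on a thread pool, waits for every branch to finish, and then compensates only the branches that succeeded. `run_parallel_steps` is a hypothetical helper, not part of the orchestrator above, and it assumes the actions do not depend on each other's results and only read from the shared context.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

Action = Callable[[dict], Any]
Compensation = Callable[[dict, Any], None]


def run_parallel_steps(
    steps: list[tuple[str, Action, Compensation]],
    ctx: dict,
) -> dict[str, Any]:
    """Run independent steps concurrently; undo the successful ones if any fail."""
    # Leaving the with-block waits for every submitted action, so no
    # compensation can race with a branch that is still running.
    with ThreadPoolExecutor(max_workers=max(1, len(steps))) as pool:
        futures = {name: pool.submit(action, ctx) for name, action, _ in steps}

    results: dict[str, Any] = {}
    errors: dict[str, BaseException] = {}
    for name, future in futures.items():
        exc = future.exception()
        if exc is None:
            results[name] = future.result()
        else:
            errors[name] = exc

    if errors:
        # Compensate only the branches that actually completed.
        for name, _, compensation in steps:
            if name in results:
                compensation(ctx, results[name])
        raise RuntimeError(f"Parallel steps failed: {sorted(errors)}")
    return results
```

Waiting for all branches before compensating sidesteps the race described above at the cost of latency: a slow branch delays the rollback even when another branch has already failed.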
CallSphere Team