
The Saga Pattern: Managing Long-Running Multi-Step Agent Transactions

Implement the Saga pattern for AI agent systems to manage multi-step transactions with compensating actions, rollback support, and saga orchestration for reliable distributed workflows.

The Transaction Problem in Multi-Agent Systems

When an AI agent workflow spans multiple steps — booking a flight, reserving a hotel, and renting a car — each step may call a different external service. If the car rental fails after the flight and hotel are already booked, you need to cancel the hotel reservation and the flight booking. Traditional database transactions cannot span these external services. The Saga pattern solves this by defining a compensating action for each step that undoes its effect if a later step fails.

A saga is a sequence of steps where each step has both an action (the forward operation) and a compensation (the rollback operation). If any step fails, the saga executes compensations for all previously completed steps, in reverse order.

Core Saga Framework

from dataclasses import dataclass
from enum import Enum
from typing import Callable, Any
from datetime import datetime


class StepStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"


@dataclass
class SagaStep:
    name: str
    action: Callable[[dict], Any]
    compensation: Callable[[dict, Any], None]
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None


class SagaStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    ROLLED_BACK = "rolled_back"
    FAILED = "failed"


@dataclass
class SagaLog:
    saga_id: str
    status: SagaStatus
    steps: list[dict]
    started_at: datetime
    completed_at: datetime | None = None


class SagaOrchestrator:
    def __init__(self, saga_id: str):
        self.saga_id = saga_id
        self.steps: list[SagaStep] = []
        self.context: dict = {}
        self.status = SagaStatus.RUNNING

    def add_step(
        self,
        name: str,
        action: Callable[[dict], Any],
        compensation: Callable[[dict, Any], None],
    ) -> "SagaOrchestrator":
        self.steps.append(SagaStep(
            name=name, action=action, compensation=compensation
        ))
        return self

    def execute(self, initial_context: dict | None = None) -> SagaLog:
        if initial_context:
            self.context.update(initial_context)

        started = datetime.now()
        completed_steps: list[SagaStep] = []

        for step in self.steps:
            print(f"[Saga {self.saga_id}] Executing: {step.name}")
            try:
                result = step.action(self.context)
                step.result = result
                step.status = StepStatus.COMPLETED
                completed_steps.append(step)

                # Store result in context for subsequent steps
                self.context[f"{step.name}_result"] = result
                print(f"[Saga {self.saga_id}] "
                      f"Completed: {step.name}")

            except Exception as e:
                step.status = StepStatus.FAILED
                step.error = str(e)
                print(f"[Saga {self.saga_id}] "
                      f"Failed at {step.name}: {e}")

                # Compensate in reverse order
                self._compensate(completed_steps)
                return self._build_log(started)

        self.status = SagaStatus.COMPLETED
        return self._build_log(started)

    def _compensate(self, completed_steps: list[SagaStep]):
        self.status = SagaStatus.COMPENSATING
        print(f"[Saga {self.saga_id}] Starting compensation "
              f"for {len(completed_steps)} steps")

        for step in reversed(completed_steps):
            try:
                print(f"[Saga {self.saga_id}] "
                      f"Compensating: {step.name}")
                step.compensation(self.context, step.result)
                step.status = StepStatus.COMPENSATED
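            except Exception as e:
                # Record the failure but keep going, so the remaining
                # completed steps still get a chance to be compensated.
                step.error = f"compensation failed: {e}"
                print(f"[Saga {self.saga_id}] Compensation "
                      f"FAILED for {step.name}: {e}")
                self.status = SagaStatus.FAILED

        if self.status is not SagaStatus.FAILED:
            self.status = SagaStatus.ROLLED_BACK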

    def _build_log(self, started: datetime) -> SagaLog:
        return SagaLog(
            saga_id=self.saga_id,
            status=self.status,
            steps=[
                {
                    "name": s.name,
                    "status": s.status.value,
                    "error": s.error,
                }
                for s in self.steps
            ],
            started_at=started,
            completed_at=datetime.now(),
        )

Applying the Saga to a Travel Booking

import uuid


# Simulated external service calls
def book_flight(ctx: dict) -> dict:
    print(f"  Booking flight to {ctx['destination']}")
    booking_id = str(uuid.uuid4())[:8]
    # Simulate API call to airline
    return {"booking_id": booking_id, "airline": "SkyAir",
            "price": 450.00}


def cancel_flight(ctx: dict, result: dict) -> None:
    print(f"  Cancelling flight {result['booking_id']}")
    # Simulate cancellation API call


def reserve_hotel(ctx: dict) -> dict:
    print(f"  Reserving hotel in {ctx['destination']}")
    reservation_id = str(uuid.uuid4())[:8]
    return {"reservation_id": reservation_id,
            "hotel": "Grand Plaza", "price": 200.00}


def cancel_hotel(ctx: dict, result: dict) -> None:
    print(f"  Cancelling hotel {result['reservation_id']}")


def rent_car(ctx: dict) -> dict:
    print(f"  Renting car in {ctx['destination']}")
    # Simulate a failure when the caller asks for one
    if ctx.get("simulate_failure"):
        raise RuntimeError("No cars available at destination")
    rental_id = str(uuid.uuid4())[:8]
    return {"rental_id": rental_id, "price": 75.00}


def cancel_car(ctx: dict, result: dict) -> None:
    print(f"  Cancelling car rental {result['rental_id']}")


# Build and execute the saga
saga = (
    SagaOrchestrator("travel-001")
    .add_step("book_flight", book_flight, cancel_flight)
    .add_step("reserve_hotel", reserve_hotel, cancel_hotel)
    .add_step("rent_car", rent_car, cancel_car)
)

# This will fail at rent_car and roll back hotel + flight
log = saga.execute({
    "destination": "Tokyo",
    "simulate_failure": True,
})

print(f"\nSaga status: {log.status.value}")
for step in log.steps:
    print(f"  {step['name']}: {step['status']}")

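Running this produces output like the following. The booking and reservation IDs are random, so the ones shown here are placeholders: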

[Saga travel-001] Executing: book_flight
  Booking flight to Tokyo
[Saga travel-001] Completed: book_flight
[Saga travel-001] Executing: reserve_hotel
  Reserving hotel in Tokyo
[Saga travel-001] Completed: reserve_hotel
[Saga travel-001] Executing: rent_car
  Renting car in Tokyo
[Saga travel-001] Failed at rent_car: No cars available at destination
[Saga travel-001] Starting compensation for 2 steps
[Saga travel-001] Compensating: reserve_hotel
  Cancelling hotel abc123
[Saga travel-001] Compensating: book_flight
  Cancelling flight def456

Saga status: rolled_back
  book_flight: compensated
  reserve_hotel: compensated
  rent_car: failed

Handling Compensation Failures

The hardest case in the Saga pattern is a compensation that itself fails. If the flight cannot be cancelled, the system is left in an inconsistent state, which the orchestrator above reports as a saga status of failed. Common strategies are to retry the compensation with exponential backoff, to log the failure for manual intervention, and to design compensations to be idempotent so that retries are safe.
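As a rough illustration of the retry strategy, the sketch below wraps a compensation in a retry loop with exponential backoff. The helper name `compensate_with_retry` and its parameters (`max_attempts`, `base_delay`, `sleep`) are assumptions made for this example, not part of the framework above.

```python
import time
from typing import Any, Callable


def compensate_with_retry(
    compensation: Callable[[dict, Any], None],
    context: dict,
    result: Any,
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Run a compensation, retrying with exponential backoff.

    Returns True once the compensation succeeds, or False if every
    attempt failed and the step needs manual intervention.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            compensation(context, result)
            return True
        except Exception as exc:
            print(f"Compensation attempt {attempt}/{max_attempts} "
                  f"failed: {exc}")
            if attempt < max_attempts:
                # Wait base_delay * 1, 2, 4, ... seconds between attempts
                sleep(base_delay * 2 ** (attempt - 1))
    return False
```

One way to use it would be to route the `step.compensation(self.context, step.result)` call in `_compensate` through this helper and treat a `False` return as a failure to log for manual follow-up. Retrying is only safe when the compensation is idempotent.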


FAQ

What is the difference between the Saga pattern and the Pipeline pattern?

The Pipeline pattern focuses on data transformation through sequential stages — if a stage fails, you stop or retry that stage. The Saga pattern focuses on distributed transactions — if a step fails, you must undo the side effects of all previous steps. Use Pipeline for data processing and Saga for operations that create external state that needs cleanup on failure.

How do I make compensations idempotent?

Store the result of each step (booking IDs, reservation IDs) and check whether the resource has already been cancelled before attempting cancellation. If the resource no longer exists, the compensation is a no-op rather than an error. This makes it safe to retry compensations multiple times.
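A minimal sketch of that check, assuming a hypothetical in-memory `FLIGHT_BOOKINGS` dictionary stands in for the airline's booking system (a real service would be queried through its own API instead):

```python
# Hypothetical stand-in for the airline's booking records.
FLIGHT_BOOKINGS: dict[str, str] = {"a1b2c3d4": "confirmed"}


def cancel_flight_idempotent(ctx: dict, result: dict) -> None:
    """Cancel a flight booking; safe to call more than once."""
    booking_id = result["booking_id"]
    status = FLIGHT_BOOKINGS.get(booking_id)

    if status is None or status == "cancelled":
        # Already cancelled, or never created: nothing left to undo.
        print(f"  Flight {booking_id} already cancelled, skipping")
        return

    FLIGHT_BOOKINGS[booking_id] = "cancelled"
    print(f"  Cancelled flight {booking_id}")
```

Calling this function twice with the same `booking_id` leaves the booking in the same cancelled state, and calling it for an unknown ID does nothing rather than raising.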

Can I run saga steps in parallel instead of sequentially?

Yes, but parallel sagas are significantly more complex. You need to track which parallel branches completed, compensate only the completed branches on failure, and handle the case where a compensation races with a still-running step. Start with sequential sagas and only introduce parallelism when the performance gain justifies the added complexity.
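To give a sense of the bookkeeping involved, here is a simplified sketch that runs independent branches in a thread pool and compensates only the branches that completed. The `run_parallel_branches` function and the `Branch` tuple are hypothetical names for this example. The sketch sidesteps the race described above by waiting for every branch to finish before compensating anything, which is one possible design, not the only one.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

# (name, action, compensation), mirroring the arguments of add_step
Branch = tuple[str, Callable[[dict], Any], Callable[[dict, Any], None]]


def run_parallel_branches(
    branches: list[Branch], context: dict
) -> dict[str, Any]:
    """Run branches concurrently; if any fails, undo the completed ones."""
    with ThreadPoolExecutor(max_workers=max(1, len(branches))) as pool:
        futures = {
            name: pool.submit(action, context)
            for name, action, _ in branches
        }
    # Leaving the `with` block waits for all branches, so no
    # compensation can race with a still-running action.

    results: dict[str, Any] = {}
    errors: dict[str, BaseException] = {}
    for name, future in futures.items():
        exc = future.exception()
        if exc is None:
            results[name] = future.result()
        else:
            errors[name] = exc

    if errors:
        # Compensate only the branches that actually completed.
        for name, _, compensation in branches:
            if name in results:
                compensation(context, results[name])
        raise RuntimeError(f"Parallel branches failed: {sorted(errors)}")

    return results
```

Because the branches are assumed to be independent of each other, the order in which they are compensated does not matter here. Branches that depend on one another would still need reverse-order compensation.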



CallSphere Team
