Agentic AI Development Patterns: Event Sourcing, CQRS, and Saga for Agents

Advanced architectural patterns for agentic AI — event sourcing for agent actions, CQRS for state management, and saga pattern for multi-agent workflows.

Why Traditional Patterns Break Down for Agents

Agentic AI systems behave fundamentally differently from request-response web applications. An agent can take dozens of actions in a single user interaction — searching databases, calling APIs, making decisions, and modifying state. These actions form a complex, branching execution path that is difficult to debug, replay, or undo using conventional CRUD patterns.

Three patterns from distributed systems engineering map remarkably well to agentic AI challenges: Event Sourcing for capturing every agent action as an immutable record, CQRS for separating the write path (agent actions) from the read path (dashboards and analytics), and the Saga pattern for coordinating multi-agent workflows that span multiple services.

This guide shows how to adapt these patterns specifically for agentic AI systems, with practical implementations you can apply to your own projects.

Event Sourcing for Agent Actions

The Problem

Consider a customer support agent that looks up an order, checks the refund policy, and issues a partial refund. In a traditional CRUD system, you store only the final state — the refund was issued. But when something goes wrong, you cannot answer critical questions: Why did the agent issue a partial refund instead of a full one? What information did it base the decision on? Did it check the right policy?

The Pattern

Event Sourcing captures every action the agent takes as an immutable event in an append-only log. Instead of storing current state, you store the sequence of events that produced that state. The current state can always be reconstructed by replaying the events.

Agent Event Log (conversation: conv-8842)
─────────────────────────────────────────────
[1] ConversationStarted
    timestamp: 2026-03-14T10:23:00Z
    agent: triage-agent
    user_message: "I want a refund for order ORD-5521"

[2] ToolCalled
    timestamp: 2026-03-14T10:23:01Z
    agent: triage-agent
    tool: lookup_order
    input: {order_id: "ORD-5521"}
    output: {status: "delivered", amount: 89.99, items: [...]}

[3] HandoffInitiated
    timestamp: 2026-03-14T10:23:02Z
    from_agent: triage-agent
    to_agent: refund-agent
    reason: "User requesting refund on delivered order"

[4] ToolCalled
    timestamp: 2026-03-14T10:23:03Z
    agent: refund-agent
    tool: check_refund_policy
    input: {order_id: "ORD-5521", reason: "customer_request"}
    output: {eligible: true, max_refund: 44.99, type: "partial"}

[5] ToolCalled
    timestamp: 2026-03-14T10:23:04Z
    agent: refund-agent
    tool: issue_refund
    input: {order_id: "ORD-5521", amount: 44.99}
    output: {refund_id: "REF-3301", status: "processed"}

[6] ResponseGenerated
    timestamp: 2026-03-14T10:23:05Z
    agent: refund-agent
    response: "I have issued a partial refund of $44.99..."

Implementation

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import json
import uuid

class EventType(Enum):
    CONVERSATION_STARTED = "conversation_started"
    TOOL_CALLED = "tool_called"
    TOOL_RESULT = "tool_result"
    HANDOFF_INITIATED = "handoff_initiated"
    RESPONSE_GENERATED = "response_generated"
    ERROR_OCCURRED = "error_occurred"
    GUARDRAIL_TRIGGERED = "guardrail_triggered"

@dataclass
class AgentEvent:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    conversation_id: str = ""
    event_type: EventType = EventType.CONVERSATION_STARTED
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
    # Assumes the timestamp column is timezone-aware (TIMESTAMPTZ).
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    agent_name: str = ""
    data: dict = field(default_factory=dict)
    sequence_number: int = 0

class EventStore:
    """Append-only event store for agent actions."""

    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def append(self, event: AgentEvent):
        """Append an event to the store."""
        await self.db_pool.execute(
            """INSERT INTO agent_events
               (id, conversation_id, event_type, timestamp,
                agent_name, data, sequence_number)
               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
            event.id,
            event.conversation_id,
            event.event_type.value,
            event.timestamp,
            event.agent_name,
            json.dumps(event.data),
            event.sequence_number,
        )

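    async def get_events(
        self, conversation_id: str
    ) -> list[AgentEvent]:
        """Retrieve all events for a conversation."""
        rows = await self.db_pool.fetch(
            """SELECT * FROM agent_events
               WHERE conversation_id = $1
               ORDER BY sequence_number ASC""",
            conversation_id,
        )
        return [self._row_to_event(row) for row in rows]

    @staticmethod
    def _row_to_event(row) -> AgentEvent:
        """Convert a database row back into an AgentEvent."""
        data = row["data"]
        return AgentEvent(
            id=str(row["id"]),
            conversation_id=row["conversation_id"],
            event_type=EventType(row["event_type"]),
            timestamp=row["timestamp"],
            agent_name=row["agent_name"],
            # data was written with json.dumps, so decode it
            # if the driver hands it back as text
            data=json.loads(data) if isinstance(data, str) else data,
            sequence_number=row["sequence_number"],
        )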

    async def replay(
        self, conversation_id: str
    ) -> dict:
        """Replay events to reconstruct current state."""
        events = await self.get_events(conversation_id)
        state = {
            "active_agent": None,
            "tools_called": [],
            "handoffs": [],
            "final_response": None,
        }
        for event in events:
            if event.event_type == EventType.CONVERSATION_STARTED:
                state["active_agent"] = event.agent_name
            elif event.event_type == EventType.TOOL_CALLED:
                state["tools_called"].append(event.data)
            elif event.event_type == EventType.HANDOFF_INITIATED:
                state["handoffs"].append(event.data)
                state["active_agent"] = event.data.get("to_agent")
            elif event.event_type == EventType.RESPONSE_GENERATED:
                state["final_response"] = event.data.get("response")
        return state
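
The EventStore above needs a database pool, and it leaves open who assigns sequence_number. As a minimal sketch for local experiments, assuming a single process and no persistence, an in-memory store can assign per-conversation sequence numbers on append and rebuild state with the same fold-over-events logic. The Event and InMemoryEventStore names are hypothetical stand-ins, not part of the implementation above.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Event:
    """Simplified stand-in for AgentEvent (illustrative only)."""
    conversation_id: str
    event_type: str
    agent_name: str
    data: dict[str, Any] = field(default_factory=dict)
    sequence_number: int = 0


class InMemoryEventStore:
    """Hypothetical append-only store; not a substitute for a durable log."""

    def __init__(self) -> None:
        self._events: dict[str, list[Event]] = defaultdict(list)

    def append(
        self, conversation_id: str, event_type: str, agent_name: str, **data: Any
    ) -> Event:
        log = self._events[conversation_id]
        # The store, not the caller, assigns the next sequence number.
        event = Event(conversation_id, event_type, agent_name, data, len(log) + 1)
        log.append(event)
        return event

    def get_events(self, conversation_id: str) -> list[Event]:
        # Return a copy so callers cannot mutate the log.
        return list(self._events.get(conversation_id, []))

    def replay(self, conversation_id: str) -> dict[str, Any]:
        """Fold the events, in order, into the current conversation state."""
        state: dict[str, Any] = {
            "active_agent": None,
            "tools_called": [],
            "handoffs": 0,
        }
        for event in self.get_events(conversation_id):
            if event.event_type == "conversation_started":
                state["active_agent"] = event.agent_name
            elif event.event_type == "tool_called":
                state["tools_called"].append(event.data["tool"])
            elif event.event_type == "handoff_initiated":
                state["handoffs"] += 1
                state["active_agent"] = event.data["to_agent"]
        return state


# A shortened version of the conv-8842 log shown earlier.
store = InMemoryEventStore()
store.append("conv-8842", "conversation_started", "triage-agent")
store.append("conv-8842", "tool_called", "triage-agent", tool="lookup_order")
store.append("conv-8842", "handoff_initiated", "triage-agent", to_agent="refund-agent")
store.append("conv-8842", "tool_called", "refund-agent", tool="issue_refund")
state = store.replay("conv-8842")
```

With a real database, the same idea would typically be enforced with a unique constraint on (conversation_id, sequence_number) so that concurrent writers cannot claim the same position.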

Benefits for Agentic AI

  • Complete audit trail: Every agent decision is recorded and can be reviewed
  • Debugging: Replay a problematic conversation to see exactly what happened
  • Analytics: Aggregate events to understand tool usage patterns, handoff frequency, and failure modes
  • Compliance: Financial and healthcare agents need immutable records of every action taken

CQRS for Agent State Management

The Problem

An agentic AI system has two very different read/write profiles. The write side (agent executing) needs fast, sequential writes for each action in the agent loop — often 5 to 20 writes per conversation turn. The read side (dashboards, analytics, search) needs complex queries across thousands of conversations — aggregations, full-text search, and real-time metrics.

Optimizing a single database schema for both workloads creates painful compromises.

The Pattern

CQRS (Command Query Responsibility Segregation) separates the write model from the read model. The agent writes events to an optimized write store, and a projection process transforms those events into read-optimized views.

Architecture Diagram (ASCII):

  User Message
       |
       v
  ┌─────────┐     ┌──────────────┐
  │  Agent  │────>│  Event Store │  (Write Side)
  │  Loop   │     │  (append)    │
  └─────────┘     └──────┬───────┘
                         │
                    Event Bus
                         │
              ┌──────────┼──────────┐
              v          v          v
        ┌──────────┐ ┌─────────┐ ┌──────────────┐
        │Dashboard │ │Search   │ │Analytics     │
        │View      │ │Index    │ │Aggregates    │
        │(Postgres)│ │(Elastic-│ │(ClickHouse or│
        │          │ │search)  │ │TimescaleDB)  │
        └──────────┘ └─────────┘ └──────────────┘
                 (Read Side - Projections)

Implementation

The write side is the event store from the previous section. The read side uses projections — event handlers that build read-optimized views:

class ConversationProjection:
    """Projects agent events into a read-optimized
    conversations table."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle_event(self, event: AgentEvent):
        """Update the read model based on a new event."""
        if event.event_type == EventType.CONVERSATION_STARTED:
            await self.read_db.execute(
                """INSERT INTO conversation_views
                   (id, started_at, active_agent, status,
                    tool_count, handoff_count)
                   VALUES ($1, $2, $3, 'active', 0, 0)""",
                event.conversation_id,
                event.timestamp,
                event.agent_name,
            )
        elif event.event_type == EventType.TOOL_CALLED:
            await self.read_db.execute(
                """UPDATE conversation_views
                   SET tool_count = tool_count + 1,
                       last_tool = $2,
                       updated_at = $3
                   WHERE id = $1""",
                event.conversation_id,
                event.data.get("tool"),
                event.timestamp,
            )
        elif event.event_type == EventType.HANDOFF_INITIATED:
            await self.read_db.execute(
                """UPDATE conversation_views
                   SET handoff_count = handoff_count + 1,
                       active_agent = $2,
                       updated_at = $3
                   WHERE id = $1""",
                event.conversation_id,
                event.data.get("to_agent"),
                event.timestamp,
            )
        elif event.event_type == EventType.RESPONSE_GENERATED:
            await self.read_db.execute(
                """UPDATE conversation_views
                   SET status = 'completed',
                       completed_at = $2,
                       duration_ms = EXTRACT(EPOCH FROM
                         ($2 - started_at)) * 1000
                   WHERE id = $1""",
                event.conversation_id,
                event.timestamp,
            )

The read side can now serve dashboard queries efficiently:

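from datetime import timedelta

# Fast dashboard queries on the read model
async def get_agent_metrics(read_db, time_range: timedelta):
    """Aggregate per-agent metrics over the trailing time window.

    time_range is a timedelta because drivers such as asyncpg bind
    interval parameters from timedelta objects, not strings.
    """
    return await read_db.fetch(
        """SELECT
             active_agent,
             COUNT(*) as total_conversations,
             AVG(duration_ms) as avg_duration,
             AVG(tool_count) as avg_tools_per_conversation,
             SUM(handoff_count) as total_handoffs
           FROM conversation_views
           WHERE started_at > NOW() - $1::interval
           GROUP BY active_agent""",
        time_range,
    )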
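
The event bus in the diagram is the piece that connects the write side to the read side. A minimal in-process sketch shows the fan-out to several read models and how a read model can be rebuilt from the log. It assumes plain dicts as events rather than AgentEvent objects, and the EventBus, ToolUsageProjection, ConversationStatusProjection, and rebuild names are hypothetical.

```python
from collections import Counter
from typing import Callable

Event = dict  # illustrative shape: {"conversation_id", "event_type", "data"}


class EventBus:
    """Hypothetical in-process bus; production systems would likely use a queue or stream."""

    def __init__(self) -> None:
        self._handlers: list[Callable[[Event], None]] = []

    def subscribe(self, handler: Callable[[Event], None]) -> None:
        self._handlers.append(handler)

    def publish(self, event: Event) -> None:
        # Every projection sees every event and keeps only what it needs.
        for handler in self._handlers:
            handler(event)


class ToolUsageProjection:
    """Read model: how often each tool was called."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def handle(self, event: Event) -> None:
        if event["event_type"] == "tool_called":
            self.counts[event["data"]["tool"]] += 1


class ConversationStatusProjection:
    """Read model: latest status per conversation."""

    def __init__(self) -> None:
        self.status: dict[str, str] = {}

    def handle(self, event: Event) -> None:
        if event["event_type"] == "conversation_started":
            self.status[event["conversation_id"]] = "active"
        elif event["event_type"] == "response_generated":
            self.status[event["conversation_id"]] = "completed"


def rebuild(projection_factory, log):
    """Rebuild a read model from scratch by replaying the full log."""
    projection = projection_factory()
    for event in log:
        projection.handle(event)
    return projection


bus = EventBus()
tools, statuses = ToolUsageProjection(), ConversationStatusProjection()
bus.subscribe(tools.handle)
bus.subscribe(statuses.handle)

log = [
    {"conversation_id": "c1", "event_type": "conversation_started", "data": {}},
    {"conversation_id": "c1", "event_type": "tool_called", "data": {"tool": "lookup_order"}},
    {"conversation_id": "c1", "event_type": "tool_called", "data": {"tool": "issue_refund"}},
    {"conversation_id": "c1", "event_type": "response_generated", "data": {}},
]
for event in log:
    bus.publish(event)
```

Because read models are derived entirely from events, a new view can be added later and backfilled with rebuild without touching the write path.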

Saga Pattern for Multi-Agent Workflows

The Problem

Consider a multi-agent workflow where a sales agent qualifies a lead, a CRM agent creates the opportunity record, and a scheduling agent books a demo. If the scheduling step fails (no available slots), you need to undo the CRM record that was optimistically created, revert the lead's status, and notify the user. This is effectively a distributed transaction across multiple agents.

The Pattern

The Saga pattern breaks a complex workflow into a sequence of steps, each with a corresponding compensating action (rollback). If any step fails, the saga executes compensating actions for all completed steps in reverse order.

Saga: Book Demo Meeting
───────────────────────────────────

Step 1: Qualify Lead
  Action:    qualify_lead(lead_id)
  Compensate: mark_lead_unqualified(lead_id)
       |
       v (success)
Step 2: Create CRM Opportunity
  Action:    create_opportunity(lead_id, deal_value)
  Compensate: delete_opportunity(opportunity_id)
       |
       v (success)
Step 3: Schedule Demo
  Action:    book_calendar_slot(rep_id, lead_id, time)
  Compensate: cancel_calendar_slot(booking_id)
       |
       v (FAILURE - no slots available)

  ← Compensate Step 2: delete_opportunity(opp-123)
  ← Compensate Step 1: mark_lead_unqualified(lead-456)

Result: All changes rolled back cleanly

Implementation

from dataclasses import dataclass, field
from typing import Any, Callable

# AgentEvent, EventStore, and EventType come from the
# Event Sourcing section above.

@dataclass
class SagaStep:
    name: str
    action: Callable[..., Any]
    compensate: Callable[..., Any]
    action_args: dict = field(default_factory=dict)
    result: Any = None

class AgentSaga:
    """Orchestrates multi-agent workflows with
    compensation on failure."""

    def __init__(self, name: str, event_store: EventStore):
        self.name = name
        self.steps: list[SagaStep] = []
        self.completed_steps: list[SagaStep] = []
        self.event_store = event_store

    def add_step(
        self,
        name: str,
        action: Callable,
        compensate: Callable,
        **action_args,
    ):
        self.steps.append(SagaStep(
            name=name,
            action=action,
            compensate=compensate,
            action_args=action_args,
        ))

    async def execute(self, conversation_id: str) -> dict:
        """Execute the saga steps in order."""
        for step in self.steps:
            try:
                result = await step.action(
                    **step.action_args
                )
                step.result = result
                self.completed_steps.append(step)

                await self.event_store.append(AgentEvent(
                    conversation_id=conversation_id,
                    event_type=EventType.TOOL_CALLED,
                    agent_name=self.name,
                    data={
                        "saga_step": step.name,
                        "status": "completed",
                        "result": str(result),
                    },
                ))
            except Exception as e:
                await self._compensate(conversation_id, e)
                return {
                    "status": "rolled_back",
                    "failed_step": step.name,
                    "error": str(e),
                }

        return {
            "status": "completed",
            "steps": [s.name for s in self.completed_steps],
        }

    async def _compensate(
        self, conversation_id: str, error: Exception
    ):
        """Roll back completed steps in reverse order."""
        for step in reversed(self.completed_steps):
            try:
                await step.compensate(
                    **step.action_args,
                    result=step.result,
                )
                await self.event_store.append(AgentEvent(
                    conversation_id=conversation_id,
                    event_type=EventType.TOOL_CALLED,
                    agent_name=self.name,
                    data={
                        "saga_step": step.name,
                        "status": "compensated",
                        "reason": str(error),
                    },
                ))
            except Exception as comp_error:
                # Log compensation failure - requires manual fix
                await self.event_store.append(AgentEvent(
                    conversation_id=conversation_id,
                    event_type=EventType.ERROR_OCCURRED,
                    agent_name=self.name,
                    data={
                        "saga_step": step.name,
                        "status": "compensation_failed",
                        "error": str(comp_error),
                    },
                ))

Usage Example

async def book_demo_workflow(lead_id: str, rep_id: str):
    saga = AgentSaga("book-demo", event_store)

    saga.add_step(
        name="qualify_lead",
        action=sales_agent.qualify_lead,
        compensate=sales_agent.unqualify_lead,
        lead_id=lead_id,
    )
    saga.add_step(
        name="create_opportunity",
        action=crm_agent.create_opportunity,
        compensate=crm_agent.delete_opportunity,
        lead_id=lead_id,
    )
    saga.add_step(
        name="schedule_demo",
        action=scheduling_agent.book_slot,
        compensate=scheduling_agent.cancel_slot,
        rep_id=rep_id,
        lead_id=lead_id,
    )

    return await saga.execute(conversation_id="conv-123")
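
To see the rollback order without a database or real agents, the Book Demo Meeting saga can be simulated with stub functions. This is a minimal sketch under stated assumptions: run_saga is a hypothetical, stripped-down runner (no event logging), the stubs only record that they were called, and each compensating action receives the result of the step it undoes.

```python
import asyncio

calls: list[str] = []  # records the order in which actions and compensations run


async def qualify_lead(lead_id: str) -> dict:
    calls.append("qualify_lead")
    return {"lead_id": lead_id}


async def mark_lead_unqualified(result: dict) -> None:
    calls.append("mark_lead_unqualified")


async def create_opportunity(lead_id: str) -> dict:
    calls.append("create_opportunity")
    return {"opportunity_id": "opp-123"}


async def delete_opportunity(result: dict) -> None:
    calls.append("delete_opportunity")


async def book_calendar_slot(lead_id: str) -> dict:
    calls.append("book_calendar_slot")
    raise RuntimeError("no slots available")  # simulated failure


async def cancel_calendar_slot(result: dict) -> None:
    calls.append("cancel_calendar_slot")


async def run_saga(steps, **kwargs) -> dict:
    """steps is a list of (action, compensate) pairs, executed in order."""
    completed = []
    for action, compensate in steps:
        try:
            result = await action(**kwargs)
        except Exception as exc:
            # Undo the steps that already succeeded, newest first.
            for done_compensate, done_result in reversed(completed):
                await done_compensate(done_result)
            return {
                "status": "rolled_back",
                "failed_step": action.__name__,
                "error": str(exc),
            }
        completed.append((compensate, result))
    return {"status": "completed"}


outcome = asyncio.run(
    run_saga(
        [
            (qualify_lead, mark_lead_unqualified),
            (create_opportunity, delete_opportunity),
            (book_calendar_slot, cancel_calendar_slot),
        ],
        lead_id="lead-456",
    )
)
```

The failed step itself is not compensated, because it never completed. Only the opportunity and the lead qualification are undone, in that order.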

At CallSphere, we use saga-based orchestration for our multi-agent call handling workflows, where a triage agent, specialist agent, and follow-up agent need to coordinate across CRM updates, calendar bookings, and notification delivery.

Combining the Patterns

The three patterns complement each other powerfully:

  1. Event Sourcing captures every action as an immutable event
  2. CQRS projects those events into read-optimized views for dashboards and analytics
  3. Saga orchestrates multi-step workflows with automatic rollback on failure

Together, they give you complete observability (Event Sourcing), efficient querying (CQRS), and reliable coordination (Saga) — the three biggest challenges in production agentic AI systems.

Frequently Asked Questions

Is Event Sourcing overkill for simple agents?

For a single-agent system with 2-3 tools handling low traffic, yes — simple structured logging gives you most of the debugging benefits without the infrastructure overhead. Event Sourcing becomes essential when you have multi-agent systems, compliance requirements, or need to replay and debug conversations systematically. The sweet spot is introducing Event Sourcing when your agent system grows beyond a single agent or when you need an audit trail for business or regulatory reasons.

How do I handle event schema evolution?

As your agent system evolves, event schemas will change — new tools, new data fields, renamed agents. Use a versioned schema approach: include a schema_version field in every event and write upcasters that transform old events into the current schema during replay. Never modify existing events. Instead, create new event types alongside the old ones. This ensures your event log remains an accurate historical record.
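
A minimal sketch of the upcaster approach, assuming events are plain dicts and that events written before versioning was introduced count as version 1. The two schema changes (renaming agent to agent_name, adding a channel field) are invented for illustration.

```python
from typing import Callable

CURRENT_VERSION = 3


def _v1_to_v2(event: dict) -> dict:
    # Hypothetical change: v2 renamed the "agent" field to "agent_name".
    upgraded = {k: v for k, v in event.items() if k != "agent"}
    upgraded["agent_name"] = event["agent"]
    upgraded["schema_version"] = 2
    return upgraded


def _v2_to_v3(event: dict) -> dict:
    # Hypothetical change: v3 added a "channel" field; old events default to "chat".
    return {**event, "channel": "chat", "schema_version": 3}


# Maps a schema version to the function that lifts it one version up.
UPCASTERS: dict[int, Callable[[dict], dict]] = {1: _v1_to_v2, 2: _v2_to_v3}


def upcast(event: dict) -> dict:
    """Return a copy of the event in the current schema.

    The stored event is never modified; upcasting happens at read time.
    """
    current = dict(event)
    while current.get("schema_version", 1) < CURRENT_VERSION:
        current = UPCASTERS[current.get("schema_version", 1)](current)
    return current


old_event = {"agent": "triage-agent", "event_type": "tool_called"}
new_event = upcast(old_event)
```

Chaining one-step upcasters keeps each migration small: adding version 4 means writing a single function from 3 to 4, not touching the older ones.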

What is the performance overhead of Event Sourcing?

An append-only write to the event store typically adds 1-5ms per event. For a typical agent turn with 5-10 events, that is 5-50ms of overhead, which is negligible next to a 500-3000ms LLM API call. The read side (projections) runs asynchronously and does not affect agent response time. The main cost is storage: plan for roughly 1-5KB per event, or 5-50KB per conversation turn. At 100,000 single-turn conversations per day, that is about 0.5-5GB of event data per day, and proportionally more for multi-turn conversations.
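
The ranges follow from multiplying the per-event figures through. A small sketch makes the arithmetic easy to rerun with your own numbers. It assumes one turn per conversation and decimal units (1 GB = 1,000,000 KB), and the function name is hypothetical.

```python
def estimate_overhead(
    events_per_turn=(5, 10),
    ms_per_event=(1, 5),
    kb_per_event=(1, 5),
    turns_per_day=100_000,
):
    """Return (low, high) estimates for latency and storage overhead."""
    low_events, high_events = events_per_turn
    latency_ms = (low_events * ms_per_event[0], high_events * ms_per_event[1])
    storage_kb = (low_events * kb_per_event[0], high_events * kb_per_event[1])
    # Convert KB per turn into GB per day using decimal units.
    storage_gb_per_day = tuple(kb * turns_per_day / 1_000_000 for kb in storage_kb)
    return {
        "latency_ms": latency_ms,
        "storage_kb_per_turn": storage_kb,
        "storage_gb_per_day": storage_gb_per_day,
    }


estimate = estimate_overhead()
```

With the default inputs this yields 5-50ms of added latency, 5-50KB per turn, and 0.5-5GB per day.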

When should I use the Saga pattern vs simple sequential execution?

Use Saga when: (1) multiple external systems are modified during a workflow, (2) those modifications need to be undone if a later step fails, and (3) the consequences of inconsistent state are significant (financial transactions, legal records, customer-facing bookings). For workflows where partial completion is acceptable or where steps are read-only, simple sequential execution with error handling is sufficient and much simpler.

Can I use these patterns with any agent framework?

Yes. These patterns operate at the infrastructure layer, not the agent framework layer. Whether you use the OpenAI Agents SDK, LangGraph, or Claude's tool-use API, you can wrap tool executions with event logging, project events into read models, and coordinate multi-step workflows with sagas. The patterns are framework-agnostic — they care about the actions agents take, not how the agents are implemented.
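
One framework-independent way to wrap tool executions is a decorator around the tool function itself. The sketch below assumes async tools called with keyword arguments and uses a plain list as a stand-in for the event store. The logged_tool decorator and the stubbed lookup_order tool are hypothetical.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable

event_log: list[dict[str, Any]] = []  # stand-in for an event store


def logged_tool(agent_name: str):
    """Decorator that records each call of an async tool as an event."""

    def decorator(tool: Callable[..., Awaitable[Any]]):
        @functools.wraps(tool)
        async def wrapper(**kwargs: Any) -> Any:
            base = {"agent": agent_name, "tool": tool.__name__, "input": kwargs}
            try:
                output = await tool(**kwargs)
            except Exception as exc:
                # Record the failure, then let the caller handle it.
                event_log.append(
                    {**base, "event_type": "error_occurred", "error": str(exc)}
                )
                raise
            event_log.append({**base, "event_type": "tool_called", "output": output})
            return output

        return wrapper

    return decorator


@logged_tool(agent_name="triage-agent")
async def lookup_order(order_id: str) -> dict:
    # Stubbed result for illustration; a real tool would query an order system.
    return {"order_id": order_id, "status": "delivered"}


result = asyncio.run(lookup_order(order_id="ORD-5521"))
```

The agent framework only ever sees lookup_order, so the same decorated function can be registered as a tool in whichever SDK you use.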

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
