Agentic AI Development Patterns: Event Sourcing, CQRS, and Saga for Agents
Advanced architectural patterns for agentic AI — event sourcing for agent actions, CQRS for state management, and saga pattern for multi-agent workflows.
Why Traditional Patterns Break Down for Agents
Agentic AI systems behave fundamentally differently from request-response web applications. An agent can take dozens of actions in a single user interaction — searching databases, calling APIs, making decisions, and modifying state. These actions form a complex, branching execution path that is difficult to debug, replay, or undo using conventional CRUD patterns.
Three patterns from distributed systems engineering map remarkably well to agentic AI challenges: Event Sourcing for capturing every agent action as an immutable record, CQRS for separating the write path (agent actions) from the read path (dashboards and analytics), and the Saga pattern for coordinating multi-agent workflows that span multiple services.
This guide shows how to adapt these patterns specifically for agentic AI systems, with practical implementations you can apply to your own projects.
Event Sourcing for Agent Actions
The Problem
Consider a customer support agent that looks up an order, checks the refund policy, and issues a partial refund. In a traditional CRUD system, you store only the final state — the refund was issued. But when something goes wrong, you cannot answer critical questions: Why did the agent issue a partial refund instead of a full one? What information did it base the decision on? Did it check the right policy?
The Pattern
Event Sourcing captures every action the agent takes as an immutable event in an append-only log. Instead of storing current state, you store the sequence of events that produced that state. The current state can always be reconstructed by replaying the events.
Agent Event Log (conversation: conv-8842)
─────────────────────────────────────────────
[1] ConversationStarted
    timestamp: 2026-03-14T10:23:00Z
    agent: triage-agent
    user_message: "I want a refund for order ORD-5521"
[2] ToolCalled
    timestamp: 2026-03-14T10:23:01Z
    agent: triage-agent
    tool: lookup_order
    input: {order_id: "ORD-5521"}
    output: {status: "delivered", amount: 89.99, items: [...]}
[3] HandoffInitiated
    timestamp: 2026-03-14T10:23:02Z
    from_agent: triage-agent
    to_agent: refund-agent
    reason: "User requesting refund on delivered order"
[4] ToolCalled
    timestamp: 2026-03-14T10:23:03Z
    agent: refund-agent
    tool: check_refund_policy
    input: {order_id: "ORD-5521", reason: "customer_request"}
    output: {eligible: true, max_refund: 44.99, type: "partial"}
[5] ToolCalled
    timestamp: 2026-03-14T10:23:04Z
    agent: refund-agent
    tool: issue_refund
    input: {order_id: "ORD-5521", amount: 44.99}
    output: {refund_id: "REF-3301", status: "processed"}
[6] ResponseGenerated
    timestamp: 2026-03-14T10:23:05Z
    agent: refund-agent
    response: "I have issued a partial refund of $44.99..."
Implementation
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import uuid


class EventType(Enum):
    CONVERSATION_STARTED = "conversation_started"
    TOOL_CALLED = "tool_called"
    TOOL_RESULT = "tool_result"
    HANDOFF_INITIATED = "handoff_initiated"
    RESPONSE_GENERATED = "response_generated"
    ERROR_OCCURRED = "error_occurred"
    GUARDRAIL_TRIGGERED = "guardrail_triggered"


@dataclass
class AgentEvent:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    conversation_id: str = ""
    event_type: EventType = EventType.CONVERSATION_STARTED
    timestamp: datetime = field(default_factory=datetime.utcnow)
    agent_name: str = ""
    data: dict = field(default_factory=dict)
    sequence_number: int = 0


class EventStore:
    """Append-only event store for agent actions."""

    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def append(self, event: AgentEvent):
        """Append an event to the store."""
        await self.db_pool.execute(
            """INSERT INTO agent_events
               (id, conversation_id, event_type, timestamp,
                agent_name, data, sequence_number)
               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
            event.id,
            event.conversation_id,
            event.event_type.value,
            event.timestamp,
            event.agent_name,
            json.dumps(event.data),
            event.sequence_number,
        )

    async def get_events(self, conversation_id: str) -> list[AgentEvent]:
        """Retrieve all events for a conversation."""
        rows = await self.db_pool.fetch(
            """SELECT * FROM agent_events
               WHERE conversation_id = $1
               ORDER BY sequence_number ASC""",
            conversation_id,
        )
        return [self._row_to_event(row) for row in rows]

    def _row_to_event(self, row) -> AgentEvent:
        """Hydrate a database row back into an AgentEvent."""
        return AgentEvent(
            id=row["id"],
            conversation_id=row["conversation_id"],
            event_type=EventType(row["event_type"]),
            timestamp=row["timestamp"],
            agent_name=row["agent_name"],
            data=json.loads(row["data"]),
            sequence_number=row["sequence_number"],
        )

    async def replay(self, conversation_id: str) -> dict:
        """Replay events to reconstruct current state."""
        events = await self.get_events(conversation_id)
        state = {
            "active_agent": None,
            "tools_called": [],
            "handoffs": [],
            "final_response": None,
        }
        for event in events:
            if event.event_type == EventType.CONVERSATION_STARTED:
                state["active_agent"] = event.agent_name
            elif event.event_type == EventType.TOOL_CALLED:
                state["tools_called"].append(event.data)
            elif event.event_type == EventType.HANDOFF_INITIATED:
                state["handoffs"].append(event.data)
                state["active_agent"] = event.data.get("to_agent")
            elif event.event_type == EventType.RESPONSE_GENERATED:
                state["final_response"] = event.data.get("response")
        return state
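To see the replay fold in isolation, here is a self-contained sketch of the same reduction, with plain dicts standing in for database rows so it runs without a database. The event values mirror the conv-8842 log above:

```python
# Self-contained sketch of the replay fold from EventStore.replay,
# operating on plain dicts instead of AgentEvent rows.
def replay(events: list[dict]) -> dict:
    state = {
        "active_agent": None,
        "tools_called": [],
        "handoffs": [],
        "final_response": None,
    }
    for event in events:
        etype = event["event_type"]
        if etype == "conversation_started":
            state["active_agent"] = event["agent_name"]
        elif etype == "tool_called":
            state["tools_called"].append(event["data"])
        elif etype == "handoff_initiated":
            state["handoffs"].append(event["data"])
            state["active_agent"] = event["data"].get("to_agent")
        elif etype == "response_generated":
            state["final_response"] = event["data"].get("response")
    return state

# Events mirroring the conv-8842 log
events = [
    {"event_type": "conversation_started", "agent_name": "triage-agent",
     "data": {}},
    {"event_type": "tool_called", "agent_name": "triage-agent",
     "data": {"tool": "lookup_order"}},
    {"event_type": "handoff_initiated", "agent_name": "triage-agent",
     "data": {"to_agent": "refund-agent"}},
    {"event_type": "tool_called", "agent_name": "refund-agent",
     "data": {"tool": "issue_refund"}},
    {"event_type": "response_generated", "agent_name": "refund-agent",
     "data": {"response": "I have issued a partial refund of $44.99..."}},
]
state = replay(events)
print(state["active_agent"])       # refund-agent
print(len(state["tools_called"]))  # 2
```

Because state is a pure function of the log, the same fold can be re-run at any time — after a bug fix, for a single conversation, or across the whole history.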
Benefits for Agentic AI
- Complete audit trail: Every agent decision is recorded and can be reviewed
- Debugging: Replay a problematic conversation to see exactly what happened
- Analytics: Aggregate events to understand tool usage patterns, handoff frequency, and failure modes
- Compliance: Financial and healthcare agents need immutable records of every action taken
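As a quick illustration of the analytics benefit, tool-usage frequency falls out of the raw log in a single pass — no extra instrumentation needed (a sketch over events represented as dicts):

```python
from collections import Counter

def tool_usage(events: list[dict]) -> Counter:
    """Count how often each tool appears in an event log."""
    return Counter(
        e["data"]["tool"]
        for e in events
        if e["event_type"] == "tool_called"
    )

log = [
    {"event_type": "tool_called", "data": {"tool": "lookup_order"}},
    {"event_type": "tool_called", "data": {"tool": "check_refund_policy"}},
    {"event_type": "tool_called", "data": {"tool": "issue_refund"}},
    {"event_type": "response_generated", "data": {"response": "..."}},
]
usage = tool_usage(log)
print(dict(usage))
# {'lookup_order': 1, 'check_refund_policy': 1, 'issue_refund': 1}
```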
CQRS for Agent State Management
The Problem
An agentic AI system has two very different read/write profiles. The write side (agent executing) needs fast, sequential writes for each action in the agent loop — often 5 to 20 writes per conversation turn. The read side (dashboards, analytics, search) needs complex queries across thousands of conversations — aggregations, full-text search, and real-time metrics.
Optimizing a single database schema for both workloads creates painful compromises.
The Pattern
CQRS (Command Query Responsibility Segregation) separates the write model from the read model. The agent writes events to an optimized write store, and a projection process transforms those events into read-optimized views.
Architecture Diagram (ASCII):
User Message
     |
     v
┌─────────┐       ┌──────────────┐
│  Agent  │──────>│ Event Store  │   (Write Side)
│  Loop   │       │  (append)    │
└─────────┘       └──────┬───────┘
                         │
                     Event Bus
                         │
          ┌──────────────┼──────────────┐
          v              v              v
   ┌────────────┐  ┌───────────┐  ┌──────────────┐
   │ Dashboard  │  │  Search   │  │  Analytics   │
   │    View    │  │   Index   │  │  Aggregates  │
   │ (Postgres) │  │ (Elastic- │  │ (ClickHouse  │
   └────────────┘  │  search)  │  │ or Timescale)│
                   └───────────┘  └──────────────┘
             (Read Side - Projections)
Implementation
The write side is the event store from the previous section. The read side uses projections — event handlers that build read-optimized views:
class ConversationProjection:
    """Projects agent events into a read-optimized
    conversations table."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle_event(self, event: AgentEvent):
        """Update the read model based on a new event."""
        if event.event_type == EventType.CONVERSATION_STARTED:
            await self.read_db.execute(
                """INSERT INTO conversation_views
                   (id, started_at, active_agent, status,
                    tool_count, handoff_count)
                   VALUES ($1, $2, $3, 'active', 0, 0)""",
                event.conversation_id,
                event.timestamp,
                event.agent_name,
            )
        elif event.event_type == EventType.TOOL_CALLED:
            await self.read_db.execute(
                """UPDATE conversation_views
                   SET tool_count = tool_count + 1,
                       last_tool = $2,
                       updated_at = $3
                   WHERE id = $1""",
                event.conversation_id,
                event.data.get("tool"),
                event.timestamp,
            )
        elif event.event_type == EventType.HANDOFF_INITIATED:
            await self.read_db.execute(
                """UPDATE conversation_views
                   SET handoff_count = handoff_count + 1,
                       active_agent = $2,
                       updated_at = $3
                   WHERE id = $1""",
                event.conversation_id,
                event.data.get("to_agent"),
                event.timestamp,
            )
        elif event.event_type == EventType.RESPONSE_GENERATED:
            await self.read_db.execute(
                """UPDATE conversation_views
                   SET status = 'completed',
                       completed_at = $2,
                       duration_ms = EXTRACT(EPOCH FROM
                           ($2 - started_at)) * 1000
                   WHERE id = $1""",
                event.conversation_id,
                event.timestamp,
            )
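Projections need a delivery mechanism. The sketch below wires events to projection handlers with a minimal in-process fan-out bus; this is an illustrative stand-in — production systems would typically use Kafka, NATS, or Postgres LISTEN/NOTIFY so projections can lag and catch up independently of the write path:

```python
import asyncio

class InProcessEventBus:
    """Minimal fan-out bus: delivers each published event to every
    registered projection handler, in order. A sketch, not a
    production message bus."""

    def __init__(self):
        self.handlers = []

    def subscribe(self, handler):
        # handler: an async callable taking one event
        self.handlers.append(handler)

    async def publish(self, event):
        for handler in self.handlers:
            await handler(event)

# Usage sketch: a trivial projection that counts events per conversation
counts: dict[str, int] = {}

async def counting_projection(event):
    cid = event["conversation_id"]
    counts[cid] = counts.get(cid, 0) + 1

async def main():
    bus = InProcessEventBus()
    bus.subscribe(counting_projection)
    await bus.publish({"conversation_id": "conv-8842",
                       "event_type": "tool_called"})
    await bus.publish({"conversation_id": "conv-8842",
                       "event_type": "response_generated"})

asyncio.run(main())
print(counts["conv-8842"])  # 2
```

The key design point survives the simplification: the write path only appends and publishes; every read-model concern lives in a subscriber that can be added, removed, or rebuilt without touching the agent loop.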
The read side can now serve dashboard queries efficiently:
# Fast dashboard queries on the read model
async def get_agent_metrics(time_range: str):
    return await read_db.fetch(
        """SELECT
               active_agent,
               COUNT(*) as total_conversations,
               AVG(duration_ms) as avg_duration,
               AVG(tool_count) as avg_tools_per_conversation,
               SUM(handoff_count) as total_handoffs
           FROM conversation_views
           WHERE started_at > NOW() - $1::interval
           GROUP BY active_agent""",
        time_range,
    )
Saga Pattern for Multi-Agent Workflows
The Problem
Consider a multi-agent workflow where a sales agent qualifies a lead, a scheduling agent books a demo, and a CRM agent creates the opportunity record. If the scheduling step fails (no available slots), you need to undo the CRM record that was optimistically created and notify the user. This is a distributed transaction across multiple agents.
The Pattern
The Saga pattern breaks a complex workflow into a sequence of steps, each with a corresponding compensating action (rollback). If any step fails, the saga executes compensating actions for all completed steps in reverse order.
Saga: Book Demo Meeting
───────────────────────────────────
Step 1: Qualify Lead
    Action:     qualify_lead(lead_id)
    Compensate: mark_lead_unqualified(lead_id)
        |
        v (success)
Step 2: Create CRM Opportunity
    Action:     create_opportunity(lead_id, deal_value)
    Compensate: delete_opportunity(opportunity_id)
        |
        v (success)
Step 3: Schedule Demo
    Action:     book_calendar_slot(rep_id, lead_id, time)
    Compensate: cancel_calendar_slot(booking_id)
        |
        v (FAILURE - no slots available)
    ← Compensate Step 2: delete_opportunity(opp-123)
    ← Compensate Step 1: mark_lead_unqualified(lead-456)

Result: All changes rolled back cleanly
Implementation
from dataclasses import dataclass, field
from typing import Callable, Any


@dataclass
class SagaStep:
    name: str
    action: Callable[..., Any]
    compensate: Callable[..., Any]
    action_args: dict = field(default_factory=dict)
    result: Any = None


class AgentSaga:
    """Orchestrates multi-agent workflows with
    compensation on failure."""

    def __init__(self, name: str, event_store: EventStore):
        self.name = name
        self.steps: list[SagaStep] = []
        self.completed_steps: list[SagaStep] = []
        self.event_store = event_store

    def add_step(
        self,
        name: str,
        action: Callable,
        compensate: Callable,
        **action_args,
    ):
        self.steps.append(SagaStep(
            name=name,
            action=action,
            compensate=compensate,
            action_args=action_args,
        ))

    async def execute(self, conversation_id: str) -> dict:
        """Execute the saga steps in order."""
        for step in self.steps:
            try:
                result = await step.action(**step.action_args)
                step.result = result
                self.completed_steps.append(step)
                await self.event_store.append(AgentEvent(
                    conversation_id=conversation_id,
                    event_type=EventType.TOOL_CALLED,
                    agent_name=self.name,
                    data={
                        "saga_step": step.name,
                        "status": "completed",
                        "result": str(result),
                    },
                ))
            except Exception as e:
                await self._compensate(conversation_id, e)
                return {
                    "status": "rolled_back",
                    "failed_step": step.name,
                    "error": str(e),
                }
        return {
            "status": "completed",
            "steps": [s.name for s in self.completed_steps],
        }

    async def _compensate(self, conversation_id: str, error: Exception):
        """Roll back completed steps in reverse order."""
        for step in reversed(self.completed_steps):
            try:
                await step.compensate(
                    **step.action_args,
                    result=step.result,
                )
                await self.event_store.append(AgentEvent(
                    conversation_id=conversation_id,
                    event_type=EventType.TOOL_CALLED,
                    agent_name=self.name,
                    data={
                        "saga_step": step.name,
                        "status": "compensated",
                        "reason": str(error),
                    },
                ))
            except Exception as comp_error:
                # Log compensation failure - requires manual intervention
                await self.event_store.append(AgentEvent(
                    conversation_id=conversation_id,
                    event_type=EventType.ERROR_OCCURRED,
                    agent_name=self.name,
                    data={
                        "saga_step": step.name,
                        "status": "compensation_failed",
                        "error": str(comp_error),
                    },
                ))
Usage Example
async def book_demo_workflow(lead_id: str, rep_id: str):
    saga = AgentSaga("book-demo", event_store)

    saga.add_step(
        name="qualify_lead",
        action=sales_agent.qualify_lead,
        compensate=sales_agent.unqualify_lead,
        lead_id=lead_id,
    )
    saga.add_step(
        name="create_opportunity",
        action=crm_agent.create_opportunity,
        compensate=crm_agent.delete_opportunity,
        lead_id=lead_id,
    )
    saga.add_step(
        name="schedule_demo",
        action=scheduling_agent.book_slot,
        compensate=scheduling_agent.cancel_slot,
        rep_id=rep_id,
        lead_id=lead_id,
    )

    return await saga.execute(conversation_id="conv-123")
At CallSphere, we use saga-based orchestration for our multi-agent call handling workflows, where a triage agent, specialist agent, and follow-up agent need to coordinate across CRM updates, calendar bookings, and notification delivery.
Combining the Patterns
The three patterns complement each other powerfully:
- Event Sourcing captures every action as an immutable event
- CQRS projects those events into read-optimized views for dashboards and analytics
- Saga orchestrates multi-step workflows with automatic rollback on failure
Together, they give you complete observability (Event Sourcing), efficient querying (CQRS), and reliable coordination (Saga) — the three biggest challenges in production agentic AI systems.
Frequently Asked Questions
Is Event Sourcing overkill for simple agents?
For a single-agent system with 2-3 tools handling low traffic, yes — simple structured logging gives you most of the debugging benefits without the infrastructure overhead. Event Sourcing becomes essential when you have multi-agent systems, compliance requirements, or need to replay and debug conversations systematically. The sweet spot is introducing Event Sourcing when your agent system grows beyond a single agent or when you need an audit trail for business or regulatory reasons.
How do I handle event schema evolution?
As your agent system evolves, event schemas will change — new tools, new data fields, renamed agents. Use a versioned schema approach: include a schema_version field in every event and write upcasters that transform old events into the current schema during replay. Never modify existing events. Instead, create new event types alongside the old ones. This ensures your event log remains an accurate historical record.
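The upcaster idea can be sketched concretely. In this hypothetical example, v1 events stored the tool name under `tool_name` and v2 renamed it to `tool` — both field names are illustrative, not from any particular system:

```python
CURRENT_VERSION = 2

def upcast_v1_to_v2(event: dict) -> dict:
    """Hypothetical upcaster: v1 stored the tool name as 'tool_name';
    v2 renamed the field to 'tool'. The stored event is never modified;
    the transform is applied on read."""
    data = dict(event["data"])
    if "tool_name" in data:
        data["tool"] = data.pop("tool_name")
    return {**event, "schema_version": 2, "data": data}

# Maps from-version -> transform to the next version
UPCASTERS = {1: upcast_v1_to_v2}

def upcast(event: dict) -> dict:
    """Chain upcasters until the event reaches the current schema."""
    while event.get("schema_version", 1) < CURRENT_VERSION:
        event = UPCASTERS[event.get("schema_version", 1)](event)
    return event

old = {"schema_version": 1, "event_type": "tool_called",
       "data": {"tool_name": "lookup_order"}}
new = upcast(old)
print(new["data"]["tool"])       # lookup_order
print(new["schema_version"])     # 2
```

Chaining one upcaster per version step keeps each transform small: a v1 event written two years ago still replays correctly through v1→v2→v3 without ever rewriting the log.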
What is the performance overhead of Event Sourcing?
The append-only write to the event store adds 1-5ms per event. For a typical agent turn with 5-10 events, that is 5-50ms of overhead — negligible compared to the 500-3000ms LLM API call. The read side (projections) runs asynchronously and does not affect agent response time. The main cost is storage — plan for roughly 1-5KB per event, which means 10-50KB per conversation turn. For 100,000 conversations per day, that is 1-5GB of event data per day.
When should I use the Saga pattern vs simple sequential execution?
Use Saga when: (1) multiple external systems are modified during a workflow, (2) those modifications need to be undone if a later step fails, and (3) the consequences of inconsistent state are significant (financial transactions, legal records, customer-facing bookings). For workflows where partial completion is acceptable or where steps are read-only, simple sequential execution with error handling is sufficient and much simpler.
Can I use these patterns with any agent framework?
Yes. These patterns operate at the infrastructure layer, not the agent framework layer. Whether you use the OpenAI Agents SDK, LangGraph, or Claude's tool-use API, you can wrap tool executions with event logging, project events into read models, and coordinate multi-step workflows with sagas. The patterns are framework-agnostic — they care about the actions agents take, not how the agents are implemented.
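One framework-agnostic way to wrap tool executions with event logging is a decorator around plain tool functions — a sketch, with a list standing in for the real event store and all names hypothetical:

```python
import functools
from datetime import datetime, timezone

def logged_tool(events: list, agent_name: str):
    """Decorator that records each call to a tool function as a
    'tool_called' event. The wrapped function can be a plain Python
    tool from any agent framework; the events list is a stand-in
    for a real event store."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(**kwargs):
            result = fn(**kwargs)
            events.append({
                "event_type": "tool_called",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "agent_name": agent_name,
                "data": {"tool": fn.__name__,
                         "input": kwargs,
                         "output": result},
            })
            return result
        return wrapper
    return decorator

events: list[dict] = []

@logged_tool(events, agent_name="refund-agent")
def lookup_order(order_id: str) -> dict:
    # Hypothetical tool body; a real tool would hit a database or API
    return {"status": "delivered", "amount": 89.99}

lookup_order(order_id="ORD-5521")
print(events[0]["data"]["tool"])  # lookup_order
```

Because the logging lives in the wrapper rather than in the framework, switching agent SDKs later leaves the event log — and everything built on it — untouched.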
CallSphere Team
Expert insights on AI voice agents and customer communication automation.