Event Sourcing for AI Agents: Recording Every Decision for Replay and Audit
Implement event sourcing for AI agent systems to create a complete audit trail of every decision, enable workflow replay for debugging, and build projections that reconstruct agent state from event history.
Why Event Sourcing Fits AI Agents
Traditional state-based systems store only the current state. When an AI agent makes a series of tool calls, reasoning steps, and decisions, you only see the final output. But debugging an agent that produced a wrong answer requires understanding the full chain: which tools were called, what data came back, how the LLM interpreted the results, and where the reasoning went wrong.
Event sourcing solves this by storing every state change as an immutable event. Instead of updating a "current state" record, you append events. The current state is always derivable by replaying the event stream from the beginning.
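The mechanics can be sketched in a few lines: the current state is a pure fold (reduce) over the event list. The event shapes and field names here are illustrative, not a fixed schema.

```python
# Minimal sketch of the core idea: state is a fold over events.
# Instead of mutating a "current state" record, we append events and
# derive the state by replaying them from the start.
from functools import reduce

events = [
    {"type": "tool_selected", "tool": "search"},
    {"type": "tool_executed", "tool": "search"},
    {"type": "tool_selected", "tool": "calculator"},
]

def apply(state: dict, event: dict) -> dict:
    # Each event type maps to a pure state transition.
    if event["type"] == "tool_selected":
        return {**state, "pending_tool": event["tool"]}
    if event["type"] == "tool_executed":
        return {**state, "pending_tool": None,
                "tools_run": state["tools_run"] + [event["tool"]]}
    return state

state = reduce(apply, events, {"pending_tool": None, "tools_run": []})
print(state)  # {'pending_tool': 'calculator', 'tools_run': ['search']}
```

Because `apply` is pure, replaying the same events always yields the same state, which is what makes audit and debugging reliable.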
Designing the Event Store
The event store is an append-only log. Each event carries a timestamp, a type, the agent session it belongs to, and a payload:
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class AgentEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str = ""
    event_type: str = ""
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    payload: dict[str, Any] = field(default_factory=dict)
    version: int = 0


class EventStore:
    """Append-only event store for agent sessions."""

    def __init__(self):
        self._events: list[AgentEvent] = []

    def append(self, event: AgentEvent):
        # Assign versions per session (matching the SQLite store later in
        # this article), so an after_version cursor behaves the same way
        # in both implementations.
        session_events = [
            e for e in self._events if e.session_id == event.session_id
        ]
        event.version = len(session_events) + 1
        self._events.append(event)

    def get_events(
        self, session_id: str, after_version: int = 0
    ) -> list[AgentEvent]:
        return [
            e for e in self._events
            if e.session_id == session_id and e.version > after_version
        ]

    def get_all_events(self) -> list[AgentEvent]:
        return list(self._events)
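A quick usage sketch, with condensed re-declarations of the two classes above so the snippet runs standalone: a consumer that has already processed events up to a given version asks only for newer ones.

```python
# Condensed copies of AgentEvent and EventStore from above, then a
# usage demo of the after_version cursor.
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass
class AgentEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str = ""
    event_type: str = ""
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat())
    payload: dict[str, Any] = field(default_factory=dict)
    version: int = 0

class EventStore:
    def __init__(self):
        self._events: list[AgentEvent] = []
    def append(self, event: AgentEvent):
        event.version = len(self._events) + 1
        self._events.append(event)
    def get_events(self, session_id: str, after_version: int = 0):
        return [e for e in self._events
                if e.session_id == session_id and e.version > after_version]

store = EventStore()
store.append(AgentEvent(session_id="s1", event_type="user_message_received",
                        payload={"message": "hi"}))
store.append(AgentEvent(session_id="s1", event_type="tool_selected",
                        payload={"tool": "search"}))

# A consumer that already processed version 1 asks only for newer events.
new_events = store.get_events("s1", after_version=1)
print([e.event_type for e in new_events])  # ['tool_selected']
```

The version cursor is what lets projections (covered below) resume incrementally instead of re-reading the whole stream.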
Recording Agent Decisions
Wrap your agent execution to emit events at each decision point:
class EventSourcedAgent:
    """Agent that records every action as an event."""

    def __init__(self, session_id: str, store: EventStore):
        self.session_id = session_id
        self.store = store

    def _emit(self, event_type: str, payload: dict):
        self.store.append(AgentEvent(
            session_id=self.session_id,
            event_type=event_type,
            payload=payload,
        ))

    async def run(self, user_message: str):
        self._emit("user_message_received", {"message": user_message})

        # Step 1: Decide which tool to call. _decide_tool, _execute_tool,
        # and _generate_response wrap your LLM and tool calls; their
        # implementations are omitted here.
        tool_choice = await self._decide_tool(user_message)
        self._emit("tool_selected", {
            "tool": tool_choice["name"],
            "reasoning": tool_choice["reasoning"],
        })

        # Step 2: Execute the tool
        tool_result = await self._execute_tool(tool_choice)
        self._emit("tool_executed", {
            "tool": tool_choice["name"],
            "result": tool_result,
        })

        # Step 3: Generate final response
        response = await self._generate_response(user_message, tool_result)
        self._emit("response_generated", {
            "response": response,
            # Rough word-count proxy; swap in your tokenizer's count
            # for real token accounting.
            "tokens_used": len(response.split()),
        })
        return response
Every decision becomes a first-class record. You know exactly which tool was chosen, why the LLM chose it, what the tool returned, and what final answer was produced.
Building Projections
A projection reads the event stream and builds a view optimized for a specific query. For example, a "session summary" projection:
@dataclass
class SessionSummary:
    session_id: str
    total_messages: int = 0
    tools_used: list[str] = field(default_factory=list)
    total_tokens: int = 0
    errors: list[str] = field(default_factory=list)
    started_at: str = ""
    last_activity: str = ""


def build_session_summary(
    store: EventStore, session_id: str
) -> SessionSummary:
    """Project events into a session summary."""
    summary = SessionSummary(session_id=session_id)
    events = store.get_events(session_id)
    for event in events:
        if not summary.started_at:
            summary.started_at = event.timestamp
        summary.last_activity = event.timestamp
        if event.event_type == "user_message_received":
            summary.total_messages += 1
        elif event.event_type == "tool_executed":
            summary.tools_used.append(event.payload["tool"])
        elif event.event_type == "response_generated":
            summary.total_tokens += event.payload.get("tokens_used", 0)
        elif event.event_type == "error_occurred":
            summary.errors.append(event.payload.get("error", ""))
    return summary
Different projections can answer different questions: "Which sessions used the search tool?" or "What was the average token count per session?" — all derived from the same event stream.
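As a sketch of the first question, here is a second projection over the same kind of stream. Events are modeled as plain (session_id, event_type, payload) tuples to keep the snippet standalone; the session IDs and payloads are made up for illustration.

```python
# A second projection over the same event stream, answering
# "which sessions used a given tool?".
from collections import defaultdict

events = [
    ("s1", "tool_executed", {"tool": "search"}),
    ("s1", "tool_executed", {"tool": "calculator"}),
    ("s2", "tool_executed", {"tool": "search"}),
    ("s3", "response_generated", {"tokens_used": 12}),
]

def sessions_by_tool(events):
    # Build an inverted index: tool name -> set of session IDs.
    index: dict[str, set[str]] = defaultdict(set)
    for session_id, event_type, payload in events:
        if event_type == "tool_executed":
            index[payload["tool"]].add(session_id)
    return index

index = sessions_by_tool(events)
print(sorted(index["search"]))  # ['s1', 's2']
```

Note that nothing about the stored events changed; each new question only needs a new read-side fold.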
Replaying for Debugging
The most powerful feature of event sourcing is replay. When an agent produces a bad result, load its event stream and step through it:
def replay_session(store: EventStore, session_id: str):
    """Replay a session step by step for debugging."""
    events = store.get_events(session_id)
    print(f"Replaying session {session_id} ({len(events)} events)")
    for event in events:
        print(f"  [{event.timestamp}] {event.event_type}")
        if event.event_type == "tool_selected":
            print(f"    Tool: {event.payload['tool']}")
            print(f"    Reasoning: {event.payload['reasoning']}")
        elif event.event_type == "tool_executed":
            result = str(event.payload["result"])[:200]
            print(f"    Result: {result}")
        elif event.event_type == "error_occurred":
            print(f"    ERROR: {event.payload['error']}")
You can also build automated regression tests by replaying a session against a different LLM and comparing the decision points.
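A minimal sketch of that comparison, assuming each replayed session has been reduced to a list of (event_type, payload) pairs; the events shown are made up for illustration.

```python
# Diff the decision points of two replayed sessions, e.g. the same
# prompt run against a baseline model and a candidate model.

def decision_points(events):
    # Keep only the events that represent choices the agent made.
    return [(t, p.get("tool")) for t, p in events if t == "tool_selected"]

baseline = [
    ("user_message_received", {"message": "weather in Oslo?"}),
    ("tool_selected", {"tool": "search", "reasoning": "needs live data"}),
]
candidate = [
    ("user_message_received", {"message": "weather in Oslo?"}),
    ("tool_selected", {"tool": "calculator", "reasoning": "?"}),
]

diverged = decision_points(baseline) != decision_points(candidate)
print(diverged)  # True: the candidate model picked a different tool
```

In practice you would flag divergences for human review rather than failing hard, since some decision changes are improvements rather than regressions.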
Persistent Event Store with SQLite
For production use, persist events to a database:
import json
import sqlite3


class SQLiteEventStore(EventStore):
    """Event store backed by SQLite; replaces the in-memory storage."""

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                event_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                payload TEXT NOT NULL,
                version INTEGER NOT NULL
            )
        """)
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session"
            " ON events(session_id, version)"
        )
        self.conn.commit()

    def append(self, event: AgentEvent):
        event.version = self._next_version(event.session_id)
        self.conn.execute(
            "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?)",
            (event.event_id, event.session_id, event.event_type,
             event.timestamp, json.dumps(event.payload), event.version),
        )
        self.conn.commit()

    def get_events(
        self, session_id: str, after_version: int = 0
    ) -> list[AgentEvent]:
        # Must be overridden too: the inherited version reads the
        # in-memory list, which this subclass never populates.
        rows = self.conn.execute(
            "SELECT event_id, session_id, event_type, timestamp,"
            " payload, version FROM events"
            " WHERE session_id = ? AND version > ? ORDER BY version",
            (session_id, after_version),
        ).fetchall()
        return [
            AgentEvent(
                event_id=r[0], session_id=r[1], event_type=r[2],
                timestamp=r[3], payload=json.loads(r[4]), version=r[5],
            )
            for r in rows
        ]

    def _next_version(self, session_id: str) -> int:
        row = self.conn.execute(
            "SELECT MAX(version) FROM events WHERE session_id = ?",
            (session_id,),
        ).fetchone()
        return (row[0] or 0) + 1
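The per-session versioning scheme can be exercised standalone with raw sqlite3 and an in-memory database; this is a sketch without the dataclass wrapper, not the article's full store.

```python
# Demonstrates per-session version numbering with plain sqlite3.
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE events (
    session_id TEXT, event_type TEXT, payload TEXT, version INTEGER)""")

def append(session_id: str, event_type: str, payload: dict) -> int:
    # MAX(version) within the session gives the next monotonic slot.
    row = conn.execute(
        "SELECT MAX(version) FROM events WHERE session_id = ?",
        (session_id,)).fetchone()
    version = (row[0] or 0) + 1
    conn.execute("INSERT INTO events VALUES (?, ?, ?, ?)",
                 (session_id, event_type, json.dumps(payload), version))
    conn.commit()
    return version

append("s1", "user_message_received", {"message": "hi"})
append("s2", "user_message_received", {"message": "hello"})
v = append("s1", "tool_selected", {"tool": "search"})
print(v)  # 2: versions count per session, not globally
```

One caveat: the SELECT MAX plus INSERT pair is not atomic, so under concurrent writers you would wrap it in a transaction, or add a UNIQUE (session_id, version) constraint and retry on conflict.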
FAQ
How is event sourcing different from just logging agent actions?
Logging is unstructured and designed for humans to read. Event sourcing produces structured, typed events that can be replayed programmatically to reconstruct state. You can build multiple projections from the same events, run automated regression tests, and guarantee that your audit trail is complete because the events are the source of truth — not a side effect.
Does event sourcing add significant overhead to agent execution?
The overhead is minimal. Appending a small JSON event to a database or file takes microseconds, compared with the seconds spent on LLM calls and tool executions. Storage grows linearly with the number of events, and agent sessions rarely produce more than a few hundred of them, so storage is rarely a practical concern; the main exception is large tool results embedded in payloads, which may be worth truncating or storing as external blobs.
How do I handle schema evolution when event payloads change over time?
Use versioned event types. When a payload schema changes, create a new version (e.g., tool_executed_v2) and write an upcaster that converts v1 events to v2 format during replay. This ensures old events remain readable without modifying the immutable event store.
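A sketch of such an upcaster, assuming a hypothetical v1 payload that stored the tool result under "output" before v2 renamed it to "result"; the event names and fields are illustrative, not a fixed schema from this article.

```python
# Upcaster applied at read time; the stored v1 events stay untouched.

def upcast(event_type: str, payload: dict) -> tuple[str, dict]:
    if event_type == "tool_executed_v1":
        # v1 -> v2: the result moved from "output" to "result".
        return "tool_executed_v2", {
            "tool": payload["tool"],
            "result": payload["output"],
        }
    # Already current (or unrelated) events pass through unchanged.
    return event_type, payload

old = ("tool_executed_v1", {"tool": "search", "output": "3 hits"})
new_type, new_payload = upcast(*old)
print(new_type, new_payload["result"])  # tool_executed_v2 3 hits
```

Chaining one upcaster per schema step (v1 to v2, v2 to v3) keeps each conversion small and testable as the schema evolves.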
#EventSourcing #AuditTrail #Debugging #AgentArchitecture #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.