Event Sourcing for AI Agents: Recording Every Decision for Replay and Audit

Implement event sourcing for AI agent systems to create a complete audit trail of every decision, enable workflow replay for debugging, and build projections that reconstruct agent state from event history.

Why Event Sourcing Fits AI Agents

Traditional state-based systems store only the current state. When an AI agent makes a series of tool calls, reasoning steps, and decisions, you only see the final output. But debugging an agent that produced a wrong answer requires understanding the full chain: which tools were called, what data came back, how the LLM interpreted the results, and where the reasoning went wrong.

Event sourcing solves this by storing every state change as an immutable event. Instead of updating a "current state" record, you append events. The current state is always derivable by replaying the event stream from the beginning.
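
To make "derive state by replaying" concrete, here is a minimal sketch that folds a list of events into a state dictionary. The event shapes and the `apply_event` and `current_state` helpers are illustrative assumptions, not part of any library; the article's own event type comes later.

```python
from functools import reduce

# Hypothetical event shapes, used only for this illustration.
events = [
    {"type": "user_message_received", "payload": {"message": "What is 2 + 2?"}},
    {"type": "tool_selected", "payload": {"tool": "calculator"}},
    {"type": "tool_executed", "payload": {"tool": "calculator", "result": 4}},
    {"type": "response_generated", "payload": {"response": "2 + 2 = 4"}},
]

def apply_event(state: dict, event: dict) -> dict:
    """Return a new state with one event applied; the input is not mutated."""
    new_state = dict(state)
    kind, payload = event["type"], event["payload"]
    if kind == "user_message_received":
        new_state["last_message"] = payload["message"]
    elif kind == "tool_selected":
        new_state["pending_tool"] = payload["tool"]
    elif kind == "tool_executed":
        new_state["pending_tool"] = None
        new_state["last_result"] = payload["result"]
    elif kind == "response_generated":
        new_state["last_response"] = payload["response"]
    return new_state

def current_state(stream: list[dict]) -> dict:
    """Derive state by folding the stream from an empty starting state."""
    return reduce(apply_event, stream, {})

state = current_state(events)
state_after_two = current_state(events[:2])  # state as of the second event
```

Replaying a prefix of the stream, as `state_after_two` does, gives the state at any earlier point in the session, which is something a "current state" record cannot offer.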

Designing the Event Store

The event store is an append-only log. Each event carries a unique ID, a timestamp, a type, the agent session it belongs to, a payload, and a version number that records its position in the stream:

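import uuid
import json  # used by the SQLite-backed store later in the article
from datetime import datetime, timezone
from dataclasses import dataclass, field
from typing import Any

@dataclass
class AgentEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str = ""
    event_type: str = ""
    # ISO 8601 timestamp in UTC
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    payload: dict[str, Any] = field(default_factory=dict)
    version: int = 0  # assigned by the store on append

class EventStore:
    """Append-only, in-memory event store for agent sessions."""

    def __init__(self):
        self._events: list[AgentEvent] = []
        self._versions: dict[str, int] = {}  # last version issued per session

    def append(self, event: AgentEvent) -> None:
        # Number events per session (1, 2, 3, ...) so get_events() can
        # resume from a known position within that session.
        version = self._versions.get(event.session_id, 0) + 1
        self._versions[event.session_id] = version
        event.version = version
        self._events.append(event)

    def get_events(
        self, session_id: str, after_version: int = 0
    ) -> list[AgentEvent]:
        # Events are stored in append order, so the result is already ordered.
        return [
            e for e in self._events
            if e.session_id == session_id and e.version > after_version
        ]

    def get_all_events(self) -> list[AgentEvent]:
        # Return a copy so callers cannot mutate the log itself.
        return list(self._events)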

Recording Agent Decisions

Wrap your agent execution to emit events at each decision point:

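class EventSourcedAgent:
    """Agent that records every action as an event."""

    def __init__(self, session_id: str, store: EventStore):
        self.session_id = session_id
        self.store = store

    def _emit(self, event_type: str, payload: dict) -> None:
        self.store.append(AgentEvent(
            session_id=self.session_id,
            event_type=event_type,
            payload=payload,
        ))

    async def run(self, user_message: str) -> str:
        self._emit("user_message_received", {"message": user_message})

        # Step 1: Decide which tool to call
        tool_choice = await self._decide_tool(user_message)
        self._emit("tool_selected", {
            "tool": tool_choice["name"],
            "reasoning": tool_choice["reasoning"],
        })

        # Step 2: Execute the tool
        tool_result = await self._execute_tool(tool_choice)
        self._emit("tool_executed", {
            "tool": tool_choice["name"],
            "result": tool_result,
        })

        # Step 3: Generate final response
        response = await self._generate_response(user_message, tool_result)
        self._emit("response_generated", {
            "response": response,
            # Rough proxy: whitespace-separated word count, not a true
            # token count. Use your LLM client's usage data if available.
            "tokens_used": len(response.split()),
        })

        return response

    # The three methods below are placeholders: wire them to your own
    # LLM client and tool registry.
    async def _decide_tool(self, user_message: str) -> dict:
        """Return {"name": ..., "reasoning": ...} for the chosen tool."""
        raise NotImplementedError

    async def _execute_tool(self, tool_choice: dict) -> Any:
        raise NotImplementedError

    async def _generate_response(self, user_message: str, tool_result: Any) -> str:
        raise NotImplementedError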

Every decision becomes a first-class record. You know exactly which tool was chosen, the reasoning the LLM reported for choosing it, what the tool returned, and what final answer was produced.
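
The projection and replay code later in this article look for an `error_occurred` event type, but the `run` method above only records the happy path. One possible way to capture failures is sketched below. The `run_step` helper, the `step` field, and the list-based store are assumptions made for this example, not part of the agent class above.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Stand-in for an event store: a plain list of (event_type, payload) pairs.
recorded: list[tuple[str, dict[str, Any]]] = []

def emit(event_type: str, payload: dict[str, Any]) -> None:
    recorded.append((event_type, payload))

async def run_step(step: str, action: Callable[[], Awaitable[Any]]) -> Any:
    """Await one agent step; record an error_occurred event if it raises."""
    try:
        return await action()
    except Exception as exc:
        emit("error_occurred", {
            "step": step,
            "error": f"{type(exc).__name__}: {exc}",
        })
        raise  # re-raise so the caller still sees the failure

async def failing_tool() -> str:
    # Hypothetical tool that always fails, to exercise the error path.
    raise TimeoutError("search API timed out")

async def main() -> None:
    try:
        await run_step("tool_execution", failing_tool)
    except TimeoutError:
        pass  # swallowed here only so the demo can finish

asyncio.run(main())
```

Re-raising after emitting keeps the event log a record of what happened without changing the agent's control flow.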

Building Projections

A projection reads the event stream and builds a view optimized for a specific query. For example, a "session summary" projection:

@dataclass
class SessionSummary:
    session_id: str
    total_messages: int = 0
    tools_used: list[str] = field(default_factory=list)
    total_tokens: int = 0
    errors: list[str] = field(default_factory=list)
    started_at: str = ""
    last_activity: str = ""

def build_session_summary(
    store: EventStore, session_id: str
) -> SessionSummary:
    """Project events into a session summary."""
    summary = SessionSummary(session_id=session_id)
    events = store.get_events(session_id)

    for event in events:
        if not summary.started_at:
            summary.started_at = event.timestamp
        summary.last_activity = event.timestamp

        if event.event_type == "user_message_received":
            summary.total_messages += 1
        elif event.event_type == "tool_executed":
            summary.tools_used.append(event.payload["tool"])
        elif event.event_type == "response_generated":
            summary.total_tokens += event.payload.get("tokens_used", 0)
        elif event.event_type == "error_occurred":
            summary.errors.append(event.payload.get("error", ""))

    return summary

Different projections can answer different questions: "Which sessions used the search tool?" or "What was the average token count per session?" — all derived from the same event stream.
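
As a rough sketch of those two questions, the functions below scan a flat list of events across sessions. The tuple layout `(session_id, event_type, payload)` and the function names are assumptions chosen to keep the example self-contained.

```python
from collections import defaultdict

# Hypothetical flattened events: (session_id, event_type, payload).
events = [
    ("s1", "tool_executed", {"tool": "search"}),
    ("s1", "response_generated", {"tokens_used": 120}),
    ("s2", "tool_executed", {"tool": "calculator"}),
    ("s2", "response_generated", {"tokens_used": 40}),
    ("s3", "tool_executed", {"tool": "search"}),
    ("s3", "response_generated", {"tokens_used": 80}),
]

def sessions_using_tool(stream, tool_name: str) -> set[str]:
    """Return the IDs of sessions that executed the given tool at least once."""
    return {
        session_id
        for session_id, event_type, payload in stream
        if event_type == "tool_executed" and payload.get("tool") == tool_name
    }

def average_tokens_per_session(stream) -> float:
    """Average total tokens over sessions that generated a response."""
    totals: dict[str, int] = defaultdict(int)
    for session_id, event_type, payload in stream:
        if event_type == "response_generated":
            totals[session_id] += payload.get("tokens_used", 0)
    return sum(totals.values()) / len(totals) if totals else 0.0

search_sessions = sessions_using_tool(events, "search")
avg_tokens = average_tokens_per_session(events)
```

Note that this average ignores sessions that never produced a response; whether that is the right denominator depends on the question being asked.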

Replaying for Debugging

The most powerful feature of event sourcing is replay. When an agent produces a bad result, load its event stream and step through it:

def replay_session(store: EventStore, session_id: str):
    """Replay a session step by step for debugging."""
    events = store.get_events(session_id)
    print(f"Replaying session {session_id} ({len(events)} events)")

    for event in events:
        print(f"  [{event.timestamp}] {event.event_type}")
        if event.event_type == "tool_selected":
            print(f"    Tool: {event.payload['tool']}")
            print(f"    Reasoning: {event.payload['reasoning']}")
        elif event.event_type == "tool_executed":
            result = str(event.payload["result"])[:200]
            print(f"    Result: {result}")
        elif event.event_type == "error_occurred":
            print(f"    ERROR: {event.payload['error']}")

You can also build automated regression tests: re-run a recorded session's inputs against a different LLM and compare the new decision points with the recorded ones.
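
One way such a comparison might look is sketched below. It assumes events are plain dictionaries with `event_type` and `payload` keys and treats each `tool_selected` event as a decision point; `decision_points` and `diff_decisions` are hypothetical helper names.

```python
from itertools import zip_longest
from typing import Optional

def decision_points(events: list[dict]) -> list[str]:
    """Extract the ordered tool choices from one session's events."""
    return [
        e["payload"]["tool"]
        for e in events
        if e["event_type"] == "tool_selected"
    ]

def diff_decisions(
    baseline: list[dict], candidate: list[dict]
) -> list[tuple[int, Optional[str], Optional[str]]]:
    """Return (index, baseline_tool, candidate_tool) wherever the runs differ."""
    pairs = zip_longest(decision_points(baseline), decision_points(candidate))
    return [
        (index, old, new)
        for index, (old, new) in enumerate(pairs)
        if old != new
    ]

# Recorded run with the current model (illustrative data).
baseline = [
    {"event_type": "user_message_received", "payload": {"message": "Weather in Paris?"}},
    {"event_type": "tool_selected", "payload": {"tool": "weather_api"}},
    {"event_type": "tool_selected", "payload": {"tool": "unit_converter"}},
]
# Same input replayed against a different model (illustrative data).
candidate = [
    {"event_type": "user_message_received", "payload": {"message": "Weather in Paris?"}},
    {"event_type": "tool_selected", "payload": {"tool": "web_search"}},
    {"event_type": "tool_selected", "payload": {"tool": "unit_converter"}},
    {"event_type": "tool_selected", "payload": {"tool": "summarizer"}},
]

diffs = diff_decisions(baseline, candidate)
```

A `None` on either side of a tuple means one run made a decision the other did not. A difference is not automatically a regression, so a real test would still need a rule for which divergences are acceptable.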

Persistent Event Store with SQLite

For production use, persist events to a database:

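import json
import sqlite3

class SQLiteEventStore(EventStore):
    """Event store backed by a SQLite table instead of an in-memory list."""

    _COLUMNS = "event_id, session_id, event_type, timestamp, payload, version"

    def __init__(self, db_path: str):
        # Deliberately does not call EventStore.__init__: events live in SQLite.
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                event_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                payload TEXT NOT NULL,
                version INTEGER NOT NULL
            )
        """)
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session ON events(session_id, version)"
        )
        self.conn.commit()

    def append(self, event: AgentEvent) -> None:
        event.version = self._next_version(event.session_id)
        self.conn.execute(
            f"INSERT INTO events ({self._COLUMNS}) VALUES (?, ?, ?, ?, ?, ?)",
            (event.event_id, event.session_id, event.event_type,
             event.timestamp, json.dumps(event.payload), event.version),
        )
        self.conn.commit()

    def _next_version(self, session_id: str) -> int:
        # MAX() returns NULL (None) for a session with no events yet.
        row = self.conn.execute(
            "SELECT MAX(version) FROM events WHERE session_id = ?",
            (session_id,),
        ).fetchone()
        return (row[0] or 0) + 1

    # The inherited read methods use the in-memory list, so override them.
    def get_events(
        self, session_id: str, after_version: int = 0
    ) -> list[AgentEvent]:
        rows = self.conn.execute(
            f"SELECT {self._COLUMNS} FROM events "
            "WHERE session_id = ? AND version > ? ORDER BY version",
            (session_id, after_version),
        ).fetchall()
        return [self._to_event(row) for row in rows]

    def get_all_events(self) -> list[AgentEvent]:
        rows = self.conn.execute(
            f"SELECT {self._COLUMNS} FROM events ORDER BY rowid"
        ).fetchall()
        return [self._to_event(row) for row in rows]

    @staticmethod
    def _to_event(row: tuple) -> AgentEvent:
        event_id, session_id, event_type, timestamp, payload, version = row
        return AgentEvent(event_id, session_id, event_type, timestamp,
                          json.loads(payload), version)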
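
The append method above reads `MAX(version)` and then inserts, so two writers on the same session could in principle compute the same version. A common guard, sketched here as an assumption rather than as part of the store above, is a `UNIQUE (session_id, version)` constraint that turns such a collision into an error the caller can retry. The `append_at_version` helper and the trimmed-down schema are hypothetical.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        event_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        version INTEGER NOT NULL,
        UNIQUE (session_id, version)
    )
""")

def append_at_version(
    event_id: str, session_id: str, event_type: str, payload: dict, version: int
) -> bool:
    """Insert at an explicit version; return False if that slot is taken."""
    try:
        with conn:  # commits on success, rolls back if the insert fails
            conn.execute(
                "INSERT INTO events VALUES (?, ?, ?, ?, ?)",
                (event_id, session_id, event_type, json.dumps(payload), version),
            )
        return True
    except sqlite3.IntegrityError:
        return False

first = append_at_version("e1", "s1", "tool_selected", {"tool": "search"}, 1)
# A second writer that also believed the next version was 1 is rejected.
conflict = append_at_version("e2", "s1", "tool_selected", {"tool": "calculator"}, 1)
# After re-reading the stream, it retries with the next free version.
retry = append_at_version("e2", "s1", "tool_selected", {"tool": "calculator"}, 2)

stored_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

The rejected writer learns that the stream moved on since it last read it, which is usually the point at which it should reload the session's events before deciding what to append.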

FAQ

How is event sourcing different from just logging agent actions?

Logs are usually a side channel: often unstructured, written for humans to read, and free to be sampled, truncated, or dropped. Event sourcing produces structured, typed events that can be replayed programmatically to reconstruct state. You can build multiple projections from the same events and run automated regression tests. Because the events are the source of truth rather than a side effect, the audit trail is complete as long as every state change goes through the event store.

Does event sourcing add significant overhead to agent execution?

The overhead is usually small. Appending a small JSON event typically costs far less than a millisecond in memory, and on the order of milliseconds when each append is committed to disk, which is negligible next to the seconds spent on LLM calls and tool executions. The storage cost grows linearly with the number of events, but agent sessions rarely produce more than a few hundred events, so storage is seldom a practical concern unless payloads carry very large tool results.

How do I handle schema evolution when event payloads change over time?

Use versioned event types. When a payload schema changes, create a new version (e.g., tool_executed_v2) and write an upcaster that converts v1 events to v2 format during replay. This ensures old events remain readable without modifying the immutable event store.
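
A minimal sketch of an upcaster follows. It assumes, purely for illustration, that `tool_executed_v2` adds a `duration_ms` field that v1 events never recorded; the `UPCASTERS` registry and the function names are hypothetical.

```python
from typing import Any, Callable

Event = dict[str, Any]

def upcast_tool_executed_v1(event: Event) -> Event:
    """Convert a v1 tool_executed event to the assumed v2 shape."""
    payload = event["payload"]
    return {
        **event,
        "event_type": "tool_executed_v2",
        "payload": {
            "tool": payload["tool"],
            "result": payload["result"],
            "duration_ms": None,  # unknown for events recorded before v2
        },
    }

# Maps an old event type to the function that lifts it one version up.
UPCASTERS: dict[str, Callable[[Event], Event]] = {
    "tool_executed": upcast_tool_executed_v1,
}

def upcast(event: Event) -> Event:
    """Apply upcasters until the event type has no registered upcaster."""
    while event["event_type"] in UPCASTERS:
        event = UPCASTERS[event["event_type"]](event)
    return event

stored = [
    {"event_type": "tool_executed",
     "payload": {"tool": "search", "result": "ok"}},
    {"event_type": "tool_executed_v2",
     "payload": {"tool": "search", "result": "ok", "duration_ms": 230}},
]

# Upcasting happens on read; the stored events are never rewritten.
replayed = [upcast(e) for e in stored]
```

Projections and replay code then only need to understand the latest shape, while the stored log stays exactly as it was written.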



CallSphere Team
