Skip to content
Learn Agentic AI12 min read0 views

Event-Driven AI Agent Architecture: Pub/Sub, Event Sourcing, and CQRS

Design event-driven AI agent systems using pub/sub messaging, event sourcing for full audit trails, and CQRS to separate read and write models for scalable, observable agent architectures.

Why Event-Driven Architecture Fits AI Agents

Traditional request-response architectures struggle with AI agent workloads. An agent might take 30 seconds to complete a complex reasoning chain, call three external tools, and produce intermediate results that other services need to react to. Blocking the caller for 30 seconds wastes resources. Polling for status is inefficient. Event-driven architecture solves this by making agent actions produce events that flow through the system asynchronously.

In an event-driven system, each AI agent publishes events describing what it did — "query received," "tool called," "reasoning step completed," "response generated." Other components subscribe to these events and react independently. The billing service tracks token usage. The monitoring service updates dashboards. The orchestrator decides whether to trigger a follow-up agent. No component waits for another.

Core Pattern: Pub/Sub with Typed Events

Define your event types as structured data contracts. Every event needs a type identifier, a timestamp, a source agent ID, and a typed payload.

from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Any
from enum import Enum
import json
import uuid

class EventType(str, Enum):
    AGENT_STARTED = "agent.started"
    TOOL_CALLED = "agent.tool_called"
    TOOL_RESULT = "agent.tool_result"
    TOKENS_GENERATED = "agent.tokens_generated"
    AGENT_COMPLETED = "agent.completed"
    AGENT_FAILED = "agent.failed"

@dataclass
class AgentEvent:
    event_type: EventType
    agent_id: str
    payload: dict[str, Any]
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    correlation_id: str = ""

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, data: str) -> "AgentEvent":
        parsed = json.loads(data)
        parsed["event_type"] = EventType(parsed["event_type"])
        return cls(**parsed)

The correlation_id ties related events together across a multi-step agent workflow. When an orchestrator triggers agent A, which triggers agent B, all events share the same correlation ID for end-to-end tracing.

Event Sourcing: The Complete Agent Audit Trail

Event sourcing stores every state change as an immutable event rather than overwriting the current state. For AI agents, this is valuable because you can replay the exact sequence of decisions an agent made — which tools it called, what results it received, and how it reasoned through each step.

from collections import defaultdict

class AgentEventStore:
    def __init__(self):
        self._events: dict[str, list[AgentEvent]] = defaultdict(list)
        self._global_log: list[AgentEvent] = []

    def append(self, event: AgentEvent):
        self._events[event.correlation_id].append(event)
        self._global_log.append(event)

    def get_run_history(self, correlation_id: str) -> list[AgentEvent]:
        return list(self._events.get(correlation_id, []))

    def replay_agent_state(self, correlation_id: str) -> dict:
        """Rebuild agent state by replaying events."""
        state = {
            "status": "unknown",
            "tools_called": [],
            "tokens_used": 0,
            "errors": [],
            "result": None,
        }

        for event in self.get_run_history(correlation_id):
            if event.event_type == EventType.AGENT_STARTED:
                state["status"] = "running"
            elif event.event_type == EventType.TOOL_CALLED:
                state["tools_called"].append(event.payload["tool_name"])
            elif event.event_type == EventType.TOKENS_GENERATED:
                state["tokens_used"] += event.payload.get("count", 0)
            elif event.event_type == EventType.AGENT_COMPLETED:
                state["status"] = "completed"
                state["result"] = event.payload.get("result")
            elif event.event_type == EventType.AGENT_FAILED:
                state["status"] = "failed"
                state["errors"].append(event.payload.get("error"))

        return state

The replay_agent_state method reconstructs the final state from the event sequence. This is powerful for debugging — you can replay a failed agent run event by event to find exactly where it went wrong.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

CQRS: Separating Reads and Writes

Command Query Responsibility Segregation (CQRS) separates the write model (event store) from the read model (materialized views optimized for queries). AI agent systems benefit from CQRS because the write path (recording agent events) and the read path (dashboard queries, analytics, billing) have very different performance characteristics.

class AgentReadModel:
    """Materialized read model, updated by event projections."""

    def __init__(self):
        self.agent_stats: dict[str, dict] = {}
        self.active_runs: dict[str, dict] = {}

    def project(self, event: AgentEvent):
        """Update read model from a single event."""
        aid = event.agent_id

        if aid not in self.agent_stats:
            self.agent_stats[aid] = {
                "total_runs": 0,
                "total_tokens": 0,
                "total_errors": 0,
                "avg_latency_ms": 0,
                "latencies": [],
            }

        stats = self.agent_stats[aid]

        if event.event_type == EventType.AGENT_STARTED:
            self.active_runs[event.correlation_id] = {
                "agent_id": aid,
                "started_at": event.timestamp,
            }
            stats["total_runs"] += 1

        elif event.event_type == EventType.TOKENS_GENERATED:
            stats["total_tokens"] += event.payload.get("count", 0)

        elif event.event_type == EventType.AGENT_COMPLETED:
            run = self.active_runs.pop(event.correlation_id, None)
            if run:
                latency = event.payload.get("duration_ms", 0)
                stats["latencies"].append(latency)
                stats["avg_latency_ms"] = (
                    sum(stats["latencies"]) / len(stats["latencies"])
                )

        elif event.event_type == EventType.AGENT_FAILED:
            stats["total_errors"] += 1
            self.active_runs.pop(event.correlation_id, None)

    def get_dashboard_data(self) -> dict:
        return {
            "agents": self.agent_stats,
            "active_run_count": len(self.active_runs),
        }

Wiring It Together

The event bus connects producers (agents) to consumers (event store, read model, billing, monitoring) without coupling them.

class EventBus:
    def __init__(self):
        self._handlers: dict[str, list] = defaultdict(list)

    def subscribe(self, event_type: str, handler):
        self._handlers[event_type].append(handler)

    def subscribe_all(self, handler):
        self._handlers["*"].append(handler)

    async def publish(self, event: AgentEvent):
        handlers = self._handlers.get(event.event_type, [])
        handlers += self._handlers.get("*", [])
        for handler in handlers:
            try:
                await handler(event)
            except Exception as e:
                print(f"Handler error: {e}")

# Setup
bus = EventBus()
store = AgentEventStore()
read_model = AgentReadModel()

bus.subscribe_all(lambda e: store.append(e))
bus.subscribe_all(lambda e: read_model.project(e))

Agents publish to the bus. The store captures everything for replay. The read model projects events into queryable state. Adding a new consumer — say, a cost tracker — requires only subscribing a new handler. No existing code changes.

FAQ

When should I use event sourcing versus a simpler state-based approach for AI agents?

Use event sourcing when you need a complete audit trail of agent decisions — regulatory compliance, debugging complex multi-step reasoning, or A/B testing different agent strategies by replaying the same events. For simple AI integrations where you only care about the final output, a state-based approach with traditional CRUD is simpler and sufficient. The overhead of event sourcing — storage growth, eventual consistency, replay complexity — is only justified when the audit trail or replay capability delivers clear value.

How do you handle event ordering in a distributed multi-agent system?

Use a combination of wall-clock timestamps and per-agent sequence numbers. Each agent maintains a monotonically increasing sequence counter for its own events. The event bus preserves per-agent ordering. For cross-agent ordering, use the correlation ID to group related events and sort by timestamp within each group. For strict global ordering (rarely needed), use a single-partition message broker or a database-backed event store with auto-incrementing IDs.

What happens when a read model projection has a bug and needs to be rebuilt?

This is one of event sourcing's biggest advantages. Since the event store contains the complete history, you deploy the fixed projection code and replay all events through it to rebuild the read model from scratch. During rebuilding, serve reads from the old (stale) read model. Once the new model catches up to the current event position, swap it in atomically. This ability to rebuild derived state from events is why event sourcing pairs naturally with CQRS.


#EventDriven #CQRS #EventSourcing #PubSub #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.