Skip to content
Learn Agentic AI10 min read0 views

The Observer Pattern for AI Agents: Event-Driven Agent Communication

Implement the Observer pattern with an event bus for AI agent systems — enabling decoupled, publish-subscribe communication between agents for flexible coordination.

Why Event-Driven Communication?

In tightly coupled agent systems, Agent A calls Agent B directly, which calls Agent C. This creates rigid dependencies — changing one agent requires updating all agents that reference it. The Observer pattern eliminates this coupling by introducing an event bus. Agents publish events when something happens and subscribe to events they care about. No agent needs to know about any other agent's existence.

This decoupling makes the system easier to extend (add new agents without modifying existing ones), test (test each agent in isolation), and debug (trace events through the bus).

Building the Event Bus

from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Any
from collections import defaultdict
import asyncio
import uuid


@dataclass
class Event:
    type: str
    payload: Any
    source: str
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)


EventHandler = Callable[[Event], Any]


class EventBus:
    def __init__(self):
        self._subscribers: dict[str, list[tuple[str, EventHandler]]] = (
            defaultdict(list)
        )
        self._event_log: list[Event] = []

    def subscribe(self, event_type: str, handler: EventHandler,
                  subscriber_name: str = "anonymous"):
        self._subscribers[event_type].append(
            (subscriber_name, handler)
        )
        print(f"{subscriber_name} subscribed to '{event_type}'")

    def unsubscribe(self, event_type: str, subscriber_name: str):
        self._subscribers[event_type] = [
            (name, h) for name, h in self._subscribers[event_type]
            if name != subscriber_name
        ]

    def publish(self, event: Event):
        self._event_log.append(event)
        handlers = self._subscribers.get(event.type, [])
        print(f"Event '{event.type}' from {event.source} -> "
              f"{len(handlers)} subscribers")
        for name, handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Handler {name} failed for "
                      f"{event.type}: {e}")

    def get_event_history(
        self, event_type: str | None = None
    ) -> list[Event]:
        if event_type:
            return [e for e in self._event_log
                    if e.type == event_type]
        return self._event_log.copy()

Creating Observer Agents

Each agent subscribes to events it cares about and publishes events when it completes work:

import openai

client = openai.OpenAI()


class AnalysisAgent:
    def __init__(self, bus: EventBus):
        self.bus = bus
        bus.subscribe("document.uploaded", self.on_document,
                      "AnalysisAgent")

    def on_document(self, event: Event):
        text = event.payload["text"]
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system",
                 "content": "Extract key topics as a JSON list."},
                {"role": "user", "content": text},
            ],
            response_format={"type": "json_object"},
        )
        import json
        topics = json.loads(response.choices[0].message.content)

        self.bus.publish(Event(
            type="analysis.completed",
            payload={"topics": topics,
                     "document_id": event.payload["document_id"]},
            source="AnalysisAgent",
        ))


class NotificationAgent:
    def __init__(self, bus: EventBus):
        self.bus = bus
        bus.subscribe("analysis.completed", self.on_analysis,
                      "NotificationAgent")
        bus.subscribe("error.occurred", self.on_error,
                      "NotificationAgent")

    def on_analysis(self, event: Event):
        doc_id = event.payload["document_id"]
        print(f"Notifying user: analysis complete for {doc_id}")

    def on_error(self, event: Event):
        print(f"ALERT: Error from {event.source}: "
              f"{event.payload['message']}")


class LoggingAgent:
    def __init__(self, bus: EventBus):
        self.bus = bus
        # Subscribe to all event types we want to log
        for event_type in ["document.uploaded",
                           "analysis.completed",
                           "error.occurred"]:
            bus.subscribe(event_type, self.log_event,
                          "LoggingAgent")

    def log_event(self, event: Event):
        print(f"[LOG] {event.timestamp} | {event.type} | "
              f"{event.source} | {event.event_id}")

Putting It Together

bus = EventBus()

# Initialize agents — they self-register with the bus
analysis = AnalysisAgent(bus)
notifications = NotificationAgent(bus)
logging_agent = LoggingAgent(bus)

# Trigger the chain by publishing a document upload event
bus.publish(Event(
    type="document.uploaded",
    payload={
        "document_id": "doc-123",
        "text": "AI agents are transforming enterprise software..."
    },
    source="UploadService",
))
# This triggers: AnalysisAgent -> publishes analysis.completed
#                -> NotificationAgent receives it
#                -> LoggingAgent logs everything

Benefits of Decoupled Communication

Adding a new agent — say, a StorageAgent that archives analysis results — requires zero changes to existing agents. You simply create the new agent, subscribe it to analysis.completed, and it starts receiving events. This extensibility is what makes the Observer pattern valuable at scale.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

FAQ

How do I prevent event storms where one event triggers a cascade that overwhelms the system?

Implement event deduplication using the event_id field and a seen-events set. You can also add rate limiting to the event bus by tracking events per second per type and dropping or queuing events that exceed the threshold. Additionally, avoid circular event chains where Event A triggers Event B which triggers Event A.

Should I use an in-memory event bus or a message broker like Redis or RabbitMQ?

For a single-process agent system, an in-memory bus is simpler and faster. For distributed systems with agents running in separate containers or machines, use Redis Pub/Sub or RabbitMQ. The interface stays the same — only the transport layer changes.

How do I handle events that need to be processed in order?

Add a sequence number to events from the same source and buffer events in subscribers that require ordering. Process buffered events only when all preceding sequence numbers have arrived. For most agent workloads, however, event ordering is not critical.


#AgentDesignPatterns #ObserverPattern #EventDriven #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.