The Observer Pattern for AI Agents: Event-Driven Agent Communication
Implement the Observer pattern with an event bus for AI agent systems — enabling decoupled, publish-subscribe communication between agents for flexible coordination.
Why Event-Driven Communication?
In tightly coupled agent systems, Agent A calls Agent B directly, which calls Agent C. This creates rigid dependencies — changing one agent requires updating all agents that reference it. The Observer pattern eliminates this coupling by introducing an event bus. Agents publish events when something happens and subscribe to events they care about. No agent needs to know about any other agent's existence.
This decoupling makes the system easier to extend (add new agents without modifying existing ones), test (test each agent in isolation), and debug (trace events through the bus).
Building the Event Bus
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Any
from collections import defaultdict
import asyncio
import uuid
@dataclass
class Event:
type: str
payload: Any
source: str
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
EventHandler = Callable[[Event], Any]
class EventBus:
def __init__(self):
self._subscribers: dict[str, list[tuple[str, EventHandler]]] = (
defaultdict(list)
)
self._event_log: list[Event] = []
def subscribe(self, event_type: str, handler: EventHandler,
subscriber_name: str = "anonymous"):
self._subscribers[event_type].append(
(subscriber_name, handler)
)
print(f"{subscriber_name} subscribed to '{event_type}'")
def unsubscribe(self, event_type: str, subscriber_name: str):
self._subscribers[event_type] = [
(name, h) for name, h in self._subscribers[event_type]
if name != subscriber_name
]
def publish(self, event: Event):
self._event_log.append(event)
handlers = self._subscribers.get(event.type, [])
print(f"Event '{event.type}' from {event.source} -> "
f"{len(handlers)} subscribers")
for name, handler in handlers:
try:
handler(event)
except Exception as e:
print(f"Handler {name} failed for "
f"{event.type}: {e}")
def get_event_history(
self, event_type: str | None = None
) -> list[Event]:
if event_type:
return [e for e in self._event_log
if e.type == event_type]
return self._event_log.copy()
Creating Observer Agents
Each agent subscribes to events it cares about and publishes events when it completes work:
import openai
client = openai.OpenAI()
class AnalysisAgent:
def __init__(self, bus: EventBus):
self.bus = bus
bus.subscribe("document.uploaded", self.on_document,
"AnalysisAgent")
def on_document(self, event: Event):
text = event.payload["text"]
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system",
"content": "Extract key topics as a JSON list."},
{"role": "user", "content": text},
],
response_format={"type": "json_object"},
)
import json
topics = json.loads(response.choices[0].message.content)
self.bus.publish(Event(
type="analysis.completed",
payload={"topics": topics,
"document_id": event.payload["document_id"]},
source="AnalysisAgent",
))
class NotificationAgent:
def __init__(self, bus: EventBus):
self.bus = bus
bus.subscribe("analysis.completed", self.on_analysis,
"NotificationAgent")
bus.subscribe("error.occurred", self.on_error,
"NotificationAgent")
def on_analysis(self, event: Event):
doc_id = event.payload["document_id"]
print(f"Notifying user: analysis complete for {doc_id}")
def on_error(self, event: Event):
print(f"ALERT: Error from {event.source}: "
f"{event.payload['message']}")
class LoggingAgent:
def __init__(self, bus: EventBus):
self.bus = bus
# Subscribe to all event types we want to log
for event_type in ["document.uploaded",
"analysis.completed",
"error.occurred"]:
bus.subscribe(event_type, self.log_event,
"LoggingAgent")
def log_event(self, event: Event):
print(f"[LOG] {event.timestamp} | {event.type} | "
f"{event.source} | {event.event_id}")
Putting It Together
bus = EventBus()
# Initialize agents — they self-register with the bus
analysis = AnalysisAgent(bus)
notifications = NotificationAgent(bus)
logging_agent = LoggingAgent(bus)
# Trigger the chain by publishing a document upload event
bus.publish(Event(
type="document.uploaded",
payload={
"document_id": "doc-123",
"text": "AI agents are transforming enterprise software..."
},
source="UploadService",
))
# This triggers: AnalysisAgent -> publishes analysis.completed
# -> NotificationAgent receives it
# -> LoggingAgent logs everything
Benefits of Decoupled Communication
Adding a new agent — say, a StorageAgent that archives analysis results — requires zero changes to existing agents. You simply create the new agent, subscribe it to analysis.completed, and it starts receiving events. This extensibility is what makes the Observer pattern valuable at scale.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
FAQ
How do I prevent event storms where one event triggers a cascade that overwhelms the system?
Implement event deduplication using the event_id field and a seen-events set. You can also add rate limiting to the event bus by tracking events per second per type and dropping or queuing events that exceed the threshold. Additionally, avoid circular event chains where Event A triggers Event B which triggers Event A.
Should I use an in-memory event bus or a message broker like Redis or RabbitMQ?
For a single-process agent system, an in-memory bus is simpler and faster. For distributed systems with agents running in separate containers or machines, use Redis Pub/Sub or RabbitMQ. The interface stays the same — only the transport layer changes.
How do I handle events that need to be processed in order?
Add a sequence number to events from the same source and buffer events in subscribers that require ordering. Process buffered events only when all preceding sequence numbers have arrived. For most agent workloads, however, event ordering is not critical.
#AgentDesignPatterns #ObserverPattern #EventDriven #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.