Agent-to-Agent Communication: Protocols, Message Passing, and Shared State Patterns
How agents communicate in multi-agent systems using direct message passing, a shared blackboard, event-driven pub/sub, and MCP-based tool sharing, with Python and TypeScript code examples.
The Communication Problem in Multi-Agent Systems
When you have a single AI agent, communication is simple: user sends a message, agent responds. The moment you add a second agent, you must answer fundamental architectural questions. How does Agent A tell Agent B to do something? How do they share data without corrupting each other's state? How do you trace a request that touches five agents?
These questions are not new — distributed systems engineering has answered them for decades with patterns like message queues, pub/sub, and shared state. But AI agents add unique wrinkles: communication is often natural language, the boundary between data and instructions blurs, and agents may need to negotiate rather than simply command.
This guide covers four communication patterns for multi-agent systems, with implementation code and trade-off analysis for each.
Pattern 1: Direct Message Passing
Direct message passing is the simplest pattern: Agent A sends a structured message directly to Agent B and waits for a response. This is the synchronous function call of agent communication.
```python
from dataclasses import dataclass, field
from typing import Any
import asyncio
import uuid
import time


@dataclass
class AgentMessage:
    sender: str
    receiver: str
    message_type: str  # "request", "response", "notification"
    payload: dict[str, Any]
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: str | None = None  # Links request to response
    timestamp: float = field(default_factory=time.time)


class MessageBus:
    def __init__(self):
        self.mailboxes: dict[str, asyncio.Queue] = {}
        self.message_log: list[AgentMessage] = []

    def register(self, agent_id: str):
        self.mailboxes[agent_id] = asyncio.Queue()

    async def send(self, message: AgentMessage):
        self.message_log.append(message)
        if message.receiver in self.mailboxes:
            await self.mailboxes[message.receiver].put(message)
        else:
            raise ValueError(
                f"Agent {message.receiver} not registered"
            )

    async def receive(self, agent_id: str,
                      timeout: float = 30.0) -> AgentMessage:
        try:
            return await asyncio.wait_for(
                self.mailboxes[agent_id].get(), timeout=timeout
            )
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Agent {agent_id} did not receive a message "
                f"within {timeout}s"
            ) from None

    async def request_response(self, request: AgentMessage,
                               timeout: float = 30.0) -> AgentMessage:
        """Send a request and wait for the correlated response.

        `timeout` bounds the whole wait. Unrelated messages that arrive
        in the meantime are held aside and re-queued afterwards, instead
        of being put straight back and immediately re-read in a loop.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        held: list[AgentMessage] = []
        await self.send(request)
        try:
            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    raise TimeoutError(
                        f"No response to {request.message_id} "
                        f"within {timeout}s"
                    )
                response = await self.receive(
                    request.sender, timeout=remaining
                )
                if response.correlation_id == request.message_id:
                    return response
                held.append(response)
        finally:
            # Re-queue non-matching messages so the agent still sees them
            for message in held:
                self.mailboxes[request.sender].put_nowait(message)
```
When to use: Small systems (under 10 agents) where communication patterns are well-known at design time. Works well for request-response interactions like "Agent A asks Agent B to look up customer data."
Trade-offs: Sender and receiver are tightly coupled, and both agents must know about each other. If Agent B is down, Agent A blocks until its request times out. This pattern is not suitable for broadcast communication.
Pattern 2: Shared Blackboard
The blackboard pattern uses a central shared data structure that all agents can read from and write to. Agents monitor the blackboard for changes relevant to their capabilities and contribute their results.
```python
from dataclasses import dataclass, field, replace
from typing import Any, Awaitable, Callable
import asyncio
import time


@dataclass
class BlackboardEntry:
    key: str
    value: Any
    author: str
    timestamp: float = field(default_factory=time.time)
    version: int = 1


Subscriber = Callable[[BlackboardEntry], Awaitable[None]]


class Blackboard:
    def __init__(self):
        self.entries: dict[str, BlackboardEntry] = {}
        self.subscribers: dict[str, list[Subscriber]] = {}
        self._lock = asyncio.Lock()
        # Hold references to callback tasks so they are not garbage-collected
        self._tasks: set[asyncio.Task] = set()

    async def write(self, key: str, value: Any, author: str):
        async with self._lock:
            if key in self.entries:
                entry = self.entries[key]
                entry.value = value
                entry.author = author
                entry.timestamp = time.time()
                entry.version += 1
            else:
                entry = BlackboardEntry(
                    key=key, value=value, author=author
                )
                self.entries[key] = entry
            # Snapshot so callbacks see this version, even if a later write lands first
            snapshot = replace(entry)
        # Notify subscribers outside the lock
        for pattern, callbacks in self.subscribers.items():
            if pattern == "*" or key.startswith(pattern):
                for callback in callbacks:
                    task = asyncio.create_task(callback(snapshot))
                    self._tasks.add(task)
                    task.add_done_callback(self._tasks.discard)

    async def read(self, key: str) -> Any | None:
        entry = self.entries.get(key)
        return entry.value if entry else None

    async def read_pattern(self, prefix: str) -> dict[str, Any]:
        return {
            k: v.value for k, v in self.entries.items()
            if k.startswith(prefix)
        }

    def subscribe(self, pattern: str, callback: Subscriber):
        if pattern not in self.subscribers:
            self.subscribers[pattern] = []
        self.subscribers[pattern].append(callback)
```
Here is how agents interact with the blackboard:
```python
class ResearchAgent:
    def __init__(self, blackboard: Blackboard):
        self.blackboard = blackboard
        self.name = "researcher"
        # React when a new research request appears
        blackboard.subscribe(
            "research_request",
            self.on_research_request,
        )

    async def on_research_request(self, entry: BlackboardEntry):
        query = entry.value["query"]
        # Perform research (simplified)
        results = await self._search(query)
        # Write findings back to blackboard
        await self.blackboard.write(
            f"research_results/{entry.key}",
            {"query": query, "findings": results},
            author=self.name,
        )

    async def _search(self, query: str) -> list[dict]:
        return [{"title": f"Result for {query}", "relevance": 0.95}]


class AnalysisAgent:
    def __init__(self, blackboard: Blackboard):
        self.blackboard = blackboard
        self.name = "analyst"
        # React when research results appear
        blackboard.subscribe(
            "research_results",
            self.on_results_available,
        )

    async def on_results_available(self, entry: BlackboardEntry):
        findings = entry.value["findings"]
        analysis = await self._analyze(findings)
        await self.blackboard.write(
            f"analysis/{entry.key}",
            {"analysis": analysis, "source": entry.key},
            author=self.name,
        )

    async def _analyze(self, findings: list[dict]) -> str:
        return f"Analysis of {len(findings)} findings complete"
```
When to use: Problems where the workflow is not predetermined. Useful when multiple agents can contribute to a solution independently and the order of contributions does not matter.
Trade-offs: Can become chaotic with many agents writing to the same keys. Requires careful key naming conventions and conflict resolution. Harder to trace the flow of execution compared to direct message passing.
Pattern 3: Event-Driven Pub/Sub
Publish-subscribe decouples senders from receivers entirely. Agents publish events to topics, and any agent subscribed to that topic receives the event. This is the pattern of choice for large, evolving systems.
```python
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
import asyncio
import time
import uuid


@dataclass
class Event:
    topic: str
    payload: dict[str, Any]
    source: str
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)


Handler = Callable[[Event], Awaitable[None]]


class EventBus:
    def __init__(self):
        self.subscriptions: dict[str, list[Handler]] = {}
        self.event_log: list[Event] = []
        self.dead_letter: list[tuple[Event, str]] = []

    def subscribe(self, topic: str, handler: Handler):
        if topic not in self.subscriptions:
            self.subscriptions[topic] = []
        self.subscriptions[topic].append(handler)

    async def publish(self, event: Event):
        self.event_log.append(event)
        await self._deliver(event)

    async def _deliver(self, event: Event):
        # Copy so handlers subscribing mid-delivery don't shift indices
        handlers = list(self.subscriptions.get(event.topic, []))
        if not handlers:
            self.dead_letter.append((event, "no_subscribers"))
            return
        # Run handlers concurrently; collect failures instead of raising
        results = await asyncio.gather(
            *(handler(event) for handler in handlers),
            return_exceptions=True,
        )
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                self.dead_letter.append(
                    (event, f"handler_{i}_error: {result}")
                )

    async def replay(self, topic: str, since: float):
        """Replay logged events from a point in time for recovery.

        Delivers without re-logging, so replays don't duplicate the log.
        """
        events = [
            e for e in self.event_log
            if e.topic == topic and e.timestamp >= since
        ]
        for event in events:
            await self._deliver(event)
```
When to use: Systems with 10+ agents that need loose coupling. Agents can be added or removed without modifying existing agents. Ideal for event-driven workflows like order processing, incident response, and data pipelines.
Trade-offs: Harder to debug because there is no single execution path. Requires a dead letter queue for undelivered or failed events. Consistency is eventual, and different agents may observe events in different orders.
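A common companion to a dead letter queue is retrying transient failures with exponential backoff before giving up. The following is a minimal sketch built around a hypothetical `with_retries` wrapper. It works with any async handler that takes a single event argument.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


def with_retries(handler: Handler, dead_letter: list[tuple[Any, str]],
                 max_attempts: int = 3, base_delay: float = 0.1,
                 max_delay: float = 2.0) -> Handler:
    """Hypothetical wrapper: retry with exponential backoff plus jitter,
    then record the event in `dead_letter` once attempts run out."""

    async def wrapped(event: Any) -> None:
        for attempt in range(1, max_attempts + 1):
            try:
                await handler(event)
                return
            except Exception as exc:  # treat any failure as possibly transient
                if attempt == max_attempts:
                    dead_letter.append((event, f"gave up after {attempt} attempts: {exc}"))
                    return
                # Base delays grow 0.1s, 0.2s, 0.4s, ... capped at max_delay
                delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                await asyncio.sleep(delay * random.uniform(0.5, 1.0))

    return wrapped


async def demo() -> tuple[int, int]:
    calls = {"flaky": 0}
    dead: list[tuple[Any, str]] = []

    async def flaky(event: Any) -> None:
        calls["flaky"] += 1
        if calls["flaky"] < 2:
            raise RuntimeError("transient failure")

    async def broken(event: Any) -> None:
        raise RuntimeError("permanent failure")

    await with_retries(flaky, dead, base_delay=0.01)({"id": 1})
    await with_retries(broken, dead, base_delay=0.01)({"id": 2})
    return calls["flaky"], len(dead)


print(asyncio.run(demo()))  # (2, 1)
```

With the `EventBus` above, a handler could be registered as `bus.subscribe(topic, with_retries(handler, bus.dead_letter))`. Events then reach the dead letter list only after retries are exhausted. The random jitter spreads retries out so that many failing handlers do not retry in lockstep.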
Pattern 4: MCP-Based Tool Sharing
The Model Context Protocol (MCP) enables agents to expose their capabilities as tools that other agents can discover and invoke. Rather than exchanging bespoke messages, agents share functionality through a standard tool interface.
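```typescript
// Agent A exposes a tool via an MCP server over stdio
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

// Hypothetical data-access layer: replace with your own customer store.
declare const db: {
  customers: {
    findOne(identifier: string): Promise<Record<string, unknown> | null>;
  };
};

const server = new McpServer({
  name: "customer-data-agent",
  version: "1.0.0",
});

server.tool(
  "lookup_customer",
  "Look up customer details by email or ID",
  {
    identifier: z.string().describe("Email or customer ID"),
    fields: z.array(z.string()).optional()
      .describe("Specific fields to return"),
  },
  async ({ identifier, fields }) => {
    const customer = await db.customers.findOne(identifier);
    if (!customer) {
      // Report a tool-level error instead of throwing on a missing record
      return {
        content: [{ type: "text", text: `No customer found for ${identifier}` }],
        isError: true,
      };
    }
    const result = fields
      ? Object.fromEntries(fields.map((f) => [f, customer[f]]))
      : customer;
    return {
      content: [{ type: "text", text: JSON.stringify(result) }],
    };
  }
);

// Serve over stdin/stdout so other agents can launch this process directly
await server.connect(new StdioServerTransport());
```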
Other agents connect to this MCP server and invoke the tool as if it were a local function:
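```python
import asyncio

from agents import Agent, Runner
from agents.mcp import MCPServerStdio


async def main():
    # Agent B launches Agent A's MCP server (compiled to JS) as a stdio subprocess
    async with MCPServerStdio(
        name="customer-data",
        params={"command": "node", "args": ["customer_data_agent.js"]},
    ) as customer_data_mcp:
        billing_agent = Agent(
            name="Billing Agent",
            instructions="Handle billing queries using customer data tools.",
            mcp_servers=[customer_data_mcp],
        )
        # Example query; the agent decides when to call lookup_customer
        result = await Runner.run(billing_agent, "What plan is alice@example.com on?")
        print(result.final_output)


asyncio.run(main())
```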
When to use: When agents are developed by different teams or need to share capabilities across organizational boundaries. MCP provides a standard interface that works regardless of the underlying agent framework.
Trade-offs: Adds serialization overhead for each tool call. Requires running MCP servers alongside agents. Best for coarse-grained capabilities, not high-frequency inter-agent chatter.
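To make the serialization overhead concrete, here is roughly what one `lookup_customer` call looks like on the wire. MCP messages are JSON-RPC 2.0, and `tools/call` is the method a client uses to invoke a tool. The customer record, request ID, and arguments below are invented for illustration.

```python
import json

# Hypothetical customer record held by the server
customer = {"id": "c_123", "email": "alice@example.com", "plan": "pro"}

# Client -> server: a JSON-RPC 2.0 request invoking the tool
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "lookup_customer",
        "arguments": {"identifier": "alice@example.com", "fields": ["plan"]},
    },
}

# Server side: the tool serializes its result to text (JSON.stringify above),
# and that text is serialized again inside the JSON-RPC response envelope.
tool_text = json.dumps({"plan": customer["plan"]})
response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"content": [{"type": "text", "text": tool_text}]},
}
wire = json.dumps(response)

# Client side: decode the envelope, then decode the tool's text payload
decoded = json.loads(json.loads(wire)["result"]["content"][0]["text"])
print(decoded)  # {'plan': 'pro'}
```

Because the tool result is a JSON string nested inside another JSON document, each call is encoded and decoded twice. That overhead is negligible for coarse-grained lookups but adds up for chatty, high-frequency exchanges.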
Choosing the Right Pattern
| Pattern | Best For | Coupling | Scalability | Debuggability |
|---|---|---|---|---|
| Direct Message | Small teams, request-response | High | Low | High |
| Blackboard | Emergent workflows | Medium | Medium | Medium |
| Pub/Sub | Large systems, event-driven | Low | High | Low |
| MCP Tools | Cross-team, capability sharing | Low | High | High |
Most production systems combine patterns. A common architecture uses pub/sub for inter-service events, direct messages for synchronous requests within a service, and MCP for exposing capabilities to external systems.
FAQ
How do you prevent message storms in pub/sub systems?
Implement rate limiting at the publisher level and backpressure at the subscriber level. Use exponential backoff for retry logic. Set TTL (time-to-live) on events so stale events are automatically discarded. Monitor event throughput per topic and alert on anomalies.
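Two of these techniques are small enough to sketch:

- A token bucket throttles a publisher. It is one common rate-limiting algorithm; the answer above does not prescribe a specific one.
- A per-event TTL lets subscribers discard stale events.

`TokenBucket`, `TimedEvent`, and `drain` are hypothetical names.

```python
import time
from dataclasses import dataclass, field
from typing import Any


class TokenBucket:
    """Publisher-side limiter: bursts up to `capacity`, refilled at `rate` tokens/second."""

    def __init__(self, rate: float, capacity: int) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller should delay or drop the publish


@dataclass
class TimedEvent:
    topic: str
    payload: dict[str, Any]
    ttl_seconds: float
    created_at: float = field(default_factory=time.monotonic)

    def expired(self) -> bool:
        return time.monotonic() - self.created_at > self.ttl_seconds


def drain(queue: list[TimedEvent]) -> list[TimedEvent]:
    """Subscriber side: drop stale events instead of processing them late."""
    return [e for e in queue if not e.expired()]


bucket = TokenBucket(rate=1.0, capacity=3)
sent = [bucket.try_acquire() for _ in range(5)]
print(sent)  # [True, True, True, False, False]: a burst of 3, then throttled

old = TimedEvent("orders", {}, ttl_seconds=5, created_at=time.monotonic() - 10)
fresh = TimedEvent("orders", {}, ttl_seconds=5)
print(len(drain([old, fresh])))  # 1: the stale event is discarded
```

For subscriber-side backpressure, a bounded `asyncio.Queue(maxsize=...)` is a simple starting point. Its `put()` waits while the queue is full, which slows publishers to the consumer's pace.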
Can agents communicate in natural language or should messages be structured?
Use structured messages (JSON schemas) for all inter-agent communication. Natural language adds ambiguity and makes inter-agent behavior harder to validate and test. Reserve natural language for the agent-to-human interface. Between agents, well-defined schemas eliminate an entire class of misinterpretation bugs.
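As a lightweight illustration, the sketch below validates one message type with a frozen dataclass and explicit checks. It stands in for a full JSON Schema validator so that it stays dependency-free. `LookupCustomerRequest` and `SchemaError` are hypothetical names that mirror the `lookup_customer` tool's inputs.

```python
import dataclasses
import json
from dataclasses import dataclass
from typing import Any


class SchemaError(ValueError):
    """Raised when an inter-agent message does not match its schema."""


@dataclass(frozen=True)
class LookupCustomerRequest:
    identifier: str
    fields: tuple[str, ...] = ()

    @classmethod
    def from_json(cls, raw: str) -> "LookupCustomerRequest":
        data: Any = json.loads(raw)
        if not isinstance(data, dict):
            raise SchemaError("message must be a JSON object")
        # Reject fields the schema does not know about
        known = {f.name for f in dataclasses.fields(cls)}
        unknown = set(data) - known
        if unknown:
            raise SchemaError(f"unknown fields: {sorted(unknown)}")
        identifier = data.get("identifier")
        if not isinstance(identifier, str) or not identifier:
            raise SchemaError("identifier must be a non-empty string")
        requested = data.get("fields", [])
        if not isinstance(requested, list) or not all(isinstance(x, str) for x in requested):
            raise SchemaError("fields must be a list of strings")
        return cls(identifier=identifier, fields=tuple(requested))


ok = LookupCustomerRequest.from_json('{"identifier": "alice@example.com", "fields": ["plan"]}')
print(ok)  # LookupCustomerRequest(identifier='alice@example.com', fields=('plan',))

try:
    # A free-text message from another agent is rejected rather than guessed at
    LookupCustomerRequest.from_json('{"text": "please find alice and tell me her plan"}')
except SchemaError as err:
    print(err)  # unknown fields: ['text']
```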
How do you handle ordering guarantees in async communication?
For events that must be processed in order, use a single-partition topic or include a sequence number in the event payload. The receiving agent buffers out-of-order events and processes them sequentially. For events where order does not matter, prefer unordered delivery for better throughput and simpler implementation.
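The following is a minimal sketch of the receiving side, assuming each event carries an integer sequence number starting at 0. `ReorderBuffer` is a hypothetical name.

```python
from typing import Any, Callable


class ReorderBuffer:
    """Releases events strictly in sequence order, holding early arrivals."""

    def __init__(self, deliver: Callable[[int, Any], None], start_seq: int = 0) -> None:
        self.deliver = deliver
        self.next_seq = start_seq
        self.pending: dict[int, Any] = {}

    def receive(self, seq: int, payload: Any) -> None:
        if seq < self.next_seq or seq in self.pending:
            return  # duplicate or already processed: ignore
        self.pending[seq] = payload
        # Flush every event that is now contiguous with what was delivered
        while self.next_seq in self.pending:
            self.deliver(self.next_seq, self.pending.pop(self.next_seq))
            self.next_seq += 1


processed: list[int] = []
buf = ReorderBuffer(lambda seq, payload: processed.append(seq))
for seq in [1, 0, 3, 2, 2, 4]:  # out of order, with one duplicate
    buf.receive(seq, {"seq": seq})
print(processed)  # [0, 1, 2, 3, 4]
```

A production version would also need a bound on `pending` and a timeout for gaps. Without them, a single lost event stalls delivery indefinitely.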
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.