Agent-to-Agent Protocol Design: Building Interoperable Multi-Agent Communication
Design robust communication protocols for multi-agent systems including message schemas, capability advertisement, negotiation protocols, and service discovery mechanisms with practical Python implementations.
The Interoperability Problem in Multi-Agent Systems
When you build a single agent, communication is straightforward — the agent calls tools and returns results. When you build a team of agents, you face a fundamental question: how do agents talk to each other? Without a well-designed protocol, you end up with tightly coupled agents that can only work with the specific partners they were built for.
Agent-to-agent (A2A) protocol design solves this by establishing standard message formats, capability discovery, and negotiation patterns that allow any agent to communicate with any other agent — even agents built by different teams or frameworks.
Designing the Message Schema
Every inter-agent message needs a consistent structure. Here is a protocol envelope that supports request-response, streaming, and event patterns.
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
import uuid
import time
class MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
EVENT = "event"
CAPABILITY_QUERY = "capability_query"
CAPABILITY_ADVERTISEMENT = "capability_advertisement"
NEGOTIATION = "negotiation"
@dataclass
class AgentMessage:
msg_type: MessageType
sender_id: str
receiver_id: str
payload: Dict[str, Any]
correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
reply_to: Optional[str] = None
timestamp: float = field(default_factory=time.time)
ttl_seconds: float = 30.0
protocol_version: str = "1.0"
@property
def is_expired(self) -> bool:
return (time.time() - self.timestamp) > self.ttl_seconds
def create_reply(self, payload: Dict[str, Any]) -> "AgentMessage":
return AgentMessage(
msg_type=MessageType.RESPONSE,
sender_id=self.receiver_id,
receiver_id=self.sender_id,
payload=payload,
reply_to=self.correlation_id,
)
The correlation_id ties requests to responses. The reply_to field enables threading. The ttl_seconds prevents stale messages from clogging the system.
Capability Advertisement and Discovery
Agents need to discover what other agents can do. A capability registry lets agents advertise their skills and query for agents that match specific needs.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
from dataclasses import dataclass
from typing import Dict, List, Set
@dataclass
class AgentCapability:
name: str
description: str
input_schema: Dict # JSON Schema for expected input
output_schema: Dict # JSON Schema for output
cost_estimate: float # Relative cost (0.0 to 1.0)
latency_ms: int # Expected latency
class CapabilityRegistry:
def __init__(self):
self._capabilities: Dict[str, List[AgentCapability]] = {}
def register(self, agent_id: str, capabilities: List[AgentCapability]):
self._capabilities[agent_id] = capabilities
def unregister(self, agent_id: str):
self._capabilities.pop(agent_id, None)
def find_agents(
self, capability_name: str, max_cost: float = 1.0
) -> List[str]:
results = []
for agent_id, caps in self._capabilities.items():
for cap in caps:
if (
cap.name == capability_name
and cap.cost_estimate <= max_cost
):
results.append(agent_id)
return results
def get_capabilities(self, agent_id: str) -> List[AgentCapability]:
return self._capabilities.get(agent_id, [])
Negotiation Protocol
When multiple agents can handle a task, the requesting agent negotiates to find the best match. This implements a simple contract-net protocol.
import asyncio
class NegotiationProtocol:
def __init__(self, registry: CapabilityRegistry, bus: "MessageBus"):
self.registry = registry
self.bus = bus
async def request_bids(
self,
requester_id: str,
capability_name: str,
task_payload: Dict,
timeout: float = 5.0,
) -> List[Dict]:
candidates = self.registry.find_agents(capability_name)
if not candidates:
return []
bids = []
bid_requests = []
for agent_id in candidates:
msg = AgentMessage(
msg_type=MessageType.NEGOTIATION,
sender_id=requester_id,
receiver_id=agent_id,
payload={
"action": "request_bid",
"capability": capability_name,
"task": task_payload,
},
)
bid_requests.append(self.bus.send_and_wait(msg, timeout))
results = await asyncio.gather(
*bid_requests, return_exceptions=True
)
for result in results:
if isinstance(result, AgentMessage):
bids.append(result.payload)
return bids
def select_winner(self, bids: List[Dict]) -> Optional[Dict]:
valid = [b for b in bids if b.get("accepted")]
if not valid:
return None
return min(valid, key=lambda b: b.get("cost", float("inf")))
Message Bus Implementation
The message bus routes messages between agents and supports both direct addressing and publish-subscribe patterns.
class MessageBus:
def __init__(self):
self._handlers: Dict[str, asyncio.Queue] = {}
def register_agent(self, agent_id: str):
self._handlers[agent_id] = asyncio.Queue()
async def send(self, message: AgentMessage):
queue = self._handlers.get(message.receiver_id)
if queue:
await queue.put(message)
async def send_and_wait(
self, message: AgentMessage, timeout: float = 10.0
) -> Optional[AgentMessage]:
await self.send(message)
queue = self._handlers.get(message.sender_id)
if not queue:
return None
try:
while True:
reply = await asyncio.wait_for(queue.get(), timeout)
if reply.reply_to == message.correlation_id:
return reply
except asyncio.TimeoutError:
return None
async def receive(
self, agent_id: str, timeout: float = 5.0
) -> Optional[AgentMessage]:
queue = self._handlers.get(agent_id)
if not queue:
return None
try:
return await asyncio.wait_for(queue.get(), timeout)
except asyncio.TimeoutError:
return None
Putting It Together
async def demo():
bus = MessageBus()
registry = CapabilityRegistry()
# Register agents and their capabilities
bus.register_agent("summarizer")
registry.register("summarizer", [
AgentCapability("summarize", "Summarize text", {}, {}, 0.3, 2000)
])
bus.register_agent("translator")
registry.register("translator", [
AgentCapability("translate", "Translate text", {}, {}, 0.5, 3000)
])
# Discover and negotiate
negotiator = NegotiationProtocol(registry, bus)
agents = registry.find_agents("summarize")
print(f"Agents with summarize capability: {agents}")
FAQ
Why not just use HTTP REST between agents?
HTTP REST works for simple request-response patterns but lacks built-in support for capability discovery, negotiation, and message correlation. A dedicated agent protocol gives you these features plus TTL-based expiration and structured negotiation — reducing the boilerplate each agent must implement.
How does this compare to the A2A protocol from Google?
Google's Agent-to-Agent protocol focuses on web-standard interoperability using JSON-RPC over HTTP with agent cards for discovery. The patterns in this article follow similar principles but are designed for in-process or single-cluster deployments. For cross-organization interoperability, adopt the A2A standard; for internal agent teams, a lightweight custom protocol often performs better.
#AgentProtocol #A2ACommunication #MultiAgentSystems #AgentInteroperability #ProtocolDesign #AgenticAI #PythonAI #DistributedAgents
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.