NATS Messaging for AI Agent Communication: Lightweight, High-Performance Message Passing
Build AI agent communication systems using NATS messaging, covering subject-based routing, queue groups for load balancing, request/reply patterns, and JetStream for persistent message delivery.
Why NATS for AI Agent Messaging
NATS is a lightweight, high-performance messaging system designed for cloud-native applications. Where Redis Pub/Sub gives you basic publish-subscribe over a data store you might already have, NATS is purpose-built for messaging with features that matter for distributed AI agents: subject-based routing with wildcards, built-in request/reply, queue groups for load balancing, and JetStream for persistent messaging.
A single NATS server handles millions of messages per second with sub-millisecond latency. The protocol is text-based and simple enough that you could implement a client with raw sockets. For AI agent systems where you need fast coordination between many agent instances, NATS provides the messaging backbone without the operational complexity of Kafka or RabbitMQ.
Subject-Based Routing
NATS organizes messages by subjects — dot-separated hierarchical names. Subscribers can use wildcards to match multiple subjects, enabling flexible routing without explicit channel management.
```python
import json
from dataclasses import dataclass, asdict

import nats
from nats.aio.client import Client as NATS

# Subject naming convention:
#   agent.{agent_id}.events   — events from a specific agent
#   agent.{agent_id}.commands — commands to a specific agent
#   agent.*.events            — all agent events (wildcard)
#   agent.>                   — everything under agent. (full wildcard)
#   tasks.{skill}.assign      — task assignments by skill
#   results.{task_id}         — results for a specific task

@dataclass
class AgentMessage:
    sender: str
    msg_type: str
    payload: dict

    def encode(self) -> bytes:
        return json.dumps(asdict(self)).encode()

    @classmethod
    def decode(cls, data: bytes) -> "AgentMessage":
        return cls(**json.loads(data.decode()))

class NATSAgentBus:
    def __init__(self, agent_id: str, nats_url: str = "nats://localhost:4222"):
        self.agent_id = agent_id
        self.nats_url = nats_url
        self.nc: NATS = NATS()

    async def connect(self):
        await self.nc.connect(
            servers=[self.nats_url],
            reconnected_cb=self._on_reconnect,
            disconnected_cb=self._on_disconnect,
            max_reconnect_attempts=-1,  # Reconnect forever
        )

    async def _on_reconnect(self):
        print(f"Agent {self.agent_id}: Reconnected to NATS")

    async def _on_disconnect(self):
        print(f"Agent {self.agent_id}: Disconnected from NATS")

    async def publish(self, subject: str, msg_type: str, payload: dict):
        msg = AgentMessage(
            sender=self.agent_id,
            msg_type=msg_type,
            payload=payload,
        )
        await self.nc.publish(subject, msg.encode())

    async def subscribe(self, subject: str, handler):
        async def wrapper(nats_msg):
            agent_msg = AgentMessage.decode(nats_msg.data)
            await handler(agent_msg, nats_msg)
        return await self.nc.subscribe(subject, cb=wrapper)

    async def close(self):
        await self.nc.close()
```
The wildcard * matches a single token (e.g., agent.*.events matches agent.search.events but not agent.search.sub.events). The > wildcard matches one or more tokens (e.g., agent.> matches everything starting with agent.).
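The matching rules are easy to verify with a few lines of Python. This helper is purely illustrative (NATS performs the matching server-side), but it encodes the same token semantics:

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Check a subject against a NATS-style pattern:
    '*' matches exactly one token, '>' matches one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":  # '>' must be last and swallows the rest (at least one token)
            return len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)

assert subject_matches("agent.*.events", "agent.search.events")
assert not subject_matches("agent.*.events", "agent.search.sub.events")
assert subject_matches("agent.>", "agent.search.sub.events")
assert not subject_matches("agent.>", "agent")  # '>' needs at least one more token
```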
Queue Groups for Load Balancing
When multiple agent workers subscribe to the same subject using a queue group, NATS delivers each message to only one member of the group. This gives you competing consumer semantics without any additional infrastructure.
```python
import asyncio

class AgentWorkerPool:
    """Workers in the same queue group each receive a share of the messages."""

    def __init__(self, bus: NATSAgentBus, skill: str):
        self.bus = bus
        self.skill = skill
        self.queue_group = f"workers-{skill}"

    async def start(self):
        subject = f"tasks.{self.skill}.assign"

        async def handle_task(msg: AgentMessage, nats_msg):
            print(f"Worker {self.bus.agent_id} processing: {msg.payload}")
            result = await self._execute_task(msg.payload)
            # Reply directly to the requester
            if nats_msg.reply:
                response = AgentMessage(
                    sender=self.bus.agent_id,
                    msg_type="task_result",
                    payload={"result": result},
                )
                await self.bus.nc.publish(nats_msg.reply, response.encode())

        async def on_message(nats_msg):
            # The callback must be a coroutine function; a plain lambda
            # would produce a coroutine that is never awaited.
            await handle_task(AgentMessage.decode(nats_msg.data), nats_msg)

        await self.bus.nc.subscribe(subject, queue=self.queue_group, cb=on_message)

    async def _execute_task(self, task: dict) -> str:
        # AI agent processing here
        return f"Completed: {task.get('prompt', '')}"

# Start 3 workers — NATS delivers each task to exactly one of them
async def main():
    workers = []
    for i in range(3):
        bus = NATSAgentBus(agent_id=f"worker-{i}")
        await bus.connect()
        pool = AgentWorkerPool(bus, skill="summarize")
        await pool.start()
        workers.append(bus)
    # Keep the workers alive
    await asyncio.Event().wait()
```
No locks, no claim logic, no external coordination. NATS handles the distribution internally. If a worker disconnects, NATS automatically removes it from the group and redistributes messages to remaining members.
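The invariant is easy to model. The toy simulation below (plain Python, no NATS involved) captures the semantics: each published message reaches exactly one group member, with NATS choosing which one (its documentation describes a random selection per message):

```python
import random

def deliver_to_queue_group(messages: list[str], members: list[str]) -> dict[str, list[str]]:
    """Toy model of queue-group delivery: each message goes to exactly one
    member of the group; plain (non-queue) subscribers would each get every message."""
    inboxes: dict[str, list[str]] = {m: [] for m in members}
    for msg in messages:
        chosen = random.choice(members)  # NATS picks one member per message
        inboxes[chosen].append(msg)
    return inboxes

inboxes = deliver_to_queue_group(
    [f"task-{i}" for i in range(100)],
    ["worker-0", "worker-1", "worker-2"],
)
total = sum(len(v) for v in inboxes.values())
# total == 100: every task delivered exactly once across the group
```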
Request/Reply Pattern
NATS has built-in request/reply support. The requester publishes a message with an auto-generated reply subject, and the responder publishes to that subject. This creates a synchronous-feeling RPC pattern over asynchronous messaging.
```python
import asyncio

class AgentOrchestrator:
    def __init__(self, bus: NATSAgentBus):
        self.bus = bus

    async def request_analysis(self, text: str, timeout: float = 10.0) -> dict:
        msg = AgentMessage(
            sender=self.bus.agent_id,
            msg_type="analyze_request",
            payload={"text": text},
        )
        try:
            response = await self.bus.nc.request(
                "tasks.analysis.assign",
                msg.encode(),
                timeout=timeout,
            )
            result = AgentMessage.decode(response.data)
            return result.payload
        except nats.errors.TimeoutError:
            return {"error": "No analysis worker responded in time"}

    async def fan_out_and_collect(
        self, prompts: list[str], timeout: float = 15.0
    ) -> list[dict]:
        """Send multiple requests in parallel, collect all results."""
        tasks = [
            self.bus.nc.request(
                "tasks.analysis.assign",
                AgentMessage(
                    sender=self.bus.agent_id,
                    msg_type="analyze_request",
                    payload={"text": p},
                ).encode(),
                timeout=timeout,
            )
            for p in prompts
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [
            AgentMessage.decode(r.data).payload
            if not isinstance(r, Exception)
            else {"error": str(r)}
            for r in results
        ]
```
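Under the hood, nc.request is just publish/subscribe: the client subscribes to a unique one-off inbox subject (conventionally _INBOX.<random>), publishes with that subject as the reply-to, and waits for a single message. A server-free sketch of that mechanism, using a hypothetical in-process bus as a stand-in for a real NATS connection:

```python
import asyncio
import uuid

class ToyBus:
    """In-process stand-in for a NATS connection, illustrating request/reply only."""

    def __init__(self):
        self.handlers = {}  # subject -> async handler(data, reply_subject)

    async def subscribe(self, subject, handler):
        self.handlers[subject] = handler

    async def publish(self, subject, data, reply=None):
        if subject in self.handlers:
            await self.handlers[subject](data, reply)

    async def request(self, subject, data, timeout=1.0):
        inbox = f"_INBOX.{uuid.uuid4().hex}"  # unique reply subject, as in real NATS
        future = asyncio.get_running_loop().create_future()

        async def on_reply(reply_data, _reply):
            if not future.done():
                future.set_result(reply_data)

        await self.subscribe(inbox, on_reply)          # listen on the inbox
        await self.publish(subject, data, reply=inbox)  # send with reply-to set
        return await asyncio.wait_for(future, timeout)  # block until one response

async def demo():
    bus = ToyBus()

    async def responder(data, reply_subject):
        await bus.publish(reply_subject, data.upper())

    await bus.subscribe("tasks.analysis.assign", responder)
    return await bus.request("tasks.analysis.assign", "summarize this")

result = asyncio.run(demo())
# result == "SUMMARIZE THIS"
```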
JetStream for Persistent Messaging
Core NATS is fire-and-forget. JetStream adds persistence, acknowledgments, and replay — essential for AI agent events that must not be lost.
```python
import json

import nats
from nats.js.api import StreamConfig

async def setup_jetstream(nc: NATS):
    js = nc.jetstream()
    # Create a stream that captures all agent events
    await js.add_stream(
        StreamConfig(
            name="AGENT_EVENTS",
            subjects=["agent.*.events", "agent.*.results"],
            retention="limits",
            max_msgs=1_000_000,
            max_age=86400 * 7,  # 7 days, in seconds
        )
    )
    return js

async def publish_durable_event(js, agent_id: str, event: dict):
    data = json.dumps(event).encode()
    ack = await js.publish(f"agent.{agent_id}.events", data)
    return ack.seq  # Sequence number for ordering

async def consume_events(js, consumer_name: str):
    sub = await js.pull_subscribe(
        "agent.*.events",
        durable=consumer_name,  # Survives consumer restarts
    )
    while True:
        try:
            messages = await sub.fetch(batch=10, timeout=5)
            for msg in messages:
                event = json.loads(msg.data.decode())
                await process_event(event)  # your application's event handler
                await msg.ack()
        except nats.errors.TimeoutError:
            continue  # No messages available
```
JetStream consumers track their position in the stream. If a consumer restarts, it resumes from where it left off, so no messages are missed. Delivery is at-least-once by default: a message that was fetched but never acknowledged is redelivered, so consumers should process idempotently; exactly-once semantics additionally require publish-side deduplication (see the FAQ below).
FAQ
How does NATS compare to Kafka for AI agent messaging?
NATS is simpler to operate, faster for small messages, and easier to embed in agent architectures. It excels at request/reply patterns and real-time coordination. Kafka is better when you need strong ordering guarantees, very high throughput for large event streams, and long-term event retention for data pipelines. For most AI agent systems with fewer than 100 agents and sub-second latency requirements, NATS with JetStream provides all needed capabilities with significantly less operational overhead.
Can NATS queue groups guarantee exactly-once processing?
Core NATS queue groups provide at-most-once delivery — if the message is delivered to a worker that crashes before processing, the message is lost. JetStream queue groups with acknowledgments provide at-least-once delivery — if a worker crashes, the unacknowledged message is redelivered to another group member. For exactly-once semantics, JetStream supports message deduplication using a Nats-Msg-Id header, combined with idempotent processing on the consumer side.
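One way to wire up that deduplication, sketched under the assumption that the message ID should be derived deterministically from task content so a retried publish of the same task carries the same ID:

```python
import hashlib
import json

def dedup_msg_id(task: dict) -> str:
    """Derive a deterministic message ID from task content. A retry of the same
    publish then carries the same Nats-Msg-Id, and JetStream drops the duplicate
    (within the stream's configured duplicate-tracking window)."""
    canonical = json.dumps(task, sort_keys=True).encode()  # key order must not matter
    return hashlib.sha256(canonical).hexdigest()

# Usage with a JetStream context `js`, passing the ID in the Nats-Msg-Id header:
#   await js.publish("agent.worker-1.events", data,
#                    headers={"Nats-Msg-Id": dedup_msg_id(task)})

task = {"prompt": "summarize", "doc_id": 42}
retry = {"doc_id": 42, "prompt": "summarize"}  # same task, different key order
assert dedup_msg_id(task) == dedup_msg_id(retry)
```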
What is the maximum message size in NATS, and how does that affect AI payloads?
The default maximum message size in NATS is 1MB, configurable up to 64MB. For AI agents, this is rarely a bottleneck since coordination messages and even full LLM responses are typically well under 1MB. For larger payloads like document embeddings or image data, store the data in an object store (NATS has a built-in object store in JetStream) and pass a reference in the message. This keeps messages small and the messaging layer fast.
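That claim-check pattern can be sketched as follows. The store here is a plain dict standing in for a real object store (the nats-py object store API is not shown); the message names are illustrative:

```python
import hashlib
import json

def make_reference_message(sender: str, large_payload: bytes, store: dict) -> bytes:
    """Claim-check pattern: stash the large payload in an object store and
    publish only a small reference message over NATS."""
    key = hashlib.sha256(large_payload).hexdigest()  # content-addressed key
    store[key] = large_payload  # in practice: JetStream object store, S3, etc.
    msg = {
        "sender": sender,
        "msg_type": "embedding_ready",
        "payload": {"object_key": key, "size": len(large_payload)},
    }
    return json.dumps(msg).encode()

store: dict = {}
wire_msg = make_reference_message("worker-0", b"\x00" * 5_000_000, store)
# wire_msg stays a few hundred bytes regardless of payload size;
# the consumer fetches store[object_key] only when it needs the data
```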
#NATS #Messaging #DistributedSystems #RealTimeAI #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.