Agentic AI with Message Queues: NATS, Kafka, and RabbitMQ Patterns
Compare NATS, Kafka, and RabbitMQ for agentic AI workloads. Learn async tool execution, event-driven agents, and dead letter queue patterns.
Why Message Queues Are Essential for Production Agent Systems
A simple agentic AI prototype calls the LLM synchronously, executes tools inline, and returns the result. This works for demos. In production, it falls apart.
Consider what happens when a triage agent needs to call a tool that takes 30 seconds (a CRM lookup, a database report, an API that rate-limits). The user's WebSocket connection hangs. The HTTP request times out. The agent pod holds a thread doing nothing but waiting.
Message queues decouple agent logic from tool execution, enable event-driven agent triggers, distribute work across agent replicas, and provide durability for operations that must not be lost. At CallSphere, we use NATS JetStream as our primary message backbone for multi-agent communication, and this guide covers the patterns we have found most effective.
Pattern 1: Async Tool Execution
The most immediately useful pattern is decoupling tool execution from the agent conversation loop.
Architecture
User Message
     |
     v
[Triage Agent] --publishes--> [tools.execute.{tool_name}]
     |                                   |
     |  (continues thinking)             v
     |                          [Tool Worker Pool]
     |                                   |
     |                       publishes result to
     |                [tools.result.{conversation_id}]
     |                                   |
     v  <-- subscribes ------------------+
[Triage Agent resumes with tool result]
The agent publishes a tool execution request to a subject, continues processing other conversations (or yields the thread), and picks up the result when the tool worker publishes it back.
NATS JetStream Implementation
import json
from datetime import datetime
from uuid import uuid4

import nats
from nats.js.api import ConsumerConfig, StreamConfig


async def setup_tool_streams(nc):
    js = nc.jetstream()

    # Stream for tool execution requests
    await js.add_stream(
        StreamConfig(
            name="TOOL_REQUESTS",
            subjects=["tools.execute.*"],
            retention="workqueue",
            max_age=300,  # 5 minute TTL (nats-py durations are in seconds)
            storage="memory",
        )
    )

    # Stream for tool results
    await js.add_stream(
        StreamConfig(
            name="TOOL_RESULTS",
            subjects=["tools.result.*"],
            retention="interest",
            max_age=60,  # 1 minute TTL
            storage="memory",
        )
    )
async def request_tool_execution(js, conversation_id, tool_name, tool_input):
    """Agent publishes a tool execution request."""
    payload = json.dumps({
        "conversation_id": conversation_id,
        "tool_name": tool_name,
        "tool_input": tool_input,
        "requested_at": datetime.utcnow().isoformat(),
    }).encode()
    await js.publish(
        f"tools.execute.{tool_name}",
        payload,
        headers={"Nats-Msg-Id": f"{conversation_id}-{tool_name}-{uuid4()}"},
    )
async def tool_worker(js, tool_name, executor_fn):
    """Worker that processes tool execution requests."""
    sub = await js.pull_subscribe(
        f"tools.execute.{tool_name}",
        durable=f"worker-{tool_name}",
        config=ConsumerConfig(
            ack_wait=30,
            max_deliver=3,
        ),
    )
    while True:
        try:
            msgs = await sub.fetch(batch=1, timeout=5)
        except nats.errors.TimeoutError:
            continue
        for msg in msgs:
            request = json.loads(msg.data)
            try:
                result = await executor_fn(request["tool_input"])
                await js.publish(
                    f"tools.result.{request['conversation_id']}",
                    json.dumps({"status": "success", "result": result}).encode(),
                )
                await msg.ack()
            except Exception as e:
                # Retry via NAK until deliveries are exhausted; publish the
                # error to the agent only on the final attempt, so the agent
                # never receives an error followed by a late success.
                if msg.metadata.num_delivered >= 3:
                    await js.publish(
                        f"tools.result.{request['conversation_id']}",
                        json.dumps({"status": "error", "error": str(e)}).encode(),
                    )
                    await msg.term()
                else:
                    await msg.nak(delay=5)
Why Workqueue Retention?
For tool requests, we use workqueue retention — once a message is acknowledged by a consumer, it is removed from the stream. Tool requests are tasks, not events: each one should be handled by exactly one worker and never replayed, so there is no reason to retain it after acknowledgment.
For tool results, we use interest-based retention — results stay in the stream as long as there are active consumers interested in them, then get cleaned up. The agent subscribes to its specific conversation_id subject, receives the result, and the message is eligible for cleanup.
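The agent side of this loop is not shown above. A minimal sketch, assuming the agent opens an ephemeral core-NATS subscription on its per-conversation result subject (via `Subscription.next_msg` from nats-py) before the request is published, so a fast worker's reply cannot slip past it:

```python
import json


async def await_tool_result(nc, conversation_id: str, timeout: float = 30.0) -> dict:
    """Block until a tool worker publishes a result for this conversation.

    `nc` is the connection from nats.connect(). Subscribe *before*
    publishing the request so the result cannot be missed.
    """
    sub = await nc.subscribe(f"tools.result.{conversation_id}")
    try:
        msg = await sub.next_msg(timeout=timeout)
        return parse_tool_result(msg.data)
    finally:
        await sub.unsubscribe()


def parse_tool_result(data: bytes) -> dict:
    """Decode a tool result payload, normalizing unexpected shapes to errors."""
    try:
        result = json.loads(data)
    except (ValueError, UnicodeDecodeError):
        return {"status": "error", "error": "malformed result payload"}
    if result.get("status") not in ("success", "error"):
        return {"status": "error", "error": f"unknown status: {result.get('status')!r}"}
    return result
```

Keeping the parsing defensive matters here: a buggy worker that publishes garbage should surface as a tool error the agent can reason about, not an unhandled exception in the conversation loop.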
Pattern 2: Event-Driven Agent Triggers
Instead of a monolithic orchestrator deciding which agent to invoke, publish events and let agents subscribe to the events they care about.
# Events that agents subscribe to
EVENT_SUBJECTS = {
    "conversation.started": ["triage_agent"],
    "conversation.escalated": ["supervisor_agent"],
    "intent.identified.billing": ["billing_agent"],
    "intent.identified.support": ["support_agent"],
    "intent.identified.sales": ["sales_agent"],
    "tool.result.payment_processed": ["billing_agent", "notification_agent"],
    "agent.handoff.requested": ["triage_agent"],
}


async def publish_event(js, event_type, payload):
    """Publish a domain event."""
    await js.publish(
        f"events.{event_type}",
        json.dumps({
            "event_type": event_type,
            "payload": payload,
            "timestamp": datetime.utcnow().isoformat(),
        }).encode(),
    )


# When intent is identified, the relevant agent wakes up
await publish_event(js, "intent.identified.billing", {
    "conversation_id": conv_id,
    "user_message": "I need to update my payment method",
    "confidence": 0.94,
})
This pattern eliminates the central orchestrator bottleneck. Adding a new agent type means deploying a new service that subscribes to the relevant events — zero changes to existing agents.
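The subscriber half of the pattern can be sketched as follows. The queue-group naming convention (`{agent_name}-workers`) and the `handler` signature are assumptions, not part of the code above:

```python
import json


def route_event(subject: str, registry: dict) -> list:
    """Return the agent names registered for an event subject.

    `registry` maps event types (e.g. "intent.identified.billing") to agent
    names; subjects are expected to look like "events.<event_type>".
    """
    event_type = subject.removeprefix("events.")
    return registry.get(event_type, [])


async def run_event_subscriber(nc, agent_name: str, event_type: str, handler):
    """Subscribe one agent to one event type.

    The queue group means that when the agent runs as several replicas,
    each event is delivered to only one of them (load balancing).
    """
    async def on_event(msg):
        event = json.loads(msg.data)
        await handler(event["payload"])

    await nc.subscribe(
        f"events.{event_type}",
        queue=f"{agent_name}-workers",  # replicas share one delivery
        cb=on_event,
    )
```

The queue group is what makes horizontal scaling safe: without it, every replica of `billing_agent` would process every billing event.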
Pattern 3: Distributed Agent Coordination
When multiple agents need to collaborate on a single task (e.g., a complex customer request that touches billing, support, and scheduling), use a saga pattern coordinated through message queues.
class AgentSaga:
    """Coordinates multi-agent collaboration via message queue."""

    SAGA_STEPS = [
        {"agent": "billing_agent", "action": "check_account_status"},
        {"agent": "support_agent", "action": "lookup_ticket_history"},
        {"agent": "scheduling_agent", "action": "find_available_slots"},
    ]

    async def execute(self, nc, js, conversation_id, context):
        saga_id = str(uuid4())
        results = {}
        for step in self.SAGA_STEPS:
            # Subscribe to the result subject *before* publishing the
            # request, so a fast agent's reply cannot be missed. This is a
            # core-NATS subscription: result subjects need no stream.
            sub = await nc.subscribe(f"saga.{saga_id}.{step['agent']}.result")
            try:
                await js.publish(
                    f"saga.{saga_id}.{step['agent']}",
                    json.dumps({
                        "saga_id": saga_id,
                        "conversation_id": conversation_id,
                        "action": step["action"],
                        "context": context,
                        "prior_results": results,
                    }).encode(),
                )
                # Wait for the step result with a timeout
                msg = await sub.next_msg(timeout=30)
                result = json.loads(msg.data)
                results[step["agent"]] = result
                if result.get("status") == "error":
                    await self.compensate(js, saga_id, results)
                    return {"status": "failed", "failed_at": step["agent"]}
            except nats.errors.TimeoutError:
                await self.compensate(js, saga_id, results)
                return {"status": "timeout", "timed_out_at": step["agent"]}
            finally:
                await sub.unsubscribe()
        return {"status": "completed", "results": results}
Pattern 4: Dead Letter Queues for Failed Agent Actions
When a tool execution fails after all retries, or an agent cannot process a message, the message must go somewhere for investigation rather than being silently dropped.
async def setup_dead_letter_stream(js):
    await js.add_stream(
        StreamConfig(
            name="DEAD_LETTERS",
            subjects=["dlq.*"],
            retention="limits",
            max_age=604800,  # 7 days (nats-py durations are in seconds)
            storage="file",  # Persist to disk
        )
    )


async def send_to_dlq(js, original_subject, message, error):
    """Route a failed message to the dead letter queue."""
    await js.publish(
        f"dlq.{original_subject.replace('.', '_')}",
        json.dumps({
            "original_subject": original_subject,
            "original_message": json.loads(message.data),
            "error": str(error),
            "failed_at": datetime.utcnow().isoformat(),
            "delivery_count": message.metadata.num_delivered if hasattr(message, "metadata") else None,
        }).encode(),
    )
Monitor the dead letter queue size. A growing DLQ signals a systemic issue — a broken tool, a bad prompt causing the agent to generate invalid tool inputs, or a downstream service outage.
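As a starting point for that monitoring, JetStream exposes stream depth via `stream_info`. A minimal sketch; the alert threshold of 100 is an arbitrary illustrative default:

```python
from typing import Optional


def dlq_alert(message_count: int, threshold: int = 100) -> Optional[str]:
    """Return an alert string once DLQ depth crosses the threshold."""
    if message_count >= threshold:
        return (
            f"DLQ depth {message_count} >= {threshold}: "
            "investigate failed tool calls"
        )
    return None


async def check_dlq_depth(js, threshold: int = 100) -> Optional[str]:
    """Poll the DEAD_LETTERS stream and alert when it grows past threshold."""
    info = await js.stream_info("DEAD_LETTERS")
    return dlq_alert(info.state.messages, threshold)
```

In practice you would export the depth as a gauge to your metrics system and alert on its rate of change, not just its absolute value.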
Comparison: NATS vs Kafka vs RabbitMQ for Agent Workloads
| Feature | NATS JetStream | Apache Kafka | RabbitMQ |
|---|---|---|---|
| Latency | Sub-millisecond | Low milliseconds | Low milliseconds |
| Throughput | 10M+ msg/sec | 1M+ msg/sec | 100K+ msg/sec |
| Operational complexity | Low (single binary) | High (ZK/KRaft, brokers, schema registry) | Medium (Erlang runtime) |
| Message replay | Yes (stream retention) | Yes (log-based, excellent) | Limited (requires plugins) |
| Exactly-once semantics | Publish dedup via Nats-Msg-Id | Yes (idempotent producers, transactions) | No (at-least-once) |
| Request-reply | Native support | Not native (workaround needed) | Via reply-to queues |
| WebSocket support | Native (NATS WebSocket) | No (need proxy) | Via STOMP plugin |
| Memory footprint | ~20MB base | ~1GB+ per broker | ~100MB base |
| Best for agents | Real-time agent communication, tool execution | Agent event sourcing, audit trails, replay | Task queues, routing-heavy workflows |
When to Choose Each
NATS JetStream is the best default for agentic AI systems. Its native request-reply pattern maps perfectly to agent-tool communication, the operational overhead is minimal (single Go binary), and WebSocket support means browser-based agents can connect directly.
Apache Kafka is the right choice when you need a durable event log of all agent actions for compliance, replay, or training data collection. If you must be able to replay every conversation and tool execution from the beginning of time, Kafka's log-based architecture is unmatched.
RabbitMQ fits when your agent system is primarily about task routing — different message types go to different agent queues based on routing keys. RabbitMQ's exchange and binding model is more expressive than NATS subjects for complex routing scenarios.
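For intuition on what that expressiveness buys you: AMQP topic bindings match routing keys word by word, where `*` matches exactly one word and `#` matches zero or more. A sketch of the matching rule, plus an illustrative binding helper (the exchange name `agent.intents` is made up, and `channel` is assumed to be a pika channel):

```python
def routing_key_matches(binding: str, routing_key: str) -> bool:
    """AMQP topic matching: '*' is exactly one word, '#' is zero or more."""
    def match(pat, key):
        if not pat:
            return not key
        head, rest = pat[0], pat[1:]
        if head == "#":
            # '#' may consume zero or more words
            return any(match(rest, key[i:]) for i in range(len(key) + 1))
        if key and (head == "*" or head == key[0]):
            return match(rest, key[1:])
        return False
    return match(binding.split("."), routing_key.split("."))


def bind_agent_queue(channel, agent: str, pattern: str):
    """Declare an agent queue and bind it to a shared topic exchange."""
    channel.exchange_declare(exchange="agent.intents", exchange_type="topic")
    channel.queue_declare(queue=agent, durable=True)
    channel.queue_bind(queue=agent, exchange="agent.intents", routing_key=pattern)
```

With bindings like `intent.billing.#` for the billing agent and `intent.*.urgent` for a supervisor, the broker does the routing that would otherwise live in orchestrator code.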
Performance Tuning for Agent Workloads
Batch Size Tuning
When tool workers pull messages from the queue, batch size matters. Pulling one message at a time adds round-trip overhead. Pulling too many risks holding messages longer than the ack timeout.
# Good: batch of 10 with short timeout
msgs = await sub.fetch(batch=10, timeout=1)

# Process concurrently
results = await asyncio.gather(
    *[process_tool_request(msg) for msg in msgs]
)
Connection Pooling
Each agent pod should maintain a single NATS connection with multiple subscriptions, not one connection per subscription. NATS connections are multiplexed — a single TCP connection handles thousands of subscriptions efficiently.
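A sketch of what that looks like in practice; the `handlers` mapping and subject names are illustrative:

```python
async def start_agent_pod(nc, handlers: dict):
    """Register all of a pod's subscriptions on ONE shared NATS connection.

    `handlers` maps subjects to async callbacks. Every subscription is
    multiplexed over the same TCP connection, so adding subscriptions is
    cheap; opening a connection per subscription is not.
    """
    return [
        await nc.subscribe(subject, cb=callback)
        for subject, callback in handlers.items()
    ]


def default_pod_subjects(agent_name: str) -> list:
    """Subjects a typical agent pod listens on (illustrative names)."""
    return [
        f"agents.{agent_name}.events",  # events addressed to this agent
        "tools.result.*",               # tool results, filtered per conversation
    ]
```

A pod would build its handler map from `default_pod_subjects`, connect once at startup, and pass the same `nc` to every subsystem.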
Subject Hierarchy Design
Design your subject namespace to support both specific and wildcard subscriptions:
agents.{agent_name}.events # Agent-specific events
agents.*.events # All agent events (monitoring)
tools.execute.{tool_name} # Specific tool requests
tools.execute.> # All tool requests (audit)
conversations.{id}.messages # Conversation-specific messages
Frequently Asked Questions
Can I use Redis Pub/Sub instead of a dedicated message queue for agent communication?
Redis Pub/Sub is fire-and-forget with no durability — if a subscriber is not connected when a message is published, the message is lost. For non-critical notifications this is fine, but tool execution requests and agent handoff events must not be lost. Use Redis Streams (which provide durability) or a proper message queue.
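A sketch of the Redis Streams alternative, assuming a `redis.asyncio` client `r` and an illustrative stream name `tool_requests`. Unlike Pub/Sub, XADD persists entries until explicitly trimmed, so a slow or offline consumer does not lose them:

```python
import json


def to_stream_fields(request: dict) -> dict:
    """Flatten a tool request into the flat string fields XADD expects."""
    return {
        "conversation_id": request["conversation_id"],
        "tool_name": request["tool_name"],
        "tool_input": json.dumps(request["tool_input"]),  # nested data as JSON
    }


async def enqueue_tool_request(r, request: dict):
    """Durably enqueue a tool request via Redis Streams.

    `r` is a redis.asyncio.Redis client; XADD returns the new entry ID.
    Workers would consume with XREADGROUP and acknowledge with XACK.
    """
    return await r.xadd("tool_requests", to_stream_fields(request))
```

This gets you durability and consumer groups, but not the subject hierarchy, wildcards, or interest-based retention that JetStream provides.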
How do I handle message ordering in a multi-agent system?
Order matters within a conversation but not across conversations. Use conversation_id as the partition key (Kafka) or ensure your NATS consumers process messages for a single conversation sequentially. A JetStream pull consumer preserves per-subject order as long as only one message is in flight at a time; set max_ack_pending=1 (or process each fetched batch sequentially) when strict ordering matters.
What happens when an agent crashes mid-conversation?
If the agent has not acknowledged the message, the message queue redelivers it to another agent instance after the ack timeout. Design your agents to be idempotent — processing the same message twice should produce the same result. Store conversation state in the database, not in agent process memory.
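Idempotency can be as simple as refusing to re-run a handler for a Nats-Msg-Id you have already seen. A minimal in-memory sketch; in production the seen-ID set would live in Redis or the database alongside conversation state, not in process memory:

```python
class IdempotentProcessor:
    """Run each message's handler at most once, keyed by message ID."""

    def __init__(self):
        self._seen = set()  # production: a persistent store with a TTL

    def process(self, msg_id: str, handler) -> bool:
        """Run `handler` once per msg_id; return True if it actually ran.

        The ID is recorded only after the handler succeeds, so a crash
        mid-handler leads to a retry rather than a silently lost message.
        """
        if msg_id in self._seen:
            return False
        handler()
        self._seen.add(msg_id)
        return True
```

Recording the ID after the handler (not before) trades a small chance of duplicate work for the guarantee that no message is marked done without being processed.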
How do I monitor message queue health for agent systems?
Track these metrics: queue depth per subject (growing queues mean consumers are falling behind), message age (oldest unprocessed message indicates latency), consumer count per subject (zero consumers means messages are building up with no one to process them), and redelivery rate (high redelivery indicates consumer failures).
Should I use a message queue or gRPC for agent-to-agent communication?
Use gRPC for synchronous, low-latency agent calls where you need an immediate response (e.g., a triage agent checking if a specialist is available). Use message queues for asynchronous operations (tool execution, event broadcasting, saga coordination). Most production systems use both — gRPC for the hot path and message queues for everything else.
CallSphere Team
Expert insights on AI voice agents and customer communication automation.