Skip to content
Technology10 min read0 views

Agentic AI with Message Queues: NATS, Kafka, and RabbitMQ Patterns

Compare NATS, Kafka, and RabbitMQ for agentic AI workloads. Learn async tool execution, event-driven agents, and dead letter queue patterns.

Why Message Queues Are Essential for Production Agent Systems

A simple agentic AI prototype calls the LLM synchronously, executes tools inline, and returns the result. This works for demos. In production, it falls apart.

Consider what happens when a triage agent needs to call a tool that takes 30 seconds (a CRM lookup, a database report, an API that rate-limits). The user's WebSocket connection hangs. The HTTP request times out. The agent pod holds a thread doing nothing but waiting.

Message queues decouple agent logic from tool execution, enable event-driven agent triggers, distribute work across agent replicas, and provide durability for operations that must not be lost. At CallSphere, we use NATS JetStream as our primary message backbone for multi-agent communication, and this guide covers the patterns we have found most effective.

Pattern 1: Async Tool Execution

The most immediately useful pattern is decoupling tool execution from the agent conversation loop.

Architecture

User Message
    |
    v
[Triage Agent] --publishes--> [tools.execute.{tool_name}]
    |                                    |
    | (continues thinking)               v
    |                           [Tool Worker Pool]
    |                                    |
    |                           publishes result to
    |                           [tools.result.{conversation_id}]
    |                                    |
    v  <-- subscribes ------------------+
[Triage Agent resumes with tool result]

The agent publishes a tool execution request to a subject, continues processing other conversations (or yields the thread), and picks up the result when the tool worker publishes it back.

NATS JetStream Implementation

import nats
from nats.js.api import StreamConfig, ConsumerConfig, DeliverPolicy

async def setup_tool_streams(nc):
    js = nc.jetstream()

    # Stream for tool execution requests
    await js.add_stream(
        StreamConfig(
            name="TOOL_REQUESTS",
            subjects=["tools.execute.*"],
            retention="workqueue",
            max_age=300_000_000_000,  # 5 minutes TTL
            storage="memory",
        )
    )

    # Stream for tool results
    await js.add_stream(
        StreamConfig(
            name="TOOL_RESULTS",
            subjects=["tools.result.*"],
            retention="interest",
            max_age=60_000_000_000,  # 1 minute TTL
            storage="memory",
        )
    )

async def request_tool_execution(js, conversation_id, tool_name, tool_input):
    """Agent publishes a tool execution request."""
    payload = json.dumps({
        "conversation_id": conversation_id,
        "tool_name": tool_name,
        "tool_input": tool_input,
        "requested_at": datetime.utcnow().isoformat(),
    }).encode()

    await js.publish(
        f"tools.execute.{tool_name}",
        payload,
        headers={"Nats-Msg-Id": f"{conversation_id}-{tool_name}-{uuid4()}"},
    )

async def tool_worker(js, tool_name, executor_fn):
    """Worker that processes tool execution requests."""
    sub = await js.pull_subscribe(
        f"tools.execute.{tool_name}",
        durable=f"worker-{tool_name}",
        config=ConsumerConfig(
            ack_wait=30,
            max_deliver=3,
        ),
    )

    while True:
        try:
            msgs = await sub.fetch(batch=1, timeout=5)
            for msg in msgs:
                request = json.loads(msg.data)
                try:
                    result = await executor_fn(request["tool_input"])
                    await js.publish(
                        f"tools.result.{request['conversation_id']}",
                        json.dumps({"status": "success", "result": result}).encode(),
                    )
                    await msg.ack()
                except Exception as e:
                    await js.publish(
                        f"tools.result.{request['conversation_id']}",
                        json.dumps({"status": "error", "error": str(e)}).encode(),
                    )
                    await msg.nak(delay=5)
        except nats.errors.TimeoutError:
            continue

Why Workqueue Retention?

For tool requests, we use workqueue retention — once a message is acknowledged by a consumer, it is removed from the stream. Tool requests are tasks, not events. You want exactly-once processing, not replay capability.

For tool results, we use interest-based retention — results stay in the stream as long as there are active consumers interested in them, then get cleaned up. The agent subscribes to its specific conversation_id subject, receives the result, and the message is eligible for cleanup.

Pattern 2: Event-Driven Agent Triggers

Instead of a monolithic orchestrator deciding which agent to invoke, publish events and let agents subscribe to the events they care about.

# Events that agents subscribe to
EVENT_SUBJECTS = {
    "conversation.started": ["triage_agent"],
    "conversation.escalated": ["supervisor_agent"],
    "intent.identified.billing": ["billing_agent"],
    "intent.identified.support": ["support_agent"],
    "intent.identified.sales": ["sales_agent"],
    "tool.result.payment_processed": ["billing_agent", "notification_agent"],
    "agent.handoff.requested": ["triage_agent"],
}

async def publish_event(js, event_type, payload):
    """Publish a domain event."""
    await js.publish(
        f"events.{event_type}",
        json.dumps({
            "event_type": event_type,
            "payload": payload,
            "timestamp": datetime.utcnow().isoformat(),
        }).encode(),
    )

# When intent is identified, the relevant agent wakes up
await publish_event(js, "intent.identified.billing", {
    "conversation_id": conv_id,
    "user_message": "I need to update my payment method",
    "confidence": 0.94,
})

This pattern eliminates the central orchestrator bottleneck. Adding a new agent type means deploying a new service that subscribes to the relevant events — zero changes to existing agents.

Pattern 3: Distributed Agent Coordination

When multiple agents need to collaborate on a single task (e.g., a complex customer request that touches billing, support, and scheduling), use a saga pattern coordinated through message queues.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

class AgentSaga:
    """Coordinates multi-agent collaboration via message queue."""

    SAGA_STEPS = [
        {"agent": "billing_agent", "action": "check_account_status"},
        {"agent": "support_agent", "action": "lookup_ticket_history"},
        {"agent": "scheduling_agent", "action": "find_available_slots"},
    ]

    async def execute(self, js, conversation_id, context):
        saga_id = str(uuid4())
        results = {}

        for step in self.SAGA_STEPS:
            # Publish step request
            await js.publish(
                f"saga.{saga_id}.{step['agent']}",
                json.dumps({
                    "saga_id": saga_id,
                    "conversation_id": conversation_id,
                    "action": step["action"],
                    "context": context,
                    "prior_results": results,
                }).encode(),
            )

            # Wait for step result with timeout
            sub = await js.subscribe(f"saga.{saga_id}.{step['agent']}.result")
            try:
                msg = await sub.next_msg(timeout=30)
                result = json.loads(msg.data)
                results[step["agent"]] = result

                if result.get("status") == "error":
                    await self.compensate(js, saga_id, results)
                    return {"status": "failed", "failed_at": step["agent"]}
            except TimeoutError:
                await self.compensate(js, saga_id, results)
                return {"status": "timeout", "timed_out_at": step["agent"]}
            finally:
                await sub.unsubscribe()

        return {"status": "completed", "results": results}

Pattern 4: Dead Letter Queues for Failed Agent Actions

When a tool execution fails after all retries, or an agent cannot process a message, the message must go somewhere for investigation rather than being silently dropped.

async def setup_dead_letter_stream(js):
    await js.add_stream(
        StreamConfig(
            name="DEAD_LETTERS",
            subjects=["dlq.*"],
            retention="limits",
            max_age=604800_000_000_000,  # 7 days
            storage="file",  # Persist to disk
        )
    )

async def send_to_dlq(js, original_subject, message, error):
    """Route a failed message to the dead letter queue."""
    await js.publish(
        f"dlq.{original_subject.replace('.', '_')}",
        json.dumps({
            "original_subject": original_subject,
            "original_message": json.loads(message.data),
            "error": str(error),
            "failed_at": datetime.utcnow().isoformat(),
            "delivery_count": message.metadata.num_delivered if hasattr(message, 'metadata') else None,
        }).encode(),
    )

Monitor the dead letter queue size. A growing DLQ signals a systemic issue — a broken tool, a bad prompt causing the agent to generate invalid tool inputs, or a downstream service outage.

Comparison: NATS vs Kafka vs RabbitMQ for Agent Workloads

Feature NATS JetStream Apache Kafka RabbitMQ
Latency Sub-millisecond Low milliseconds Low milliseconds
Throughput 10M+ msg/sec 1M+ msg/sec 100K+ msg/sec
Operational complexity Low (single binary) High (ZK/KRaft, brokers, schema registry) Medium (Erlang runtime)
Message replay Yes (stream retention) Yes (log-based, excellent) Limited (requires plugins)
Exactly-once delivery Yes (dedup by Msg-Id) Yes (idempotent producers) No (at-least-once)
Request-reply Native support Not native (workaround needed) Via reply-to queues
WebSocket support Native (NATS WebSocket) No (need proxy) Via STOMP plugin
Memory footprint ~20MB base ~1GB+ per broker ~100MB base
Best for agents Real-time agent communication, tool execution Agent event sourcing, audit trails, replay Task queues, routing-heavy workflows

When to Choose Each

NATS JetStream is the best default for agentic AI systems. Its native request-reply pattern maps perfectly to agent-tool communication, the operational overhead is minimal (single Go binary), and WebSocket support means browser-based agents can connect directly.

Apache Kafka is the right choice when you need a durable event log of all agent actions for compliance, replay, or training data collection. If you must be able to replay every conversation and tool execution from the beginning of time, Kafka's log-based architecture is unmatched.

RabbitMQ fits when your agent system is primarily about task routing — different message types go to different agent queues based on routing keys. RabbitMQ's exchange and binding model is more expressive than NATS subjects for complex routing scenarios.

Performance Tuning for Agent Workloads

Batch Size Tuning

When tool workers pull messages from the queue, batch size matters. Pulling one message at a time adds round-trip overhead. Pulling too many risks holding messages longer than the ack timeout.

# Good: batch of 10 with short timeout
msgs = await sub.fetch(batch=10, timeout=1)

# Process concurrently
results = await asyncio.gather(
    *[process_tool_request(msg) for msg in msgs]
)

Connection Pooling

Each agent pod should maintain a single NATS connection with multiple subscriptions, not one connection per subscription. NATS connections are multiplexed — a single TCP connection handles thousands of subscriptions efficiently.

Subject Hierarchy Design

Design your subject namespace to support both specific and wildcard subscriptions:

agents.{agent_name}.events        # Agent-specific events
agents.*.events                   # All agent events (monitoring)
tools.execute.{tool_name}         # Specific tool requests
tools.execute.>                   # All tool requests (audit)
conversations.{id}.messages       # Conversation-specific messages

Frequently Asked Questions

Can I use Redis Pub/Sub instead of a dedicated message queue for agent communication?

Redis Pub/Sub is fire-and-forget with no durability — if a subscriber is not connected when a message is published, the message is lost. For non-critical notifications this is fine, but tool execution requests and agent handoff events must not be lost. Use Redis Streams (which provide durability) or a proper message queue.

How do I handle message ordering in a multi-agent system?

Order matters within a conversation but not across conversations. Use conversation_id as the partition key (Kafka) or ensure your NATS consumers process messages for a single conversation sequentially. NATS JetStream's pull-based consumers with manual ack naturally provide per-subject ordering.

What happens when an agent crashes mid-conversation?

If the agent has not acknowledged the message, the message queue redelivers it to another agent instance after the ack timeout. Design your agents to be idempotent — processing the same message twice should produce the same result. Store conversation state in the database, not in agent process memory.

How do I monitor message queue health for agent systems?

Track these metrics: queue depth per subject (growing queues mean consumers are falling behind), message age (oldest unprocessed message indicates latency), consumer count per subject (zero consumers means messages are building up with no one to process them), and redelivery rate (high redelivery indicates consumer failures).

Should I use a message queue or gRPC for agent-to-agent communication?

Use gRPC for synchronous, low-latency agent calls where you need an immediate response (e.g., a triage agent checking if a specialist is available). Use message queues for asynchronous operations (tool execution, event broadcasting, saga coordination). Most production systems use both — gRPC for the hot path and message queues for everything else.

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.