
Event-Driven Agent Architectures: Using NATS, Kafka, and Redis Streams for Agent Communication

Deep dive into event-driven patterns for AI agent coordination: pub/sub messaging, dead letter queues, exactly-once processing with NATS, Kafka, and Redis Streams.

Why Event-Driven Architecture for AI Agents?

Request-response communication works fine when you have a single agent handling a single task. But production AI systems rarely stay that simple. You end up with specialist agents that need to coordinate: a triage agent routes requests, a research agent gathers data, a writing agent produces output, and a review agent validates quality. When these agents communicate via direct HTTP calls, you get tight coupling, cascading failures, and an architecture that becomes increasingly fragile as you add agents.

Event-driven architecture solves this by decoupling agent communication through message brokers. Agents publish events when they complete work, and other agents subscribe to the events they care about. The broker handles delivery, retries, and ordering. This pattern gives you loose coupling, independent scaling, fault tolerance, and a natural audit trail of everything that happened in your system.

This article compares three popular message brokers for agent communication — NATS, Kafka, and Redis Streams — with production-ready code examples for each.

Core Concepts: Events in Agent Systems

Before diving into implementations, let us define the event model for agent communication:

# events/schema.py
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from enum import Enum
import uuid

class EventType(str, Enum):
    TASK_CREATED = "task.created"
    TASK_ASSIGNED = "task.assigned"
    AGENT_STARTED = "agent.started"
    AGENT_COMPLETED = "agent.completed"
    AGENT_FAILED = "agent.failed"
    TOOL_CALLED = "tool.called"
    TOOL_RESULT = "tool.result"
    HANDOFF_REQUESTED = "handoff.requested"
    REVIEW_REQUESTED = "review.requested"
    REVIEW_COMPLETED = "review.completed"

class AgentEvent(BaseModel):
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    event_type: EventType
    source_agent: str
    target_agent: str | None = None
    correlation_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    payload: dict = Field(default_factory=dict)
    metadata: dict = Field(default_factory=dict)

    def to_bytes(self) -> bytes:
        return self.model_dump_json().encode("utf-8")

    @classmethod
    def from_bytes(cls, data: bytes) -> "AgentEvent":
        return cls.model_validate_json(data)

The correlation_id field is critical — it tracks a single user request across all agents and events, enabling distributed tracing and debugging.
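Propagation is the part teams most often get wrong: every derived event needs a fresh event_id but must copy the parent's correlation_id unchanged. A simplified sketch of the rule using plain dicts instead of the AgentEvent model (the `child_event` helper is illustrative, not part of the schema above):

```python
import uuid
from datetime import datetime, timezone

def child_event(parent: dict, event_type: str, source_agent: str, payload: dict) -> dict:
    """Derive a new event from a parent, preserving its correlation_id."""
    return {
        "event_id": str(uuid.uuid4()),               # new identity per event
        "event_type": event_type,
        "source_agent": source_agent,
        "correlation_id": parent["correlation_id"],  # copied, never regenerated
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": payload,
    }

root = {"event_id": str(uuid.uuid4()), "event_type": "task.created",
        "source_agent": "triage-agent", "correlation_id": str(uuid.uuid4()),
        "payload": {"query": "quarterly revenue"}}
step = child_event(root, "agent.started", "research-agent", root["payload"])
assert step["correlation_id"] == root["correlation_id"]
assert step["event_id"] != root["event_id"]
```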

Pattern 1: NATS for Lightweight Agent Pub/Sub

NATS is ideal for agent systems that need low latency and simple deployment. It supports both pub/sub and request/reply patterns, and NATS JetStream adds persistence and exactly-once delivery.

Setting Up NATS with JetStream

# Run NATS with JetStream enabled
docker run -d --name nats -p 4222:4222 nats:latest -js

# Install the Python client
pip install nats-py

Publishing Agent Events

# broker/nats_publisher.py
import nats
from nats.js.api import StreamConfig, RetentionPolicy
from events.schema import AgentEvent, EventType

class NATSAgentBroker:
    def __init__(self):
        self.nc = None
        self.js = None

    async def connect(self, url: str = "nats://localhost:4222"):
        self.nc = await nats.connect(url)
        self.js = self.nc.jetstream()

        # Create streams for different event categories
        await self.js.add_stream(
            StreamConfig(
                name="AGENT_EVENTS",
                subjects=["agent.>"],
                retention=RetentionPolicy.LIMITS,
                max_age=86400 * 7,  # 7 days retention
                max_msgs=1_000_000,
            )
        )
        await self.js.add_stream(
            StreamConfig(
                name="TASK_EVENTS",
                subjects=["task.>"],
                retention=RetentionPolicy.WORK_QUEUE,
                max_age=86400,
            )
        )

    async def publish(self, event: AgentEvent):
        subject = event.event_type.value
        ack = await self.js.publish(
            subject,
            event.to_bytes(),
            headers={
                "Nats-Msg-Id": event.event_id,  # Deduplication
                "correlation-id": event.correlation_id,
            },
        )
        return ack

    async def close(self):
        if self.nc:
            await self.nc.close()

Subscribing to Events

# broker/nats_subscriber.py
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
from broker.nats_publisher import NATSAgentBroker
from events.schema import AgentEvent, EventType

class NATSAgentSubscriber:
    def __init__(self, broker: "NATSAgentBroker", agent_name: str):
        self.broker = broker
        self.agent_name = agent_name

    async def subscribe(self, subject: str, handler, durable_name: str | None = None):
        """Subscribe to events with durable consumer for reliability."""
        config = ConsumerConfig(
            durable_name=durable_name or f"{self.agent_name}_{subject.replace('.', '_')}",
            deliver_policy=DeliverPolicy.ALL,
            ack_policy=AckPolicy.EXPLICIT,
            max_deliver=3,  # Max retry attempts
            ack_wait=30,    # Seconds to wait for ack before redelivery
        )

        sub = await self.broker.js.subscribe(
            subject,
            config=config,
        )

        async for msg in sub.messages:
            try:
                event = AgentEvent.from_bytes(msg.data)
                await handler(event)
                await msg.ack()
            except Exception as e:
                # After max_deliver attempts, message goes to dead letter
                if msg.metadata.num_delivered >= 3:
                    await self.handle_dead_letter(msg, e)
                    await msg.ack()  # Ack to stop redelivery
                else:
                    await msg.nak(delay=2 ** msg.metadata.num_delivered)

    async def handle_dead_letter(self, msg, error):
        """Route failed messages to a dead letter stream for investigation."""
        event = AgentEvent.from_bytes(msg.data)
        dead_letter = AgentEvent(
            event_type=EventType.AGENT_FAILED,
            source_agent=self.agent_name,
            correlation_id=event.correlation_id,
            payload={
                "original_event": event.model_dump(),
                "error": str(error),
                "attempts": msg.metadata.num_delivered,
            },
        )
        await self.broker.publish(dead_letter)
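The `nak(delay=2 ** num_delivered)` call above implements exponential backoff, but an uncapped exponent grows without bound and identical delays across consumers cause synchronized retry storms. A standalone sketch of a capped, jittered variant (the `nak_delay` helper and its defaults are illustrative):

```python
import random

def nak_delay(num_delivered: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Exponential backoff for redelivery, capped and jittered so that
    many consumers retrying the same message do not wake up in lockstep."""
    delay = min(cap, base ** num_delivered)
    return delay * random.uniform(0.5, 1.0)  # shave off up to 50% at random

# Delays grow roughly 2, 4, 8, ... but never exceed the cap.
assert all(nak_delay(n) <= 60.0 for n in range(1, 20))
assert nak_delay(1) >= 1.0
```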

Wiring Agents to NATS

# agents/research_agent_nats.py
from broker.nats_publisher import NATSAgentBroker
from broker.nats_subscriber import NATSAgentSubscriber
from events.schema import AgentEvent, EventType

async def run_research_agent():
    broker = NATSAgentBroker()
    await broker.connect()
    subscriber = NATSAgentSubscriber(broker, "research-agent")

    async def handle_task(event: AgentEvent):
        query = event.payload.get("query", "")
        print(f"Research agent received task: {query}")

        # Publish start event
        await broker.publish(AgentEvent(
            event_type=EventType.AGENT_STARTED,
            source_agent="research-agent",
            correlation_id=event.correlation_id,
            payload={"query": query},
        ))

        # Do the research work...
        results = await do_research(query)

        # Publish completion event
        await broker.publish(AgentEvent(
            event_type=EventType.AGENT_COMPLETED,
            source_agent="research-agent",
            target_agent="writing-agent",
            correlation_id=event.correlation_id,
            payload={"results": results},
        ))

    await subscriber.subscribe("task.assigned", handle_task)

Pattern 2: Kafka for High-Throughput Agent Pipelines

Kafka excels when your agent system processes high volumes of events and you need strong ordering guarantees, replay capability, and exactly-once semantics.


Kafka Setup and Topic Configuration

# broker/kafka_broker.py
from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from events.schema import AgentEvent

class KafkaAgentBroker:
    def __init__(self, bootstrap_servers: str = "localhost:9092"):
        self.servers = bootstrap_servers
        self.admin = AdminClient({"bootstrap.servers": self.servers})
        self.producer = Producer({
            "bootstrap.servers": self.servers,
            "enable.idempotence": True,      # Exactly-once production
            "acks": "all",                    # Wait for all replicas
            "retries": 5,
            "retry.backoff.ms": 100,
        })

    def ensure_topics(self):
        topics = [
            NewTopic("agent-tasks", num_partitions=6, replication_factor=1),
            NewTopic("agent-results", num_partitions=6, replication_factor=1),
            NewTopic("agent-events", num_partitions=3, replication_factor=1),
            NewTopic("agent-dlq", num_partitions=1, replication_factor=1),
        ]
        # create_topics is asynchronous; wait on the returned futures so the
        # topics actually exist before the first publish
        for topic, future in self.admin.create_topics(topics).items():
            try:
                future.result()
            except Exception as e:
                print(f"Topic {topic}: {e}")  # Usually "already exists"

    def publish(self, topic: str, event: AgentEvent, partition_key: str | None = None):
        key = (partition_key or event.correlation_id).encode("utf-8")
        self.producer.produce(
            topic=topic,
            key=key,
            value=event.to_bytes(),
            headers={
                "event-type": event.event_type.value,
                "source-agent": event.source_agent,
                "correlation-id": event.correlation_id,
            },
            callback=self._delivery_callback,
        )
        self.producer.poll(0)

    def _delivery_callback(self, err, msg):
        if err:
            print(f"Delivery failed: {err}")
        else:
            print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

    def create_consumer(self, group_id: str, topics: list[str]) -> Consumer:
        consumer = Consumer({
            "bootstrap.servers": self.servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # Manual commit for exactly-once
            "isolation.level": "read_committed",
        })
        consumer.subscribe(topics)
        return consumer

Consuming with Exactly-Once Semantics

# broker/kafka_consumer.py
from confluent_kafka import KafkaError
from events.schema import AgentEvent, EventType

class KafkaAgentConsumer:
    def __init__(self, broker: "KafkaAgentBroker", agent_name: str):
        self.broker = broker
        self.agent_name = agent_name
        self.consumer = broker.create_consumer(
            group_id=f"{agent_name}-group",
            topics=["agent-tasks"],
        )

    def consume_loop(self, handler):
        """Main consume loop with manual offset commits."""
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    raise Exception(msg.error())

                event = AgentEvent.from_bytes(msg.value())

                try:
                    handler(event)
                    # Commit only after successful processing
                    self.consumer.commit(message=msg, asynchronous=False)
                except Exception as e:
                    # Send to dead letter queue
                    dlq_event = AgentEvent(
                        event_type=EventType.AGENT_FAILED,
                        source_agent=self.agent_name,
                        correlation_id=event.correlation_id,
                        payload={"error": str(e), "original": event.model_dump()},
                    )
                    self.broker.publish("agent-dlq", dlq_event)
                    self.consumer.commit(message=msg, asynchronous=False)  # Don't reprocess
        finally:
            self.consumer.close()

The key ingredients here are idempotent producers (enable.idempotence=True), manual offset commits (commit only after successful processing), and the read_committed isolation level (consumers only see fully committed messages). Strictly speaking, Kafka's full exactly-once semantics also requires transactions that commit produced output and consumer offsets atomically; the pattern above delivers effectively-once processing, provided handlers are idempotent.
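Handler idempotency matters because a crash between `handler(event)` and the offset commit replays the message on restart. A minimal in-memory dedupe keyed on event_id (illustrative; in production the seen-set would live in Redis or a database so it survives restarts):

```python
class IdempotentHandler:
    """Wraps a handler so replayed events (same event_id) are skipped."""

    def __init__(self, handler):
        self.handler = handler
        self.seen: set[str] = set()  # swap for a persistent store in production

    def __call__(self, event: dict) -> bool:
        if event["event_id"] in self.seen:
            return False             # duplicate delivery: skip side effects
        self.handler(event)
        self.seen.add(event["event_id"])
        return True

processed = []
h = IdempotentHandler(lambda e: processed.append(e["event_id"]))
h({"event_id": "a"})
h({"event_id": "a"})  # replayed delivery is ignored
assert processed == ["a"]
```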

Pattern 3: Redis Streams for Simple Agent Queues

Redis Streams is the best choice when you already run Redis for caching and need lightweight persistent messaging without deploying a separate broker.

Redis Streams Agent Broker

# broker/redis_broker.py
import redis.asyncio as redis
from events.schema import AgentEvent

class RedisAgentBroker:
    def __init__(self, url: str = "redis://localhost:6379/0"):
        self.redis = redis.from_url(url, decode_responses=True)

    async def publish(self, stream: str, event: AgentEvent):
        """Add an event to a Redis stream."""
        await self.redis.xadd(
            stream,
            {
                "event_id": event.event_id,
                "event_type": event.event_type.value,
                "source_agent": event.source_agent,
                "correlation_id": event.correlation_id,
                "payload": event.model_dump_json(),
            },
            maxlen=100000,  # Cap stream length
        )

    async def create_consumer_group(self, stream: str, group: str):
        """Create a consumer group for reliable message processing."""
        try:
            await self.redis.xgroup_create(stream, group, id="0", mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    async def consume(self, stream: str, group: str, consumer: str,
                      handler, batch_size: int = 10):
        """Consume messages from a stream with consumer group semantics."""
        await self.create_consumer_group(stream, group)

        while True:
            # Read new messages
            messages = await self.redis.xreadgroup(
                groupname=group,
                consumername=consumer,
                streams={stream: ">"},
                count=batch_size,
                block=5000,  # Block for 5 seconds
            )

            if not messages:
                # Check for pending messages that need reprocessing
                await self._process_pending(stream, group, consumer, handler)
                continue

            for stream_name, entries in messages:
                for msg_id, fields in entries:
                    event = AgentEvent.model_validate_json(fields["payload"])
                    try:
                        await handler(event)
                        await self.redis.xack(stream, group, msg_id)
                    except Exception as e:
                        # Message stays pending for retry
                        print(f"Processing failed for {msg_id}: {e}")

    async def _process_pending(self, stream: str, group: str,
                                consumer: str, handler):
        """Retry pending messages that were not acknowledged."""
        pending = await self.redis.xpending_range(
            stream, group, min="-", max="+", count=10, consumername=consumer,
        )
        for entry in pending:
            if entry["time_since_delivered"] > 30000:  # 30 seconds
                if entry["times_delivered"] >= 3:
                    # Move to dead letter stream
                    msgs = await self.redis.xrange(
                        stream, min=entry["message_id"], max=entry["message_id"]
                    )
                    if msgs:
                        await self.redis.xadd(f"{stream}:dlq", msgs[0][1])
                    await self.redis.xack(stream, group, entry["message_id"])
                else:
                    # Claim and retry
                    await self.redis.xclaim(
                        stream, group, consumer,
                        min_idle_time=30000,
                        message_ids=[entry["message_id"]],
                    )
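The retry-versus-dead-letter decision inside `_process_pending` can be isolated as a pure function, which makes the thresholds easy to test in isolation (field names mirror what redis-py returns from xpending_range; the `triage_pending` helper is illustrative):

```python
def triage_pending(time_since_delivered_ms: int, times_delivered: int,
                   idle_threshold_ms: int = 30_000, max_attempts: int = 3) -> str:
    """Decide what to do with an unacknowledged message: leave it alone,
    reclaim it for another attempt, or route it to the dead letter stream."""
    if time_since_delivered_ms <= idle_threshold_ms:
        return "wait"        # consumer may still be working on it
    if times_delivered >= max_attempts:
        return "dead_letter"
    return "reclaim"

assert triage_pending(1_000, 1) == "wait"
assert triage_pending(45_000, 1) == "reclaim"
assert triage_pending(45_000, 3) == "dead_letter"
```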

Choosing the Right Broker

| Feature | NATS JetStream | Kafka | Redis Streams |
| --- | --- | --- | --- |
| Latency | Sub-millisecond | Low milliseconds | Sub-millisecond |
| Throughput | Millions/sec | Millions/sec | Hundreds of thousands/sec |
| Ordering | Per subject | Per partition | Per stream |
| Retention | Time/count based | Configurable | Memory/maxlen |
| Exactly-once | Yes (dedup) | Yes (transactions) | No (at-least-once) |
| Operational complexity | Low | High | Low (if Redis exists) |
| Best for | Agent-to-agent RPC | High-volume pipelines | Simple task queues |

Use NATS when you need low-latency agent-to-agent communication with simple operations. Use Kafka when you need high-throughput event streaming with strong ordering and replay. Use Redis Streams when you already have Redis and need lightweight persistent queues.

Dead Letter Queue Pattern for Agents

Every event-driven agent system needs a dead letter queue strategy. When an agent fails to process a message after multiple retries, the message must go somewhere for investigation rather than being lost or blocking the queue forever.

# dlq/handler.py
async def process_dead_letters(broker, dlq_stream: str):
    """Monitor the dead letter queue and alert on failures."""
    async def handle_dlq(event: AgentEvent):
        error = event.payload.get("error", "unknown")
        original = event.payload.get("original", {})

        # Log for investigation
        print(f"DLQ: Agent {event.source_agent} failed processing "
              f"correlation {event.correlation_id}: {error}")

        # Could send to alerting system (PagerDuty, Slack, etc.)
        # Could store in a database for manual review
        # Could attempt reprocessing with different parameters

    await broker.consume(dlq_stream, "dlq-monitor", "monitor-1", handle_dlq)

FAQ

How do I trace a request across multiple agents?

Use the correlation_id field consistently. When one agent publishes an event in response to another event, it copies the correlation_id from the incoming event. This creates a trace of all events related to a single user request. Pair this with structured logging that includes the correlation ID, and you can reconstruct the full event chain in your log aggregator.

What happens if a message broker goes down?

NATS JetStream and Kafka both support clustering for high availability. With proper replication, broker failures are transparent to agents. Redis Streams can use Redis Sentinel or Cluster for HA. In all cases, agents should implement local buffering to handle brief broker outages without dropping events.
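A minimal sketch of such local buffering: publishes go through a bounded deque that is flushed once the broker is reachable again (the `send` callable and the drop-oldest-when-full policy are illustrative choices, not part of any broker client):

```python
from collections import deque

class BufferedPublisher:
    """Queues events while the broker is unreachable; once the deque is
    full, the oldest events are dropped, bounding memory during outages."""

    def __init__(self, send, maxlen: int = 10_000):
        self.send = send                    # callable that raises on broker failure
        self.buffer: deque = deque(maxlen=maxlen)

    def publish(self, event: dict):
        self.buffer.append(event)
        self.flush()

    def flush(self):
        while self.buffer:
            try:
                self.send(self.buffer[0])
            except ConnectionError:
                return                      # broker still down; keep buffering
            self.buffer.popleft()           # only drop after successful send

sent, up = [], False
def send(e):
    if not up:
        raise ConnectionError("broker down")
    sent.append(e)

pub = BufferedPublisher(send)
pub.publish({"event_id": "1"})  # broker down: event is buffered
up = True
pub.publish({"event_id": "2"})  # broker back: both flushed, in order
assert [e["event_id"] for e in sent] == ["1", "2"]
```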

How do I handle message ordering when agents scale horizontally?

Use partition keys (Kafka) or subject-based routing (NATS) to ensure messages for the same entity are always processed by the same consumer instance. For example, key all events for a conversation by the conversation ID. This guarantees ordering per conversation while allowing parallel processing across conversations.
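The key-to-partition mapping can be made stable with a hash, so the same conversation always lands on the same partition no matter which producer computes it. A sketch of the idea (Kafka's default partitioner actually uses murmur2, not md5; `partition_for` is illustrative):

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Stable key -> partition mapping: same conversation, same partition."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p = partition_for("conversation-42", 6)
assert p == partition_for("conversation-42", 6)  # deterministic across calls
assert 0 <= p < 6
```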

Can I mix synchronous and asynchronous communication?

Yes. Use request-reply (NATS) or synchronous HTTP for operations that need immediate results, and pub/sub for fire-and-forget coordination. NATS natively supports both patterns. With Kafka, pair it with a lightweight HTTP layer for synchronous needs. The key is to use async for agent coordination and sync only for user-facing responses that need immediate feedback.

Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
