Event-Driven Agent Architectures: Using NATS, Kafka, and Redis Streams for Agent Communication
Deep dive into event-driven patterns for AI agent coordination: pub/sub messaging, dead letter queues, exactly-once processing with NATS, Kafka, and Redis Streams.
Why Event-Driven Architecture for AI Agents?
Request-response communication works fine when you have a single agent handling a single task. But production AI systems rarely stay that simple. You end up with specialist agents that need to coordinate: a triage agent routes requests, a research agent gathers data, a writing agent produces output, and a review agent validates quality. When these agents communicate via direct HTTP calls, you get tight coupling, cascading failures, and an architecture that becomes increasingly fragile as you add agents.
Event-driven architecture solves this by decoupling agent communication through message brokers. Agents publish events when they complete work, and other agents subscribe to the events they care about. The broker handles delivery, retries, and ordering. This pattern gives you loose coupling, independent scaling, fault tolerance, and a natural audit trail of everything that happened in your system.
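To make the decoupling concrete before introducing any broker, here is a toy in-process event bus (a sketch, not any particular broker's API): the publisher never knows who consumes, and new subscribers can be added without touching existing agents.

```python
from collections import defaultdict

# Minimal in-process event bus: agents subscribe to topics and never
# call each other directly. A real system swaps this for NATS/Kafka/Redis.
class EventBus:
    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, topic, handler):
        self.handlers[topic].append(handler)

    def publish(self, topic, payload):
        for handler in self.handlers[topic]:
            handler(payload)

bus = EventBus()
results = []
# A "writing agent" reacts to research results without the research
# agent knowing it exists.
bus.subscribe("research.done", lambda p: results.append(f"writing up {p['topic']}"))
bus.publish("research.done", {"topic": "event-driven agents"})
```

Everything a real broker adds — persistence, retries, ordering, consumer groups — layers on top of this publish/subscribe core.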
This article compares three popular message brokers for agent communication — NATS, Kafka, and Redis Streams — with production-ready code examples for each.
Core Concepts: Events in Agent Systems
Before diving into implementations, let us define the event model for agent communication:
```python
# events/schema.py
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from enum import Enum
import uuid


class EventType(str, Enum):
    TASK_CREATED = "task.created"
    TASK_ASSIGNED = "task.assigned"
    AGENT_STARTED = "agent.started"
    AGENT_COMPLETED = "agent.completed"
    AGENT_FAILED = "agent.failed"
    TOOL_CALLED = "tool.called"
    TOOL_RESULT = "tool.result"
    HANDOFF_REQUESTED = "handoff.requested"
    REVIEW_REQUESTED = "review.requested"
    REVIEW_COMPLETED = "review.completed"


class AgentEvent(BaseModel):
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    event_type: EventType
    source_agent: str
    target_agent: str | None = None
    correlation_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Timezone-aware timestamp (datetime.utcnow is deprecated in Python 3.12+)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    payload: dict = Field(default_factory=dict)
    metadata: dict = Field(default_factory=dict)

    def to_bytes(self) -> bytes:
        return self.model_dump_json().encode("utf-8")

    @classmethod
    def from_bytes(cls, data: bytes) -> "AgentEvent":
        return cls.model_validate_json(data)
```
The correlation_id field is critical — it tracks a single user request across all agents and events, enabling distributed tracing and debugging.
Pattern 1: NATS for Lightweight Agent Pub/Sub
NATS is ideal for agent systems that need low latency and simple deployment. It supports both pub/sub and request/reply patterns, and NATS JetStream adds persistence and exactly-once delivery.
Setting Up NATS with JetStream
```bash
# Run NATS with JetStream enabled
docker run -d --name nats -p 4222:4222 nats:latest -js

# Install the Python client
pip install nats-py
```
Publishing Agent Events
```python
# broker/nats_publisher.py
import nats
from nats.js.api import StreamConfig, RetentionPolicy

from events.schema import AgentEvent


class NATSAgentBroker:
    def __init__(self):
        self.nc = None
        self.js = None

    async def connect(self, url: str = "nats://localhost:4222"):
        self.nc = await nats.connect(url)
        self.js = self.nc.jetstream()
        # Create streams for different event categories
        await self.js.add_stream(
            StreamConfig(
                name="AGENT_EVENTS",
                subjects=["agent.>"],
                retention=RetentionPolicy.LIMITS,
                max_age=86400 * 7,  # 7 days retention
                max_msgs=1_000_000,
            )
        )
        await self.js.add_stream(
            StreamConfig(
                name="TASK_EVENTS",
                subjects=["task.>"],
                retention=RetentionPolicy.WORK_QUEUE,
                max_age=86400,
            )
        )

    async def publish(self, event: AgentEvent):
        subject = event.event_type.value
        ack = await self.js.publish(
            subject,
            event.to_bytes(),
            headers={
                "Nats-Msg-Id": event.event_id,  # Deduplication
                "correlation-id": event.correlation_id,
            },
        )
        return ack

    async def close(self):
        if self.nc:
            await self.nc.close()
```
Subscribing to Events
```python
# broker/nats_subscriber.py
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy

from broker.nats_publisher import NATSAgentBroker
from events.schema import AgentEvent, EventType


class NATSAgentSubscriber:
    def __init__(self, broker: NATSAgentBroker, agent_name: str):
        self.broker = broker
        self.agent_name = agent_name

    async def subscribe(self, subject: str, handler, durable_name: str | None = None):
        """Subscribe to events with a durable consumer for reliability."""
        config = ConsumerConfig(
            durable_name=durable_name or f"{self.agent_name}_{subject.replace('.', '_')}",
            deliver_policy=DeliverPolicy.ALL,
            ack_policy=AckPolicy.EXPLICIT,
            max_deliver=3,  # Max delivery attempts
            ack_wait=30,  # Seconds to wait for an ack before redelivery
        )
        sub = await self.broker.js.subscribe(subject, config=config)
        async for msg in sub.messages:
            try:
                event = AgentEvent.from_bytes(msg.data)
                await handler(event)
                await msg.ack()
            except Exception as e:
                # After max_deliver attempts, route the message to a dead letter
                if msg.metadata.num_delivered >= 3:
                    await self.handle_dead_letter(msg, e)
                    await msg.ack()  # Ack to stop redelivery
                else:
                    # Negative-ack with exponential backoff before redelivery
                    await msg.nak(delay=2 ** msg.metadata.num_delivered)

    async def handle_dead_letter(self, msg, error):
        """Route failed messages to a dead letter stream for investigation."""
        event = AgentEvent.from_bytes(msg.data)
        dead_letter = AgentEvent(
            event_type=EventType.AGENT_FAILED,
            source_agent=self.agent_name,
            correlation_id=event.correlation_id,
            payload={
                "original_event": event.model_dump(),
                "error": str(error),
                "attempts": msg.metadata.num_delivered,
            },
        )
        await self.broker.publish(dead_letter)
```
Wiring Agents to NATS
```python
# agents/research_agent_nats.py
import asyncio

from broker.nats_publisher import NATSAgentBroker
from broker.nats_subscriber import NATSAgentSubscriber
from events.schema import AgentEvent, EventType


async def run_research_agent():
    broker = NATSAgentBroker()
    await broker.connect()
    subscriber = NATSAgentSubscriber(broker, "research-agent")

    async def handle_task(event: AgentEvent):
        query = event.payload.get("query", "")
        print(f"Research agent received task: {query}")
        # Publish start event
        await broker.publish(AgentEvent(
            event_type=EventType.AGENT_STARTED,
            source_agent="research-agent",
            correlation_id=event.correlation_id,
            payload={"query": query},
        ))
        # Do the research work (do_research is your agent's own logic)
        results = await do_research(query)
        # Publish completion event
        await broker.publish(AgentEvent(
            event_type=EventType.AGENT_COMPLETED,
            source_agent="research-agent",
            target_agent="writing-agent",
            correlation_id=event.correlation_id,
            payload={"results": results},
        ))

    await subscriber.subscribe("task.assigned", handle_task)


if __name__ == "__main__":
    asyncio.run(run_research_agent())
```
Pattern 2: Kafka for High-Throughput Agent Pipelines
Kafka excels when your agent system processes high volumes of events and you need strong ordering guarantees, replay capability, and exactly-once semantics.
Kafka Setup and Topic Configuration
```python
# broker/kafka_broker.py
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

from events.schema import AgentEvent


class KafkaAgentBroker:
    def __init__(self, bootstrap_servers: str = "localhost:9092"):
        self.servers = bootstrap_servers
        self.admin = AdminClient({"bootstrap.servers": self.servers})
        self.producer = Producer({
            "bootstrap.servers": self.servers,
            "enable.idempotence": True,  # Exactly-once production
            "acks": "all",  # Wait for all in-sync replicas
            "retries": 5,
            "retry.backoff.ms": 100,
        })

    def ensure_topics(self):
        topics = [
            NewTopic("agent-tasks", num_partitions=6, replication_factor=1),
            NewTopic("agent-results", num_partitions=6, replication_factor=1),
            NewTopic("agent-events", num_partitions=3, replication_factor=1),
            NewTopic("agent-dlq", num_partitions=1, replication_factor=1),
        ]
        self.admin.create_topics(topics)

    def publish(self, topic: str, event: AgentEvent, partition_key: str | None = None):
        # Key by correlation ID so a request's events stay on one partition
        key = (partition_key or event.correlation_id).encode("utf-8")
        self.producer.produce(
            topic=topic,
            key=key,
            value=event.to_bytes(),
            headers={
                "event-type": event.event_type.value,
                "source-agent": event.source_agent,
                "correlation-id": event.correlation_id,
            },
            callback=self._delivery_callback,
        )
        self.producer.poll(0)  # Serve the delivery callback

    def _delivery_callback(self, err, msg):
        if err:
            print(f"Delivery failed: {err}")
        else:
            print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

    def create_consumer(self, group_id: str, topics: list[str]) -> Consumer:
        consumer = Consumer({
            "bootstrap.servers": self.servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # Commit manually after processing
            "isolation.level": "read_committed",
        })
        consumer.subscribe(topics)
        return consumer
```
Consuming with Exactly-Once Semantics
```python
# broker/kafka_consumer.py
from confluent_kafka import KafkaError

from broker.kafka_broker import KafkaAgentBroker
from events.schema import AgentEvent, EventType


class KafkaAgentConsumer:
    def __init__(self, broker: KafkaAgentBroker, agent_name: str):
        self.broker = broker
        self.agent_name = agent_name
        self.consumer = broker.create_consumer(
            group_id=f"{agent_name}-group",
            topics=["agent-tasks"],
        )

    def consume_loop(self, handler):
        """Main consume loop with manual offset commits."""
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    raise Exception(msg.error())
                event = AgentEvent.from_bytes(msg.value())
                try:
                    handler(event)
                    # Commit only after successful processing
                    self.consumer.commit(msg)
                except Exception as e:
                    # Send to the dead letter queue
                    dlq_event = AgentEvent(
                        event_type=EventType.AGENT_FAILED,
                        source_agent=self.agent_name,
                        correlation_id=event.correlation_id,
                        payload={"error": str(e), "original": event.model_dump()},
                    )
                    self.broker.publish("agent-dlq", dlq_event)
                    self.consumer.commit(msg)  # Don't reprocess
        finally:
            self.consumer.close()
```
Combining an idempotent producer (enable.idempotence=True), manual offset commits after successful processing, and read_committed isolation gets you close to exactly-once, but strictly speaking this loop is at-least-once: a crash between handler(event) and commit() replays the message on restart. True exactly-once processing requires wrapping the consume-process-produce cycle in a Kafka transaction (a transactional.id on the producer plus send_offsets_to_transaction), or making handlers idempotent so replays are harmless.
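If you do opt into transactions, the broker-side knobs look roughly like this (a configuration sketch for confluent-kafka; the transactional.id and group.id values are illustrative):

```python
# Producer side: setting a transactional.id enables init_transactions /
# begin_transaction / commit_transaction and implies idempotence. Consumed
# offsets are committed inside the transaction with
# producer.send_offsets_to_transaction(offsets, consumer.consumer_group_metadata()),
# making processing and offset commits atomic.
transactional_producer_config = {
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "writing-agent-tx-1",  # illustrative; keep stable per instance
    "enable.idempotence": True,
    "acks": "all",
}

# Consumer side: read only committed records and manage offsets manually,
# since the producer now owns offset commits.
transactional_consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "writing-agent-group",  # illustrative
    "isolation.level": "read_committed",
    "enable.auto.commit": False,
}
```

The transactional.id must be stable across restarts of the same producer instance so Kafka can fence zombie producers from an earlier incarnation.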
Pattern 3: Redis Streams for Simple Agent Queues
Redis Streams is the best choice when you already run Redis for caching and need lightweight persistent messaging without deploying a separate broker.
Redis Streams Agent Broker
```python
# broker/redis_broker.py
import redis.asyncio as redis

from events.schema import AgentEvent


class RedisAgentBroker:
    def __init__(self, url: str = "redis://localhost:6379/0"):
        self.redis = redis.from_url(url, decode_responses=True)

    async def publish(self, stream: str, event: AgentEvent):
        """Add an event to a Redis stream."""
        await self.redis.xadd(
            stream,
            {
                "event_id": event.event_id,
                "event_type": event.event_type.value,
                "source_agent": event.source_agent,
                "correlation_id": event.correlation_id,
                "payload": event.model_dump_json(),
            },
            maxlen=100_000,  # Cap stream length (approximate trim)
        )

    async def create_consumer_group(self, stream: str, group: str):
        """Create a consumer group for reliable message processing."""
        try:
            await self.redis.xgroup_create(stream, group, id="0", mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    async def consume(self, stream: str, group: str, consumer: str,
                      handler, batch_size: int = 10):
        """Consume messages from a stream with consumer group semantics."""
        await self.create_consumer_group(stream, group)
        while True:
            # Read new messages
            messages = await self.redis.xreadgroup(
                groupname=group,
                consumername=consumer,
                streams={stream: ">"},
                count=batch_size,
                block=5000,  # Block for 5 seconds
            )
            if not messages:
                # Check for pending messages that need reprocessing
                await self._process_pending(stream, group, consumer, handler)
                continue
            for stream_name, entries in messages:
                for msg_id, fields in entries:
                    event = AgentEvent.model_validate_json(fields["payload"])
                    try:
                        await handler(event)
                        await self.redis.xack(stream, group, msg_id)
                    except Exception as e:
                        # Message stays pending for retry
                        print(f"Processing failed for {msg_id}: {e}")

    async def _process_pending(self, stream: str, group: str,
                               consumer: str, handler):
        """Retry pending messages that were not acknowledged."""
        pending = await self.redis.xpending_range(
            stream, group, min="-", max="+", count=10, consumername=consumer,
        )
        for entry in pending:
            if entry["time_since_delivered"] > 30_000:  # Idle over 30 seconds
                if entry["times_delivered"] >= 3:
                    # Move to a dead letter stream
                    msgs = await self.redis.xrange(
                        stream, min=entry["message_id"], max=entry["message_id"]
                    )
                    if msgs:
                        await self.redis.xadd(f"{stream}:dlq", msgs[0][1])
                    await self.redis.xack(stream, group, entry["message_id"])
                else:
                    # Claim the message and retry it immediately
                    claimed = await self.redis.xclaim(
                        stream, group, consumer,
                        min_idle_time=30_000,
                        message_ids=[entry["message_id"]],
                    )
                    for msg_id, fields in claimed:
                        event = AgentEvent.model_validate_json(fields["payload"])
                        try:
                            await handler(event)
                            await self.redis.xack(stream, group, msg_id)
                        except Exception as e:
                            print(f"Retry failed for {msg_id}: {e}")
```
Choosing the Right Broker
| Feature | NATS JetStream | Kafka | Redis Streams |
|---|---|---|---|
| Latency | Sub-millisecond | Low milliseconds | Sub-millisecond |
| Throughput | Millions/sec | Millions/sec | Hundreds of thousands/sec |
| Ordering | Per subject | Per partition | Per stream |
| Retention | Time/count based | Configurable | Memory/maxlen |
| Exactly-once | Yes (dedup) | Yes (transactions) | No (at-least-once) |
| Operational complexity | Low | High | Low (if Redis exists) |
| Best for | Agent-to-agent RPC | High-volume pipelines | Simple task queues |
Use NATS when you need low-latency agent-to-agent communication with simple operations. Use Kafka when you need high-throughput event streaming with strong ordering and replay. Use Redis Streams when you already have Redis and need lightweight persistent queues.
Dead Letter Queue Pattern for Agents
Every event-driven agent system needs a dead letter queue strategy. When an agent fails to process a message after multiple retries, the message must go somewhere for investigation rather than being lost or blocking the queue forever.
```python
# dlq/handler.py
from events.schema import AgentEvent


async def process_dead_letters(broker, dlq_stream: str):
    """Monitor the dead letter queue and alert on failures."""
    async def handle_dlq(event: AgentEvent):
        error = event.payload.get("error", "unknown")
        original = event.payload.get("original", {})
        # Log for investigation
        print(f"DLQ: Agent {event.source_agent} failed processing "
              f"correlation {event.correlation_id}: {error}")
        # Could send `original` to an alerting system (PagerDuty, Slack, etc.)
        # Could store it in a database for manual review
        # Could attempt reprocessing with different parameters

    await broker.consume(dlq_stream, "dlq-monitor", "monitor-1", handle_dlq)
```
FAQ
How do I trace a request across multiple agents?
Use the correlation_id field consistently. When one agent publishes an event in response to another event, it copies the correlation_id from the incoming event. This creates a trace of all events related to a single user request. Pair this with structured logging that includes the correlation ID, and you can reconstruct the full event chain in your log aggregator.
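The "structured logging" half can be done with the standard library alone (a sketch; the correlation_id field name is just a convention this article uses):

```python
import logging

class CorrelationFilter(logging.Filter):
    """Inject the current correlation ID into every log record."""
    def __init__(self, correlation_id: str):
        super().__init__()
        self.correlation_id = correlation_id

    def filter(self, record):
        record.correlation_id = self.correlation_id
        return True  # Never drop records, only annotate them

logger = logging.getLogger("research-agent")
handler = logging.StreamHandler()
# The formatter can reference the injected field like any built-in one
handler.setFormatter(logging.Formatter("%(correlation_id)s %(name)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.addFilter(CorrelationFilter("abc-123"))
logger.info("starting research")  # every line now carries the correlation ID
```

In a real agent you would set the filter's ID from the incoming event (per-task, e.g. via a contextvar) rather than once at startup.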
What happens if a message broker goes down?
NATS JetStream and Kafka both support clustering for high availability. With proper replication, broker failures are transparent to agents. Redis Streams can use Redis Sentinel or Cluster for HA. In all cases, agents should implement local buffering to handle brief broker outages without dropping events.
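The "local buffering" advice can be as simple as a bounded in-memory queue in front of the publish call (a sketch; a production version would add persistence and backpressure, and the names here are illustrative):

```python
from collections import deque

class BufferedPublisher:
    """Hold events locally while the broker is unreachable, flush on reconnect.
    The deque's maxlen bounds memory by dropping the oldest events when full."""
    def __init__(self, publish_fn, max_buffer: int = 1000):
        self.publish_fn = publish_fn  # e.g. broker.publish; raises when broker is down
        self.buffer = deque(maxlen=max_buffer)

    def publish(self, event):
        self.buffer.append(event)
        self.flush()

    def flush(self):
        while self.buffer:
            try:
                self.publish_fn(self.buffer[0])
                self.buffer.popleft()  # Remove only after a successful send
            except ConnectionError:
                break  # Broker still down; keep events buffered

# Simulate an outage with a fake broker
sent = []
broker_up = [False]
def fake_publish(e):
    if not broker_up[0]:
        raise ConnectionError("broker down")
    sent.append(e)

p = BufferedPublisher(fake_publish)
p.publish("e1")       # broker down: stays buffered
broker_up[0] = True
p.publish("e2")       # broker back: flushes e1, then e2, in order
```

Flushing from the front of the queue preserves publish order across the outage.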
How do I handle message ordering when agents scale horizontally?
Use partition keys (Kafka) or subject-based routing (NATS) to ensure messages for the same entity are always processed by the same consumer instance. For example, key all events for a conversation by the conversation ID. This guarantees ordering per conversation while allowing parallel processing across conversations.
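To see why keying preserves order, here is the invariant in miniature (a stdlib stand-in; Kafka's default partitioner actually uses murmur2 hashing, but the property is the same):

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    # A stable hash means the same key always lands on the same partition,
    # so all events for one conversation are consumed by one instance, in order.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

conv = "conversation-42"
assert partition_for(conv, 6) == partition_for(conv, 6)  # deterministic
```

Note the mapping changes if num_partitions changes, which is why repartitioning a keyed topic breaks per-key ordering guarantees during the transition.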
Can I mix synchronous and asynchronous communication?
Yes. Use request-reply (NATS) or synchronous HTTP for operations that need immediate results, and pub/sub for fire-and-forget coordination. NATS natively supports both patterns. With Kafka, pair it with a lightweight HTTP layer for synchronous needs. The key is to use async for agent coordination and sync only for user-facing responses that need immediate feedback.
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.