Event-Driven Microservices for AI Agents: Kafka, RabbitMQ, and NATS Patterns
Implement event-driven communication between AI agent microservices using Kafka, RabbitMQ, and NATS. Learn event schema design, pub/sub patterns, event sourcing, and exactly-once delivery semantics.
Why Event-Driven Architecture Fits AI Agent Systems
AI agent workflows are inherently asynchronous. A user sends a message, the agent reasons over it, calls tools, retrieves context from a vector store, and eventually returns a response. Many of these steps can happen independently. The memory service needs to record the conversation after the response is sent. The analytics service needs to log latency metrics. The billing service needs to track token usage.
If all of these happen synchronously in the request path, response latency balloons. Event-driven architecture decouples the request path from downstream processing. The conversation service publishes events, and other services consume them independently.
Designing Event Schemas
A well-designed event schema is the contract between services. It must be self-describing, versioned, and contain enough context for any consumer to act without making additional API calls:
from dataclasses import dataclass, field, asdict
from datetime import datetime
import uuid
import json
@dataclass
class AgentEvent:
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = ""
version: str = "1.0"
timestamp: str = field(
default_factory=lambda: datetime.utcnow().isoformat()
)
source_service: str = ""
correlation_id: str = ""
payload: dict = field(default_factory=dict)
def to_json(self) -> str:
return json.dumps(asdict(self))
# Example events published by the conversation service
def create_message_received_event(
session_id: str, user_msg: str, correlation_id: str
) -> AgentEvent:
return AgentEvent(
event_type="agent.message.received",
source_service="conversation-manager",
correlation_id=correlation_id,
payload={
"session_id": session_id,
"message": user_msg,
"message_type": "user",
},
)
def create_response_generated_event(
session_id: str,
response: str,
tokens_used: int,
model: str,
correlation_id: str,
) -> AgentEvent:
return AgentEvent(
event_type="agent.response.generated",
source_service="conversation-manager",
correlation_id=correlation_id,
payload={
"session_id": session_id,
"response_length": len(response),
"tokens_used": tokens_used,
"model": model,
},
)
The correlation_id ties all events from a single user request together across services, which is essential for distributed tracing.
Kafka for High-Throughput Agent Event Streams
Kafka excels when you need durable, ordered event streams at high throughput. Agent systems that process thousands of messages per minute benefit from Kafka's partitioned log architecture:
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import asyncio
# Producer in the conversation service
class AgentEventProducer:
def __init__(self, bootstrap_servers: str = "kafka:9092"):
self.producer = AIOKafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: v.encode("utf-8"),
acks="all", # Wait for all replicas to acknowledge
)
async def start(self):
await self.producer.start()
async def publish(self, event: AgentEvent):
topic = event.event_type.replace(".", "-")
await self.producer.send_and_wait(
topic=topic,
value=event.to_json(),
key=event.correlation_id.encode("utf-8"),
)
# Consumer in the analytics service
class AnalyticsConsumer:
def __init__(self):
self.consumer = AIOKafkaConsumer(
"agent-response-generated",
bootstrap_servers="kafka:9092",
group_id="analytics-service",
auto_offset_reset="earliest",
enable_auto_commit=False,
)
async def consume(self):
await self.consumer.start()
try:
async for msg in self.consumer:
event = json.loads(msg.value.decode("utf-8"))
await self.process_event(event)
await self.consumer.commit()
finally:
await self.consumer.stop()
async def process_event(self, event: dict):
payload = event["payload"]
await self.db.insert_metric(
session_id=payload["session_id"],
tokens_used=payload["tokens_used"],
model=payload["model"],
timestamp=event["timestamp"],
)
Setting acks="all" ensures the event is durably written before the producer considers it sent. The consumer uses manual commit (enable_auto_commit=False) to guarantee at-least-once processing.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
NATS for Lightweight Agent Communication
NATS is a strong choice for agent systems that need low-latency pub/sub without Kafka's operational complexity:
import nats
async def nats_publisher():
nc = await nats.connect("nats://nats:4222")
event = create_message_received_event(
session_id="sess-123",
user_msg="What is my account balance?",
correlation_id="req-abc",
)
await nc.publish(
"agent.message.received",
event.to_json().encode(),
)
await nc.flush()
await nc.close()
async def nats_subscriber():
nc = await nats.connect("nats://nats:4222")
sub = await nc.subscribe("agent.>") # Wildcard subscription
async for msg in sub.messages:
event = json.loads(msg.data.decode())
print(f"Received {event['event_type']} "
f"from {event['source_service']}")
NATS uses subject-based addressing with wildcards. The pattern agent.> subscribes to all events under the agent namespace, making it easy to build monitoring dashboards.
Exactly-Once Semantics
True exactly-once delivery is achievable through idempotent consumers. Store the event_id in a processed-events table and check it before processing:
async def process_event_exactly_once(self, event: dict):
event_id = event["event_id"]
if await self.db.event_already_processed(event_id):
return # Skip duplicate
await self.handle(event)
await self.db.mark_event_processed(event_id)
FAQ
When should I choose Kafka over NATS for an agent system?
Choose Kafka when you need durable event storage for replay, strict ordering within partitions, and high throughput at scale (thousands of events per second). Choose NATS when you need simple pub/sub with low latency, the event volume is moderate, and you want minimal operational overhead. For most agent systems under 500 requests per minute, NATS is simpler to operate.
How do I handle schema evolution when event formats change?
Include a version field in every event. When the schema changes, increment the version. Consumers should handle multiple versions by checking the version field and applying the appropriate deserialization logic. Avoid breaking changes — add new fields rather than renaming or removing existing ones.
Should every microservice publish events, or just the core conversation service?
Every service that performs a meaningful state change should publish events. The tool execution service should publish tool.execution.completed events. The RAG service should publish rag.retrieval.completed events. This gives downstream services full visibility into the agent's behavior without coupling them to the conversation service.
#EventDriven #Kafka #RabbitMQ #NATS #Microservices #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.