Redis Pub/Sub for AI Agents: Real-Time Communication Between Agent Instances
Implement real-time communication between distributed AI agent instances using Redis Pub/Sub, covering channel design, message serialization, scaling patterns, and understanding delivery guarantees.
Redis Pub/Sub as an Agent Communication Layer
When AI agents run across multiple processes or servers, they need a lightweight way to broadcast messages to each other. Redis Pub/Sub provides exactly this: a fire-and-forget messaging system where publishers send messages to channels and all current subscribers on those channels receive them instantly.
Redis Pub/Sub is attractive for agent communication because of its simplicity and speed. There is no message queue to configure, no consumer groups to manage, and no acknowledgment protocol. A publisher calls PUBLISH, and every subscriber on that channel receives the message with sub-millisecond latency on a local network. This makes it ideal for coordination signals, status broadcasts, and real-time notifications between agents.
The tradeoff is that Redis Pub/Sub offers no persistence and no delivery guarantees. If a subscriber is disconnected when a message is published, that message is lost. This is acceptable for real-time coordination where stale messages have no value, but it means you should not use raw Pub/Sub for critical events that must never be missed.
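The fire-and-forget semantics are easy to see in a toy in-memory model (illustrative only, not the real Redis implementation): publish delivers to whoever is subscribed at that instant, and — like Redis's PUBLISH reply — returns the number of receivers, so a message published with no subscribers is simply gone.

```python
class TinyPubSub:
    """In-memory model of fire-and-forget Pub/Sub semantics (illustrative)."""

    def __init__(self):
        self.subscribers: dict = {}  # channel -> list of subscriber inboxes

    def subscribe(self, channel: str) -> list:
        inbox: list = []
        self.subscribers.setdefault(channel, []).append(inbox)
        return inbox

    def publish(self, channel: str, message: str) -> int:
        # Like Redis PUBLISH, return how many subscribers received it.
        inboxes = self.subscribers.get(channel, [])
        for inbox in inboxes:
            inbox.append(message)
        return len(inboxes)

bus = TinyPubSub()
lost = bus.publish("agent:all:events", "nobody is listening")  # 0 receivers
inbox = bus.subscribe("agent:all:events")
got = bus.publish("agent:all:events", "now delivered")         # 1 receiver
```

Checking the return value of PUBLISH is a cheap sanity check in real systems too: zero receivers on a channel that should have subscribers usually signals a deployment or naming problem.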
Channel Design for Multi-Agent Systems
Design your channel namespace to support filtering at the subscription level. Use a hierarchical naming convention so subscribers can listen to broad categories or narrow topics.
import redis.asyncio as redis
import json
from dataclasses import dataclass, asdict
from typing import Optional

# Channel naming convention:
#   agent:{agent_id}:events      — events from a specific agent
#   agent:all:events             — broadcast to all agents
#   agent:coordination:tasks     — task assignment channel
#   agent:coordination:results   — result collection channel

REDIS_URL = "redis://localhost:6379/0"


@dataclass
class AgentMessage:
    sender_id: str
    message_type: str
    payload: dict
    channel: str = ""

    def serialize(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def deserialize(cls, data: str) -> "AgentMessage":
        return cls(**json.loads(data))


class AgentPubSub:
    def __init__(self, agent_id: str, redis_url: str = REDIS_URL):
        self.agent_id = agent_id
        self.redis_url = redis_url
        self.client: Optional[redis.Redis] = None
        self.pubsub: Optional[redis.client.PubSub] = None

    async def connect(self):
        self.client = redis.from_url(self.redis_url)
        self.pubsub = self.client.pubsub()

    async def publish(self, channel: str, message_type: str, payload: dict):
        msg = AgentMessage(
            sender_id=self.agent_id,
            message_type=message_type,
            payload=payload,
            channel=channel,
        )
        await self.client.publish(channel, msg.serialize())

    async def subscribe(self, *channels: str):
        await self.pubsub.subscribe(*channels)

    async def subscribe_pattern(self, pattern: str):
        await self.pubsub.psubscribe(pattern)

    async def listen(self):
        async for message in self.pubsub.listen():
            if message["type"] in ("message", "pmessage"):
                data = message["data"]
                if isinstance(data, bytes):
                    data = data.decode("utf-8")
                yield AgentMessage.deserialize(data)

    async def close(self):
        if self.pubsub:
            await self.pubsub.close()
        if self.client:
            await self.client.close()
Pattern: Agent Task Distribution
A common pattern is a coordinator agent that distributes tasks to worker agents. The coordinator publishes tasks on a shared channel, and workers claim them using a Redis lock to prevent double-processing.
import asyncio
import uuid


class CoordinatorAgent:
    def __init__(self, pubsub: AgentPubSub):
        self.pubsub = pubsub
        self.pending_tasks: dict[str, asyncio.Future] = {}

    async def assign_task(self, task_prompt: str, timeout: float = 30.0) -> dict:
        task_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.pending_tasks[task_id] = future
        await self.pubsub.publish(
            "agent:coordination:tasks",
            "task_assignment",
            {"task_id": task_id, "prompt": task_prompt},
        )
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            self.pending_tasks.pop(task_id, None)
            return {"error": "Task timed out", "task_id": task_id}

    async def listen_for_results(self):
        await self.pubsub.subscribe("agent:coordination:results")
        async for msg in self.pubsub.listen():
            if msg.message_type == "task_result":
                task_id = msg.payload["task_id"]
                future = self.pending_tasks.pop(task_id, None)
                if future and not future.done():
                    future.set_result(msg.payload)


class WorkerAgent:
    def __init__(self, pubsub: AgentPubSub):
        self.pubsub = pubsub

    async def run(self):
        await self.pubsub.subscribe("agent:coordination:tasks")
        async for msg in self.pubsub.listen():
            if msg.message_type == "task_assignment":
                claimed = await self._try_claim(msg.payload["task_id"])
                if claimed:
                    result = await self._process_task(msg.payload)
                    await self.pubsub.publish(
                        "agent:coordination:results",
                        "task_result",
                        {"task_id": msg.payload["task_id"], "result": result},
                    )

    async def _try_claim(self, task_id: str) -> bool:
        # SET NX: only the first worker to reach Redis gets the lock;
        # EX 60 expires stale locks from crashed workers.
        lock_key = f"task_lock:{task_id}"
        acquired = await self.pubsub.client.set(
            lock_key, self.pubsub.agent_id, nx=True, ex=60
        )
        return bool(acquired)

    async def _process_task(self, task: dict) -> str:
        # Run AI agent logic here
        return f"Processed: {task['prompt']}"
The SET NX (set-if-not-exists) command acts as a distributed lock. Only one worker can claim each task, preventing duplicate processing even when multiple workers receive the same Pub/Sub message.
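The claim logic can be modeled without a server: SET with NX succeeds only when the key is absent, so when two workers race for the same task, exactly one wins. A minimal in-memory sketch of that semantics (the TTL handling and real atomicity come from Redis itself; `FakeLockStore` is a hypothetical stand-in):

```python
class FakeLockStore:
    """In-memory stand-in for Redis SET ... NX semantics (no TTL handling)."""

    def __init__(self):
        self.data: dict = {}

    def set_nx(self, key: str, value: str) -> bool:
        # Succeeds only if the key does not already exist, like SET ... NX.
        if key in self.data:
            return False
        self.data[key] = value
        return True

store = FakeLockStore()
first = store.set_nx("task_lock:abc", "worker-1")   # claim succeeds
second = store.set_nx("task_lock:abc", "worker-2")  # already claimed
```

Storing the claiming agent's ID as the value (as `_try_claim` does) also lets you audit which worker owns each task, and release the lock safely by checking ownership before deleting.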
Scaling Considerations
Redis Pub/Sub has a key scaling property: every subscriber receives every message on its subscribed channels. With 10 worker agents subscribed to the task channel, all 10 receive each task (which is why we need the lock pattern above). This is broadcast semantics, not competing consumer semantics.
For competing consumers — where you want only one worker to process each message — consider Redis Streams instead of Pub/Sub.
class AgentStreamConsumer:
    """Redis Streams for competing consumer pattern."""

    def __init__(self, agent_id: str, redis_url: str = REDIS_URL):
        self.agent_id = agent_id
        self.redis_url = redis_url
        self.client: Optional[redis.Redis] = None
        self.group = "agent-workers"
        self.stream = "agent:tasks"

    async def connect(self):
        self.client = redis.from_url(self.redis_url)
        try:
            await self.client.xgroup_create(
                self.stream, self.group, id="0", mkstream=True
            )
        except redis.ResponseError:
            pass  # Group already exists

    async def consume(self):
        while True:
            messages = await self.client.xreadgroup(
                self.group,
                self.agent_id,
                {self.stream: ">"},
                count=1,
                block=5000,
            )
            for stream_name, entries in messages:
                for entry_id, data in entries:
                    yield entry_id, data
                    # Ack after the caller resumes the generator, i.e.
                    # after it has handled the entry.
                    await self.client.xack(
                        self.stream, self.group, entry_id
                    )
Redis Streams give you consumer groups (competing consumers), message acknowledgment, and message persistence. Use Pub/Sub for broadcasts and coordination signals; use Streams for task queues and event processing where durability matters.
FAQ
When should I use Redis Pub/Sub versus Redis Streams for AI agent communication?
Use Pub/Sub when every subscriber should see every message — status broadcasts, coordination signals, real-time dashboard updates, and agent discovery announcements. Use Redis Streams when messages must be processed exactly once by one consumer, when you need message persistence across restarts, or when you need consumer group semantics for load balancing. Many production systems use both: Pub/Sub for real-time notifications and Streams for reliable task processing.
What happens when a Redis Pub/Sub subscriber falls behind on processing messages?
Redis buffers messages in the subscriber's output buffer. If the buffer exceeds the configured limit (client-output-buffer-limit pubsub), Redis disconnects the slow subscriber to protect server memory. The default limit is 32MB (hard) and 8MB sustained over 60 seconds. For AI agents that might have slow processing spikes, increase these limits or add your own buffering layer — receive messages into an in-process queue and process them asynchronously so the Redis subscription loop stays fast.
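The buffering layer mentioned above can be a plain `asyncio.Queue`: the subscription loop only enqueues, and a separate task does the slow processing. A self-contained sketch with a simulated feed standing in for the real `pubsub.listen()` loop:

```python
import asyncio

async def demo():
    queue = asyncio.Queue()
    processed = []

    async def subscription_loop():
        # Stand-in for the Redis pubsub.listen() loop: enqueue fast,
        # never block on processing, so Redis's output buffer stays small.
        for i in range(5):
            await queue.put(f"msg-{i}")
        await queue.put(None)  # sentinel: feed finished

    async def processor():
        while True:
            item = await queue.get()
            if item is None:
                break
            await asyncio.sleep(0.01)  # simulate slow AI processing
            processed.append(item)

    await asyncio.gather(subscription_loop(), processor())
    return processed

results = asyncio.run(demo())
```

One caveat: an unbounded in-process queue just moves the memory pressure from Redis into your agent, so cap the queue size (or drop stale coordination messages) if processing can fall behind indefinitely.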
Can Redis Pub/Sub work across multiple Redis instances for high availability?
In Redis Cluster, a classic PUBLISH is broadcast to every node, so subscribers anywhere in the cluster receive the message — but that cross-node broadcast traffic grows with cluster size and limits throughput. Redis 7 introduced sharded Pub/Sub (SPUBLISH/SSUBSCRIBE), which confines each channel to the shard owning its hash slot and scales much better. A simpler option for many deployments is a dedicated Redis instance for Pub/Sub, separate from your data store. Redis Sentinel provides failover for a single primary, which works well for Pub/Sub as long as subscribers re-establish their subscriptions after reconnecting to the new primary (most clients, including redis-py, resubscribe automatically).