Building Agentic AI with Redis: Caching, Sessions, and Pub/Sub Patterns
Master Redis patterns for agentic AI including LLM response caching, conversation sessions, pub/sub for real-time events, and agent performance optimization.
Redis as the Real-Time Backbone of Agentic AI
Every production agentic AI system has a need for speed that relational databases cannot satisfy. Sub-millisecond session lookups for active conversations. Instant cache hits for repeated LLM queries. Real-time event propagation when an agent completes a tool call. Atomic counters for rate limiting. Leaderboards for agent performance tracking.
Redis fills all these roles. It is the Swiss Army knife of agentic AI infrastructure — not the primary data store (that remains PostgreSQL), but the performance layer that makes the difference between a sluggish agent and one that responds in under a second.
At CallSphere, Redis handles session management, LLM response caching, real-time event distribution, and operational metrics across all our agent deployments. This guide covers the Redis patterns we use in production.
Pattern 1: LLM Response Caching
LLM API calls are the single most expensive operation in an agentic AI system — expensive in both latency (1-10 seconds) and cost (dollars per thousand calls). Caching identical or near-identical requests saves both.
Exact Match Caching
The simplest and most effective caching strategy: hash the prompt and cache the response.
```python
import hashlib
import json

import redis.asyncio as redis


class LLMCache:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def _cache_key(self, model: str, messages: list, tools: list | None = None) -> str:
        """Generate a deterministic cache key from the request."""
        payload = {
            "model": model,
            "messages": messages,
            "tools": sorted(tools or [], key=lambda t: t["name"]),
        }
        content = json.dumps(payload, sort_keys=True)
        return f"llm:cache:{hashlib.sha256(content.encode()).hexdigest()}"

    async def get(self, model: str, messages: list, tools: list | None = None) -> dict | None:
        key = self._cache_key(model, messages, tools)
        cached = await self.redis.get(key)
        if cached:
            await self.redis.hincrby("llm:cache:stats", "hits", 1)
            return json.loads(cached)
        await self.redis.hincrby("llm:cache:stats", "misses", 1)
        return None

    async def set(self, model: str, messages: list, response: dict,
                  tools: list | None = None, ttl: int | None = None):
        key = self._cache_key(model, messages, tools)
        await self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(response),
        )

    async def get_hit_rate(self) -> float:
        stats = await self.redis.hgetall("llm:cache:stats")
        hits = int(stats.get(b"hits", 0))
        misses = int(stats.get(b"misses", 0))
        total = hits + misses
        return hits / total if total > 0 else 0.0
```
When to Cache (and When Not To)
| Cache This | Do Not Cache This |
|---|---|
| Classification/routing decisions | Conversations with user-specific context |
| Tool parameter extraction from templates | Creative or generative responses |
| FAQ-style questions | Time-sensitive queries (weather, stock prices) |
| System prompt + fixed input combinations | Multi-turn conversations |
| Embedding generation for static content | Responses that include PII |
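The table can be collapsed into a gate that runs before any cache lookup. This is a minimal sketch; the `CachePolicy` fields are hypothetical flags you would derive from your own request metadata, not part of the cache classes shown in this guide:

```python
from dataclasses import dataclass


@dataclass
class CachePolicy:
    # Hypothetical request flags; adapt to whatever metadata your pipeline carries
    is_multi_turn: bool = False       # ongoing conversation: never cache
    has_user_context: bool = False    # user-specific data in the prompt
    contains_pii: bool = False        # responses with PII must not be cached
    is_time_sensitive: bool = False   # weather, stock prices, availability
    is_creative: bool = False         # generative output should stay fresh


def should_cache(policy: CachePolicy) -> bool:
    """Return True only when none of the disqualifying conditions apply."""
    return not any([
        policy.is_multi_turn,
        policy.has_user_context,
        policy.contains_pii,
        policy.is_time_sensitive,
        policy.is_creative,
    ])
```

Running the gate first keeps cache-poisoning risks (PII, stale live data) out of the cache entirely rather than relying on short TTLs to age them out.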
Semantic Caching with Embeddings
For near-identical queries (e.g., "What are your business hours?" and "When are you open?"), use embedding similarity to find cached responses:
```python
import json

import numpy as np


class SemanticLLMCache:
    def __init__(self, redis_client, embedding_client, similarity_threshold: float = 0.95):
        self.redis = redis_client
        self.embedder = embedding_client
        self.threshold = similarity_threshold

    async def get_semantic(self, query: str) -> dict | None:
        query_embedding = np.asarray(await self.embedder.embed(query), dtype=np.float32)
        # Scan the 100 most recent cached embeddings
        # (keys stored in a Redis sorted set scored by timestamp)
        cached_keys = await self.redis.zrevrangebyscore(
            "llm:semantic_cache:keys", "+inf", "-inf", start=0, num=100
        )
        for key in cached_keys:
            cached_embedding = await self.redis.get(f"llm:semantic_cache:emb:{key.decode()}")
            if not cached_embedding:
                continue
            cached_vec = np.frombuffer(cached_embedding, dtype=np.float32)
            # Cosine similarity between the query and the cached embedding
            similarity = np.dot(query_embedding, cached_vec) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(cached_vec)
            )
            if similarity >= self.threshold:
                response = await self.redis.get(f"llm:semantic_cache:resp:{key.decode()}")
                if response:
                    return json.loads(response)
        return None
```

Note that this is a linear scan over the most recent 100 entries. It works at small scale; beyond that, use a vector index (Redis Query Engine or a dedicated vector store) instead of brute-force comparison.
Pattern 2: Conversation Session Storage
Active conversations need fast read/write access to session state. Redis hashes map naturally to conversation sessions.
```python
import json
from datetime import datetime

import redis.asyncio as redis


class ConversationSession:
    PREFIX = "session:conv"

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def create(self, conversation_id: str, initial_state: dict) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        pipe = self.redis.pipeline()
        pipe.hset(key, mapping={
            "tenant_id": initial_state["tenant_id"],
            "current_agent": initial_state.get("current_agent", "triage"),
            "status": "active",
            "turn_count": 0,
            "token_count": 0,
            "started_at": datetime.utcnow().isoformat(),
            "context": json.dumps(initial_state.get("context", {})),
            "messages": json.dumps([]),
        })
        # Auto-expire after 2 hours of inactivity
        pipe.expire(key, 7200)
        await pipe.execute()

    async def add_message(self, conversation_id: str, role: str, content: str,
                          tokens: int = 0) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        # Append message to the messages list
        messages_raw = await self.redis.hget(key, "messages")
        messages = json.loads(messages_raw) if messages_raw else []
        messages.append({"role": role, "content": content, "ts": datetime.utcnow().isoformat()})
        # Keep only last 50 messages in session (full history in PostgreSQL)
        if len(messages) > 50:
            messages = messages[-50:]
        pipe = self.redis.pipeline()
        pipe.hset(key, mapping={
            "messages": json.dumps(messages),
            "last_activity": datetime.utcnow().isoformat(),
        })
        pipe.hincrby(key, "turn_count", 1)
        pipe.hincrby(key, "token_count", tokens)
        pipe.expire(key, 7200)  # Reset TTL on activity
        await pipe.execute()

    async def get(self, conversation_id: str) -> dict | None:
        key = f"{self.PREFIX}:{conversation_id}"
        data = await self.redis.hgetall(key)
        if not data:
            return None
        result = {k.decode(): v.decode() for k, v in data.items()}
        result["messages"] = json.loads(result.get("messages", "[]"))
        result["context"] = json.loads(result.get("context", "{}"))
        return result

    async def update_agent(self, conversation_id: str, agent_name: str) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        await self.redis.hset(key, "current_agent", agent_name)
```
Why Redis for Sessions Instead of PostgreSQL?
The conversation session is read and updated on every single message. That is 2-10 database operations per user turn. With Redis, each operation takes under 1ms. With PostgreSQL, each operation takes 2-10ms and holds a connection from the pool. At 1000 concurrent conversations, that difference determines whether your system stays responsive or starts queuing.
The pattern is: Redis for the hot session, PostgreSQL for the cold record. When a conversation completes, flush the session data to PostgreSQL for long-term storage and analytics.
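That final flush can be sketched as a pure mapping from the Redis session hash (as returned by `ConversationSession.get`) to a row ready for a PostgreSQL `INSERT`. The column names below are illustrative, not a real schema:

```python
import json


def session_to_row(conversation_id: str, session: dict) -> dict:
    """Map a completed Redis session to a flat dict for the cold record.

    Counters come back from Redis as strings, so they are cast here.
    Column names are illustrative; adapt them to your own schema.
    """
    return {
        "conversation_id": conversation_id,
        "tenant_id": session["tenant_id"],
        "final_agent": session["current_agent"],
        "turn_count": int(session["turn_count"]),
        "token_count": int(session["token_count"]),
        "started_at": session["started_at"],
        "transcript": json.dumps(session["messages"]),
    }
```

Running this transformation in the completion handler, then deleting the Redis key, keeps the hot store bounded while PostgreSQL retains the full record for analytics.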
Pattern 3: Pub/Sub for Real-Time Agent Events
When an agent finishes processing, completes a tool call, or hands off a conversation, other system components need to know immediately. Redis Pub/Sub provides fire-and-forget event distribution with negligible latency.
```python
import json
from datetime import datetime

import redis.asyncio as redis


class AgentEventBus:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.pubsub = self.redis.pubsub()
        self.handlers = {}

    async def publish(self, event_type: str, payload: dict) -> None:
        message = json.dumps({
            "event": event_type,
            "payload": payload,
            "timestamp": datetime.utcnow().isoformat(),
        })
        await self.redis.publish(f"agent:events:{event_type}", message)

    async def subscribe(self, event_type: str, handler) -> None:
        self.handlers[event_type] = handler
        await self.pubsub.subscribe(f"agent:events:{event_type}")

    async def listen(self) -> None:
        async for message in self.pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                handler = self.handlers.get(data["event"])
                if handler:
                    await handler(data["payload"])


# Usage
event_bus = AgentEventBus(redis_client)

# Frontend WebSocket server subscribes to conversation events
await event_bus.subscribe("agent.response.streaming", send_to_websocket)
await event_bus.subscribe("agent.tool.started", update_ui_status)
await event_bus.subscribe("agent.handoff.completed", update_ui_agent)

# Agent publishes events during processing
await event_bus.publish("agent.response.streaming", {
    "conversation_id": conv_id,
    "chunk": "Let me look up your invoice...",
    "agent": "billing_agent",
})
await event_bus.publish("agent.tool.started", {
    "conversation_id": conv_id,
    "tool": "lookup_invoice",
    "agent": "billing_agent",
})
```
Pub/Sub vs. Streams for Agent Events
Redis Pub/Sub is fire-and-forget — if no subscriber is listening, the message is lost. For UI updates and real-time notifications, this is fine. For events that must not be lost (tool execution requests, handoff commands), use Redis Streams instead:
```python
import redis.asyncio as redis


class DurableAgentEventBus:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def publish(self, stream: str, event: dict) -> str:
        event_id = await self.redis.xadd(stream, event, maxlen=10000)
        return event_id

    async def consume(self, stream: str, group: str, consumer: str):
        # Create the consumer group if it does not exist yet
        try:
            await self.redis.xgroup_create(stream, group, id="0", mkstream=True)
        except redis.ResponseError as e:
            # An existing group is fine; anything else is a real error
            if "BUSYGROUP" not in str(e):
                raise
        while True:
            messages = await self.redis.xreadgroup(
                group, consumer, {stream: ">"}, count=10, block=5000
            )
            for _, entries in messages:
                for msg_id, data in entries:
                    yield msg_id, data
                    await self.redis.xack(stream, group, msg_id)
```
Pattern 4: Sorted Sets for Agent Performance Leaderboards
Track and rank agent performance metrics using Redis sorted sets. This is useful for operational dashboards and for identifying which agents need prompt optimization.
```python
from datetime import datetime

import redis.asyncio as redis


class AgentLeaderboard:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def record_resolution(self, agent_name: str, resolution_time_seconds: float,
                                was_successful: bool) -> None:
        today = datetime.utcnow().strftime("%Y-%m-%d")
        pipe = self.redis.pipeline()
        # Track successful resolutions
        if was_successful:
            pipe.zincrby(f"leaderboard:resolutions:{today}", 1, agent_name)
        # Track average resolution time (using two sorted sets)
        pipe.zincrby(f"leaderboard:total_time:{today}", resolution_time_seconds, agent_name)
        pipe.zincrby(f"leaderboard:total_count:{today}", 1, agent_name)
        # Expire after 30 days
        pipe.expire(f"leaderboard:resolutions:{today}", 2592000)
        pipe.expire(f"leaderboard:total_time:{today}", 2592000)
        pipe.expire(f"leaderboard:total_count:{today}", 2592000)
        await pipe.execute()

    async def get_top_agents(self, date: str, limit: int = 10) -> list:
        agents = await self.redis.zrevrangebyscore(
            f"leaderboard:resolutions:{date}", "+inf", "-inf",
            start=0, num=limit, withscores=True,
        )
        results = []
        for agent_name, resolution_count in agents:
            name = agent_name.decode()
            total_time = await self.redis.zscore(f"leaderboard:total_time:{date}", name) or 0
            total_count = await self.redis.zscore(f"leaderboard:total_count:{date}", name) or 1
            avg_time = total_time / total_count
            results.append({
                "agent": name,
                "resolutions": int(resolution_count),
                "avg_resolution_time_seconds": round(avg_time, 1),
            })
        return results
```
Pattern 5: Streams for Agent Audit Logs
Every agent action needs an immutable audit trail. Redis Streams provide an append-only log with consumer group support, perfect for capturing agent decisions in real time and processing them asynchronously.
```python
import json
from datetime import datetime

import redis.asyncio as redis


class AgentAuditLog:
    STREAM_KEY = "agent:audit:log"

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def log_action(self, conversation_id: str, agent_name: str,
                         action: str, details: dict) -> str:
        entry = {
            "conversation_id": conversation_id,
            "agent": agent_name,
            "action": action,
            "details": json.dumps(details),
            "timestamp": datetime.utcnow().isoformat(),
        }
        msg_id = await self.redis.xadd(self.STREAM_KEY, entry, maxlen=100000)
        return msg_id

    async def get_conversation_audit(self, conversation_id: str, count: int = 100) -> list:
        """Get audit entries for a specific conversation (scan approach)."""
        entries = []
        last_id = "-"
        while True:
            results = await self.redis.xrange(self.STREAM_KEY, min=last_id, count=500)
            if not results:
                break
            for msg_id, data in results:
                if data.get(b"conversation_id", b"").decode() == conversation_id:
                    entries.append({
                        "id": msg_id.decode(),
                        "agent": data[b"agent"].decode(),
                        "action": data[b"action"].decode(),
                        "details": json.loads(data[b"details"]),
                        "timestamp": data[b"timestamp"].decode(),
                    })
            # Advance past the last entry with an exclusive range (Redis 6.2+),
            # so the same ID is not re-read on the next iteration
            last_id = f"({results[-1][0].decode()}"
            if len(results) < 500:
                break
        return entries[:count]
```

The full-stream scan is acceptable for occasional debugging. If per-conversation audit reads are frequent, maintain a secondary index (e.g. a list of stream IDs per conversation) instead of scanning.
Redis Connection Management
Connection Pooling
Never create a new Redis connection per request. Use a connection pool:
```python
import redis.asyncio as redis

# Create a shared connection pool
redis_pool = redis.ConnectionPool.from_url(
    "redis://redis:6379/0",
    max_connections=50,
    decode_responses=False,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
)
redis_client = redis.Redis(connection_pool=redis_pool)
```
Redis Cluster for Scale
When a single Redis instance is insufficient (typically above 100K concurrent conversations), use Redis Cluster to shard data across multiple nodes. Design your key naming to support hash tags for related keys:
```python
# Keys for the same conversation use a hash tag to ensure
# they land on the same shard
session_key = f"session:conv:{{{conversation_id}}}"
messages_key = f"messages:conv:{{{conversation_id}}}"
context_key = f"context:conv:{{{conversation_id}}}"
# The {conversation_id} hash tag ensures co-location
```
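For reference, the rule Redis Cluster applies is: if the key contains a `{` followed later by a `}` with at least one character between them, only that substring is hashed; otherwise the whole key is hashed. A plain-Python sketch of the extraction:

```python
def hash_tag(key: str) -> str:
    """Return the portion of the key Redis Cluster hashes.

    Per the cluster spec: the content between the first '{' and the
    first following '}', if non-empty; otherwise the whole key.
    """
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # '{}' with empty content hashes the whole key
            return key[start + 1:end]
    return key
```

Because `hash_tag("session:conv:{abc}")` and `hash_tag("messages:conv:{abc}")` both return `"abc"`, the two keys hash to the same slot and can participate in the same multi-key operation or transaction.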
Frequently Asked Questions
How much memory does Redis need for agent sessions?
A typical conversation session with 50 messages consumes 10-50KB in Redis. With 10,000 concurrent conversations, that is 100-500MB. Add LLM response cache (typically 1-5GB depending on cache size and TTL) and operational counters (negligible). Plan for 4-8GB of Redis memory for a medium-scale agent deployment.
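The arithmetic behind those numbers is straightforward; this helper is purely illustrative (decimal units, 1 MB = 1000 KB):

```python
def estimate_session_memory_mb(concurrent_sessions: int, kb_per_session: float) -> float:
    """Back-of-envelope estimate of session memory in MB (decimal units)."""
    return concurrent_sessions * kb_per_session / 1000


# The ranges quoted above: 10,000 concurrent sessions at 10-50 KB each
low = estimate_session_memory_mb(10_000, 10)   # 100 MB
high = estimate_session_memory_mb(10_000, 50)  # 500 MB
```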
Should I use Redis or Memcached for LLM response caching?
Redis. Memcached is faster for simple key-value lookups but lacks the data structures (hashes, sorted sets, streams, pub/sub) that agent systems need. You would end up running both Memcached and Redis, adding operational complexity for marginal performance gain on a single use case.
How do I handle Redis downtime without losing active conversations?
Design your agent system to degrade gracefully when Redis is unavailable. Fall back to PostgreSQL for session reads (slower but functional). Disable caching and accept higher LLM costs temporarily. Queue pub/sub events in memory and replay when Redis recovers. Use Redis Sentinel or Redis Cluster for automatic failover with sub-second recovery.
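The fallback read can be sketched with injected async callables, which keeps the degradation logic independent of any specific client and easy to test. Function and parameter names here are hypothetical; in production you would catch `redis.exceptions.ConnectionError` from redis-py rather than the builtin:

```python
import asyncio


async def get_session_with_fallback(conversation_id, redis_get, postgres_get):
    """Try the Redis hot path first; on failure, fall back to PostgreSQL.

    Both arguments are async callables, injected so the fallback logic
    stays testable. Catching the builtin ConnectionError is a sketch;
    real code would catch the Redis client's connection exception.
    """
    try:
        session = await redis_get(conversation_id)
        if session is not None:
            return session
    except ConnectionError:
        pass  # Redis is down: degrade to the slower but durable path
    return await postgres_get(conversation_id)
```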
What TTL should I set for LLM response cache entries?
It depends on how time-sensitive the content is. For FAQ-style responses and classification results, 24 hours is appropriate. For responses that reference live data (account balances, appointment availability), use 5-15 minutes or skip caching entirely. For embedding computations on static content, cache for 7 days or longer.
Can Redis Streams replace Kafka for agent event processing?
For small to medium agent deployments (under 50K events per second), Redis Streams are a simpler alternative to Kafka with similar semantics: append-only log, consumer groups, acknowledgment. Choose Kafka when you need multi-datacenter replication, longer retention (weeks or months), or higher throughput. Redis Streams are ideal when you already have Redis deployed and want to avoid adding another infrastructure component.
CallSphere Team
Expert insights on AI voice agents and customer communication automation.