Skip to content
Technology10 min read0 views

Building Agentic AI with Redis: Caching, Sessions, and Pub/Sub Patterns

Master Redis patterns for agentic AI including LLM response caching, conversation sessions, pub/sub for real-time events, and agent performance optimization.

Redis as the Real-Time Backbone of Agentic AI

Every production agentic AI system has a need for speed that relational databases cannot satisfy. Sub-millisecond session lookups for active conversations. Instant cache hits for repeated LLM queries. Real-time event propagation when an agent completes a tool call. Atomic counters for rate limiting. Leaderboards for agent performance tracking.

Redis fills all these roles. It is the Swiss Army knife of agentic AI infrastructure — not the primary data store (that remains PostgreSQL), but the performance layer that makes the difference between a sluggish agent and one that responds in under a second.

At CallSphere, Redis handles session management, LLM response caching, real-time event distribution, and operational metrics across all our agent deployments. This guide covers the Redis patterns we use in production.

Pattern 1: LLM Response Caching

LLM API calls are the single most expensive operation in an agentic AI system — expensive in both latency (1-10 seconds) and cost (dollars per thousand calls). Caching identical or near-identical requests saves both.

Exact Match Caching

The simplest and most effective caching strategy: hash the prompt and cache the response.

import hashlib
import json
import redis.asyncio as redis

class LLMCache:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def _cache_key(self, model: str, messages: list, tools: list = None) -> str:
        """Generate a deterministic cache key from the request."""
        payload = {
            "model": model,
            "messages": messages,
            "tools": sorted(tools or [], key=lambda t: t["name"]) if tools else [],
        }
        content = json.dumps(payload, sort_keys=True)
        return f"llm:cache:{hashlib.sha256(content.encode()).hexdigest()}"

    async def get(self, model: str, messages: list, tools: list = None) -> dict | None:
        key = self._cache_key(model, messages, tools)
        cached = await self.redis.get(key)
        if cached:
            await self.redis.hincrby("llm:cache:stats", "hits", 1)
            return json.loads(cached)
        await self.redis.hincrby("llm:cache:stats", "misses", 1)
        return None

    async def set(self, model: str, messages: list, response: dict,
                  tools: list = None, ttl: int = None):
        key = self._cache_key(model, messages, tools)
        await self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(response),
        )

    async def get_hit_rate(self) -> float:
        stats = await self.redis.hgetall("llm:cache:stats")
        hits = int(stats.get(b"hits", 0))
        misses = int(stats.get(b"misses", 0))
        total = hits + misses
        return hits / total if total > 0 else 0.0

When to Cache (and When Not To)

Cache This Do Not Cache This
Classification/routing decisions Conversations with user-specific context
Tool parameter extraction from templates Creative or generative responses
FAQ-style questions Time-sensitive queries (weather, stock prices)
System prompt + fixed input combinations Multi-turn conversations
Embedding generation for static content Responses that include PII

Semantic Caching with Embeddings

For near-identical queries (e.g., "What are your business hours?" and "When are you open?"), use embedding similarity to find cached responses:

import numpy as np

class SemanticLLMCache:
    def __init__(self, redis_client, embedding_client, similarity_threshold=0.95):
        self.redis = redis_client
        self.embedder = embedding_client
        self.threshold = similarity_threshold

    async def get_semantic(self, query: str) -> dict | None:
        query_embedding = await self.embedder.embed(query)

        # Search cached embeddings (stored in Redis sorted set by timestamp)
        cached_keys = await self.redis.zrevrangebyscore(
            "llm:semantic_cache:keys", "+inf", "-inf", start=0, num=100
        )

        for key in cached_keys:
            cached_embedding = await self.redis.get(f"llm:semantic_cache:emb:{key.decode()}")
            if cached_embedding:
                cached_vec = np.frombuffer(cached_embedding, dtype=np.float32)
                similarity = np.dot(query_embedding, cached_vec) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(cached_vec)
                )
                if similarity >= self.threshold:
                    response = await self.redis.get(f"llm:semantic_cache:resp:{key.decode()}")
                    if response:
                        return json.loads(response)
        return None

Pattern 2: Conversation Session Storage

Active conversations need fast read/write access to session state. Redis hashes map naturally to conversation sessions.

class ConversationSession:
    PREFIX = "session:conv"

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def create(self, conversation_id: str, initial_state: dict) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        pipe = self.redis.pipeline()
        pipe.hset(key, mapping={
            "tenant_id": initial_state["tenant_id"],
            "current_agent": initial_state.get("current_agent", "triage"),
            "status": "active",
            "turn_count": 0,
            "token_count": 0,
            "started_at": datetime.utcnow().isoformat(),
            "context": json.dumps(initial_state.get("context", {})),
            "messages": json.dumps([]),
        })
        # Auto-expire after 2 hours of inactivity
        pipe.expire(key, 7200)
        await pipe.execute()

    async def add_message(self, conversation_id: str, role: str, content: str,
                          tokens: int = 0) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        pipe = self.redis.pipeline()

        # Append message to the messages list
        messages_raw = await self.redis.hget(key, "messages")
        messages = json.loads(messages_raw) if messages_raw else []
        messages.append({"role": role, "content": content, "ts": datetime.utcnow().isoformat()})

        # Keep only last 50 messages in session (full history in PostgreSQL)
        if len(messages) > 50:
            messages = messages[-50:]

        pipe.hset(key, mapping={
            "messages": json.dumps(messages),
            "last_activity": datetime.utcnow().isoformat(),
        })
        pipe.hincrby(key, "turn_count", 1)
        pipe.hincrby(key, "token_count", tokens)
        pipe.expire(key, 7200)  # Reset TTL on activity
        await pipe.execute()

    async def get(self, conversation_id: str) -> dict | None:
        key = f"{self.PREFIX}:{conversation_id}"
        data = await self.redis.hgetall(key)
        if not data:
            return None
        result = {k.decode(): v.decode() for k, v in data.items()}
        result["messages"] = json.loads(result.get("messages", "[]"))
        result["context"] = json.loads(result.get("context", "{}"))
        return result

    async def update_agent(self, conversation_id: str, agent_name: str) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        await self.redis.hset(key, "current_agent", agent_name)

Why Redis for Sessions Instead of PostgreSQL?

The conversation session is read and updated on every single message. That is 2-10 database operations per user turn. With Redis, each operation takes under 1ms. With PostgreSQL, each operation takes 2-10ms and holds a connection from the pool. At 1000 concurrent conversations, that difference determines whether your system stays responsive or starts queuing.

The pattern is: Redis for the hot session, PostgreSQL for the cold record. When a conversation completes, flush the session data to PostgreSQL for long-term storage and analytics.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

Pattern 3: Pub/Sub for Real-Time Agent Events

When an agent finishes processing, completes a tool call, or hands off a conversation, other system components need to know immediately. Redis Pub/Sub provides fire-and-forget event distribution with negligible latency.

class AgentEventBus:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.pubsub = self.redis.pubsub()
        self.handlers = {}

    async def publish(self, event_type: str, payload: dict) -> None:
        message = json.dumps({
            "event": event_type,
            "payload": payload,
            "timestamp": datetime.utcnow().isoformat(),
        })
        await self.redis.publish(f"agent:events:{event_type}", message)

    async def subscribe(self, event_type: str, handler) -> None:
        self.handlers[event_type] = handler
        await self.pubsub.subscribe(f"agent:events:{event_type}")

    async def listen(self) -> None:
        async for message in self.pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                handler = self.handlers.get(data["event"])
                if handler:
                    await handler(data["payload"])

# Usage
event_bus = AgentEventBus(redis_client)

# Frontend WebSocket server subscribes to conversation events
await event_bus.subscribe("agent.response.streaming", send_to_websocket)
await event_bus.subscribe("agent.tool.started", update_ui_status)
await event_bus.subscribe("agent.handoff.completed", update_ui_agent)

# Agent publishes events during processing
await event_bus.publish("agent.response.streaming", {
    "conversation_id": conv_id,
    "chunk": "Let me look up your invoice...",
    "agent": "billing_agent",
})

await event_bus.publish("agent.tool.started", {
    "conversation_id": conv_id,
    "tool": "lookup_invoice",
    "agent": "billing_agent",
})

Pub/Sub vs. Streams for Agent Events

Redis Pub/Sub is fire-and-forget — if no subscriber is listening, the message is lost. For UI updates and real-time notifications, this is fine. For events that must not be lost (tool execution requests, handoff commands), use Redis Streams instead:

class DurableAgentEventBus:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def publish(self, stream: str, event: dict) -> str:
        event_id = await self.redis.xadd(stream, event, maxlen=10000)
        return event_id

    async def consume(self, stream: str, group: str, consumer: str):
        # Create consumer group if not exists
        try:
            await self.redis.xgroup_create(stream, group, id="0", mkstream=True)
        except redis.ResponseError:
            pass  # Group already exists

        while True:
            messages = await self.redis.xreadgroup(
                group, consumer, {stream: ">"}, count=10, block=5000
            )
            for _, entries in messages:
                for msg_id, data in entries:
                    yield msg_id, data
                    await self.redis.xack(stream, group, msg_id)

Pattern 4: Sorted Sets for Agent Performance Leaderboards

Track and rank agent performance metrics using Redis sorted sets. This is useful for operational dashboards and for identifying which agents need prompt optimization.

class AgentLeaderboard:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def record_resolution(self, agent_name: str, resolution_time_seconds: float,
                                 was_successful: bool) -> None:
        today = datetime.utcnow().strftime("%Y-%m-%d")
        pipe = self.redis.pipeline()

        # Track successful resolutions
        if was_successful:
            pipe.zincrby(f"leaderboard:resolutions:{today}", 1, agent_name)

        # Track average resolution time (using two sorted sets)
        pipe.zincrby(f"leaderboard:total_time:{today}", resolution_time_seconds, agent_name)
        pipe.zincrby(f"leaderboard:total_count:{today}", 1, agent_name)

        # Expire after 30 days
        pipe.expire(f"leaderboard:resolutions:{today}", 2592000)
        pipe.expire(f"leaderboard:total_time:{today}", 2592000)
        pipe.expire(f"leaderboard:total_count:{today}", 2592000)
        await pipe.execute()

    async def get_top_agents(self, date: str, limit: int = 10) -> list:
        agents = await self.redis.zrevrangebyscore(
            f"leaderboard:resolutions:{date}", "+inf", "-inf",
            start=0, num=limit, withscores=True,
        )
        results = []
        for agent_name, resolution_count in agents:
            name = agent_name.decode()
            total_time = await self.redis.zscore(f"leaderboard:total_time:{date}", name) or 0
            total_count = await self.redis.zscore(f"leaderboard:total_count:{date}", name) or 1
            avg_time = total_time / total_count
            results.append({
                "agent": name,
                "resolutions": int(resolution_count),
                "avg_resolution_time_seconds": round(avg_time, 1),
            })
        return results

Pattern 5: Streams for Agent Audit Logs

Every agent action needs an immutable audit trail. Redis Streams provide an append-only log with consumer group support, perfect for capturing agent decisions in real time and processing them asynchronously.

class AgentAuditLog:
    STREAM_KEY = "agent:audit:log"

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def log_action(self, conversation_id: str, agent_name: str,
                          action: str, details: dict) -> str:
        entry = {
            "conversation_id": conversation_id,
            "agent": agent_name,
            "action": action,
            "details": json.dumps(details),
            "timestamp": datetime.utcnow().isoformat(),
        }
        msg_id = await self.redis.xadd(self.STREAM_KEY, entry, maxlen=100000)
        return msg_id

    async def get_conversation_audit(self, conversation_id: str, count: int = 100) -> list:
        """Get audit entries for a specific conversation (scan approach)."""
        entries = []
        last_id = "0"
        while True:
            results = await self.redis.xrange(self.STREAM_KEY, min=last_id, count=500)
            if not results:
                break
            for msg_id, data in results:
                if data.get(b"conversation_id", b"").decode() == conversation_id:
                    entries.append({
                        "id": msg_id,
                        "agent": data[b"agent"].decode(),
                        "action": data[b"action"].decode(),
                        "details": json.loads(data[b"details"]),
                        "timestamp": data[b"timestamp"].decode(),
                    })
                last_id = msg_id
            if len(results) < 500:
                break
        return entries[:count]

Redis Connection Management

Connection Pooling

Never create a new Redis connection per request. Use a connection pool:

import redis.asyncio as redis

# Create a shared connection pool
redis_pool = redis.ConnectionPool.from_url(
    "redis://redis:6379/0",
    max_connections=50,
    decode_responses=False,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
)

redis_client = redis.Redis(connection_pool=redis_pool)

Redis Cluster for Scale

When a single Redis instance is insufficient (typically above 100K concurrent conversations), use Redis Cluster to shard data across multiple nodes. Design your key naming to support hash tags for related keys:

# Keys for the same conversation use hash tag to ensure
# they land on the same shard
session_key = f"session:conv:{{{conversation_id}}}"
messages_key = f"messages:conv:{{{conversation_id}}}"
context_key = f"context:conv:{{{conversation_id}}}"
# The {conversation_id} hash tag ensures co-location

Frequently Asked Questions

How much memory does Redis need for agent sessions?

A typical conversation session with 50 messages consumes 10-50KB in Redis. With 10,000 concurrent conversations, that is 100-500MB. Add LLM response cache (typically 1-5GB depending on cache size and TTL) and operational counters (negligible). Plan for 4-8GB of Redis memory for a medium-scale agent deployment.

Should I use Redis or Memcached for LLM response caching?

Redis. Memcached is faster for simple key-value lookups but lacks the data structures (hashes, sorted sets, streams, pub/sub) that agent systems need. You would end up running both Memcached and Redis, adding operational complexity for marginal performance gain on a single use case.

How do I handle Redis downtime without losing active conversations?

Design your agent system to degrade gracefully when Redis is unavailable. Fall back to PostgreSQL for session reads (slower but functional). Disable caching and accept higher LLM costs temporarily. Queue pub/sub events in memory and replay when Redis recovers. Use Redis Sentinel or Redis Cluster for automatic failover with sub-second recovery.

What TTL should I set for LLM response cache entries?

It depends on how time-sensitive the content is. For FAQ-style responses and classification results, 24 hours is appropriate. For responses that reference live data (account balances, appointment availability), use 5-15 minutes or skip caching entirely. For embedding computations on static content, cache for 7 days or longer.

Can Redis Streams replace Kafka for agent event processing?

For small to medium agent deployments (under 50K events per second), Redis Streams are a simpler alternative to Kafka with similar semantics: append-only log, consumer groups, acknowledgment. Choose Kafka when you need multi-datacenter replication, longer retention (weeks or months), or higher throughput. Redis Streams are ideal when you already have Redis deployed and want to avoid adding another infrastructure component.

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.