Distributed Memory for Multi-Agent Systems: Sharing State Across Agent Instances
Learn how to build distributed memory for multi-agent systems using Redis, shared databases, event sourcing, and consistency patterns — enabling multiple agent instances to collaborate with shared state.
The Multi-Agent Memory Challenge
When you run a single agent, memory is simple — it lives in one process. But production systems often run multiple agent instances: a research agent, a writing agent, and a review agent collaborating on a report; or multiple copies of the same agent handling concurrent users. These agents need to share state: facts discovered by one agent must be visible to others. The challenge is doing this reliably without race conditions, stale reads, or data loss.
Distributed memory requires solving three problems: storage (where does shared state live?), consistency (how do agents see up-to-date data?), and coordination (how do agents avoid conflicting actions?).
Approach 1: Redis as Shared Memory
Redis is the most common choice for shared agent state. It is fast, supports multiple data structures, and provides atomic operations that prevent race conditions.
```python
import json
from datetime import datetime, timezone
from typing import Dict, List, Optional

import redis


class RedisAgentMemory:
    def __init__(self, redis_url: str = "redis://localhost:6379/0",
                 namespace: str = "agents"):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.ns = namespace

    def _key(self, *parts: str) -> str:
        return f"{self.ns}:{':'.join(parts)}"

    # --- Shared Facts ---
    def store_fact(self, agent_id: str, fact: str, category: str,
                   metadata: Optional[dict] = None) -> int:
        """Store a fact that any agent can retrieve."""
        entry = {
            "fact": fact,
            "category": category,
            "agent_id": agent_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metadata": metadata or {},
        }
        # INCR is atomic, so concurrent agents never collide on an ID
        fact_id = self.client.incr(self._key("fact_counter"))
        self.client.hset(self._key("facts", str(fact_id)), mapping={
            k: json.dumps(v) if isinstance(v, dict) else str(v)
            for k, v in entry.items()
        })
        # Index by category for efficient retrieval
        self.client.sadd(self._key("category", category), str(fact_id))
        return fact_id

    def get_facts_by_category(self, category: str) -> List[Dict]:
        """Retrieve all facts in a category, from any agent."""
        fact_ids = self.client.smembers(self._key("category", category))
        facts = []
        for fid in fact_ids:
            data = self.client.hgetall(self._key("facts", fid))
            if data:
                data["metadata"] = json.loads(data["metadata"])  # stored as JSON
                facts.append(data)
        return facts

    # --- Agent Status ---
    def set_agent_status(self, agent_id: str, status: str, detail: str = ""):
        """Publish agent status so other agents can coordinate."""
        self.client.hset(self._key("status", agent_id), mapping={
            "status": status,
            "detail": detail,
            "updated_at": datetime.now(timezone.utc).isoformat(),
        })

    def get_agent_status(self, agent_id: str) -> Optional[Dict]:
        return self.client.hgetall(self._key("status", agent_id))

    # --- Distributed Lock ---
    def acquire_lock(self, resource: str, agent_id: str,
                     ttl_seconds: int = 30) -> bool:
        """Acquire a distributed lock to prevent conflicting operations."""
        lock_key = self._key("lock", resource)
        # SET NX EX is atomic: at most one agent wins, and the TTL
        # guarantees the lock frees itself if that agent crashes.
        return bool(self.client.set(lock_key, agent_id, nx=True, ex=ttl_seconds))

    def release_lock(self, resource: str, agent_id: str):
        """Release a lock only if this agent holds it."""
        # A Lua script makes the compare-and-delete atomic; a separate
        # GET followed by DELETE could remove a lock that expired and
        # was re-acquired by another agent in between.
        release_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        end
        return 0
        """
        self.client.eval(release_script, 1, self._key("lock", resource), agent_id)
```
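The lock protocol is worth seeing end to end before wiring it into a fleet. The sketch below is a minimal in-process stand-in for the SET NX + TTL semantics above, so the acquire / try / finally / release pattern can be run without a Redis server; `InMemoryLock` and the resource names are illustrative, not part of any library.

```python
import time


class InMemoryLock:
    """Stand-in for the Redis lock: same SET-NX-with-TTL semantics, one process."""

    def __init__(self):
        self._locks = {}  # resource -> (holder_agent_id, expires_at)

    def acquire(self, resource: str, agent_id: str, ttl_seconds: int = 30) -> bool:
        holder = self._locks.get(resource)
        if holder and holder[1] > time.monotonic():
            return False  # lock held and not yet expired
        self._locks[resource] = (agent_id, time.monotonic() + ttl_seconds)
        return True

    def release(self, resource: str, agent_id: str):
        holder = self._locks.get(resource)
        if holder and holder[0] == agent_id:  # only the holder may release
            del self._locks[resource]


locks = InMemoryLock()
if locks.acquire("customer:42:email", "agent_a"):
    try:
        pass  # critical section: exactly one agent reaches this block
    finally:
        locks.release("customer:42:email", "agent_a")
```

The try/finally is the important part: the lock is released even if the critical section raises, and the TTL covers the case where the process dies before `finally` runs.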
Approach 2: Event Sourcing
Instead of sharing mutable state, agents publish events to a shared log. Each agent reads the event stream to build its own view of the world. This pattern provides a complete audit trail and makes it easy to replay history.
```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List

import redis


@dataclass
class AgentEvent:
    event_type: str  # e.g. "fact_discovered", "task_completed", "error_occurred"
    agent_id: str
    payload: Dict[str, Any]
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    event_id: str = ""  # assigned by the stream on publish


class EventStore:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.stream_key = "agent_events"

    def publish(self, event: AgentEvent) -> str:
        """Append an event to the shared event stream."""
        event_data = {
            "event_type": event.event_type,
            "agent_id": event.agent_id,
            "payload": json.dumps(event.payload),
            "timestamp": event.timestamp,
        }
        return self.client.xadd(self.stream_key, event_data)

    def read_events(self, since: str = "0-0", count: int = 100) -> List[Dict]:
        """Read events from the stream, optionally from a specific point."""
        entries = self.client.xrange(self.stream_key, min=since, count=count)
        events = []
        for event_id, data in entries:
            data["event_id"] = event_id
            data["payload"] = json.loads(data["payload"])
            events.append(data)
        return events

    def subscribe(self, last_id: str = "$") -> Iterator[Dict]:
        """Block and wait for new events (for real-time processing)."""
        while True:
            entries = self.client.xread(
                {self.stream_key: last_id}, block=5000, count=10
            )
            for stream, messages in entries:
                for event_id, data in messages:
                    last_id = event_id
                    data["event_id"] = event_id
                    data["payload"] = json.loads(data["payload"])
                    yield data


# Usage: Research agent publishes findings
store = EventStore()
store.publish(AgentEvent(
    event_type="fact_discovered",
    agent_id="research_agent_1",
    payload={"topic": "market_size", "value": "$4.2B", "source": "report_xyz"},
))
```
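On the consuming side, each agent folds the stream into its own view of the world. The sketch below assumes event dicts shaped like the output of `read_events`; `project_facts` is a hypothetical helper, not a library function. Because later events win, replaying the full stream always reproduces the same view, which is the core property event sourcing relies on.

```python
from typing import Any, Dict, List


def project_facts(events: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Fold 'fact_discovered' events into a topic -> latest-fact view."""
    view: Dict[str, Dict[str, Any]] = {}
    for event in events:
        if event.get("event_type") != "fact_discovered":
            continue  # other agents may care about other event types
        payload = event["payload"]
        view[payload["topic"]] = {
            "value": payload["value"],
            "source": payload.get("source"),
            "agent_id": event["agent_id"],
        }
    return view


events = [
    {"event_type": "fact_discovered", "agent_id": "research_agent_1",
     "payload": {"topic": "market_size", "value": "$4.2B", "source": "report_xyz"}},
    {"event_type": "task_completed", "agent_id": "writer_1", "payload": {}},
    {"event_type": "fact_discovered", "agent_id": "research_agent_2",
     "payload": {"topic": "market_size", "value": "$4.5B", "source": "report_abc"}},
]
# Replaying yields one entry for "market_size"; the later event wins.
```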
Approach 3: Shared Database with Optimistic Locking
For agents that need structured queries and transactional guarantees, a shared PostgreSQL database works well. Optimistic locking prevents lost updates when two agents modify the same record.
```python
import json
from typing import Any, Dict, Optional

import asyncpg


class ConflictError(Exception):
    """Raised when another agent updated the record first."""


class SharedAgentDB:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=10)
        await self.pool.execute("""
            CREATE TABLE IF NOT EXISTS shared_memory (
                id SERIAL PRIMARY KEY,
                key TEXT UNIQUE NOT NULL,
                value JSONB NOT NULL,
                version INTEGER DEFAULT 1,
                updated_by TEXT NOT NULL,
                updated_at TIMESTAMPTZ DEFAULT NOW()
            )
        """)

    async def read(self, key: str) -> Optional[Dict]:
        row = await self.pool.fetchrow(
            "SELECT value, version FROM shared_memory WHERE key = $1", key
        )
        if row:
            return {"value": json.loads(row["value"]), "version": row["version"]}
        return None

    async def write(self, key: str, value: Any, agent_id: str,
                    expected_version: Optional[int] = None):
        """Write with optimistic locking: fails if another agent updated first."""
        json_value = json.dumps(value)
        if expected_version is not None:
            # Update only if the version matches (optimistic lock)
            result = await self.pool.execute(
                """UPDATE shared_memory
                   SET value = $1, version = version + 1,
                       updated_by = $2, updated_at = NOW()
                   WHERE key = $3 AND version = $4""",
                json_value, agent_id, key, expected_version,
            )
            if result == "UPDATE 0":
                raise ConflictError(
                    f"Key '{key}' was modified by another agent. "
                    f"Expected version {expected_version}."
                )
        else:
            # Upsert without a version check
            await self.pool.execute(
                """INSERT INTO shared_memory (key, value, updated_by)
                   VALUES ($1, $2, $3)
                   ON CONFLICT (key) DO UPDATE
                   SET value = $2, version = shared_memory.version + 1,
                       updated_by = $3, updated_at = NOW()""",
                key, json_value, agent_id,
            )
```
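Callers of `write()` normally wrap the read-modify-write cycle in a retry loop: on conflict, re-read the current version and try again. To keep the loop runnable without a Postgres instance, this sketch pairs it with a minimal in-memory store that follows the same `read`/`write`/`ConflictError` contract (synchronous for brevity); `update_with_retry` and `InMemoryVersionedStore` are illustrative names, not part of any library.

```python
class ConflictError(Exception):
    pass


class InMemoryVersionedStore:
    """Same contract as SharedAgentDB.read/write, in-process and synchronous."""

    def __init__(self):
        self._data = {}  # key -> {"value": ..., "version": int}

    def read(self, key):
        return dict(self._data[key]) if key in self._data else None

    def write(self, key, value, agent_id, expected_version=None):
        current = self._data.get(key)
        if expected_version is not None:
            if current is None or current["version"] != expected_version:
                raise ConflictError(f"Key '{key}' was modified by another agent.")
            self._data[key] = {"value": value, "version": expected_version + 1}
        else:
            version = (current["version"] + 1) if current else 1
            self._data[key] = {"value": value, "version": version}


def update_with_retry(store, key, agent_id, mutate, max_attempts=5):
    """Read-modify-write loop: re-read and retry when another agent won the race."""
    for _ in range(max_attempts):
        row = store.read(key)
        if row is None:
            store.write(key, mutate(None), agent_id)  # first write, plain upsert
            return
        try:
            store.write(key, mutate(row["value"]), agent_id,
                        expected_version=row["version"])
            return
        except ConflictError:
            continue  # someone else updated first; re-read and try again
    raise ConflictError(f"Gave up on '{key}' after {max_attempts} attempts")
```

Usage mirrors the async version: `update_with_retry(store, "report:draft", "writer_1", lambda v: (v or 0) + 1)`. The `mutate` callable recomputes the new value from the freshly read one, which is what makes the retry safe.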
Choosing the Right Pattern
| Pattern | Latency | Consistency | Audit Trail | Best For |
|---|---|---|---|---|
| Redis Shared Memory | Very Low | Eventual | No | Real-time coordination, caching |
| Event Sourcing | Low | Eventual (full history) | Yes | Collaborative workflows, debugging |
| Shared Database | Medium | Strong (with locks) | Partial | Transactional state, structured queries |
For most multi-agent systems, combine patterns: use Redis for fast coordination and status updates, event sourcing for an audit trail of agent actions, and a database for structured persistent state.
Consistency Best Practices
- Use distributed locks when two agents must not act on the same resource simultaneously (e.g., both sending an email to the same customer).
- Prefer idempotent operations so that retries after failures do not cause duplicate side effects.
- Version your shared state with optimistic locking to detect and resolve conflicts.
- Set TTLs on ephemeral data (agent status, locks) so stale entries auto-expire if an agent crashes.
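The idempotency advice above is usually implemented with an idempotency key: record which operation keys have already succeeded and skip replays. In the sketch below a plain Python set stands in for shared storage (in production it would be something like Redis `SET key NX EX ttl` so every instance sees it); `make_idempotent` and `send_email` are hypothetical names.

```python
def make_idempotent(action, seen=None):
    """Wrap a side-effecting action so replays with the same key are no-ops.

    Note: recording the key only after the action gives at-least-once
    semantics; a crash between the action and the record can still
    duplicate, which is why the action itself should be safe to repeat.
    """
    seen = seen if seen is not None else set()

    def wrapper(idempotency_key, *args, **kwargs):
        if idempotency_key in seen:
            return None  # already done; safe to call again after a retry
        result = action(*args, **kwargs)
        seen.add(idempotency_key)  # record success only after the action
        return result

    return wrapper


sent = []
send_email = make_idempotent(lambda to: sent.append(to) or to)
send_email("order-42-confirmation", "alice@example.com")
send_email("order-42-confirmation", "alice@example.com")  # retried: skipped
```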
FAQ
How do I handle network partitions where an agent cannot reach Redis?
Implement a local write-ahead log. The agent writes to a local queue when Redis is unavailable, then replays the queue when connectivity is restored. This ensures no data is lost, though other agents may see stale state during the partition.
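A minimal sketch of that write-ahead queue, with the transport injected as a callable so the partition can be simulated; in production the backlog would live in a durable file or SQLite table rather than an in-memory deque, and `BufferedPublisher` is an illustrative name.

```python
from collections import deque


class BufferedPublisher:
    """Buffers writes locally while the shared store is unreachable, replays later."""

    def __init__(self, publish):
        self.publish = publish  # raises ConnectionError when the store is down
        self.backlog = deque()  # local write-ahead queue, oldest first

    def write(self, entry):
        # Enqueue first, then drain oldest-first so ordering is preserved
        # across the partition. Each entry is removed only after it is
        # confirmed sent, so a failure mid-drain loses nothing.
        self.backlog.append(entry)
        try:
            while self.backlog:
                self.publish(self.backlog[0])
                self.backlog.popleft()
        except ConnectionError:
            pass  # still partitioned; entries stay queued for the next attempt
```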
Is event sourcing overkill for simple multi-agent setups?
For two agents sharing a few facts, yes — a shared Redis hash is simpler. Event sourcing shines when you need a full audit trail, when you want to replay agent interactions for debugging, or when you have many agents that need to build different views of the same data.
How do I scale beyond a single Redis instance?
Redis Cluster shards data across multiple nodes automatically. For event sourcing, Redis Streams support consumer groups that distribute event processing across agent instances. For databases, read replicas handle read-heavy workloads while the primary handles writes.
CallSphere Team