
Distributed Memory for Multi-Agent Systems: Sharing State Across Agent Instances

Learn how to build distributed memory for multi-agent systems using Redis, shared databases, event sourcing, and consistency patterns — enabling multiple agent instances to collaborate with shared state.

The Multi-Agent Memory Challenge

When you run a single agent, memory is simple — it lives in one process. But production systems often run multiple agent instances: a research agent, a writing agent, and a review agent collaborating on a report; or multiple copies of the same agent handling concurrent users. These agents need to share state: facts discovered by one agent must be visible to others. The challenge is doing this reliably without race conditions, stale reads, or data loss.

Distributed memory requires solving three problems: storage (where does shared state live?), consistency (how do agents see up-to-date data?), and coordination (how do agents avoid conflicting actions?).

Approach 1: Redis as Shared Memory

Redis is the most common choice for shared agent state. It is fast, supports multiple data structures, and provides atomic operations that prevent race conditions.

import redis
import json
from typing import Optional, List, Dict, Any
from datetime import datetime

class RedisAgentMemory:
    def __init__(self, redis_url: str = "redis://localhost:6379/0", namespace: str = "agents"):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.ns = namespace

    def _key(self, *parts) -> str:
        return f"{self.ns}:{':'.join(parts)}"

    # --- Shared Facts ---
    def store_fact(self, agent_id: str, fact: str, category: str, metadata: Optional[dict] = None):
        """Store a fact that any agent can retrieve."""
        entry = {
            "fact": fact,
            "category": category,
            "agent_id": agent_id,
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": metadata or {},
        }
        fact_id = self.client.incr(self._key("fact_counter"))
        self.client.hset(self._key("facts", str(fact_id)), mapping={
            k: json.dumps(v) if isinstance(v, dict) else str(v)
            for k, v in entry.items()
        })
        # Index by category for efficient retrieval
        self.client.sadd(self._key("category", category), str(fact_id))
        return fact_id

    def get_facts_by_category(self, category: str) -> List[Dict]:
        """Retrieve all facts in a category, from any agent."""
        fact_ids = self.client.smembers(self._key("category", category))
        facts = []
        for fid in fact_ids:
            data = self.client.hgetall(self._key("facts", fid))
            if data:
                facts.append(data)
        return facts

    # --- Agent Status ---
    def set_agent_status(self, agent_id: str, status: str, detail: str = ""):
        """Publish agent status so other agents can coordinate."""
        self.client.hset(self._key("status", agent_id), mapping={
            "status": status,
            "detail": detail,
            "updated_at": datetime.utcnow().isoformat(),
        })

    def get_agent_status(self, agent_id: str) -> Optional[Dict]:
        return self.client.hgetall(self._key("status", agent_id))

    # --- Distributed Lock ---
    def acquire_lock(self, resource: str, agent_id: str, ttl_seconds: int = 30) -> bool:
        """Acquire a distributed lock to prevent conflicting operations."""
        lock_key = self._key("lock", resource)
        return bool(self.client.set(lock_key, agent_id, nx=True, ex=ttl_seconds))

    # A plain GET-then-DELETE can race: the lock may expire and be re-acquired
    # by another agent between the two calls, deleting a lock we no longer own.
    # The Lua script performs the check-and-delete atomically on the server.
    _RELEASE_LOCK_SCRIPT = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    end
    return 0
    """

    def release_lock(self, resource: str, agent_id: str) -> bool:
        """Release a lock only if this agent still holds it (atomic check-and-delete)."""
        lock_key = self._key("lock", resource)
        return bool(self.client.eval(self._RELEASE_LOCK_SCRIPT, 1, lock_key, agent_id))
Approach 2: Event Sourcing

Instead of sharing mutable state, agents publish events to a shared log. Each agent reads the event stream to build its own view of the world. This pattern provides a complete audit trail and makes it easy to replay history.

from dataclasses import dataclass, field
from typing import Callable

@dataclass
class AgentEvent:
    event_type: str        # "fact_discovered", "task_completed", "error_occurred"
    agent_id: str
    payload: Dict[str, Any]
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    event_id: str = ""

class EventStore:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.stream_key = "agent_events"

    def publish(self, event: AgentEvent) -> str:
        """Append an event to the shared event stream."""
        event_data = {
            "event_type": event.event_type,
            "agent_id": event.agent_id,
            "payload": json.dumps(event.payload),
            "timestamp": event.timestamp,
        }
        event_id = self.client.xadd(self.stream_key, event_data)
        return event_id

    def read_events(
        self, since: str = "0-0", count: int = 100
    ) -> List[Dict]:
        """Read events from the stream, optionally from a specific point."""
        entries = self.client.xrange(self.stream_key, min=since, count=count)
        events = []
        for event_id, data in entries:
            data["event_id"] = event_id
            data["payload"] = json.loads(data["payload"])
            events.append(data)
        return events

    def subscribe(self, last_id: str = "$"):
        """Generator: block and yield new events as they arrive (real-time processing)."""
        while True:
            entries = self.client.xread(
                {self.stream_key: last_id}, block=5000, count=10
            )
            for stream, messages in entries:
                for event_id, data in messages:
                    last_id = event_id
                    data["payload"] = json.loads(data["payload"])
                    yield data

# Usage: Research agent publishes findings
store = EventStore()
store.publish(AgentEvent(
    event_type="fact_discovered",
    agent_id="research_agent_1",
    payload={"topic": "market_size", "value": "$4.2B", "source": "report_xyz"},
))
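On the consuming side, each agent folds the stream into its own view of the world, a "projection." The sketch below shows the idea with a plain list of event dicts standing in for the Redis stream, so it runs standalone; the `build_fact_view` helper is illustrative. Replaying the same events always rebuilds the same view, and a later fact on a topic supersedes an earlier one:

```python
def build_fact_view(events):
    """Replay events in order; later facts on the same topic win."""
    view = {}
    for event in events:
        if event["event_type"] == "fact_discovered":
            payload = event["payload"]
            view[payload["topic"]] = {
                "value": payload["value"],
                "source": payload.get("source"),
                "discovered_by": event["agent_id"],
            }
        # Other event types (task_completed, error_occurred) are ignored
        # by this projection; a different view might care about them.
    return view

events = [
    {"event_type": "fact_discovered", "agent_id": "research_agent_1",
     "payload": {"topic": "market_size", "value": "$4.2B", "source": "report_xyz"}},
    {"event_type": "task_completed", "agent_id": "research_agent_1",
     "payload": {"task": "market_research"}},
    {"event_type": "fact_discovered", "agent_id": "research_agent_2",
     "payload": {"topic": "market_size", "value": "$4.5B", "source": "report_abc"}},
]

view = build_fact_view(events)
# view["market_size"]["value"] == "$4.5B" -- the later event wins
```

This is the payoff of event sourcing: the stream is the source of truth, and every agent derives exactly the view it needs from the same history.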

Approach 3: Shared Database with Optimistic Locking

For agents that need structured queries and transactional guarantees, a shared PostgreSQL database works well. Optimistic locking prevents lost updates when two agents modify the same record.


import asyncpg
from typing import Optional

class SharedAgentDB:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=10)
        await self.pool.execute("""
            CREATE TABLE IF NOT EXISTS shared_memory (
                id SERIAL PRIMARY KEY,
                key TEXT UNIQUE NOT NULL,
                value JSONB NOT NULL,
                version INTEGER DEFAULT 1,
                updated_by TEXT NOT NULL,
                updated_at TIMESTAMPTZ DEFAULT NOW()
            )
        """)

    async def read(self, key: str) -> Optional[Dict]:
        row = await self.pool.fetchrow(
            "SELECT value, version FROM shared_memory WHERE key = $1", key
        )
        if row:
            return {"value": json.loads(row["value"]), "version": row["version"]}
        return None

    async def write(self, key: str, value: Any, agent_id: str, expected_version: Optional[int] = None):
        """Write with optimistic locking — fails if another agent updated first."""
        json_value = json.dumps(value)

        if expected_version is not None:
            # Update only if version matches (optimistic lock)
            result = await self.pool.execute(
                """UPDATE shared_memory
                   SET value = $1, version = version + 1,
                       updated_by = $2, updated_at = NOW()
                   WHERE key = $3 AND version = $4""",
                json_value, agent_id, key, expected_version,
            )
            if result == "UPDATE 0":
                raise ConflictError(
                    f"Key '{key}' was modified by another agent. "
                    f"Expected version {expected_version}."
                )
        else:
            # Upsert
            await self.pool.execute(
                """INSERT INTO shared_memory (key, value, updated_by)
                   VALUES ($1, $2, $3)
                   ON CONFLICT (key) DO UPDATE
                   SET value = $2, version = shared_memory.version + 1,
                       updated_by = $3, updated_at = NOW()""",
                key, json_value, agent_id,
            )

class ConflictError(Exception):
    pass
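Callers of `write` with `expected_version` need a retry loop around the read-modify-write cycle: on conflict, re-read the new version and reapply the change. The sketch below shows that loop against a minimal in-memory stand-in (so it runs without Postgres); the stand-in class is illustrative, but `update_with_retry` would work the same way against `SharedAgentDB`:

```python
import asyncio

class ConflictError(Exception):
    pass

class InMemoryVersionedStore:
    """Stand-in for SharedAgentDB: versioned read/write over a dict."""
    def __init__(self):
        self.rows = {}  # key -> {"value": ..., "version": int}

    async def read(self, key):
        row = self.rows.get(key)
        return dict(row) if row else None

    async def write(self, key, value, agent_id, expected_version=None):
        row = self.rows.get(key)
        if expected_version is not None:
            if row is None or row["version"] != expected_version:
                raise ConflictError(f"Key '{key}' modified; expected v{expected_version}")
            self.rows[key] = {"value": value, "version": row["version"] + 1}
        else:
            self.rows[key] = {"value": value,
                              "version": row["version"] + 1 if row else 1}

async def update_with_retry(db, key, update_fn, agent_id, max_retries=3):
    """Read-modify-write loop that retries when another agent wins the race."""
    for _ in range(max_retries):
        current = await db.read(key)
        if current is None:
            await db.write(key, update_fn(None), agent_id)
            return
        try:
            await db.write(key, update_fn(current["value"]), agent_id,
                           expected_version=current["version"])
            return
        except ConflictError:
            continue  # re-read the new version and try again
    raise ConflictError(f"Gave up updating '{key}' after {max_retries} attempts")

async def demo():
    db = InMemoryVersionedStore()
    await update_with_retry(db, "report", lambda v: {"sections": 1}, "agent_a")
    await update_with_retry(db, "report",
                            lambda v: {"sections": v["sections"] + 1}, "agent_b")
    return await db.read("report")

row = asyncio.run(demo())
# row == {"value": {"sections": 2}, "version": 2}
```

The key design point: `update_fn` must be a pure function of the current value, so reapplying it after a conflict is always safe.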

Choosing the Right Pattern

| Pattern | Latency | Consistency | Audit Trail | Best For |
|---|---|---|---|---|
| Redis shared memory | Very low | Eventual | No | Real-time coordination, caching |
| Event sourcing | Low | Eventual (full history) | Yes | Collaborative workflows, debugging |
| Shared database | Medium | Strong (with locks) | Partial | Transactional state, structured queries |

For most multi-agent systems, combine patterns: use Redis for fast coordination and status updates, event sourcing for an audit trail of agent actions, and a database for structured persistent state.

Consistency Best Practices

  1. Use distributed locks when two agents must not act on the same resource simultaneously (e.g., both sending an email to the same customer).
  2. Prefer idempotent operations so that retries after failures do not cause duplicate side effects.
  3. Version your shared state with optimistic locking to detect and resolve conflicts.
  4. Set TTLs on ephemeral data (agent status, locks) so stale entries auto-expire if an agent crashes.
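Practice 2 deserves a concrete shape. A common trick is a dedupe key per side effect: the first caller records the key, and any retry sees it and skips. The sketch uses a plain Python set so it runs anywhere; in production the set would live in Redis, where `SADD` returns 0 for an already-present member, giving the same check atomically across agents:

```python
seen = set()  # stand-in for a Redis set shared by all agent instances
sent = []     # side effects actually performed

def send_email_once(customer_id, message_id, body):
    """Safe to retry: the same (customer, message) pair is sent at most once."""
    dedupe_key = f"email:{customer_id}:{message_id}"
    if dedupe_key in seen:
        return False  # already sent; the retry is a no-op
    seen.add(dedupe_key)
    sent.append((customer_id, body))  # the real send would go here
    return True

assert send_email_once("cust_42", "welcome_v1", "Hello!") is True
assert send_email_once("cust_42", "welcome_v1", "Hello!") is False  # retry deduped
assert len(sent) == 1
```

Pairing this with practice 4 (a TTL on the dedupe keys) keeps the set from growing without bound.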

FAQ

How do I handle network partitions where an agent cannot reach Redis?

Implement a local write-ahead log. The agent writes to a local queue when Redis is unavailable, then replays the queue when connectivity is restored. This ensures no data is lost, though other agents may see stale state during the partition.
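A minimal sketch of that write-ahead queue, with a stand-in client so the failure path is visible (real redis-py raises `redis.exceptions.ConnectionError` rather than the builtin `ConnectionError` caught here, and the backlog would normally be persisted to disk rather than held in memory):

```python
import json
from collections import deque

class FlakyClient:
    """Stand-in client that can simulate a network partition."""
    def __init__(self):
        self.store = {}
        self.online = True

    def set(self, key, value):
        if not self.online:
            raise ConnectionError("redis unreachable")
        self.store[key] = value

class BufferedMemory:
    """Queue writes locally during an outage; replay them once connectivity returns."""
    def __init__(self, client):
        self.client = client
        self.backlog = deque()  # local write-ahead queue

    def store(self, key, value):
        try:
            self.flush()  # drain anything buffered during the outage, in order
            self.client.set(key, json.dumps(value))
        except ConnectionError:
            self.backlog.append((key, value))

    def flush(self):
        while self.backlog:
            key, value = self.backlog[0]
            self.client.set(key, json.dumps(value))  # may raise; entry stays queued
            self.backlog.popleft()

client = FlakyClient()
mem = BufferedMemory(client)

client.online = False
mem.store("facts:1", {"topic": "pricing"})  # Redis down: buffered locally

client.online = True
mem.store("facts:2", {"topic": "sizing"})   # back up: backlog replayed first
```

Draining the backlog before the new write preserves write order, which matters if later values depend on earlier ones.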

Is event sourcing overkill for simple multi-agent setups?

For two agents sharing a few facts, yes — a shared Redis hash is simpler. Event sourcing shines when you need a full audit trail, when you want to replay agent interactions for debugging, or when you have many agents that need to build different views of the same data.

How do I scale beyond a single Redis instance?

Redis Cluster shards data across multiple nodes automatically. For event sourcing, Redis Streams support consumer groups that distribute event processing across agent instances. For databases, read replicas handle read-heavy workloads while the primary handles writes.


#DistributedSystems #MultiAgent #Redis #EventSourcing #AgenticAI #LearnAI #AIEngineering

CallSphere Team