Memory-Efficient Agent Design: Handling Long Conversations Without OOM
Design AI agents that handle long conversations gracefully by using streaming processing, incremental state management, garbage collection strategies, and memory limits to prevent out-of-memory crashes.
How Agent Memory Grows Out of Control
An AI agent conversation is not just a list of strings. Each turn includes the user message, assistant response, tool calls, tool results, and metadata. A single tool result can be 10KB of JSON. Over a 50-turn conversation with 3-5 tool calls per turn, the in-memory conversation state can exceed 500KB — per session.
Multiply that by hundreds of concurrent sessions and you have a server consuming gigabytes of RAM just for conversation state. Add in embedding vectors, cached results, and intermediate processing buffers, and out-of-memory (OOM) crashes become a real production risk.
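The arithmetic above can be sketched directly. This is a pessimistic back-of-the-envelope estimate using the figures from this section (10KB per tool result, 50 turns, 4 tool calls per turn on average, 500 sessions — the session count is an illustrative assumption):

```python
# Pessimistic upper bound on conversation-state memory, using the
# figures above; real sessions will usually sit well below this.
TOOL_RESULT_BYTES = 10 * 1024   # ~10KB of JSON per tool result
TURNS = 50
TOOL_CALLS_PER_TURN = 4          # midpoint of 3-5
SESSIONS = 500                   # assumption for illustration

per_session = TURNS * TOOL_CALLS_PER_TURN * TOOL_RESULT_BYTES
total = per_session * SESSIONS
print(f"per session: {per_session // 1024} KB")    # 2000 KB
print(f"fleet-wide:  {total / 1024**3:.2f} GB")    # 0.95 GB
```

Even before counting embeddings and caches, tool results alone can approach a gigabyte at a few hundred sessions.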
Streaming Processing: Never Hold the Full Response
When processing LLM responses, stream them instead of accumulating the entire response in memory before returning it.
from openai import AsyncOpenAI

client = AsyncOpenAI()

# BAD: Accumulates the entire response in memory
async def generate_full(messages: list[dict]) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o", messages=messages,
    )
    return response.choices[0].message.content  # Full string in memory

# GOOD: Stream chunks to the client as they arrive
async def generate_streamed(messages: list[dict]):
    stream = await client.chat.completions.create(
        model="gpt-4o", messages=messages, stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue  # Some chunks (e.g. final usage reports) carry no choices
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta  # Yield each chunk, never hold the full response
For FastAPI, combine this with StreamingResponse:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

class ChatRequest(BaseModel):
    messages: list[dict]

app = FastAPI()

@app.post("/chat")
async def chat(request: ChatRequest):
    async def stream_generator():
        async for chunk in generate_streamed(request.messages):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream",
    )
Incremental State: Store Summaries, Not Full History
Instead of keeping every message in memory, maintain an incremental state that compresses old messages into summaries.
from dataclasses import dataclass, field

@dataclass
class ConversationState:
    session_id: str
    summary: str = ""
    recent_messages: list[dict] = field(default_factory=list)
    max_recent: int = 10
    _total_turns: int = 0

    def add_message(self, message: dict):
        self.recent_messages.append(message)
        self._total_turns += 1

    def needs_compaction(self) -> bool:
        return len(self.recent_messages) > self.max_recent * 2

    async def compact(self, summarizer):
        """Compress old messages into the summary."""
        if not self.needs_compaction():
            return
        # Keep the last max_recent messages
        to_summarize = self.recent_messages[:-self.max_recent]
        self.recent_messages = self.recent_messages[-self.max_recent:]
        # Add to running summary
        new_summary = await summarizer.summarize(to_summarize)
        self.summary = f"{self.summary} {new_summary}".strip()

    def get_context(self) -> list[dict]:
        """Build the context for the LLM call."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Previous conversation summary: {self.summary}",
            })
        context.extend(self.recent_messages)
        return context

    @property
    def memory_estimate_bytes(self) -> int:
        """Rough estimate of memory consumed by this state."""
        summary_bytes = len(self.summary.encode("utf-8"))
        messages_bytes = sum(
            len(str(m).encode("utf-8")) for m in self.recent_messages
        )
        return summary_bytes + messages_bytes
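The compaction rule can be exercised end to end with a stub summarizer. In this sketch the stub just reports how many messages it elided; a real summarizer would call an LLM (the stub and its output format are assumptions for illustration):

```python
import asyncio

class StubSummarizer:
    """Stand-in for an LLM-backed summarizer."""
    async def summarize(self, messages: list[dict]) -> str:
        return f"[{len(messages)} older messages elided]"

async def compact_demo() -> tuple[str, int]:
    max_recent = 3
    recent = [{"role": "user", "content": f"turn {i}"} for i in range(8)]
    summary = ""
    # Same rule as ConversationState.compact: summarize all but the tail.
    if len(recent) > max_recent * 2:
        to_summarize, recent = recent[:-max_recent], recent[-max_recent:]
        new_summary = await StubSummarizer().summarize(to_summarize)
        summary = f"{summary} {new_summary}".strip()
    return summary, len(recent)

summary, kept = asyncio.run(compact_demo())
print(summary, kept)  # [5 older messages elided] 3
```

Eight turns collapse to three recent messages plus a one-line summary, which is exactly the memory profile the LLM context builder needs.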
Session Memory Limits and Eviction
For multi-session servers, enforce per-session and global memory limits.
import asyncio
from collections import OrderedDict

class SessionManager:
    def __init__(
        self,
        max_sessions: int = 1000,
        max_memory_bytes: int = 500 * 1024 * 1024,  # 500MB
    ):
        self.max_sessions = max_sessions
        self.max_memory_bytes = max_memory_bytes
        self._sessions: OrderedDict[str, ConversationState] = OrderedDict()
        self._lock = asyncio.Lock()

    async def get_or_create(self, session_id: str) -> ConversationState:
        async with self._lock:
            if session_id in self._sessions:
                self._sessions.move_to_end(session_id)
                return self._sessions[session_id]
            # Evict if at capacity
            await self._evict_if_needed()
            state = ConversationState(session_id=session_id)
            self._sessions[session_id] = state
            return state

    async def _evict_if_needed(self):
        # Evict by count
        while len(self._sessions) >= self.max_sessions:
            evicted_id, evicted_state = self._sessions.popitem(last=False)
            await self._persist_to_disk(evicted_id, evicted_state)
        # Evict by memory
        total_memory = sum(
            s.memory_estimate_bytes for s in self._sessions.values()
        )
        while total_memory > self.max_memory_bytes and self._sessions:
            evicted_id, evicted_state = self._sessions.popitem(last=False)
            total_memory -= evicted_state.memory_estimate_bytes
            await self._persist_to_disk(evicted_id, evicted_state)

    async def _persist_to_disk(self, session_id: str, state: ConversationState):
        """Save evicted session to database for later retrieval."""
        # Implementation: write to PostgreSQL, Redis, or file
        pass
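The LRU mechanics behind `get_or_create` and `_evict_if_needed` come from `OrderedDict`: touching a session moves it to the end, and `popitem(last=False)` pops the least recently touched one. A minimal standalone illustration, without the async plumbing:

```python
from collections import OrderedDict

sessions = OrderedDict()
max_sessions = 3
for sid in ["a", "b", "c"]:
    sessions[sid] = f"state-{sid}"

sessions.move_to_end("a")  # "a" was just accessed, so it is now newest

# Count-based eviction: the oldest untouched session goes first
if len(sessions) >= max_sessions:
    evicted_id, _ = sessions.popitem(last=False)

print(evicted_id)  # b -- not "a", which was recently used
```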
Truncating Tool Outputs Before Storage
Tool outputs are often the largest single memory consumer. Truncate them before adding them to conversation state.
import json

class ToolOutputTruncator:
    def __init__(self, max_chars: int = 2000):
        self.max_chars = max_chars

    def truncate(self, output: str) -> str:
        if len(output) <= self.max_chars:
            return output
        try:
            data = json.loads(output)
            return self._truncate_json(data)
        except (json.JSONDecodeError, TypeError):
            return output[:self.max_chars] + "\n...(truncated)"

    def _truncate_json(self, data) -> str:
        return json.dumps(self._prune(data), default=str)

    def _prune(self, data, depth: int = 0):
        """Recursively cap list length, dict width, and nesting depth."""
        if depth > 3:
            return "...(nested)"
        if isinstance(data, list):
            pruned = [self._prune(item, depth + 1) for item in data[:5]]
            if len(data) > 5:
                pruned.append(f"...({len(data) - 5} more items)")
            return pruned
        if isinstance(data, dict):
            # Keep only the first 10 fields
            return {k: self._prune(v, depth + 1) for k, v in list(data.items())[:10]}
        return data
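Applied to an oversized tool result, the list rule above (keep the first five items, note how many were dropped) shrinks the payload dramatically. A standalone sketch of that rule:

```python
import json

# A 100-row tool result, as a raw JSON string
output = json.dumps([{"id": i, "name": f"row-{i}"} for i in range(100)])

data = json.loads(output)
compact = output
if isinstance(data, list) and len(data) > 5:
    compact = json.dumps(data[:5], default=str) + f"\n...({len(data) - 5} more items)"

print(f"{len(output)} chars -> {len(compact)} chars")
```

The truncation marker keeps the model aware that data was dropped, so it can ask the tool for a narrower query instead of hallucinating the missing rows.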
Monitoring Memory Usage
Add memory monitoring to detect leaks before they cause OOM crashes.
import psutil
import os
import logging

logger = logging.getLogger(__name__)

class MemoryMonitor:
    def __init__(self, warning_pct: float = 75.0, critical_pct: float = 90.0):
        self.warning_pct = warning_pct
        self.critical_pct = critical_pct
        self.process = psutil.Process(os.getpid())

    def check(self) -> dict:
        mem = self.process.memory_info()
        system_mem = psutil.virtual_memory()
        usage_pct = (mem.rss / system_mem.total) * 100
        status = {
            "rss_mb": mem.rss / (1024 * 1024),
            "usage_pct": usage_pct,
            "status": "ok",
        }
        if usage_pct > self.critical_pct:
            status["status"] = "critical"
            logger.critical(f"Memory critical: {usage_pct:.1f}% of system RAM")
        elif usage_pct > self.warning_pct:
            status["status"] = "warning"
            logger.warning(f"Memory warning: {usage_pct:.1f}% of system RAM")
        return status
FAQ
How many concurrent agent sessions can a typical server handle?
With efficient memory management, a server with 4GB of RAM can handle 1,000-5,000 concurrent sessions depending on conversation length. Without optimization, the same server might OOM at 200 sessions. The key is keeping per-session memory under 500KB through summarization and tool output truncation.
Should I use Redis or in-process memory for conversation state?
Use in-process memory for active sessions (fastest access) and Redis for idle sessions (shared across server instances). Implement an LRU eviction policy that moves inactive sessions from memory to Redis after a configurable idle timeout, typically 5-15 minutes.
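The idle-timeout sweep described above can be sketched without a Redis dependency: sessions untouched for longer than the timeout get handed to a "mover" callable (in production, a function that serializes to Redis; here, appending to a list — both the function names and the fixed clock are assumptions for illustration):

```python
import time

def sweep_idle(last_seen: dict[str, float], timeout_s: float, mover, now=None) -> None:
    """Move sessions idle longer than timeout_s out of hot memory."""
    now = time.monotonic() if now is None else now
    for sid, ts in list(last_seen.items()):
        if now - ts > timeout_s:
            mover(sid)
            del last_seen[sid]

cold: list[str] = []
last_seen = {"a": 0.0, "b": 90.0}  # seconds since each session was touched
sweep_idle(last_seen, timeout_s=60.0, mover=cold.append, now=100.0)
print(cold, sorted(last_seen))  # ['a'] ['b']
```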
How do I detect memory leaks in a long-running agent service?
Track RSS (Resident Set Size) over time using psutil. If RSS grows monotonically even when session counts are stable, you have a leak. Common culprits are: accumulating references in global lists, not closing HTTP clients, and circular references in tool result objects that prevent garbage collection.