Migrating Agent Data: Moving Conversations, Sessions, and Memory Between Systems

Learn how to migrate conversations, sessions, and agent memory between AI systems with zero downtime. Covers data export, transformation, import validation, and cutover strategies.

Why Agent Data Migration Is Harder Than Regular Data Migration

Agent data has characteristics that make migration harder than a typical table copy. Conversations have temporal ordering that must be preserved. Session state references tool call IDs and function outputs that are framework-specific. Memory stores may contain embeddings tied to a particular model version. And users expect continuity: they do not want to re-explain context after a system change.

A well-planned migration preserves all of this while the system stays online.

Step 1: Define a Canonical Data Format

Before exporting anything, define a framework-agnostic format that captures all the information you need.

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json

@dataclass
class CanonicalMessage:
    role: str  # "user", "assistant", "system", "tool"
    content: str
    timestamp: datetime
    tool_call_id: Optional[str] = None
    tool_name: Optional[str] = None
    metadata: dict = field(default_factory=dict)

@dataclass
class CanonicalSession:
    session_id: str
    user_id: str
    messages: list[CanonicalMessage]
    created_at: datetime
    updated_at: datetime
    agent_name: str
    metadata: dict = field(default_factory=dict)

def serialize_session(session: CanonicalSession) -> str:
    """Serialize to JSON for transport."""
    return json.dumps({
        "session_id": session.session_id,
        "user_id": session.user_id,
        "messages": [
            {
                "role": m.role,
                "content": m.content,
                "timestamp": m.timestamp.isoformat(),
                "tool_call_id": m.tool_call_id,
                "tool_name": m.tool_name,
                "metadata": m.metadata,
            }
            for m in session.messages
        ],
        "created_at": session.created_at.isoformat(),
        "updated_at": session.updated_at.isoformat(),
        "agent_name": session.agent_name,
        "metadata": session.metadata,
    }, indent=2)
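
Before a serialized session leaves the source system, it can help to check that the payload is well-formed. The following is a minimal sketch rather than part of the pipeline above: `validate_payload` is a hypothetical helper that assumes the JSON layout produced by `serialize_session` and reports missing fields, unknown roles, and out-of-order timestamps.

```python
import json
from datetime import datetime

ALLOWED_ROLES = {"user", "assistant", "system", "tool"}
REQUIRED_SESSION_KEYS = {
    "session_id", "user_id", "messages",
    "created_at", "updated_at", "agent_name",
}

def validate_payload(payload: str) -> list[str]:
    """Return a list of problems found in a serialized session (empty if none)."""
    data = json.loads(payload)
    problems = []

    missing = REQUIRED_SESSION_KEYS - set(data)
    if missing:
        problems.append(f"missing session fields: {sorted(missing)}")

    previous = None
    for i, msg in enumerate(data.get("messages", [])):
        if msg.get("role") not in ALLOWED_ROLES:
            problems.append(f"message {i}: unknown role {msg.get('role')!r}")
        try:
            ts = datetime.fromisoformat(msg["timestamp"])
        except (KeyError, TypeError, ValueError):
            problems.append(f"message {i}: missing or invalid timestamp")
            continue
        if previous is not None:
            try:
                if ts < previous:
                    problems.append(f"message {i}: timestamp earlier than previous message")
            except TypeError:
                # Raised when naive and timezone-aware datetimes are compared.
                problems.append(f"message {i}: mixes naive and timezone-aware timestamps")
        previous = ts
    return problems
```

An empty list means the payload passed these checks. It does not guarantee that the target system will accept the payload.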

Step 2: Export from the Source System

Write an exporter that reads from your current storage and transforms each record into the canonical format. The example below assumes PostgreSQL with a sessions table and a messages table.

import json

import asyncpg

def _as_dict(value) -> dict:
    """Decode a json/jsonb column; asyncpg returns these as text by default."""
    if not value:
        return {}
    return json.loads(value) if isinstance(value, str) else dict(value)

async def export_sessions(
    db_url: str,
    batch_size: int = 500,
) -> list[CanonicalSession]:
    """Export sessions from PostgreSQL in batches."""
    conn = await asyncpg.connect(db_url)
    sessions = []
    offset = 0

    try:
        while True:
            # Order by (created_at, id) so OFFSET pages stay deterministic
            # even when several sessions share a timestamp.
            rows = await conn.fetch(
                """
                SELECT s.id, s.user_id, s.created_at, s.updated_at,
                       s.agent_name, s.metadata
                FROM sessions s
                ORDER BY s.created_at, s.id
                LIMIT $1 OFFSET $2
                """,
                batch_size, offset,
            )
            if not rows:
                break

            for row in rows:
                # One query per session keeps the example simple; it is an
                # N+1 pattern and will be slow on large tables.
                messages = await conn.fetch(
                    """
                    SELECT role, content, created_at, tool_call_id,
                           tool_name, metadata
                    FROM messages
                    WHERE session_id = $1
                    ORDER BY created_at
                    """,
                    row["id"],
                )
                sessions.append(CanonicalSession(
                    session_id=str(row["id"]),
                    user_id=str(row["user_id"]),
                    messages=[
                        CanonicalMessage(
                            role=m["role"],
                            content=m["content"],
                            timestamp=m["created_at"],
                            tool_call_id=m["tool_call_id"],
                            tool_name=m["tool_name"],
                            metadata=_as_dict(m["metadata"]),
                        )
                        for m in messages
                    ],
                    created_at=row["created_at"],
                    updated_at=row["updated_at"],
                    agent_name=row["agent_name"],
                    metadata=_as_dict(row["metadata"]),
                ))
            offset += batch_size
    finally:
        # Release the connection even if a query fails midway.
        await conn.close()

    return sessions

Step 3: Import and Validate

Import into the target system with validation checks at every step.

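import json

def _to_json(value) -> str:
    """Encode metadata for a json/jsonb column; pass existing JSON text through."""
    return value if isinstance(value, str) else json.dumps(value or {})

async def import_sessions(
    sessions: list[CanonicalSession],
    target_db_url: str,
) -> dict:
    """Import sessions with validation.

    Assumes the target tables carry the same columns the exporter reads,
    including tool_call_id, tool_name, and metadata.
    """
    conn = await asyncpg.connect(target_db_url)
    stats = {"imported": 0, "skipped": 0, "errors": 0}

    try:
        for session in sessions:
            try:
                # Check for duplicates so the import can be re-run safely
                existing = await conn.fetchval(
                    "SELECT 1 FROM sessions WHERE id = $1",
                    session.session_id,
                )
                if existing:
                    stats["skipped"] += 1
                    continue

                # A session and its messages are written atomically
                async with conn.transaction():
                    await conn.execute(
                        """INSERT INTO sessions
                           (id, user_id, agent_name, created_at,
                            updated_at, metadata)
                           VALUES ($1, $2, $3, $4, $5, $6)""",
                        session.session_id, session.user_id,
                        session.agent_name, session.created_at,
                        session.updated_at, _to_json(session.metadata),
                    )
                    for msg in session.messages:
                        # Keep tool call fields; dropping them loses the
                        # context the agent used for earlier replies.
                        await conn.execute(
                            """INSERT INTO messages
                               (session_id, role, content, created_at,
                                tool_call_id, tool_name, metadata)
                               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                            session.session_id, msg.role,
                            msg.content, msg.timestamp,
                            msg.tool_call_id, msg.tool_name,
                            _to_json(msg.metadata),
                        )
                stats["imported"] += 1
            except Exception as e:
                stats["errors"] += 1
                print(f"Error importing {session.session_id}: {e}")
    finally:
        # Release the connection even if the loop is interrupted.
        await conn.close()

    return stats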

Step 4: Validate Counts and Integrity

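After import, compare row counts between source and target to confirm nothing was lost. Matching counts are a necessary check, but they do not prove that the content is identical.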

async def validate_migration(source_url: str, target_url: str) -> None:
    """Compare session and message counts between source and target."""
    src = await asyncpg.connect(source_url)
    try:
        tgt = await asyncpg.connect(target_url)
        try:
            src_sessions = await src.fetchval("SELECT count(*) FROM sessions")
            tgt_sessions = await tgt.fetchval("SELECT count(*) FROM sessions")
            src_messages = await src.fetchval("SELECT count(*) FROM messages")
            tgt_messages = await tgt.fetchval("SELECT count(*) FROM messages")
        finally:
            await tgt.close()
    finally:
        await src.close()

    print(f"Sessions: source={src_sessions}, target={tgt_sessions}")
    print(f"Messages: source={src_messages}, target={tgt_messages}")
    # Raise instead of assert: assertions are skipped under `python -O`.
    if src_sessions != tgt_sessions:
        raise ValueError("Session count mismatch")
    if src_messages != tgt_messages:
        raise ValueError("Message count mismatch")
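
Counts alone cannot detect reordered or altered messages. A complementary check, sketched here under the assumption that both systems can list a session's messages in order as (role, content, ISO timestamp) tuples, is to hash each session and compare the fingerprints. `session_fingerprint` is a hypothetical helper, not part of any library.

```python
import hashlib
import json

def session_fingerprint(messages: list[tuple[str, str, str]]) -> str:
    """Hash ordered (role, content, iso_timestamp) tuples into a hex digest."""
    digest = hashlib.sha256()
    for role, content, timestamp in messages:
        # JSON-encode each message so field boundaries are unambiguous.
        record = json.dumps([role, content, timestamp], ensure_ascii=False)
        digest.update(record.encode("utf-8"))
        digest.update(b"\n")
    return digest.hexdigest()

source = [
    ("user", "Hi", "2024-01-01T10:00:00"),
    ("assistant", "Hello", "2024-01-01T10:00:01"),
]
target_ok = list(source)
target_reordered = list(reversed(source))

print(session_fingerprint(source) == session_fingerprint(target_ok))         # True
print(session_fingerprint(source) == session_fingerprint(target_reordered))  # False
```

For the comparison to be meaningful, timestamps need to be rendered identically on both sides (same timezone and precision) before hashing.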

FAQ

How do I handle active sessions during migration?

Use a snapshot-plus-catch-up approach. Set a cutoff timestamp, export all sessions up to that point, then replay any writes that occurred after the cutoff while the export was running. A CDC (Change Data Capture) stream from a tool such as Debezium can capture these delta writes automatically.
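
The cutoff-and-replay idea can be sketched in memory. This is an illustration under stated assumptions, not a CDC client: `ChangeEvent` and `replay_changes` are hypothetical names, and the event list stands in for whatever delta stream captures writes made during the export.

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class ChangeEvent:
    session_id: str
    message_id: str
    content: str
    written_at: datetime

def replay_changes(target: dict, events: list, cutoff: datetime) -> int:
    """Apply events written after the cutoff; return how many were applied.

    `target` maps session_id -> {message_id: content}. Keying by message_id
    makes the replay idempotent, so running it twice does not duplicate data.
    """
    applied = 0
    for event in sorted(events, key=lambda e: e.written_at):
        if event.written_at <= cutoff:
            continue  # already covered by the bulk export
        messages = target.setdefault(event.session_id, {})
        if event.message_id not in messages:
            messages[event.message_id] = event.content
            applied += 1
    return applied

cutoff = datetime(2024, 1, 1, 12, 0)
target = {"s1": {"m1": "Hi"}}  # state after the bulk import
events = [
    ChangeEvent("s1", "m1", "Hi", datetime(2024, 1, 1, 11, 59)),
    ChangeEvent("s1", "m2", "Still there?", datetime(2024, 1, 1, 12, 5)),
    ChangeEvent("s2", "m3", "New session", datetime(2024, 1, 1, 12, 6)),
]
first = replay_changes(target, events, cutoff)
second = replay_changes(target, events, cutoff)
print(first, second)
```

Idempotency matters here because a delta stream may deliver the same event more than once, and the replay may need to be re-run after a failure.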

Should I migrate tool call results or just the conversation text?

Migrate tool call results. They provide context that the agent used to formulate responses. Without them, resuming a conversation in the new system may produce inconsistent follow-ups because the agent loses the factual grounding from previous tool calls.

What about memory stores like vector databases?

Vector memory requires special handling because embeddings are model-specific. If you are changing embedding models, you must re-embed the source documents rather than copying vectors directly. Plan for the re-embedding compute cost.
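
A re-embedding pass might look like the following sketch. `reembed` is a hypothetical helper, and `embed_batch` is a stand-in for whatever embedding client the target system uses; the toy embedder exists only so the example runs and is not a real model.

```python
from typing import Callable, Iterable, Iterator

def reembed(
    documents: Iterable[tuple[str, str]],
    embed_batch: Callable[[list[str]], list[list[float]]],
    batch_size: int = 64,
) -> Iterator[tuple[str, list[float]]]:
    """Yield (doc_id, new_vector) pairs, embedding source text in batches."""
    batch = []
    for doc in documents:
        batch.append(doc)
        if len(batch) == batch_size:
            yield from _embed(batch, embed_batch)
            batch = []
    if batch:
        # Flush the final, possibly smaller, batch.
        yield from _embed(batch, embed_batch)

def _embed(batch, embed_batch):
    vectors = embed_batch([text for _, text in batch])
    if len(vectors) != len(batch):
        raise ValueError("embedding client returned the wrong number of vectors")
    for (doc_id, _), vector in zip(batch, vectors):
        yield doc_id, vector

# Toy embedder for demonstration only: NOT a real embedding model.
def toy_embed_batch(texts: list[str]) -> list[list[float]]:
    return [[float(len(t)), float(t.count(" "))] for t in texts]

docs = [("a", "hello world"), ("b", "hi"), ("c", "agent memory store")]
result = dict(reembed(docs, toy_embed_batch, batch_size=2))
print(result)
```

Batching keeps the number of calls to the embedding service proportional to the document count divided by the batch size, which is usually the main lever on re-embedding cost and duration.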




CallSphere Team
