
Database Schema Migrations for AI Agent Systems: Adding Features Without Downtime

Learn how to perform database schema migrations for AI agent systems with zero downtime. Covers online migrations, backward compatibility, data backfill, and rollback strategies.

Why AI Agent Databases Are Tricky to Migrate

AI agent systems have database tables that grow in unpredictable ways. A conversations table might store 50,000 rows per day. A tool_calls table logs every function invocation with its arguments and results. A memory_store table holds vector embeddings that cannot be regenerated cheaply.

Adding a column, changing a constraint, or introducing a new table must happen without locking these high-traffic tables. On PostgreSQL versions before 11, a traditional ALTER TABLE ... ADD COLUMN with a NOT NULL constraint (which requires a default for existing rows) rewrites a 10-million-row table and blocks writes for minutes, and your agents will time out or lose messages while the lock is held.
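A common safeguard, regardless of migration strategy, is to set a short lock_timeout before any DDL so the statement aborts quickly instead of queueing behind long-running agent queries (and blocking every query queued behind it). A minimal sketch, assuming an asyncpg-style connection; the helper name and the 2-second default are illustrative:

```python
import asyncio

async def run_ddl(conn, ddl: str, lock_timeout: str = "2s"):
    """Run one DDL statement with a short lock_timeout so the migration
    fails fast rather than stalling agent traffic behind a lock queue.
    `conn` is assumed to expose an asyncpg-style execute() coroutine."""
    await conn.execute(f"SET lock_timeout = '{lock_timeout}'")
    try:
        await conn.execute(ddl)
    finally:
        # Restore the session default even if the DDL times out
        await conn.execute("SET lock_timeout = DEFAULT")
```

If the timeout fires, the migration raises and can simply be retried at a quieter moment; nothing is left half-applied.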

The Expand-Contract Pattern

The safest migration strategy for production systems is expand-contract (also called parallel change). It has three phases:

  1. Expand: Add the new column or table as nullable with no constraints
  2. Migrate: Backfill existing data and update application code to write to both old and new columns
  3. Contract: Remove the old column after all code reads from the new one
"""
Alembic migration: Add sentiment_score to conversations.
Phase 1 (Expand) — add nullable column, no downtime.
"""
from alembic import op
import sqlalchemy as sa

revision = "042_add_sentiment_score"
down_revision = "041_add_tool_call_index"

def upgrade():
    # Phase 1: Add column as nullable — instant, no table lock
    op.add_column(
        "conversations",
        sa.Column(
            "sentiment_score",
            sa.Float(),
            nullable=True,
            comment="AI-computed sentiment, -1.0 to 1.0",
        ),
    )
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction, and
    # Alembic wraps each migration in one by default, so use an
    # autocommit block to avoid blocking writes
    with op.get_context().autocommit_block():
        op.execute(
            "CREATE INDEX CONCURRENTLY idx_conversations_sentiment "
            "ON conversations (sentiment_score) "
            "WHERE sentiment_score IS NOT NULL"
        )

def downgrade():
    op.drop_index("idx_conversations_sentiment", table_name="conversations")
    op.drop_column("conversations", "sentiment_score")

Backfill Existing Data Without Locking

Never backfill with a single UPDATE on millions of rows. Process in batches.

import asyncpg
import asyncio

async def backfill_sentiment_scores(
    db_url: str,
    batch_size: int = 1000,
    sleep_between_batches: float = 0.1,
):
    """Backfill sentiment scores in small batches."""
    conn = await asyncpg.connect(db_url)
    total_updated = 0

    while True:
        # Select a batch of rows missing the new column
        rows = await conn.fetch(
            """
            SELECT id, content
            FROM conversations
            WHERE sentiment_score IS NULL
            ORDER BY id
            LIMIT $1
            """,
            batch_size,
        )
        if not rows:
            break

        for row in rows:
            score = compute_sentiment(row["content"])
            await conn.execute(
                "UPDATE conversations SET sentiment_score = $1 WHERE id = $2",
                score, row["id"],
            )
            total_updated += 1

        # Yield to other connections
        await asyncio.sleep(sleep_between_batches)
        print(f"Backfilled {total_updated} rows...")

    await conn.close()
    print(f"Backfill complete: {total_updated} rows updated")

def compute_sentiment(text: str) -> float:
    """Compute sentiment score using a lightweight model."""
    # In production, use a fast local model or batch API calls
    from textblob import TextBlob
    return TextBlob(text).sentiment.polarity
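The per-row UPDATE loop above costs one round trip per row. If that becomes the bottleneck, the writes for a whole batch can be sent in a single prepared-statement batch with asyncpg's executemany. A sketch, assuming the same connection and row shape as the backfill script; the helper name is illustrative:

```python
import asyncio

async def write_batch(conn, rows, compute_fn):
    """Compute scores for a whole batch, then write them in one
    executemany round trip instead of one UPDATE per row."""
    updates = [(compute_fn(r["content"]), r["id"]) for r in rows]
    await conn.executemany(
        "UPDATE conversations SET sentiment_score = $1 WHERE id = $2",
        updates,
    )
    return len(updates)
```

Swapping this into the backfill loop keeps the batch-and-sleep pacing but removes most of the network latency.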

Dual-Write During Transition

While the backfill runs, update your application to write to both old and new schemas.


class ConversationRepository:
    """Repository that supports both old and new schema."""

    async def save_message(
        self, conversation_id: str, role: str, content: str,
    ):
        sentiment = compute_sentiment(content) if role == "user" else None

        await self.conn.execute(
            """
            INSERT INTO messages (conversation_id, role, content)
            VALUES ($1, $2, $3)
            """,
            conversation_id, role, content,
        )

        # Dual-write: update the new column on the conversation
        if sentiment is not None:
            await self.conn.execute(
                """
                UPDATE conversations
                SET sentiment_score = $1, updated_at = now()
                WHERE id = $2
                """,
                sentiment, conversation_id,
            )

Phase 3: Contract — Add Constraints

After the backfill completes and all code writes to the new column, add the constraint.

"""Phase 3 migration: Make sentiment_score NOT NULL."""

revision = "044_sentiment_score_not_null"
down_revision = "043_backfill_sentiment"

def upgrade():
    # Validate that backfill is complete before adding constraint
    op.execute(
        "DO $$ BEGIN "
        "  IF EXISTS (SELECT 1 FROM conversations "
        "             WHERE sentiment_score IS NULL LIMIT 1) THEN "
        "    RAISE EXCEPTION 'Backfill incomplete'; "
        "  END IF; "
        "END $$"
    )
    # SET NOT NULL briefly takes an ACCESS EXCLUSIVE lock and scans the
    # table; on very large tables, add a CHECK (... IS NOT NULL) constraint
    # NOT VALID first, VALIDATE it, then SET NOT NULL to skip the locked scan
    op.alter_column(
        "conversations", "sentiment_score",
        nullable=False,
        server_default="0.0",
    )

def downgrade():
    op.alter_column(
        "conversations", "sentiment_score",
        nullable=True,
        server_default=None,
    )

Rollback Strategy

Always have a rollback plan that does not require a reverse migration.

import os

class FeatureFlags:
    @staticmethod
    def use_sentiment_score() -> bool:
        return os.getenv("FEATURE_SENTIMENT_SCORE", "false").lower() == "true"

# In your API endpoint
async def get_conversation(conversation_id: str):
    conv = await repo.get_conversation(conversation_id)
    response = {"id": conv.id, "messages": conv.messages}

    if FeatureFlags.use_sentiment_score():
        response["sentiment_score"] = conv.sentiment_score

    return response

FAQ

How do I handle migrations on tables with tens of millions of rows?

Use ALTER TABLE ... ADD COLUMN with a nullable column and no default — this is instant in PostgreSQL 11+ because it only updates the catalog. Then backfill in batches of 1,000-5,000 rows with a small sleep between batches to avoid overwhelming the connection pool. Monitor replication lag if you have read replicas.
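For the largest tables, walking the primary key in fixed-size ranges avoids repeatedly re-scanning for NULL rows as the `WHERE sentiment_score IS NULL` predicate gets more expensive. A hedged sketch, assuming an integer id column and an asyncpg-style connection; the callback shape is illustrative:

```python
import asyncio

async def for_each_id_range(conn, process_range, batch_size: int = 5000):
    """Walk the integer primary key in fixed-size ranges so each call to
    process_range(lo, hi) backfills a bounded slice of the table."""
    bounds = await conn.fetchrow(
        "SELECT min(id) AS lo, max(id) AS hi FROM conversations"
    )
    lo, hi = bounds["lo"], bounds["hi"]
    while lo is not None and lo <= hi:
        # The caller backfills ids in [lo, lo + batch_size)
        await process_range(lo, lo + batch_size)
        lo += batch_size
```

Each slice touches a bounded number of rows regardless of how many are already backfilled, which keeps per-batch latency predictable.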

What about adding indexes on large tables?

Always use CREATE INDEX CONCURRENTLY in PostgreSQL. This builds the index without holding a write-blocking table lock, though it takes longer to complete and leaves an INVALID index behind if it fails (drop it and retry). CONCURRENTLY cannot run inside a transaction block, and Alembic wraps each migration in one by default, so execute the statement with op.execute() inside op.get_context().autocommit_block() rather than a plain op.create_index().
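While a concurrent build runs, PostgreSQL 12+ reports its progress in the pg_stat_progress_create_index view, which is useful for judging how long a long build has left. A sketch assuming an asyncpg-style connection; the helper name is illustrative:

```python
import asyncio

async def index_build_progress(conn):
    """Return phase and block counts for any index builds currently
    in flight (pg_stat_progress_create_index, PostgreSQL 12+)."""
    return await conn.fetch(
        "SELECT phase, blocks_done, blocks_total "
        "FROM pg_stat_progress_create_index"
    )
```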

How do I coordinate schema changes across multiple agent services?

Use the expand-contract pattern with API versioning. The database expands first (new columns are nullable), then each service is updated to use the new columns at its own pace. Only contract (remove old columns) after all services have been updated and deployed. Keep a migration tracker document so every team knows which phase the migration is in.
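One lightweight way to enforce that tracker in code: each service checks at startup that the database has reached the migration revision it depends on. A sketch against Alembic's alembic_version table; the string comparison works only with numerically prefixed revision ids like this article's (Alembic's default random hashes do not sort), and the required-revision constant is hypothetical:

```python
import asyncio

REQUIRED_REVISION = "042_add_sentiment_score"  # hypothetical minimum

async def assert_schema_ready(conn, required: str = REQUIRED_REVISION):
    """Fail fast at startup if the database has not yet reached the
    migration revision this service depends on."""
    row = await conn.fetchrow("SELECT version_num FROM alembic_version")
    if row is None or row["version_num"] < required:
        current = row["version_num"] if row else "none"
        raise RuntimeError(f"Database at {current}, need >= {required}")
```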


#DatabaseMigration #SchemaChanges #ZeroDowntime #PostgreSQL #Alembic #AgenticAI #LearnAI #AIEngineering

CallSphere Team
