Skip to content
Learn Agentic AI14 min read0 views

Workflow Versioning and Migration: Updating Running Agent Workflows Without Downtime

Learn how to version and migrate AI agent workflows safely. Covers versioning strategies, backward compatibility patterns, migration techniques, and rollback procedures for zero-downtime updates.

The Versioning Problem for Agent Workflows

AI agent workflows evolve constantly. You refine prompts, add new processing steps, change model providers, restructure output formats. But when you deploy a new version, there may be hundreds of workflows still running on the old version. A naive deployment that replaces the old code breaks those in-flight executions.

This is fundamentally different from stateless API versioning. A REST endpoint can handle each request independently — new requests use new code, old requests are already complete. Workflows, however, maintain state across hours or days. The code that started a workflow must remain compatible with the code that finishes it.

Versioning Strategies

Strategy 1: Version-Stamped Workflows

Stamp each workflow instance with its version at creation time. Route execution to the correct handler based on the version.

from dataclasses import dataclass
from typing import Any

@dataclass
class VersionedWorkflow:
    id: str
    version: str  # e.g., "2.1.0"
    status: str
    current_step: int
    context: dict[str, Any]

class WorkflowRegistry:
    """Registry that maps workflow versions to handlers."""

    def __init__(self):
        self._handlers: dict[str, dict[str, callable]] = {}

    def register(self, version: str, step_name: str, handler: callable):
        if version not in self._handlers:
            self._handlers[version] = {}
        self._handlers[version][step_name] = handler

    def get_handler(self, version: str, step_name: str) -> callable:
        handlers = self._handlers.get(version)
        if not handlers:
            raise ValueError(f"No handlers for version {version}")
        handler = handlers.get(step_name)
        if not handler:
            raise ValueError(
                f"No handler for step {step_name} in version {version}"
            )
        return handler

# Register handlers for multiple versions
registry = WorkflowRegistry()

# Version 1: Simple prompt
registry.register("1.0.0", "analyze", analyze_v1)
registry.register("1.0.0", "summarize", summarize_v1)

# Version 2: Enhanced prompt with context window
registry.register("2.0.0", "analyze", analyze_v2)
registry.register("2.0.0", "enrich", enrich_v2)  # New step
registry.register("2.0.0", "summarize", summarize_v2)

Strategy 2: Backward-Compatible Step Evolution

Design steps so new versions can process state created by old versions. This avoids maintaining multiple handler codepaths.

async def summarize_v2(context: dict) -> str:
    """V2 summarizer that handles both v1 and v2 context formats."""

    # V1 context has "raw_text", V2 has "enriched_text"
    text = context.get("enriched_text") or context.get("raw_text")
    if not text:
        raise ValueError("No text found in context")

    # V2 adds source attribution, but falls back gracefully
    sources = context.get("sources", [])
    source_instruction = ""
    if sources:
        source_instruction = (
            f"\nCite these sources: {', '.join(sources)}"
        )

    prompt = f"Summarize the following:{source_instruction}\n\n{text}"
    return await call_llm(prompt)

Strategy 3: Workflow Migration

Actively migrate in-flight workflows from the old version to the new version during a maintenance window or gradually.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

from datetime import datetime

class WorkflowMigrator:
    def __init__(self, store, registry: WorkflowRegistry):
        self.store = store
        self.registry = registry

    async def migrate_workflow(
        self,
        workflow: VersionedWorkflow,
        target_version: str,
    ) -> VersionedWorkflow:
        """Migrate a single workflow to a new version."""
        source_version = workflow.version

        # Get the migration function for this version pair
        migrator = self._get_migrator(source_version, target_version)

        # Transform the context
        new_context = migrator(workflow.context)

        # Update the workflow
        workflow.version = target_version
        workflow.context = new_context
        workflow.context["_migration_history"] = (
            workflow.context.get("_migration_history", [])
            + [{
                "from": source_version,
                "to": target_version,
                "at": datetime.utcnow().isoformat(),
            }]
        )

        await self.store.save(workflow)
        return workflow

    async def migrate_batch(
        self,
        source_version: str,
        target_version: str,
        batch_size: int = 100,
    ) -> dict:
        """Migrate all workflows from one version to another."""
        migrated = 0
        failed = 0

        workflows = await self.store.find_by_version(
            source_version, status="running"
        )

        for batch_start in range(0, len(workflows), batch_size):
            batch = workflows[batch_start:batch_start + batch_size]
            for wf in batch:
                try:
                    await self.migrate_workflow(wf, target_version)
                    migrated += 1
                except Exception as e:
                    failed += 1
                    logger.error(
                        f"Migration failed for {wf.id}: {e}"
                    )

        return {"migrated": migrated, "failed": failed}

    def _get_migrator(self, source: str, target: str) -> callable:
        """Return a function that transforms context between versions."""
        key = f"{source}->{target}"
        migrators = {
            "1.0.0->2.0.0": self._migrate_1_to_2,
            "2.0.0->2.1.0": self._migrate_2_to_2_1,
        }
        migrator = migrators.get(key)
        if not migrator:
            raise ValueError(f"No migrator for {key}")
        return migrator

    @staticmethod
    def _migrate_1_to_2(context: dict) -> dict:
        """Migrate v1 context to v2 format."""
        new_context = {**context}
        # V2 renames raw_text to enriched_text
        if "raw_text" in new_context:
            new_context["enriched_text"] = new_context.pop("raw_text")
        # V2 adds sources field
        new_context.setdefault("sources", [])
        return new_context

Rollback Procedures

Always plan for rollback. A failed migration should not leave workflows in an inconsistent state.

class SafeMigrator(WorkflowMigrator):
    async def migrate_with_rollback(
        self,
        workflow: VersionedWorkflow,
        target_version: str,
    ) -> VersionedWorkflow:
        # Snapshot the original state
        snapshot = {
            "version": workflow.version,
            "context": {**workflow.context},
            "current_step": workflow.current_step,
        }

        try:
            result = await self.migrate_workflow(workflow, target_version)

            # Validate the migrated workflow
            self._validate_workflow(result, target_version)
            return result

        except Exception as e:
            # Rollback to snapshot
            workflow.version = snapshot["version"]
            workflow.context = snapshot["context"]
            workflow.current_step = snapshot["current_step"]
            await self.store.save(workflow)
            raise RuntimeError(
                f"Migration rolled back for {workflow.id}: {e}"
            )

    def _validate_workflow(
        self, workflow: VersionedWorkflow, version: str
    ):
        """Ensure the migrated workflow has all required fields."""
        required_fields = {
            "2.0.0": ["enriched_text", "sources"],
            "2.1.0": ["enriched_text", "sources", "model_config"],
        }
        for field in required_fields.get(version, []):
            if field not in workflow.context:
                raise ValueError(f"Missing required field: {field}")

Temporal-Specific Versioning

Temporal has built-in workflow versioning via the patched API, which handles replay compatibility automatically.

from temporalio import workflow

@workflow.defn
class ResearchAgentV2:
    @workflow.run
    async def run(self, task):
        if workflow.patched("v2-add-enrichment"):
            # New code path: includes enrichment step
            enriched = await workflow.execute_activity(
                enrich_data, args=[task.query],
                start_to_close_timeout=timedelta(seconds=60),
            )
        else:
            # Old code path: skip enrichment
            enriched = task.query

        result = await workflow.execute_activity(
            summarize, args=[enriched],
            start_to_close_timeout=timedelta(seconds=120),
        )
        return result

FAQ

How long should I keep old workflow versions running?

Keep old versions active until all in-flight workflows on that version complete. Monitor the count of running workflows per version and decommission old handlers when the count reaches zero. For workflows with unbounded duration (waiting for human input), set a maximum lifetime and force-complete or migrate stale instances.

Should I version the workflow definition or individual steps?

Version both, but at different granularities. Workflow definitions get major version bumps when the step graph changes (new steps added, steps removed, order changed). Individual steps get minor version bumps when their internal logic changes but their input/output contract stays the same.

What is the safest migration strategy for critical workflows?

Run both versions simultaneously with a feature flag. New workflows use the new version. Existing workflows continue on the old version until they complete naturally. This is the slowest approach but carries zero migration risk. Reserve active migration for non-critical workflows or when you need to force-upgrade for security or compliance reasons.


#WorkflowVersioning #Migration #ZeroDowntime #AIAgents #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.