Workflow Versioning and Migration: Updating Running Agent Workflows Without Downtime
Learn how to version and migrate AI agent workflows safely. Covers versioning strategies, backward compatibility patterns, migration techniques, and rollback procedures for zero-downtime updates.
The Versioning Problem for Agent Workflows
AI agent workflows evolve constantly. You refine prompts, add new processing steps, change model providers, restructure output formats. But when you deploy a new version, there may be hundreds of workflows still running on the old version. A naive deployment that replaces the old code breaks those in-flight executions.
This is fundamentally different from stateless API versioning. A REST endpoint can handle each request independently — new requests use new code, old requests are already complete. Workflows, however, maintain state across hours or days. The code that started a workflow must remain compatible with the code that finishes it.
Versioning Strategies
Strategy 1: Version-Stamped Workflows
Stamp each workflow instance with its version at creation time. Route execution to the correct handler based on the version.
import copy
import logging
from dataclasses import dataclass
from datetime import timedelta, timezone
from typing import Any, Callable
@dataclass
class VersionedWorkflow:
    """A workflow instance stamped with the version it was created under.

    The version stamp is what lets execution be routed to the handler set
    that matches the code the instance started on.
    """

    # Identifier of this workflow instance.
    id: str
    # Version of the workflow definition, e.g., "2.1.0".
    version: str
    # Lifecycle state; batch migration selects instances with status "running".
    status: str
    # Position of the instance within its step sequence
    # (presumably a zero-based index -- confirm against the executor).
    current_step: int
    # Free-form state carried between steps; its keys depend on the version.
    context: dict[str, Any]
class WorkflowRegistry:
    """Registry that maps workflow versions to handlers.

    Handlers are stored per version and looked up by step name, so an
    in-flight workflow keeps running the step implementations of the
    version it was stamped with.
    """

    def __init__(self) -> None:
        # version -> step name -> handler
        self._handlers: dict[str, dict[str, Callable[..., Any]]] = {}

    def register(
        self, version: str, step_name: str, handler: Callable[..., Any]
    ) -> None:
        """Register ``handler`` as the implementation of a step.

        Args:
            version: Workflow version the handler belongs to, e.g. "2.0.0".
            step_name: Name of the step within that version.
            handler: Callable that executes the step. Registering the same
                version/step pair again replaces the previous handler.
        """
        self._handlers.setdefault(version, {})[step_name] = handler

    def get_handler(self, version: str, step_name: str) -> Callable[..., Any]:
        """Return the handler registered for ``step_name`` in ``version``.

        Raises:
            ValueError: If nothing is registered for ``version``, or the
                version has no handler for ``step_name``.
        """
        handlers = self._handlers.get(version)
        if not handlers:
            raise ValueError(f"No handlers for version {version}")
        handler = handlers.get(step_name)
        if not handler:
            raise ValueError(
                f"No handler for step {step_name} in version {version}"
            )
        return handler
# Register handlers for multiple versions from one table of
# (version, step name, handler) rows; rows are registered top to bottom.
registry = WorkflowRegistry()

for _version, _step_name, _handler in (
    # Version 1: Simple prompt
    ("1.0.0", "analyze", analyze_v1),
    ("1.0.0", "summarize", summarize_v1),
    # Version 2: Enhanced prompt with context window
    ("2.0.0", "analyze", analyze_v2),
    ("2.0.0", "enrich", enrich_v2),  # New step
    ("2.0.0", "summarize", summarize_v2),
):
    registry.register(_version, _step_name, _handler)
Strategy 2: Backward-Compatible Step Evolution
Design steps so new versions can process state created by old versions. This avoids maintaining multiple handler codepaths.
async def summarize_v2(context: dict) -> str:
    """Summarize workflow text from either a v1 or a v2 context.

    A v2 context carries its text under "enriched_text"; a v1 context
    carries it under "raw_text". The optional "sources" list, when
    non-empty, adds a citation instruction to the prompt.

    Raises:
        ValueError: If neither text key holds a non-empty value.
    """
    # Prefer the v2 key; fall back to the v1 key when it is missing or empty.
    body = context.get("enriched_text")
    if not body:
        body = context.get("raw_text")
    if not body:
        raise ValueError("No text found in context")

    # Source attribution is a v2 addition; without sources the prompt
    # simply carries no citation line.
    cited = context.get("sources", [])
    citation = f"\nCite these sources: {', '.join(cited)}" if cited else ""

    return await call_llm(f"Summarize the following:{citation}\n\n{body}")
Strategy 3: Workflow Migration
Actively migrate in-flight workflows from the old version to the new version, either all at once during a maintenance window or gradually in batches.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
from datetime import datetime
class WorkflowMigrator:
    """Moves in-flight workflows from one version to another.

    A migration rewrites the workflow's context with a version-pair
    specific function, restamps the workflow with the target version,
    records the hop in the context's "_migration_history" list and
    persists the result through ``store``.
    """

    def __init__(self, store, registry: WorkflowRegistry):
        # store must provide async ``save(workflow)`` and
        # ``find_by_version(version, status=...)``.
        self.store = store
        # Kept for callers; the migration methods below do not use it.
        self.registry = registry

    async def migrate_workflow(
        self,
        workflow: VersionedWorkflow,
        target_version: str,
    ) -> VersionedWorkflow:
        """Migrate a single workflow to a new version.

        The workflow object is updated in place, saved, and returned.

        Raises:
            ValueError: If no migrator exists for the version pair. In that
                case the workflow is left untouched.
        """
        source_version = workflow.version

        # Get the migration function for this version pair
        migrator = self._get_migrator(source_version, target_version)

        # Transform the context
        new_context = migrator(workflow.context)

        # Update the workflow
        workflow.version = target_version
        workflow.context = new_context
        workflow.context["_migration_history"] = (
            workflow.context.get("_migration_history", [])
            + [{
                "from": source_version,
                "to": target_version,
                # Timezone-aware UTC timestamp; utcnow() is deprecated and
                # yields a naive value that is easy to misread as local time.
                "at": datetime.now(timezone.utc).isoformat(),
            }]
        )
        await self.store.save(workflow)
        return workflow

    async def migrate_batch(
        self,
        source_version: str,
        target_version: str,
        batch_size: int = 100,
    ) -> dict:
        """Migrate all running workflows from one version to another.

        A failure on one workflow is logged and counted; it does not stop
        the remaining workflows from being migrated.

        Returns:
            ``{"migrated": <count>, "failed": <count>}``.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size < 1:
            # A negative step would make the range below empty and silently
            # migrate nothing; fail loudly instead.
            raise ValueError("batch_size must be a positive integer")

        log = logging.getLogger(__name__)
        migrated = 0
        failed = 0
        workflows = await self.store.find_by_version(
            source_version, status="running"
        )
        for batch_start in range(0, len(workflows), batch_size):
            batch = workflows[batch_start:batch_start + batch_size]
            for wf in batch:
                try:
                    await self.migrate_workflow(wf, target_version)
                    migrated += 1
                except Exception as e:
                    # Deliberately broad: one bad workflow must not abort
                    # the rest of the batch.
                    failed += 1
                    log.error(
                        f"Migration failed for {wf.id}: {e}"
                    )
        return {"migrated": migrated, "failed": failed}

    def _get_migrator(self, source: str, target: str) -> Callable[[dict], dict]:
        """Return a function that transforms context between versions.

        Raises:
            ValueError: If there is no migrator for ``source`` -> ``target``.
        """
        key = f"{source}->{target}"
        migrators = {
            "1.0.0->2.0.0": self._migrate_1_to_2,
            "2.0.0->2.1.0": self._migrate_2_to_2_1,
        }
        migrator = migrators.get(key)
        if not migrator:
            raise ValueError(f"No migrator for {key}")
        return migrator

    @staticmethod
    def _migrate_1_to_2(context: dict) -> dict:
        """Migrate v1 context to v2 format."""
        # Work on a copy so the caller's context is not modified.
        new_context = {**context}
        # V2 renames raw_text to enriched_text
        if "raw_text" in new_context:
            new_context["enriched_text"] = new_context.pop("raw_text")
        # V2 adds sources field
        new_context.setdefault("sources", [])
        return new_context

    @staticmethod
    def _migrate_2_to_2_1(context: dict) -> dict:
        """Migrate v2.0 context to v2.1 format."""
        # Work on a copy so the caller's context is not modified.
        new_context = {**context}
        # V2.1 adds model_config field; an empty mapping is a placeholder
        # default -- replace it with real settings if the steps need them.
        new_context.setdefault("model_config", {})
        return new_context
Rollback Procedures
Always plan for rollback. A failed migration should not leave workflows in an inconsistent state.
class SafeMigrator(WorkflowMigrator):
    """Migrator that restores a workflow when migration or validation fails."""

    async def migrate_with_rollback(
        self,
        workflow: VersionedWorkflow,
        target_version: str,
    ) -> VersionedWorkflow:
        """Migrate ``workflow`` and undo the change if anything goes wrong.

        The workflow is migrated and saved, then validated against the
        fields required by ``target_version``. On any failure the version,
        context and current step are restored from a snapshot, the restored
        workflow is saved again, and the failure is re-raised.

        Raises:
            RuntimeError: If the migration was rolled back; the original
                exception is attached as ``__cause__``.
        """
        # Snapshot the original state. The context is deep-copied so that a
        # migrator mutating nested values in place cannot alter the snapshot.
        snapshot = {
            "version": workflow.version,
            "context": copy.deepcopy(workflow.context),
            "current_step": workflow.current_step,
        }
        try:
            result = await self.migrate_workflow(workflow, target_version)
            # Validate the migrated workflow
            self._validate_workflow(result, target_version)
            return result
        except Exception as e:
            # Rollback to snapshot
            workflow.version = snapshot["version"]
            workflow.context = snapshot["context"]
            workflow.current_step = snapshot["current_step"]
            # The migrated state may already have been persisted, so the
            # restored state has to be written back as well.
            await self.store.save(workflow)
            raise RuntimeError(
                f"Migration rolled back for {workflow.id}: {e}"
            ) from e

    def _validate_workflow(
        self, workflow: VersionedWorkflow, version: str
    ):
        """Ensure the migrated workflow has all required fields.

        Versions without an entry in the table below are accepted as-is.

        Raises:
            ValueError: If a field required by ``version`` is missing from
                the workflow's context.
        """
        required_fields = {
            "2.0.0": ["enriched_text", "sources"],
            "2.1.0": ["enriched_text", "sources", "model_config"],
        }
        for field in required_fields.get(version, []):
            if field not in workflow.context:
                raise ValueError(f"Missing required field: {field}")
Temporal-Specific Versioning
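Temporal has built-in workflow versioning via the workflow.patched() API. It records a marker in the workflow's event history, so executions that started before the patch keep replaying along the old code path while new executions take the new one.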
from temporalio import workflow
@workflow.defn
class ResearchAgentV2:
    """Research workflow whose enrichment step is gated behind a patch."""

    @workflow.run
    async def run(self, task):
        """Optionally enrich ``task.query``, then summarize and return it."""
        use_enrichment = workflow.patched("v2-add-enrichment")

        # Old code path: skip enrichment and summarize the query as-is
        summary_input = task.query
        if use_enrichment:
            # New code path: includes enrichment step
            summary_input = await workflow.execute_activity(
                enrich_data,
                args=[task.query],
                start_to_close_timeout=timedelta(seconds=60),
            )

        return await workflow.execute_activity(
            summarize,
            args=[summary_input],
            start_to_close_timeout=timedelta(seconds=120),
        )
FAQ
How long should I keep old workflow versions running?
Keep old versions active until all in-flight workflows on that version complete. Monitor the count of running workflows per version and decommission old handlers when the count reaches zero. For workflows with unbounded duration (waiting for human input), set a maximum lifetime and force-complete or migrate stale instances.
Should I version the workflow definition or individual steps?
Version both, but at different granularities. Workflow definitions get major version bumps when the step graph changes (new steps added, steps removed, order changed). Individual steps get minor version bumps when their internal logic changes but their input/output contract stays the same.
What is the safest migration strategy for critical workflows?
Run both versions simultaneously with a feature flag. New workflows use the new version. Existing workflows continue on the old version until they complete naturally. This is the slowest approach but carries zero migration risk. Reserve active migration for non-critical workflows or when you need to force-upgrade for security or compliance reasons.
#WorkflowVersioning #Migration #ZeroDowntime #AIAgents #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.