Temporal for AI Agent Workflows: Durable Execution and Workflow-as-Code
Learn how Temporal provides durable execution guarantees for AI agent workflows. Covers workflow definition, activities, automatic retries, and state management for long-running agent pipelines.
Why Durable Execution Matters for AI Agents
AI agent workflows frequently span minutes or hours. A research agent might call an LLM, scrape web pages, run code analysis, and synthesize results across dozens of steps. If any step fails — a network timeout, an API rate limit, a transient LLM error — naive implementations lose all progress and must restart from scratch.
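Temporal addresses this with durable execution. The result of every completed activity is recorded in the workflow's event history. If a worker crashes mid-execution, another worker replays that history and resumes the workflow where it left off, without re-running activities that already completed. No lost state, and no manual retry logic scattered throughout your code. One caveat: an activity that fails or times out partway through may be executed again, so activities with external side effects should be idempotent.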
This matters enormously for AI agents because LLM calls are expensive, slow, and non-deterministic. You do not want to re-run a 30-step research pipeline because step 28 hit a transient error.
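The idea of resuming from recorded results can be illustrated with a toy journal. This is a simplified sketch in plain Python, not how Temporal is implemented: the run_durably function, the JSON journal file, and the step names are all hypothetical and exist only to show that a finished step is not executed again after a crash.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Dict, List, Tuple


def run_durably(steps: List[Tuple[str, Callable[[], str]]], journal: Path) -> Dict[str, str]:
    """Run steps in order, persisting each result so a rerun skips finished steps."""
    done: Dict[str, str] = json.loads(journal.read_text()) if journal.exists() else {}
    for name, step in steps:
        if name in done:
            continue  # completed in an earlier run: reuse the recorded result
        done[name] = step()  # may raise; earlier results are already on disk
        journal.write_text(json.dumps(done))
    return done


calls = {"plan": 0, "search": 0}


def plan() -> str:
    calls["plan"] += 1
    return "plan-v1"


def flaky_search() -> str:
    calls["search"] += 1
    if calls["search"] == 1:
        raise TimeoutError("simulated network timeout")
    return "3 snippets"


journal = Path(tempfile.mkdtemp()) / "journal.json"
steps = [("plan", plan), ("search", flaky_search)]

try:
    run_durably(steps, journal)
except TimeoutError:
    pass  # the first "worker" died after finishing the plan step

# A second run resumes from the journal and only repeats the failed step.
result = run_durably(steps, journal)
```

After the second run, the plan step has been called once and the search step twice, which is the behavior the section describes: only the failed step is repeated.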
Core Temporal Concepts
Temporal separates workflows (deterministic orchestration logic) from activities (non-deterministic side effects like API calls). Workflows define the control flow. Activities do the actual work.
from temporalio import workflow, activity
from dataclasses import dataclass
import asyncio


@dataclass
class AgentTask:
    query: str
    max_steps: int = 10
    model: str = "gpt-4"


@dataclass
class AgentResult:
    answer: str
    steps_taken: int
    sources: list[str]
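Why workflow code has to be deterministic becomes clearer with a toy replay model. The sketch below is an illustration only: ReplayContext, NonDeterminismError, and workflow_fn are hypothetical names, not Temporal APIs. It records each step's result on the first run, returns the recorded result on replay, and fails when the replayed code asks for a different step than the one in the history.

```python
from typing import Callable, List, Optional, Tuple


class NonDeterminismError(RuntimeError):
    """Raised when replayed code asks for a different step than the history recorded."""


class ReplayContext:
    """Toy stand-in for a workflow context: records step results, or replays them."""

    def __init__(self, history: Optional[List[Tuple[str, str]]] = None) -> None:
        self.history = list(history or [])
        self.position = 0

    def execute(self, name: str, fn: Callable[[], str]) -> str:
        if self.position < len(self.history):
            recorded_name, recorded_result = self.history[self.position]
            if recorded_name != name:
                raise NonDeterminismError(
                    f"history has {recorded_name!r} but the workflow asked for {name!r}"
                )
            self.position += 1
            return recorded_result  # replay: the side effect is not run again
        result = fn()  # first execution: run the step and record its result
        self.history.append((name, result))
        self.position += 1
        return result


def workflow_fn(ctx: ReplayContext, query: str, search_first: bool) -> str:
    # search_first stands in for a value read directly inside workflow code,
    # such as a clock reading or a random number.
    if search_first:
        return ctx.execute("search", lambda: f"results for {query}")
    return ctx.execute("plan", lambda: f"plan for {query}")


first_run = ReplayContext()
workflow_fn(first_run, "RAG", search_first=True)

# Replaying with the same decision returns the recorded result.
replay_ok = workflow_fn(ReplayContext(first_run.history), "RAG", search_first=True)

# Replaying with a different decision no longer matches the history.
try:
    workflow_fn(ReplayContext(first_run.history), "RAG", search_first=False)
    diverged = False
except NonDeterminismError:
    diverged = True
```

The same reasoning is why anything non-deterministic, such as API calls, belongs in an activity, where its result is recorded once and reused on replay.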
Defining Activities
Activities encapsulate each unit of work your agent performs. They run in activity workers and can be retried independently.
import os

import httpx
from temporalio import activity

# Assumption: the API key is supplied through an environment variable.
API_KEY = os.environ["OPENAI_API_KEY"]


@activity.defn
async def call_llm(prompt: str, model: str) -> str:
    """Call an LLM. Retries are applied by Temporal according to the caller's retry policy."""
    async with httpx.AsyncClient(timeout=60) as client:
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
            },
        )
        # Raise on 4xx/5xx so Temporal sees the failure and can retry.
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]


@activity.defn
async def search_web(query: str) -> list[str]:
    """Search the web and return relevant snippets."""
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(
            "https://api.search-provider.com/search",
            params={"q": query, "limit": 5},
        )
        response.raise_for_status()
        return [r["snippet"] for r in response.json()["results"]]


@activity.defn
async def store_result(task_id: str, result: dict) -> None:
    """Persist the final result to a database."""
    # Database write logic here
    activity.logger.info(f"Stored result for task {task_id}")
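Temporal retries a failed or timed-out activity by running it again, so a write like the one store_result leaves as a placeholder may execute more than once. One common way to make such a write safe is to key it by task ID so that a repeat overwrites instead of duplicating. The sketch below uses the standard-library sqlite3 module; the function name, table name, and schema are hypothetical and not part of the original example.

```python
import json
import sqlite3


def store_result_idempotent(conn: sqlite3.Connection, task_id: str, result: dict) -> None:
    """Write the result keyed by task_id so a retried call overwrites rather than duplicates."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS agent_results (task_id TEXT PRIMARY KEY, payload TEXT)"
    )
    # The primary key on task_id makes a second write replace the first row.
    conn.execute(
        "INSERT OR REPLACE INTO agent_results (task_id, payload) VALUES (?, ?)",
        (task_id, json.dumps(result)),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
payload = {"answer": "42", "steps_taken": 3}
store_result_idempotent(conn, "task-1", payload)
store_result_idempotent(conn, "task-1", payload)  # simulated retry of the same activity

row_count = conn.execute("SELECT COUNT(*) FROM agent_results").fetchone()[0]
stored = json.loads(conn.execute("SELECT payload FROM agent_results").fetchone()[0])
```

Inside a real activity the same statement would run against your production database; the in-memory connection here only keeps the example self-contained.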
Building the Workflow
The workflow orchestrates activities in a deterministic sequence. Temporal replays this function from the event history on recovery, so it must not contain side effects directly.
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy


@workflow.defn
class ResearchAgentWorkflow:
    @workflow.run
    async def run(self, task: AgentTask) -> AgentResult:
        sources: list[str] = []
        steps = 0

        # Step 1: Plan the research
        plan = await workflow.execute_activity(
            call_llm,
            args=[f"Create a research plan for: {task.query}", task.model],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=2),
                maximum_interval=timedelta(seconds=60),
                maximum_attempts=5,
            ),
        )
        steps += 1

        # Step 2: Run a web search (this simplified example searches the
        # original query; the plan from step 1 is not consumed yet)
        search_results = await workflow.execute_activity(
            search_web,
            args=[task.query],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        sources.extend(search_results)
        steps += 1

        # Step 3: Synthesize findings
        synthesis_prompt = (
            f"Based on these sources, answer: {task.query}\n"
            f"Sources: {search_results}"
        )
        answer = await workflow.execute_activity(
            call_llm,
            args=[synthesis_prompt, task.model],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=RetryPolicy(maximum_attempts=5),
        )
        steps += 1

        return AgentResult(
            answer=answer,
            steps_taken=steps,
            sources=sources,
        )
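To see what the retry policy on the planning step means in practice, the waits between attempts can be worked out by hand. The helper below is a hypothetical illustration, not a Temporal API. It assumes the documented exponential backoff behavior (each wait is the initial interval multiplied by the backoff coefficient once per previous retry, capped at the maximum interval) and Temporal's default backoff coefficient of 2.0, since the example does not set one. It ignores scheduling latency.

```python
from datetime import timedelta
from typing import List


def retry_delays(
    initial_interval: timedelta,
    maximum_interval: timedelta,
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,  # assumed default, not set in the example
) -> List[timedelta]:
    """Return the wait before each retry.

    The first attempt runs immediately, so there are maximum_attempts - 1 waits.
    """
    delays = []
    for retry_index in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry_index)
        delays.append(min(delay, maximum_interval))  # cap at the maximum interval
    return delays


# Values taken from the planning step's RetryPolicy.
plan_step_delays = retry_delays(
    initial_interval=timedelta(seconds=2),
    maximum_interval=timedelta(seconds=60),
    maximum_attempts=5,
)
print([d.total_seconds() for d in plan_step_delays])
```

Under these assumptions the planning step waits 2, 4, 8, and 16 seconds between its five attempts, about 30 seconds of waiting in total, so the 60-second cap is never reached with this attempt limit.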
Running the Worker and Client
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.common import RetryPolicy


async def main():
    client = await Client.connect("localhost:7233")

    # Start a worker
    worker = Worker(
        client,
        task_queue="agent-tasks",
        workflows=[ResearchAgentWorkflow],
        activities=[call_llm, search_web, store_result],
    )

    # Run worker in background
    async with worker:
        # Start a workflow execution
        result = await client.execute_workflow(
            ResearchAgentWorkflow.run,
            AgentTask(query="What are the latest advances in RAG?"),
            id="research-rag-2026",
            task_queue="agent-tasks",
        )
        print(f"Answer: {result.answer}")


if __name__ == "__main__":
    asyncio.run(main())
State Management and Signals
Temporal workflows can receive external signals, allowing human-in-the-loop patterns for agent oversight.
@workflow.defn
class SupervisedAgentWorkflow:
    def __init__(self):
        self.approved = False
        self.feedback = ""

    @workflow.signal
    async def approve(self, feedback: str):
        self.approved = True
        self.feedback = feedback

    @workflow.run
    async def run(self, task: AgentTask) -> AgentResult:
        draft = await workflow.execute_activity(
            call_llm,
            args=[task.query, task.model],
            start_to_close_timeout=timedelta(seconds=120),
        )

        # Wait for human approval
        await workflow.wait_condition(lambda: self.approved)

        # Incorporate feedback and finalize
        final = await workflow.execute_activity(
            call_llm,
            args=[
                f"Revise this based on feedback: {self.feedback}\n{draft}",
                task.model,
            ],
            start_to_close_timeout=timedelta(seconds=120),
        )
        return AgentResult(answer=final, steps_taken=2, sources=[])
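The workflow above blocks until an approve signal arrives, so something outside the workflow has to send it. A minimal sketch of the sending side follows. It assumes a connected temporalio.client.Client is passed in and uses its get_workflow_handle and signal methods, addressing the signal by its default name, which is the method name approve. The approve_draft helper and the workflow ID in the usage note are hypothetical.

```python
from typing import Any


async def approve_draft(client: Any, workflow_id: str, feedback: str) -> None:
    """Send the approve signal to a running SupervisedAgentWorkflow.

    client is expected to be a connected temporalio.client.Client; it is typed
    as Any here so the sketch does not depend on the SDK being installed.
    """
    # Look up the running workflow by the ID it was started with.
    handle = client.get_workflow_handle(workflow_id)
    # "approve" matches the @workflow.signal method name on the workflow class.
    await handle.signal("approve", feedback)
```

A reviewer-facing service could call it as await approve_draft(client, "supervised-task-1", "Tighten the summary"), where the ID is whatever was supplied when the workflow was started. Once the signal is delivered, the wait condition becomes true and the workflow continues to the revision step.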
FAQ
When should I choose Temporal over simpler retry libraries?
Use Temporal when your agent workflow has more than a few steps, takes longer than a few seconds, or must survive process restarts. Simple retry decorators work for single API calls, but they cannot checkpoint multi-step progress or coordinate across distributed workers.
Does Temporal add significant latency to each step?
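Each activity dispatch adds some overhead, because Temporal persists events to the workflow history and routes the task through a task queue. A rough estimate is 10-50 milliseconds per activity, though the actual figure depends on your deployment and persistence backend. For AI agent workflows where individual LLM calls take 1-30 seconds, this overhead is small.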
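To put those figures in proportion, the short calculation below compares the quoted overhead range with the quoted LLM call durations. The inputs are the rough estimates from the answer above, not measurements, and overhead_share is a hypothetical helper.

```python
def overhead_share(overhead_ms: float, call_seconds: float) -> float:
    """Return dispatch overhead as a percentage of the activity's own run time."""
    return 100 * (overhead_ms / 1000) / call_seconds


# Largest quoted overhead against the shortest quoted LLM call.
worst_case = overhead_share(overhead_ms=50, call_seconds=1)
# Smallest quoted overhead against the longest quoted LLM call.
best_case = overhead_share(overhead_ms=10, call_seconds=30)

print(f"worst case: {worst_case:.2f}%")
print(f"best case: {best_case:.3f}%")
```

Even the least favorable pairing, 50 milliseconds on a 1-second call, comes to 5 percent of the call's duration.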
Can I run Temporal workflows locally during development?
Yes. Install the Temporal CLI and run temporal server start-dev to start a local development server. It runs a single-process Temporal service that by default listens on localhost:7233, the address used in the client example, and includes a web UI for inspecting workflow execution histories.
CallSphere Team