Skip to content
Learn Agentic AI14 min read0 views

Temporal for AI Agent Workflows: Durable Execution and Workflow-as-Code

Learn how Temporal provides durable execution guarantees for AI agent workflows. Covers workflow definition, activities, automatic retries, and state management for long-running agent pipelines.

Why Durable Execution Matters for AI Agents

AI agent workflows frequently span minutes or hours. A research agent might call an LLM, scrape web pages, run code analysis, and synthesize results across dozens of steps. If any step fails — a network timeout, an API rate limit, a transient LLM error — naive implementations lose all progress and must restart from scratch.

Temporal solves this with durable execution. Every step in a workflow is automatically checkpointed. If a worker crashes mid-execution, another worker picks up the workflow exactly where it left off. No lost state. No duplicate side effects. No manual retry logic scattered throughout your code.

This matters enormously for AI agents because LLM calls are expensive, slow, and non-deterministic. You do not want to re-run a 30-step research pipeline because step 28 hit a transient error.

Core Temporal Concepts

Temporal separates workflows (deterministic orchestration logic) from activities (non-deterministic side effects like API calls). Workflows define the control flow. Activities do the actual work.

from temporalio import workflow, activity
from dataclasses import dataclass
import asyncio

@dataclass
class AgentTask:
    query: str
    max_steps: int = 10
    model: str = "gpt-4"

@dataclass
class AgentResult:
    answer: str
    steps_taken: int
    sources: list[str]

Defining Activities

Activities encapsulate each unit of work your agent performs. They run in activity workers and can be retried independently.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

from temporalio import activity
from datetime import timedelta
import httpx

@activity.defn
async def call_llm(prompt: str, model: str) -> str:
    """Call an LLM with automatic retry on transient failures."""
    async with httpx.AsyncClient(timeout=60) as client:
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
            },
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

@activity.defn
async def search_web(query: str) -> list[str]:
    """Search the web and return relevant snippets."""
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(
            "https://api.search-provider.com/search",
            params={"q": query, "limit": 5},
        )
        response.raise_for_status()
        return [r["snippet"] for r in response.json()["results"]]

@activity.defn
async def store_result(task_id: str, result: dict) -> None:
    """Persist the final result to a database."""
    # Database write logic here
    activity.logger.info(f"Stored result for task {task_id}")

Building the Workflow

The workflow orchestrates activities in a deterministic sequence. Temporal replays this function from the event history on recovery, so it must not contain side effects directly.

from temporalio import workflow
from datetime import timedelta

@workflow.defn
class ResearchAgentWorkflow:
    @workflow.run
    async def run(self, task: AgentTask) -> AgentResult:
        sources = []
        steps = 0

        # Step 1: Plan the research
        plan = await workflow.execute_activity(
            call_llm,
            args=[f"Create a research plan for: {task.query}", task.model],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=2),
                maximum_interval=timedelta(seconds=60),
                maximum_attempts=5,
            ),
        )
        steps += 1

        # Step 2: Execute searches based on plan
        search_results = await workflow.execute_activity(
            search_web,
            args=[task.query],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        sources.extend(search_results)
        steps += 1

        # Step 3: Synthesize findings
        synthesis_prompt = (
            f"Based on these sources, answer: {task.query}\n"
            f"Sources: {search_results}"
        )
        answer = await workflow.execute_activity(
            call_llm,
            args=[synthesis_prompt, task.model],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=RetryPolicy(maximum_attempts=5),
        )
        steps += 1

        return AgentResult(
            answer=answer,
            steps_taken=steps,
            sources=sources,
        )

Running the Worker and Client

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.common import RetryPolicy

async def main():
    client = await Client.connect("localhost:7233")

    # Start a worker
    worker = Worker(
        client,
        task_queue="agent-tasks",
        workflows=[ResearchAgentWorkflow],
        activities=[call_llm, search_web, store_result],
    )

    # Run worker in background
    async with worker:
        # Start a workflow execution
        result = await client.execute_workflow(
            ResearchAgentWorkflow.run,
            AgentTask(query="What are the latest advances in RAG?"),
            id="research-rag-2026",
            task_queue="agent-tasks",
        )
        print(f"Answer: {result.answer}")

asyncio.run(main())

State Management and Signals

Temporal workflows can receive external signals, allowing human-in-the-loop patterns for agent oversight.

@workflow.defn
class SupervisedAgentWorkflow:
    def __init__(self):
        self.approved = False
        self.feedback = ""

    @workflow.signal
    async def approve(self, feedback: str):
        self.approved = True
        self.feedback = feedback

    @workflow.run
    async def run(self, task: AgentTask) -> AgentResult:
        draft = await workflow.execute_activity(
            call_llm,
            args=[task.query, task.model],
            start_to_close_timeout=timedelta(seconds=120),
        )

        # Wait for human approval
        await workflow.wait_condition(lambda: self.approved)

        # Incorporate feedback and finalize
        final = await workflow.execute_activity(
            call_llm,
            args=[
                f"Revise this based on feedback: {self.feedback}\n{draft}",
                task.model,
            ],
            start_to_close_timeout=timedelta(seconds=120),
        )
        return AgentResult(answer=final, steps_taken=2, sources=[])

FAQ

When should I choose Temporal over simpler retry libraries?

Use Temporal when your agent workflow has more than a few steps, takes longer than a few seconds, or must survive process restarts. Simple retry decorators work for single API calls, but they cannot checkpoint multi-step progress or coordinate across distributed workers.

Does Temporal add significant latency to each step?

Temporal adds roughly 10-50 milliseconds of overhead per activity dispatch for event history persistence. For AI agent workflows where individual LLM calls take 1-30 seconds, this overhead is negligible.

Can I run Temporal workflows locally during development?

Yes. Use the Temporal CLI to start a local development server with temporal server start-dev. This gives you a fully functional Temporal cluster with a web UI for inspecting workflow execution histories.


#Temporal #WorkflowOrchestration #DurableExecution #AIAgents #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.