Long-Running Agent Tasks: Durable Execution with Temporal and Celery
Discover how to build resilient long-running agent workflows using durable execution engines like Temporal and Celery, with activity retries, saga patterns, and persistent state across process restarts.
The Problem with Ephemeral Agent Runs
Most agent frameworks treat each invocation as a short-lived function call. The agent receives a prompt, calls some tools, and returns a result — all within a single process lifetime. But real-world agent tasks often take minutes, hours, or even days. A due diligence agent might need to collect data from 50 sources over several hours. A monitoring agent runs indefinitely.
When these long-running tasks crash — and they will — you lose all progress. The agent has no memory of which steps completed, what intermediate results were produced, or where it left off. This is where durable execution comes in.
Durable Execution: The Core Idea
Durable execution means that workflow state survives process failures. If the worker crashes after completing step 3 of 10, it resumes at step 4 when restarted — not step 1. Two popular approaches in the Python ecosystem are Temporal and Celery.
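The mechanic can be sketched in a dozen lines: persist each completed step's result to a durable log, and on restart replay the sequence, skipping any step whose result is already recorded. The `run_workflow` helper and `history` dict below are illustrative, not part of either engine's API:

```python
def run_workflow(steps, history):
    """Run `steps` in order, skipping any whose result is already in `history`.

    `history` stands in for a durable event log: it survives the "crash" below.
    """
    results = []
    for name, fn in steps:
        if name in history:           # completed before the crash: skip
            results.append(history[name])
            continue
        result = fn()                 # side-effecting work runs here
        history[name] = result        # persist before moving on
        results.append(result)
    return results


executed = []
steps = [(f"step{i}", lambda i=i: executed.append(i) or i) for i in range(4)]

history = {}
run_workflow(steps[:2], history)        # simulate a crash after two steps
resumed = run_workflow(steps, history)  # restart: steps 0 and 1 are skipped
```

After the restart, each step's side effect has run exactly once, which is the property both Temporal and a checkpointed Celery pipeline are built to provide.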
Building Agent Workflows with Temporal
Temporal separates workflows (orchestration logic) from activities (actual work). The workflow is deterministic and replayed on failure. Activities are the non-deterministic side-effecting operations.
```python
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta


@activity.defn
async def fetch_source_data(source_url: str) -> dict:
    """Activity: fetch data from a single source."""
    # This runs in a worker and can be retried independently
    import httpx

    async with httpx.AsyncClient() as client:
        response = await client.get(source_url, timeout=30)
        return response.json()


@activity.defn
async def analyze_with_llm(data: dict) -> str:
    """Activity: send collected data to an LLM for analysis."""
    from openai import AsyncOpenAI

    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Analyze the following data."},
            {"role": "user", "content": str(data)},
        ],
    )
    return response.choices[0].message.content


@workflow.defn
class ResearchWorkflow:
    """Durable workflow that survives crashes."""

    @workflow.run
    async def run(self, sources: list[str]) -> str:
        # Each activity call is persisted to Temporal history
        collected = []
        for source in sources:
            data = await workflow.execute_activity(
                fetch_source_data,
                source,
                start_to_close_timeout=timedelta(minutes=2),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            collected.append(data)
        # If the worker crashes here, replay restores the completed
        # fetches from history and resumes at the analysis step
        analysis = await workflow.execute_activity(
            analyze_with_llm,
            {"sources": collected},
            start_to_close_timeout=timedelta(minutes=5),
        )
        return analysis
```
If the worker crashes after fetching 8 of 10 sources, Temporal replays the workflow history. It skips the 8 completed activities (their results are stored) and resumes fetching source 9.
Saga Pattern for Multi-Step Compensation
When a long workflow fails partway through, you often need to undo earlier steps. The saga pattern pairs each action with a compensation:
```python
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class SagaStep:
    action: Callable[..., Awaitable]
    compensation: Callable[..., Awaitable]
    name: str


class SagaOrchestrator:
    def __init__(self):
        self.completed_steps: list[SagaStep] = []

    async def execute(self, steps: list[SagaStep], context: dict):
        for step in steps:
            try:
                await step.action(context)
                self.completed_steps.append(step)
            except Exception as e:
                print(f"Step '{step.name}' failed: {e}")
                await self.compensate(context)
                raise

    async def compensate(self, context: dict):
        """Roll back completed steps in reverse order."""
        for step in reversed(self.completed_steps):
            try:
                await step.compensation(context)
            except Exception as comp_error:
                print(f"Compensation for '{step.name}' failed: {comp_error}")
```
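To make the rollback order concrete, here is a self-contained sketch (re-declaring compact versions of the classes above so it runs standalone) with three hypothetical steps, where the third fails and the first two are compensated in reverse:

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class SagaStep:
    action: Callable[[dict], Awaitable]
    compensation: Callable[[dict], Awaitable]
    name: str


class SagaOrchestrator:
    def __init__(self):
        self.completed_steps: list[SagaStep] = []

    async def execute(self, steps, context):
        for step in steps:
            try:
                await step.action(context)
                self.completed_steps.append(step)
            except Exception:
                await self.compensate(context)
                raise

    async def compensate(self, context):
        for step in reversed(self.completed_steps):
            await step.compensation(context)


log: list[str] = []


def make_step(name: str, fail: bool = False) -> SagaStep:
    async def action(ctx):
        if fail:
            raise RuntimeError(f"{name} failed")
        log.append(f"do:{name}")

    async def undo(ctx):
        log.append(f"undo:{name}")

    return SagaStep(action=action, compensation=undo, name=name)


async def demo():
    saga = SagaOrchestrator()
    steps = [make_step("reserve"), make_step("charge"), make_step("notify", fail=True)]
    try:
        await saga.execute(steps, {})
    except RuntimeError:
        pass  # the saga re-raises after compensating


asyncio.run(demo())
# log is now ["do:reserve", "do:charge", "undo:charge", "undo:reserve"]
```

Note that compensations run last-in-first-out, mirroring how nested resources are usually released.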
Celery for Simpler Durability
If Temporal feels heavyweight, Celery provides task queuing with retries and result persistence:
```python
from celery import Celery, chain

app = Celery("agent_tasks", broker="redis://localhost:6379/0")
app.conf.result_backend = "redis://localhost:6379/1"


@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_data(self, source_url: str):
    try:
        import httpx

        response = httpx.get(source_url, timeout=30)
        return response.json()
    except Exception as exc:
        raise self.retry(exc=exc)


@app.task
def analyze_data(data: dict):
    # LLM analysis step
    return {"analysis": "completed", "data": data}


# Chain tasks: fetch then analyze
pipeline = chain(
    fetch_data.s("https://api.example.com/data"),
    analyze_data.s(),
)
result = pipeline.apply_async()
```
State Persistence Strategies
Regardless of the engine, persist your agent state at meaningful checkpoints:
```python
import json
from pathlib import Path


class CheckpointManager:
    def __init__(self, workflow_id: str, storage_dir: str = "./checkpoints"):
        self.path = Path(storage_dir) / f"{workflow_id}.json"
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def save(self, state: dict):
        self.path.write_text(json.dumps(state, default=str))

    def load(self) -> dict | None:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return None

    def clear(self):
        self.path.unlink(missing_ok=True)
```
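For example, a fetch loop can checkpoint after every source and skip completed work on restart. This sketch re-declares the manager so it runs standalone, and uses an illustrative `crash_after` flag to simulate a failure:

```python
import json
import tempfile
from pathlib import Path


class CheckpointManager:  # as defined above
    def __init__(self, workflow_id: str, storage_dir: str = "./checkpoints"):
        self.path = Path(storage_dir) / f"{workflow_id}.json"
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def save(self, state: dict):
        self.path.write_text(json.dumps(state, default=str))

    def load(self):
        if self.path.exists():
            return json.loads(self.path.read_text())
        return None

    def clear(self):
        self.path.unlink(missing_ok=True)


processed: list[str] = []


def run(sources, mgr, crash_after=None):
    """Process sources with a checkpoint after each; optionally 'crash' early."""
    state = mgr.load() or {"results": {}}
    for i, source in enumerate(sources):
        if source in state["results"]:
            continue  # completed before the last crash: skip
        processed.append(source)
        state["results"][source] = f"result:{source}"
        mgr.save(state)  # checkpoint after every completed source
        if crash_after is not None and i + 1 >= crash_after:
            raise RuntimeError("simulated crash")
    mgr.clear()  # workflow finished; discard the checkpoint
    return state


sources = ["a", "b", "c", "d"]
with tempfile.TemporaryDirectory() as tmp:
    mgr = CheckpointManager("demo", storage_dir=tmp)
    try:
        run(sources, mgr, crash_after=2)  # crash after two sources
    except RuntimeError:
        pass
    final = run(sources, mgr)  # restart: only "c" and "d" are reprocessed
```

Each source is processed exactly once across the crash and restart, which is the same skip-completed-work behavior Temporal's history replay provides.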
FAQ
When should I use Temporal versus Celery for agent workflows?
Use Temporal when your workflow has complex branching, long-running wait states (hours or days), or when you need its replay guarantees: workflow code executes effectively once across crashes, while activities run at least once and should therefore be idempotent. Use Celery when you need a simple task queue with retries and your workflows are linear chains of tasks without complex orchestration logic.
How does Temporal replay work without re-executing completed activities?
Temporal records every activity completion in its event history. During replay, the workflow code runs again, but when it hits execute_activity, Temporal checks the history. If that activity already completed, it returns the stored result immediately instead of dispatching it to a worker. This makes replay deterministic and fast.
What happens to in-flight LLM calls when a worker crashes?
The LLM call becomes orphaned — the API may still process it, but the result is lost. Temporal handles this with activity timeouts and retries. When the worker restarts and replays, it re-dispatches the activity. To avoid paying for the orphaned call, set short start_to_close_timeout values and implement idempotency on your LLM wrapper so duplicate calls return cached results.
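An idempotent wrapper can be sketched as a cache keyed on a hash of the full request, so a retried activity that repeats the same prompt returns the stored result instead of paying for a second call. The `call_llm` function below is a stand-in for a real API call, not any provider's SDK:

```python
import hashlib
import json

_cache: dict[str, str] = {}
calls_made = 0


def call_llm(model: str, messages: list[dict]) -> str:
    """Stand-in for a real LLM API call."""
    global calls_made
    calls_made += 1
    return f"analysis of {len(messages)} messages"


def idempotent_llm_call(model: str, messages: list[dict]) -> str:
    # Key on the full request so identical retries hit the cache
    key = hashlib.sha256(
        json.dumps({"model": model, "messages": messages}, sort_keys=True).encode()
    ).hexdigest()
    if key not in _cache:
        _cache[key] = call_llm(model, messages)
    return _cache[key]


msgs = [{"role": "user", "content": "Analyze this."}]
first = idempotent_llm_call("gpt-4o", msgs)
retry = idempotent_llm_call("gpt-4o", msgs)  # a retry after a simulated crash
```

In production the cache must live outside the worker process (Redis, a database), since an in-process dict dies with the crash you are defending against.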
#Temporal #Celery #DurableExecution #WorkflowEngines #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.