Skip to content
Learn Agentic AI12 min read0 views

Long-Running Agent Tasks: Durable Execution with Temporal and Celery

Discover how to build resilient long-running agent workflows using durable execution engines like Temporal and Celery, with activity retries, saga patterns, and persistent state across process restarts.

The Problem with Ephemeral Agent Runs

Most agent frameworks treat each invocation as a short-lived function call. The agent receives a prompt, calls some tools, and returns a result — all within a single process lifetime. But real-world agent tasks often take minutes, hours, or even days. A due diligence agent might need to collect data from 50 sources over several hours. A monitoring agent runs indefinitely.

When these long-running tasks crash — and they will — you lose all progress. The agent has no memory of which steps completed, what intermediate results were produced, or where it left off. This is where durable execution comes in.

Durable Execution: The Core Idea

Durable execution means that workflow state survives process failures. If the worker crashes after completing step 3 of 10, it resumes at step 4 when restarted — not step 1. Two popular approaches in the Python ecosystem are Temporal and Celery.

Building Agent Workflows with Temporal

Temporal separates workflows (orchestration logic) from activities (actual work). The workflow is deterministic and replayed on failure. Activities are the non-deterministic side-effecting operations.

from temporalio import workflow, activity
from datetime import timedelta
import asyncio

@activity.defn
async def fetch_source_data(source_url: str) -> dict:
    """Activity: fetch data from a single source."""
    # This runs in a worker and can be retried independently
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(source_url, timeout=30)
        return response.json()

@activity.defn
async def analyze_with_llm(data: dict) -> str:
    """Activity: send collected data to an LLM for analysis."""
    from openai import AsyncOpenAI
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Analyze the following data."},
            {"role": "user", "content": str(data)},
        ],
    )
    return response.choices[0].message.content

@workflow.defn
class ResearchWorkflow:
    """Durable workflow that survives crashes."""

    @workflow.run
    async def run(self, sources: list[str]) -> str:
        # Each activity call is persisted to Temporal history
        collected = []
        for source in sources:
            data = await workflow.execute_activity(
                fetch_source_data,
                source,
                start_to_close_timeout=timedelta(minutes=2),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            collected.append(data)

        # If the worker crashes here, it resumes AFTER the loop
        analysis = await workflow.execute_activity(
            analyze_with_llm,
            {"sources": collected},
            start_to_close_timeout=timedelta(minutes=5),
        )
        return analysis

If the worker crashes after fetching 8 of 10 sources, Temporal replays the workflow history. It skips the 8 completed activities (their results are stored) and resumes fetching source 9.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

Saga Pattern for Multi-Step Compensation

When a long workflow fails partway through, you often need to undo earlier steps. The saga pattern pairs each action with a compensation:

from dataclasses import dataclass
from typing import Callable, Awaitable

@dataclass
class SagaStep:
    action: Callable[..., Awaitable]
    compensation: Callable[..., Awaitable]
    name: str

class SagaOrchestrator:
    def __init__(self):
        self.completed_steps: list[SagaStep] = []

    async def execute(self, steps: list[SagaStep], context: dict):
        for step in steps:
            try:
                await step.action(context)
                self.completed_steps.append(step)
            except Exception as e:
                print(f"Step '{step.name}' failed: {e}")
                await self.compensate()
                raise

    async def compensate(self):
        """Roll back completed steps in reverse order."""
        for step in reversed(self.completed_steps):
            try:
                await step.compensation({})
            except Exception as comp_error:
                print(f"Compensation for '{step.name}' failed: {comp_error}")

Celery for Simpler Durability

If Temporal feels heavyweight, Celery provides task queuing with retries and result persistence:

from celery import Celery, chain

app = Celery("agent_tasks", broker="redis://localhost:6379/0")
app.conf.result_backend = "redis://localhost:6379/1"

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_data(self, source_url: str):
    try:
        import httpx
        response = httpx.get(source_url, timeout=30)
        return response.json()
    except Exception as exc:
        self.retry(exc=exc)

@app.task
def analyze_data(data: dict):
    # LLM analysis step
    return {"analysis": "completed", "data": data}

# Chain tasks: fetch then analyze
pipeline = chain(
    fetch_data.s("https://api.example.com/data"),
    analyze_data.s(),
)
result = pipeline.apply_async()

State Persistence Strategies

Regardless of the engine, persist your agent state at meaningful checkpoints:

import json
from pathlib import Path

class CheckpointManager:
    def __init__(self, workflow_id: str, storage_dir: str = "./checkpoints"):
        self.path = Path(storage_dir) / f"{workflow_id}.json"
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def save(self, state: dict):
        self.path.write_text(json.dumps(state, default=str))

    def load(self) -> dict | None:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return None

    def clear(self):
        self.path.unlink(missing_ok=True)

FAQ

When should I use Temporal versus Celery for agent workflows?

Use Temporal when your workflow has complex branching, long-running wait states (hours or days), or when you need the replay guarantee that ensures exactly-once semantics. Use Celery when you need a simple task queue with retries and your workflows are linear chains of tasks without complex orchestration logic.

How does Temporal replay work without re-executing completed activities?

Temporal records every activity completion in its event history. During replay, the workflow code runs again, but when it hits execute_activity, Temporal checks the history. If that activity already completed, it returns the stored result immediately instead of dispatching it to a worker. This makes replay deterministic and fast.

What happens to in-flight LLM calls when a worker crashes?

The LLM call becomes orphaned — the API may still process it, but the result is lost. Temporal handles this with activity timeouts and retries. When the worker restarts and replays, it re-dispatches the activity. To avoid paying for the orphaned call, set short start_to_close_timeout values and implement idempotency on your LLM wrapper so duplicate calls return cached results.


#Temporal #Celery #DurableExecution #WorkflowEngines #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.