
Task Queues for AI Agents: Celery, RQ, and Dramatiq for Background Agent Jobs

Set up background task queues for AI agent workloads using Celery, RQ, and Dramatiq. Learn worker patterns, retry policies, and result backends for reliable agent job processing.

When asyncio Is Not Enough

asyncio excels at concurrent I/O within a single process. But many AI agent workloads need more: durable job processing that survives server restarts, distributed execution across multiple workers, scheduled recurring jobs, and reliable retry semantics. Task queues provide all of this.

Common AI agent use cases for task queues include: batch document processing, periodic knowledge base updates, long-running multi-step agent workflows that exceed HTTP request timeouts, and fan-out patterns where one user request triggers dozens of agent jobs.

Celery: The Industry Standard

Celery is the most widely deployed Python task queue. It supports multiple brokers (Redis, RabbitMQ), result backends, and has mature tooling for monitoring and administration.

# celery_app.py
from celery import Celery

app = Celery(
    "ai_agents",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,          # Acknowledge after completion
    worker_prefetch_multiplier=1,  # One task at a time per worker
    task_time_limit=300,           # Hard 5-minute timeout
    task_soft_time_limit=270,      # Soft timeout triggers exception
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def process_document(self, document_id: str, user_id: str) -> dict:
    """Process a document through an AI pipeline."""
    try:
        # Fetch document
        doc = fetch_document(document_id)

        # Run LLM analysis (synchronous in Celery workers)
        summary = call_llm_sync(f"Summarize: {doc['content']}")
        entities = call_llm_sync(f"Extract entities: {doc['content']}")
        classification = call_llm_sync(
            f"Classify this document: {doc['content']}"
        )

        result = {
            "document_id": document_id,
            "summary": summary,
            "entities": entities,
            "classification": classification,
        }
        save_results(document_id, result)
        return result

    except LLMRateLimitError as exc:
        raise self.retry(exc=exc, countdown=120)
    except LLMTimeoutError as exc:
        raise self.retry(exc=exc, countdown=60)

Key Celery configuration choices for AI workloads:

  • task_acks_late=True ensures a task is re-delivered if the worker crashes mid-execution. Critical for expensive LLM workflows.
  • worker_prefetch_multiplier=1 prevents workers from grabbing tasks they cannot process immediately, important when each task takes seconds.
  • task_soft_time_limit gives the task a chance to clean up before the hard timeout kills it.

RQ: Simplicity First

Redis Queue (RQ) trades Celery's feature richness for simplicity. It is an excellent choice when your requirements are straightforward and you already use Redis.

# tasks.py
from redis import Redis
from rq import Queue, Retry
from rq.job import Job

redis_conn = Redis(host="localhost", port=6379)
agent_queue = Queue("agent_jobs", connection=redis_conn)

def run_agent_workflow(query: str, session_id: str) -> dict:
    """Background agent workflow — runs in RQ worker process."""
    # Step 1: Retrieve context
    docs = search_vector_store(query)

    # Step 2: Call LLM
    response = call_llm_sync(
        prompt=query,
        context=docs,
    )

    # Step 3: Store results
    save_conversation(session_id, query, response)
    return {"session_id": session_id, "response": response}

# Enqueue from your web application
job = agent_queue.enqueue(
    run_agent_workflow,
    "How do I configure async pipelines?",
    "session_abc123",
    job_timeout=300,
    retry=Retry(max=3),  # RQ expects a Retry object, not a bare int
    result_ttl=3600,  # Keep results for 1 hour
)

# Check job status later
job = Job.fetch(job.id, connection=redis_conn)
if job.is_finished:
    print(job.result)
elif job.is_failed:
    print(f"Failed: {job.exc_info}")

Start workers with: rq worker agent_jobs --with-scheduler


Dramatiq: Modern and Reliable

Dramatiq is a newer alternative that emphasizes reliability and performance. It supports both Redis and RabbitMQ as brokers.

# dramatiq_tasks.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends.redis import RedisBackend

result_backend = RedisBackend(url="redis://localhost:6379/2")
broker = RedisBroker(url="redis://localhost:6379/0")
broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(broker)

@dramatiq.actor(
    max_retries=3,
    min_backoff=30_000,     # 30 seconds minimum retry delay
    max_backoff=300_000,    # 5 minutes maximum retry delay
    time_limit=300_000,     # 5-minute timeout
    store_results=True,
)
def analyze_conversation(conversation_id: str) -> dict:
    """Analyze a conversation with multiple LLM passes."""
    conversation = load_conversation(conversation_id)
    transcript = conversation["transcript"]

    sentiment = call_llm_sync(f"Analyze sentiment: {transcript}")
    summary = call_llm_sync(f"Summarize: {transcript}")
    action_items = call_llm_sync(f"Extract action items: {transcript}")

    result = {
        "conversation_id": conversation_id,
        "sentiment": sentiment,
        "summary": summary,
        "action_items": action_items,
    }
    store_analysis(conversation_id, result)
    return result

# Send the task
message = analyze_conversation.send("conv_12345")

# Retrieve results later
result = message.get_result(block=True, timeout=60_000)

Choosing the Right Queue

| Feature              | Celery                | RQ                     | Dramatiq             |
| -------------------- | --------------------- | ---------------------- | -------------------- |
| Broker support       | Redis, RabbitMQ, SQS  | Redis only             | Redis, RabbitMQ      |
| Complexity           | High                  | Low                    | Medium               |
| Async worker support | Yes (with gevent)     | No                     | No (uses threads)    |
| Monitoring           | Flower, Celery Events | rq-dashboard           | Built-in CLI         |
| Best for             | Complex workflows     | Simple background jobs | Reliable processing  |

For AI agent workloads, Celery is the safest choice for complex multi-step workflows, RQ works well for simple fire-and-forget LLM jobs, and Dramatiq offers the best balance of simplicity and reliability for most teams in between.

Retry Policies for LLM Tasks

LLM API calls fail in predictable ways. Configure retries accordingly.

# Celery retry configuration per failure type
@app.task(bind=True, max_retries=5)
def smart_retry_task(self, prompt: str):
    try:
        return call_llm_sync(prompt)
    except RateLimitError as exc:
        # Back off aggressively for rate limits
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)
    except TimeoutError as exc:
        # Moderate backoff for timeouts
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 10)
    except ServerError as exc:
        # Quick retry for transient server errors
        raise self.retry(exc=exc, countdown=5)
    except AuthenticationError:
        # Never retry auth failures
        raise
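The countdown formula above doubles the delay with each prior retry. A small pure-Python helper (hypothetical, just to make the schedules explicit):

```python
def retry_countdown(retries: int, base: int) -> int:
    """Same formula as above: base delay doubled per completed retry."""
    return 2 ** retries * base

# Rate-limit backoff over five retries: 30, 60, 120, 240, 480 seconds
rate_limit_schedule = [retry_countdown(r, base=30) for r in range(5)]

# Timeout backoff over five retries: 10, 20, 40, 80, 160 seconds
timeout_schedule = [retry_countdown(r, base=10) for r in range(5)]
```

The aggressive rate-limit schedule gives the provider's limit window time to reset, while the shorter timeout schedule retries fast enough that the user-facing job is not stalled unnecessarily.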

FAQ

Can I use asyncio inside Celery workers?

Yes, but it requires careful setup. You can run asyncio.run() inside a task function, or use Celery's gevent or eventlet pool. However, mixing Celery's process model with asyncio adds complexity. For purely I/O-bound workloads, consider using an async-native queue like arq (which runs on asyncio natively) or keeping asyncio and Celery separate — use Celery for job dispatch and asyncio within each job.

How do I handle long-running agent workflows that take 10+ minutes?

Set task_time_limit high enough to accommodate the workflow. Use task_soft_time_limit to detect approaching timeouts and checkpoint progress. For very long workflows, break them into multiple chained tasks using Celery chains or chords, so each step is individually retryable.

What result backend should I use?

Redis is the simplest and works well for results you query within minutes. For durable results, use PostgreSQL or MongoDB. Set result_expires to auto-clean old results. For AI workloads, store the actual analysis results in your application database and use the task queue result backend only for job status tracking.


#Python #TaskQueues #Celery #BackgroundJobs #AIAgents #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
