Task Queues for AI Agents: Celery, RQ, and Dramatiq for Background Agent Jobs
Set up background task queues for AI agent workloads using Celery, RQ, and Dramatiq. Learn worker patterns, retry policies, and result backends for reliable agent job processing.
When asyncio Is Not Enough
asyncio excels at concurrent I/O within a single process. But many AI agent workloads need more: durable job processing that survives server restarts, distributed execution across multiple workers, scheduled recurring jobs, and reliable retry semantics. Task queues provide all of this.
Common AI agent use cases for task queues include: batch document processing, periodic knowledge base updates, long-running multi-step agent workflows that exceed HTTP request timeouts, and fan-out patterns where one user request triggers dozens of agent jobs.
Celery: The Industry Standard
Celery is the most widely deployed Python task queue. It supports multiple brokers (Redis, RabbitMQ), result backends, and has mature tooling for monitoring and administration.
```python
# celery_app.py
from celery import Celery

app = Celery(
    "ai_agents",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,           # Acknowledge after completion
    worker_prefetch_multiplier=1,  # One task at a time per worker
    task_time_limit=300,           # Hard 5-minute timeout
    task_soft_time_limit=270,      # Soft timeout triggers exception
)

# fetch_document, call_llm_sync, save_results, and the LLM error types
# below are application-defined helpers.

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def process_document(self, document_id: str, user_id: str) -> dict:
    """Process a document through an AI pipeline."""
    try:
        # Fetch document
        doc = fetch_document(document_id)

        # Run LLM analysis (synchronous in Celery workers)
        summary = call_llm_sync(f"Summarize: {doc['content']}")
        entities = call_llm_sync(f"Extract entities: {doc['content']}")
        classification = call_llm_sync(
            f"Classify this document: {doc['content']}"
        )

        result = {
            "document_id": document_id,
            "summary": summary,
            "entities": entities,
            "classification": classification,
        }
        save_results(document_id, result)
        return result
    except LLMRateLimitError as exc:
        raise self.retry(exc=exc, countdown=120)
    except LLMTimeoutError as exc:
        raise self.retry(exc=exc, countdown=60)
```
Key Celery configuration choices for AI workloads:
- `task_acks_late=True` ensures a task is re-delivered if the worker crashes mid-execution. Critical for expensive LLM workflows.
- `worker_prefetch_multiplier=1` prevents workers from grabbing tasks they cannot process immediately, important when each task takes seconds.
- `task_soft_time_limit` gives the task a chance to clean up before the hard timeout kills it.
RQ: Simplicity First
Redis Queue (RQ) trades Celery's feature richness for simplicity. It is an excellent choice when your requirements are straightforward and you already use Redis.
```python
# tasks.py
from redis import Redis
from rq import Queue, Retry
from rq.job import Job

redis_conn = Redis(host="localhost", port=6379)
agent_queue = Queue("agent_jobs", connection=redis_conn)

# search_vector_store, call_llm_sync, and save_conversation are
# application-defined helpers.

def run_agent_workflow(query: str, session_id: str) -> dict:
    """Background agent workflow — runs in RQ worker process."""
    # Step 1: Retrieve context
    docs = search_vector_store(query)

    # Step 2: Call LLM
    response = call_llm_sync(
        prompt=query,
        context=docs,
    )

    # Step 3: Store results
    save_conversation(session_id, query, response)
    return {"session_id": session_id, "response": response}

# Enqueue from your web application
job = agent_queue.enqueue(
    run_agent_workflow,
    "How do I configure async pipelines?",
    "session_abc123",
    job_timeout=300,
    retry=Retry(max=3),  # RQ expects a Retry object, not a bare int
    result_ttl=3600,     # Keep results for 1 hour
)

# Check job status later
job = Job.fetch(job.id, connection=redis_conn)
if job.is_finished:
    print(job.result)
elif job.is_failed:
    print(f"Failed: {job.exc_info}")
```
Start workers with `rq worker agent_jobs --with-scheduler`.
Dramatiq: Modern and Reliable
Dramatiq is a newer alternative that emphasizes reliability and performance. It supports both Redis and RabbitMQ as brokers.
```python
# dramatiq_tasks.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends.redis import RedisBackend

result_backend = RedisBackend(url="redis://localhost:6379/2")
broker = RedisBroker(url="redis://localhost:6379/0")
broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(broker)

# load_conversation, call_llm_sync, and store_analysis are
# application-defined helpers.

@dramatiq.actor(
    max_retries=3,
    min_backoff=30_000,   # 30 seconds minimum retry delay
    max_backoff=300_000,  # 5 minutes maximum retry delay
    time_limit=300_000,   # 5-minute timeout
    store_results=True,
)
def analyze_conversation(conversation_id: str) -> dict:
    """Analyze a conversation with multiple LLM passes."""
    conversation = load_conversation(conversation_id)
    transcript = conversation["transcript"]

    sentiment = call_llm_sync(f"Analyze sentiment: {transcript}")
    summary = call_llm_sync(f"Summarize: {transcript}")
    action_items = call_llm_sync(f"Extract action items: {transcript}")

    result = {
        "conversation_id": conversation_id,
        "sentiment": sentiment,
        "summary": summary,
        "action_items": action_items,
    }
    store_analysis(conversation_id, result)
    return result

# Send the task
message = analyze_conversation.send("conv_12345")

# Retrieve results later
result = message.get_result(block=True, timeout=60_000)
```
Choosing the Right Queue
| Feature | Celery | RQ | Dramatiq |
|---|---|---|---|
| Broker support | Redis, RabbitMQ, SQS | Redis only | Redis, RabbitMQ |
| Complexity | High | Low | Medium |
| Async worker support | Yes (with gevent) | No | No (uses threads) |
| Monitoring | Flower, Celery Events | rq-dashboard | Built-in CLI |
| Best for | Complex workflows | Simple background jobs | Reliable processing |
For AI agent workloads: Celery is the safest choice for complex multi-step workflows, RQ works well for simple fire-and-forget LLM jobs, and Dramatiq offers the best balance of simplicity and reliability.
Retry Policies for LLM Tasks
LLM API calls fail in predictable ways. Configure retries accordingly.
```python
# Celery retry configuration per failure type. RateLimitError,
# TimeoutError, ServerError, and AuthenticationError stand for the
# exception types raised by your LLM client library.
@app.task(bind=True, max_retries=5)
def smart_retry_task(self, prompt: str):
    try:
        return call_llm_sync(prompt)
    except RateLimitError as exc:
        # Back off aggressively for rate limits: 30s, 60s, 120s, ...
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)
    except TimeoutError as exc:
        # Moderate backoff for timeouts: 10s, 20s, 40s, ...
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 10)
    except ServerError as exc:
        # Quick retry for transient server errors
        raise self.retry(exc=exc, countdown=5)
    except AuthenticationError:
        # Never retry auth failures
        raise
```
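The exponential backoff above is deterministic, so a burst of tasks that hit the same rate limit will all retry at the same instant. Adding jitter spreads the retries out. A minimal sketch, using a hypothetical `backoff_with_jitter` helper you would pass to `countdown=`:

```python
# A sketch of exponential backoff with full jitter for retry delays.
# backoff_with_jitter is a hypothetical helper, e.g.
# self.retry(exc=exc, countdown=backoff_with_jitter(self.request.retries))
import random

def backoff_with_jitter(retries: int, base: int = 30, cap: int = 600) -> float:
    """Return a random delay in [0, min(cap, base * 2**retries)] seconds."""
    return random.uniform(0, min(cap, base * 2 ** retries))
```

Full jitter trades a predictable schedule for decorrelated retries, which matters when dozens of fan-out tasks share one provider rate limit.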
FAQ
Can I use asyncio inside Celery workers?
Yes, but it requires careful setup. You can run asyncio.run() inside a task function, or use Celery's gevent or eventlet pool. However, mixing Celery's process model with asyncio adds complexity. For purely I/O-bound workloads, consider using an async-native queue like arq (which runs on asyncio natively) or keeping asyncio and Celery separate — use Celery for job dispatch and asyncio within each job.
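A minimal sketch of the `asyncio.run()` approach: the task body stays synchronous from Celery's point of view, but drives an event loop internally to run several LLM calls concurrently. `call_llm_async` is a hypothetical async client call, simulated here.

```python
# A sketch of driving asyncio inside a synchronous task body.
# call_llm_async is a hypothetical async LLM client call, simulated
# with asyncio.sleep so the example is self-contained.
import asyncio

async def call_llm_async(prompt: str) -> str:
    await asyncio.sleep(0)  # stands in for an awaited API call
    return f"response to: {prompt}"

def analyze_document_sync(content: str) -> dict:
    """Synchronous task body (e.g. a Celery task) driving an event loop."""
    async def _run() -> dict:
        # Both LLM calls run concurrently within this one task.
        summary, entities = await asyncio.gather(
            call_llm_async(f"Summarize: {content}"),
            call_llm_async(f"Extract entities: {content}"),
        )
        return {"summary": summary, "entities": entities}
    return asyncio.run(_run())
```

Because `asyncio.run()` creates and tears down a fresh loop per call, this works in Celery's default prefork pool; it would conflict with the gevent/eventlet pools, so pick one model per worker.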
How do I handle long-running agent workflows that take 10+ minutes?
Set task_time_limit high enough to accommodate the workflow. Use task_soft_time_limit to detect approaching timeouts and checkpoint progress. For very long workflows, break them into multiple chained tasks using Celery chains or chords, so each step is individually retryable.
What result backend should I use?
Redis is the simplest and works well for results you query within minutes. For durable results, use PostgreSQL or MongoDB. Set result_expires to auto-clean old results. For AI workloads, store the actual analysis results in your application database and use the task queue result backend only for job status tracking.
#Python #TaskQueues #Celery #BackgroundJobs #AIAgents #AgenticAI #LearnAI #AIEngineering