Message Queues for AI Agent Workloads: RabbitMQ, SQS, and Kafka Patterns
Explore how to use message queues like RabbitMQ, Amazon SQS, and Apache Kafka to manage AI agent workloads with reliable delivery, backpressure handling, dead letter queues, and consumer scaling patterns.
Why AI Agents Need Message Queues
AI agent tasks — generating reports, processing documents, running multi-step tool chains — are slow operations that can take seconds to minutes. Running these inline in an HTTP request handler leads to timeouts, failed requests under load, and no ability to retry failures. Message queues decouple the request from the processing, letting you accept work immediately, process it asynchronously, and scale consumers independently.
The core pattern is simple: an API server publishes a task to a queue, and one or more worker processes consume tasks, execute the agent logic, and report results. This architecture handles bursty traffic gracefully because the queue absorbs spikes that would overwhelm direct server processing.
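The accept-now, process-later flow can be sketched with an in-memory queue standing in for the real broker (the `accept_task` helper and its field names are illustrative, not part of any library):

```python
import json
import uuid
from collections import deque

# Stand-in for a real broker queue; illustrates the decoupling only.
task_queue: deque = deque()

def accept_task(task_type: str, payload: dict) -> dict:
    """What the HTTP handler does: enqueue the task and return immediately."""
    task_id = str(uuid.uuid4())
    task_queue.append(json.dumps({
        "task_id": task_id,
        "task_type": task_type,
        "payload": payload,
    }))
    # The handler responds 202 Accepted with the id; a worker consumes the
    # task later and stores the result under task_id for the client to poll.
    return {"status": "accepted", "task_id": task_id}

response = accept_task("summarize", {"doc_url": "https://example.com/report.pdf"})
print(response["status"])   # accepted
print(len(task_queue))      # 1
```

The client never waits on the agent run itself; it polls (or receives a webhook) keyed by `task_id`.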
Choosing the Right Queue
Each queue technology has different strengths for AI agent workloads:
RabbitMQ excels at task routing with exchanges and bindings. Use it when you need to route different agent task types to specialized worker pools — one queue for summarization agents, another for research agents, a third for code generation agents.
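One way to sketch that routing split (queue, exchange, and routing-key names here are assumptions, not fixed conventions):

```python
# Illustrative routing-key scheme mapping task types to worker pools.
TASK_ROUTES = {
    "summarization": "agents.summarize",
    "research": "agents.research",
    "codegen": "agents.codegen",
}

def routing_key_for(task_type: str) -> str:
    """Map an agent task type to a routing key, with a catch-all default."""
    return TASK_ROUTES.get(task_type, "agents.default")

# With pika, each specialized worker pool binds its own queue to a
# direct exchange, and the producer publishes with the mapped key:
#   channel.exchange_declare(exchange="agent_router",
#                            exchange_type="direct", durable=True)
#   channel.queue_bind(queue="summarize_tasks", exchange="agent_router",
#                      routing_key="agents.summarize")
#   channel.basic_publish(exchange="agent_router",
#                         routing_key=routing_key_for(task_type), body=...)

print(routing_key_for("research"))  # agents.research
```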
Amazon SQS is the simplest option for AWS-native deployments. Standard queues offer at-least-once delivery with nearly unlimited throughput. FIFO queues maintain ordering per message group, useful when agent tasks for the same user must execute sequentially.
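For FIFO queues, the per-user ordering comes from `MessageGroupId`. A small helper can build the `send_message` parameters (the queue URL and field choices below are illustrative; the actual send would go through boto3):

```python
import hashlib
import json

def fifo_send_params(queue_url: str, user_id: str, task: dict) -> dict:
    """Build kwargs for an SQS FIFO send_message call.

    MessageGroupId keys ordering: messages sharing a group id are delivered
    in order, so using the user id serializes that user's agent tasks.
    MessageDeduplicationId suppresses duplicates within SQS's five-minute
    deduplication window.
    """
    body = json.dumps(task, sort_keys=True)
    return {
        "QueueUrl": queue_url,
        "MessageBody": body,
        "MessageGroupId": user_id,
        "MessageDeduplicationId": hashlib.sha256(body.encode()).hexdigest(),
    }

# With boto3 (assumed installed and configured):
#   sqs = boto3.client("sqs")
#   sqs.send_message(**fifo_send_params(queue_url, "user-42", task))

params = fifo_send_params(
    "https://sqs.us-east-1.amazonaws.com/123456789012/agent-tasks.fifo",
    "user-42",
    {"task_type": "summarize"},
)
```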
Apache Kafka is best when you need durable event logs and replay capability. If you want to reprocess all agent interactions from the past week with a new prompt template, Kafka's retention makes that possible.
Producer Pattern with RabbitMQ
Here is a robust producer that publishes AI agent tasks with priority and expiration:
import pika
import json
import uuid


class AgentTaskProducer:
    def __init__(self, amqp_url: str):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(amqp_url)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(
            queue="agent_tasks",
            durable=True,
            arguments={
                "x-dead-letter-exchange": "agent_dlx",
                "x-dead-letter-routing-key": "failed_tasks",
                "x-max-priority": 10,
                "x-message-ttl": 300000,  # 5 minute expiry
            },
        )

    def publish_task(self, task_type: str, payload: dict, priority: int = 5):
        task_id = str(uuid.uuid4())
        message = {
            "task_id": task_id,
            "task_type": task_type,
            "payload": payload,
        }
        self.channel.basic_publish(
            exchange="",
            routing_key="agent_tasks",
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent
                priority=priority,
                message_id=task_id,
                content_type="application/json",
            ),
        )
        return task_id
The declaration includes a dead letter exchange so that messages that fail after retries or expire are routed to a separate queue for investigation rather than being lost.
Consumer with Backpressure Control
The consumer uses prefetch to limit how many unacknowledged messages a single worker holds. This is critical for AI agent tasks because each task may consume significant memory and take seconds to process:
import pika
import json
import traceback


def create_consumer(amqp_url: str, concurrency: int = 3):
    connection = pika.BlockingConnection(
        pika.URLParameters(amqp_url)
    )
    channel = connection.channel()
    # Only deliver N unacked messages at a time
    channel.basic_qos(prefetch_count=concurrency)

    def on_message(ch, method, properties, body):
        task = json.loads(body)
        try:
            result = process_agent_task(task)
            store_result(task["task_id"], result)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except RetryableError:
            # Republish with an incremented retry header. Rejecting with
            # requeue=True would retry forever, because a requeued
            # message keeps its original headers and the count would
            # never change.
            retry_count = (properties.headers or {}).get("x-retry-count", 0)
            if retry_count < 3:
                ch.basic_publish(
                    exchange="",
                    routing_key="agent_tasks",
                    body=body,
                    properties=pika.BasicProperties(
                        delivery_mode=2,
                        priority=properties.priority,
                        message_id=properties.message_id,
                        content_type="application/json",
                        headers={"x-retry-count": retry_count + 1},
                    ),
                )
                ch.basic_ack(delivery_tag=method.delivery_tag)
            else:
                ch.basic_reject(
                    delivery_tag=method.delivery_tag, requeue=False
                )  # retries exhausted: routed to the DLQ
        except Exception:
            traceback.print_exc()
            ch.basic_reject(
                delivery_tag=method.delivery_tag, requeue=False
            )

    channel.basic_consume(
        queue="agent_tasks", on_message_callback=on_message
    )
    channel.start_consuming()
Setting prefetch_count=3 means each worker handles at most three agent tasks concurrently. If a worker falls behind, new messages stay in the queue and get picked up by other workers or by new workers that scale up.
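A reasonable starting prefetch can be derived from the worker's memory budget and the LLM provider's concurrency cap. The helper and its default headroom below are illustrative starting points, not tuned values:

```python
def suggested_prefetch(worker_memory_mb: int, per_task_mb: float,
                       llm_concurrency_cap: int, headroom: float = 0.5) -> int:
    """Derive a starting prefetch_count for an agent worker.

    Budgets only `headroom` of worker memory for in-flight tasks and
    never exceeds the LLM provider's concurrent-request cap.
    """
    memory_bound = int(worker_memory_mb * headroom / per_task_mb)
    return max(1, min(memory_bound, llm_concurrency_cap))

# A 2 GB worker, ~300 MB per active agent task, provider cap of 5:
print(suggested_prefetch(worker_memory_mb=2048, per_task_mb=300,
                         llm_concurrency_cap=5))  # 3
```

The result feeds directly into `channel.basic_qos(prefetch_count=...)`; revisit it once you have observed real per-task memory usage.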
Dead Letter Queue Processing
Failed agent tasks land in the dead letter queue. Set up a separate consumer that logs failures, alerts on patterns, and optionally retries with modified parameters:
def process_dead_letters(amqp_url: str):
    connection = pika.BlockingConnection(
        pika.URLParameters(amqp_url)
    )
    channel = connection.channel()
    # Declare the dead letter exchange and bind the queue so the
    # producer's x-dead-letter-exchange argument has a destination.
    channel.exchange_declare(
        exchange="agent_dlx", exchange_type="direct", durable=True
    )
    channel.queue_declare(queue="failed_tasks", durable=True)
    channel.queue_bind(
        queue="failed_tasks", exchange="agent_dlx",
        routing_key="failed_tasks"
    )

    def on_dead_letter(ch, method, properties, body):
        task = json.loads(body)
        death_info = (properties.headers or {}).get("x-death", [{}])[0]
        failure_reason = death_info.get("reason", "unknown")
        log_failure(
            task_id=task["task_id"],
            task_type=task["task_type"],
            reason=failure_reason,
            original_queue=death_info.get("queue"),
        )
        if failure_reason == "expired":
            alert_ops(f"Task {task['task_id']} expired before processing")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(
        queue="failed_tasks", on_message_callback=on_dead_letter
    )
    channel.start_consuming()
FAQ
When should I use Kafka instead of RabbitMQ for AI agent tasks?
Use Kafka when you need to replay or reprocess historical agent tasks — for example, when you update your agent logic and want to rerun all tasks from the past week. Kafka retains messages for a configurable period regardless of whether they have been consumed. Use RabbitMQ when you need flexible routing, priority queues, and simple task distribution without replay requirements.
How do I handle LLM rate limits in queue consumers?
Implement a token bucket or leaky bucket rate limiter in your consumer. When the limit is hit, reject the message with requeue so it returns to the queue; note that a plain requeue retries almost immediately, so pause the consumer briefly before rejecting, or use RabbitMQ's delayed message exchange plugin to schedule retries with exponential backoff.
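A minimal token bucket sketch, with an injectable clock so the behavior is deterministic under test (the class and its wiring into the consumer are illustrative):

```python
import time

class TokenBucket:
    """Token-bucket limiter; `clock` is injectable for deterministic tests."""

    def __init__(self, rate_per_sec: float, capacity: int, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = float(capacity)
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        # Refill proportionally to elapsed time, capped at capacity.
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Inside the consumer's on_message callback (sketch):
#   if not bucket.try_acquire():
#       ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
#       return
```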
What prefetch count should I set for AI agent workers?
Start with a prefetch count equal to the number of concurrent agent tasks your worker can handle based on memory. Each active agent task may hold 50 to 500 KB of conversation context. For a worker with 2 GB available, start with a prefetch of 3 to 5 and adjust based on observed memory usage and LLM API concurrency limits.
#MessageQueues #RabbitMQ #Kafka #AIAgents #DistributedSystems #SQS #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.