Distributed Agent Execution Across Multiple Servers: Scaling Agent Workloads Horizontally
Learn how to scale AI agent workloads across multiple servers using message queues, state synchronization, and fault tolerance patterns. Covers distributed architecture with Redis, task routing, and agent recovery in Python.
When Single-Server Agent Execution Hits Its Limits
A single server can comfortably run 10-20 concurrent agents. Beyond that, you hit CPU limits from LLM inference orchestration, memory pressure from context windows, and I/O bottlenecks from tool execution. Distributed agent execution solves this by spreading agent workloads across a cluster of servers, connected through message queues and a shared state layer.
This is not theoretical — any production agent system handling more than a few dozen concurrent tasks needs horizontal scaling. This guide shows you how to architect it properly.
Distributed Architecture Overview
The architecture has four components:
- Task Router — Receives agent tasks and distributes them to worker nodes
- Worker Nodes — Execute agents with local compute resources
- State Store — Centralized state accessible by all workers (Redis)
- Health Monitor — Tracks worker health and reassigns failed tasks
```
Client Requests
       |
       v
 [Task Router]
       |
       +---> [Worker Node 1] --+
       |                       |
       +---> [Worker Node 2] --+--> [Redis State Store]
       |                       |
       +---> [Worker Node 3] --+
                  |
                  v
          [Health Monitor]
```
Task Definition and Routing
```python
import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Dict, Optional


class TaskState(Enum):
    QUEUED = "queued"
    ASSIGNED = "assigned"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


@dataclass
class AgentTask:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    agent_type: str = ""
    payload: Dict[str, Any] = field(default_factory=dict)
    state: TaskState = TaskState.QUEUED
    assigned_worker: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    result: Optional[Dict] = None
    retry_count: int = 0
    max_retries: int = 3
    timeout_seconds: float = 300.0

    def to_json(self) -> str:
        d = asdict(self)
        d["state"] = self.state.value  # Enum members are not JSON-serializable
        return json.dumps(d)

    @classmethod
    def from_json(cls, data: str) -> "AgentTask":
        d = json.loads(data)
        d["state"] = TaskState(d["state"])
        return cls(**d)
```
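The only non-obvious part of the round trip is the `state` field: Enum members are not JSON-serializable, so `to_json` stores `state.value` and `from_json` looks the member back up by value. The same pattern in isolation, with a trimmed-down `TaskState` for illustration:

```python
import json
from enum import Enum


class TaskState(Enum):
    QUEUED = "queued"
    COMPLETED = "completed"


# Encode: replace the Enum member with its string value
encoded = json.dumps({"task_id": "abc", "state": TaskState.QUEUED.value})

# Decode: look the member back up by value
d = json.loads(encoded)
d["state"] = TaskState(d["state"])
assert d["state"] is TaskState.QUEUED
```

An unknown state string raises `ValueError` at `TaskState(...)`, which is usually what you want: a corrupted record fails loudly instead of deserializing into a bogus state.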
Redis-Backed Task Queue
Redis serves as both the task queue and the shared state store. Using Redis lists for queuing gives us atomic push/pop operations and persistence.
```python
import redis.asyncio as redis


class DistributedTaskQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "agent:tasks:queue"
        self.state_prefix = "agent:tasks:state:"
        self.worker_prefix = "agent:workers:"

    async def submit_task(self, task: AgentTask) -> str:
        # Store task state
        await self.redis.set(
            f"{self.state_prefix}{task.task_id}",
            task.to_json(),
            ex=3600,  # 1 hour TTL
        )
        # Push to queue
        await self.redis.lpush(self.queue_key, task.task_id)
        return task.task_id

    async def claim_task(
        self, worker_id: str, timeout: float = 5.0
    ) -> Optional[AgentTask]:
        # Blocking pop -- waits for a task to appear
        result = await self.redis.brpop(self.queue_key, timeout=int(timeout))
        if not result:
            return None
        task_id = result[1].decode()
        task_data = await self.redis.get(f"{self.state_prefix}{task_id}")
        if not task_data:
            return None
        task = AgentTask.from_json(task_data)
        task.state = TaskState.ASSIGNED
        task.assigned_worker = worker_id
        task.started_at = time.time()
        await self.redis.set(
            f"{self.state_prefix}{task_id}", task.to_json(), ex=3600
        )
        return task

    async def complete_task(self, task_id: str, result: Dict):
        task_data = await self.redis.get(f"{self.state_prefix}{task_id}")
        if not task_data:
            return
        task = AgentTask.from_json(task_data)
        task.state = TaskState.COMPLETED
        task.completed_at = time.time()
        task.result = result
        await self.redis.set(
            f"{self.state_prefix}{task_id}", task.to_json(), ex=3600
        )

    async def fail_task(self, task_id: str, error: str):
        task_data = await self.redis.get(f"{self.state_prefix}{task_id}")
        if not task_data:
            return
        task = AgentTask.from_json(task_data)
        task.retry_count += 1
        if task.retry_count <= task.max_retries:
            # Retries remain: re-queue the task for another worker
            task.state = TaskState.RETRYING
            task.assigned_worker = None
            await self.redis.set(
                f"{self.state_prefix}{task_id}", task.to_json(), ex=3600
            )
            await self.redis.lpush(self.queue_key, task_id)
        else:
            task.state = TaskState.FAILED
            task.result = {"error": error}
            await self.redis.set(
                f"{self.state_prefix}{task_id}", task.to_json(), ex=3600
            )
```
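Note that `fail_task` re-queues a failed task immediately, so a task that keeps failing against a flaky dependency can burn through its retries in a tight loop. A common refinement is exponential backoff with jitter before the re-queue; `retry_delay` below is a hypothetical helper sketching just the delay calculation:

```python
import random


def retry_delay(retry_count: int, base: float = 1.0, cap: float = 60.0) -> float:
    # Exponential backoff with full jitter: a delay drawn uniformly from
    # [0, min(cap, base * 2**retry_count)], so retries spread out in time
    # instead of all workers hammering a recovering dependency at once.
    return random.uniform(0.0, min(cap, base * (2 ** retry_count)))
```

The worker could `await asyncio.sleep(retry_delay(task.retry_count))` before re-queuing, or, to avoid tying up a worker slot, push the task id into a Redis sorted set scored by its ready-time and have the router move due entries back onto the main queue.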
Worker Node Implementation
Each worker node runs an event loop that claims tasks, executes agents, and reports results.
```python
class WorkerNode:
    def __init__(
        self,
        worker_id: str,
        queue: DistributedTaskQueue,
        max_concurrent: int = 5,
    ):
        self.worker_id = worker_id
        self.queue = queue
        self.max_concurrent = max_concurrent
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self._running = False

    async def start(self):
        self._running = True
        await self._register()
        # Keep the heartbeat running for the lifetime of the worker so the
        # health monitor doesn't mistake a busy worker for a dead one
        heartbeat = asyncio.create_task(self._heartbeat())
        try:
            while self._running:
                if len(self.active_tasks) >= self.max_concurrent:
                    await asyncio.sleep(0.5)
                    continue
                task = await self.queue.claim_task(self.worker_id)
                if task:
                    coro = asyncio.create_task(self._execute(task))
                    self.active_tasks[task.task_id] = coro
                    coro.add_done_callback(
                        lambda _, tid=task.task_id: self.active_tasks.pop(tid, None)
                    )
        finally:
            heartbeat.cancel()

    async def _register(self):
        await self.queue.redis.hset(
            f"{self.queue.worker_prefix}{self.worker_id}",
            mapping={
                "status": "active",
                "max_concurrent": str(self.max_concurrent),
                "last_heartbeat": str(time.time()),
            },
        )

    async def _heartbeat(self):
        while self._running:
            await self.queue.redis.hset(
                f"{self.queue.worker_prefix}{self.worker_id}",
                "last_heartbeat",
                str(time.time()),
            )
            await asyncio.sleep(10)

    async def _execute(self, task: AgentTask):
        try:
            # Route to the appropriate agent executor
            result = await self._run_agent(task)
            await self.queue.complete_task(task.task_id, result)
        except Exception as e:
            await self.queue.fail_task(task.task_id, str(e))

    async def _run_agent(self, task: AgentTask) -> Dict:
        # Replace with actual agent execution logic
        await asyncio.sleep(2)  # Simulate work
        return {"output": f"Processed by {self.worker_id}"}
```
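One gap in the worker sketch: `_run_agent` never looks at the task's `timeout_seconds`. Locally you can enforce it with `asyncio.wait_for`, which cancels the agent coroutine once the budget is exceeded. A self-contained sketch (`run_with_timeout` and `slow_agent` are illustrative names, not part of the classes above):

```python
import asyncio


async def run_with_timeout(coro, timeout_seconds: float):
    # Cancel the coroutine if it exceeds its budget; the caller can then
    # report the task as failed so the queue's retry logic takes over.
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return {"error": f"timed out after {timeout_seconds}s"}


async def slow_agent():
    await asyncio.sleep(10)  # Simulates an agent that never finishes in time
    return {"output": "done"}


result = asyncio.run(run_with_timeout(slow_agent(), timeout_seconds=0.05))
```

In `_execute`, this would wrap the `_run_agent` call, with a timeout result routed to `fail_task` instead of `complete_task`.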
Health Monitor and Task Recovery
The health monitor detects dead workers and reassigns their tasks.
```python
class HealthMonitor:
    def __init__(
        self,
        queue: DistributedTaskQueue,
        heartbeat_timeout: float = 30.0,
    ):
        self.queue = queue
        self.heartbeat_timeout = heartbeat_timeout

    async def monitor(self):
        while True:
            await self._check_workers()
            await asyncio.sleep(10)

    async def _check_workers(self):
        # SCAN instead of KEYS so large keyspaces don't block Redis
        async for key in self.queue.redis.scan_iter(
            match=f"{self.queue.worker_prefix}*"
        ):
            worker_data = await self.queue.redis.hgetall(key)
            last_hb = float(worker_data.get(b"last_heartbeat", b"0"))
            if time.time() - last_hb > self.heartbeat_timeout:
                worker_id = key.decode().split(":")[-1]
                await self._recover_worker_tasks(worker_id)
                await self.queue.redis.delete(key)

    async def _recover_worker_tasks(self, worker_id: str):
        # Scan for tasks assigned to the dead worker and re-queue them
        async for key in self.queue.redis.scan_iter(
            match=f"{self.queue.state_prefix}*"
        ):
            data = await self.queue.redis.get(key)
            if not data:
                continue
            task = AgentTask.from_json(data)
            if (
                task.assigned_worker == worker_id
                and task.state in (TaskState.ASSIGNED, TaskState.RUNNING)
            ):
                task.state = TaskState.QUEUED
                task.assigned_worker = None
                await self.queue.redis.set(key, task.to_json(), ex=3600)
                await self.queue.redis.lpush(
                    self.queue.queue_key, task.task_id
                )
                print(f"Recovered task {task.task_id} from dead worker {worker_id}")
```
Launching the Distributed System
```python
async def main():
    queue = DistributedTaskQueue("redis://localhost:6379/0")

    # Start workers (in production, each runs on a different server)
    workers = [
        WorkerNode(f"worker_{i}", queue, max_concurrent=5)
        for i in range(3)
    ]
    monitor = HealthMonitor(queue)

    # Submit tasks
    for i in range(20):
        task = AgentTask(agent_type="research", payload={"query": f"Topic {i}"})
        await queue.submit_task(task)

    # Run everything concurrently
    await asyncio.gather(
        *[w.start() for w in workers],
        monitor.monitor(),
    )


if __name__ == "__main__":
    asyncio.run(main())
```
FAQ
Why use Redis instead of RabbitMQ or Kafka for the task queue?
Redis is the simplest option that handles both queuing (via lists with BRPOP) and state storage (via key-value pairs) in a single system. RabbitMQ adds robust delivery guarantees and routing, which matters when you have hundreds of workers. Kafka is overkill for most agent systems unless you need event replay and stream processing. Start with Redis and migrate if you hit its limits.
How do I handle tasks that exceed the timeout?
The health monitor should also check for tasks that have been in RUNNING state longer than their timeout_seconds. When detected, mark the task as failed and re-queue it (up to max_retries). On the worker side, wrap agent execution in asyncio.wait_for() to enforce the timeout locally.
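The monitor-side check reduces to a small predicate over the task fields already defined in this guide; `is_stuck` below is a hypothetical helper, with the state passed as a plain string to keep the sketch self-contained:

```python
import time
from typing import Optional


def is_stuck(state: str, started_at: Optional[float],
             timeout_seconds: float, now: Optional[float] = None) -> bool:
    # A task is stuck if it is running and has exceeded its time budget.
    # Tasks with no start timestamp haven't begun, so they can't be stuck.
    if state != "running" or started_at is None:
        return False
    now = time.time() if now is None else now
    return now - started_at > timeout_seconds
```

The health monitor would evaluate this during its task scan and route stuck tasks through the same fail-and-retry path as tasks from dead workers.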
What about data locality — should I route tasks to specific workers?
Yes, if your agents need large local resources (embeddings, model weights, cached data). Add a routing key to AgentTask and implement consistent hashing in the task router so tasks of the same type always go to the same worker subset. This maximizes cache hit rates.
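Assuming a `routing_key` field were added to `AgentTask`, the router-side lookup can be sketched with a minimal hash ring (`ConsistentHashRouter` is an illustrative name; MD5 is used only for key distribution, not security):

```python
import bisect
import hashlib


class ConsistentHashRouter:
    # Minimal consistent-hash ring; virtual nodes smooth the distribution
    # so no single worker grabs a disproportionate arc of the ring.
    def __init__(self, workers, vnodes: int = 64):
        self._ring = sorted(
            (self._hash(f"{w}#{i}"), w)
            for w in workers
            for i in range(vnodes)
        )
        self._keys = [h for h, _ in self._ring]

    @staticmethod
    def _hash(s: str) -> int:
        return int.from_bytes(hashlib.md5(s.encode()).digest()[:8], "big")

    def route(self, routing_key: str) -> str:
        # First ring position at or after the key's hash, wrapping around
        idx = bisect.bisect(self._keys, self._hash(routing_key)) % len(self._keys)
        return self._ring[idx][1]
```

The payoff over `hash(key) % num_workers`: when a worker joins or leaves, only about 1/N of the routing keys move, so warm caches on the surviving workers stay useful.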
CallSphere Team
Expert insights on AI voice agents and customer communication automation.