Distributed Agent Execution Across Multiple Servers: Scaling Agent Workloads Horizontally

Learn how to scale AI agent workloads across multiple servers using message queues, state synchronization, and fault tolerance patterns. Covers distributed architecture with Redis, task routing, and agent recovery in Python.

When Single-Server Agent Execution Hits Its Limits

A single server can typically run around 10-20 concurrent agents, though the exact ceiling depends on the hardware and on how heavy each agent is. Beyond that, you hit CPU limits from orchestrating LLM calls, memory pressure from context windows, and I/O bottlenecks from tool execution. Distributed agent execution solves this by spreading agent workloads across a cluster of servers connected through message queues and a shared state layer.

This is not theoretical: any production agent system handling more than a few dozen concurrent tasks needs horizontal scaling. This guide shows you how to architect it properly.

Distributed Architecture Overview

The architecture has four components:

  1. Task Router: receives agent tasks and distributes them to worker nodes
  2. Worker Nodes: execute agents with local compute resources
  3. State Store: centralized state accessible by all workers (Redis)
  4. Health Monitor: tracks worker health and reassigns failed tasks

Client Requests
      |
      v
[Task Router]
      |
      +---> [Worker Node 1] --+
      |                       |
      +---> [Worker Node 2] --+--> [Redis State Store]
      |                       |
      +---> [Worker Node 3] --+
      |
      v
[Health Monitor]

Task Definition and Routing

import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
from enum import Enum

class TaskState(Enum):
    QUEUED = "queued"
    ASSIGNED = "assigned"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

@dataclass
class AgentTask:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    agent_type: str = ""
    payload: Dict[str, Any] = field(default_factory=dict)
    state: TaskState = TaskState.QUEUED
    assigned_worker: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    result: Optional[Dict] = None
    retry_count: int = 0
    max_retries: int = 3
    timeout_seconds: float = 300.0

    def to_json(self) -> str:
        d = asdict(self)
        d["state"] = self.state.value
        return json.dumps(d)

    @classmethod
    def from_json(cls, data: str) -> "AgentTask":
        d = json.loads(data)
        d["state"] = TaskState(d["state"])
        return cls(**d)
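
Because every worker reads and writes tasks as JSON strings, it helps to confirm that serialization round-trips cleanly. The sketch below uses MiniTask, a hypothetical trimmed-down copy of AgentTask written only for this illustration, and checks that the enum and the timestamps survive the trip.

```python
import json
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Dict


class TaskState(Enum):
    QUEUED = "queued"
    COMPLETED = "completed"


@dataclass
class MiniTask:
    # Hypothetical stand-in for AgentTask with fewer fields.
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    agent_type: str = ""
    payload: Dict[str, Any] = field(default_factory=dict)
    state: TaskState = TaskState.QUEUED
    created_at: float = field(default_factory=time.time)

    def to_json(self) -> str:
        d = asdict(self)
        d["state"] = self.state.value  # enums are not JSON-serializable
        return json.dumps(d)

    @classmethod
    def from_json(cls, data: str) -> "MiniTask":
        d = json.loads(data)
        d["state"] = TaskState(d["state"])  # rebuild the enum from its value
        return cls(**d)


original = MiniTask(agent_type="research", payload={"query": "Topic 0"})
restored = MiniTask.from_json(original.to_json())
print(restored == original)
```

Note that payload has to contain only JSON-serializable values for this to work; anything else needs its own encoding step.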

Redis-Backed Task Queue

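Redis serves as both the task queue and the shared state store. Redis lists give us atomic push/pop operations, and queued tasks survive a Redis restart as long as persistence (RDB snapshots or AOF) is enabled. Note that BRPOP removes the task ID from the list before the worker records the claim, so a worker that crashes in that window loses the task.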

import redis.asyncio as redis

class DistributedTaskQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "agent:tasks:queue"
        self.state_prefix = "agent:tasks:state:"
        self.worker_prefix = "agent:workers:"

    async def submit_task(self, task: AgentTask) -> str:
        # Store task state
        await self.redis.set(
            f"{self.state_prefix}{task.task_id}",
            task.to_json(),
            ex=3600,  # 1 hour TTL
        )
        # Push to queue
        await self.redis.lpush(self.queue_key, task.task_id)
        return task.task_id

    async def claim_task(
        self, worker_id: str, timeout: float = 5.0
    ) -> Optional[AgentTask]:
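        # Blocking pop: waits up to `timeout` seconds for a task to appear.
        # Clamp to at least 1 because a timeout of 0 means "block forever".
        result = await self.redis.brpop(
            self.queue_key, timeout=max(1, int(timeout))
        )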
        if not result:
            return None

        task_id = result[1].decode()
        task_data = await self.redis.get(f"{self.state_prefix}{task_id}")
        if not task_data:
            return None

        task = AgentTask.from_json(task_data)
        task.state = TaskState.ASSIGNED
        task.assigned_worker = worker_id
        task.started_at = time.time()

        await self.redis.set(
            f"{self.state_prefix}{task_id}", task.to_json(), ex=3600
        )
        return task

    async def complete_task(
        self, task_id: str, result: Dict
    ):
        task_data = await self.redis.get(f"{self.state_prefix}{task_id}")
        if not task_data:
            return
        task = AgentTask.from_json(task_data)
        task.state = TaskState.COMPLETED
        task.completed_at = time.time()
        task.result = result
        await self.redis.set(
            f"{self.state_prefix}{task_id}", task.to_json(), ex=3600
        )

    async def fail_task(self, task_id: str, error: str):
        task_data = await self.redis.get(f"{self.state_prefix}{task_id}")
        if not task_data:
            return
        task = AgentTask.from_json(task_data)
        task.retry_count += 1

        if task.retry_count <= task.max_retries:
            task.state = TaskState.RETRYING
            task.assigned_worker = None
            await self.redis.set(
                f"{self.state_prefix}{task_id}", task.to_json(), ex=3600
            )
            await self.redis.lpush(self.queue_key, task_id)
        else:
            task.state = TaskState.FAILED
            task.result = {"error": error}
            await self.redis.set(
                f"{self.state_prefix}{task_id}", task.to_json(), ex=3600
            )

Worker Node Implementation

Each worker node runs an event loop that claims tasks, executes agents, and reports results.

class WorkerNode:
    def __init__(
        self,
        worker_id: str,
        queue: DistributedTaskQueue,
        max_concurrent: int = 5,
    ):
        self.worker_id = worker_id
        self.queue = queue
        self.max_concurrent = max_concurrent
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self._running = False

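    async def start(self):
        self._running = True
        await self._register()
        # Keep the heartbeat alive; without it the health monitor would
        # treat this worker as dead after heartbeat_timeout seconds.
        heartbeat = asyncio.create_task(self._heartbeat())
        try:
            while self._running:
                if len(self.active_tasks) >= self.max_concurrent:
                    await asyncio.sleep(0.5)
                    continue
                task = await self.queue.claim_task(self.worker_id)
                if task:
                    job = asyncio.create_task(self._execute(task))
                    self.active_tasks[task.task_id] = job
                    job.add_done_callback(
                        lambda _, tid=task.task_id: self.active_tasks.pop(tid, None)
                    )
        finally:
            heartbeat.cancel()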

    async def _register(self):
        await self.queue.redis.hset(
            f"{self.queue.worker_prefix}{self.worker_id}",
            mapping={
                "status": "active",
                "max_concurrent": str(self.max_concurrent),
                "last_heartbeat": str(time.time()),
            },
        )

    async def _heartbeat(self):
        while self._running:
            await self.queue.redis.hset(
                f"{self.queue.worker_prefix}{self.worker_id}",
                "last_heartbeat",
                str(time.time()),
            )
            await asyncio.sleep(10)

    async def _execute(self, task: AgentTask):
        try:
            # Route to the appropriate agent executor
            result = await self._run_agent(task)
            await self.queue.complete_task(task.task_id, result)
        except Exception as e:
            await self.queue.fail_task(task.task_id, str(e))

    async def _run_agent(self, task: AgentTask) -> Dict:
        # Replace with actual agent execution logic
        await asyncio.sleep(2)  # Simulate work
        return {"output": f"Processed by {self.worker_id}"}
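
The worker loop above polls with a short sleep whenever all slots are busy. One alternative is to wait on an asyncio.Semaphore before claiming the next task. The sketch below illustrates that idea with a plain list of task IDs standing in for repeated claim_task() calls; worker_loop, run_agent, and the peak counter are hypothetical names used only here, not part of the classes above.

```python
import asyncio
from typing import Dict, List


async def run_agent(task_id: str) -> Dict[str, str]:
    # Placeholder for real agent execution.
    await asyncio.sleep(0.01)
    return {"task_id": task_id, "output": "done"}


async def worker_loop(
    task_ids: List[str], max_concurrent: int = 2
) -> Dict[str, object]:
    slots = asyncio.Semaphore(max_concurrent)
    results: List[Dict[str, str]] = []
    running = 0
    peak = 0  # highest number of agents observed running at once

    async def execute(task_id: str) -> None:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        try:
            results.append(await run_agent(task_id))
        finally:
            running -= 1
            slots.release()  # free the slot even if the agent raised

    in_flight = []
    for task_id in task_ids:  # stands in for repeated claim_task() calls
        await slots.acquire()  # wait for a free slot before claiming
        in_flight.append(asyncio.create_task(execute(task_id)))
    await asyncio.gather(*in_flight)
    return {"results": results, "peak": peak}


if __name__ == "__main__":
    print(asyncio.run(worker_loop([f"task_{i}" for i in range(5)])))
```

Acquiring the slot before claiming means a saturated worker never pulls a task off the shared queue that it cannot start right away, which leaves that task available to idle workers.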

Health Monitor and Task Recovery

The health monitor detects dead workers and reassigns their tasks.

class HealthMonitor:
    def __init__(
        self,
        queue: DistributedTaskQueue,
        heartbeat_timeout: float = 30.0,
    ):
        self.queue = queue
        self.heartbeat_timeout = heartbeat_timeout

    async def monitor(self):
        while True:
            await self._check_workers()
            await asyncio.sleep(10)

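    async def _check_workers(self):
        # SCAN iterates incrementally; KEYS would block Redis on large keyspaces
        pattern = f"{self.queue.worker_prefix}*"
        async for key in self.queue.redis.scan_iter(match=pattern):
            worker_data = await self.queue.redis.hgetall(key)
            last_hb = float(worker_data.get(b"last_heartbeat", b"0"))
            if time.time() - last_hb > self.heartbeat_timeout:
                # Strip the prefix rather than splitting on ":" so worker IDs
                # that contain colons survive intact
                worker_id = key.decode()[len(self.queue.worker_prefix):]
                await self._recover_worker_tasks(worker_id)
                await self.queue.redis.delete(key)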

    async def _recover_worker_tasks(self, worker_id: str):
        # Scan for tasks assigned to the dead worker and re-queue them
        pattern = f"{self.queue.state_prefix}*"
        async for key in self.queue.redis.scan_iter(match=pattern):
            data = await self.queue.redis.get(key)
            if not data:
                continue
            task = AgentTask.from_json(data)
            if (
                task.assigned_worker == worker_id
                and task.state in (TaskState.ASSIGNED, TaskState.RUNNING)
            ):
                task.state = TaskState.QUEUED
                task.assigned_worker = None
                await self.queue.redis.set(key, task.to_json(), ex=3600)
                await self.queue.redis.lpush(
                    self.queue.queue_key, task.task_id
                )
                print(f"Recovered task {task.task_id} from dead worker")

Launching the Distributed System

async def main():
    queue = DistributedTaskQueue("redis://localhost:6379/0")

    # Start workers (in production, each runs on a different server)
    workers = [
        WorkerNode(f"worker_{i}", queue, max_concurrent=5)
        for i in range(3)
    ]
    monitor = HealthMonitor(queue)

    # Submit tasks
    for i in range(20):
        task = AgentTask(agent_type="research", payload={"query": f"Topic {i}"})
        await queue.submit_task(task)

    # Run everything concurrently (runs until interrupted)
    await asyncio.gather(
        *[w.start() for w in workers],
        monitor.monitor(),
    )

if __name__ == "__main__":
    asyncio.run(main())

FAQ

Why use Redis instead of RabbitMQ or Kafka for the task queue?

Redis is the simplest option that handles both queuing (via lists with BRPOP) and state storage (via key-value pairs) in a single system. RabbitMQ adds robust delivery guarantees and routing, which matters when you have hundreds of workers. Kafka is overkill for most agent systems unless you need event replay and stream processing. Start with Redis and migrate if you hit its limits.

How do I handle tasks that exceed the timeout?

The health monitor should also check for tasks that have been in the ASSIGNED or RUNNING state longer than their timeout_seconds, measured from started_at. When detected, treat the task as failed and re-queue it (up to max_retries). On the worker side, wrap agent execution in asyncio.wait_for() to enforce the timeout locally.
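
Both halves of that answer can be sketched in a few lines. The helper names is_timed_out and run_with_timeout are hypothetical and not part of the classes above: the first is the check a monitor could apply to a task's started_at and timeout_seconds, the second is a worker-side wrapper around asyncio.wait_for().

```python
import asyncio
import time
from typing import Any, Awaitable, Dict, Optional


def is_timed_out(
    started_at: Optional[float],
    timeout_seconds: float,
    now: Optional[float] = None,
) -> bool:
    """Monitor-side check: has the task run longer than its budget?"""
    if started_at is None:  # task was never claimed
        return False
    current = time.time() if now is None else now
    return current - started_at > timeout_seconds


async def run_with_timeout(
    agent_call: Awaitable[Dict[str, Any]], timeout_seconds: float
) -> Dict[str, Any]:
    """Worker-side check: cancel the agent call once the budget is spent."""
    try:
        return await asyncio.wait_for(agent_call, timeout=timeout_seconds)
    except asyncio.TimeoutError as exc:
        # Re-raise with context so the caller can record a useful error
        raise TimeoutError(
            f"agent exceeded its {timeout_seconds}s budget"
        ) from exc


async def slow_agent() -> Dict[str, Any]:
    await asyncio.sleep(1.0)  # simulate an agent that runs too long
    return {"output": "finished"}


if __name__ == "__main__":
    try:
        asyncio.run(run_with_timeout(slow_agent(), timeout_seconds=0.05))
    except TimeoutError as err:
        print(f"failed: {err}")
```

Because the wrapper raises rather than returning an error value, an except Exception handler like the one in _execute would route the timeout through fail_task and its retry logic.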

What about data locality — should I route tasks to specific workers?

Yes, if your agents need large local resources (embeddings, model weights, cached data). Add a routing key to AgentTask and implement consistent hashing in the task router so tasks of the same type always go to the same worker subset. This maximizes cache hit rates.
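
A minimal sketch of such a router follows. HashRing, worker_for, and the replicas parameter are hypothetical names introduced for illustration, and the sketch assumes each worker would then consume from its own queue rather than the single shared list used earlier in this guide.

```python
import bisect
import hashlib
from typing import List, Tuple


def _hash(value: str) -> int:
    # Stable across processes, unlike Python's built-in hash()
    return int(hashlib.sha256(value.encode()).hexdigest(), 16)


class HashRing:
    def __init__(self, workers: List[str], replicas: int = 100):
        # Each worker gets `replicas` virtual points to even out the load
        self._ring: List[Tuple[int, str]] = sorted(
            (_hash(f"{worker}#{i}"), worker)
            for worker in workers
            for i in range(replicas)
        )
        self._points = [point for point, _ in self._ring]

    def worker_for(self, routing_key: str) -> str:
        # Pick the first point clockwise from the key's hash, wrapping around
        idx = bisect.bisect_right(self._points, _hash(routing_key))
        return self._ring[idx % len(self._ring)][1]


ring = HashRing(["worker_0", "worker_1", "worker_2"])
print(ring.worker_for("research"))
print(ring.worker_for("research") == ring.worker_for("research"))
```

The benefit over a plain hash-modulo scheme is that removing a worker only remaps the keys that worker owned; every other key keeps its assignment, so warm caches on the surviving workers stay useful.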


CallSphere Team
