Building an Async Agent Worker Pool: Concurrent Session Processing at Scale
Design and implement an async worker pool for processing concurrent AI agent sessions. Learn health monitoring, dynamic scaling, graceful drain, and production deployment patterns.
Why Worker Pools for AI Agents
A single async event loop can handle hundreds of concurrent agent sessions. But without structure, you get uncontrolled resource usage: unbounded memory growth, connection exhaustion, and cascading failures when one overloaded component drags down everything else.
A worker pool provides bounded concurrency, health monitoring, and graceful lifecycle management. Each worker processes one agent session at a time, and the pool controls how many workers run simultaneously. This architecture is the foundation for production AI agent deployments.
Core Worker Pool Design
import asyncio
import uuid
import time
from dataclasses import dataclass, field
from enum import Enum
class WorkerState(Enum):
    """Lifecycle states a worker moves through."""

    IDLE = "idle"          # waiting for a job
    BUSY = "busy"          # currently processing a job
    DRAINING = "draining"  # will stop after the current job; takes no new work
    STOPPED = "stopped"    # worker loop has exited
@dataclass
class WorkerStats:
    """Mutable per-worker counters consumed by pool health reporting."""

    tasks_completed: int = 0             # jobs finished without raising
    tasks_failed: int = 0                # jobs whose agent raised
    total_processing_time: float = 0.0   # cumulative seconds inside process()
    # Monotonic timestamp of the worker's last sign of life.
    last_heartbeat: float = field(default_factory=time.monotonic)
@dataclass
class AgentJob:
    """A single unit of work submitted to the pool."""

    job_id: str      # unique id used to route the result back to the caller
    session_id: str  # agent session this job belongs to
    payload: dict    # opaque input forwarded to the agent's run()
    # Monotonic enqueue timestamp; usable for measuring queue latency.
    created_at: float = field(default_factory=time.monotonic)
class AgentWorker:
    """Individual worker that processes agent sessions."""

    def __init__(self, worker_id: str, agent_factory):
        self.worker_id = worker_id
        self.state = WorkerState.IDLE
        self.stats = WorkerStats()
        self._agent_factory = agent_factory
        self._current_job: AgentJob | None = None

    async def process(self, job: AgentJob) -> dict:
        """Process a single agent job.

        Returns ``{"status": "success", "result": ...}`` on success or
        ``{"status": "error", "error": str(exc)}`` when the agent raises.
        Stats and state are updated regardless of outcome.
        """
        self.state = WorkerState.BUSY
        self._current_job = job
        started_at = time.monotonic()
        try:
            # A new agent instance is created for every job.
            outcome = await self._agent_factory().run(
                session_id=job.session_id,
                payload=job.payload,
            )
        except Exception as exc:
            self.stats.tasks_failed += 1
            return {"status": "error", "error": str(exc)}
        else:
            self.stats.tasks_completed += 1
            return {"status": "success", "result": outcome}
        finally:
            self.stats.total_processing_time += time.monotonic() - started_at
            self.stats.last_heartbeat = time.monotonic()
            self._current_job = None
            # A draining worker keeps its DRAINING state so the pool's
            # loop can retire it instead of marking it idle again.
            if self.state is not WorkerState.DRAINING:
                self.state = WorkerState.IDLE
The Worker Pool Manager
The pool manages a fixed number of workers and routes incoming jobs through a bounded queue.
class AgentWorkerPool:
    """Pool of async workers for concurrent agent processing.

    Jobs flow through a bounded queue; each worker pulls one job at a
    time, so at most ``num_workers`` agent sessions run concurrently.
    """

    def __init__(
        self,
        num_workers: int = 10,
        max_queue_size: int = 100,
        agent_factory=None,
    ):
        self.num_workers = num_workers
        self.max_queue_size = max_queue_size
        self._agent_factory = agent_factory
        # ``None`` items are shutdown sentinels consumed by _worker_loop.
        self._queue: asyncio.Queue[AgentJob | None] = asyncio.Queue(
            maxsize=max_queue_size
        )
        self._workers: list[AgentWorker] = []
        self._worker_tasks: list[asyncio.Task] = []
        # job_id -> future the submitting caller is awaiting.
        self._results: dict[str, asyncio.Future] = {}
        self._running = False

    async def start(self):
        """Start all workers in the pool."""
        self._running = True
        for i in range(self.num_workers):
            worker = AgentWorker(
                worker_id=f"worker-{i:03d}",
                agent_factory=self._agent_factory,
            )
            self._workers.append(worker)
            task = asyncio.create_task(
                self._worker_loop(worker),
                name=f"worker-{i:03d}",
            )
            self._worker_tasks.append(task)
        print(f"Worker pool started with {self.num_workers} workers")

    async def _worker_loop(self, worker: AgentWorker):
        """Main loop for a single worker.

        Pulls jobs until the pool stops running AND the queue is empty,
        or until a ``None`` shutdown sentinel is received.
        """
        while self._running or not self._queue.empty():
            try:
                job = await asyncio.wait_for(
                    self._queue.get(), timeout=1.0
                )
            except TimeoutError:
                # Idle tick: refresh the heartbeat so health checks do
                # not flag a worker that is merely waiting for work.
                worker.stats.last_heartbeat = time.monotonic()
                continue
            if job is None:  # Shutdown sentinel
                # BUG FIX: the sentinel is a queue item too; mark it done
                # so Queue.join() accounting stays balanced.
                self._queue.task_done()
                break
            result = await worker.process(job)
            # Deliver the result to the waiting caller. BUG FIX: guard
            # against a future that is already done/cancelled (e.g. the
            # submitter was cancelled between future.cancel() and its
            # cleanup) -- set_result() on a done future raises
            # InvalidStateError and would kill this worker task forever.
            fut = self._results.get(job.job_id)
            if fut is not None and not fut.done():
                fut.set_result(result)
            self._queue.task_done()
        worker.state = WorkerState.STOPPED

    async def submit(self, session_id: str, payload: dict) -> dict:
        """Submit a job and wait for the result.

        Applies backpressure: blocks while the queue is full. If this
        coroutine is cancelled after the job was enqueued, a worker may
        still process the job; its result is then discarded.
        """
        job = AgentJob(
            job_id=str(uuid.uuid4()),
            session_id=session_id,
            payload=payload,
        )
        # Create a future to receive the result
        future = asyncio.get_running_loop().create_future()
        self._results[job.job_id] = future
        try:
            await self._queue.put(job)
            return await future
        finally:
            # Always drop the routing entry, even on cancellation.
            self._results.pop(job.job_id, None)

    async def submit_nowait(
        self, session_id: str, payload: dict
    ) -> str:
        """Submit a job without waiting. Returns job_id.

        Raises asyncio.QueueFull immediately when the queue is at
        capacity; callers should translate that into a 503/backoff.
        """
        job = AgentJob(
            job_id=str(uuid.uuid4()),
            session_id=session_id,
            payload=payload,
        )
        self._queue.put_nowait(job)  # Raises QueueFull if at capacity
        return job.job_id
Health Monitoring
Monitor worker health to detect stuck tasks and unhealthy workers.
async def health_check(self) -> dict:
    """Return pool health metrics.

    Includes per-worker state and counters, queue depth, busy-worker
    utilization, and the ids of workers with a stale heartbeat.
    """
    now = time.monotonic()
    stale_threshold = 120.0  # 2 minutes without heartbeat

    worker_states = {}
    stale_workers = []
    for w in self._workers:
        heartbeat_age = now - w.stats.last_heartbeat
        worker_states[w.worker_id] = {
            "state": w.state.value,
            "completed": w.stats.tasks_completed,
            "failed": w.stats.tasks_failed,
            # max(..., 1) avoids division by zero before the first job.
            "avg_time": (
                w.stats.total_processing_time
                / max(w.stats.tasks_completed, 1)
            ),
            "last_heartbeat_ago": heartbeat_age,
        }
        if heartbeat_age > stale_threshold:
            stale_workers.append(w.worker_id)

    busy = sum(1 for w in self._workers if w.state == WorkerState.BUSY)
    return {
        "pool_size": self.num_workers,
        "queue_size": self._queue.qsize(),
        "queue_capacity": self.max_queue_size,
        "utilization": busy / self.num_workers,
        "total_completed": sum(
            w.stats.tasks_completed for w in self._workers
        ),
        "total_failed": sum(
            w.stats.tasks_failed for w in self._workers
        ),
        "stale_workers": stale_workers,
        "workers": worker_states,
    }
Dynamic Scaling
Adjust pool size based on queue depth and worker utilization.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
async def auto_scale(
    self,
    min_workers: int = 2,
    max_workers: int = 50,
    scale_up_threshold: float = 0.8,
    scale_down_threshold: float = 0.3,
    check_interval: float = 10.0,
):
    """Automatically scale workers based on load.

    Every ``check_interval`` seconds: grow the pool by ~25% when
    utilization or queue depth is high, shrink it by ~25% when
    utilization is low, clamped to [min_workers, max_workers].
    """
    while self._running:
        await asyncio.sleep(check_interval)
        health = await self.health_check()
        utilization = health["utilization"]
        queue_ratio = health["queue_size"] / health["queue_capacity"]

        # Step by roughly a quarter of the pool, but at least one worker.
        step = max(1, self.num_workers // 4)

        if utilization > scale_up_threshold or queue_ratio > 0.5:
            # Scale up
            target = min(max_workers, self.num_workers + step)
            if target > self.num_workers:
                await self._add_workers(target - self.num_workers)
                print(f"Scaled up to {self.num_workers} workers")
        elif (utilization < scale_down_threshold
              and self.num_workers > min_workers):
            # Scale down
            target = max(min_workers, self.num_workers - step)
            if target < self.num_workers:
                await self._remove_workers(self.num_workers - target)
                print(f"Scaled down to {self.num_workers} workers")
async def _add_workers(self, count: int):
    """Add ``count`` fresh workers to the pool."""
    for _ in range(count):
        # self._workers only ever grows (scale-down leaves entries in
        # place), so len() yields a unique id suffix.
        new_worker = AgentWorker(
            worker_id=f"worker-{len(self._workers):03d}",
            agent_factory=self._agent_factory,
        )
        self._workers.append(new_worker)
        self._worker_tasks.append(
            asyncio.create_task(self._worker_loop(new_worker))
        )
        self.num_workers += 1
async def _remove_workers(self, count: int):
    """Signal idle workers to drain and stop.

    NOTE(review): the ``None`` sentinel goes onto the shared queue, so
    any worker may consume it -- not necessarily the one just marked
    DRAINING. Drained workers also remain in ``self._workers``; only
    ``num_workers`` shrinks. Confirm this bookkeeping is acceptable for
    health reporting before relying on it.
    """
    removed = 0
    for candidate in reversed(self._workers):
        if removed >= count:
            break
        if candidate.state != WorkerState.IDLE:
            continue
        candidate.state = WorkerState.DRAINING
        await self._queue.put(None)  # Sentinel to stop one worker loop
        removed += 1
        self.num_workers -= 1
Graceful Drain
When shutting down, finish in-flight jobs before stopping.
async def drain(self, timeout: float = 60.0):
    """Gracefully drain and shut down the pool.

    Stops accepting work, lets in-flight jobs finish, and cancels any
    worker task still running after ``timeout`` seconds.
    """
    print("Draining worker pool...")
    self._running = False

    # One sentinel per worker so every loop gets a wake-up to exit.
    for _ in self._workers:
        await self._queue.put(None)

    # Wait for every worker task to wind down, bounded by the timeout.
    try:
        async with asyncio.timeout(timeout):
            await asyncio.gather(
                *self._worker_tasks, return_exceptions=True
            )
    except TimeoutError:
        print("Drain timeout. Cancelling remaining tasks.")
        for t in self._worker_tasks:
            if not t.done():
                t.cancel()

    summary = await self.health_check()
    print(
        f"Pool drained. Completed: {summary['total_completed']}, "
        f"Failed: {summary['total_failed']}"
    )
Integrating with FastAPI
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException

# Module-level pool handle, initialized by the lifespan hook below.
pool: AgentWorkerPool | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create and start the pool on startup; drain it on shutdown."""
    global pool
    pool = AgentWorkerPool(
        num_workers=10,
        max_queue_size=200,
        agent_factory=create_agent,  # assumes create_agent is defined elsewhere
    )
    await pool.start()
    # Start auto-scaler in background
    scaler = asyncio.create_task(pool.auto_scale())
    yield
    scaler.cancel()
    await pool.drain(timeout=30.0)


app = FastAPI(lifespan=lifespan)


@app.post("/api/agent/run")
async def run_agent(session_id: str, query: str):
    # BUG FIX: pool.submit() awaits queue.put(), which *waits* for space
    # rather than raising QueueFull, so the previous
    # ``except asyncio.QueueFull`` around it was dead code and the 503
    # path could never trigger. Reject explicitly while at capacity.
    if pool._queue.full():
        raise HTTPException(
            status_code=503,
            detail="Agent pool at capacity. Retry later.",
        )
    return await pool.submit(
        session_id=session_id,
        payload={"query": query},
    )


@app.get("/api/agent/health")
async def agent_health():
    return await pool.health_check()
FAQ
How do I choose the right number of workers?
Start with workers equal to your expected concurrent sessions. Little's law gives the estimate: concurrency = arrival rate × service time. So if each agent session takes 5 seconds and you expect 20 requests per second at peak, you need at least 20 × 5 = 100 workers. Monitor utilization in production: sustained above 80% means you need more workers or faster processing. Below 30% means you are over-provisioned.
What happens when the job queue is full?
With submit_nowait, an asyncio.QueueFull exception is raised immediately, which you should translate to an HTTP 503 (Service Unavailable) with a Retry-After header. With submit, the call blocks until queue space opens. Always set a reasonable max_queue_size to prevent unbounded memory growth during traffic spikes.
How is this different from just using asyncio.Semaphore?
A semaphore limits concurrency but provides no structure. A worker pool adds job queuing, health monitoring, per-worker statistics, dynamic scaling, and graceful shutdown. For simple scripts, a semaphore is fine. For production services processing thousands of agent sessions, the worker pool architecture is essential for observability and reliability.
#Python #WorkerPool #Asyncio #Scaling #Production #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.