Learn Agentic AI · 15 min read

Building an Async Agent Worker Pool: Concurrent Session Processing at Scale

Design and implement an async worker pool for processing concurrent AI agent sessions. Learn health monitoring, dynamic scaling, graceful drain, and production deployment patterns.

Why Worker Pools for AI Agents

A single async event loop can handle hundreds of concurrent agent sessions. But without structure, you get uncontrolled resource usage: unbounded memory growth, connection exhaustion, and cascading failures when one overloaded component drags down everything else.

A worker pool provides bounded concurrency, health monitoring, and graceful lifecycle management. Each worker processes one agent session at a time, and the pool controls how many workers run simultaneously. This architecture is the foundation for production AI agent deployments.
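
To see what the pool improves on, here is the simplest form of bounded concurrency: an asyncio.Semaphore. It caps in-flight sessions but gives you no queue, no stats, and no drain. A minimal sketch, where handle_session is an illustrative stand-in for a real agent call:

```python
import asyncio

# Naive bounded concurrency: a semaphore caps how many sessions
# run at once, but provides no queuing, health, or lifecycle.
async def handle_session(session_id: int) -> str:
    await asyncio.sleep(0.001)  # simulate agent work
    return f"done-{session_id}"

async def main() -> list[str]:
    sem = asyncio.Semaphore(10)  # at most 10 sessions in flight

    async def bounded(sid: int) -> str:
        async with sem:
            return await handle_session(sid)

    return await asyncio.gather(*(bounded(i) for i in range(100)))

results = asyncio.run(main())
```

This is fine for scripts; everything below adds the structure a long-running service needs on top of the same bound.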

Core Worker Pool Design

import asyncio
import uuid
import time
from dataclasses import dataclass, field
from enum import Enum

class WorkerState(Enum):
    IDLE = "idle"
    BUSY = "busy"
    DRAINING = "draining"
    STOPPED = "stopped"

@dataclass
class WorkerStats:
    tasks_completed: int = 0
    tasks_failed: int = 0
    total_processing_time: float = 0.0
    last_heartbeat: float = field(default_factory=time.monotonic)

@dataclass
class AgentJob:
    job_id: str
    session_id: str
    payload: dict
    created_at: float = field(default_factory=time.monotonic)

class AgentWorker:
    """Individual worker that processes agent sessions."""

    def __init__(self, worker_id: str, agent_factory):
        self.worker_id = worker_id
        self.state = WorkerState.IDLE
        self.stats = WorkerStats()
        self._agent_factory = agent_factory
        self._current_job: AgentJob | None = None

    async def process(self, job: AgentJob) -> dict:
        """Process a single agent job."""
        self.state = WorkerState.BUSY
        self._current_job = job
        start = time.monotonic()

        try:
            agent = self._agent_factory()
            result = await agent.run(
                session_id=job.session_id,
                payload=job.payload,
            )
            self.stats.tasks_completed += 1
            return {"status": "success", "result": result}

        except Exception as e:
            self.stats.tasks_failed += 1
            return {"status": "error", "error": str(e)}

        finally:
            elapsed = time.monotonic() - start
            self.stats.total_processing_time += elapsed
            self.stats.last_heartbeat = time.monotonic()
            self._current_job = None
            if self.state != WorkerState.DRAINING:
                self.state = WorkerState.IDLE
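
A quick way to exercise the worker on its own is with a stub agent. This sketch uses trimmed, self-contained copies of the definitions above plus a hypothetical EchoAgent (not from the article) as the agent_factory:

```python
import asyncio
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum

# Trimmed copies of the classes above, so this snippet runs standalone.
class WorkerState(Enum):
    IDLE = "idle"
    BUSY = "busy"

@dataclass
class WorkerStats:
    tasks_completed: int = 0
    tasks_failed: int = 0
    total_processing_time: float = 0.0
    last_heartbeat: float = field(default_factory=time.monotonic)

@dataclass
class AgentJob:
    job_id: str
    session_id: str
    payload: dict

class AgentWorker:
    def __init__(self, worker_id, agent_factory):
        self.worker_id = worker_id
        self.state = WorkerState.IDLE
        self.stats = WorkerStats()
        self._agent_factory = agent_factory

    async def process(self, job):
        self.state = WorkerState.BUSY
        start = time.monotonic()
        try:
            agent = self._agent_factory()
            result = await agent.run(job.session_id, job.payload)
            self.stats.tasks_completed += 1
            return {"status": "success", "result": result}
        except Exception as e:
            self.stats.tasks_failed += 1
            return {"status": "error", "error": str(e)}
        finally:
            self.stats.total_processing_time += time.monotonic() - start
            self.state = WorkerState.IDLE

class EchoAgent:
    """Hypothetical agent that just echoes its payload."""
    async def run(self, session_id, payload):
        return {"echo": payload["query"], "session": session_id}

async def demo():
    worker = AgentWorker("worker-000", EchoAgent)
    job = AgentJob(str(uuid.uuid4()), "s-1", {"query": "hi"})
    return worker, await worker.process(job)

worker, result = asyncio.run(demo())
```

Note that errors come back as a result dict rather than propagating, so one failing session never takes down the worker.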

The Worker Pool Manager

The pool manages a fixed number of workers and routes incoming jobs through a bounded queue.

class AgentWorkerPool:
    """Pool of async workers for concurrent agent processing."""

    def __init__(
        self,
        num_workers: int = 10,
        max_queue_size: int = 100,
        agent_factory=None,
    ):
        self.num_workers = num_workers
        self.max_queue_size = max_queue_size
        self._agent_factory = agent_factory
        self._queue: asyncio.Queue[AgentJob | None] = asyncio.Queue(
            maxsize=max_queue_size
        )
        self._workers: list[AgentWorker] = []
        self._worker_tasks: list[asyncio.Task] = []
        self._results: dict[str, asyncio.Future] = {}
        self._running = False

    async def start(self):
        """Start all workers in the pool."""
        self._running = True
        for i in range(self.num_workers):
            worker = AgentWorker(
                worker_id=f"worker-{i:03d}",
                agent_factory=self._agent_factory,
            )
            self._workers.append(worker)
            task = asyncio.create_task(
                self._worker_loop(worker),
                name=f"worker-{i:03d}",
            )
            self._worker_tasks.append(task)
        print(f"Worker pool started with {self.num_workers} workers")

    async def _worker_loop(self, worker: AgentWorker):
        """Main loop for a single worker."""
        while self._running or not self._queue.empty():
            try:
                job = await asyncio.wait_for(
                    self._queue.get(), timeout=1.0
                )
            except asyncio.TimeoutError:  # alias of TimeoutError on 3.11+
                worker.stats.last_heartbeat = time.monotonic()
                continue

            if job is None:  # Shutdown sentinel
                self._queue.task_done()
                break

            result = await worker.process(job)

            # Deliver the result unless the caller has already given up
            future = self._results.get(job.job_id)
            if future is not None and not future.done():
                future.set_result(result)

            self._queue.task_done()

        worker.state = WorkerState.STOPPED

    async def submit(self, session_id: str, payload: dict) -> dict:
        """Submit a job and wait for the result."""
        job = AgentJob(
            job_id=str(uuid.uuid4()),
            session_id=session_id,
            payload=payload,
        )

        # Create a future to receive the result
        future = asyncio.get_running_loop().create_future()
        self._results[job.job_id] = future

        try:
            await self._queue.put(job)
            result = await future
            return result
        finally:
            self._results.pop(job.job_id, None)

    async def submit_nowait(
        self, session_id: str, payload: dict
    ) -> str:
        """Submit a job without waiting. Returns job_id."""
        job = AgentJob(
            job_id=str(uuid.uuid4()),
            session_id=session_id,
            payload=payload,
        )
        self._queue.put_nowait(job)  # Raises QueueFull if at capacity
        return job.job_id
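
The queue-plus-futures handshake at the heart of submit() and the worker loop can be boiled down to a condensed, self-contained sketch (not the full pool, just the routing pattern):

```python
import asyncio
import uuid

# Condensed sketch of the submit()/worker-loop handshake: jobs go
# onto a bounded queue, results come back through per-job futures.
async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    results: dict[str, asyncio.Future] = {}

    async def worker_loop():
        while True:
            job = await queue.get()
            if job is None:  # shutdown sentinel
                break
            job_id, payload = job
            fut = results.get(job_id)
            if fut is not None and not fut.done():
                fut.set_result({"status": "success", "echo": payload})

    workers = [asyncio.create_task(worker_loop()) for _ in range(3)]

    async def submit(payload):
        job_id = str(uuid.uuid4())
        fut = asyncio.get_running_loop().create_future()
        results[job_id] = fut
        await queue.put((job_id, payload))
        try:
            return await fut
        finally:
            results.pop(job_id, None)

    out = await asyncio.gather(*(submit(i) for i in range(5)))
    for _ in workers:
        await queue.put(None)  # one sentinel per worker
    await asyncio.gather(*workers)
    return out

out = asyncio.run(main())
```

The finally block in submit is what keeps the results dict from leaking entries when a caller is cancelled mid-wait.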

Health Monitoring

Monitor worker health to detect stuck tasks and unhealthy workers.

    async def health_check(self) -> dict:
        """Return pool health metrics."""
        now = time.monotonic()
        stale_threshold = 120.0  # 2 minutes without heartbeat

        worker_states = {}
        stale_workers = []
        for worker in self._workers:
            worker_states[worker.worker_id] = {
                "state": worker.state.value,
                "completed": worker.stats.tasks_completed,
                "failed": worker.stats.tasks_failed,
                "avg_time": (
                    worker.stats.total_processing_time
                    / max(worker.stats.tasks_completed, 1)
                ),
                "last_heartbeat_ago": now - worker.stats.last_heartbeat,
            }
            if now - worker.stats.last_heartbeat > stale_threshold:
                stale_workers.append(worker.worker_id)

        return {
            "pool_size": self.num_workers,
            "queue_size": self._queue.qsize(),
            "queue_capacity": self.max_queue_size,
            "utilization": sum(
                1 for w in self._workers
                if w.state == WorkerState.BUSY
            ) / self.num_workers,
            "total_completed": sum(
                w.stats.tasks_completed for w in self._workers
            ),
            "total_failed": sum(
                w.stats.tasks_failed for w in self._workers
            ),
            "stale_workers": stale_workers,
            "workers": worker_states,
        }
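
The staleness test is just monotonic-clock arithmetic. Isolated from the pool (worker names and ages here are illustrative), it looks like:

```python
import time

# Isolated version of the heartbeat staleness check in health_check().
STALE_THRESHOLD = 120.0  # seconds without a heartbeat

now = time.monotonic()
last_heartbeat = {
    "worker-000": now - 5.0,    # recently active
    "worker-001": now - 300.0,  # no heartbeat for 5 minutes
}
stale = [
    wid for wid, hb in last_heartbeat.items()
    if now - hb > STALE_THRESHOLD
]
```

Using time.monotonic() rather than time.time() matters here: wall-clock adjustments would otherwise produce false stale alerts.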

Dynamic Scaling

Adjust pool size based on queue depth and worker utilization.


    async def auto_scale(
        self,
        min_workers: int = 2,
        max_workers: int = 50,
        scale_up_threshold: float = 0.8,
        scale_down_threshold: float = 0.3,
        check_interval: float = 10.0,
    ):
        """Automatically scale workers based on load."""
        while self._running:
            await asyncio.sleep(check_interval)

            health = await self.health_check()
            utilization = health["utilization"]
            queue_ratio = (
                health["queue_size"] / health["queue_capacity"]
            )

            if (utilization > scale_up_threshold
                    or queue_ratio > 0.5):
                # Scale up
                new_count = min(
                    max_workers,
                    self.num_workers + max(1, self.num_workers // 4),
                )
                if new_count > self.num_workers:
                    await self._add_workers(
                        new_count - self.num_workers
                    )
                    print(f"Scaled up to {self.num_workers} workers")

            elif (utilization < scale_down_threshold
                  and self.num_workers > min_workers):
                # Scale down
                new_count = max(
                    min_workers,
                    self.num_workers - max(1, self.num_workers // 4),
                )
                if new_count < self.num_workers:
                    await self._remove_workers(
                        self.num_workers - new_count
                    )
                    print(f"Scaled down to {self.num_workers} workers")

    async def _add_workers(self, count: int):
        """Add workers to the pool."""
        for _ in range(count):
            worker = AgentWorker(
                worker_id=f"worker-{len(self._workers):03d}",
                agent_factory=self._agent_factory,
            )
            self._workers.append(worker)
            task = asyncio.create_task(self._worker_loop(worker))
            self._worker_tasks.append(task)
            self.num_workers += 1

    async def _remove_workers(self, count: int):
        """Signal idle workers to drain and stop."""
        removed = 0
        for worker in reversed(self._workers):
            if removed >= count:
                break
            if worker.state == WorkerState.IDLE:
                worker.state = WorkerState.DRAINING
                # The sentinel is consumed by whichever worker dequeues
                # it next, so scale-down is approximate, not targeted.
                await self._queue.put(None)  # Sentinel to stop
                removed += 1
                self.num_workers -= 1
        # Stopped workers stay in self._workers so their stats remain
        # visible to health_check().
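
The 25% growth step compounds quickly. A quick check of the scale-up sequence, assuming a 10-worker start and the 50-worker cap:

```python
def next_size(n: int, max_workers: int = 50) -> int:
    # Mirrors the scale-up rule above: grow by max(1, n // 4), capped.
    return min(max_workers, n + max(1, n // 4))

sizes = [10]
while sizes[-1] < 50:
    sizes.append(next_size(sizes[-1]))
# 10 -> 12 -> 15 -> 18 -> 22 -> 27 -> 33 -> 41 -> 50
```

Eight check intervals to go from 10 to 50 workers, which is why check_interval is the main knob for how fast the pool reacts to a spike.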

Graceful Drain

When shutting down, finish in-flight jobs before stopping.

    async def drain(self, timeout: float = 60.0):
        """Gracefully drain and shut down the pool."""
        print("Draining worker pool...")
        self._running = False

        # Send shutdown sentinels for all workers
        for _ in self._workers:
            await self._queue.put(None)

        # Wait for all workers to finish
        try:
            async with asyncio.timeout(timeout):
                await asyncio.gather(
                    *self._worker_tasks,
                    return_exceptions=True,
                )
        except TimeoutError:
            print("Drain timeout. Cancelling remaining tasks.")
            for task in self._worker_tasks:
                if not task.done():
                    task.cancel()

        final_health = await self.health_check()
        print(
            f"Pool drained. Completed: {final_health['total_completed']}, "
            f"Failed: {final_health['total_failed']}"
        )

Integrating with FastAPI

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException

pool: AgentWorkerPool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = AgentWorkerPool(
        num_workers=10,
        max_queue_size=200,
        agent_factory=create_agent,
    )
    await pool.start()
    # Start auto-scaler in background
    scaler = asyncio.create_task(pool.auto_scale())
    yield
    scaler.cancel()
    await pool.drain(timeout=30.0)

app = FastAPI(lifespan=lifespan)

@app.post("/api/agent/run")
async def run_agent(session_id: str, query: str):
    # submit() awaits queue.put(), which blocks when the queue is full
    # rather than raising QueueFull (only put_nowait raises it), so
    # reject early instead of catching an exception that never fires.
    if pool._queue.full():
        raise HTTPException(
            status_code=503,
            detail="Agent pool at capacity. Retry later.",
        )
    return await pool.submit(
        session_id=session_id,
        payload={"query": query},
    )

@app.get("/api/agent/health")
async def agent_health():
    return await pool.health_check()

FAQ

How do I choose the right number of workers?

Start with workers equal to your expected concurrent sessions. If each agent session takes 5 seconds and you expect 20 requests per second peak, you need at least 100 workers. Monitor utilization in production: sustained above 80% means you need more workers or faster processing. Below 30% means you are over-provisioned.
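
That sizing rule is Little's law: required workers ≈ peak arrival rate × average session latency. In code, with a rule-of-thumb headroom multiplier that is our assumption rather than part of the article:

```python
import math

# Little's law estimate for worker-pool sizing:
#   required workers ≈ peak arrival rate × average session latency.
# The 1.2x headroom default is a rule-of-thumb assumption for bursts.
def required_workers(requests_per_sec: float,
                     avg_latency_sec: float,
                     headroom: float = 1.2) -> int:
    return math.ceil(requests_per_sec * avg_latency_sec * headroom)

workers = required_workers(20, 5.0)  # the example above: 20 rps, 5 s each
```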

What happens when the job queue is full?

With submit_nowait, an asyncio.QueueFull exception is raised immediately, which you should translate to an HTTP 503 (Service Unavailable) with a Retry-After header. With submit, the call blocks until queue space opens. Always set a reasonable max_queue_size to prevent unbounded memory growth during traffic spikes.
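
The fast-fail behavior is easy to verify with a bare asyncio.Queue:

```python
import asyncio

# put_nowait raises asyncio.QueueFull immediately at capacity, which
# is what submit_nowait relies on for fast-fail backpressure.
queue: asyncio.Queue = asyncio.Queue(maxsize=1)
queue.put_nowait("job-1")

try:
    queue.put_nowait("job-2")
    overflowed = False
except asyncio.QueueFull:
    overflowed = True
```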

How is this different from just using asyncio.Semaphore?

A semaphore limits concurrency but provides no structure. A worker pool adds job queuing, health monitoring, per-worker statistics, dynamic scaling, and graceful shutdown. For simple scripts, a semaphore is fine. For production services processing thousands of agent sessions, the worker pool architecture is essential for observability and reliability.


#Python #WorkerPool #Asyncio #Scaling #Production #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
