
Scaling AI Agents to 10,000 Concurrent Users: Architecture Patterns and Load Testing

Learn how to scale agentic AI systems to handle 10,000 concurrent users with connection pooling, async processing, horizontal scaling, and k6 load testing strategies.

Why Agent Systems Break at Scale

Scaling a traditional REST API to 10,000 concurrent users is a solved problem: add stateless application servers behind a load balancer, scale the database with read replicas, and cache aggressively. Scaling an AI agent system is fundamentally harder because agents are stateful, long-running, and computationally expensive.

A single agent interaction might involve 5-15 LLM calls, each taking 1-10 seconds. The agent maintains conversational state across these calls. It holds connections to external tools, databases, and APIs. And it consumes significant memory for context windows that can exceed 100K tokens.

At 10,000 concurrent users, you are not managing 10,000 HTTP request-response cycles. You are managing 10,000 concurrent state machines, each executing multi-step workflows with variable latency and resource consumption. This post covers the architecture patterns that make this possible.

The Core Architecture: Separating Concerns

The first principle of agent scaling is separating the components that scale differently:

Gateway Layer: Handles WebSocket connections, authentication, rate limiting. Scales horizontally with minimal state.

Router Layer: Classifies incoming requests and dispatches to the appropriate agent pool. Lightweight, fast, scales easily.

Agent Worker Pool: Executes agent logic. This is the bottleneck. Each worker manages one or more agent sessions, making LLM calls and tool invocations. Scaling requires careful resource management.

State Store: Persists conversation state, agent memory, and session data. Must handle high read/write throughput with low latency.

Tool Execution Layer: Manages connections to external services, databases, and APIs. Needs connection pooling and circuit breaking.

# Agent scaling architecture with FastAPI and Redis

import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis.asyncio import Redis

@dataclass
class AgentSession:
    session_id: str
    user_id: str
    agent_type: str
    messages: list[dict] = field(default_factory=list)
    state: dict = field(default_factory=dict)
    created_at: float = 0.0
    last_active: float = 0.0

class AgentSessionManager:
    """Manages agent sessions with Redis-backed state."""

    def __init__(self, redis: Redis, ttl: int = 3600):
        self.redis = redis
        self.ttl = ttl

    async def create_session(self, user_id: str, agent_type: str) -> AgentSession:
        now = time.time()
        session = AgentSession(
            session_id=str(uuid.uuid4()),
            user_id=user_id,
            agent_type=agent_type,
            created_at=now,
            last_active=now,
        )
        await self._save(session)
        return session

    async def get_session(self, session_id: str) -> Optional[AgentSession]:
        data = await self.redis.get(f"agent:session:{session_id}")
        if not data:
            return None
        return AgentSession(**json.loads(data))

    async def update_session(self, session: AgentSession):
        await self._save(session)

    async def _save(self, session: AgentSession):
        key = f"agent:session:{session.session_id}"
        await self.redis.setex(
            key,
            self.ttl,
            json.dumps({
                "session_id": session.session_id,
                "user_id": session.user_id,
                "agent_type": session.agent_type,
                "messages": session.messages[-50:],  # Keep last 50 messages
                "state": session.state,
                "created_at": session.created_at,
                "last_active": session.last_active,
            })
        )

app = FastAPI()
redis = Redis.from_url("redis://redis-cluster:6379/0")
session_mgr = AgentSessionManager(redis)

# Connection tracking for backpressure
active_connections: dict[str, WebSocket] = {}
MAX_CONCURRENT_SESSIONS = 10000

@app.websocket("/ws/agent/{agent_type}")
async def agent_websocket(websocket: WebSocket, agent_type: str):
    if len(active_connections) >= MAX_CONCURRENT_SESSIONS:
        await websocket.close(code=1013, reason="Server at capacity")
        return

    await websocket.accept()
    session = await session_mgr.create_session(
        user_id=websocket.headers.get("x-user-id", "anonymous"),
        agent_type=agent_type
    )
    active_connections[session.session_id] = websocket

    try:
        while True:
            raw = await websocket.receive_text()
            # Clients send JSON like {"message": "..."}; fall back to raw text
            try:
                text = json.loads(raw).get("message", raw)
            except (json.JSONDecodeError, AttributeError):
                text = raw

            # Subscribe BEFORE enqueueing, so a fast worker cannot publish
            # the response before we are listening
            pubsub = redis.pubsub()
            channel = f"agent:response:{session.session_id}"
            await pubsub.subscribe(channel)

            # Dispatch to agent worker pool via queue
            await redis.lpush(
                f"agent:queue:{agent_type}",
                json.dumps({
                    "session_id": session.session_id,
                    "message": text,
                })
            )

            # Forward streamed chunks until the worker signals completion
            async for msg in pubsub.listen():
                if msg["type"] != "message":
                    continue
                payload = json.loads(msg["data"])
                await websocket.send_text(msg["data"].decode())
                if payload["type"] in ("done", "error"):
                    break
            await pubsub.unsubscribe(channel)
            await pubsub.close()
    except WebSocketDisconnect:
        pass
    finally:
        active_connections.pop(session.session_id, None)

Connection Pooling for LLM API Calls

The single largest bottleneck in agent scaling is LLM API calls. Each agent session makes multiple calls, and these calls are the slowest operations in the pipeline (1-10 seconds each). Without careful connection management, you will exhaust your HTTP connection pool long before you hit CPU or memory limits.

# LLM connection pool with concurrency limiting and retry logic

import httpx
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class LLMPoolConfig:
    max_connections: int = 200
    max_keepalive: int = 100
    timeout_seconds: float = 60.0
    max_concurrent_requests: int = 150
    retry_attempts: int = 3
    retry_backoff_base: float = 1.0

class LLMConnectionPool:
    def __init__(self, config: LLMPoolConfig):
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent_requests)
        self.client = httpx.AsyncClient(
            limits=httpx.Limits(
                max_connections=config.max_connections,
                max_keepalive_connections=config.max_keepalive,
            ),
            timeout=httpx.Timeout(config.timeout_seconds),
        )
        self._request_count = 0
        self._error_count = 0

    async def chat_completion(
        self, messages: list[dict], model: str, **kwargs
    ) -> dict:
        async with self.semaphore:
            self._request_count += 1
            for attempt in range(self.config.retry_attempts):
                try:
                    response = await self.client.post(
                        "https://api.anthropic.com/v1/messages",
                        json={
                            "model": model,
                            "messages": messages,
                            "max_tokens": kwargs.get("max_tokens", 4096),
                            **kwargs,
                        },
                        headers={
                            "x-api-key": self._get_api_key(),
                            "anthropic-version": "2023-06-01",
                        },
                    )

                    if response.status_code == 429:
                        # Rate limited: exponential backoff
                        wait = self.config.retry_backoff_base * (2 ** attempt)
                        await asyncio.sleep(wait)
                        continue

                    if response.status_code == 529:
                        # Overloaded: back off more aggressively
                        wait = self.config.retry_backoff_base * (3 ** attempt)
                        await asyncio.sleep(wait)
                        continue

                    response.raise_for_status()
                    return response.json()

                except httpx.TimeoutException:
                    if attempt == self.config.retry_attempts - 1:
                        self._error_count += 1
                        raise

            raise RuntimeError("Max retries exceeded")

    @property
    def utilization(self) -> float:
        """Current pool utilization (0.0 to 1.0)."""
        active = self.config.max_concurrent_requests - self.semaphore._value
        return active / self.config.max_concurrent_requests

    def _get_api_key(self) -> str:
        import os
        return os.environ["ANTHROPIC_API_KEY"]
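To make the retry policy concrete, here is the backoff schedule the pool produces with the default retry_backoff_base of 1.0 (a quick sketch of the arithmetic, not part of the pool itself):

```python
# Backoff schedules implied by the retry logic above, with
# retry_backoff_base = 1.0 and retry_attempts = 3.
base = 1.0
attempts = 3

# 429 (rate limited): base * 2^attempt
rate_limited_waits = [base * (2 ** attempt) for attempt in range(attempts)]

# 529 (overloaded): base * 3^attempt, backing off more aggressively
overloaded_waits = [base * (3 ** attempt) for attempt in range(attempts)]

print(rate_limited_waits)  # [1.0, 2.0, 4.0]
print(overloaded_waits)    # [1.0, 3.0, 9.0]
```

The wider spacing on 529s gives an overloaded upstream more time to recover before the pool retries.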

Horizontal Scaling with Worker Pools

Agent workers consume significant resources: memory for context windows, CPU for response parsing, and network I/O for tool calls. Scaling horizontally means running multiple worker processes across multiple machines, with a message queue distributing work.

# Agent worker that processes tasks from a Redis queue

import asyncio
import signal
from typing import Callable

class AgentWorker:
    """
    A worker process that pulls agent tasks from a Redis queue
    and executes them. Run multiple instances for horizontal scaling.
    """

    def __init__(
        self,
        redis: Redis,
        llm_pool: LLMConnectionPool,
        agent_factory: Callable,
        queue_name: str,
        max_concurrent: int = 50,
    ):
        self.redis = redis
        self.llm_pool = llm_pool
        self.agent_factory = agent_factory
        self.queue_name = queue_name
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.running = True
        self.active_tasks = 0

    async def start(self):
        """Main worker loop: pull tasks and process them."""
        # Graceful shutdown handling
        loop = asyncio.get_event_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self._shutdown)

        # Keep strong references to background tasks so they are not
        # garbage-collected mid-flight (a common asyncio pitfall)
        background: set[asyncio.Task] = set()

        while self.running:
            try:
                # Block-wait for a task (with timeout for shutdown checks)
                result = await self.redis.brpop(
                    self.queue_name, timeout=5
                )
                if result is None:
                    continue

                _, task_data = result
                task = json.loads(task_data)

                # Process in background; the semaphore inside _process_task
                # enforces the concurrency limit
                t = asyncio.create_task(self._process_task(task))
                background.add(t)
                t.add_done_callback(background.discard)

            except Exception as e:
                print(f"Worker error: {e}")
                await asyncio.sleep(1)

    async def _process_task(self, task: dict):
        async with self.semaphore:
            self.active_tasks += 1
            session_id = task["session_id"]
            try:
                # Load session state
                session_mgr = AgentSessionManager(self.redis)
                session = await session_mgr.get_session(session_id)
                if not session:
                    return

                # Create agent instance
                agent = self.agent_factory(
                    agent_type=session.agent_type,
                    llm_pool=self.llm_pool,
                )

                # Execute agent with streaming
                response_parts = []
                async for chunk in agent.run_streaming(
                    message=task["message"],
                    history=session.messages,
                    state=session.state,
                ):
                    response_parts.append(chunk)
                    # Stream partial responses to the user
                    await self.redis.publish(
                        f"agent:response:{session_id}",
                        json.dumps({"type": "chunk", "content": chunk})
                    )

                # Send completion signal
                full_response = "".join(response_parts)
                await self.redis.publish(
                    f"agent:response:{session_id}",
                    json.dumps({"type": "done", "content": full_response})
                )

                # Update session state
                session.messages.append({"role": "user", "content": task["message"]})
                session.messages.append({"role": "assistant", "content": full_response})
                await session_mgr.update_session(session)

            except Exception as e:
                await self.redis.publish(
                    f"agent:response:{session_id}",
                    json.dumps({"type": "error", "content": str(e)})
                )
            finally:
                self.active_tasks -= 1

    def _shutdown(self):
        self.running = False
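On shutdown, a worker should stop pulling new tasks and give in-flight sessions a chance to finish. The _shutdown hook above only flips the flag; a drain step like the following completes the picture (a sketch, with stub coroutines standing in for _process_task calls):

```python
import asyncio

async def drain(tasks: set[asyncio.Task], deadline_s: float) -> int:
    """Wait up to deadline_s for outstanding tasks, then cancel stragglers.
    Returns how many tasks completed cleanly."""
    if not tasks:
        return 0
    done, pending = await asyncio.wait(tasks, timeout=deadline_s)
    for t in pending:
        t.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)
    return len(done)

async def main() -> int:
    # Stub tasks standing in for in-flight _process_task calls
    async def fake_task(duration: float):
        await asyncio.sleep(duration)

    in_flight = {asyncio.create_task(fake_task(d)) for d in (0.01, 0.02, 5.0)}
    return await drain(in_flight, deadline_s=0.5)

completed = asyncio.run(main())
print(f"{completed} of 3 tasks drained cleanly")  # the 5.0s task is cancelled
```

Pair this with a Kubernetes terminationGracePeriodSeconds longer than your drain deadline so the pod is not killed mid-drain.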

WebSocket Management at Scale

At 10,000 concurrent users, WebSocket management becomes a significant concern. Each WebSocket connection consumes a file descriptor, memory for buffers, and periodic keepalive bandwidth.


Key strategies for WebSocket scaling:

Connection limits per pod: Set explicit limits (2,000-3,000 connections per pod) and use Kubernetes Horizontal Pod Autoscaler to add pods as connections grow.

Heartbeat and cleanup: Implement server-side heartbeats to detect dead connections. A connection that misses 3 heartbeats should be closed and its resources freed.

Sticky sessions: Use session affinity in the load balancer so that reconnecting clients return to the same pod where their session state is cached in memory.

Graceful degradation: When the system is at capacity, fall back to HTTP long-polling rather than rejecting users outright. Long-polling is less efficient but allows the system to serve more users during peak load.

Load Testing with k6

Load testing agent systems requires simulating realistic multi-turn conversations, not just HTTP request floods. The k6 framework supports WebSocket testing, making it ideal for agent load testing.

// k6 load test for agent WebSocket endpoint

import ws from "k6/ws";
import { check, sleep } from "k6";  // sleep kept for optional pacing between iterations
import { Counter, Trend } from "k6/metrics";

const responseTime = new Trend("agent_response_time", true);
const errorCount = new Counter("agent_errors");
const messagesProcessed = new Counter("messages_processed");

export const options = {
  scenarios: {
    ramp_to_10k: {
      executor: "ramping-vus",
      startVUs: 100,
      stages: [
        { duration: "2m", target: 1000 },
        { duration: "3m", target: 5000 },
        { duration: "5m", target: 10000 },
        { duration: "10m", target: 10000 },  // Sustain peak
        { duration: "3m", target: 0 },
      ],
    },
  },
  thresholds: {
    agent_response_time: ["p(95)<15000"],  // 95th percentile under 15s
    agent_errors: ["count<100"],
  },
};

const CONVERSATION_TURNS = [
  "What is the status of my last order?",
  "Can you look up order #12345?",
  "I need to change the shipping address",
  "Please update it to 123 Main St, New York, NY 10001",
  "When will it arrive with the new address?",
];

export default function () {
  const url = "wss://api.example.com/ws/agent/customer-support";
  const params = {
    headers: {
      "x-user-id": `load-test-user-${__VU}`,
      Authorization: `Bearer ${__ENV.TEST_TOKEN}`,
    },
  };

  const res = ws.connect(url, params, function (socket) {
    let turnIndex = 0;
    let turnStart = 0;

    function sendTurn() {
      turnStart = Date.now();
      socket.send(
        JSON.stringify({ message: CONVERSATION_TURNS[turnIndex] })
      );
    }

    socket.on("open", function () {
      sendTurn();
    });

    socket.on("message", function (msg) {
      const data = JSON.parse(msg);

      if (data.type === "done") {
        // Measure per-turn latency, not time since conversation start
        responseTime.add(Date.now() - turnStart);
        messagesProcessed.add(1);

        turnIndex++;
        if (turnIndex < CONVERSATION_TURNS.length) {
          // Simulate human think time (2-8 seconds). Use socket.setTimeout,
          // not sleep(): sleep blocks the socket's event loop in k6.
          socket.setTimeout(sendTurn, 2000 + Math.random() * 6000);
        } else {
          socket.close();
        }
      }

      if (data.type === "error") {
        errorCount.add(1);
        socket.close();
      }
    });

    socket.on("error", function (e) {
      errorCount.add(1);
    });

    socket.setTimeout(function () {
      socket.close();
    }, 120000);  // 2-minute timeout per conversation
  });

  check(res, {
    "WebSocket connected": (r) => r && r.status === 101,
  });
}

Performance Benchmarking Metrics

When scaling agent systems, track these metrics:

Time to First Token (TTFT): How long until the user sees the first response chunk. Target: under 2 seconds. This is the perceived responsiveness of the system.

End-to-End Latency: Total time from user message to complete response. Target: under 15 seconds for 95th percentile. Agent responses are inherently slower than API responses, so user expectations are different.

Throughput: Conversations per minute the system can sustain. Measure at steady state, not burst.

Error Rate: Percentage of interactions that fail (timeout, LLM error, tool error). Target: under 1%.

Resource Efficiency: Cost per conversation at peak load. Track LLM API costs, compute costs, and infrastructure costs separately to identify optimization opportunities.
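The first four metrics can be rolled up in-process as a sketch like this (illustrative only; in production you would export samples to a metrics backend such as Prometheus rather than aggregate inside a worker):

```python
import math
from dataclasses import dataclass, field

@dataclass
class ConversationMetrics:
    """Illustrative rollup of TTFT, end-to-end latency, and error rate."""
    ttft_ms: list[float] = field(default_factory=list)
    e2e_ms: list[float] = field(default_factory=list)
    errors: int = 0
    total: int = 0

    def record_success(self, ttft_ms: float, e2e_ms: float) -> None:
        self.total += 1
        self.ttft_ms.append(ttft_ms)
        self.e2e_ms.append(e2e_ms)

    def record_error(self) -> None:
        self.total += 1
        self.errors += 1

    @staticmethod
    def percentile(samples: list[float], pct: float) -> float:
        """Nearest-rank percentile; returns 0.0 for an empty sample set."""
        if not samples:
            return 0.0
        ordered = sorted(samples)
        rank = max(1, math.ceil(pct / 100 * len(ordered)))
        return ordered[rank - 1]

    def report(self) -> dict:
        return {
            "ttft_p95_ms": self.percentile(self.ttft_ms, 95),
            "e2e_p95_ms": self.percentile(self.e2e_ms, 95),
            "error_rate": self.errors / self.total if self.total else 0.0,
        }

m = ConversationMetrics()
for i in range(99):
    m.record_success(ttft_ms=500 + i * 10, e2e_ms=4000 + i * 100)
m.record_error()
print(m.report())  # p95 TTFT 1440ms, p95 e2e 13400ms, 1% error rate
```

Against the targets above, this synthetic run would pass TTFT (p95 under 2s) and end-to-end latency (p95 under 15s) but sit exactly at the 1% error-rate ceiling.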

FAQ

How much does it cost to run 10,000 concurrent agent sessions?

The dominant cost is LLM API calls. At 10,000 concurrent users with an average of 5 messages per conversation and 3 LLM calls per message, you are making roughly 150,000 LLM calls per hour at peak. Using a mid-tier model at approximately 3 dollars per million input tokens and 15 dollars per million output tokens, the LLM cost alone is approximately 200-500 dollars per hour depending on context length. Infrastructure costs (compute, Redis, networking) are typically 10-20% of the LLM cost. Model tiering (using cheap models for routing and expensive models for reasoning) can reduce total cost by 40-60%.
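The arithmetic behind that estimate can be sketched as follows (token counts and the one-conversation-per-user-per-hour rate are illustrative assumptions, not vendor pricing):

```python
# Back-of-envelope cost model for the figures above.
concurrent_users = 10_000
messages_per_conversation = 5
llm_calls_per_message = 3
conversations_per_hour = concurrent_users  # assume ~1 conversation per user per hour

calls_per_hour = (
    conversations_per_hour * messages_per_conversation * llm_calls_per_message
)

avg_input_tokens = 400    # assumed short agent context
avg_output_tokens = 120   # assumed average response length
input_price_per_token = 3 / 1_000_000    # $3 per million input tokens
output_price_per_token = 15 / 1_000_000  # $15 per million output tokens

hourly_llm_cost = calls_per_hour * (
    avg_input_tokens * input_price_per_token
    + avg_output_tokens * output_price_per_token
)

print(calls_per_hour)          # 150000
print(round(hourly_llm_cost))  # near the top of the $200-500 range
```

Note how sensitive the result is to context length: doubling the average input tokens pushes the figure well past the quoted range, which is exactly why prompt trimming, caching, and model tiering dominate the optimization conversation.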

Should I use WebSockets or Server-Sent Events for agent streaming?

WebSockets are better when the client needs to send multiple messages during a conversation (multi-turn agents). Server-Sent Events (SSE) are simpler and work better with HTTP/2 when the client sends a single request and receives a streaming response. For most agent use cases, WebSockets are the right choice because conversations are inherently bidirectional.

How do you handle agent state when a pod crashes?

Externalize all session state to Redis or a similar store. The agent worker should be stateless: it loads session state from Redis at the start of each message processing, executes the agent logic, and writes the updated state back. If a pod crashes, the session state is preserved in Redis, and the next message from the user will be picked up by another pod that loads the same state.

What is the optimal number of concurrent agent sessions per worker pod?

This depends on your workload profile, but a good starting point is 50-100 concurrent sessions per pod with 2 CPU cores and 4GB RAM. The limiting factor is usually not CPU or memory but the number of concurrent outbound HTTP connections to LLM APIs. Profile your specific workload with realistic traffic patterns before setting final numbers.

Written by

CallSphere Team