WebSocket Architecture for AI Applications: Persistent Connections for Real-Time Agents

Learn how to design WebSocket-based architectures for AI agents, covering connection lifecycle management, protocol framing, heartbeat mechanisms, and automatic reconnection strategies for production reliability.

Why WebSockets Matter for AI Agents

HTTP request-response cycles work well for one-shot AI queries, but real-time AI agents need persistent, bidirectional communication. When an agent streams partial results, receives live tool outputs, or coordinates with other agents, issuing a separate HTTP request for every message adds header overhead to each exchange and, whenever the underlying connection is not reused, a fresh TCP and TLS handshake. WebSockets solve this by upgrading an initial HTTP connection into a long-lived, full-duplex channel.

The WebSocket protocol (RFC 6455) begins with an HTTP upgrade handshake. Once the server responds with 101 Switching Protocols, both sides can send frames at any time without re-establishing a connection. This removes the per-message HTTP header overhead, and any repeated TLS handshakes, that would otherwise dominate latency-sensitive AI interactions.
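To make the upgrade handshake concrete, here is a minimal sketch of how a server derives the Sec-WebSocket-Accept header from the client's Sec-WebSocket-Key, as RFC 6455 describes. The function name compute_accept_key is chosen for this example; frameworks such as FastAPI do this step for you.

```python
import base64
import hashlib

# Fixed GUID that RFC 6455 tells servers to append to the client's key.
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def compute_accept_key(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key."""
    digest = hashlib.sha1((sec_websocket_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

# Sample key taken from the RFC 6455 handshake example.
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The client checks this value before treating the connection as upgraded, which guards against responses from servers or caches that do not actually speak WebSocket.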

Connection Lifecycle Design

A robust WebSocket architecture for AI agents follows four phases: handshake, authentication, active session, and graceful shutdown.

import asyncio
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from datetime import datetime, timezone

app = FastAPI()

class AgentSession:
    def __init__(self, ws: WebSocket, user_id: str):
        self.ws = ws
        self.user_id = user_id
        self.connected_at = datetime.now(timezone.utc)
        self.last_heartbeat = self.connected_at

    async def send_event(self, event_type: str, payload: dict):
        message = {
            "type": event_type,
            "payload": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        await self.ws.send_json(message)

sessions: dict[str, AgentSession] = {}

@app.websocket("/ws/agent/{user_id}")
async def agent_endpoint(ws: WebSocket, user_id: str):
    await ws.accept()

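    # Authentication phase: the first message must be an auth message, sent within 10 seconds.
    # verify_token is assumed to be defined elsewhere in the application.
    try:
        auth_msg = await asyncio.wait_for(ws.receive_json(), timeout=10.0)
    except asyncio.TimeoutError:
        await ws.close(code=4001, reason="Authentication timed out")
        return
    except WebSocketDisconnect:
        return  # client left before authenticating
    if auth_msg.get("type") != "auth" or not verify_token(auth_msg.get("token")):
        await ws.close(code=4001, reason="Authentication failed")
        return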

    session = AgentSession(ws, user_id)
    sessions[user_id] = session
    await session.send_event("connected", {"session_id": user_id})

    try:
        while True:
            data = await ws.receive_json()
            await handle_agent_message(session, data)
    except WebSocketDisconnect:
        pass
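    finally:
        # Remove the entry only if it still belongs to this connection; a reconnect
        # by the same user may already have replaced it with a newer session.
        if sessions.get(user_id) is session:
            sessions.pop(user_id, None)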

Calling ws.accept() completes the upgrade handshake. Authentication happens immediately afterwards: the client must send a valid token within 10 seconds, or the connection is closed. The active session loop then processes messages until the client disconnects, and the finally block guarantees that the session registry is cleaned up however the loop exits.
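The endpoint above calls verify_token without defining it. One possible shape for that helper, assuming tokens of the form "user_id.signature" signed with HMAC-SHA256, is sketched below. SECRET_KEY, sign_user, and the token format are all assumptions made for this example, not part of the original design; a production system would more likely use an established format such as JWT with an expiry.

```python
import hashlib
import hmac

# Hypothetical shared secret; load it from configuration in a real deployment.
SECRET_KEY = b"change-me"

def sign_user(user_id: str) -> str:
    """Issue a token of the form '<user_id>.<hex HMAC-SHA256 signature>'."""
    sig = hmac.new(SECRET_KEY, user_id.encode(), hashlib.sha256).hexdigest()
    return f"{user_id}.{sig}"

def verify_token(token: object) -> bool:
    """Return True only for a well-formed token whose signature matches."""
    if not isinstance(token, str) or "." not in token:
        return False
    user_id, _, sig = token.rpartition(".")
    expected = hmac.new(SECRET_KEY, user_id.encode(), hashlib.sha256).hexdigest()
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(sig.encode(), expected.encode())
```

Because the token embeds the user ID, the endpoint could also check that it matches the user_id in the URL path before registering the session.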

Protocol Design with Typed Messages

Define a clear message protocol so both client and server know what to expect. Every message should include a type field, a payload, and an optional request_id for correlating responses.

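from enum import Enum
from pydantic import BaseModel, ValidationError
from typing import Any, Optional

class ClientMessageType(str, Enum):
    AUTH = "auth"
    QUERY = "query"
    CANCEL = "cancel"
    HEARTBEAT = "ping"

class ServerMessageType(str, Enum):
    CONNECTED = "connected"
    AGENT_TOKEN = "agent_token"
    AGENT_DONE = "agent_done"
    ERROR = "error"
    HEARTBEAT_ACK = "pong"

class ClientMessage(BaseModel):
    type: ClientMessageType
    payload: dict[str, Any] = {}
    request_id: Optional[str] = None

# Hold references to in-flight tasks; the event loop itself keeps only weak ones.
background_tasks: set[asyncio.Task] = set()

async def handle_agent_message(session: AgentSession, raw: dict):
    try:
        msg = ClientMessage(**raw)
    except ValidationError as exc:
        # Reject malformed messages instead of letting the exception end the session.
        await session.send_event(ServerMessageType.ERROR.value, {"detail": str(exc)})
        return

    if msg.type == ClientMessageType.HEARTBEAT:
        session.last_heartbeat = datetime.now(timezone.utc)
        await session.send_event(ServerMessageType.HEARTBEAT_ACK.value, {})
    elif msg.type == ClientMessageType.QUERY:
        # stream_agent_response and cancel_agent_run are assumed to be defined elsewhere.
        task = asyncio.create_task(
            stream_agent_response(session, msg.payload.get("prompt", ""), msg.request_id)
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
    elif msg.type == ClientMessageType.CANCEL:
        cancel_agent_run(session.user_id, msg.request_id)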

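The handler refers to stream_agent_response and cancel_agent_run, which the article does not define. The sketch below shows one way they could fit together, assuming a registry keyed by (user_id, request_id). The running dictionary and the fake_agent_tokens generator are placeholders invented for this example; a real implementation would call a model API instead.

```python
import asyncio

# (user_id, request_id) -> running task, so a "cancel" message can find it.
running: dict = {}

async def fake_agent_tokens(prompt: str):
    """Stand-in for a real model call: yields the prompt back word by word."""
    for word in prompt.split():
        await asyncio.sleep(0)  # yield control, as a real network call would
        yield word

async def stream_agent_response(session, prompt: str, request_id: str):
    key = (session.user_id, request_id)
    running[key] = asyncio.current_task()
    try:
        async for token in fake_agent_tokens(prompt):
            await session.send_event(
                "agent_token", {"request_id": request_id, "token": token}
            )
        await session.send_event("agent_done", {"request_id": request_id})
    except asyncio.CancelledError:
        # Tell the client the run stopped early, then let cancellation propagate.
        await session.send_event(
            "agent_done", {"request_id": request_id, "cancelled": True}
        )
        raise
    finally:
        running.pop(key, None)

def cancel_agent_run(user_id: str, request_id: str) -> bool:
    """Cancel the matching run; return False if nothing is running for that ID."""
    task = running.get((user_id, request_id))
    if task is None:
        return False
    task.cancel()
    return True
```

Tagging every streamed event with the request_id is what lets a client interleave several queries on one connection and still route each token to the right place.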
Heartbeat and Dead Connection Detection

Network failures often happen silently — the TCP connection stays open at the OS level even though packets are no longer being delivered. Heartbeats detect these zombie connections by requiring periodic proof-of-life from the client.

HEARTBEAT_INTERVAL = 30  # seconds
HEARTBEAT_TIMEOUT = 90   # seconds — 3 missed heartbeats

async def heartbeat_monitor():
    while True:
        await asyncio.sleep(HEARTBEAT_INTERVAL)
        now = datetime.now(timezone.utc)
        dead_sessions = []

        for user_id, session in sessions.items():
            elapsed = (now - session.last_heartbeat).total_seconds()
            if elapsed > HEARTBEAT_TIMEOUT:
                dead_sessions.append(user_id)

        for user_id in dead_sessions:
            session = sessions.pop(user_id, None)
            if session:
                try:
                    await session.ws.close(code=4002, reason="Heartbeat timeout")
                except Exception:
                    pass

Start this monitor as a background task when the application boots. The three-interval tolerance prevents false disconnections from brief network hiccups.
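One way to start the monitor at boot and stop it cleanly at shutdown is an async context manager that owns the task. The sketch below uses only the standard library; the heartbeat_monitor defined here is a placeholder for the real loop, and the final comment shows the assumed wiring through FastAPI's lifespan parameter.

```python
import asyncio
from contextlib import asynccontextmanager, suppress

async def heartbeat_monitor():
    """Placeholder for the monitor loop shown above."""
    while True:
        await asyncio.sleep(30)

@asynccontextmanager
async def lifespan(app):
    # Startup: launch the monitor and keep a reference so it is not garbage-collected.
    monitor_task = asyncio.create_task(heartbeat_monitor())
    try:
        yield
    finally:
        # Shutdown: cancel the loop and wait for it to unwind.
        monitor_task.cancel()
        with suppress(asyncio.CancelledError):
            await monitor_task

# Assumed wiring in the application module:
#   app = FastAPI(lifespan=lifespan)
```

Cancelling and awaiting the task on shutdown avoids "task was destroyed but it is pending" warnings when the event loop closes.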

Client-Side Reconnection with Exponential Backoff

The client must handle reconnection automatically. Exponential backoff with jitter prevents a thundering herd when a server restarts and hundreds of clients try to reconnect simultaneously.

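class AgentWebSocket {
  private ws: WebSocket | null = null;
  private reconnectAttempt = 0;
  private maxReconnectDelay = 30000;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;

  constructor(private url: string, private token: string) {}

  connect(): void {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      this.reconnectAttempt = 0;
      this.ws!.send(JSON.stringify({ type: "auth", token: this.token }));
      this.startHeartbeat();
    };

    this.ws.onclose = (event) => {
      this.stopHeartbeat();
      // 1000 is a normal closure; 4001 is the server's authentication failure,
      // which retrying with the same token would not fix.
      if (event.code !== 1000 && event.code !== 4001) {
        this.scheduleReconnect();
      }
    };

    this.ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      this.handleMessage(msg);
    };
  }

  private scheduleReconnect(): void {
    // Double the delay on each attempt, capped at maxReconnectDelay.
    const baseDelay = Math.min(
      1000 * Math.pow(2, this.reconnectAttempt),
      this.maxReconnectDelay
    );
    // Add up to 50% random jitter on top of the base delay.
    const jitter = baseDelay * 0.5 * Math.random();
    const delay = baseDelay + jitter;
    this.reconnectAttempt++;
    setTimeout(() => this.connect(), delay);
  }

  private startHeartbeat(): void {
    // Clear any timer left over from a previous connection before starting a new one.
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: "ping" }));
      }
    }, 25000);
  }

  private stopHeartbeat(): void {
    if (this.heartbeatTimer !== null) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  // Placeholder: dispatch on msg.type ("agent_token", "agent_done", "error", "pong").
  private handleMessage(msg: { type: string; payload?: unknown }): void {}
}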

The jitter adds randomness to the backoff delay, spreading reconnection attempts over time instead of creating a synchronized spike.
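To see what delays this formula produces, the same calculation can be reproduced in Python and evaluated at the two extremes of the random factor. The function name reconnect_delay_ms and the injectable rng parameter are choices made for this sketch so the bounds can be printed deterministically.

```python
import random

def reconnect_delay_ms(attempt: int, max_delay_ms: int = 30_000, rng=random.random) -> float:
    """Mirror of the client's formula: capped exponential base plus up to 50% jitter."""
    base = min(1000 * 2 ** attempt, max_delay_ms)
    return base + base * 0.5 * rng()

for attempt in range(7):
    low = reconnect_delay_ms(attempt, rng=lambda: 0.0)   # no jitter
    high = reconnect_delay_ms(attempt, rng=lambda: 1.0)  # upper bound (exclusive)
    print(f"attempt {attempt}: {low:.0f} ms to {high:.0f} ms")
```

The first attempt waits between 1000 and 1500 ms. From the sixth attempt onward the base is capped at 30000 ms, but because the jitter is added after the cap, the actual wait can reach 45000 ms. If a hard ceiling matters, apply the cap after adding jitter instead.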

FAQ

Why not use HTTP long-polling instead of WebSockets for AI agent communication?

Long-polling requires the client to issue a new HTTP request for every batch of messages, each carrying full headers; when the underlying connection is not reused, it also pays for a new TCP and TLS handshake. For AI agents that exchange dozens of messages per minute (streaming tokens, tool calls, status updates), that overhead adds up. WebSockets maintain a single connection with minimal per-message framing: a 2-byte header for server-to-client payloads of up to 125 bytes, plus a 4-byte masking key on client-to-server frames. That makes them far more efficient for bidirectional, high-frequency communication.
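The per-message framing cost follows directly from the frame layout in RFC 6455, and a small helper makes the numbers easy to check. The function name frame_header_size is chosen for this sketch.

```python
def frame_header_size(payload_len: int, masked: bool) -> int:
    """Header bytes for one unfragmented WebSocket frame, per RFC 6455."""
    size = 2  # FIN/opcode byte plus mask bit and 7-bit length
    if payload_len > 65535:
        size += 8  # 64-bit extended payload length
    elif payload_len > 125:
        size += 2  # 16-bit extended payload length
    if masked:
        size += 4  # masking key, required on client-to-server frames
    return size

print(frame_header_size(20, masked=False))    # server -> client token: 2
print(frame_header_size(20, masked=True))     # client -> server ping: 6
print(frame_header_size(4096, masked=False))  # larger tool output: 4
```

Compared with the request and response headers that a typical HTTP exchange carries, a few bytes per streamed token is negligible.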

How do you handle WebSocket connections across multiple server instances behind a load balancer?

You need sticky sessions or a shared session registry. An established WebSocket stays on the server that accepted it, so affinity mainly matters on reconnect: configure your load balancer to use IP hash or cookie-based affinity so a reconnecting client reaches the server that still holds its state. Alternatively, use a shared pub/sub layer like Redis. When a message arrives for a user, publish it to that user's channel; the server that holds the user's WebSocket is subscribed to the channel and forwards the message. This decouples message routing from connection ownership.
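The routing idea can be sketched without a real Redis instance. In the example below, InMemoryBroker is a stand-in for the shared pub/sub service and ServerInstance for one application server; both classes, and the "user:<id>" channel naming, are assumptions made for illustration. With Redis, subscribe and publish would be network calls instead.

```python
import asyncio
from collections import defaultdict

class InMemoryBroker:
    """Stand-in for a shared pub/sub service such as Redis."""
    def __init__(self):
        self.subscribers = defaultdict(list)  # channel -> list of async callbacks

    def subscribe(self, channel, callback):
        self.subscribers[channel].append(callback)

    async def publish(self, channel, message):
        for callback in self.subscribers[channel]:
            await callback(message)

class ServerInstance:
    """One application server holding some users' WebSocket connections."""
    def __init__(self, name, broker):
        self.name = name
        self.broker = broker
        self.delivered = []  # stands in for ws.send_json on a real socket

    def attach_user(self, user_id):
        # On connect, subscribe this server to the user's channel.
        async def forward(message):
            self.delivered.append((user_id, message))
        self.broker.subscribe(f"user:{user_id}", forward)

async def demo():
    broker = InMemoryBroker()
    server_a = ServerInstance("a", broker)
    server_b = ServerInstance("b", broker)
    server_a.attach_user("alice")  # alice's socket lives on server A
    # Any server or worker can publish without knowing where alice is connected.
    await broker.publish("user:alice", {"type": "agent_done"})
    return server_a.delivered, server_b.delivered

print(asyncio.run(demo()))
```

Only server A delivers the message, because only it subscribed to alice's channel. The publisher never needs to know which instance owns the connection.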

What happens to in-flight AI agent responses when a WebSocket disconnects unexpectedly?

Design your protocol with request IDs and server-side response buffering. When the agent finishes generating a response, store it temporarily (in Redis or memory with a TTL). When the client reconnects, it sends its last seen request ID, and the server replays any missed messages. As long as the client reconnects before the TTL expires, no agent output is lost during brief network interruptions.
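A minimal in-memory version of such a buffer might look like the following. The ResponseBuffer class, its method names, and the 60-second default TTL are assumptions made for this sketch; a multi-server deployment would keep the same data in Redis instead.

```python
import time
from collections import OrderedDict
from typing import Optional

class ResponseBuffer:
    """Keeps finished responses in arrival order until their TTL expires."""
    def __init__(self, ttl_seconds: float = 60.0):
        self.ttl = ttl_seconds
        self.items = OrderedDict()  # request_id -> (expires_at, response)

    def store(self, request_id: str, response: dict, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self.items[request_id] = (now + self.ttl, response)

    def replay_after(self, last_seen_id: Optional[str], now: Optional[float] = None) -> list:
        """Return responses stored after last_seen_id, skipping expired ones."""
        now = time.monotonic() if now is None else now
        for rid in [r for r, (expires, _) in self.items.items() if expires <= now]:
            del self.items[rid]
        ids = list(self.items)
        # If the ID is unknown (or already expired), replay everything still buffered.
        start = ids.index(last_seen_id) + 1 if last_seen_id in self.items else 0
        return [self.items[rid][1] for rid in ids[start:]]

buf = ResponseBuffer(ttl_seconds=10)
buf.store("r1", {"text": "first"}, now=0)
buf.store("r2", {"text": "second"}, now=1)
print(buf.replay_after("r1", now=5))  # [{'text': 'second'}]
```

On reconnect, the server would call replay_after with the ID from the client's first message and send each returned response before resuming normal processing.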


CallSphere Team
