WebSocket Architecture for AI Applications: Persistent Connections for Real-Time Agents
Learn how to design WebSocket-based architectures for AI agents, covering connection lifecycle management, protocol framing, heartbeat mechanisms, and automatic reconnection strategies for production reliability.
Why WebSockets Matter for AI Agents
HTTP request-response cycles work well for one-shot AI queries, but real-time AI agents need persistent, bidirectional communication. When an agent streams partial results, receives live tool outputs, or coordinates with other agents, opening a new TCP connection for every message adds unacceptable overhead. WebSockets solve this by upgrading an initial HTTP connection into a long-lived, full-duplex channel.
The WebSocket protocol (RFC 6455) begins with an HTTP upgrade handshake. Once the server responds with a 101 status code, both sides can send frames at any time without re-establishing a connection. This eliminates the repeated TLS handshake cost and HTTP header overhead that would otherwise dominate latency-sensitive AI interactions.
Connection Lifecycle Design
A robust WebSocket architecture for AI agents follows four phases: handshake, authentication, active session, and graceful shutdown.
import asyncio
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from datetime import datetime, timezone
app = FastAPI()
class AgentSession:
def __init__(self, ws: WebSocket, user_id: str):
self.ws = ws
self.user_id = user_id
self.connected_at = datetime.now(timezone.utc)
self.last_heartbeat = self.connected_at
async def send_event(self, event_type: str, payload: dict):
message = {
"type": event_type,
"payload": payload,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
await self.ws.send_json(message)
sessions: dict[str, AgentSession] = {}
@app.websocket("/ws/agent/{user_id}")
async def agent_endpoint(ws: WebSocket, user_id: str):
await ws.accept()
# Authentication phase
auth_msg = await asyncio.wait_for(ws.receive_json(), timeout=10.0)
if auth_msg.get("type") != "auth" or not verify_token(auth_msg.get("token")):
await ws.close(code=4001, reason="Authentication failed")
return
session = AgentSession(ws, user_id)
sessions[user_id] = session
await session.send_event("connected", {"session_id": user_id})
try:
while True:
data = await ws.receive_json()
await handle_agent_message(session, data)
except WebSocketDisconnect:
pass
finally:
sessions.pop(user_id, None)
The handshake phase accepts the raw connection. Authentication happens immediately after — the client must send a token within 10 seconds or get disconnected. The active session loop processes messages until the client disconnects, and the finally block guarantees cleanup.
Protocol Design with Typed Messages
Define a clear message protocol so both client and server know what to expect. Every message should include a type field, a payload, and an optional request_id for correlating responses.
from enum import Enum
from pydantic import BaseModel
from typing import Any, Optional
class ClientMessageType(str, Enum):
AUTH = "auth"
QUERY = "query"
CANCEL = "cancel"
HEARTBEAT = "ping"
class ServerMessageType(str, Enum):
CONNECTED = "connected"
AGENT_TOKEN = "agent_token"
AGENT_DONE = "agent_done"
ERROR = "error"
HEARTBEAT_ACK = "pong"
class ClientMessage(BaseModel):
type: ClientMessageType
payload: dict[str, Any] = {}
request_id: Optional[str] = None
async def handle_agent_message(session: AgentSession, raw: dict):
msg = ClientMessage(**raw)
if msg.type == ClientMessageType.HEARTBEAT:
session.last_heartbeat = datetime.now(timezone.utc)
await session.send_event("pong", {})
return
if msg.type == ClientMessageType.QUERY:
asyncio.create_task(
stream_agent_response(session, msg.payload["prompt"], msg.request_id)
)
if msg.type == ClientMessageType.CANCEL:
cancel_agent_run(session.user_id, msg.request_id)
Heartbeat and Dead Connection Detection
Network failures often happen silently — the TCP connection stays open at the OS level even though packets are no longer being delivered. Heartbeats detect these zombie connections by requiring periodic proof-of-life from the client.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
HEARTBEAT_INTERVAL = 30 # seconds
HEARTBEAT_TIMEOUT = 90 # seconds — 3 missed heartbeats
async def heartbeat_monitor():
while True:
await asyncio.sleep(HEARTBEAT_INTERVAL)
now = datetime.now(timezone.utc)
dead_sessions = []
for user_id, session in sessions.items():
elapsed = (now - session.last_heartbeat).total_seconds()
if elapsed > HEARTBEAT_TIMEOUT:
dead_sessions.append(user_id)
for user_id in dead_sessions:
session = sessions.pop(user_id, None)
if session:
try:
await session.ws.close(code=4002, reason="Heartbeat timeout")
except Exception:
pass
Start this monitor as a background task when the application boots. The three-interval tolerance prevents false disconnections from brief network hiccups.
Client-Side Reconnection with Exponential Backoff
The client must handle reconnection automatically. Exponential backoff with jitter prevents a thundering herd when a server restarts and hundreds of clients try to reconnect simultaneously.
class AgentWebSocket {
private ws: WebSocket | null = null;
private reconnectAttempt = 0;
private maxReconnectDelay = 30000;
constructor(private url: string, private token: string) {}
connect(): void {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
this.reconnectAttempt = 0;
this.ws!.send(JSON.stringify({ type: "auth", token: this.token }));
this.startHeartbeat();
};
this.ws.onclose = (event) => {
if (event.code !== 1000) {
this.scheduleReconnect();
}
};
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
this.handleMessage(msg);
};
}
private scheduleReconnect(): void {
const baseDelay = Math.min(
1000 * Math.pow(2, this.reconnectAttempt),
this.maxReconnectDelay
);
const jitter = baseDelay * 0.5 * Math.random();
const delay = baseDelay + jitter;
this.reconnectAttempt++;
setTimeout(() => this.connect(), delay);
}
private startHeartbeat(): void {
setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: "ping" }));
}
}, 25000);
}
}
The jitter adds randomness to the backoff delay, spreading reconnection attempts over time instead of creating a synchronized spike.
FAQ
Why not use HTTP long-polling instead of WebSockets for AI agent communication?
Long-polling requires the client to repeatedly open new HTTP connections, each carrying full headers and going through TLS negotiation. For AI agents that exchange dozens of messages per minute — streaming tokens, tool calls, status updates — the overhead is substantial. WebSockets maintain a single connection with minimal per-message framing (as low as 2 bytes for small messages), making them far more efficient for bidirectional, high-frequency communication.
How do you handle WebSocket connections across multiple server instances behind a load balancer?
You need sticky sessions or a shared session registry. Configure your load balancer to use IP hash or cookie-based affinity so a reconnecting client hits the same server. Alternatively, use a shared pub/sub layer like Redis — when a message arrives for a user, publish it to a channel, and whichever server holds that user's WebSocket will receive and forward it. This decouples message routing from connection ownership.
What happens to in-flight AI agent responses when a WebSocket disconnects unexpectedly?
Design your protocol with request IDs and server-side response buffering. When the agent finishes generating a response, store it temporarily (in Redis or memory with a TTL). When the client reconnects, it sends its last seen request ID, and the server replays any missed messages. This ensures no agent output is lost even during brief network interruptions.
#WebSocket #RealTimeAI #AgenticAI #Python #Streaming #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.