OpenAI Agents SDK with FastAPI: Production Web Server Integration Patterns
Learn how to mount OpenAI Agents SDK agents inside a FastAPI web server with session management, concurrent user handling, streaming responses, and production-ready error handling.
Why FastAPI and Agents SDK Work Well Together
FastAPI is async-native. The OpenAI Agents SDK is async-native. This alignment means you can run agent loops inside request handlers without blocking other users. No thread pools, no workarounds — just native async/await throughout the stack.
This guide shows you how to build a production web API that exposes agent capabilities to multiple concurrent users with proper session isolation.
Basic Integration: Agent as an Endpoint
The simplest pattern wraps a Runner.run call inside a FastAPI route.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from agents import Agent, Runner

app = FastAPI(title="Agent API")

support_agent = Agent(
    name="support",
    instructions="You are a customer support agent for a SaaS product.",
)

class ChatRequest(BaseModel):
    message: str
    user_id: str

class ChatResponse(BaseModel):
    reply: str
    agent_name: str

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        result = await Runner.run(
            support_agent,
            input=request.message,
        )
        return ChatResponse(
            reply=result.final_output,
            agent_name=result.last_agent.name,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
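FastAPI validates the request body against ChatRequest before the handler ever runs, so malformed payloads never reach the agent. A minimal, runnable sketch of that validation layer (assuming Pydantic v2's `model_validate`; no server needed):

```python
from pydantic import BaseModel, ValidationError

class ChatRequest(BaseModel):
    message: str
    user_id: str

# A complete payload parses cleanly.
req = ChatRequest.model_validate({"message": "hi", "user_id": "u1"})

# A payload missing a required field raises before any agent code runs;
# FastAPI turns this into a 422 response automatically.
try:
    ChatRequest.model_validate({"message": "hi"})
    rejected = False
except ValidationError:
    rejected = True

print(req.user_id, rejected)
```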
Session Management: Multi-Turn Conversations
Real conversations span multiple requests. You need to persist the conversation state between calls. Here is a session manager that stores history per user.
from datetime import datetime, timedelta, timezone
from typing import Any
import uuid

class SessionManager:
    def __init__(self, ttl_minutes: int = 60):
        self._sessions: dict[str, dict[str, Any]] = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def get_or_create(self, session_id: str) -> dict[str, Any]:
        if session_id not in self._sessions:
            self._sessions[session_id] = {
                "id": session_id,
                "history": [],
                "created_at": datetime.now(timezone.utc),
                "last_active": datetime.now(timezone.utc),
            }
        session = self._sessions[session_id]
        session["last_active"] = datetime.now(timezone.utc)
        return session

    def cleanup_expired(self):
        now = datetime.now(timezone.utc)
        expired = [
            sid for sid, s in self._sessions.items()
            if now - s["last_active"] > self.ttl
        ]
        for sid in expired:
            del self._sessions[sid]

sessions = SessionManager(ttl_minutes=30)
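Note that cleanup_expired only runs when something calls it. One option (a sketch, not part of the SDK or FastAPI) is a background task started at app startup that invokes it on a fixed interval; here a stand-in callable demonstrates the loop without the full session manager:

```python
import asyncio

async def periodic_cleanup(cleanup, interval_seconds: float) -> None:
    # Invoke the session manager's cleanup on a fixed schedule.
    # In the app you would pass sessions.cleanup_expired here.
    while True:
        await asyncio.sleep(interval_seconds)
        cleanup()

# Demo: a list of timestamps stands in for real cleanup work.
calls = []

async def main():
    task = asyncio.create_task(periodic_cleanup(lambda: calls.append(1), 0.02))
    await asyncio.sleep(0.1)  # let the loop fire a few times
    task.cancel()

asyncio.run(main())
print(len(calls))  # cleanup ran multiple times
```

In the real app you would create this task inside the lifespan handler with `asyncio.create_task(periodic_cleanup(sessions.cleanup_expired, 60))` and cancel it on shutdown.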
Multi-Turn Endpoint with History
Now wire the session manager into your endpoint so each request carries forward the conversation.
from agents.items import TResponseInputItem

class MultiTurnRequest(BaseModel):
    message: str
    session_id: str | None = None

class MultiTurnResponse(BaseModel):
    reply: str
    session_id: str
    turn_count: int

@app.post("/chat/session", response_model=MultiTurnResponse)
async def chat_session(request: MultiTurnRequest):
    session_id = request.session_id or str(uuid.uuid4())
    session = sessions.get_or_create(session_id)

    # Build input from history plus new message
    input_items: list[TResponseInputItem] = list(session["history"])
    input_items.append({"role": "user", "content": request.message})

    result = await Runner.run(support_agent, input=input_items)

    # Persist the new turn in session history
    session["history"] = result.to_input_list()

    return MultiTurnResponse(
        reply=result.final_output,
        session_id=session_id,
        turn_count=len([
            item for item in session["history"]
            if isinstance(item, dict) and item.get("role") == "user"
        ]),
    )
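The turn_count expression above simply counts user-role items in the stored history. A standalone illustration of that logic, using a hypothetical two-turn history in the role/content input format the SDK accepts:

```python
# Hypothetical conversation history in role/content form
history = [
    {"role": "user", "content": "My invoice is wrong"},
    {"role": "assistant", "content": "Sorry about that. Which invoice?"},
]

# Append the new user message, then count user turns
new_turn = history + [{"role": "user", "content": "Invoice #1042"}]
turn_count = sum(1 for item in new_turn if item.get("role") == "user")
print(turn_count)  # 2
```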
Streaming Responses with Server-Sent Events
For long agent responses, streaming gives users immediate feedback.
from fastapi.responses import StreamingResponse
from openai.types.responses import ResponseTextDeltaEvent

from agents import Runner

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def event_generator():
        result = Runner.run_streamed(support_agent, input=request.message)
        async for event in result.stream_events():
            # Forward only text deltas; skip tool calls and lifecycle events
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                yield f"data: {event.data.delta}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
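Clients often find JSON frames easier to parse than raw text deltas. A small helper sketch for the SSE wire format itself (each message is a `data:` line followed by a blank line); the payload shape here is an assumption, not an SDK convention:

```python
import json

def sse_frame(payload: dict) -> str:
    # One Server-Sent Events message: a "data:" line plus a blank line.
    # JSON-encoding the payload keeps newlines inside the delta from
    # being misread as frame boundaries.
    return f"data: {json.dumps(payload)}\n\n"

frame = sse_frame({"delta": "Hello"})
print(repr(frame))
```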
Handling Concurrent Users
FastAPI handles concurrency naturally with async, but you need to ensure agent state is isolated per request. Never share mutable agent state across requests.
import asyncio

# Rate limiting per user
user_semaphores: dict[str, asyncio.Semaphore] = {}

def get_user_semaphore(user_id: str, max_concurrent: int = 3) -> asyncio.Semaphore:
    if user_id not in user_semaphores:
        user_semaphores[user_id] = asyncio.Semaphore(max_concurrent)
    return user_semaphores[user_id]

@app.post("/chat/limited")
async def chat_with_limit(request: ChatRequest):
    semaphore = get_user_semaphore(request.user_id)
    if semaphore.locked():
        # All slots taken: reject immediately instead of queueing
        raise HTTPException(
            status_code=429,
            detail="Too many concurrent requests. Please wait.",
        )
    async with semaphore:
        result = await Runner.run(support_agent, input=request.message)
        return {"reply": result.final_output}
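The `async with semaphore` block is what actually caps concurrency. A self-contained demo (stand-in jobs in place of agent runs) confirming that no more than the semaphore's limit ever overlap:

```python
import asyncio

async def main() -> int:
    sem = asyncio.Semaphore(2)  # allow at most 2 concurrent jobs
    peak = 0
    active = 0

    async def job():
        nonlocal peak, active
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for an agent run
            active -= 1

    # Launch 6 jobs at once; the semaphore serializes them in pairs.
    await asyncio.gather(*(job() for _ in range(6)))
    return peak

peak = asyncio.run(main())
print(peak)  # 2
```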
Startup and Shutdown Lifecycle
Use FastAPI's lifespan events to manage resources.
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: validate agent configuration
    print("Agent API starting, validating agents...")
    test_result = await Runner.run(support_agent, input="ping")
    print(f"Agent validated: {test_result.last_agent.name}")
    yield
    # Shutdown: cleanup
    sessions.cleanup_expired()
    print("Agent API shutdown complete")

app = FastAPI(title="Agent API", lifespan=lifespan)
FAQ
How do I handle agent timeouts in a web server context?
Wrap your Runner.run call with asyncio.wait_for(Runner.run(...), timeout=30.0). This raises asyncio.TimeoutError after 30 seconds, which you catch and return as a 504 Gateway Timeout. Set the timeout based on your load balancer and client expectations.
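A runnable illustration of that pattern, with a stand-in coroutine in place of the actual Runner.run call and a short timeout so the demo finishes quickly:

```python
import asyncio

async def slow_agent_call():
    # Stand-in for Runner.run(...) that takes too long to respond
    await asyncio.sleep(10)

async def handle_request() -> str:
    try:
        await asyncio.wait_for(slow_agent_call(), timeout=0.05)
        return "200"
    except asyncio.TimeoutError:
        # In FastAPI: raise HTTPException(status_code=504, ...)
        return "504"

status = asyncio.run(handle_request())
print(status)  # 504
```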
Should I create a new Agent instance per request?
No. Agent instances are lightweight configuration objects — they hold instructions, tool definitions, and handoff lists. They do not store conversation state. Create agents once at module level and reuse them across requests. The Runner manages per-request state internally.
How do I scale this beyond a single server?
Move session storage from in-memory dictionaries to Redis. Use Redis as your session backend so any server instance can resume any conversation. Deploy multiple FastAPI instances behind a load balancer. The agents are stateless, so horizontal scaling is straightforward.
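A sketch of what a Redis-backed session store could look like, assuming a redis-py style client with `get`/`setex` (names and key scheme here are illustrative, not from the SDK). The demo swaps in a dict-backed stub so it runs without a Redis server:

```python
import json

class RedisSessionStore:
    # Sketch: any object exposing get(key) and setex(key, ttl, value)
    # works, so a redis.Redis client can be dropped in directly.
    def __init__(self, client, ttl_seconds: int = 1800):
        self.client = client
        self.ttl = ttl_seconds

    def load(self, session_id: str) -> list:
        raw = self.client.get(f"session:{session_id}")
        return json.loads(raw) if raw else []

    def save(self, session_id: str, history: list) -> None:
        # setex stores the value with a TTL, so idle sessions expire
        # server-side with no cleanup task needed.
        self.client.setex(f"session:{session_id}", self.ttl, json.dumps(history))

class StubClient:
    # In-memory stand-in for redis.Redis, for the demo only
    def __init__(self):
        self.data = {}
    def get(self, key):
        return self.data.get(key)
    def setex(self, key, ttl, value):
        self.data[key] = value

store = RedisSessionStore(StubClient())
store.save("abc", [{"role": "user", "content": "hi"}])
print(store.load("abc"))
```

Because the agent objects themselves hold no conversation state, this store is the only piece that needs to move off-box before adding more FastAPI instances.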
CallSphere Team
Expert insights on AI voice agents and customer communication automation.