Agentic AI with FastAPI: Production Backend Architecture Patterns
Production patterns for agentic AI backends with FastAPI — WebSocket streaming, background agent tasks, dependency injection, and Pydantic models for agents.
Why FastAPI for Agentic AI Backends
FastAPI has become the default backend framework for agentic AI in 2026 for good reasons. Its native async support handles the I/O-bound nature of LLM API calls efficiently. Pydantic integration provides automatic request validation and documentation. WebSocket support enables streaming agent responses. And its dependency injection system maps cleanly to agent state management patterns.
This guide covers production-grade patterns for building agentic AI backends with FastAPI — from the API layer down to agent execution, with real code you can adapt for your projects.
Project Structure
A well-organized FastAPI project for agentic AI separates concerns into distinct layers:
agent-backend/
├── app/
│   ├── __init__.py
│   ├── main.py                  # FastAPI app and middleware
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── chat.py          # Chat endpoints
│   │   │   ├── conversations.py # Conversation management
│   │   │   └── health.py        # Health checks
│   │   └── dependencies.py      # Shared dependencies
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── base.py              # Agent base class
│   │   ├── registry.py          # Agent registry
│   │   └── implementations/
│   │       ├── support.py
│   │       └── sales.py
│   ├── tools/
│   │   ├── __init__.py
│   │   └── implementations/
│   ├── models/
│   │   ├── __init__.py
│   │   ├── requests.py          # Request schemas
│   │   ├── responses.py         # Response schemas
│   │   └── database.py          # DB models
│   ├── core/
│   │   ├── config.py            # Settings
│   │   ├── database.py          # DB connection
│   │   ├── redis.py             # Redis client
│   │   └── llm.py               # LLM client factory
│   └── services/
│       ├── conversation.py      # Conversation service
│       └── agent_runner.py      # Agent execution service
├── tests/
├── alembic/
├── docker-compose.yml
├── Dockerfile
└── pyproject.toml
Pydantic Models for Agent I/O
Define strict request and response models. FastAPI uses these for automatic validation, documentation, and serialization:
# app/models/requests.py
import uuid

from pydantic import BaseModel, Field, field_validator


class ChatRequest(BaseModel):
    message: str = Field(
        ...,
        min_length=1,
        max_length=10000,
        description="User message to send to the agent",
    )
    conversation_id: str | None = Field(
        None,
        description="Existing conversation ID to continue",
    )

    @field_validator("conversation_id")
    @classmethod
    def validate_conversation_id(cls, v: str | None) -> str | None:
        if v is not None:
            try:
                uuid.UUID(v)
            except ValueError:
                raise ValueError("Invalid conversation ID format")
        return v


class AgentConfigRequest(BaseModel):
    agent_name: str = Field(
        ..., description="Name of the agent to use"
    )
    temperature: float = Field(
        0.3, ge=0.0, le=1.0,
        description="LLM temperature for this session",
    )
    max_iterations: int = Field(
        10, ge=1, le=50,
        description="Maximum agent loop iterations",
    )

# app/models/responses.py
from datetime import datetime

from pydantic import BaseModel, Field


class ToolCallInfo(BaseModel):
    tool_name: str
    arguments: dict
    result_summary: str
    duration_ms: int


class ChatResponse(BaseModel):
    response: str = Field(
        ..., description="Agent response text"
    )
    conversation_id: str = Field(
        ..., description="Conversation ID for follow-ups"
    )
    agent_name: str = Field(
        ..., description="Name of the agent that responded"
    )
    tools_used: list[ToolCallInfo] = Field(
        default_factory=list,
        description="Tools called during this turn",
    )
    total_tokens: int = Field(
        0, description="Total tokens used"
    )
    latency_ms: int = Field(
        0, description="Total response time in milliseconds"
    )


class ConversationSummary(BaseModel):
    id: str
    agent_name: str
    message_count: int
    created_at: datetime
    last_message_at: datetime
    status: str = "active"


class ConversationListResponse(BaseModel):
    conversations: list[ConversationSummary]
    total: int
    page: int
    page_size: int
WebSocket Endpoints for Streaming
Streaming is critical for agentic AI — users should see the agent's response as it is generated, not wait 5-10 seconds for the full response. FastAPI's WebSocket support handles this cleanly:
# app/api/routes/chat.py
import time
import uuid

from fastapi import (
    APIRouter, WebSocket, WebSocketDisconnect, Depends,
)

from app.models.requests import ChatRequest
from app.models.responses import ChatResponse, ToolCallInfo
from app.services.agent_runner import AgentRunner
from app.api.dependencies import (
    get_agent_runner,
    get_conversation_service,
)

router = APIRouter(prefix="/chat", tags=["chat"])


@router.post("/", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    runner: AgentRunner = Depends(get_agent_runner),
    conv_service = Depends(get_conversation_service),
):
    """Send a message and receive a complete response."""
    start = time.monotonic()
    conversation_id = request.conversation_id or str(uuid.uuid4())

    # Load or create conversation
    history = await conv_service.get_history(conversation_id)

    result = await runner.run(
        message=request.message,
        history=history,
    )

    # Save updated history
    await conv_service.save_history(
        conversation_id, result.messages
    )

    latency_ms = int((time.monotonic() - start) * 1000)

    return ChatResponse(
        response=result.final_text,
        conversation_id=conversation_id,
        agent_name=result.agent_name,
        tools_used=[
            ToolCallInfo(
                tool_name=t["name"],
                arguments=t["args"],
                result_summary=t["result"][:200],
                duration_ms=t["duration_ms"],
            )
            for t in result.tool_calls
        ],
        total_tokens=result.total_tokens,
        latency_ms=latency_ms,
    )


@router.websocket("/ws/{conversation_id}")
async def chat_websocket(
    websocket: WebSocket,
    conversation_id: str,
    runner: AgentRunner = Depends(get_agent_runner),
    conv_service = Depends(get_conversation_service),
):
    """WebSocket endpoint for streaming agent responses."""
    await websocket.accept()
    try:
        while True:
            # Receive user message
            data = await websocket.receive_json()
            user_message = data.get("message", "")

            if not user_message:
                await websocket.send_json({
                    "type": "error",
                    "message": "Empty message",
                })
                continue

            history = await conv_service.get_history(
                conversation_id
            )

            # Stream the agent's response
            async for event in runner.stream(
                message=user_message,
                history=history,
            ):
                if event["type"] == "text_delta":
                    await websocket.send_json({
                        "type": "text",
                        "content": event["text"],
                    })
                elif event["type"] == "tool_start":
                    await websocket.send_json({
                        "type": "tool_start",
                        "tool_name": event["tool_name"],
                    })
                elif event["type"] == "tool_end":
                    await websocket.send_json({
                        "type": "tool_end",
                        "tool_name": event["tool_name"],
                        "duration_ms": event["duration_ms"],
                    })
                elif event["type"] == "done":
                    await websocket.send_json({
                        "type": "done",
                        "tokens_used": event["tokens"],
                    })
                    # Save conversation after the response completes
                    await conv_service.save_history(
                        conversation_id, event.get("messages", [])
                    )
    except WebSocketDisconnect:
        pass  # Client disconnected; nothing left to clean up here
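On the client side, consuming this protocol reduces to folding the streamed JSON events into a completed turn. A minimal sketch of that reducer, with event names matching the server code above (the transport itself, whether a browser WebSocket or the `websockets` library, is left out):

```python
# Fold the streamed events from the WebSocket into a finished turn.
# Event shapes mirror what chat_websocket sends: "text", "tool_start",
# "tool_end", and a final "done" carrying the token count.
def reduce_events(events: list[dict]) -> dict:
    """Accumulate streamed events into the completed turn."""
    text_parts: list[str] = []
    tools: list[str] = []
    tokens = 0
    for event in events:
        if event["type"] == "text":
            text_parts.append(event["content"])
        elif event["type"] == "tool_end":
            tools.append(event["tool_name"])
        elif event["type"] == "done":
            tokens = event["tokens_used"]
    return {"text": "".join(text_parts), "tools": tools, "tokens": tokens}

# Example event sequence for one turn that called a "search" tool
turn = reduce_events([
    {"type": "tool_start", "tool_name": "search"},
    {"type": "tool_end", "tool_name": "search", "duration_ms": 120},
    {"type": "text", "content": "Found "},
    {"type": "text", "content": "3 results."},
    {"type": "done", "tokens_used": 512},
])
```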
Background Tasks for Agent Execution
Some agent tasks take too long for synchronous HTTP responses — report generation, data analysis, multi-step workflows. Use FastAPI's background tasks or a task queue:
# app/api/routes/tasks.py
import uuid

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, Field

from app.api.dependencies import get_agent_runner
from app.services.agent_runner import AgentRunner

router = APIRouter(prefix="/tasks", tags=["tasks"])


class TaskRequest(BaseModel):
    agent_name: str
    instruction: str = Field(
        ..., max_length=50000
    )
    webhook_url: str | None = None


class TaskResponse(BaseModel):
    task_id: str
    status: str = "queued"


class TaskStatus(BaseModel):
    task_id: str
    status: str  # queued, running, completed, failed
    result: str | None = None
    error: str | None = None
    progress: float = 0.0


# In-memory task store (use Redis in production)
tasks: dict[str, TaskStatus] = {}


async def execute_agent_task(
    task_id: str,
    agent_name: str,
    instruction: str,
    runner: AgentRunner,
):
    """Background task that runs an agent to completion."""
    tasks[task_id] = TaskStatus(
        task_id=task_id, status="running"
    )
    try:
        result = await runner.run(
            message=instruction,
            history=[],
        )
        tasks[task_id] = TaskStatus(
            task_id=task_id,
            status="completed",
            result=result.final_text,
            progress=1.0,
        )
    except Exception as e:
        tasks[task_id] = TaskStatus(
            task_id=task_id,
            status="failed",
            error=str(e),
        )


@router.post("/", response_model=TaskResponse)
async def create_task(
    request: TaskRequest,
    background_tasks: BackgroundTasks,
    runner: AgentRunner = Depends(get_agent_runner),
):
    """Queue an agent task for background execution."""
    task_id = str(uuid.uuid4())
    background_tasks.add_task(
        execute_agent_task,
        task_id=task_id,
        agent_name=request.agent_name,
        instruction=request.instruction,
        runner=runner,
    )
    return TaskResponse(task_id=task_id)


@router.get("/{task_id}", response_model=TaskStatus)
async def get_task_status(task_id: str):
    """Check the status of a background agent task."""
    status = tasks.get(task_id)
    if not status:
        raise HTTPException(404, "Task not found")
    return status
For production, replace BackgroundTasks with a proper task queue like Celery, ARQ, or Dramatiq. BackgroundTasks runs in the same process and is lost if the server restarts.
Dependency Injection for Agent State
FastAPI's dependency injection system is excellent for managing agent-related resources — database connections, LLM clients, and agent configurations:
# app/api/dependencies.py
from functools import lru_cache

from anthropic import AsyncAnthropic
from fastapi import Depends

from app.core.config import Settings
from app.core.database import Database
from app.core.redis import RedisClient
from app.services.agent_runner import AgentRunner
from app.services.conversation import ConversationService


@lru_cache()
def get_settings() -> Settings:
    return Settings()


_llm_client: AsyncAnthropic | None = None

async def get_llm_client() -> AsyncAnthropic:
    global _llm_client
    if _llm_client is None:
        settings = get_settings()
        _llm_client = AsyncAnthropic(
            api_key=settings.anthropic_api_key
        )
    return _llm_client


_db: Database | None = None

async def get_database() -> Database:
    global _db
    if _db is None:
        settings = get_settings()
        _db = Database(settings.database_url)
        await _db.connect()
    return _db


_redis: RedisClient | None = None

async def get_redis() -> RedisClient:
    global _redis
    if _redis is None:
        settings = get_settings()
        _redis = RedisClient(settings.redis_url)
        await _redis.connect()
    return _redis


async def get_agent_runner(
    llm_client: AsyncAnthropic = Depends(get_llm_client),
    settings: Settings = Depends(get_settings),
) -> AgentRunner:
    return AgentRunner(
        client=llm_client,
        model=settings.default_model,
        max_iterations=settings.max_iterations,
    )


async def get_conversation_service(
    db: Database = Depends(get_database),
    redis: RedisClient = Depends(get_redis),
) -> ConversationService:
    return ConversationService(db=db, cache=redis)
This pattern ensures:
- A single LLM client is shared across all requests (connection pooling)
- Database and Redis connections are managed centrally
- Each endpoint declares exactly what dependencies it needs
- Testing is easy — override dependencies with mocks
The Agent Runner Service
The agent runner encapsulates the agent loop, making it reusable across endpoints:
# app/services/agent_runner.py
import json
import time
from dataclasses import dataclass, field
from typing import AsyncIterator

from anthropic import AsyncAnthropic


@dataclass
class AgentResult:
    final_text: str
    agent_name: str
    messages: list[dict]
    tool_calls: list[dict] = field(default_factory=list)
    total_tokens: int = 0


class AgentRunner:
    def __init__(
        self,
        client: AsyncAnthropic,
        model: str = "claude-sonnet-4-20250514",
        max_iterations: int = 10,
    ):
        self.client = client
        self.model = model
        self.max_iterations = max_iterations
        self.tools_registry = {}

    def register_tools(self, tools: list):
        for tool in tools:
            self.tools_registry[tool["name"]] = tool

    async def run(
        self,
        message: str,
        history: list[dict],
        system_prompt: str = "",
    ) -> AgentResult:
        messages = history + [
            {"role": "user", "content": message}
        ]
        tool_calls = []
        total_tokens = 0

        for _ in range(self.max_iterations):
            response = await self.client.messages.create(
                model=self.model,
                max_tokens=4096,
                system=system_prompt,
                tools=list(self.tools_registry.values()),
                messages=messages,
            )
            total_tokens += (
                response.usage.input_tokens
                + response.usage.output_tokens
            )

            if response.stop_reason == "tool_use":
                messages.append({
                    "role": "assistant",
                    "content": response.content,
                })

                results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue
                    start = time.monotonic()
                    result = await self._execute_tool(
                        block.name, block.input
                    )
                    duration = int(
                        (time.monotonic() - start) * 1000
                    )
                    tool_calls.append({
                        "name": block.name,
                        "args": block.input,
                        "result": result,
                        "duration_ms": duration,
                    })
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })

                messages.append({
                    "role": "user",
                    "content": results,
                })
            else:
                text = ""
                for block in response.content:
                    if hasattr(block, "text"):
                        text += block.text
                messages.append({
                    "role": "assistant",
                    "content": text,
                })
                return AgentResult(
                    final_text=text,
                    agent_name="default",
                    messages=messages,
                    tool_calls=tool_calls,
                    total_tokens=total_tokens,
                )

        return AgentResult(
            final_text="Maximum iterations reached.",
            agent_name="default",
            messages=messages,
            tool_calls=tool_calls,
            total_tokens=total_tokens,
        )

    async def stream(
        self,
        message: str,
        history: list[dict],
        system_prompt: str = "",
    ) -> AsyncIterator[dict]:
        """Stream agent responses as events."""
        messages = history + [
            {"role": "user", "content": message}
        ]

        for _ in range(self.max_iterations):
            async with self.client.messages.stream(
                model=self.model,
                max_tokens=4096,
                system=system_prompt,
                tools=list(self.tools_registry.values()),
                messages=messages,
            ) as stream:
                # Forward text deltas as they arrive instead of
                # waiting for the complete message
                async for text in stream.text_stream:
                    yield {
                        "type": "text_delta",
                        "text": text,
                    }
                response = await stream.get_final_message()

            if response.stop_reason == "tool_use":
                messages.append({
                    "role": "assistant",
                    "content": response.content,
                })

                results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue
                    yield {
                        "type": "tool_start",
                        "tool_name": block.name,
                    }
                    start = time.monotonic()
                    result = await self._execute_tool(
                        block.name, block.input
                    )
                    duration = int(
                        (time.monotonic() - start) * 1000
                    )
                    yield {
                        "type": "tool_end",
                        "tool_name": block.name,
                        "duration_ms": duration,
                    }
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })

                messages.append({
                    "role": "user",
                    "content": results,
                })
            else:
                # Append the final assistant turn so saved history
                # includes the response the user just saw
                final_text = "".join(
                    block.text
                    for block in response.content
                    if hasattr(block, "text")
                )
                messages.append({
                    "role": "assistant",
                    "content": final_text,
                })
                yield {
                    "type": "done",
                    "tokens": response.usage.input_tokens
                    + response.usage.output_tokens,
                    "messages": messages,
                }
                return

    async def _execute_tool(
        self, name: str, args: dict
    ) -> str:
        """Execute a tool with error handling."""
        func = self.tools_registry.get(name, {}).get("execute")
        if not func:
            return json.dumps({
                "error": f"Unknown tool: {name}"
            })
        try:
            result = await func(**args)
            return (
                result if isinstance(result, str)
                else json.dumps(result)
            )
        except Exception as e:
            return json.dumps({"error": str(e)})
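To make the registry contract concrete, here is a standalone sketch of the same dispatch logic outside the class: each registry entry pairs a tool schema with an async "execute" callable. The weather tool is a made-up example, not part of the article's codebase.

```python
import asyncio
import json

# Standalone version of the registry/dispatch pattern _execute_tool uses
tools_registry: dict[str, dict] = {}

def register(tool: dict) -> None:
    tools_registry[tool["name"]] = tool

async def execute_tool(name: str, args: dict) -> str:
    func = tools_registry.get(name, {}).get("execute")
    if not func:
        return json.dumps({"error": f"Unknown tool: {name}"})
    try:
        result = await func(**args)
        return result if isinstance(result, str) else json.dumps(result)
    except Exception as e:
        return json.dumps({"error": str(e)})

# Hypothetical tool implementation for illustration
async def get_weather(city: str) -> dict:
    return {"city": city, "temp_c": 21}

register({
    "name": "get_weather",
    "description": "Current weather for a city",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
    },
    "execute": get_weather,
})

ok = asyncio.run(execute_tool("get_weather", {"city": "Oslo"}))
missing = asyncio.run(execute_tool("nope", {}))
```

Note the design choice carried over from the runner: tool failures are returned as JSON error strings rather than raised, so the model sees the error and can retry or recover instead of crashing the agent loop.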
Middleware for Observability
Add middleware to capture request-level metrics and trace agent interactions:
# app/main.py
import time
import uuid

import structlog
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware

from app.api.routes import chat, conversations, health

logger = structlog.get_logger()

app = FastAPI(
    title="Agentic AI Backend",
    version="1.0.0",
    description="Production backend for AI agent systems",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict in production
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def add_request_context(request: Request, call_next):
    request_id = str(uuid.uuid4())
    start = time.monotonic()

    # Reset any leftover context, then bind this request's ID
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=request_id
    )

    response = await call_next(request)

    duration_ms = int((time.monotonic() - start) * 1000)
    logger.info(
        "request_completed",
        method=request.method,
        path=request.url.path,
        status_code=response.status_code,
        duration_ms=duration_ms,
    )

    response.headers["X-Request-ID"] = request_id
    response.headers["X-Response-Time"] = f"{duration_ms}ms"
    return response


app.include_router(chat.router)
app.include_router(conversations.router)
app.include_router(health.router)
At CallSphere, we use this exact FastAPI architecture to serve production agent backends across healthcare, real estate, and customer service verticals, handling thousands of concurrent WebSocket connections with sub-second response times for the streaming path.
Frequently Asked Questions
How do I handle concurrent requests to the same conversation?
Use a distributed lock (Redis-based) to ensure only one request processes a conversation at a time. When a new request arrives for a conversation that is already being processed, either queue it or return a 409 Conflict response. Without locking, concurrent requests can interleave messages in the conversation history, producing corrupted state.
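The locking idea can be sketched in a single process with one asyncio.Lock per conversation; a multi-instance deployment would replace the dict with a Redis lock (SET NX PX) keyed the same way. The `handle_turn` function and the simulated agent call are illustrative.

```python
import asyncio
from collections import defaultdict

# One lock per conversation ID; defaultdict creates them lazily.
# In production this dict becomes a Redis-backed lock per key.
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
order: list[str] = []

async def handle_turn(conversation_id: str, message: str) -> None:
    async with locks[conversation_id]:  # serialize turns per conversation
        order.append(f"start:{message}")
        await asyncio.sleep(0)          # stand-in for the agent call
        order.append(f"end:{message}")

async def main() -> None:
    # Two concurrent requests to the same conversation run one at a time,
    # so their messages never interleave in the history
    await asyncio.gather(
        handle_turn("conv-1", "a"),
        handle_turn("conv-1", "b"),
    )

asyncio.run(main())
```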
Should I use WebSockets or Server-Sent Events for streaming?
Use WebSockets when you need bidirectional communication — the client sends messages and receives streaming responses in the same connection. Use SSE when the communication is mostly server-to-client and you want simpler infrastructure (SSE works through standard HTTP proxies and load balancers without special configuration). For most chat-style agent interfaces, WebSockets are the better choice because the client needs to send follow-up messages.
How do I scale a FastAPI agent backend horizontally?
FastAPI agent backends are I/O-bound (waiting for LLM API responses), not CPU-bound, so they scale well horizontally. Run multiple instances behind a load balancer, use Redis for shared session state, and store conversations in PostgreSQL. The key is to make each instance stateless — no in-memory conversation storage, no singleton agent instances. Use Kubernetes horizontal pod autoscaling based on request queue depth or concurrent WebSocket connections.
What is the best way to handle LLM rate limits in a production backend?
Implement a token bucket rate limiter in front of your LLM calls. Use Redis to maintain the bucket state across multiple server instances. When the rate limit is approached, queue requests rather than rejecting them. Implement exponential backoff with jitter for retry logic. Monitor your rate limit usage and alert at 80% capacity. For high-volume systems, consider using multiple API keys distributed across a key pool.
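A minimal in-process token bucket looks like the sketch below; the production version described above keeps `tokens` and `updated` in Redis (typically via a Lua script for atomicity) so all instances share one bucket. Capacity and refill rate here are arbitrary.

```python
import time

class TokenBucket:
    """Refill `refill_per_sec` tokens per second up to `capacity`."""

    def __init__(self, capacity: float, refill_per_sec: float):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.tokens = capacity
        self.updated = time.monotonic()

    def try_acquire(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(
            self.capacity,
            self.tokens + (now - self.updated) * self.refill_per_sec,
        )
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(capacity=2, refill_per_sec=0.5)
allowed = [bucket.try_acquire() for _ in range(3)]  # third call is throttled
```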
How do I monitor agent costs in real time?
Track token usage on every LLM call and store it alongside the conversation record. Calculate cost using the model's pricing (input tokens times input price plus output tokens times output price). Build a dashboard that shows cost per conversation, cost per agent, daily spend, and trends. Set up alerts for anomalous spending — a single runaway conversation with a long context can cost 10-100x a normal conversation. Implement a per-conversation cost ceiling that terminates the agent loop if exceeded.
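The per-turn arithmetic is simple enough to pin down in code. The prices below are illustrative placeholders, not real rates; look up your model's current per-million-token pricing.

```python
# Illustrative prices in USD per million tokens -- not real rates
PRICES_PER_MTOK = {
    "example-model": {"input": 3.00, "output": 15.00},
}

def turn_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """input tokens * input price + output tokens * output price."""
    p = PRICES_PER_MTOK[model]
    return (
        input_tokens / 1_000_000 * p["input"]
        + output_tokens / 1_000_000 * p["output"]
    )

def over_ceiling(spent_usd: float, ceiling_usd: float) -> bool:
    """Check before each iteration; stop the agent loop when True."""
    return spent_usd >= ceiling_usd

# 20k input + 1k output at the placeholder rates: 0.06 + 0.015 = 0.075
cost = turn_cost_usd("example-model", input_tokens=20_000, output_tokens=1_000)
```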
CallSphere Team
Expert insights on AI voice agents and customer communication automation.