
Agentic AI with FastAPI: Production Backend Architecture Patterns

Production patterns for agentic AI backends with FastAPI — WebSocket streaming, background agent tasks, dependency injection, and Pydantic models for agents.

Why FastAPI for Agentic AI Backends

FastAPI has become the default backend framework for agentic AI in 2026 for good reasons. Its native async support handles the I/O-bound nature of LLM API calls efficiently. Pydantic integration provides automatic request validation and documentation. WebSocket support enables streaming agent responses. And its dependency injection system maps cleanly to agent state management patterns.

This guide covers production-grade patterns for building agentic AI backends with FastAPI — from the API layer down to agent execution, with real code you can adapt for your projects.

Project Structure

A well-organized FastAPI project for agentic AI separates concerns into distinct layers:

agent-backend/
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI app and middleware
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── chat.py         # Chat endpoints
│   │   │   ├── conversations.py # Conversation management
│   │   │   └── health.py       # Health checks
│   │   └── dependencies.py     # Shared dependencies
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── base.py             # Agent base class
│   │   ├── registry.py         # Agent registry
│   │   └── implementations/
│   │       ├── support.py
│   │       └── sales.py
│   ├── tools/
│   │   ├── __init__.py
│   │   └── implementations/
│   ├── models/
│   │   ├── __init__.py
│   │   ├── requests.py         # Request schemas
│   │   ├── responses.py        # Response schemas
│   │   └── database.py         # DB models
│   ├── core/
│   │   ├── config.py           # Settings
│   │   ├── database.py         # DB connection
│   │   ├── redis.py            # Redis client
│   │   └── llm.py              # LLM client factory
│   └── services/
│       ├── conversation.py     # Conversation service
│       └── agent_runner.py     # Agent execution service
├── tests/
├── alembic/
├── docker-compose.yml
├── Dockerfile
└── pyproject.toml

Pydantic Models for Agent I/O

Define strict request and response models. FastAPI uses these for automatic validation, documentation, and serialization:

# app/models/requests.py
from pydantic import BaseModel, Field, field_validator
import uuid

class ChatRequest(BaseModel):
    message: str = Field(
        ...,
        min_length=1,
        max_length=10000,
        description="User message to send to the agent",
    )
    conversation_id: str | None = Field(
        None,
        description="Existing conversation ID to continue",
    )

    @field_validator("conversation_id")
    @classmethod
    def validate_conversation_id(cls, v: str | None) -> str | None:
        if v is not None:
            try:
                uuid.UUID(v)
            except ValueError:
                raise ValueError("Invalid conversation ID format")
        return v

class AgentConfigRequest(BaseModel):
    agent_name: str = Field(
        ..., description="Name of the agent to use"
    )
    temperature: float = Field(
        0.3, ge=0.0, le=1.0,
        description="LLM temperature for this session",
    )
    max_iterations: int = Field(
        10, ge=1, le=50,
        description="Maximum agent loop iterations",
    )


# app/models/responses.py
from pydantic import BaseModel, Field
from datetime import datetime

class ToolCallInfo(BaseModel):
    tool_name: str
    arguments: dict
    result_summary: str
    duration_ms: int

class ChatResponse(BaseModel):
    response: str = Field(
        ..., description="Agent response text"
    )
    conversation_id: str = Field(
        ..., description="Conversation ID for follow-ups"
    )
    agent_name: str = Field(
        ..., description="Name of the agent that responded"
    )
    tools_used: list[ToolCallInfo] = Field(
        default_factory=list,
        description="Tools called during this turn",
    )
    total_tokens: int = Field(
        0, description="Total tokens used"
    )
    latency_ms: int = Field(
        0, description="Total response time in milliseconds"
    )

class ConversationSummary(BaseModel):
    id: str
    agent_name: str
    message_count: int
    created_at: datetime
    last_message_at: datetime
    status: str = "active"

class ConversationListResponse(BaseModel):
    conversations: list[ConversationSummary]
    total: int
    page: int
    page_size: int

WebSocket Endpoints for Streaming

Streaming is critical for agentic AI — users should see the agent's response as it is generated, not wait 5-10 seconds for the full text. The route module below pairs a standard request/response endpoint with a WebSocket endpoint that streams events as they happen:

# app/api/routes/chat.py
import json
import time
import uuid
from fastapi import (
    APIRouter, WebSocket, WebSocketDisconnect,
    Depends, HTTPException,
)
from app.models.requests import ChatRequest
from app.models.responses import ChatResponse, ToolCallInfo
from app.services.agent_runner import AgentRunner
from app.api.dependencies import (
    get_agent_runner,
    get_conversation_service,
)

router = APIRouter(prefix="/chat", tags=["chat"])

@router.post("/", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    runner: AgentRunner = Depends(get_agent_runner),
    conv_service = Depends(get_conversation_service),
):
    """Send a message and receive a complete response."""
    start = time.monotonic()
    conversation_id = request.conversation_id or str(uuid.uuid4())

    # Load or create conversation
    history = await conv_service.get_history(conversation_id)

    result = await runner.run(
        message=request.message,
        history=history,
    )

    # Save updated history
    await conv_service.save_history(
        conversation_id, result.messages
    )

    latency_ms = int((time.monotonic() - start) * 1000)

    return ChatResponse(
        response=result.final_text,
        conversation_id=conversation_id,
        agent_name=result.agent_name,
        tools_used=[
            ToolCallInfo(
                tool_name=t["name"],
                arguments=t["args"],
                result_summary=t["result"][:200],
                duration_ms=t["duration_ms"],
            )
            for t in result.tool_calls
        ],
        total_tokens=result.total_tokens,
        latency_ms=latency_ms,
    )


@router.websocket("/ws/{conversation_id}")
async def chat_websocket(
    websocket: WebSocket,
    conversation_id: str,
    runner: AgentRunner = Depends(get_agent_runner),
    conv_service = Depends(get_conversation_service),
):
    """WebSocket endpoint for streaming agent responses."""
    await websocket.accept()

    try:
        while True:
            # Receive user message
            data = await websocket.receive_json()
            user_message = data.get("message", "")

            if not user_message:
                await websocket.send_json({
                    "type": "error",
                    "message": "Empty message",
                })
                continue

            history = await conv_service.get_history(
                conversation_id
            )

            # Stream the agent's response, capturing the final
            # message list from the terminal "done" event
            final_messages: list = []
            async for event in runner.stream(
                message=user_message,
                history=history,
            ):
                if event["type"] == "text_delta":
                    await websocket.send_json({
                        "type": "text",
                        "content": event["text"],
                    })
                elif event["type"] == "tool_start":
                    await websocket.send_json({
                        "type": "tool_start",
                        "tool_name": event["tool_name"],
                    })
                elif event["type"] == "tool_end":
                    await websocket.send_json({
                        "type": "tool_end",
                        "tool_name": event["tool_name"],
                        "duration_ms": event["duration_ms"],
                    })
                elif event["type"] == "done":
                    final_messages = event["messages"]
                    await websocket.send_json({
                        "type": "done",
                        "tokens_used": event["tokens"],
                    })

            # Save the conversation after the turn completes
            if final_messages:
                await conv_service.save_history(
                    conversation_id, final_messages
                )

    except WebSocketDisconnect:
        pass  # Client disconnected; cleanup is handled when the socket is garbage collected
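On the client side, the events emitted by this endpoint can be folded back into a complete reply. A sketch that processes a mock event list in place of a live socket (the helper and sample events are illustrative, not part of the API):

```python
# Fold the server's streaming events back into a complete reply.
# The helper and sample events are illustrative; in a live client
# they come from websocket.receive_json() in a loop.
def consume_events(events: list[dict]) -> dict:
    text_parts: list[str] = []
    tool_timings: dict[str, int] = {}
    tokens = 0
    for event in events:
        if event["type"] == "text":
            text_parts.append(event["content"])
        elif event["type"] == "tool_end":
            tool_timings[event["tool_name"]] = event["duration_ms"]
        elif event["type"] == "done":
            tokens = event["tokens_used"]
    return {
        "text": "".join(text_parts),
        "tool_timings": tool_timings,
        "tokens": tokens,
    }

events = [
    {"type": "tool_start", "tool_name": "search"},
    {"type": "tool_end", "tool_name": "search", "duration_ms": 120},
    {"type": "text", "content": "Found "},
    {"type": "text", "content": "3 results."},
    {"type": "done", "tokens_used": 542},
]
summary = consume_events(events)
# summary["text"] == "Found 3 results."
```

A browser client would do the same thing in an `onmessage` handler: append `text` deltas to the visible response, show a spinner between `tool_start` and `tool_end`, and finalize the turn on `done`.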

Background Tasks for Agent Execution

Some agent tasks take too long for synchronous HTTP responses — report generation, data analysis, multi-step workflows. Use FastAPI's background tasks or a task queue:

# app/api/routes/tasks.py
import uuid
from fastapi import (
    APIRouter, BackgroundTasks, Depends, HTTPException,
)
from pydantic import BaseModel, Field
from app.api.dependencies import get_agent_runner
from app.services.agent_runner import AgentRunner

router = APIRouter(prefix="/tasks", tags=["tasks"])

class TaskRequest(BaseModel):
    agent_name: str
    instruction: str = Field(
        ..., max_length=50000
    )
    webhook_url: str | None = None

class TaskResponse(BaseModel):
    task_id: str
    status: str = "queued"

class TaskStatus(BaseModel):
    task_id: str
    status: str  # queued, running, completed, failed
    result: str | None = None
    error: str | None = None
    progress: float = 0.0

# In-memory task store (use Redis in production)
tasks: dict[str, TaskStatus] = {}

async def execute_agent_task(
    task_id: str,
    agent_name: str,
    instruction: str,
    runner: AgentRunner,
):
    """Background task that runs an agent to completion."""
    tasks[task_id] = TaskStatus(
        task_id=task_id, status="running"
    )

    try:
        result = await runner.run(
            message=instruction,
            history=[],
        )

        tasks[task_id] = TaskStatus(
            task_id=task_id,
            status="completed",
            result=result.final_text,
            progress=1.0,
        )
    except Exception as e:
        tasks[task_id] = TaskStatus(
            task_id=task_id,
            status="failed",
            error=str(e),
        )


@router.post("/", response_model=TaskResponse)
async def create_task(
    request: TaskRequest,
    background_tasks: BackgroundTasks,
    runner: AgentRunner = Depends(get_agent_runner),
):
    """Queue an agent task for background execution."""
    task_id = str(uuid.uuid4())

    # Register the task as queued up front so a status poll for a
    # freshly created task never returns 404
    tasks[task_id] = TaskStatus(task_id=task_id, status="queued")

    background_tasks.add_task(
        execute_agent_task,
        task_id=task_id,
        agent_name=request.agent_name,
        instruction=request.instruction,
        runner=runner,
    )

    return TaskResponse(task_id=task_id)


@router.get("/{task_id}", response_model=TaskStatus)
async def get_task_status(task_id: str):
    """Check the status of a background agent task."""
    status = tasks.get(task_id)
    if not status:
        raise HTTPException(404, "Task not found")
    return status

For production, replace BackgroundTasks with a proper task queue such as Celery, ARQ, or Dramatiq. BackgroundTasks runs inside the same worker process, so any queued work is lost if the server restarts.

Dependency Injection for Agent State

FastAPI's dependency injection system is excellent for managing agent-related resources — database connections, LLM clients, and agent configurations:

# app/api/dependencies.py
from functools import lru_cache
from anthropic import AsyncAnthropic
from fastapi import Depends
from app.core.config import Settings
from app.core.database import Database
from app.core.redis import RedisClient
from app.services.agent_runner import AgentRunner
from app.services.conversation import ConversationService

@lru_cache()
def get_settings() -> Settings:
    return Settings()

_llm_client: AsyncAnthropic | None = None

async def get_llm_client() -> AsyncAnthropic:
    global _llm_client
    if _llm_client is None:
        settings = get_settings()
        _llm_client = AsyncAnthropic(
            api_key=settings.anthropic_api_key
        )
    return _llm_client

_db: Database | None = None

async def get_database() -> Database:
    global _db
    if _db is None:
        settings = get_settings()
        _db = Database(settings.database_url)
        await _db.connect()
    return _db

_redis: RedisClient | None = None

async def get_redis() -> RedisClient:
    global _redis
    if _redis is None:
        settings = get_settings()
        _redis = RedisClient(settings.redis_url)
        await _redis.connect()
    return _redis

async def get_agent_runner(
    llm_client: AsyncAnthropic = Depends(get_llm_client),
    settings: Settings = Depends(get_settings),
) -> AgentRunner:
    return AgentRunner(
        client=llm_client,
        model=settings.default_model,
        max_iterations=settings.max_iterations,
    )

async def get_conversation_service(
    db: Database = Depends(get_database),
    redis: RedisClient = Depends(get_redis),
) -> ConversationService:
    return ConversationService(db=db, cache=redis)

This pattern ensures:

  • A single LLM client is shared across all requests (connection pooling)
  • Database and Redis connections are managed centrally
  • Each endpoint declares exactly what dependencies it needs
  • Testing is easy — override dependencies with mocks

The Agent Runner Service

The agent runner encapsulates the agent loop, making it reusable across endpoints:

# app/services/agent_runner.py
import time
import json
from dataclasses import dataclass, field
from typing import AsyncIterator
from anthropic import AsyncAnthropic

@dataclass
class AgentResult:
    final_text: str
    agent_name: str
    messages: list[dict]
    tool_calls: list[dict] = field(default_factory=list)
    total_tokens: int = 0

class AgentRunner:
    def __init__(
        self,
        client: AsyncAnthropic,
        model: str = "claude-sonnet-4-20250514",
        max_iterations: int = 10,
    ):
        self.client = client
        self.model = model
        self.max_iterations = max_iterations
        self.tools_registry = {}

    def register_tools(self, tools: list):
        for tool in tools:
            self.tools_registry[tool["name"]] = tool

    async def run(
        self,
        message: str,
        history: list[dict],
        system_prompt: str = "",
    ) -> AgentResult:
        messages = history + [
            {"role": "user", "content": message}
        ]
        tool_calls = []
        total_tokens = 0

        for _ in range(self.max_iterations):
            response = await self.client.messages.create(
                model=self.model,
                max_tokens=4096,
                system=system_prompt,
                # Strip the non-serializable "execute" callable
                # before sending tool schemas to the API
                tools=[
                    {k: v for k, v in t.items() if k != "execute"}
                    for t in self.tools_registry.values()
                ],
                messages=messages,
            )

            total_tokens += (
                response.usage.input_tokens
                + response.usage.output_tokens
            )

            if response.stop_reason == "tool_use":
                messages.append({
                    "role": "assistant",
                    "content": response.content,
                })

                results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue

                    start = time.monotonic()
                    result = await self._execute_tool(
                        block.name, block.input
                    )
                    duration = int(
                        (time.monotonic() - start) * 1000
                    )

                    tool_calls.append({
                        "name": block.name,
                        "args": block.input,
                        "result": result,
                        "duration_ms": duration,
                    })

                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })

                messages.append({
                    "role": "user",
                    "content": results,
                })
            else:
                text = ""
                for block in response.content:
                    if hasattr(block, "text"):
                        text += block.text

                messages.append({
                    "role": "assistant",
                    "content": text,
                })

                return AgentResult(
                    final_text=text,
                    agent_name="default",
                    messages=messages,
                    tool_calls=tool_calls,
                    total_tokens=total_tokens,
                )

        return AgentResult(
            final_text="Maximum iterations reached.",
            agent_name="default",
            messages=messages,
            tool_calls=tool_calls,
            total_tokens=total_tokens,
        )

    async def stream(
        self,
        message: str,
        history: list[dict],
        system_prompt: str = "",
    ) -> AsyncIterator[dict]:
        """Stream agent responses as events."""
        messages = history + [
            {"role": "user", "content": message}
        ]

        for _ in range(self.max_iterations):
            async with self.client.messages.stream(
                model=self.model,
                max_tokens=4096,
                system=system_prompt,
                # Strip the non-serializable "execute" callable
                # before sending tool schemas to the API
                tools=[
                    {k: v for k, v in t.items() if k != "execute"}
                    for t in self.tools_registry.values()
                ],
                messages=messages,
            ) as stream:
                # Forward text deltas as they arrive so the caller
                # can render the response token by token
                async for text in stream.text_stream:
                    yield {"type": "text_delta", "text": text}
                response = await stream.get_final_message()

            if response.stop_reason == "tool_use":
                messages.append({
                    "role": "assistant",
                    "content": response.content,
                })

                results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue

                    yield {
                        "type": "tool_start",
                        "tool_name": block.name,
                    }

                    start = time.monotonic()
                    result = await self._execute_tool(
                        block.name, block.input
                    )
                    duration = int(
                        (time.monotonic() - start) * 1000
                    )

                    yield {
                        "type": "tool_end",
                        "tool_name": block.name,
                        "duration_ms": duration,
                    }

                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })

                messages.append({
                    "role": "user",
                    "content": results,
                })
            else:
                # Record the final assistant turn so saved history
                # includes it, then signal completion
                final_text = "".join(
                    block.text
                    for block in response.content
                    if hasattr(block, "text")
                )
                messages.append({
                    "role": "assistant",
                    "content": final_text,
                })
                yield {
                    "type": "done",
                    "tokens": response.usage.input_tokens
                              + response.usage.output_tokens,
                    "messages": messages,
                }
                return

        # Loop budget exhausted without a final answer — still emit
        # a terminal event so callers can save history and stop
        yield {
            "type": "done",
            "tokens": 0,
            "messages": messages,
        }

    async def _execute_tool(
        self, name: str, args: dict
    ) -> str:
        """Execute a tool with error handling."""
        func = self.tools_registry.get(name, {}).get("execute")
        if not func:
            return json.dumps({
                "error": f"Unknown tool: {name}"
            })
        try:
            result = await func(**args)
            return (
                result if isinstance(result, str)
                else json.dumps(result)
            )
        except Exception as e:
            return json.dumps({"error": str(e)})
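The runner expects each registered tool to pair an Anthropic-style schema with an `execute` coroutine. A hypothetical weather tool illustrates the shape — the tool name and stub result are made up:

```python
import asyncio
import json

# Hypothetical tool: "name", "description", and "input_schema"
# follow the Anthropic tools format; "execute" is this runner's
# own convention for the coroutine that does the work.
async def get_weather(city: str) -> str:
    # Stub implementation — a real tool would call a weather API
    return json.dumps({"city": city, "temp_c": 21})

weather_tool = {
    "name": "get_weather",
    "description": "Get current weather for a city",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
    "execute": get_weather,
}

# runner.register_tools([weather_tool])
result = json.loads(asyncio.run(get_weather("Oslo")))
# result == {"city": "Oslo", "temp_c": 21}
```

Returning JSON strings (including from the error path in `_execute_tool`) keeps tool results uniform: the model always receives structured text it can reason over, whether the call succeeded or failed.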

Middleware for Observability

Add middleware to capture request-level metrics and trace agent interactions:

# app/main.py
import time
import uuid
import structlog
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import chat, conversations, health

logger = structlog.get_logger()

app = FastAPI(
    title="Agentic AI Backend",
    version="1.0.0",
    description="Production backend for AI agent systems",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict in production
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.middleware("http")
async def add_request_context(request: Request, call_next):
    request_id = str(uuid.uuid4())
    start = time.monotonic()

    # Add request ID to logger context
    structlog.contextvars.bind_contextvars(
        request_id=request_id
    )

    response = await call_next(request)

    duration_ms = int((time.monotonic() - start) * 1000)

    logger.info(
        "request_completed",
        method=request.method,
        path=request.url.path,
        status_code=response.status_code,
        duration_ms=duration_ms,
    )

    response.headers["X-Request-ID"] = request_id
    response.headers["X-Response-Time"] = f"{duration_ms}ms"

    return response

app.include_router(chat.router)
app.include_router(conversations.router)
app.include_router(health.router)

At CallSphere, we use this exact FastAPI architecture to serve production agent backends across healthcare, real estate, and customer service verticals, handling thousands of concurrent WebSocket connections with sub-second response times for the streaming path.

Frequently Asked Questions

How do I handle concurrent requests to the same conversation?

Use a distributed lock (Redis-based) to ensure only one request processes a conversation at a time. When a new request arrives for a conversation that is already being processed, either queue it or return a 409 Conflict response. Without locking, concurrent requests can interleave messages in the conversation history, producing corrupted state.
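Within a single process, the pattern can be sketched with per-conversation `asyncio.Lock`s — a real deployment would replace this with a Redis lock (e.g. `SET NX` with a TTL) shared across instances:

```python
import asyncio
from collections import defaultdict

# Per-conversation locks. In-process only: a multi-instance
# deployment needs a shared Redis lock (e.g. SET NX with a TTL).
_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

async def handle_turn(conversation_id: str, work) -> str:
    lock = _locks[conversation_id]
    if lock.locked():
        return "409"  # conversation busy — reject (or queue) the turn
    async with lock:
        return await work()

async def demo() -> list[str]:
    async def slow_turn() -> str:
        await asyncio.sleep(0.05)  # simulates an in-flight LLM call
        return "ok"
    # Two concurrent turns on the same conversation: one wins,
    # the other is rejected while the first holds the lock
    return await asyncio.gather(
        handle_turn("c1", slow_turn),
        handle_turn("c1", slow_turn),
    )

results = asyncio.run(demo())
# One "ok" and one "409"
```

The check-then-acquire here is safe only because asyncio is single-threaded per event loop; in the distributed version, the atomic `SET NX` does both steps in one Redis command.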

Should I use WebSockets or Server-Sent Events for streaming?

Use WebSockets when you need bidirectional communication — the client sends messages and receives streaming responses in the same connection. Use SSE when the communication is mostly server-to-client and you want simpler infrastructure (SSE works through standard HTTP proxies and load balancers without special configuration). For most chat-style agent interfaces, WebSockets are the better choice because the client needs to send follow-up messages.

How do I scale a FastAPI agent backend horizontally?

FastAPI agent backends are I/O-bound (waiting for LLM API responses), not CPU-bound, so they scale well horizontally. Run multiple instances behind a load balancer, use Redis for shared session state, and store conversations in PostgreSQL. The key is to make each instance stateless — no in-memory conversation storage, no singleton agent instances. Use Kubernetes horizontal pod autoscaling based on request queue depth or concurrent WebSocket connections.

What is the best way to handle LLM rate limits in a production backend?

Implement a token bucket rate limiter in front of your LLM calls. Use Redis to maintain the bucket state across multiple server instances. When the rate limit is approached, queue requests rather than rejecting them. Implement exponential backoff with jitter for retry logic. Monitor your rate limit usage and alert at 80% capacity. For high-volume systems, consider using multiple API keys distributed across a key pool.
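A minimal single-process token bucket shows the mechanics — in production the token count and refill timestamp would live in Redis (often updated atomically via a Lua script) so every instance draws from the same budget:

```python
import time

class TokenBucket:
    """Single-process token bucket. Production state belongs in
    Redis so all server instances share one rate budget."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate          # tokens refilled per second
        self.capacity = capacity
        self.tokens = capacity    # start full
        self.last = time.monotonic()

    def try_acquire(self, cost: float = 1.0) -> bool:
        # Refill based on elapsed time, capped at capacity
        now = time.monotonic()
        self.tokens = min(
            self.capacity,
            self.tokens + (now - self.last) * self.rate,
        )
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(rate=1.0, capacity=5.0)
allowed = [bucket.try_acquire() for _ in range(8)]
# Five calls drain the full bucket; at 1 token/s the refill during
# the loop is negligible, so the remaining calls are throttled
```

When `try_acquire` returns False, queue the request with exponential backoff plus jitter rather than failing the user's turn outright.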

How do I monitor agent costs in real time?

Track token usage on every LLM call and store it alongside the conversation record. Calculate cost using the model's pricing (input tokens times input price plus output tokens times output price). Build a dashboard that shows cost per conversation, cost per agent, daily spend, and trends. Set up alerts for anomalous spending — a single runaway conversation with a long context can cost 10-100x a normal conversation. Implement a per-conversation cost ceiling that terminates the agent loop if exceeded.
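The ceiling check itself is simple arithmetic. A sketch with placeholder prices — not real rates; take the actual numbers from your model provider's pricing page:

```python
# Placeholder prices in USD per token — not real rates; pull the
# actual numbers from your model provider's pricing page.
INPUT_PRICE = 3.00 / 1_000_000
OUTPUT_PRICE = 15.00 / 1_000_000

def turn_cost(input_tokens: int, output_tokens: int) -> float:
    """Cost of one LLM turn: input and output tokens priced separately."""
    return input_tokens * INPUT_PRICE + output_tokens * OUTPUT_PRICE

def over_budget(spent_usd: float, ceiling_usd: float = 0.50) -> bool:
    """True once a conversation has exceeded its cost ceiling,
    at which point the agent loop should be terminated."""
    return spent_usd >= ceiling_usd

spent = turn_cost(120_000, 4_000)  # one long-context turn
# A second similar turn would trip the $0.50 default ceiling
```

Accumulate `turn_cost` into the conversation record on every LLM call and check `over_budget` at the top of the agent loop — that single guard is what prevents a runaway conversation from burning 10-100x a normal one.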


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
