
Claude API Streaming: Real-Time AI Responses in Production

Complete guide to implementing streaming responses with the Claude API. Covers SSE implementation, token-by-token rendering, error handling during streams, and production patterns for real-time AI applications.

Why Streaming Matters

Without streaming, a Claude API call blocks until the entire response is generated. For a 1,000-token response, that means 5-15 seconds of silence followed by a wall of text. Users perceive this as slow, unresponsive, and frustrating.

Streaming changes the UX fundamentally. The first token arrives within 500ms-2s (time to first token, or TTFT), and subsequent tokens stream in at 50-100 tokens per second. Users see the response forming in real time, which feels fast even when the total generation time is identical.
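To make the arithmetic concrete, here is a toy calculation using the illustrative numbers above (these are assumptions for illustration, not measured figures):

```python
def first_output_seconds(total_tokens: int, ttft_s: float, tok_per_s: float, streaming: bool) -> float:
    """Seconds until the user first sees output: with streaming it is just
    the TTFT; without, it is the full generation time."""
    total = ttft_s + total_tokens / tok_per_s
    return ttft_s if streaming else total

# A 1,000-token reply at 75 tokens/sec with a 1 s time-to-first-token:
print(first_output_seconds(1000, 1.0, 75.0, streaming=False))  # ~14.3 s of silence
print(first_output_seconds(1000, 1.0, 75.0, streaming=True))   # output starts at 1.0 s
```

Total generation time is identical either way; streaming only moves the moment the user first sees something from the end of the wait to the beginning.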

For production applications -- chatbots, code assistants, real-time analysis tools -- streaming is not optional. It is a core UX requirement.

Basic Streaming in Python

from anthropic import Anthropic

client = Anthropic()

# Basic streaming with the messages API
with client.messages.stream(
    model="claude-sonnet-4-5-20250514",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Explain how TCP/IP works."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

The stream() method returns a context manager that yields text chunks as they arrive. The flush=True ensures each chunk is printed immediately rather than buffered.


Basic Streaming in TypeScript

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = await client.messages.stream({
  model: "claude-sonnet-4-5-20250514",
  max_tokens: 4096,
  messages: [{ role: "user", content: "Explain how TCP/IP works." }],
});

for await (const event of stream) {
  if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
    process.stdout.write(event.delta.text);
  }
}

// Get the final message with usage stats
const finalMessage = await stream.finalMessage();
console.log("\nTokens used:", finalMessage.usage);

Server-Sent Events (SSE) Architecture

The Claude API uses Server-Sent Events for streaming. Each event has a type that tells you what is happening:


Event Type            Description                                  When It Occurs
message_start         Message metadata, model info                 First event
content_block_start   New content block begins                     Before each text/tool block
content_block_delta   Incremental content update                   During generation
content_block_stop    Content block complete                       After each block
message_delta         Message-level updates (stop reason, usage)   Near end
message_stop          Stream complete                              Last event
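For illustration, here is a minimal parser for that wire format (a sketch of the SSE framing, not the SDK's actual implementation; real streams may also carry `id:` fields and comment lines):

```python
import json

def parse_sse(raw: str) -> list[tuple[str, dict]]:
    """Parse a raw SSE stream into (event_type, payload) pairs.
    Frames are separated by blank lines; each frame has an 'event:'
    line naming the type and a 'data:' line carrying JSON."""
    events = []
    for frame in raw.split("\n\n"):
        event_type, data = None, None
        for line in frame.splitlines():
            if line.startswith("event: "):
                event_type = line[len("event: "):]
            elif line.startswith("data: "):
                data = json.loads(line[len("data: "):])
        if event_type and data is not None:
            events.append((event_type, data))
    return events

# Hand-written frames mimicking the shapes in the table above
raw = (
    'event: message_start\ndata: {"type": "message_start"}\n\n'
    'event: content_block_delta\n'
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hi"}}\n\n'
)
for etype, payload in parse_sse(raw):
    print(etype)
```

In practice the SDKs do this parsing for you; seeing the raw framing mainly helps when you proxy the stream through your own backend, as shown later.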

Handling All Event Types

from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5-20250514",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a Python function to sort a list."}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                print(f"Model: {event.message.model}")
            case "content_block_start":
                if event.content_block.type == "text":
                    print("--- Text block started ---")
                elif event.content_block.type == "tool_use":
                    print(f"--- Tool call: {event.content_block.name} ---")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="", flush=True)
            case "message_delta":
                print(f"\nStop reason: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("\n--- Stream complete ---")

Streaming with Tool Use

Streaming becomes more complex when tools are involved. Claude may stream text, then switch to a tool call, then resume text after seeing the tool result.
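The tricky part is that tool input arrives as partial JSON fragments, which you accumulate per block and parse only once the block stops. In miniature, with hand-written event dicts that mimic the event shapes described above (not real API objects):

```python
import json

# Simplified stand-ins for the SDK's event objects, for illustration only
events = [
    {"type": "content_block_start", "block_type": "tool_use", "name": "get_weather"},
    {"type": "content_block_delta", "delta_type": "input_json_delta", "partial_json": '{"city": '},
    {"type": "content_block_delta", "delta_type": "input_json_delta", "partial_json": '"Paris"}'},
    {"type": "content_block_stop"},
]

fragments = []
tool_input = None
for ev in events:
    if ev["type"] == "content_block_start" and ev["block_type"] == "tool_use":
        fragments = []  # a new tool block starts with an empty accumulator
    elif ev["type"] == "content_block_delta" and ev["delta_type"] == "input_json_delta":
        fragments.append(ev["partial_json"])  # fragments are NOT valid JSON on their own
    elif ev["type"] == "content_block_stop" and fragments:
        tool_input = json.loads("".join(fragments))  # parse only once complete

print(tool_input)
```

The full implementation below follows the same accumulate-then-parse pattern against the real SDK events.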

import json

def stream_with_tools(user_message: str, tools: list):
    messages = [{"role": "user", "content": user_message}]
    collected_text = ""  # accumulates text across all turns of the loop

    while True:
        tool_calls = []
        current_tool_input = ""

        with client.messages.stream(
            model="claude-sonnet-4-5-20250514",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        ) as stream:
            for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "text_delta":
                        print(event.delta.text, end="", flush=True)
                        collected_text += event.delta.text
                    elif event.delta.type == "input_json_delta":
                        current_tool_input += event.delta.partial_json

                elif event.type == "content_block_start":
                    if event.content_block.type == "tool_use":
                        current_tool_input = ""
                        tool_calls.append({
                            "id": event.content_block.id,
                            "name": event.content_block.name,
                        })

                elif event.type == "content_block_stop":
                    if tool_calls and current_tool_input:
                        tool_calls[-1]["input"] = json.loads(current_tool_input)
                        current_tool_input = ""

            final = stream.get_final_message()

        # If no tool calls, we are done
        if final.stop_reason != "tool_use":
            return collected_text

        # Execute tools and continue
        messages.append({"role": "assistant", "content": final.content})
        tool_results = []
        for tc in tool_calls:
            # execute_tool is application-specific: dispatch to your own tool handlers
            result = execute_tool(tc["name"], tc["input"])
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tc["id"],
                "content": json.dumps(result),
            })
        messages.append({"role": "user", "content": tool_results})

Building a Streaming API Endpoint

For web applications, you need to proxy the Claude stream to your frontend. Here is a FastAPI implementation:

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from anthropic import AsyncAnthropic

app = FastAPI()
client = AsyncAnthropic()

class ChatRequest(BaseModel):
    system_prompt: str
    messages: list[dict]

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    async def generate():
        # Use the async client so streaming does not block the event loop
        async with client.messages.stream(
            model="claude-sonnet-4-5-20250514",
            max_tokens=4096,
            system=request.system_prompt,
            messages=request.messages,
        ) as stream:
            async for text in stream.text_stream:
                # Format as SSE
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )

Frontend Consumer (React)

async function* streamChat(messages: Message[]): AsyncGenerator<string> {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // SSE frames can be split across network chunks, so buffer until "\n\n"
    buffer += decoder.decode(value, { stream: true });
    const frames = buffer.split("\n\n");
    buffer = frames.pop()!; // keep any trailing partial frame for the next read

    for (const frame of frames) {
      if (frame.startsWith("data: ") && frame !== "data: [DONE]") {
        const data = JSON.parse(frame.slice(6));
        yield data.text;
      }
    }
  }
}

import { useState } from "react";

// Usage in a React component
function ChatComponent() {
  const [response, setResponse] = useState("");

  const handleSend = async (message: string) => {
    setResponse("");
    for await (const chunk of streamChat([{ role: "user", content: message }])) {
      setResponse(prev => prev + chunk);
    }
  };

  return <div>{response}</div>;
}

Error Handling During Streams

Streams can fail mid-generation due to network issues, rate limits, or server errors. Robust error handling is essential.

from anthropic import APIConnectionError, RateLimitError, APIStatusError
import time

def stream_with_retry(messages: list, max_retries: int = 3):
    """Yield streamed text with retries. A retry restarts generation from
    scratch, so consumers should discard any partially rendered text when
    tokens resume after a mid-stream failure."""
    for attempt in range(max_retries):
        try:
            collected = ""
            with client.messages.stream(
                model="claude-sonnet-4-5-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    collected += text
                    yield text
            return  # Success

        except APIConnectionError:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                time.sleep(wait)
                continue
            raise

        except RateLimitError as e:
            retry_after = int(e.response.headers.get("retry-after", 30))
            time.sleep(retry_after)
            continue

        except APIStatusError as e:
            if e.status_code >= 500 and attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise

Performance Optimization

Token Buffering

Sending every single token to the frontend creates excessive network overhead. Buffer tokens and flush periodically:

import time

def buffered_stream(messages: list, flush_interval: float = 0.05):
    buffer = ""
    last_flush = time.time()

    with client.messages.stream(
        model="claude-sonnet-4-5-20250514",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            buffer += text
            now = time.time()

            if now - last_flush >= flush_interval or len(buffer) > 100:
                yield buffer
                buffer = ""
                last_flush = now

        if buffer:  # Flush remaining
            yield buffer

Connection Keep-Alive

For high-throughput applications, reuse HTTP connections. The Anthropic Python SDK handles this automatically through its internal httpx client, and the TypeScript SDK likewise reuses connections through its underlying fetch implementation.

Monitoring Streaming Performance

Track these metrics in production:

  • Time to first token (TTFT): Should be under 2 seconds for interactive applications
  • Tokens per second: Typically 50-100 for Claude Sonnet
  • Stream completion rate: Percentage of streams that complete without error
  • Partial response recovery: How often you successfully retry after mid-stream failures

Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
