Skip to content
Back to Blog
Agentic AI6 min read

Claude API Streaming: Real-Time AI Responses in Production

Complete guide to implementing streaming responses with the Claude API. Covers SSE implementation, token-by-token rendering, error handling during streams, and production patterns for real-time AI applications.

Why Streaming Matters

Without streaming, a Claude API call blocks until the entire response is generated. For a 1,000-token response, that means 5-15 seconds of silence followed by a wall of text. Users perceive this as slow, unresponsive, and frustrating.

Streaming changes the UX fundamentally. The first token arrives within 500ms-2s (time to first token, or TTFT), and subsequent tokens stream in at 50-100 tokens per second. Users see the response forming in real time, which feels fast even when the total generation time is identical.

For production applications -- chatbots, code assistants, real-time analysis tools -- streaming is not optional. It is a core UX requirement.

Basic Streaming in Python

from anthropic import Anthropic

client = Anthropic()

# Basic streaming with the messages API
with client.messages.stream(
    model="claude-sonnet-4-5-20250514",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Explain how TCP/IP works."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

The stream() method returns a context manager that yields text chunks as they arrive. The flush=True ensures each chunk is printed immediately rather than buffered.

Basic Streaming in TypeScript

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = await client.messages.stream({
  model: "claude-sonnet-4-5-20250514",
  max_tokens: 4096,
  messages: [{ role: "user", content: "Explain how TCP/IP works." }],
});

for await (const event of stream) {
  if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
    process.stdout.write(event.delta.text);
  }
}

// Get the final message with usage stats
const finalMessage = await stream.finalMessage();
console.log("\nTokens used:", finalMessage.usage);

Server-Sent Events (SSE) Architecture

The Claude API uses Server-Sent Events for streaming. Each event has a type that tells you what is happening:

Event Type Description When It Occurs
message_start Message metadata, model info First event
content_block_start New content block begins Before each text/tool block
content_block_delta Incremental content update During generation
content_block_stop Content block complete After each block
message_delta Message-level updates (stop reason, usage) Near end
message_stop Stream complete Last event

Handling All Event Types

from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5-20250514",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a Python function to sort a list."}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                print(f"Model: {event.message.model}")
            case "content_block_start":
                if event.content_block.type == "text":
                    print("--- Text block started ---")
                elif event.content_block.type == "tool_use":
                    print(f"--- Tool call: {event.content_block.name} ---")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="", flush=True)
            case "message_delta":
                print(f"\nStop reason: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("\n--- Stream complete ---")

Streaming with Tool Use

Streaming becomes more complex when tools are involved. Claude may stream text, then switch to a tool call, then resume text after seeing the tool result.

import json

def stream_with_tools(user_message: str, tools: list):
    messages = [{"role": "user", "content": user_message}]

    while True:
        collected_text = ""
        tool_calls = []
        current_tool_input = ""

        with client.messages.stream(
            model="claude-sonnet-4-5-20250514",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        ) as stream:
            for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "text_delta":
                        print(event.delta.text, end="", flush=True)
                        collected_text += event.delta.text
                    elif event.delta.type == "input_json_delta":
                        current_tool_input += event.delta.partial_json

                elif event.type == "content_block_start":
                    if event.content_block.type == "tool_use":
                        current_tool_input = ""
                        tool_calls.append({
                            "id": event.content_block.id,
                            "name": event.content_block.name,
                        })

                elif event.type == "content_block_stop":
                    if tool_calls and current_tool_input:
                        tool_calls[-1]["input"] = json.loads(current_tool_input)
                        current_tool_input = ""

            final = stream.get_final_message()

        # If no tool calls, we are done
        if final.stop_reason != "tool_use":
            return collected_text

        # Execute tools and continue
        messages.append({"role": "assistant", "content": final.content})
        tool_results = []
        for tc in tool_calls:
            result = execute_tool(tc["name"], tc["input"])
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tc["id"],
                "content": json.dumps(result),
            })
        messages.append({"role": "user", "content": tool_results})

Building a Streaming API Endpoint

For web applications, you need to proxy the Claude stream to your frontend. Here is a FastAPI implementation:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import Anthropic

app = FastAPI()
client = Anthropic()

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    async def generate():
        with client.messages.stream(
            model="claude-sonnet-4-5-20250514",
            max_tokens=4096,
            system=request.system_prompt,
            messages=request.messages,
        ) as stream:
            for text in stream.text_stream:
                # Format as SSE
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )

Frontend Consumer (React)

async function streamChat(messages: Message[]): AsyncGenerator<string> {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    const lines = chunk.split("\n\n");

    for (const line of lines) {
      if (line.startsWith("data: ") && line !== "data: [DONE]") {
        const data = JSON.parse(line.slice(6));
        yield data.text;
      }
    }
  }
}

// Usage in a React component
function ChatComponent() {
  const [response, setResponse] = useState("");

  const handleSend = async (message: string) => {
    setResponse("");
    for await (const chunk of streamChat([{ role: "user", content: message }])) {
      setResponse(prev => prev + chunk);
    }
  };

  return <div>{response}</div>;
}

Error Handling During Streams

Streams can fail mid-generation due to network issues, rate limits, or server errors. Robust error handling is essential.

from anthropic import APIConnectionError, RateLimitError, APIStatusError
import time

def stream_with_retry(messages: list, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            collected = ""
            with client.messages.stream(
                model="claude-sonnet-4-5-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    collected += text
                    yield text
            return  # Success

        except APIConnectionError:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                time.sleep(wait)
                continue
            raise

        except RateLimitError as e:
            retry_after = int(e.response.headers.get("retry-after", 30))
            time.sleep(retry_after)
            continue

        except APIStatusError as e:
            if e.status_code >= 500 and attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise

Performance Optimization

Token Buffering

Sending every single token to the frontend creates excessive network overhead. Buffer tokens and flush periodically:

import time

def buffered_stream(messages: list, flush_interval: float = 0.05):
    buffer = ""
    last_flush = time.time()

    with client.messages.stream(
        model="claude-sonnet-4-5-20250514",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            buffer += text
            now = time.time()

            if now - last_flush >= flush_interval or len(buffer) > 100:
                yield buffer
                buffer = ""
                last_flush = now

        if buffer:  # Flush remaining
            yield buffer

Connection Keep-Alive

For high-throughput applications, reuse HTTP connections. The Anthropic Python SDK handles this automatically through its internal httpx client. In TypeScript, the SDK uses node-fetch with connection pooling enabled by default.

Monitoring Streaming Performance

Track these metrics in production:

  • Time to first token (TTFT): Should be under 2 seconds for interactive applications
  • Tokens per second: Typically 50-100 for Claude Sonnet
  • Stream completion rate: Percentage of streams that complete without error
  • Partial response recovery: How often you successfully retry after mid-stream failures
Share this article
N

NYC News

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.