Claude API Streaming: Real-Time AI Responses in Production
Complete guide to implementing streaming responses with the Claude API. Covers SSE implementation, token-by-token rendering, error handling during streams, and production patterns for real-time AI applications.
Why Streaming Matters
Without streaming, a Claude API call blocks until the entire response is generated. For a 1,000-token response, that means 5-15 seconds of silence followed by a wall of text. Users perceive this as slow, unresponsive, and frustrating.
Streaming changes the UX fundamentally. The first token arrives within 500ms-2s (time to first token, or TTFT), and subsequent tokens stream in at 50-100 tokens per second. Users see the response forming in real time, which feels fast even when the total generation time is identical.
For production applications -- chatbots, code assistants, real-time analysis tools -- streaming is not optional. It is a core UX requirement.
Basic Streaming in Python
from anthropic import Anthropic

client = Anthropic()

# Basic streaming with the Messages API
with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Explain how TCP/IP works."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
The stream() method returns a context manager that yields text chunks as they arrive. The flush=True ensures each chunk is printed immediately rather than buffered.
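If you also need the final message object, for usage stats or the stop reason, the Python SDK accumulates it for you as the stream runs; a minimal sketch using get_final_message() (mirroring the TypeScript finalMessage() shown below):

with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Explain how TCP/IP works."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    # Once the stream is exhausted, the accumulated Message is available
    final = stream.get_final_message()

print("\nTokens used:", final.usage)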
Basic Streaming in TypeScript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

// stream() returns a MessageStream directly; it is not a Promise
const stream = client.messages.stream({
  model: "claude-sonnet-4-5-20250929",
  max_tokens: 4096,
  messages: [{ role: "user", content: "Explain how TCP/IP works." }],
});

for await (const event of stream) {
  if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
    process.stdout.write(event.delta.text);
  }
}

// Get the final message with usage stats
const finalMessage = await stream.finalMessage();
console.log("\nTokens used:", finalMessage.usage);
Server-Sent Events (SSE) Architecture
The Claude API uses Server-Sent Events for streaming. Each event has a type that tells you what is happening:
| Event Type | Description | When It Occurs |
|---|---|---|
| message_start | Message metadata, model info | First event |
| content_block_start | New content block begins | Before each text/tool block |
| content_block_delta | Incremental content update | During generation |
| content_block_stop | Content block complete | After each block |
| message_delta | Message-level updates (stop reason, usage) | Near end |
| message_stop | Stream complete | Last event |
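On the wire, each of these is a standard SSE frame: an event: line naming the type, followed by a data: line carrying a JSON payload. Abridged, with most payload fields elided, the raw stream looks like this:

event: message_start
data: {"type":"message_start","message":{"id":"msg_...","model":"claude-sonnet-4-5-20250929",...}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: message_stop
data: {"type":"message_stop"}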
Handling All Event Types
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a Python function to sort a list."}]
) as stream:
    for event in stream:
        match event.type:  # structural pattern matching requires Python 3.10+
            case "message_start":
                print(f"Model: {event.message.model}")
            case "content_block_start":
                if event.content_block.type == "text":
                    print("--- Text block started ---")
                elif event.content_block.type == "tool_use":
                    print(f"--- Tool call: {event.content_block.name} ---")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="", flush=True)
            case "message_delta":
                print(f"\nStop reason: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("\n--- Stream complete ---")
Streaming with Tool Use
Streaming becomes more complex when tools are involved. Claude may stream text, then switch to a tool call, then resume text after seeing the tool result.
import json

def stream_with_tools(user_message: str, tools: list):
    messages = [{"role": "user", "content": user_message}]
    while True:
        collected_text = ""  # text from the current assistant turn
        tool_calls = []
        current_tool_input = ""
        with client.messages.stream(
            model="claude-sonnet-4-5-20250929",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        ) as stream:
            for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "text_delta":
                        print(event.delta.text, end="", flush=True)
                        collected_text += event.delta.text
                    elif event.delta.type == "input_json_delta":
                        current_tool_input += event.delta.partial_json
                elif event.type == "content_block_start":
                    if event.content_block.type == "tool_use":
                        current_tool_input = ""
                        tool_calls.append({
                            "id": event.content_block.id,
                            "name": event.content_block.name,
                            "input": {},  # default for tools invoked with no arguments
                        })
                elif event.type == "content_block_stop":
                    if tool_calls and current_tool_input:
                        tool_calls[-1]["input"] = json.loads(current_tool_input)
                        current_tool_input = ""
            final = stream.get_final_message()
        # If no tool calls, we are done
        if final.stop_reason != "tool_use":
            return collected_text
        # Execute tools and continue the conversation with the results
        messages.append({"role": "assistant", "content": final.content})
        tool_results = []
        for tc in tool_calls:
            result = execute_tool(tc["name"], tc["input"])  # your own dispatcher
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tc["id"],
                "content": json.dumps(result),
            })
        messages.append({"role": "user", "content": tool_results})
Building a Streaming API Endpoint
For web applications, you need to proxy the Claude stream to your frontend. Here is a FastAPI implementation:
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from anthropic import AsyncAnthropic, NOT_GIVEN

app = FastAPI()
client = AsyncAnthropic()

class ChatRequest(BaseModel):
    messages: list[dict]
    system_prompt: str | None = None

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    async def generate():
        # The async client keeps the event loop free while tokens stream in
        async with client.messages.stream(
            model="claude-sonnet-4-5-20250929",
            max_tokens=4096,
            system=request.system_prompt or NOT_GIVEN,
            messages=request.messages,
        ) as stream:
            async for text in stream.text_stream:
                # Format as SSE
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
Frontend Consumer (React)
import { useState } from "react";

// Minimal message shape matching what the endpoint expects
type Message = { role: "user" | "assistant"; content: string };

async function* streamChat(messages: Message[]): AsyncGenerator<string> {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // SSE events can be split across network chunks, so buffer the
    // incomplete tail and only parse fully delimited events
    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split("\n\n");
    buffer = events.pop()!;
    for (const line of events) {
      if (line.startsWith("data: ") && line !== "data: [DONE]") {
        const data = JSON.parse(line.slice(6));
        yield data.text;
      }
    }
  }
}

// Usage in a React component
function ChatComponent() {
  const [response, setResponse] = useState("");
  const handleSend = async (message: string) => {
    setResponse("");
    for await (const chunk of streamChat([{ role: "user", content: message }])) {
      setResponse(prev => prev + chunk);
    }
  };
  return <div>{response}</div>;
}
Error Handling During Streams
Streams can fail mid-generation due to network issues, rate limits, or server errors. Robust error handling is essential.
from anthropic import APIConnectionError, RateLimitError, APIStatusError
import time

def stream_with_retry(messages: list, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            collected = ""  # everything yielded so far, useful for logging partial output
            with client.messages.stream(
                model="claude-sonnet-4-5-20250929",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    collected += text
                    yield text
            return  # Success
        except APIConnectionError:
            # A retried stream regenerates from scratch, so consumers
            # should discard any partial output they have rendered
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
                continue
            raise
        except RateLimitError as e:
            retry_after = int(e.response.headers.get("retry-after", 30))
            time.sleep(retry_after)
            continue
        except APIStatusError as e:
            if e.status_code >= 500 and attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise
Performance Optimization
Token Buffering
Sending every single token to the frontend creates excessive network overhead. Buffer tokens and flush periodically:
import time

def buffered_stream(messages: list, flush_interval: float = 0.05):
    buffer = ""
    last_flush = time.time()
    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            buffer += text
            now = time.time()
            if now - last_flush >= flush_interval or len(buffer) > 100:
                yield buffer
                buffer = ""
                last_flush = now
        if buffer:  # Flush remaining
            yield buffer
Connection Keep-Alive
For high-throughput applications, reuse HTTP connections. The Anthropic Python SDK handles this automatically through its internal httpx client, and the TypeScript SDK uses the runtime's fetch implementation (undici in Node.js), which pools connections by default.
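In Python, if you need to tune pooling behavior, the client constructor accepts a custom httpx client; a minimal sketch, where the limit and timeout values are illustrative rather than recommendations:

import httpx
from anthropic import Anthropic

# Hand the SDK a pre-configured httpx client to control pooling and timeouts
client = Anthropic(
    http_client=httpx.Client(
        limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
        timeout=httpx.Timeout(60.0, connect=5.0),
    )
)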
Monitoring Streaming Performance
Track these metrics in production; a measurement sketch follows the list:
- Time to first token (TTFT): Should be under 2 seconds for interactive applications
- Tokens per second: Typically 50-100 for Claude Sonnet
- Stream completion rate: Percentage of streams that complete without error
- Partial response recovery: How often you successfully retry after mid-stream failures
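A minimal sketch of instrumenting a stream for TTFT and tokens per second; record_metric is a hypothetical stand-in for whatever metrics backend you use:

import time

def instrumented_stream(messages: list):
    start = time.monotonic()
    first_token_at = None
    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            if first_token_at is None:
                first_token_at = time.monotonic()  # marks time to first token
            yield text
        output_tokens = stream.get_final_message().usage.output_tokens
    elapsed = time.monotonic() - start
    ttft = (first_token_at or time.monotonic()) - start
    record_metric("ttft_seconds", ttft)  # hypothetical metrics hook
    record_metric("tokens_per_second", output_tokens / max(elapsed - ttft, 1e-6))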