SDK Streaming Support: Implementing Real-Time Response Handling in Client Libraries
Learn how to implement streaming support in AI agent SDKs using Server-Sent Events, async iterators, event handling patterns, and automatic reconnection for real-time response delivery.
Why Streaming Matters for Agent SDKs
AI agent runs generate output incrementally — the model produces tokens one at a time, tools execute and return results mid-run, and status transitions happen throughout. Without streaming, users wait in silence until the entire run completes. With streaming, they see tokens appear in real time, watch tool calls execute, and can cancel long-running operations.
Streaming is not a nice-to-have for agent SDKs. It is fundamental to building responsive applications.
Server-Sent Events Parsing
Most AI APIs stream responses using Server-Sent Events (SSE). The format is simple: each event is a series of field: value lines, and events are separated by a blank line (a double newline on the wire):
data: {"type": "token", "text": "Hello"}

data: {"type": "token", "text": " world"}

data: {"type": "tool_call", "name": "search", "arguments": "{\"q\": \"weather\"}"}

data: [DONE]
Here is a robust SSE parser in Python:
from __future__ import annotations
from dataclasses import dataclass
from typing import AsyncIterator

@dataclass
class SSEEvent:
    event: str | None = None
    data: str = ""
    id: str | None = None
    retry: int | None = None

async def parse_sse(response) -> AsyncIterator[SSEEvent]:
    """Parse an SSE stream from an httpx async response."""
    current = SSEEvent()
    async for line in response.aiter_lines():
        if line == "":
            # Blank line = event boundary: dispatch the accumulated event
            if current.data:
                yield current
            current = SSEEvent()
            continue
        if line.startswith(":"):
            # Comment line (often used as a keep-alive), skip
            continue
        field, _, value = line.partition(":")
        # The spec strips at most one leading space after the colon
        if value.startswith(" "):
            value = value[1:]
        if field == "data":
            # Multiple data lines in one event are joined with newlines
            current.data = current.data + "\n" + value if current.data else value
        elif field == "event":
            current.event = value
        elif field == "id":
            current.id = value
        elif field == "retry":
            try:
                current.retry = int(value)
            except ValueError:
                pass
    # Flush a final event that was not followed by a blank line
    if current.data:
        yield current
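The field rules above can be exercised in isolation with a simplified synchronous version of the same loop (a sketch for experimentation; the real parser stays async and works on a live response):

```python
def parse_sse_lines(lines):
    """Synchronous sketch of the SSE field rules: returns (event, data) pairs."""
    events, data, event_type = [], [], None
    for line in lines:
        if line == "":
            # Blank line: dispatch the accumulated event, if any
            if data:
                events.append((event_type, "\n".join(data)))
            data, event_type = [], None
        elif line.startswith(":"):
            continue  # comment line, skip
        else:
            field, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if field == "data":
                data.append(value)
            elif field == "event":
                event_type = value
    return events

print(parse_sse_lines([
    'data: {"type": "token", "text": "Hello"}',
    "",
    ": keep-alive comment",
    "data: line one",
    "data: line two",
    "",
]))
```

Note how the two consecutive data lines arrive as one event with a newline-joined payload, exactly as the spec requires.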
Python Streaming with Async Iterators
The SDK should expose streaming through async iterators. This lets users consume events with a simple async for loop:
from dataclasses import dataclass
from typing import AsyncIterator
import json

import httpx

@dataclass
class StreamEvent:
    type: str
    data: dict

    @property
    def is_token(self) -> bool:
        return self.type == "token"

    @property
    def text(self) -> str:
        return self.data.get("text", "")

class RunStream:
    """Async iterator over a streaming agent run."""

    def __init__(self, response: httpx.Response) -> None:
        self._response = response
        self._collected_text = ""

    async def __aiter__(self) -> AsyncIterator[StreamEvent]:
        try:
            async for sse in parse_sse(self._response):
                if sse.data == "[DONE]":
                    return
                payload = json.loads(sse.data)
                event = StreamEvent(type=payload["type"], data=payload)
                if event.is_token:
                    self._collected_text += event.text
                yield event
        finally:
            # Release the connection whether the stream finishes
            # or the consumer breaks out early
            await self._response.aclose()

    @property
    def collected_text(self) -> str:
        return self._collected_text

class RunsResource:
    def __init__(self, client) -> None:
        self._client = client

    async def create_stream(
        self, agent_id: str, input_text: str, headers: dict | None = None
    ) -> RunStream:
        # httpx's stream() is a context manager, so build the request
        # explicitly and send it with stream=True to keep the response open
        http = self._client._async_http
        request = http.build_request(
            "POST",
            f"/agents/{agent_id}/runs",
            json={"input": input_text, "stream": True},
            headers=headers,
        )
        response = await http.send(request, stream=True)
        response.raise_for_status()
        return RunStream(response)
Usage becomes intuitive:
stream = await client.runs.create_stream(
    agent_id="agent_abc123",
    input_text="Summarize the quarterly report",
)

async for event in stream:
    if event.is_token:
        print(event.text, end="", flush=True)
    elif event.type == "tool_call":
        print(f"\nCalling tool: {event.data['name']}")

print(f"\nFull response: {stream.collected_text}")
TypeScript Streaming
In TypeScript, use the ReadableStream API to parse SSE from a fetch response:
interface StreamEvent {
  type: 'token' | 'tool_call' | 'status' | 'error' | 'done';
  data: Record<string, unknown>;
}

async function* parseSSEStream(
  response: Response
): AsyncGenerator<StreamEvent> {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });

      // Keep any trailing partial line in the buffer for the next chunk
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? '';

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          if (data === '[DONE]') return;
          yield JSON.parse(data) as StreamEvent;
        }
      }
    }
  } finally {
    reader.releaseLock();
  }
}
// Usage
const response = await fetch(`${baseUrl}/agents/${agentId}/runs`, {
  method: 'POST',
  headers: {
    Authorization: `Bearer ${apiKey}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({ input: 'Hello', stream: true }),
});

for await (const event of parseSSEStream(response)) {
  if (event.type === 'token') {
    process.stdout.write(event.data.text as string);
  }
}
Event Callbacks as an Alternative API
Some users prefer event callbacks over async iteration. Offer both patterns:
from typing import Callable

class RunStream:
    # ... existing async iterator methods ...

    async def on(
        self,
        token: Callable[[str], None] | None = None,
        tool_call: Callable[[dict], None] | None = None,
        done: Callable[[str], None] | None = None,
        error: Callable[[Exception], None] | None = None,
    ) -> str:
        """Consume the stream with event callbacks."""
        async for event in self:
            try:
                if event.type == "token" and token:
                    token(event.text)
                elif event.type == "tool_call" and tool_call:
                    tool_call(event.data)
            except Exception as exc:
                if error:
                    error(exc)
                else:
                    raise
        if done:
            done(self.collected_text)
        return self.collected_text
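A quick way to try the callback surface without a server is a stand-in stream (FakeStream below is illustrative, not part of the SDK; it mimics only the `on()` method and `collected_text`):

```python
import asyncio

class FakeStream:
    """Stand-in for RunStream so the callback API can be tried offline."""
    def __init__(self, chunks):
        self._chunks = chunks
        self.collected_text = ""

    async def on(self, token=None, done=None, **_callbacks):
        # Route each chunk to the token callback, then report the total
        for chunk in self._chunks:
            self.collected_text += chunk
            if token:
                token(chunk)
        if done:
            done(self.collected_text)
        return self.collected_text

pieces = []
final = asyncio.run(FakeStream(["Hel", "lo ", "world"]).on(token=pieces.append))
print(final)   # Hello world
```

The same lambdas would plug into the real `RunStream.on()` unchanged, which is the point of keeping the callback signature small.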
Automatic Reconnection
Streams break. Connections drop. A robust SDK reconnects automatically using the last event ID:
import asyncio

async def create_stream_with_reconnect(
    self, agent_id: str, input_text: str, max_reconnects: int = 3
) -> AsyncIterator[StreamEvent]:
    last_event_id = None
    reconnect_count = 0

    while reconnect_count <= max_reconnects:
        try:
            headers = {}
            if last_event_id:
                # Ask the server to resume after the last event we saw
                # (assumes the server supports Last-Event-ID replay)
                headers["Last-Event-ID"] = last_event_id
            stream = await self.create_stream(
                agent_id, input_text, headers=headers
            )
            async for event in stream:
                # Track the server-assigned event ID, if the payload carries one
                event_id = event.data.get("id")
                if event_id:
                    last_event_id = event_id
                yield event
            return  # Stream completed normally
        except (ConnectionError, TimeoutError):
            reconnect_count += 1
            if reconnect_count > max_reconnects:
                raise
            await asyncio.sleep(1.0 * reconnect_count)
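The sketch above backs off linearly (1s, 2s, 3s). For noisier networks, exponential backoff with full jitter is a common alternative; the helper below is a sketch, and the base and cap values are illustrative:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

for attempt in range(5):
    # Each attempt doubles the ceiling; jitter spreads simultaneous reconnects
    print(f"attempt {attempt}: up to {min(30.0, 2.0 ** attempt):.1f}s, "
          f"chose {backoff_delay(attempt):.2f}s")
```

Jitter matters when many clients lose the same connection at once: without it, they all reconnect in lockstep and hammer the server at the same instant.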
FAQ
How do I handle backpressure when the SDK receives events faster than the user processes them?
Async iterators handle backpressure naturally. The async for loop only requests the next event when the current one has been processed. If the consumer is slow, the SDK buffers incoming data in the HTTP response stream, which applies TCP-level backpressure to the server. Avoid pre-reading all events into an in-memory queue unless you explicitly need lookahead.
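This pull-based behavior is easy to observe: in the sketch below, the producer only advances when the consumer requests the next item, so produce and consume steps strictly alternate.

```python
import asyncio

log = []

async def producer():
    for i in range(3):
        log.append(f"produce {i}")  # runs only when the consumer asks for the next item
        yield i

async def main():
    async for item in producer():
        log.append(f"consume {item}")  # slow consumer work would go here

asyncio.run(main())
print(log)  # ['produce 0', 'consume 0', 'produce 1', 'consume 1', 'produce 2', 'consume 2']
```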
Should I support both streaming and non-streaming from the same method?
No. Use separate methods: client.runs.create() for synchronous runs that return a completed result, and client.runs.create_stream() for streaming. Mixing the two via a boolean flag makes the return type ambiguous and requires conditional type handling. Separate methods give each mode a clear type signature and distinct documentation.
How do I test streaming responses in unit tests?
Create mock SSE streams using async generators that yield predefined event sequences. In Python, use asyncio to create an AsyncIterator that yields SSEEvent objects with controlled timing. This lets you test parsing, event handling, and reconnection logic without a live server.
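For example, a minimal mock stream and a consumer under test might look like this (the names are illustrative):

```python
import asyncio

async def mock_stream():
    """Async generator standing in for a live SSE stream in unit tests."""
    for payload in (
        {"type": "token", "text": "Hi"},
        {"type": "token", "text": "!"},
        {"type": "done"},
    ):
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield payload

async def collect_text(events) -> str:
    """Consumer under test: accumulate token text from an event stream."""
    text = ""
    async for event in events:
        if event["type"] == "token":
            text += event["text"]
    return text

result = asyncio.run(collect_text(mock_stream()))
print(result)  # Hi!
```

Swapping the generator body lets you replay error events, out-of-order IDs, or dropped connections deterministically.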
CallSphere Team