
SDK Streaming Support: Implementing Real-Time Response Handling in Client Libraries

Learn how to implement streaming support in AI agent SDKs using Server-Sent Events, async iterators, event handling patterns, and automatic reconnection for real-time response delivery.

Why Streaming Matters for Agent SDKs

AI agent runs generate output incrementally — the model produces tokens one at a time, tools execute and return results mid-run, and status transitions happen throughout. Without streaming, users wait in silence until the entire run completes. With streaming, they see tokens appear in real time, watch tool calls execute, and can cancel long-running operations.

Streaming is not a nice-to-have for agent SDKs. It is fundamental to building responsive applications.

Server-Sent Events Parsing

Most AI APIs stream responses using Server-Sent Events (SSE). The format is simple: each event is a series of field: value lines, and consecutive events are separated by a blank line:

data: {"type": "token", "text": "Hello"}

data: {"type": "token", "text": " world"}

data: {"type": "tool_call", "name": "search", "arguments": "{\"q\": \"weather\"}"}

data: [DONE]

Here is a robust SSE parser in Python:

from __future__ import annotations
from dataclasses import dataclass
from typing import AsyncIterator
import json


@dataclass
class SSEEvent:
    event: str | None = None
    data: str = ""
    id: str | None = None
    retry: int | None = None


async def parse_sse(response) -> AsyncIterator[SSEEvent]:
    """Parse an SSE stream from an httpx async response."""
    current = SSEEvent()

    async for line in response.aiter_lines():
        if line == "":
            # Blank line = event boundary: dispatch if we have data
            if current.data:
                yield current
            current = SSEEvent()
            continue

        if line.startswith(":"):
            # Comment line (often used as a keep-alive), skip
            continue

        field, _, value = line.partition(":")
        # Per the SSE spec, strip at most one leading space from the value
        value = value.removeprefix(" ")

        if field == "data":
            # Multiple data lines in one event are joined with newlines
            if current.data:
                current.data += "\n"
            current.data += value
        elif field == "event":
            current.event = value
        elif field == "id":
            current.id = value
        elif field == "retry":
            try:
                current.retry = int(value)
            except ValueError:
                pass

Python Streaming with Async Iterators

The SDK should expose streaming through async iterators. This lets users consume events with a simple async for loop:

from dataclasses import dataclass
from typing import AsyncIterator
import httpx
import json


@dataclass
class StreamEvent:
    type: str
    data: dict

    @property
    def is_token(self) -> bool:
        return self.type == "token"

    @property
    def text(self) -> str:
        return self.data.get("text", "")


class RunStream:
    """Async iterator over a streaming agent run."""

    def __init__(self, response: httpx.Response) -> None:
        self._response = response
        self._collected_text = ""

    async def __aiter__(self) -> AsyncIterator[StreamEvent]:
        # Defining __aiter__ as an async generator lets `async for`
        # drive the parse loop directly
        async for sse in parse_sse(self._response):
            if sse.data == "[DONE]":
                return

            payload = json.loads(sse.data)
            event = StreamEvent(type=payload["type"], data=payload)

            if event.is_token:
                self._collected_text += event.text

            yield event

    @property
    def collected_text(self) -> str:
        return self._collected_text


class RunsResource:
    def __init__(self, client) -> None:
        self._client = client

    async def create_stream(
        self, agent_id: str, input_text: str
    ) -> RunStream:
        # httpx's .stream() returns a context manager, so build and send
        # the request explicitly to keep the response open for iteration
        http = self._client._async_http
        request = http.build_request(
            "POST",
            f"/agents/{agent_id}/runs",
            json={"input": input_text, "stream": True},
        )
        response = await http.send(request, stream=True)
        response.raise_for_status()
        return RunStream(response)

Usage becomes intuitive:

stream = await client.runs.create_stream(
    agent_id="agent_abc123",
    input_text="Summarize the quarterly report",
)

async for event in stream:
    if event.is_token:
        print(event.text, end="", flush=True)
    elif event.type == "tool_call":
        print(f"\nCalling tool: {event.data['name']}")

print(f"\nFull response: {stream.collected_text}")

TypeScript Streaming

In TypeScript, use the ReadableStream API to parse SSE from a fetch response:

interface StreamEvent {
  type: 'token' | 'tool_call' | 'status' | 'error' | 'done';
  data: Record<string, unknown>;
}

async function* parseSSEStream(
  response: Response
): AsyncGenerator<StreamEvent> {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? '';

      for (const rawLine of lines) {
        // Tolerate CRLF line endings from some servers
        const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          if (data === '[DONE]') return;
          yield JSON.parse(data) as StreamEvent;
        }
      }
    }
  } finally {
    reader.releaseLock();
  }
}

// Usage
const response = await fetch(`${baseUrl}/agents/${agentId}/runs`, {
  method: 'POST',
  headers: {
    Authorization: `Bearer ${apiKey}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({ input: 'Hello', stream: true }),
});

for await (const event of parseSSEStream(response)) {
  if (event.type === 'token') {
    process.stdout.write(event.data.text as string);
  }
}

Event Callbacks as an Alternative API

Some users prefer event callbacks over async iteration. Offer both patterns:

from typing import Callable


class RunStream:
    # ... existing async iterator methods ...

    async def on(
        self,
        token: Callable[[str], None] | None = None,
        tool_call: Callable[[dict], None] | None = None,
        done: Callable[[str], None] | None = None,
        error: Callable[[Exception], None] | None = None,
    ) -> str:
        """Consume the stream with event callbacks."""
        async for event in self:
            try:
                if event.type == "token" and token:
                    token(event.text)
                elif event.type == "tool_call" and tool_call:
                    tool_call(event.data)
            except Exception as exc:
                if error:
                    error(exc)
                else:
                    raise

        if done:
            done(self.collected_text)
        return self.collected_text
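A quick way to see the callback pattern in action without any network calls is to drive an on() method over a stubbed stream. MiniStream below is a hypothetical stand-in that yields a canned event sequence, not part of the SDK:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StreamEvent:
    type: str
    data: dict

    @property
    def text(self) -> str:
        return self.data.get("text", "")


class MiniStream:
    """Stand-in for RunStream that yields a canned event sequence."""

    def __init__(self, events):
        self._events = events
        self.collected_text = ""

    async def __aiter__(self):
        for event in self._events:
            if event.type == "token":
                self.collected_text += event.text
            yield event

    async def on(self, token=None, tool_call=None, done=None) -> str:
        async for event in self:
            if event.type == "token" and token:
                token(event.text)
            elif event.type == "tool_call" and tool_call:
                tool_call(event.data)
        if done:
            done(self.collected_text)
        return self.collected_text


async def main() -> str:
    stream = MiniStream([
        StreamEvent("token", {"text": "Hello"}),
        StreamEvent("token", {"text": " world"}),
        StreamEvent("tool_call", {"name": "search"}),
    ])
    tokens: list[str] = []
    result = await stream.on(token=tokens.append)
    assert tokens == ["Hello", " world"]
    return result


print(asyncio.run(main()))  # Hello world
```

Because both consumption styles sit on the same async iterator, the callback API costs nothing extra to maintain.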

Automatic Reconnection

Streams break. Connections drop. A robust SDK reconnects automatically using the last event ID:

import asyncio


async def create_stream_with_reconnect(
    self, agent_id: str, input_text: str, max_reconnects: int = 3
) -> AsyncIterator[StreamEvent]:
    last_event_id: str | None = None
    reconnect_count = 0

    while reconnect_count <= max_reconnects:
        try:
            headers = {}
            if last_event_id:
                # Ask the server to resume after the last event we received
                headers["Last-Event-ID"] = last_event_id

            # Assumes create_stream is extended to forward extra headers
            stream = await self.create_stream(
                agent_id, input_text, headers=headers
            )
            async for event in stream:
                event_id = event.data.get("id")
                if event_id:
                    last_event_id = event_id
                yield event
            return  # Stream completed normally

        except (ConnectionError, TimeoutError):
            reconnect_count += 1
            if reconnect_count > max_reconnects:
                raise
            # Linear backoff between reconnect attempts
            await asyncio.sleep(1.0 * reconnect_count)

FAQ

How do I handle backpressure when the SDK receives events faster than the user processes them?

Async iterators handle backpressure naturally. The async for loop only requests the next event when the current one has been processed. If the consumer is slow, the SDK buffers incoming data in the HTTP response stream, which applies TCP-level backpressure to the server. Avoid pre-reading all events into an in-memory queue unless you explicitly need lookahead.
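The pull-based behavior is easy to verify: an async generator only runs far enough to produce the item that was requested, so a slow consumer automatically slows the producer. A minimal sketch:

```python
import asyncio

produced: list[int] = []


async def producer():
    for i in range(5):
        produced.append(i)  # record when each item is actually generated
        yield i


async def main() -> None:
    gen = producer()
    first = await gen.__anext__()
    # Only the requested item has been produced — nothing is buffered ahead
    assert first == 0 and produced == [0]
    await gen.__anext__()
    assert produced == [0, 1]
    await gen.aclose()  # stop without draining the rest


asyncio.run(main())
print(produced)  # [0, 1]
```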

Should I support both streaming and non-streaming from the same method?

No. Use separate methods: client.runs.create() for synchronous runs that return a completed result, and client.runs.create_stream() for streaming. Mixing the two via a boolean flag makes the return type ambiguous and requires conditional type handling. Separate methods give each mode a clear type signature and distinct documentation.

How do I test streaming responses in unit tests?

Create mock SSE streams using async generators that yield predefined event sequences. In Python, use asyncio to create an AsyncIterator that yields SSEEvent objects with controlled timing. This lets you test parsing, event handling, and reconnection logic without a live server.
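As a concrete sketch of that approach, a stub class exposing aiter_lines (the only method the parser touches) is enough to drive the SDK's streaming path in tests. FakeResponse and collect_tokens are hypothetical test helpers, not SDK code:

```python
import asyncio
import json
from typing import AsyncIterator


class FakeResponse:
    """Mimics the httpx streaming interface the SDK actually uses."""

    def __init__(self, events: list[dict]) -> None:
        self._events = events

    async def aiter_lines(self) -> AsyncIterator[str]:
        for event in self._events:
            yield f"data: {json.dumps(event)}"
            yield ""  # blank line terminates each SSE event
        yield "data: [DONE]"
        yield ""


async def collect_tokens(response: FakeResponse) -> str:
    """Toy consumer: concatenates token text, mirroring RunStream."""
    text = ""
    async for line in response.aiter_lines():
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        payload = json.loads(data)
        if payload.get("type") == "token":
            text += payload.get("text", "")
    return text


fake = FakeResponse([
    {"type": "token", "text": "Hi"},
    {"type": "token", "text": " there"},
])
print(asyncio.run(collect_tokens(fake)))  # Hi there
```

The same stub can yield malformed lines or raise ConnectionError mid-stream to cover the error and reconnection paths.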


#Streaming #SSE #AsyncIterators #RealTime #SDKDesign #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
