TypeScript Streaming Patterns: ReadableStream, AsyncIterator, and SSE for AI
Deep dive into TypeScript streaming patterns essential for AI applications. Learn ReadableStream construction, TransformStreams for processing, async iterators for consumption, Server-Sent Events for browser delivery, and backpressure handling.
Why Streaming Matters for AI Applications
LLMs generate tokens sequentially, and a typical response takes 2-10 seconds to complete. Without streaming, users stare at a loading spinner for the entire duration. With streaming, the first token appears in under 200 milliseconds, creating a dramatically better user experience.
TypeScript's Web Streams API, async iterators, and Server-Sent Events provide the building blocks for end-to-end streaming from the LLM to the browser. Understanding these primitives lets you build custom streaming pipelines beyond what framework abstractions provide.
ReadableStream: The Foundation
A ReadableStream is the standard way to represent a source of data that arrives over time. The Web Streams API is available in Node.js 18+, Deno, Bun, and all modern browsers.
Construct a ReadableStream that emits LLM tokens:
function createTokenStream(tokens: string[]): ReadableStream<string> {
  let index = 0;
  return new ReadableStream<string>({
    pull(controller) {
      if (index < tokens.length) {
        controller.enqueue(tokens[index]);
        index++;
      } else {
        controller.close();
      }
    },
  });
}
The pull method is called by the consumer when it is ready for more data — this is how backpressure works. The stream only produces data as fast as the consumer can handle it.
For an LLM streaming response, wrap the provider's async iterable:
function llmToReadableStream(
  stream: AsyncIterable<ChatCompletionChunk>
): ReadableStream<string> {
  return new ReadableStream<string>({
    async start(controller) {
      try {
        for await (const chunk of stream) {
          // Delta may be empty on role-only or finish chunks
          const text = chunk.choices[0]?.delta?.content;
          if (text) {
            controller.enqueue(text);
          }
        }
        controller.close();
      } catch (error) {
        controller.error(error);
      }
    },
  });
}
TransformStream: Processing in Flight
TransformStreams let you modify data as it flows through the pipeline. This is useful for formatting, filtering, or enriching tokens:
function createSSETransform(): TransformStream<string, Uint8Array> {
  const encoder = new TextEncoder();
  return new TransformStream({
    transform(chunk, controller) {
      const data = JSON.stringify({ text: chunk, timestamp: Date.now() });
      // Each SSE event must end with a blank line (double newline)
      controller.enqueue(encoder.encode(`data: ${data}\n\n`));
    },
    flush(controller) {
      controller.enqueue(encoder.encode("data: [DONE]\n\n"));
    },
  });
}
// Pipeline: LLM tokens -> SSE formatted events
const sseStream = tokenStream.pipeThrough(createSSETransform());
A more practical transform counts tokens as they flow through:
function createTokenCounter(): TransformStream<string, string> {
  let tokenCount = 0;
  return new TransformStream({
    transform(chunk, controller) {
      // Rough whitespace-based estimate; real tokenizers count differently
      tokenCount += chunk.split(/\s+/).length;
      controller.enqueue(chunk);
    },
    flush() {
      console.log(`Stream complete. Approximate tokens: ${tokenCount}`);
    },
  });
}
Async Iterators: Consuming Streams
Convert a ReadableStream into an async iterator for ergonomic consumption:
async function* streamToAsyncIterator<T>(
  stream: ReadableStream<T>
): AsyncGenerator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

// Consume the stream
const stream = getAgentResponseStream();
for await (const token of streamToAsyncIterator(stream)) {
  process.stdout.write(token);
}
In Node.js 20+, ReadableStream implements Symbol.asyncIterator natively, so you can iterate directly:
for await (const chunk of readableStream) {
  process.stdout.write(new TextDecoder().decode(chunk));
}
Server-Sent Events: Browser Delivery
SSE is the simplest way to stream data from server to browser. It uses a plain HTTP connection with a specific content type:
// Server: Next.js API route
export async function GET(req: Request) {
  const stream = await getAgentStream();
  const sseStream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();
      for await (const token of stream) {
        const event = `data: ${JSON.stringify({ token })}\n\n`;
        controller.enqueue(encoder.encode(event));
      }
      controller.enqueue(encoder.encode("data: [DONE]\n\n"));
      controller.close();
    },
  });
  return new Response(sseStream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache, no-transform",
      Connection: "keep-alive",
    },
  });
}
Consume SSE on the client with EventSource or fetch:
// Client: Browser
function streamAgentResponse(
  onToken: (token: string) => void,
  onDone: () => void
) {
  const eventSource = new EventSource("/api/agent/stream");
  eventSource.onmessage = (event) => {
    if (event.data === "[DONE]") {
      eventSource.close();
      onDone();
      return;
    }
    const { token } = JSON.parse(event.data);
    onToken(token);
  };
  eventSource.onerror = () => {
    eventSource.close();
  };
}
For POST requests (EventSource only supports GET), use fetch with a reader:
async function fetchStream(messages: Message[]) {
  const response = await fetch("/api/agent", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // Keep the last (possibly incomplete) line in the buffer, since
    // an SSE event can span two network chunks
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";
    for (const line of lines) {
      if (line.startsWith("data: ") && line !== "data: [DONE]") {
        const data = JSON.parse(line.slice(6));
        appendToken(data.token);
      }
    }
  }
}
Backpressure Handling
When the client reads slower than the LLM produces tokens, backpressure prevents memory buildup:
function createBackpressuredStream(
  source: AsyncIterable<string>
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const iterator = source[Symbol.asyncIterator]();
  return new ReadableStream({
    async pull(controller) {
      // pull is only called when the consumer is ready for more data
      const { done, value } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(encoder.encode(value));
      }
    },
    cancel(reason) {
      // Stop upstream production if the client disconnects
      iterator.return?.(reason);
    },
  });
}
The pull-based model ensures the LLM response is consumed at the rate the client can handle, preventing unbounded buffering.
FAQ
When should I use SSE versus WebSockets for AI streaming?
Use SSE for AI agent responses because the data flow is unidirectional (server to client). SSE is simpler, works over standard HTTP, reconnects automatically, and is supported by all browsers. WebSockets are better when you need bidirectional real-time communication, such as collaborative editing or voice streaming.
Why not just use chunked transfer encoding without SSE framing?
Raw chunked encoding does not provide event boundaries. With SSE, each data: line is a discrete event that the client can parse independently. This matters when a single network chunk contains multiple partial tokens or when tokens span chunk boundaries.
How do I handle stream errors gracefully on the client?
Monitor the onerror event on EventSource or catch errors on the fetch reader. Display a user-friendly message and optionally retry the request. For critical applications, implement a heartbeat mechanism — send a periodic data: {"heartbeat": true} event so the client can detect stale connections.
#Streaming #TypeScript #ReadableStream #SSE #AsyncIterator #WebStreams #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.