Streaming Structured Outputs: Incremental JSON Parsing for Real-Time Applications
Learn how to stream structured outputs from LLMs for real-time UI updates. Covers partial JSON parsing, streaming with Instructor and Pydantic, progressive UI rendering, and handling incomplete data.
The Streaming Problem for Structured Data
Standard structured output extraction waits for the entire LLM response before parsing. For small extractions this is fine, but when generating large structured objects — a detailed analysis with ten sections, a list of fifty extracted entities — the user stares at a loading spinner for 5-15 seconds.
Streaming solves this by delivering partial results as the model generates tokens. The challenge is that partial JSON is invalid JSON. You cannot call json.loads() on half an object. You need specialized parsing that handles incomplete data.
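To see the problem concretely, feed a truncated response to the standard library parser:

```python
import json

# A truncated response: the model has not finished the array yet
partial = '{"title": "Market Report", "key_findings": ["Growth'

try:
    json.loads(partial)
    parsed = True
except json.JSONDecodeError:
    parsed = False  # partial JSON is rejected outright
```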
How Instructor Handles Streaming
Instructor provides a create_partial method that yields progressively more complete Pydantic objects:
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import List, Optional

client = instructor.from_openai(OpenAI())

class AnalysisReport(BaseModel):
    title: str
    executive_summary: Optional[str] = None
    key_findings: List[str] = Field(default_factory=list)
    recommendations: List[str] = Field(default_factory=list)
    risk_level: Optional[str] = None

# Stream partial results
for partial_report in client.chat.completions.create_partial(
    model="gpt-4o",
    response_model=AnalysisReport,
    messages=[
        {
            "role": "system",
            "content": "Analyze the market data and produce a detailed report.",
        },
        {
            "role": "user",
            "content": "Q4 2025 SaaS market data: ARR growth 23%, churn decreased to 4.2%...",
        },
    ],
    stream=True,
):
    # Each iteration yields a more complete AnalysisReport
    print(f"Title: {partial_report.title}")
    print(f"Findings so far: {len(partial_report.key_findings)}")
    print("---")
Each iteration yields a valid Pydantic object with whatever fields have been completed so far. Fields not yet streamed show their default values (empty lists, None).
Building a Real-Time UI with Streaming
Connect streaming structured outputs to a FastAPI server-sent events endpoint:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

async def stream_analysis(query: str):
    """Generator that yields SSE events with partial structured data."""
    from openai import AsyncOpenAI

    async_client = instructor.from_openai(AsyncOpenAI())
    async for partial in async_client.chat.completions.create_partial(
        model="gpt-4o",
        response_model=AnalysisReport,
        messages=[
            {"role": "system", "content": "Analyze the data."},
            {"role": "user", "content": query},
        ],
        stream=True,
    ):
        # Send each partial result as an SSE event
        data = partial.model_dump(exclude_none=True)
        yield f"data: {json.dumps(data)}\n\n"
    yield "data: [DONE]\n\n"

@app.get("/api/analyze")
async def analyze(query: str):
    return StreamingResponse(
        stream_analysis(query),
        media_type="text/event-stream",
    )
On the frontend, consume the stream with an EventSource:
// Frontend JavaScript (shown for completeness)
const source = new EventSource("/api/analyze?query=...");
source.onmessage = (event) => {
  if (event.data === "[DONE]") { source.close(); return; }
  const partial = JSON.parse(event.data);
  updateUI(partial);
};
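If the consumer is another backend service rather than a browser, the same frames can be handled by splitting the raw stream on blank lines. A minimal sketch, where parse_sse_events is an illustrative helper rather than part of any library:

```python
import json

def parse_sse_events(raw: str) -> list[dict]:
    """Extract the JSON payload of each `data:` frame from a raw SSE stream."""
    events = []
    for block in raw.split("\n\n"):
        if block.startswith("data: "):
            payload = block[len("data: "):]
            if payload == "[DONE]":
                break  # sentinel emitted by the endpoint above
            events.append(json.loads(payload))
    return events

raw = (
    'data: {"title": "Market Report"}\n\n'
    'data: {"title": "Market Report", "risk_level": "low"}\n\n'
    'data: [DONE]\n\n'
)
events = parse_sse_events(raw)
```

In a real client you would buffer network chunks until a blank line arrives, then apply the same split.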
Manual Partial JSON Parsing
If you are not using Instructor, you can parse partial JSON manually. The key insight is that incomplete JSON can often be made valid by closing open brackets and braces:
import json

def try_parse_partial_json(partial: str) -> dict | None:
    """Attempt to parse a partial JSON string by closing open structures."""
    # Count unclosed brackets and braces
    open_braces = partial.count("{") - partial.count("}")
    open_brackets = partial.count("[") - partial.count("]")
    # Remove trailing comma if present
    cleaned = partial.rstrip().rstrip(",")
    # Close open structures
    cleaned += "]" * open_brackets
    cleaned += "}" * open_braces
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return None

# Example: partial stream from LLM
partial_stream = '{"title": "Market Report", "findings": ["Growth is strong"'
result = try_parse_partial_json(partial_stream)
print(result)
# {'title': 'Market Report', 'findings': ['Growth is strong']}
This approach is fragile: it miscounts when string values contain literal braces or brackets, and it fails when the stream stops in the middle of a string value. For production use, prefer Instructor's built-in partial parsing.
Streaming Lists of Objects
When extracting a list of items, you want each completed item to appear as soon as possible:
class ExtractedContact(BaseModel):
    name: str
    email: Optional[str] = None
    company: Optional[str] = None

# Stream individual items as they complete
for contact in client.chat.completions.create_iterable(
    model="gpt-4o",
    response_model=ExtractedContact,
    messages=[
        {
            "role": "user",
            "content": "Extract contacts: John (john@acme.com, Acme), Sarah (sarah@corp.io, BigCorp)...",
        }
    ],
):
    print(f"Got contact: {contact.name} at {contact.company}")
    # Process each contact immediately, no waiting for the full list
    save_to_database(contact)  # your own persistence function
The create_iterable method yields fully validated individual objects as they are completed in the stream. This is different from create_partial, which yields increasingly complete versions of the entire response model.
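Conceptually, detecting "a completed item inside a streaming JSON array" means tracking brace depth while ignoring braces inside string literals. The sketch below is an illustration of that idea, not Instructor's actual implementation; iter_completed_items is a hypothetical helper:

```python
import json

def iter_completed_items(chunks):
    """Yield each fully parsed object from a streamed JSON array of objects."""
    buf = ""
    depth = 0
    in_string = False
    escape = False
    start = None
    for chunk in chunks:
        for ch in chunk:
            buf += ch
            if escape:                # previous char was a backslash
                escape = False
                continue
            if ch == "\\" and in_string:
                escape = True
            elif ch == '"':
                in_string = not in_string
            elif not in_string:
                if ch == "{":
                    if depth == 0:
                        start = len(buf) - 1  # item begins here
                    depth += 1
                elif ch == "}":
                    depth -= 1
                    if depth == 0 and start is not None:
                        yield json.loads(buf[start:])  # item is complete
                        start = None

# Chunk boundaries fall mid-object and even mid-key
chunks = [
    '[{"name": "John"',
    ', "company": "Acme"}, {"na',
    'me": "Sarah", "company": "BigCorp"}]',
]
items = list(iter_completed_items(chunks))
```

Each object is yielded the moment its closing brace arrives, regardless of how the chunks were split.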
Handling Stream Interruptions
Streams can be interrupted by network issues or timeouts. Handle partial completion gracefully:
from dataclasses import dataclass, field
from openai import AsyncOpenAI

@dataclass
class StreamResult:
    completed: bool = False
    last_partial: dict = field(default_factory=dict)
    items_received: int = 0
    error: str | None = None

async def safe_stream_extraction(text: str) -> StreamResult:
    result = StreamResult()
    async_client = instructor.from_openai(AsyncOpenAI())
    try:
        async for partial in async_client.chat.completions.create_partial(
            model="gpt-4o",
            response_model=AnalysisReport,
            messages=[
                {"role": "system", "content": "Analyze the data."},
                {"role": "user", "content": text},
            ],
            stream=True,
        ):
            result.last_partial = partial.model_dump(exclude_none=True)
            result.items_received += 1
        result.completed = True
    except Exception as e:
        result.error = str(e)
        # last_partial still contains the most recent valid state
    return result
Even on failure, last_partial contains whatever data was successfully streamed before the interruption.
FAQ
What is the latency improvement from streaming structured outputs?
Time-to-first-token is typically 200-500ms regardless of total response length. Without streaming, the user waits for the full 3-15 second generation. With streaming, the UI starts updating after that first 200-500ms. For large structured outputs (50+ fields), perceived latency drops by 80-90%.
Does streaming affect the quality of structured outputs?
No. The model generates the same tokens whether you stream or not. The difference is purely in delivery timing. Strict mode and constrained decoding still apply to the full generation; streaming just lets you observe the output incrementally.
Can I stream and validate simultaneously?
With Instructor's create_partial, each yielded object is a valid Pydantic instance with default values for incomplete fields. Full validation (including cross-field validators) only applies when the stream completes. During streaming, individual field types are validated as they appear, but model-level validators that depend on multiple fields wait until the end.
#Streaming #RealTime #JSONParsing #StructuredOutputs #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.