
Streaming Structured Outputs: Incremental JSON Parsing for Real-Time Applications

Learn how to stream structured outputs from LLMs for real-time UI updates. Covers partial JSON parsing, streaming with Instructor and Pydantic, progressive UI rendering, and handling incomplete data.

The Streaming Problem for Structured Data

Standard structured output extraction waits for the entire LLM response before parsing. For small extractions this is fine, but when generating large structured objects — a detailed analysis with ten sections, a list of fifty extracted entities — the user stares at a loading spinner for 5-15 seconds.

Streaming solves this by delivering partial results as the model generates tokens. The challenge is that partial JSON is invalid JSON. You cannot call json.loads() on half an object. You need specialized parsing that handles incomplete data.
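The failure mode is easy to reproduce: any prefix of a valid JSON document is itself invalid. A minimal sketch:

```python
import json

# A prefix of a valid JSON document, as it might arrive mid-stream
partial = '{"title": "Market Report", "key_findings": ["Growth'

try:
    json.loads(partial)
except json.JSONDecodeError as e:
    # Raises: the payload is truncated mid-value
    print(f"Unparseable partial: {e.msg}")
```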

How Instructor Handles Streaming

Instructor provides a create_partial method that yields progressively more complete Pydantic objects:

import instructor
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import List, Optional

client = instructor.from_openai(OpenAI())

class AnalysisReport(BaseModel):
    title: str
    executive_summary: Optional[str] = None
    key_findings: List[str] = Field(default_factory=list)
    recommendations: List[str] = Field(default_factory=list)
    risk_level: Optional[str] = None

# Stream partial results
for partial_report in client.chat.completions.create_partial(
    model="gpt-4o",
    response_model=AnalysisReport,
    messages=[
        {
            "role": "system",
            "content": "Analyze the market data and produce a detailed report."
        },
        {
            "role": "user",
            "content": "Q4 2025 SaaS market data: ARR growth 23%, churn decreased to 4.2%..."
        }
    ],
    stream=True,
):
    # Each iteration yields a more complete AnalysisReport
    print(f"Title: {partial_report.title}")
    print(f"Findings so far: {len(partial_report.key_findings)}")
    print("---")

Each iteration yields a valid Pydantic object with whatever fields have been completed so far. Fields not yet streamed show their default values (empty lists, None).
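Under the hood, Instructor relaxes the model's required fields so that a mid-stream object validates. A simplified stand-in (an all-optional model, not Instructor's actual internal type) shows the behavior:

```python
from typing import List, Optional
from pydantic import BaseModel, Field

# Simplified stand-in: during streaming, a partial object behaves
# like an instance of this all-optional variant of the model.
class PartialReport(BaseModel):
    title: Optional[str] = None
    executive_summary: Optional[str] = None
    key_findings: List[str] = Field(default_factory=list)

# Early in the stream, only the title has arrived
early = PartialReport.model_validate({"title": "Q4 SaaS Market Analysis"})
print(early.key_findings)       # []
print(early.executive_summary)  # None
```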

Building a Real-Time UI with Streaming

Connect streaming structured outputs to a FastAPI server-sent events endpoint:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import instructor
import json

app = FastAPI()

async def stream_analysis(query: str):
    """Generator that yields SSE events with partial structured data."""
    async_client = instructor.from_openai(AsyncOpenAI())

    async for partial in async_client.chat.completions.create_partial(
        model="gpt-4o",
        response_model=AnalysisReport,
        messages=[
            {"role": "system", "content": "Analyze the data."},
            {"role": "user", "content": query}
        ],
        stream=True,
    ):
        # Send each partial result as an SSE event
        data = partial.model_dump(exclude_none=True)
        yield f"data: {json.dumps(data)}\n\n"

    yield "data: [DONE]\n\n"

@app.get("/api/analyze")
async def analyze(query: str):
    return StreamingResponse(
        stream_analysis(query),
        media_type="text/event-stream",
    )

On the frontend, consume the stream with an EventSource:

// Frontend JavaScript (shown for completeness)
const source = new EventSource("/api/analyze?query=...");
source.onmessage = (event) => {
  if (event.data === "[DONE]") { source.close(); return; }
  const partial = JSON.parse(event.data);
  updateUI(partial);
};
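The same event framing can also be consumed from Python, for example in an integration test of the endpoint. The parse_sse_events helper below is an illustrative name, not part of any library:

```python
import json

def parse_sse_events(lines):
    """Yield JSON payloads from raw SSE lines, stopping at the [DONE] sentinel."""
    for line in lines:
        if not line.startswith("data: "):
            continue  # skip blank separator / keep-alive lines
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return
        yield json.loads(payload)

# Synthetic stream in the shape the endpoint above emits
raw = [
    'data: {"title": "Market Report"}',
    "",
    'data: {"title": "Market Report", "key_findings": ["Growth is strong"]}',
    "",
    "data: [DONE]",
]
partials = list(parse_sse_events(raw))
print(len(partials))  # 2
```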

Manual Partial JSON Parsing

If you are not using Instructor, you can parse partial JSON manually. The key insight is that incomplete JSON can often be made valid by closing open brackets and braces:

import json
import re

def try_parse_partial_json(partial: str) -> dict | None:
    """Attempt to parse a partial JSON string by closing open structures."""
    # Count unclosed brackets and braces
    open_braces = partial.count("{") - partial.count("}")
    open_brackets = partial.count("[") - partial.count("]")

    # Remove trailing comma if present
    cleaned = partial.rstrip().rstrip(",")

    # Close open structures
    cleaned += "]" * open_brackets
    cleaned += "}" * open_braces

    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return None

# Example: partial stream from LLM
partial_stream = '{"title": "Market Report", "findings": ["Growth is strong"'
result = try_parse_partial_json(partial_stream)
print(result)
# {'title': 'Market Report', 'findings': ['Growth is strong']}

This approach is fragile: the naive counts break on strings that contain literal braces or brackets, and on values truncated mid-string. For production use, prefer Instructor's built-in partial parsing.

Streaming Lists of Objects

When extracting a list of items, you want each completed item to appear as soon as possible:

from typing import Iterable

class ExtractedContact(BaseModel):
    name: str
    email: Optional[str] = None
    company: Optional[str] = None

# Stream individual items as they complete
for contact in client.chat.completions.create_iterable(
    model="gpt-4o",
    response_model=ExtractedContact,
    messages=[
        {
            "role": "user",
            "content": "Extract contacts: John (john@acme.com, Acme), Sarah (sarah@corp.io, BigCorp)..."
        }
    ],
):
    print(f"Got contact: {contact.name} at {contact.company}")
    # Process each contact immediately — no waiting for full list
    save_to_database(contact)

The create_iterable method yields fully validated individual objects as they are completed in the stream. This is different from create_partial, which yields increasingly complete versions of the entire response model.

Handling Stream Interruptions

Streams can be interrupted by network issues or timeouts. Handle partial completion gracefully:

import instructor
from dataclasses import dataclass, field
from openai import AsyncOpenAI

@dataclass
class StreamResult:
    completed: bool = False
    last_partial: dict = field(default_factory=dict)
    items_received: int = 0
    error: str | None = None

async def safe_stream_extraction(text: str) -> StreamResult:
    result = StreamResult()
    async_client = instructor.from_openai(AsyncOpenAI())

    try:
        async for partial in async_client.chat.completions.create_partial(
            model="gpt-4o",
            response_model=AnalysisReport,
            messages=[
                {"role": "system", "content": "Analyze the data."},
                {"role": "user", "content": text}
            ],
            stream=True,
        ):
            result.last_partial = partial.model_dump(exclude_none=True)
            result.items_received += 1

        result.completed = True
    except Exception as e:
        result.error = str(e)
        # last_partial still contains the most recent valid state

    return result

Even on failure, last_partial contains whatever data was successfully streamed before the interruption.
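One way to act on that after a failure is a coverage check before rendering a degraded result. The helper names below are illustrative, not from any library:

```python
def usable_fields(partial: dict) -> set:
    """Names of fields that actually carry data in an interrupted partial."""
    return {k for k, v in partial.items() if v not in (None, "", [])}

def can_render(partial: dict, required=frozenset({"title"})) -> bool:
    """Decide whether last_partial holds enough to show a degraded result."""
    return required <= usable_fields(partial)

# Stream dropped after the title and one finding arrived
interrupted = {"title": "Market Report",
               "key_findings": ["Growth is strong"],
               "recommendations": [],
               "risk_level": None}
print(can_render(interrupted))  # True
```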

FAQ

What is the latency improvement from streaming structured outputs?

Time-to-first-token is typically 200-500ms regardless of total response length. Without streaming, the user waits for the full 3-15 second generation. With streaming, the UI starts updating after that first 200-500ms, so for large structured outputs (50+ fields) perceived latency can drop by 80-90%.

Does streaming affect the quality of structured outputs?

No. The model generates the same tokens whether you stream or not. The difference is purely in delivery timing. Strict mode and constrained decoding still apply to the full generation; streaming just lets you observe the output incrementally.

Can I stream and validate simultaneously?

With Instructor's create_partial, each yielded object is a valid Pydantic instance with default values for incomplete fields. Full validation (including cross-field validators) only applies when the stream completes. During streaming, individual field types are validated as they appear, but model-level validators that depend on multiple fields wait until the end.
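For reference, a model-level validator of the kind that is deferred until the stream completes looks like this (a hypothetical cross-field rule, for illustration):

```python
from typing import List, Optional
from pydantic import BaseModel, model_validator

class Summary(BaseModel):
    key_findings: List[str] = []
    risk_level: Optional[str] = None

    @model_validator(mode="after")
    def risk_requires_findings(self):
        # Cross-field rule: a risk rating is meaningless without findings.
        # During streaming, this check would only hold on the final object.
        if self.risk_level and not self.key_findings:
            raise ValueError("risk_level set but key_findings is empty")
        return self

# Passes: both fields are present, so the cross-field rule is satisfied
Summary(key_findings=["churn down to 4.2%"], risk_level="low")
```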


#Streaming #RealTime #JSONParsing #StructuredOutputs #Python #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
