
Debugging Streaming Issues: Fixing Dropped Tokens, Connection Resets, and Partial Responses

Learn how to diagnose and fix common streaming problems in AI agents including dropped tokens, connection resets, partial responses, and timeout failures with practical debugging techniques.

Streaming Looks Simple Until It Breaks

Streaming LLM responses gives users instant feedback — tokens appear as they are generated instead of waiting for the full response. But streaming introduces a class of bugs that do not exist in non-streaming mode: dropped tokens, mid-stream disconnects, partial tool calls, and buffer corruption.

These bugs are insidious because they are often intermittent. The stream works perfectly for 99 conversations, then silently drops the last 50 tokens on the 100th. Users see a response that ends mid-sentence, and your logs might not capture what went wrong.

Building a Stream Diagnostic Wrapper

Wrap your streaming calls with diagnostics that track every chunk:

import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class StreamDiagnostics:
    chunks_received: int = 0
    total_content_length: int = 0
    first_chunk_ms: float = 0
    last_chunk_ms: float = 0
    finish_reason: str | None = None
    errors: list[str] = field(default_factory=list)
    chunk_gaps: list[float] = field(default_factory=list)

async def debug_stream(client, messages, **kwargs):
    diag = StreamDiagnostics()
    start = time.perf_counter()
    last_chunk_time = start
    full_content = []

    try:
        stream = await client.chat.completions.create(
            messages=messages,
            stream=True,
            **kwargs,
        )

        async for chunk in stream:
            now = time.perf_counter()
            diag.chunks_received += 1

            if diag.chunks_received == 1:
                diag.first_chunk_ms = (now - start) * 1000

            gap = (now - last_chunk_time) * 1000
            diag.chunk_gaps.append(gap)
            last_chunk_time = now

            delta = chunk.choices[0].delta if chunk.choices else None
            if delta and delta.content:
                full_content.append(delta.content)
                diag.total_content_length += len(delta.content)

            if chunk.choices and chunk.choices[0].finish_reason:
                diag.finish_reason = chunk.choices[0].finish_reason

    except Exception as e:
        diag.errors.append(f"{type(e).__name__}: {e}")

    diag.last_chunk_ms = (time.perf_counter() - start) * 1000
    return "".join(full_content), diag

Detecting Dropped Tokens

Dropped tokens occur when chunks are lost in transit or when the client disconnects before the stream completes. One way to detect them is to compare streaming output against a non-streaming request with the same input. Use temperature=0 to make the comparison as deterministic as possible — though providers do not guarantee bit-identical outputs even then, so treat a mismatch here as a signal to investigate rather than proof of a bug:

async def verify_stream_completeness(client, messages, model="gpt-4o"):
    # Get non-streaming response as baseline
    non_stream = await client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0,
        stream=False,
    )
    baseline = non_stream.choices[0].message.content

    # Get streaming response
    streamed_content, diag = await debug_stream(
        client, messages, model=model, temperature=0,
    )

    # Compare
    match = baseline == streamed_content
    if not match:
        print("MISMATCH DETECTED")
        print(f"  Baseline length:  {len(baseline)}")
        print(f"  Streamed length:  {len(streamed_content)}")
        print(f"  Finish reason:    {diag.finish_reason}")
        # Find where they diverge
        for i, (a, b) in enumerate(zip(baseline, streamed_content)):
            if a != b:
                print(f"  First diff at char {i}: '{a}' vs '{b}'")
                break
        else:
            # zip stops at the shorter string, so no per-char diff means
            # one output is a prefix of the other: classic truncation
            diverge_at = min(len(baseline), len(streamed_content))
            print(f"  Outputs identical up to char {diverge_at}, then truncated")
    return match, diag
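In production you will want to categorize mismatches rather than print them, so you can count truncations separately from genuine divergence. A small helper along these lines (the function name and category labels are illustrative):

```python
def classify_mismatch(baseline: str, streamed: str) -> str:
    """Categorize how a streamed response differs from a baseline."""
    if baseline == streamed:
        return "match"
    if baseline.startswith(streamed):
        return "truncated"   # stream dropped trailing tokens
    if streamed.startswith(baseline):
        return "extra"       # stream contains tokens the baseline lacks
    return "diverged"        # content differs mid-stream (often nondeterminism)
```

A high "truncated" rate points at infrastructure (timeouts, disconnects); a high "diverged" rate usually just reflects sampling nondeterminism.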

Handling Connection Timeouts

Long-running streams can be interrupted by proxy timeouts, load balancer idle limits, or client-side timeouts. Set appropriate timeouts and implement reconnection logic:

import asyncio
import httpx

async def resilient_stream(client, messages, **kwargs):
    max_retries = 3
    emitted = 0  # characters already yielded to the caller

    for attempt in range(max_retries):
        try:
            stream = await client.chat.completions.create(
                messages=messages,
                stream=True,
                timeout=httpx.Timeout(
                    connect=10.0,
                    read=60.0,    # Per-chunk read timeout
                    write=10.0,
                    pool=10.0,
                ),
                **kwargs,
            )
            seen = 0  # characters received on this attempt
            async for chunk in stream:
                delta = chunk.choices[0].delta if chunk.choices else None
                if delta and delta.content:
                    seen += len(delta.content)
                    # A retry restarts the stream from the beginning, so
                    # skip the prefix we already yielded. (Only reliable
                    # with temperature=0; sampled output can differ on retry.)
                    if seen > emitted:
                        yield delta.content[emitted - seen:]
                        emitted = seen

            # Stream completed successfully
            return

        except (httpx.ReadTimeout, httpx.RemoteProtocolError) as e:
            print(f"Stream error on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # back off before retrying

Buffering for Tool Call Streams

Tool calls in streaming mode arrive as fragments across multiple chunks. You need to buffer and assemble them before execution:

class ToolCallBuffer:
    def __init__(self):
        self.buffers: dict[int, dict] = {}

    def process_chunk(self, chunk):
        if not chunk.choices:
            return None
        choice = chunk.choices[0]
        delta = choice.delta

        if delta and delta.tool_calls:
            for tc_delta in delta.tool_calls:
                idx = tc_delta.index
                if idx not in self.buffers:
                    self.buffers[idx] = {"id": "", "name": "", "arguments": ""}
                buf = self.buffers[idx]
                if tc_delta.id:
                    buf["id"] = tc_delta.id
                if tc_delta.function:
                    if tc_delta.function.name:
                        buf["name"] = tc_delta.function.name
                    if tc_delta.function.arguments:
                        buf["arguments"] += tc_delta.function.arguments

        # The finishing chunk often carries finish_reason with an empty
        # delta, so check it even when no tool call fragments arrived
        if choice.finish_reason == "tool_calls":
            return list(self.buffers.values())
        return None
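Before executing an assembled call, parse the accumulated argument string — if the stream was cut off mid-tool-call, the JSON will be incomplete, and catching that here is far better than passing malformed arguments to a tool. A sketch of such a finalizer (the `parse_tool_call` helper is illustrative, not part of any SDK):

```python
import json

def parse_tool_call(buffered: dict) -> dict:
    """Turn an assembled buffer entry into an executable call spec.

    Raises ValueError when the argument fragments don't form valid JSON,
    which usually means the stream was interrupted mid-tool-call.
    """
    try:
        args = json.loads(buffered["arguments"] or "{}")
    except json.JSONDecodeError as e:
        raise ValueError(
            f"incomplete tool call {buffered['name']!r}: {e}"
        ) from e
    return {"id": buffered["id"], "name": buffered["name"], "arguments": args}
```

Treat a ValueError here the same way you treat a missing finish_reason: the safe response is to retry the turn, never to execute a tool with guessed arguments.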

FAQ

Why does my stream sometimes end without a finish_reason?

This usually indicates the connection was interrupted before the model completed its response. Common causes include proxy timeouts (Nginx default is 60 seconds), client-side timeout settings, or network instability. Check your reverse proxy configuration and increase read timeouts for LLM streaming endpoints.
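For Nginx specifically, a location block that raises the read timeout and disables response buffering for streaming endpoints might look like this (the path, upstream name, and timeout value are examples to adapt):

```nginx
location /v1/chat/stream {
    proxy_pass http://llm_backend;
    proxy_http_version 1.1;
    proxy_read_timeout 300s;   # raise the 60s default for long streams
    proxy_buffering off;       # flush each chunk to the client immediately
    proxy_set_header Connection "";
}
```

Without proxy_buffering off, Nginx may hold chunks until its buffer fills, which looks exactly like a stalled stream from the client's side.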

How do I handle streaming when the model makes a tool call mid-response?

When streaming with tools enabled, the model may emit content tokens and then switch to emitting tool call deltas. Monitor the delta.tool_calls field on each chunk. Buffer the tool call fragments until you receive a finish_reason of tool_calls, then assemble and execute the complete tool call.

Should I disable streaming for agent workflows and only use it for final user-facing responses?

This is a common and effective pattern. Use non-streaming requests for internal agent reasoning and tool call cycles where latency per-turn matters less than reliability. Enable streaming only for the final response sent to the user where perceived latency matters most.


#Debugging #Streaming #WebSocket #AIAgents #Performance #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
