
LangGraph Streaming: Real-Time Node Updates and Token Streaming

Implement real-time streaming in LangGraph with stream modes for node-level updates, token-by-token LLM output, custom event streams, and practical patterns for responsive agent UIs.

Why Streaming Matters for Agents

Agent workflows can take seconds or even minutes to complete, especially when they involve multiple tool calls, web searches, or multi-step reasoning. Without streaming, users stare at a blank screen until the entire workflow finishes. Streaming gives users real-time visibility into what the agent is doing: which node is currently executing, what tokens the LLM is generating, and what intermediate results have been produced.

Stream Modes in LangGraph

LangGraph supports multiple stream modes that control what data gets emitted during execution: values (the full state after each step), updates (per-node deltas), messages (LLM tokens plus metadata), custom (data you emit from inside nodes), and debug (detailed trace events). The examples below all use this minimal graph:

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

class State(TypedDict):
    messages: Annotated[list, add_messages]

llm = ChatOpenAI(model="gpt-4o-mini")

def agent(state: State) -> dict:
    return {"messages": [llm.invoke(state["messages"])]}

builder = StateGraph(State)
builder.add_node("agent", agent)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)
graph = builder.compile()

Values Mode: Full State After Each Node

The values stream mode emits the complete state after each node finishes:

for chunk in graph.stream(
    {"messages": [HumanMessage(content="Explain quantum computing")]},
    stream_mode="values",
):
    messages = chunk["messages"]
    print(f"State has {len(messages)} messages")
    print(f"Latest: {messages[-1].content[:80]}...")

This is useful when your UI needs to render the complete conversation state at each step.

Updates Mode: Node-Level Deltas

The updates stream mode emits only the changes each node makes:

for chunk in graph.stream(
    {"messages": [HumanMessage(content="What is LangGraph?")]},
    stream_mode="updates",
):
    # Each chunk is a dict mapping node name -> the partial state that node returned
    for node_name, update in chunk.items():
        print(f"Node '{node_name}' produced:")
        if "messages" in update:
            for msg in update["messages"]:
                print(f"  {msg.content[:80]}...")

This is more efficient than values mode because you only receive the delta, not the entire accumulated state.

Token-Level Streaming with astream_events

For token-by-token output from the LLM, use the events streaming API:

import asyncio

async def stream_tokens():
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content="Write a poem about AI")]},
        version="v2",
    ):
        if event["event"] == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)

asyncio.run(stream_tokens())

The on_chat_model_stream event fires for every token the LLM generates. This gives users the familiar ChatGPT-style typing effect even within complex multi-node workflows.


Filtering Events by Node

In multi-node graphs, you often want to stream tokens only from specific nodes:

async def stream_final_response():
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content="Help me plan a trip")]},
        version="v2",
    ):
        kind = event["event"]
        tags = event.get("tags", [])

        # Only stream tokens from the 'respond' node
        if kind == "on_chat_model_stream" and "respond_node" in tags:
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)

For tag-based filtering to work, the tags must actually reach the chat model's events. Attaching them to the model with with_config is a reliable way to do that (tags passed as node metadata to add_node end up in event metadata, not in the event's tags list):

respond_llm = llm.with_config(tags=["respond_node"])

def respond(state: State) -> dict:
    return {"messages": [respond_llm.invoke(state["messages"])]}

builder.add_node("respond", respond)
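If you would rather not manage tags at all, LangGraph also records the name of the currently executing node under the langgraph_node key of each event's metadata, so you can filter on that instead. A sketch (the is_token_event helper is our own name, not a library function):

```python
def is_token_event(event: dict, node_name: str) -> bool:
    """True for chat-model token events emitted while `node_name` is running."""
    return (
        event.get("event") == "on_chat_model_stream"
        and event.get("metadata", {}).get("langgraph_node") == node_name
    )

async def stream_from_node(node_name: str, query: str) -> None:
    # Reuses `graph` and `HumanMessage` from the examples above
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content=query)]},
        version="v2",
    ):
        if is_token_event(event, node_name):
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
```

This avoids touching the model configuration entirely, at the cost of coupling the filter to node names rather than explicit tags.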

Streaming Multiple Modes Simultaneously

You can combine stream modes to get both state updates and token streams:

for event in graph.stream(
    {"messages": [HumanMessage(content="Analyze this data")]},
    stream_mode=["updates", "messages"],
):
    if isinstance(event, tuple):
        mode, data = event
        if mode == "messages":
            msg_chunk, metadata = data
            print(f"Token: {msg_chunk.content}", end="")
        elif mode == "updates":
            print(f"\nNode update: {data}")

This is particularly useful for building rich UIs that show both progress indicators for node transitions and streaming text for LLM output.
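In a UI, the dispatch logic above is usually factored out of the stream loop. A minimal sketch, where on_token and on_node_update are hypothetical callbacks standing in for your rendering layer:

```python
from typing import Any, Callable

def route_stream_event(
    event: Any,
    on_token: Callable[[str], None],
    on_node_update: Callable[[dict], None],
) -> None:
    """Dispatch one chunk from stream_mode=["updates", "messages"] to a callback."""
    if not isinstance(event, tuple):
        return
    mode, data = event
    if mode == "messages":
        # `messages` chunks are (message_chunk, metadata) pairs
        msg_chunk, metadata = data
        if msg_chunk.content:
            on_token(msg_chunk.content)
    elif mode == "updates":
        on_node_update(data)
```

Each iteration of the stream loop then becomes a single route_stream_event call, which keeps the transport code independent of how tokens and progress updates are displayed.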

Practical Streaming Pattern for Web APIs

Here is how to wire LangGraph streaming into a FastAPI server-sent events endpoint:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def event_generator(query: str):
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content=query)]},
        version="v2",
    ):
        if event["event"] == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                yield f"data: {token}\n\n"
    yield "data: [DONE]\n\n"

@app.get("/stream")
async def stream_endpoint(q: str):
    return StreamingResponse(
        event_generator(q),
        media_type="text/event-stream",
    )

This lets frontend clients consume the agent's output in real time using standard SSE.

FAQ

What is the difference between stream() and astream_events()?

stream() emits state-level updates (after each node completes). astream_events() emits fine-grained events including individual LLM tokens, tool calls, and chain starts/ends. Use stream() for node-level progress and astream_events() for token-level output.

Does streaming work with checkpointing?

Yes. Streaming and checkpointing are independent features. You can stream a checkpointed graph and state will be persisted at each node regardless of whether the output is streamed or collected.

Can I stream from a graph running in LangGraph Cloud?

Yes. LangGraph Cloud exposes streaming endpoints that emit server-sent events. The client SDK provides methods to consume these streams, giving you the same streaming experience as local execution but with managed infrastructure.


#LangGraph #Streaming #RealTime #TokenStreaming #Python #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
