
LangChain Callbacks and Streaming: Real-Time Token Output and Event Hooks

Implement real-time streaming in LangChain applications with callback handlers for token-by-token output, custom event logging, cost tracking, and production monitoring hooks.

Why Streaming and Callbacks Matter

LLMs can take seconds to generate a full response. Without streaming, users stare at a blank screen waiting for the complete output. Streaming delivers tokens as they are generated, creating a responsive experience. Callbacks extend this further — they let you hook into every event in a chain's lifecycle for logging, monitoring, cost tracking, and custom integrations.

LangChain's callback system is deeply integrated into every component. Every Runnable — prompts, models, parsers, tools, retrievers — fires events that callbacks can intercept.

Basic Streaming

Every LCEL chain supports .stream() out of the box.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

chain = (
    ChatPromptTemplate.from_template("Write a poem about {topic}")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

# Stream tokens as they arrive
for chunk in chain.stream({"topic": "the ocean"}):
    print(chunk, end="", flush=True)

Each chunk is a small piece of the output — typically a few tokens. The flush=True ensures output appears immediately in the terminal. For async code, use astream:

async for chunk in chain.astream({"topic": "the ocean"}):
    print(chunk, end="", flush=True)
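Because the chunks are plain string fragments, concatenating them reproduces the full output, so you can display tokens immediately and still keep the complete text for later use. A minimal sketch, with a stub generator standing in for chain.stream():

```python
# Stub generator standing in for chain.stream(); real chunks come from the LLM.
def fake_stream():
    yield from ["Waves ", "crash ", "on ", "the ", "shore."]

pieces = []
for chunk in fake_stream():
    print(chunk, end="", flush=True)  # show the fragment to the user immediately
    pieces.append(chunk)              # keep it for post-processing

full_text = "".join(pieces)
print()
print(full_text)  # Waves crash on the shore.
```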

Streaming with FastAPI

In production, you often stream LLM output over HTTP using Server-Sent Events.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI()

chain = (
    ChatPromptTemplate.from_template("Answer this question: {question}")
    | ChatOpenAI(model="gpt-4o-mini", streaming=True)
    | StrOutputParser()
)

@app.get("/stream")
async def stream_response(question: str):
    async def event_generator():
        async for chunk in chain.astream({"question": question}):
            # Chunks containing newlines would break SSE framing;
            # consider JSON-encoding each chunk in production.
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )

The client connects to /stream?question=... and receives tokens in real time via SSE.
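Each SSE message is a `data: ...` line followed by a blank line, and the `[DONE]` sentinel above is this example's own convention rather than part of the SSE spec. A dependency-free sketch of client-side parsing for that wire format:

```python
def parse_sse_lines(lines):
    """Yield payloads from 'data: ...' SSE lines, stopping at the [DONE] sentinel."""
    for line in lines:
        if line.startswith("data: "):
            payload = line[len("data: "):]
            if payload == "[DONE]":
                return
            yield payload

# Simulated lines as they would arrive over the /stream endpoint above.
raw = ["data: Hello", "", "data:  world", "", "data: [DONE]", ""]
tokens = list(parse_sse_lines(raw))
print(tokens)  # ['Hello', ' world']
```

In a real client you would read these lines from the HTTP response stream (or use the browser's built-in EventSource API, which handles the framing for you).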

Callback Handlers

Callbacks intercept lifecycle events across all LangChain components. The base class BaseCallbackHandler defines methods for every event type.

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage

class LoggingHandler(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        print(f"LLM started with {len(prompts)} prompts")

    def on_llm_new_token(self, token: str, **kwargs):
        print(f"Token: {repr(token)}")

    def on_llm_end(self, response, **kwargs):
        print(f"LLM finished. Tokens used: {response.llm_output}")

    def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"Chain started: {serialized.get('name', 'unknown')}")

    def on_chain_end(self, outputs, **kwargs):
        print(f"Chain finished with keys: {list(outputs.keys())}")

    def on_tool_start(self, serialized, input_str, **kwargs):
        print(f"Tool called: {serialized.get('name')}")

    def on_tool_end(self, output, **kwargs):
        # output may be a ToolMessage rather than a plain string in recent versions
        print(f"Tool returned: {str(output)[:100]}")

Pass callbacks when invoking a chain:


handler = LoggingHandler()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [handler]},
)

Building a Cost Tracker

A practical callback example: tracking token usage and estimating costs.

from langchain_core.callbacks import BaseCallbackHandler

class CostTracker(BaseCallbackHandler):
    # Pricing per 1M tokens, USD (example rates; check current provider pricing)
    PRICING = {
        "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
        "gpt-4o": {"prompt": 2.50, "completion": 10.00},
    }

    def __init__(self):
        self.total_prompt_tokens = 0
        self.total_completion_tokens = 0
        self.total_cost = 0.0

    def on_llm_end(self, response, **kwargs):
        # llm_output can be None (e.g. for some streaming runs); guard before reading it
        llm_output = response.llm_output or {}
        usage = llm_output.get("token_usage", {})
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)

        self.total_prompt_tokens += prompt_tokens
        self.total_completion_tokens += completion_tokens

        model = llm_output.get("model_name", "gpt-4o-mini")
        prices = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])

        cost = (
            prompt_tokens * prices["prompt"] / 1_000_000
            + completion_tokens * prices["completion"] / 1_000_000
        )
        self.total_cost += cost

    def report(self):
        return {
            "prompt_tokens": self.total_prompt_tokens,
            "completion_tokens": self.total_completion_tokens,
            "total_cost_usd": round(self.total_cost, 6),
        }

tracker = CostTracker()
result = chain.invoke({"topic": "AI"}, config={"callbacks": [tracker]})
print(tracker.report())
# {'prompt_tokens': 42, 'completion_tokens': 187, 'total_cost_usd': 0.000119}
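As a sanity check on the arithmetic, using the example gpt-4o-mini rates above:

```python
prompt_tokens, completion_tokens = 42, 187
prompt_rate, completion_rate = 0.15, 0.60  # USD per 1M tokens (gpt-4o-mini example rates)

cost = (prompt_tokens * prompt_rate + completion_tokens * completion_rate) / 1_000_000
# cost is about 0.0001185 USD, which rounds to the 0.000119 shown in report()
```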

Async Callback Handlers

For high-throughput applications, use AsyncCallbackHandler to avoid blocking the event loop.

from langchain_core.callbacks import AsyncCallbackHandler

class AsyncLogger(AsyncCallbackHandler):
    # log_to_database and websocket_broadcast are placeholders for your own
    # async functions (e.g. database writes, WebSocket pushes).
    async def on_llm_start(self, serialized, prompts, **kwargs):
        await log_to_database("llm_start", prompts)

    async def on_llm_new_token(self, token: str, **kwargs):
        await websocket_broadcast(token)

    async def on_llm_end(self, response, **kwargs):
        await log_to_database("llm_end", response.llm_output)

Async handlers are essential when your callback logic involves I/O operations like database writes or WebSocket broadcasts.
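The websocket_broadcast and log_to_database calls above are placeholders. A common way to keep on_llm_new_token fast is to push tokens onto an asyncio.Queue and let a separate task do the slow I/O. A minimal sketch, with a stub producer standing in for the callback:

```python
import asyncio

async def producer(queue):
    # Stands in for on_llm_new_token pushing tokens as they arrive.
    for token in ["Hel", "lo", "!"]:
        await queue.put(token)
    await queue.put(None)  # sentinel: stream finished

async def consumer(queue, received):
    # Stands in for the WebSocket sender draining the queue at its own pace.
    while (token := await queue.get()) is not None:
        received.append(token)

async def main():
    queue = asyncio.Queue()
    received = []
    await asyncio.gather(producer(queue), consumer(queue, received))
    return received

tokens = asyncio.run(main())
print(tokens)  # ['Hel', 'lo', '!']
```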

astream_events: Fine-Grained Event Streaming

For maximum control, use astream_events to receive every event from every component in a chain.

async for event in chain.astream_events(
    {"topic": "machine learning"},
    version="v2",
):
    kind = event["event"]

    if kind == "on_chat_model_stream":
        # Individual tokens from the LLM
        print(event["data"]["chunk"].content, end="")
    elif kind == "on_retriever_end":
        # Retrieved documents
        docs = event["data"]["output"]
        print(f"\nRetrieved {len(docs)} documents")
    elif kind == "on_tool_end":
        # Tool results
        print(f"\nTool result: {event['data']['output']}")

This API gives you visibility into every internal step of a complex chain, including nested sub-chains and parallel branches.
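Since each event is a plain dict with `event` and `data` keys, dispatch logic like the branches above can be unit-tested without a live model by feeding it hand-built events. A sketch using illustrative events shaped like the v2 schema:

```python
from types import SimpleNamespace

# Hand-built events mimicking the astream_events v2 shape (illustrative values).
sample_events = [
    {"event": "on_chain_start", "name": "RunnableSequence", "data": {}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content="Hi")}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content="!")}},
    {"event": "on_chain_end", "data": {"output": "Hi!"}},
]

def collect_tokens(events):
    """Concatenate content from chat-model stream events, mirroring the branch above."""
    return "".join(
        e["data"]["chunk"].content
        for e in events
        if e["event"] == "on_chat_model_stream"
    )

print(collect_tokens(sample_events))  # Hi!
```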

Combining Multiple Handlers

You can attach multiple callback handlers to a single invocation.

result = chain.invoke(
    {"topic": "AI safety"},
    config={"callbacks": [
        CostTracker(),
        LoggingHandler(),
        AsyncLogger(),
    ]},
)

Each handler receives all events independently. This lets you compose monitoring concerns — one handler for costs, another for logging, a third for real-time streaming.

FAQ

What is the difference between .stream() and callbacks with on_llm_new_token?

.stream() yields chunks from the final output of the chain. on_llm_new_token fires for every token generated by the LLM, even if the chain has post-processing steps. Use .stream() for user-facing output and on_llm_new_token for internal monitoring.

Can I use callbacks without streaming?

Yes. Callbacks fire for all events regardless of whether you use .invoke(), .stream(), or .batch(). They are useful for logging, cost tracking, and monitoring even when you do not need streaming output.

How do I test callback handlers?

Create a handler instance, run a chain with it attached, then assert on the handler's state. For example, invoke a chain with the CostTracker handler and verify that total_prompt_tokens > 0. Mock the LLM for deterministic tests using FakeListChatModel from langchain_core.language_models.
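As an alternative that avoids mocking the LLM entirely, you can call a handler's methods directly with a stubbed response object. The sketch below uses a stripped-down tracker mirroring the CostTracker above, minus the BaseCallbackHandler base class, so it runs without LangChain installed:

```python
from types import SimpleNamespace

class CostTrackerCore:
    """Same accounting as the CostTracker above, without the LangChain base class."""
    PRICING = {"gpt-4o-mini": {"prompt": 0.15, "completion": 0.60}}

    def __init__(self):
        self.total_prompt_tokens = 0
        self.total_completion_tokens = 0
        self.total_cost = 0.0

    def on_llm_end(self, response, **kwargs):
        usage = (response.llm_output or {}).get("token_usage", {})
        prompt = usage.get("prompt_tokens", 0)
        completion = usage.get("completion_tokens", 0)
        self.total_prompt_tokens += prompt
        self.total_completion_tokens += completion
        prices = self.PRICING["gpt-4o-mini"]
        self.total_cost += (
            prompt * prices["prompt"] + completion * prices["completion"]
        ) / 1_000_000

# Stubbed response shaped like an LLMResult with OpenAI-style llm_output.
fake = SimpleNamespace(
    llm_output={"token_usage": {"prompt_tokens": 10, "completion_tokens": 20}}
)
tracker = CostTrackerCore()
tracker.on_llm_end(fake)
assert tracker.total_prompt_tokens == 10
```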


#LangChain #Streaming #Callbacks #RealTime #Python #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
