LangChain Callbacks and Streaming: Real-Time Token Output and Event Hooks
Implement real-time streaming in LangChain applications with callback handlers for token-by-token output, custom event logging, cost tracking, and production monitoring hooks.
Why Streaming and Callbacks Matter
LLMs can take seconds to generate a full response. Without streaming, users stare at a blank screen waiting for the complete output. Streaming delivers tokens as they are generated, creating a responsive experience. Callbacks extend this further — they let you hook into every event in a chain's lifecycle for logging, monitoring, cost tracking, and custom integrations.
LangChain's callback system is deeply integrated into every component. Every Runnable — prompts, models, parsers, tools, retrievers — fires events that callbacks can intercept.
Basic Streaming
Every LCEL chain supports .stream() out of the box.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
chain = (
    ChatPromptTemplate.from_template("Write a poem about {topic}")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)
# Stream tokens as they arrive
for chunk in chain.stream({"topic": "the ocean"}):
    print(chunk, end="", flush=True)
Each chunk is a small piece of the output, typically a few tokens. The flush=True ensures output appears immediately in the terminal. For async code, use .astream():
async for chunk in chain.astream({"topic": "the ocean"}):
    print(chunk, end="", flush=True)
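The async loop above needs a running event loop. Here is a library-free sketch of driving it with asyncio, where fake_astream is an illustrative stand-in for chain.astream:

```python
import asyncio

async def fake_astream(text: str):
    # Stand-in for chain.astream: yields small chunks one at a time,
    # mimicking token-by-token model output.
    for word in text.split():
        await asyncio.sleep(0)  # yield control, as real network I/O would
        yield word + " "

async def main() -> str:
    pieces = []
    async for chunk in fake_astream("waves crash on the shore"):
        pieces.append(chunk)  # in real code: print(chunk, end="", flush=True)
    return "".join(pieces)

result = asyncio.run(main())
print(result)  # "waves crash on the shore "
```

The same asyncio.run pattern wraps any astream or astream_events loop in a script.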
Streaming with FastAPI
In production, you often stream LLM output over HTTP using Server-Sent Events.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
app = FastAPI()
chain = (
    ChatPromptTemplate.from_template("Answer this question: {question}")
    | ChatOpenAI(model="gpt-4o-mini", streaming=True)
    | StrOutputParser()
)
@app.get("/stream")
async def stream_response(question: str):
    async def event_generator():
        async for chunk in chain.astream({"question": question}):
            # Note: chunks containing newlines should be escaped or
            # JSON-encoded to keep the SSE framing valid
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
The client connects to /stream?question=... and receives tokens in real time via SSE.
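On the wire, that stream is just lines of text. A minimal, library-free sketch of how a client can reassemble the `data:` lines this endpoint emits (the function name is illustrative):

```python
def parse_sse(lines):
    """Collect token payloads from raw SSE lines until the [DONE] sentinel."""
    tokens = []
    for line in lines:
        line = line.rstrip("\n")
        if not line.startswith("data: "):
            continue  # skip blank separator lines between events
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        tokens.append(payload)
    return "".join(tokens)

# Simulated wire format from the /stream endpoint above
raw = ["data: The\n", "\n", "data:  answer\n", "\n", "data: [DONE]\n", "\n"]
print(parse_sse(raw))  # "The answer"
```

In a browser, the built-in EventSource API does this parsing for you.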
Callback Handlers
Callbacks intercept lifecycle events across all LangChain components. The base class BaseCallbackHandler defines methods for every event type.
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
class LoggingHandler(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        print(f"LLM started with {len(prompts)} prompts")

    def on_llm_new_token(self, token: str, **kwargs):
        print(f"Token: {repr(token)}")

    def on_llm_end(self, response, **kwargs):
        # llm_output can be None (e.g. when streaming), so guard the access
        usage = (response.llm_output or {}).get("token_usage")
        print(f"LLM finished. Token usage: {usage}")

    def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"Chain started: {(serialized or {}).get('name', 'unknown')}")

    def on_chain_end(self, outputs, **kwargs):
        print(f"Chain finished with keys: {list(outputs.keys())}")

    def on_tool_start(self, serialized, input_str, **kwargs):
        print(f"Tool called: {(serialized or {}).get('name')}")

    def on_tool_end(self, output, **kwargs):
        print(f"Tool returned: {str(output)[:100]}")
Pass callbacks when invoking a chain:
handler = LoggingHandler()

result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [handler]},
)
Building a Cost Tracker
A practical callback example: tracking token usage and estimating costs.
from langchain_core.callbacks import BaseCallbackHandler
class CostTracker(BaseCallbackHandler):
    # Pricing per 1M tokens (example rates; check your provider's current pricing)
    PRICING = {
        "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
        "gpt-4o": {"prompt": 2.50, "completion": 10.00},
    }

    def __init__(self):
        self.total_prompt_tokens = 0
        self.total_completion_tokens = 0
        self.total_cost = 0.0

    def on_llm_end(self, response, **kwargs):
        # llm_output can be None (e.g. when streaming), so guard the access
        llm_output = response.llm_output or {}
        usage = llm_output.get("token_usage", {})
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        self.total_prompt_tokens += prompt_tokens
        self.total_completion_tokens += completion_tokens

        model = llm_output.get("model_name", "gpt-4o-mini")
        prices = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])
        cost = (
            prompt_tokens * prices["prompt"] / 1_000_000
            + completion_tokens * prices["completion"] / 1_000_000
        )
        self.total_cost += cost

    def report(self):
        return {
            "prompt_tokens": self.total_prompt_tokens,
            "completion_tokens": self.total_completion_tokens,
            "total_cost_usd": round(self.total_cost, 6),
        }
tracker = CostTracker()
result = chain.invoke({"topic": "AI"}, config={"callbacks": [tracker]})
print(tracker.report())
# {"prompt_tokens": 42, "completion_tokens": 187, "total_cost_usd": 0.000119}
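As a sanity check on the arithmetic, the cost in that example output follows directly from the per-1M-token rates (the token counts are the illustrative ones from the comment above):

```python
# USD per 1M tokens, matching the CostTracker table above
PROMPT_RATE, COMPLETION_RATE = 0.15, 0.60

prompt_tokens, completion_tokens = 42, 187
cost = (prompt_tokens * PROMPT_RATE + completion_tokens * COMPLETION_RATE) / 1_000_000
# 42 * 0.15 = 6.3 and 187 * 0.60 = 112.2, so 118.5 / 1,000,000 of a dollar
print(f"{cost:.6f}")
```

Completion tokens dominate here, which is typical: output pricing is usually several times the input rate.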
Async Callback Handlers
For high-throughput applications, use AsyncCallbackHandler to avoid blocking the event loop.
from langchain_core.callbacks import AsyncCallbackHandler
class AsyncLogger(AsyncCallbackHandler):
    # log_to_database and websocket_broadcast are placeholders for
    # your own async I/O functions
    async def on_llm_start(self, serialized, prompts, **kwargs):
        await log_to_database("llm_start", prompts)

    async def on_llm_new_token(self, token: str, **kwargs):
        await websocket_broadcast(token)

    async def on_llm_end(self, response, **kwargs):
        await log_to_database("llm_end", response.llm_output)
Async handlers are essential when your callback logic involves I/O operations like database writes or WebSocket broadcasts.
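A common refinement of this pattern is to keep the token callback itself cheap by pushing work onto an asyncio.Queue, so a slow consumer (database, WebSocket) never stalls generation. A library-free sketch, with TokenBroadcaster and its names purely illustrative:

```python
import asyncio

class TokenBroadcaster:
    """Sketch of the async-handler pattern: the callback only enqueues;
    a separate consumer task would do the slow I/O."""

    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()

    async def on_llm_new_token(self, token: str, **kwargs):
        await self.queue.put(token)  # fast: never waits on the consumer

async def main() -> str:
    handler = TokenBroadcaster()
    for t in ["Hel", "lo", "!"]:  # simulate tokens arriving from the model
        await handler.on_llm_new_token(t)
    received = []
    while not handler.queue.empty():
        received.append(handler.queue.get_nowait())
    return "".join(received)

print(asyncio.run(main()))  # Hello!
```

In production you would run the consumer as a background task with asyncio.create_task, draining the queue concurrently with generation.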
astream_events: Fine-Grained Event Streaming
For maximum control, use astream_events to receive every event from every component in a chain.
async for event in chain.astream_events(
    {"topic": "machine learning"},
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # Individual tokens from the LLM
        print(event["data"]["chunk"].content, end="")
    elif kind == "on_retriever_end":
        # Retrieved documents
        docs = event["data"]["output"]
        print(f"\nRetrieved {len(docs)} documents")
    elif kind == "on_tool_end":
        # Tool results
        print(f"\nTool result: {event['data']['output']}")
This API gives you visibility into every internal step of a complex chain, including nested sub-chains and parallel branches.
Combining Multiple Handlers
You can attach multiple callback handlers to a single invocation.
# Async handlers need the async API, so call this inside an async function
result = await chain.ainvoke(
    {"topic": "AI safety"},
    config={"callbacks": [
        CostTracker(),
        LoggingHandler(),
        AsyncLogger(),
    ]},
)
Each handler receives all events independently. This lets you compose monitoring concerns — one handler for costs, another for logging, a third for real-time streaming.
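Under the hood this is a simple fan-out. A library-free sketch of the dispatch shape (class and function names are illustrative):

```python
class Recorder:
    """Stand-in handler that just records the events it receives."""

    def __init__(self, name):
        self.name = name
        self.seen = []

    def on_llm_start(self, prompts):
        self.seen.append(("start", len(prompts)))

    def on_llm_end(self, response):
        self.seen.append(("end", response))

def dispatch(handlers, event_name, *args):
    # Every handler receives every event, independently of the others;
    # a handler that lacks the method simply skips that event.
    for h in handlers:
        method = getattr(h, event_name, None)
        if callable(method):
            method(*args)

a, b = Recorder("costs"), Recorder("logs")
dispatch([a, b], "on_llm_start", ["hi"])
dispatch([a, b], "on_llm_end", "ok")
print(a.seen, b.seen)  # both recorders saw both events
```

This is why composition works: handlers never see or depend on each other.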
FAQ
What is the difference between .stream() and callbacks with on_llm_new_token?
.stream() yields chunks from the final output of the chain. on_llm_new_token fires for every token generated by the LLM, even if the chain has post-processing steps. Use .stream() for user-facing output and on_llm_new_token for internal monitoring.
Can I use callbacks without streaming?
Yes. Callbacks fire for all events regardless of whether you use .invoke(), .stream(), or .batch(). They are useful for logging, cost tracking, and monitoring even when you do not need streaming output.
How do I test callback handlers?
Create a handler instance, run a chain with it attached, then assert on the handler's state. For example, invoke a chain with the CostTracker handler and verify that total_prompt_tokens > 0. Mock the LLM for deterministic tests using FakeListChatModel from langchain_core.language_models.
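A library-free sketch of that test pattern, where fake_invoke and CountingHandler are illustrative stand-ins (in real tests you would wire FakeListChatModel into an actual chain):

```python
class CountingHandler:
    """Stand-in for a callback handler: records every event it sees."""

    def __init__(self):
        self.events = []
        self.prompt_tokens = 0

    def on_llm_start(self, prompts):
        self.events.append("llm_start")

    def on_llm_end(self, response):
        self.events.append("llm_end")
        self.prompt_tokens += response["token_usage"]["prompt_tokens"]

def fake_invoke(handler):
    # Stand-in for chain.invoke with a fake model: fires the same
    # lifecycle events a real run would, with deterministic usage data
    handler.on_llm_start(["Write a poem about AI"])
    handler.on_llm_end({"token_usage": {"prompt_tokens": 7}})

handler = CountingHandler()
fake_invoke(handler)
assert handler.events == ["llm_start", "llm_end"]
assert handler.prompt_tokens > 0
print("ok")
```

The key idea carries over directly: deterministic inputs in, then assert on the handler's accumulated state.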
CallSphere Team