LLM Observability: Tracing, Logging, and Debugging AI Systems
A practical guide to implementing observability in LLM applications, covering distributed tracing for multi-step agents, structured logging, cost tracking, quality monitoring, and debugging production issues with tools like LangSmith, Langfuse, and custom solutions.
Why LLM Observability Is Different
Traditional application observability tracks request latency, error rates, and resource utilization. LLM applications need all of that plus a new dimension: output quality. A 200 OK response that contains a hallucinated answer is a failure that standard monitoring will miss.
LLM observability covers four pillars:
- Tracing: Following the complete execution path through multi-step agent workflows
- Quality monitoring: Detecting degradation in model output quality over time
- Cost tracking: Understanding and optimizing token usage and API spend
- Debugging: Reproducing and diagnosing issues in non-deterministic systems
Distributed Tracing for LLM Agents
An AI agent making three tool calls, two retrieval queries, and a final generation step is a distributed system. Each step can fail independently, and understanding the full execution path is essential for debugging.
OpenTelemetry-Based Tracing
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import functools

# Initialize tracing
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("llm-agent")

def trace_llm_call(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(
            f"llm.{func.__name__}",
            attributes={
                "llm.model": kwargs.get("model", "unknown"),
                "llm.max_tokens": kwargs.get("max_tokens", 0),
            },
        ) as span:
            try:
                result = await func(*args, **kwargs)
                span.set_attribute("llm.input_tokens", result.usage.input_tokens)
                span.set_attribute("llm.output_tokens", result.usage.output_tokens)
                span.set_attribute("llm.stop_reason", result.stop_reason)
                return result
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise
    return wrapper

def trace_tool_call(tool_name: str):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(
                f"tool.{tool_name}",
                attributes={"tool.name": tool_name},
            ) as span:
                result = await func(*args, **kwargs)
                span.set_attribute("tool.result_length", len(str(result)))
                return result
        return wrapper
    return decorator

def trace_retrieval(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        with tracer.start_as_current_span("retrieval") as span:
            results = await func(*args, **kwargs)
            span.set_attribute("retrieval.num_results", len(results))
            span.set_attribute("retrieval.top_score",
                               results[0].score if results else 0)
            return results
    return wrapper
```
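The decorators above only produce visible output once an OTLP collector is running. To see the span logic in isolation, here is a self-contained sketch that swaps the OTel tracer for a stand-in recorder; the `FakeSpan` class and the fake result shape are assumptions that mirror the attribute names used above, not part of any real SDK:

```python
import asyncio
import functools
from types import SimpleNamespace

recorded_spans: list[dict] = []

class FakeSpan:
    """Stand-in for an OTel span: collects attributes into a dict."""
    def __init__(self, name, attributes=None):
        self.data = {"name": name, **(attributes or {})}
    def __enter__(self):
        return self
    def __exit__(self, *exc):
        recorded_spans.append(self.data)
        return False
    def set_attribute(self, key, value):
        self.data[key] = value

def trace_llm_call(func):
    # Same wrapper shape as above, but writing to FakeSpan instead of OTel
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        with FakeSpan(f"llm.{func.__name__}",
                      {"llm.model": kwargs.get("model", "unknown")}) as span:
            result = await func(*args, **kwargs)
            span.set_attribute("llm.input_tokens", result.usage.input_tokens)
            span.set_attribute("llm.output_tokens", result.usage.output_tokens)
            return result
    return wrapper

@trace_llm_call
async def generate(model: str = "unknown"):
    # Fake API result exposing the usage fields the decorator reads
    return SimpleNamespace(usage=SimpleNamespace(input_tokens=450,
                                                 output_tokens=180))

asyncio.run(generate(model="claude-sonnet-4-20250514"))
# recorded_spans now holds one span named "llm.generate" with token counts
```

The same pattern is useful in unit tests, where asserting against a recorded span list is simpler than standing up an exporter.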
Agent Trace Structure
A typical agent trace looks like this:
```
[Agent Run: 2.3s] agent.handle_request
|-- [120ms] llm.plan_steps (input: 450 tokens, output: 180 tokens)
|-- [340ms] retrieval.search (query: "refund policy", results: 5)
|-- [45ms] tool.validate_order_id (order: #12345, result: valid)
|-- [890ms] llm.generate_response (input: 2100 tokens, output: 340 tokens)
|-- [15ms] output.filter (pii_detected: false)
```
Structured Logging for LLM Systems
Standard logging (`logger.info("Generated response")`) is nearly useless for debugging LLM issues. Structured logging captures the context needed for investigation:
```python
import structlog
import hashlib

logger = structlog.get_logger()

class LLMLogger:
    @staticmethod
    async def log_request(
        run_id: str,
        model: str,
        messages: list,
        response,
        duration_ms: float,
    ):
        # Hash sensitive content for privacy
        input_hash = hashlib.sha256(
            str(messages).encode()
        ).hexdigest()[:12]

        logger.info(
            "llm.request",
            run_id=run_id,
            model=model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            total_tokens=response.usage.input_tokens + response.usage.output_tokens,
            duration_ms=round(duration_ms, 2),
            stop_reason=response.stop_reason,
            input_hash=input_hash,
            num_messages=len(messages),
            estimated_cost=calculate_cost(  # defined in the cost-tracking section below
                model, response.usage.input_tokens, response.usage.output_tokens
            ),
        )

    @staticmethod
    async def log_quality_issue(
        run_id: str,
        issue_type: str,
        details: dict,
    ):
        logger.warning(
            "llm.quality_issue",
            run_id=run_id,
            issue_type=issue_type,
            **details,
        )
```
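If adding structlog as a dependency is not an option, the same one-JSON-object-per-line output can be approximated with the stdlib. This is a minimal sketch, not the structlog API; the `fields` attribute name and `log_event` helper are choices made here for illustration:

```python
import json
import logging
import sys

class JSONFormatter(logging.Formatter):
    """Render the event name plus any extra fields as one JSON line."""
    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage()}
        # Extra fields are attached to the record via logging's `extra` kwarg
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
log = logging.getLogger("llm")
log.addHandler(handler)
log.setLevel(logging.INFO)

def log_event(event: str, **fields):
    log.info(event, extra={"fields": fields})

log_event("llm.request", run_id="abc123", input_tokens=450, output_tokens=180)
```

Either way, the payoff is the same: log lines become queryable records rather than free text.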
Cost Tracking and Optimization
LLM API costs can spiral without visibility. Build cost tracking into your observability layer:
```python
from datetime import datetime

# Pricing as of early 2026 (per million tokens)
MODEL_PRICING = {
    "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
    "claude-haiku-4-20250514": {"input": 0.80, "output": 4.0},
    "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
    "gpt-4o": {"input": 2.50, "output": 10.0},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    return (
        (input_tokens / 1_000_000) * pricing["input"] +
        (output_tokens / 1_000_000) * pricing["output"]
    )

class CostTracker:
    def __init__(self, daily_budget: float = 100.0):
        self.daily_budget = daily_budget
        self.daily_spend = 0.0
        self.hourly_spend = {}

    def record(self, model: str, input_tokens: int, output_tokens: int):
        cost = calculate_cost(model, input_tokens, output_tokens)
        self.daily_spend += cost

        hour = datetime.now().strftime("%H")
        self.hourly_spend[hour] = self.hourly_spend.get(hour, 0) + cost

        if self.daily_spend > self.daily_budget * 0.8:
            logger.warning("cost.budget_warning",
                           daily_spend=self.daily_spend,
                           budget=self.daily_budget,
                           utilization=self.daily_spend / self.daily_budget)
        return cost
```
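As a worked example of the arithmetic, take the generation step from the trace earlier (2,100 input tokens, 340 output tokens) on Sonnet pricing. The snippet restates the relevant pricing entry so it runs standalone:

```python
MODEL_PRICING = {
    "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    return (
        (input_tokens / 1_000_000) * pricing["input"]
        + (output_tokens / 1_000_000) * pricing["output"]
    )

# (2100 / 1e6) * $3 + (340 / 1e6) * $15 = $0.0063 + $0.0051 = $0.0114
cost = calculate_cost("claude-sonnet-4-20250514", 2_100, 340)
```

Roughly a cent per request sounds trivial until it is multiplied by millions of requests per day, which is exactly why per-request cost belongs in every log line.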
Quality Monitoring
Automated Quality Checks
Run lightweight quality checks on every response:
```python
class QualityMonitor:
    def check_response(self, query: str, response: str, context: list[str]) -> dict:
        checks = {
            "length_adequate": len(response) > 50,
            "not_refusal": not any(
                phrase in response.lower()
                for phrase in ["i cannot", "i'm unable", "i don't have"]
            ),
            "no_hallucination_markers": not any(
                phrase in response.lower()
                for phrase in ["as an ai", "i don't have access", "my training data"]
            ),
            "context_referenced": any(
                # Check if response references the provided context
                self._overlap_score(response, ctx) > 0.1
                for ctx in context
            ) if context else True,
        }
        score = sum(checks.values()) / len(checks)
        return {"checks": checks, "score": score, "passed": score >= 0.75}

    @staticmethod
    def _overlap_score(response: str, ctx: str) -> float:
        # Simple word-overlap heuristic: fraction of context words
        # that also appear in the response
        ctx_words = set(ctx.lower().split())
        if not ctx_words:
            return 0.0
        return len(set(response.lower().split()) & ctx_words) / len(ctx_words)
```
Drift Detection
Model behavior changes over time due to provider updates, prompt changes, or data distribution shifts. Monitor for drift:
```python
class DriftDetector:
    def __init__(self, baseline_metrics: dict):
        self.baseline = baseline_metrics
        self.window_size = 100
        self.recent_scores = []

    def record(self, quality_score: float, latency_ms: float, tokens: int):
        self.recent_scores.append({
            "quality": quality_score,
            "latency": latency_ms,
            "tokens": tokens,
        })
        if len(self.recent_scores) >= self.window_size:
            current = self._compute_metrics(self.recent_scores[-self.window_size:])
            drift = self._detect_drift(self.baseline, current)
            if drift:
                logger.warning("quality.drift_detected", **drift)
            self.recent_scores = self.recent_scores[-self.window_size:]

    @staticmethod
    def _compute_metrics(records: list[dict]) -> dict:
        # Mean of each metric over the window
        return {
            metric: sum(r[metric] for r in records) / len(records)
            for metric in ("quality", "latency", "tokens")
        }

    def _detect_drift(self, baseline, current) -> dict | None:
        for metric in ["quality", "latency", "tokens"]:
            baseline_val = baseline[metric]
            current_val = current[metric]
            pct_change = (current_val - baseline_val) / baseline_val
            if abs(pct_change) > 0.15:  # 15% threshold
                return {
                    "metric": metric,
                    "baseline": baseline_val,
                    "current": current_val,
                    "pct_change": round(pct_change * 100, 1),
                }
        return None
```
Observability Tools Comparison
| Tool | Type | Strengths | Pricing |
|---|---|---|---|
| LangSmith | Managed | Deep LangChain integration, playground | Free tier + usage-based |
| Langfuse | Open Source | Self-hostable, model-agnostic | Free (self-hosted) or cloud |
| Arize Phoenix | Open Source | Evaluation-focused, embeddings viz | Free |
| Helicone | Managed | Simple proxy setup, cost tracking | Free tier + usage-based |
| Custom (OTel) | DIY | Full control, no vendor lock-in | Infrastructure costs |
Debugging Production Issues
The Replay Pattern
Store full request/response pairs so you can replay issues locally:
```python
from datetime import datetime

class RequestRecorder:
    def __init__(self, storage):
        self.storage = storage

    async def record(self, run_id: str, messages: list, response, metadata: dict):
        await self.storage.save({
            "run_id": run_id,
            "timestamp": datetime.utcnow().isoformat(),
            "messages": messages,
            "response": response.model_dump(),
            "metadata": metadata,
        })

    async def replay(self, run_id: str, override_model: str | None = None):
        """Replay a recorded request, optionally with a different model."""
        record = await self.storage.load(run_id)
        model = override_model or record["metadata"]["model"]
        # `client` is an async LLM client (e.g. AsyncAnthropic) created elsewhere
        return await client.messages.create(
            model=model,
            messages=record["messages"],
            max_tokens=record["metadata"].get("max_tokens", 4096),
        )
```
Common Debugging Scenarios
"The agent gave a wrong answer": Pull the full trace, check what context was retrieved, verify the retrieval was relevant, then examine if the generation step misused the context.
"Latency spiked": Check trace spans for which step slowed down. Common culprits: retrieval latency (index issues), model provider latency (check status pages), or excessive tool calls (loop detection).
"Costs jumped unexpectedly": Query hourly cost data. Look for context window bloat (messages array growing without summarization), retry loops, or a spike in traffic.
Key Takeaways
LLM observability is not optional for production systems. At minimum, implement structured logging with token counts and costs, distributed tracing for multi-step agents, automated quality checks on every response, and a request recording system for debugging. The investment pays for itself the first time you need to debug a production issue that would otherwise be invisible.