
LLM Observability: Tracing, Logging, and Debugging AI Systems

A practical guide to implementing observability in LLM applications, covering distributed tracing for multi-step agents, structured logging, cost tracking, quality monitoring, and debugging production issues with tools like LangSmith, Langfuse, and custom solutions.

Why LLM Observability Is Different

Traditional application observability tracks request latency, error rates, and resource utilization. LLM applications need all of that plus a new dimension: output quality. A 200 OK response that contains a hallucinated answer is a failure that standard monitoring will miss.

LLM observability covers four pillars:

  1. Tracing: Following the complete execution path through multi-step agent workflows
  2. Quality monitoring: Detecting degradation in model output quality over time
  3. Cost tracking: Understanding and optimizing token usage and API spend
  4. Debugging: Reproducing and diagnosing issues in non-deterministic systems

Distributed Tracing for LLM Agents

An AI agent making three tool calls, two retrieval queries, and a final generation step is a distributed system. Each step can fail independently, and understanding the full execution path is essential for debugging.

OpenTelemetry-Based Tracing

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import functools

# Initialize tracing
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("llm-agent")

def trace_llm_call(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(
            f"llm.{func.__name__}",
            attributes={
                "llm.model": kwargs.get("model", "unknown"),
                "llm.max_tokens": kwargs.get("max_tokens", 0),
            }
        ) as span:
            try:
                result = await func(*args, **kwargs)
                span.set_attribute("llm.input_tokens", result.usage.input_tokens)
                span.set_attribute("llm.output_tokens", result.usage.output_tokens)
                span.set_attribute("llm.stop_reason", result.stop_reason)
                return result
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise
    return wrapper

def trace_tool_call(tool_name: str):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(
                f"tool.{tool_name}",
                attributes={"tool.name": tool_name}
            ) as span:
                result = await func(*args, **kwargs)
                span.set_attribute("tool.result_length", len(str(result)))
                return result
        return wrapper
    return decorator

def trace_retrieval(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        with tracer.start_as_current_span("retrieval") as span:
            results = await func(*args, **kwargs)
            span.set_attribute("retrieval.num_results", len(results))
            span.set_attribute("retrieval.top_score",
                             results[0].score if results else 0)
            return results
    return wrapper

Agent Trace Structure

A typical agent trace looks like this:

[Agent Run: 2.3s] agent.handle_request
  |-- [120ms] llm.plan_steps          (input: 450 tokens, output: 180 tokens)
  |-- [340ms] retrieval.search         (query: "refund policy", results: 5)
  |-- [45ms]  tool.validate_order_id   (order: #12345, result: valid)
  |-- [890ms] llm.generate_response    (input: 2100 tokens, output: 340 tokens)
  |-- [15ms]  output.filter            (pii_detected: false)
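Without a tracing UI, the same tree shape can be rendered directly from recorded span data for plain log output. A minimal sketch (`format_trace` and its `(duration_ms, name, detail)` record format are hypothetical helpers, not part of OpenTelemetry):

```python
def format_trace(root: str, total_s: float, steps: list[tuple[int, str, str]]) -> str:
    # Render (duration_ms, span_name, detail) records as an indented tree
    lines = [f"[Agent Run: {total_s}s] {root}"]
    for duration_ms, name, detail in steps:
        lines.append(f"  |-- [{duration_ms}ms] {name:<22} ({detail})")
    return "\n".join(lines)

print(format_trace("agent.handle_request", 2.3, [
    (120, "llm.plan_steps", "input: 450 tokens, output: 180 tokens"),
    (340, "retrieval.search", 'query: "refund policy", results: 5'),
]))
```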

Structured Logging for LLM Systems

Standard logging (logger.info("Generated response")) is nearly useless for debugging LLM issues. Structured logging captures the context needed for investigation:

import structlog
import hashlib

logger = structlog.get_logger()

class LLMLogger:
    @staticmethod
    async def log_request(
        run_id: str,
        model: str,
        messages: list,
        response,
        duration_ms: float,
    ):
        # Hash sensitive content for privacy
        input_hash = hashlib.sha256(
            str(messages).encode()
        ).hexdigest()[:12]

        logger.info(
            "llm.request",
            run_id=run_id,
            model=model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            total_tokens=response.usage.input_tokens + response.usage.output_tokens,
            duration_ms=round(duration_ms, 2),
            stop_reason=response.stop_reason,
            input_hash=input_hash,
            num_messages=len(messages),
            estimated_cost=calculate_cost(  # defined in the cost-tracking section below
                model, response.usage.input_tokens, response.usage.output_tokens
            ),
        )

    @staticmethod
    async def log_quality_issue(
        run_id: str,
        issue_type: str,
        details: dict,
    ):
        logger.warning(
            "llm.quality_issue",
            run_id=run_id,
            issue_type=issue_type,
            **details,
        )
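The `input_hash` field lets you group and deduplicate identical prompts across runs without storing raw content, since the same message list always maps to the same 12-character prefix. A minimal standalone sketch of the same hashing scheme:

```python
import hashlib

def input_hash(messages: list) -> str:
    # Same scheme as LLMLogger above: SHA-256 of the serialized messages, truncated
    return hashlib.sha256(str(messages).encode()).hexdigest()[:12]

# Identical message lists produce identical hashes, so runs can be
# grouped for analysis without logging the prompt text itself
h = input_hash([{"role": "user", "content": "What is the refund policy?"}])
```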

Cost Tracking and Optimization

LLM API costs can spiral without visibility. Build cost tracking into your observability layer:

# Pricing as of early 2026 (per million tokens)
MODEL_PRICING = {
    "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
    "claude-haiku-4-20250514": {"input": 0.80, "output": 4.0},
    "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
    "gpt-4o": {"input": 2.50, "output": 10.0},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    return (
        (input_tokens / 1_000_000) * pricing["input"] +
        (output_tokens / 1_000_000) * pricing["output"]
    )

from datetime import datetime

class CostTracker:
    def __init__(self, daily_budget: float = 100.0):
        self.daily_budget = daily_budget
        self.daily_spend = 0.0  # reset at midnight by an external scheduler (not shown)
        self.hourly_spend = {}

    def record(self, model: str, input_tokens: int, output_tokens: int):
        cost = calculate_cost(model, input_tokens, output_tokens)
        self.daily_spend += cost

        hour = datetime.now().strftime("%H")
        self.hourly_spend[hour] = self.hourly_spend.get(hour, 0) + cost

        if self.daily_spend > self.daily_budget * 0.8:
            logger.warning("cost.budget_warning",
                          daily_spend=self.daily_spend,
                          budget=self.daily_budget,
                          utilization=self.daily_spend / self.daily_budget)

        return cost
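As a sanity check on the arithmetic, the final generation step in the trace earlier (2,100 input / 340 output tokens, priced at the Sonnet row of the table above) works out to:

```python
# Per-million-token pricing for claude-sonnet-4 from the table above
PRICE_IN, PRICE_OUT = 3.0, 15.0

# Final generation step from the earlier trace: 2,100 input / 340 output tokens
cost = (2_100 / 1_000_000) * PRICE_IN + (340 / 1_000_000) * PRICE_OUT
# roughly $0.0114 -- about 1.1 cents per response
```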

Quality Monitoring

Automated Quality Checks

Run lightweight quality checks on every response:

class QualityMonitor:
    def check_response(self, query: str, response: str, context: list[str]) -> dict:
        checks = {
            "length_adequate": len(response) > 50,
            "not_refusal": not any(
                phrase in response.lower()
                for phrase in ["i cannot", "i'm unable", "i don't have"]
            ),
            "no_hallucination_markers": not any(
                phrase in response.lower()
                for phrase in ["as an ai", "i don't have access", "my training data"]
            ),
            # Check if the response references the provided context
            "context_referenced": any(
                self._overlap_score(response, ctx) > 0.1
                for ctx in context
            ) if context else True,
        }

        score = sum(checks.values()) / len(checks)
        return {"checks": checks, "score": score, "passed": score >= 0.75}

    def _overlap_score(self, response: str, context: str) -> float:
        # Crude lexical grounding check: fraction of context words
        # that also appear in the response
        response_words = set(response.lower().split())
        context_words = set(context.lower().split())
        if not context_words:
            return 0.0
        return len(response_words & context_words) / len(context_words)

Drift Detection

Model behavior changes over time due to provider updates, prompt changes, or data distribution shifts. Monitor for drift:

class DriftDetector:
    def __init__(self, baseline_metrics: dict):
        self.baseline = baseline_metrics
        self.window_size = 100
        self.recent_scores = []

    def record(self, quality_score: float, latency_ms: float, tokens: int):
        self.recent_scores.append({
            "quality": quality_score,
            "latency": latency_ms,
            "tokens": tokens,
        })

        if len(self.recent_scores) >= self.window_size:
            current = self._compute_metrics(self.recent_scores[-self.window_size:])
            drift = self._detect_drift(self.baseline, current)
            if drift:
                logger.warning("quality.drift_detected", **drift)
            self.recent_scores = self.recent_scores[-self.window_size:]

    def _compute_metrics(self, window: list[dict]) -> dict:
        # Mean of each tracked metric over the window
        return {
            metric: sum(r[metric] for r in window) / len(window)
            for metric in ("quality", "latency", "tokens")
        }

    def _detect_drift(self, baseline, current) -> dict | None:
        for metric in ["quality", "latency", "tokens"]:
            baseline_val = baseline[metric]
            current_val = current[metric]
            if baseline_val == 0:
                continue  # skip unset baselines to avoid division by zero
            pct_change = (current_val - baseline_val) / baseline_val
            if abs(pct_change) > 0.15:  # 15% threshold
                return {
                    "metric": metric,
                    "baseline": baseline_val,
                    "current": current_val,
                    "pct_change": round(pct_change * 100, 1),
                }
        return None
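The `baseline_metrics` dict passed to the constructor needs `quality`, `latency`, and `tokens` keys. One way to produce it is a one-off pass over a reference eval set (a sketch; `compute_baseline` is a hypothetical helper):

```python
def compute_baseline(records: list[dict]) -> dict:
    # Mean of each tracked metric over the eval set
    n = len(records)
    return {
        metric: sum(r[metric] for r in records) / n
        for metric in ("quality", "latency", "tokens")
    }

baseline = compute_baseline([
    {"quality": 0.9, "latency": 800.0, "tokens": 500},
    {"quality": 0.8, "latency": 1200.0, "tokens": 700},
])
# baseline["latency"] == 1000.0
```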

Observability Tools Comparison

Tool            Type          Strengths                                Pricing
LangSmith       Managed       Deep LangChain integration, playground   Free tier + usage-based
Langfuse        Open source   Self-hostable, model-agnostic            Free (self-hosted) or cloud
Arize Phoenix   Open source   Evaluation-focused, embeddings viz       Free
Helicone        Managed       Simple proxy setup, cost tracking        Free tier + usage-based
Custom (OTel)   DIY           Full control, no vendor lock-in          Infrastructure costs

Debugging Production Issues

The Replay Pattern

Store full request/response pairs so you can replay issues locally:

from datetime import datetime, timezone

class RequestRecorder:
    def __init__(self, storage):
        self.storage = storage

    async def record(self, run_id: str, messages: list, response, metadata: dict):
        await self.storage.save({
            "run_id": run_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "messages": messages,
            "response": response.model_dump(),
            "metadata": metadata,
        })

    async def replay(self, run_id: str, override_model: str | None = None):
        """Replay a recorded request, optionally with a different model."""
        record = await self.storage.load(run_id)
        model = override_model or record["metadata"]["model"]
        # Assumes a module-level async client, e.g. client = anthropic.AsyncAnthropic()
        return await client.messages.create(
            model=model,
            messages=record["messages"],
            max_tokens=record["metadata"].get("max_tokens", 4096),
        )

Common Debugging Scenarios

  1. "The agent gave a wrong answer": Pull the full trace, check what context was retrieved, verify the retrieval was relevant, then examine if the generation step misused the context.

  2. "Latency spiked": Check trace spans for which step slowed down. Common culprits: retrieval latency (index issues), model provider latency (check status pages), or excessive tool calls (loop detection).

  3. "Costs jumped unexpectedly": Query hourly cost data. Look for context window bloat (messages array growing without summarization), retry loops, or a spike in traffic.
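For the loop-detection culprit in scenario 2, a simple pass over a trace's tool spans catches the common case of an agent re-issuing the same call. A hypothetical sketch (`detect_tool_loop` and its `(tool, args)` tuple format are assumptions, not from any tracing library):

```python
from collections import Counter

def detect_tool_loop(tool_calls: list[tuple[str, str]], max_repeats: int = 3) -> bool:
    # Flag a run whose trace shows the same tool invoked with
    # identical arguments max_repeats or more times
    counts = Counter(tool_calls)
    return any(n >= max_repeats for n in counts.values())

calls = [("search", "refund policy")] * 4 + [("validate_order_id", "#12345")]
looping = detect_tool_loop(calls)  # True: identical search repeated four times
```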

Key Takeaways

LLM observability is not optional for production systems. At minimum, implement structured logging with token counts and costs, distributed tracing for multi-step agents, automated quality checks on every response, and a request recording system for debugging. The investment pays for itself the first time you need to debug a production issue that would otherwise be invisible.
