Skip to content
Learn Agentic AI12 min read0 views

Continuous Evaluation in Production: Real-Time Quality Monitoring for Deployed Agents

Learn how to implement continuous evaluation for production AI agents with sampling strategies, real-time quality dashboards, alerting on quality degradation, and feedback loops that drive iterative improvement.

Why Pre-Deployment Testing Is Not Enough

Your evaluation dataset covers the scenarios you anticipated. Production covers everything else. Users phrase things in ways you never imagined. Edge cases compound in sequences you never tested. Upstream model providers push silent updates that shift behavior. A model that passed your evaluation suite last week can degrade this week without any change on your end.

Continuous evaluation in production bridges the gap between controlled testing and real-world performance. It samples live conversations, scores them automatically, and alerts you before quality drops become customer complaints.

Designing a Sampling Strategy

You cannot evaluate every conversation in production — the cost of LLM-as-judge scoring would exceed the cost of the agent itself. Strategic sampling gives you statistical confidence at a fraction of the cost.

import random
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class SamplingConfig:
    base_rate: float = 0.05  # 5% of conversations
    boost_rate: float = 0.25  # 25% for flagged patterns
    boost_triggers: list[str] = field(
        default_factory=lambda: [
            "user_thumbs_down",
            "escalation_requested",
            "high_token_count",
            "tool_error",
        ]
    )
    min_daily_samples: int = 100
    max_daily_samples: int = 5000

class ProductionSampler:
    def __init__(self, config: SamplingConfig):
        self.config = config
        self.daily_count = 0
        self.last_reset = datetime.utcnow().date()

    def _reset_if_new_day(self):
        today = datetime.utcnow().date()
        if today > self.last_reset:
            self.daily_count = 0
            self.last_reset = today

    def should_sample(
        self, conversation_id: str, signals: dict = None
    ) -> bool:
        self._reset_if_new_day()

        if self.daily_count >= self.config.max_daily_samples:
            return False

        # Deterministic hash for reproducibility
        hash_val = int(
            hashlib.md5(
                conversation_id.encode()
            ).hexdigest()[:8],
            16,
        )
        threshold = hash_val / 0xFFFFFFFF

        signals = signals or {}
        has_trigger = any(
            signals.get(t, False)
            for t in self.config.boost_triggers
        )
        rate = (
            self.config.boost_rate
            if has_trigger
            else self.config.base_rate
        )

        # Boost if below minimum daily target
        hours_elapsed = max(
            1, datetime.utcnow().hour
        )
        expected = (
            self.config.min_daily_samples
            * hours_elapsed / 24
        )
        if self.daily_count < expected * 0.5:
            rate = min(rate * 2, 0.5)

        if threshold < rate:
            self.daily_count += 1
            return True
        return False

The deterministic hash ensures the same conversation always gets the same sampling decision, which matters for debugging. Boost sampling on negative signals — conversations where users express dissatisfaction, where escalations happen, or where tools error out. These are exactly the conversations you need to evaluate most.

Real-Time Quality Scoring Pipeline

Build an asynchronous pipeline that evaluates sampled conversations without blocking the user experience.

import asyncio
from collections import deque

@dataclass
class QualityScore:
    conversation_id: str
    timestamp: str
    scores: dict  # e.g., {"coherence": 4, "task_completion": 0.8}
    flags: list[str] = field(default_factory=list)

class OnlineEvaluationPipeline:
    def __init__(self, scoring_functions: list, queue_size: int = 1000):
        self.scorers = scoring_functions
        self.queue: asyncio.Queue = asyncio.Queue(
            maxsize=queue_size
        )
        self.results: deque = deque(maxlen=10000)
        self._running = False

    async def submit(self, conversation: dict):
        try:
            self.queue.put_nowait(conversation)
        except asyncio.QueueFull:
            pass  # Drop if pipeline is backed up

    async def _process(self):
        while self._running:
            try:
                conversation = await asyncio.wait_for(
                    self.queue.get(), timeout=5.0
                )
            except asyncio.TimeoutError:
                continue

            scores = {}
            flags = []
            for scorer in self.scorers:
                try:
                    result = await scorer(conversation)
                    scores.update(result.get("scores", {}))
                    flags.extend(result.get("flags", []))
                except Exception as e:
                    flags.append(f"scorer_error:{e}")

            quality_score = QualityScore(
                conversation_id=conversation["id"],
                timestamp=datetime.utcnow().isoformat(),
                scores=scores,
                flags=flags,
            )
            self.results.append(quality_score)
            self.queue.task_done()

    async def start(self, workers: int = 3):
        self._running = True
        tasks = [
            asyncio.create_task(self._process())
            for _ in range(workers)
        ]
        return tasks

    async def stop(self):
        self._running = False

Multiple workers process the queue in parallel. If the queue fills up, new submissions are dropped rather than blocking the agent — monitoring should never degrade the user experience.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

Building Quality Dashboards

Aggregate scores into time-windowed views that reveal trends and anomalies.

from collections import defaultdict
from typing import Sequence

class QualityDashboard:
    def __init__(self, window_minutes: int = 60):
        self.window_minutes = window_minutes
        self.scores: list[QualityScore] = []

    def add_score(self, score: QualityScore):
        self.scores.append(score)

    def _recent_scores(self) -> list[QualityScore]:
        cutoff = (
            datetime.utcnow()
            - timedelta(minutes=self.window_minutes)
        )
        cutoff_str = cutoff.isoformat()
        return [
            s for s in self.scores
            if s.timestamp >= cutoff_str
        ]

    def current_metrics(self) -> dict:
        recent = self._recent_scores()
        if not recent:
            return {"status": "no_data"}

        metric_values = defaultdict(list)
        all_flags = []
        for score in recent:
            for key, value in score.scores.items():
                if isinstance(value, (int, float)):
                    metric_values[key].append(value)
            all_flags.extend(score.flags)

        metrics = {}
        for key, values in metric_values.items():
            metrics[key] = {
                "mean": round(sum(values) / len(values), 3),
                "min": round(min(values), 3),
                "max": round(max(values), 3),
                "count": len(values),
            }

        # Flag frequency
        flag_counts = defaultdict(int)
        for flag in all_flags:
            flag_counts[flag] += 1

        return {
            "window_minutes": self.window_minutes,
            "conversations_evaluated": len(recent),
            "metrics": metrics,
            "top_flags": dict(
                sorted(
                    flag_counts.items(),
                    key=lambda x: -x[1],
                )[:10]
            ),
        }

    def compare_windows(
        self, current_minutes: int = 60, baseline_minutes: int = 1440
    ) -> dict:
        now = datetime.utcnow()
        current_cutoff = (
            now - timedelta(minutes=current_minutes)
        ).isoformat()
        baseline_cutoff = (
            now - timedelta(minutes=baseline_minutes)
        ).isoformat()

        current = [
            s for s in self.scores
            if s.timestamp >= current_cutoff
        ]
        baseline = [
            s for s in self.scores
            if baseline_cutoff <= s.timestamp < current_cutoff
        ]

        def avg_metric(scores, key):
            vals = [
                s.scores.get(key, 0)
                for s in scores
                if isinstance(s.scores.get(key), (int, float))
            ]
            return sum(vals) / len(vals) if vals else 0

        # Compare common metrics
        all_keys = set()
        for s in current + baseline:
            all_keys.update(s.scores.keys())

        comparison = {}
        for key in all_keys:
            curr_avg = avg_metric(current, key)
            base_avg = avg_metric(baseline, key)
            delta = curr_avg - base_avg
            comparison[key] = {
                "current": round(curr_avg, 3),
                "baseline": round(base_avg, 3),
                "delta": round(delta, 3),
                "degraded": delta < -0.1,
            }

        return comparison

The compare_windows method is your early warning system. It compares the last hour against the last 24 hours. When a metric's delta turns significantly negative, something changed — a model update, a traffic pattern shift, or a bug.

Alerting on Quality Degradation

Convert dashboard data into actionable alerts.

@dataclass
class AlertRule:
    metric: str
    threshold: float
    comparison: str  # "below", "above"
    severity: str  # "warning", "critical"
    message_template: str

class QualityAlertManager:
    def __init__(self):
        self.rules: list[AlertRule] = []
        self.active_alerts: list[dict] = []

    def add_rule(self, rule: AlertRule):
        self.rules.append(rule)

    def evaluate(self, metrics: dict) -> list[dict]:
        triggered = []
        for rule in self.rules:
            metric_data = metrics.get("metrics", {}).get(
                rule.metric, {}
            )
            value = metric_data.get("mean")
            if value is None:
                continue

            fire = (
                (rule.comparison == "below" and value < rule.threshold)
                or (rule.comparison == "above" and value > rule.threshold)
            )
            if fire:
                alert = {
                    "metric": rule.metric,
                    "value": value,
                    "threshold": rule.threshold,
                    "severity": rule.severity,
                    "message": rule.message_template.format(
                        metric=rule.metric,
                        value=value,
                        threshold=rule.threshold,
                    ),
                    "timestamp": datetime.utcnow().isoformat(),
                }
                triggered.append(alert)

        self.active_alerts = triggered
        return triggered

# Configure alerts
alert_mgr = QualityAlertManager()
alert_mgr.add_rule(AlertRule(
    metric="task_completion",
    threshold=0.7,
    comparison="below",
    severity="critical",
    message_template="Task completion dropped to {value:.1%}, below {threshold:.1%} threshold",
))
alert_mgr.add_rule(AlertRule(
    metric="coherence",
    threshold=3.0,
    comparison="below",
    severity="warning",
    message_template="Coherence score at {value:.1f}, below {threshold:.1f} minimum",
))

Closing the Feedback Loop

The final piece is feeding production evaluation results back into your offline evaluation datasets. Conversations that score poorly in production become new test cases. Patterns that trigger alerts become new red team samples. This creates a virtuous cycle where your evaluation dataset grows smarter over time, reflecting the actual failure modes of your deployed agent rather than the failures you imagined during development.

FAQ

How much does continuous production evaluation cost?

At a 5 percent sampling rate with LLM-as-judge scoring, expect to spend 2 to 5 percent of your agent's total LLM cost on evaluation. For a system spending 10,000 dollars a month on agent inference, that is 200 to 500 dollars for continuous monitoring. Deterministic checks are essentially free, so maximize those and use LLM judges selectively for quality dimensions that require language understanding.

How do I avoid alert fatigue from too many false positives?

Start with conservative thresholds that only fire on genuine quality drops. Require sustained degradation — the metric must be below threshold for 15 minutes, not just a single sample. Group related alerts together so a single root cause does not generate five separate alerts. Review and tune thresholds monthly based on actual incident correlation.

Should I evaluate the same conversation multiple times with different judges?

For production monitoring, one evaluation pass is sufficient — you need speed and cost efficiency. For conversations flagged as potential quality issues, run a second evaluation with a different judge model to confirm. This two-tier approach keeps costs low while reducing false positives on the cases that might trigger engineering action.


#ProductionMonitoring #ContinuousEvaluation #Observability #Alerting #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.