Continuous Evaluation in Production: Real-Time Quality Monitoring for Deployed Agents
Learn how to implement continuous evaluation for production AI agents with sampling strategies, real-time quality dashboards, alerting on quality degradation, and feedback loops that drive iterative improvement.
Why Pre-Deployment Testing Is Not Enough
Your evaluation dataset covers the scenarios you anticipated. Production covers everything else. Users phrase things in ways you never imagined. Edge cases compound in sequences you never tested. Upstream model providers push silent updates that shift behavior. A model that passed your evaluation suite last week can degrade this week without any change on your end.
Continuous evaluation in production bridges the gap between controlled testing and real-world performance. It samples live conversations, scores them automatically, and alerts you before quality drops become customer complaints.
Designing a Sampling Strategy
You cannot evaluate every conversation in production — the cost of LLM-as-judge scoring would exceed the cost of the agent itself. Strategic sampling gives you statistical confidence at a fraction of the cost.
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class SamplingConfig:
    base_rate: float = 0.05   # 5% of conversations
    boost_rate: float = 0.25  # 25% for flagged patterns
    boost_triggers: list[str] = field(
        default_factory=lambda: [
            "user_thumbs_down",
            "escalation_requested",
            "high_token_count",
            "tool_error",
        ]
    )
    min_daily_samples: int = 100
    max_daily_samples: int = 5000


class ProductionSampler:
    def __init__(self, config: SamplingConfig):
        self.config = config
        self.daily_count = 0
        self.last_reset = datetime.utcnow().date()

    def _reset_if_new_day(self):
        today = datetime.utcnow().date()
        if today > self.last_reset:
            self.daily_count = 0
            self.last_reset = today

    def should_sample(
        self, conversation_id: str, signals: Optional[dict] = None
    ) -> bool:
        self._reset_if_new_day()
        if self.daily_count >= self.config.max_daily_samples:
            return False
        # Deterministic hash for reproducibility
        hash_val = int(
            hashlib.md5(conversation_id.encode()).hexdigest()[:8], 16
        )
        threshold = hash_val / 0xFFFFFFFF
        signals = signals or {}
        has_trigger = any(
            signals.get(t, False) for t in self.config.boost_triggers
        )
        rate = (
            self.config.boost_rate if has_trigger else self.config.base_rate
        )
        # Boost if running below the pro-rated minimum daily target
        hours_elapsed = max(1, datetime.utcnow().hour)
        expected = self.config.min_daily_samples * hours_elapsed / 24
        if self.daily_count < expected * 0.5:
            rate = min(rate * 2, 0.5)
        if threshold < rate:
            self.daily_count += 1
            return True
        return False
The deterministic hash ensures the same conversation always gets the same sampling decision, which matters for debugging. Boost sampling on negative signals — conversations where users express dissatisfaction, where escalations happen, or where tools error out. These are exactly the conversations you need to evaluate most.
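To see the determinism in isolation, here is a minimal standalone sketch of the same hashing scheme (the helper name is illustrative):

```python
import hashlib


def sample_decision(conversation_id: str, rate: float) -> bool:
    # Map the first 8 hex chars of the MD5 digest into [0, 1] and
    # compare against the sampling rate -- the same scheme
    # ProductionSampler uses, minus the daily bookkeeping.
    bucket = int(hashlib.md5(conversation_id.encode()).hexdigest()[:8], 16)
    return bucket / 0xFFFFFFFF < rate


# The same conversation always gets the same decision, so a sampled
# conversation stays sampled when you replay it during debugging.
assert sample_decision("conv-123", 0.05) == sample_decision("conv-123", 0.05)

# Across many IDs, roughly `rate` of them are sampled.
hits = sum(sample_decision(f"conv-{i}", 0.05) for i in range(10_000))
print(hits / 10_000)  # close to 0.05
```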
Real-Time Quality Scoring Pipeline
Build an asynchronous pipeline that evaluates sampled conversations without blocking the user experience.
import asyncio
from collections import deque


@dataclass
class QualityScore:
    conversation_id: str
    timestamp: str
    scores: dict  # e.g., {"coherence": 4, "task_completion": 0.8}
    flags: list[str] = field(default_factory=list)


class OnlineEvaluationPipeline:
    def __init__(self, scoring_functions: list, queue_size: int = 1000):
        self.scorers = scoring_functions
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: deque = deque(maxlen=10000)
        self._running = False

    async def submit(self, conversation: dict):
        try:
            self.queue.put_nowait(conversation)
        except asyncio.QueueFull:
            pass  # Drop if pipeline is backed up

    async def _process(self):
        while self._running:
            try:
                conversation = await asyncio.wait_for(
                    self.queue.get(), timeout=5.0
                )
            except asyncio.TimeoutError:
                continue
            scores = {}
            flags = []
            for scorer in self.scorers:
                try:
                    result = await scorer(conversation)
                    scores.update(result.get("scores", {}))
                    flags.extend(result.get("flags", []))
                except Exception as e:
                    flags.append(f"scorer_error:{e}")
            quality_score = QualityScore(
                conversation_id=conversation["id"],
                timestamp=datetime.utcnow().isoformat(),
                scores=scores,
                flags=flags,
            )
            self.results.append(quality_score)
            self.queue.task_done()

    async def start(self, workers: int = 3):
        self._running = True
        tasks = [
            asyncio.create_task(self._process()) for _ in range(workers)
        ]
        return tasks

    async def stop(self):
        self._running = False
Multiple workers process the queue in parallel. If the queue fills up, new submissions are dropped rather than blocking the agent — monitoring should never degrade the user experience.
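The scorer contract the pipeline assumes is simple: an async callable that takes a conversation dict and returns a dict with optional "scores" and "flags" keys. A cheap deterministic scorer satisfying that contract (names here are illustrative) might look like:

```python
import asyncio


async def turn_count_scorer(conversation: dict) -> dict:
    # Deterministic checks like this cost nothing per conversation,
    # so they can run on every sample alongside LLM judges.
    turns = conversation.get("turns", [])
    flags = ["excessive_turns"] if len(turns) > 30 else []
    return {"scores": {"turn_count": len(turns)}, "flags": flags}


result = asyncio.run(
    turn_count_scorer({"id": "conv-1", "turns": ["hi", "hello"]})
)
print(result)  # {'scores': {'turn_count': 2}, 'flags': []}
```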
Building Quality Dashboards
Aggregate scores into time-windowed views that reveal trends and anomalies.
from collections import defaultdict


class QualityDashboard:
    def __init__(self, window_minutes: int = 60):
        self.window_minutes = window_minutes
        self.scores: list[QualityScore] = []

    def add_score(self, score: QualityScore):
        self.scores.append(score)

    def _recent_scores(self) -> list[QualityScore]:
        cutoff = datetime.utcnow() - timedelta(minutes=self.window_minutes)
        # ISO-8601 timestamps sort lexicographically, so string
        # comparison is safe here
        cutoff_str = cutoff.isoformat()
        return [s for s in self.scores if s.timestamp >= cutoff_str]

    def current_metrics(self) -> dict:
        recent = self._recent_scores()
        if not recent:
            return {"status": "no_data"}
        metric_values = defaultdict(list)
        all_flags = []
        for score in recent:
            for key, value in score.scores.items():
                if isinstance(value, (int, float)):
                    metric_values[key].append(value)
            all_flags.extend(score.flags)
        metrics = {}
        for key, values in metric_values.items():
            metrics[key] = {
                "mean": round(sum(values) / len(values), 3),
                "min": round(min(values), 3),
                "max": round(max(values), 3),
                "count": len(values),
            }
        # Flag frequency
        flag_counts = defaultdict(int)
        for flag in all_flags:
            flag_counts[flag] += 1
        return {
            "window_minutes": self.window_minutes,
            "conversations_evaluated": len(recent),
            "metrics": metrics,
            "top_flags": dict(
                sorted(flag_counts.items(), key=lambda x: -x[1])[:10]
            ),
        }

    def compare_windows(
        self, current_minutes: int = 60, baseline_minutes: int = 1440
    ) -> dict:
        now = datetime.utcnow()
        current_cutoff = (
            now - timedelta(minutes=current_minutes)
        ).isoformat()
        baseline_cutoff = (
            now - timedelta(minutes=baseline_minutes)
        ).isoformat()
        current = [
            s for s in self.scores if s.timestamp >= current_cutoff
        ]
        baseline = [
            s for s in self.scores
            if baseline_cutoff <= s.timestamp < current_cutoff
        ]

        def avg_metric(scores, key):
            vals = [
                s.scores.get(key, 0)
                for s in scores
                if isinstance(s.scores.get(key), (int, float))
            ]
            return sum(vals) / len(vals) if vals else 0

        # Compare all metrics seen in either window
        all_keys = set()
        for s in current + baseline:
            all_keys.update(s.scores.keys())
        comparison = {}
        for key in all_keys:
            curr_avg = avg_metric(current, key)
            base_avg = avg_metric(baseline, key)
            delta = curr_avg - base_avg
            comparison[key] = {
                "current": round(curr_avg, 3),
                "baseline": round(base_avg, 3),
                "delta": round(delta, 3),
                "degraded": delta < -0.1,
            }
        return comparison
The compare_windows method is your early warning system. It compares the last hour against the last 24 hours. When a metric's delta turns significantly negative, something changed — a model update, a traffic pattern shift, or a bug.
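The same delta logic reduces to a few lines if you already have per-window averages on hand (the function name and the 0.1 tolerance mirror compare_windows; both are adjustable):

```python
def degradation_report(current_avgs: dict, baseline_avgs: dict,
                       tolerance: float = 0.1) -> dict:
    # Flag any metric whose current-window average fell by more
    # than `tolerance` relative to the baseline window.
    report = {}
    for key in set(current_avgs) | set(baseline_avgs):
        delta = current_avgs.get(key, 0.0) - baseline_avgs.get(key, 0.0)
        report[key] = {
            "delta": round(delta, 3),
            "degraded": delta < -tolerance,
        }
    return report


report = degradation_report(
    {"task_completion": 0.68, "coherence": 4.1},
    {"task_completion": 0.84, "coherence": 4.0},
)
print(report["task_completion"])  # {'delta': -0.16, 'degraded': True}
```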
Alerting on Quality Degradation
Convert dashboard data into actionable alerts.
@dataclass
class AlertRule:
    metric: str
    threshold: float
    comparison: str  # "below", "above"
    severity: str  # "warning", "critical"
    message_template: str


class QualityAlertManager:
    def __init__(self):
        self.rules: list[AlertRule] = []
        self.active_alerts: list[dict] = []

    def add_rule(self, rule: AlertRule):
        self.rules.append(rule)

    def evaluate(self, metrics: dict) -> list[dict]:
        triggered = []
        for rule in self.rules:
            metric_data = metrics.get("metrics", {}).get(rule.metric, {})
            value = metric_data.get("mean")
            if value is None:
                continue
            fire = (
                (rule.comparison == "below" and value < rule.threshold)
                or (rule.comparison == "above" and value > rule.threshold)
            )
            if fire:
                alert = {
                    "metric": rule.metric,
                    "value": value,
                    "threshold": rule.threshold,
                    "severity": rule.severity,
                    "message": rule.message_template.format(
                        metric=rule.metric,
                        value=value,
                        threshold=rule.threshold,
                    ),
                    "timestamp": datetime.utcnow().isoformat(),
                }
                triggered.append(alert)
        self.active_alerts = triggered
        return triggered


# Configure alerts
alert_mgr = QualityAlertManager()
alert_mgr.add_rule(AlertRule(
    metric="task_completion",
    threshold=0.7,
    comparison="below",
    severity="critical",
    message_template=(
        "Task completion dropped to {value:.1%}, "
        "below {threshold:.1%} threshold"
    ),
))
alert_mgr.add_rule(AlertRule(
    metric="coherence",
    threshold=3.0,
    comparison="below",
    severity="warning",
    message_template=(
        "Coherence score at {value:.1f}, below {threshold:.1f} minimum"
    ),
))
Closing the Feedback Loop
The final piece is feeding production evaluation results back into your offline evaluation datasets. Conversations that score poorly in production become new test cases. Patterns that trigger alerts become new red team samples. This creates a virtuous cycle where your evaluation dataset grows smarter over time, reflecting the actual failure modes of your deployed agent rather than the failures you imagined during development.
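One way to mechanize that loop, assuming your offline suite ingests simple JSON records (the field names below are illustrative), is to harvest low-scoring samples directly from evaluation results:

```python
def harvest_test_cases(results: list[dict],
                       score_key: str = "task_completion",
                       threshold: float = 0.5) -> list[dict]:
    # Turn low-scoring production samples into offline eval cases.
    cases = []
    for r in results:
        value = r.get("scores", {}).get(score_key)
        if isinstance(value, (int, float)) and value < threshold:
            cases.append({
                "conversation_id": r["conversation_id"],
                "source": "production",
                "reason": f"low_{score_key}",
            })
    return cases


cases = harvest_test_cases([
    {"conversation_id": "c1", "scores": {"task_completion": 0.3}},
    {"conversation_id": "c2", "scores": {"task_completion": 0.9}},
])
print(cases)
# [{'conversation_id': 'c1', 'source': 'production',
#   'reason': 'low_task_completion'}]
```

From here, each harvested case still needs the full conversation transcript attached before it becomes a usable regression test.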
FAQ
How much does continuous production evaluation cost?
At a 5 percent sampling rate with LLM-as-judge scoring, expect to spend 2 to 5 percent of your agent's total LLM cost on evaluation. For a system spending 10,000 dollars a month on agent inference, that is 200 to 500 dollars for continuous monitoring. Deterministic checks are essentially free, so maximize those and use LLM judges selectively for quality dimensions that require language understanding.
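That arithmetic is easy to parameterize. A back-of-envelope sketch, where the 0.6 judge-cost ratio is an assumption (judge prompts are usually shorter than full agent inference):

```python
def monthly_eval_cost(agent_llm_spend: float,
                      sample_rate: float = 0.05,
                      judge_cost_ratio: float = 0.6) -> float:
    # Rough model: each sampled conversation's judge pass costs some
    # fraction of what the agent itself spent on that conversation.
    return agent_llm_spend * sample_rate * judge_cost_ratio


print(monthly_eval_cost(10_000))  # 300.0 -- within the 200-500 range above
```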
How do I avoid alert fatigue from too many false positives?
Start with conservative thresholds that only fire on genuine quality drops. Require sustained degradation — the metric must be below threshold for 15 minutes, not just a single sample. Group related alerts together so a single root cause does not generate five separate alerts. Review and tune thresholds monthly based on actual incident correlation.
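A sustained-degradation check can be as small as a rolling window that only fires when every recent observation breaches the threshold (a sketch; the class name, window size, and threshold are yours to tune):

```python
from collections import deque


class SustainedAlert:
    def __init__(self, threshold: float, window: int = 3):
        self.threshold = threshold
        self.recent: deque = deque(maxlen=window)

    def observe(self, value: float) -> bool:
        # Fire only once the window is full AND every value breaches,
        # so a single bad sample never pages anyone.
        self.recent.append(value)
        return (
            len(self.recent) == self.recent.maxlen
            and all(v < self.threshold for v in self.recent)
        )


alert = SustainedAlert(threshold=0.7, window=3)
print([alert.observe(v) for v in [0.65, 0.68, 0.72, 0.66, 0.64, 0.63]])
# [False, False, False, False, False, True]
```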
Should I evaluate the same conversation multiple times with different judges?
For production monitoring, one evaluation pass is sufficient — you need speed and cost efficiency. For conversations flagged as potential quality issues, run a second evaluation with a different judge model to confirm. This two-tier approach keeps costs low while reducing false positives on the cases that might trigger engineering action.
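A two-tier confirmation pass might look like this (the judge signatures are hypothetical; substitute your own judge calls):

```python
import asyncio


async def two_tier_evaluate(conversation: dict, primary_judge,
                            secondary_judge,
                            flag_threshold: float = 0.6) -> dict:
    first = await primary_judge(conversation)
    if first["score"] >= flag_threshold:
        # Healthy conversation: one cheap pass is enough.
        return {**first, "confirmed_issue": False}
    # Flagged: confirm with a different judge model before alerting.
    second = await secondary_judge(conversation)
    return {**first, "confirmed_issue": second["score"] < flag_threshold}


# Stub judges standing in for real LLM-as-judge calls.
async def strict_judge(_conversation):
    return {"score": 0.4}


async def lenient_judge(_conversation):
    return {"score": 0.8}


result = asyncio.run(two_tier_evaluate({}, strict_judge, lenient_judge))
print(result)  # {'score': 0.4, 'confirmed_issue': False}
```

Here the primary judge flags the conversation, but the secondary judge disagrees, so no engineering action is triggered.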
#ProductionMonitoring #ContinuousEvaluation #Observability #Alerting #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.