Monitoring Data Pipeline Health: Alerting on Ingestion Failures and Data Drift
Build a monitoring system for AI agent data pipelines that tracks ingestion metrics, detects data drift, alerts on failures, and enforces SLAs to keep your agent's knowledge base fresh and reliable.
Why Pipeline Monitoring Is Non-Negotiable
A data pipeline that worked perfectly yesterday can silently break today. An API changes its response format. A database migration drops a column. A rate limit kicks in halfway through processing. Without monitoring, these failures go undetected until a user asks your agent a question and gets a stale or wrong answer.
Pipeline monitoring for AI agents goes beyond traditional ETL monitoring. You need to track not just whether the pipeline ran, but whether the data it produced is fresh, complete, correctly formatted, and statistically consistent with what the agent expects.
Core Pipeline Metrics
Start by tracking four categories of metrics: throughput (how much data is flowing), latency (how long processing takes), quality (how clean the data is), and freshness (how recent the data is).
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List
import time


class MetricType(str, Enum):
    THROUGHPUT = "throughput"
    LATENCY = "latency"
    QUALITY = "quality"
    FRESHNESS = "freshness"


@dataclass
class PipelineMetric:
    pipeline_name: str
    metric_type: MetricType
    value: float
    unit: str
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)


class MetricsCollector:
    def __init__(self):
        self.metrics: List[PipelineMetric] = []

    def record(
        self, pipeline: str, metric_type: MetricType,
        value: float, unit: str, **labels,
    ):
        self.metrics.append(PipelineMetric(
            pipeline_name=pipeline,
            metric_type=metric_type,
            value=value,
            unit=unit,
            timestamp=datetime.utcnow(),
            labels=labels,
        ))

    def get_recent(
        self, pipeline: str, metric_type: MetricType,
        minutes: int = 60,
    ) -> List[PipelineMetric]:
        cutoff = datetime.utcnow() - timedelta(minutes=minutes)
        return [
            m for m in self.metrics
            if (m.pipeline_name == pipeline
                and m.metric_type == metric_type
                and m.timestamp >= cutoff)
        ]


class PipelineTimer:
    """Context manager for timing pipeline stages."""

    def __init__(self, collector: MetricsCollector,
                 pipeline: str, stage: str):
        self.collector = collector
        self.pipeline = pipeline
        self.stage = stage
        self.start = None

    def __enter__(self):
        self.start = time.monotonic()
        return self

    def __exit__(self, *args):
        elapsed = time.monotonic() - self.start
        self.collector.record(
            self.pipeline, MetricType.LATENCY,
            elapsed, "seconds", stage=self.stage,
        )
Data Freshness Monitoring
For AI agents, data freshness is often the most critical metric. If the knowledge base is stale, the agent gives outdated answers even when everything else works perfectly.
class FreshnessMonitor:
    def __init__(self, db_pool, collector: MetricsCollector):
        self.db_pool = db_pool
        self.collector = collector

    async def check_freshness(self) -> Dict[str, dict]:
        checks = {}
        async with self.db_pool.acquire() as conn:
            # Check each data source's most recent record
            sources = await conn.fetch("""
                SELECT
                    source,
                    MAX(updated_at) AS last_update,
                    COUNT(*) AS total_records,
                    COUNT(*) FILTER (
                        WHERE updated_at >= NOW() - INTERVAL '24 hours'
                    ) AS recent_records
                FROM knowledge_documents
                GROUP BY source
            """)
        for row in sources:
            source = row["source"]
            last_update = row["last_update"]
            staleness = (
                datetime.utcnow() - last_update
            ).total_seconds() / 3600  # hours
            checks[source] = {
                "last_update": last_update.isoformat(),
                "staleness_hours": round(staleness, 1),
                "total_records": row["total_records"],
                "recent_records": row["recent_records"],
                "is_stale": staleness > 24,
            }
            self.collector.record(
                f"source_{source}",
                MetricType.FRESHNESS,
                staleness, "hours",
            )
        return checks
Data Drift Detection
Data drift means the statistical properties of incoming data have changed from what the pipeline and agent expect. This can indicate upstream data source problems, schema changes, or real-world shifts that require agent updates.
import statistics
from typing import List


class DriftDetector:
    def __init__(self, baseline_window_days: int = 30):
        self.baseline_window = baseline_window_days

    async def check_drift(
        self, db_pool, table: str, column: str
    ) -> dict:
        # Note: table and column are interpolated directly into SQL,
        # so they must come from trusted configuration, never user input.
        async with db_pool.acquire() as conn:
            baseline = await conn.fetch(f"""
                SELECT {column}
                FROM {table}
                WHERE created_at BETWEEN
                    NOW() - INTERVAL '{self.baseline_window} days'
                    AND NOW() - INTERVAL '1 day'
            """)
            recent = await conn.fetch(f"""
                SELECT {column}
                FROM {table}
                WHERE created_at >= NOW() - INTERVAL '1 day'
            """)
        baseline_values = [r[column] for r in baseline if r[column] is not None]
        recent_values = [r[column] for r in recent if r[column] is not None]
        if not baseline_values or not recent_values:
            return {"status": "insufficient_data"}
        drift_score = self._calculate_drift(
            baseline_values, recent_values
        )
        return {
            "column": column,
            "baseline_mean": statistics.mean(baseline_values),
            "recent_mean": statistics.mean(recent_values),
            "drift_score": drift_score,
            "has_drift": drift_score > 2.0,
            "baseline_count": len(baseline_values),
            "recent_count": len(recent_values),
        }

    def _calculate_drift(
        self,
        baseline: List[float],
        recent: List[float],
    ) -> float:
        """Z-score based drift detection."""
        bl_mean = statistics.mean(baseline)
        bl_std = statistics.stdev(baseline) if len(baseline) > 1 else 1.0
        rc_mean = statistics.mean(recent)
        if bl_std == 0:
            return 0.0
        return abs(rc_mean - bl_mean) / bl_std
SLA Tracking and Alerting
Define SLAs for each pipeline and alert when they are violated. SLAs should cover freshness, completeness, and execution time.
@dataclass
class PipelineSLA:
    pipeline_name: str
    max_staleness_hours: float
    min_daily_records: int
    max_execution_minutes: float
    max_error_rate: float


@dataclass
class SLAViolation:
    pipeline_name: str
    sla_type: str
    expected: float
    actual: float
    message: str
    severity: str
    detected_at: datetime = field(
        default_factory=datetime.utcnow
    )


class SLAMonitor:
    def __init__(self, collector: MetricsCollector):
        self.collector = collector
        self.slas: Dict[str, PipelineSLA] = {}

    def register_sla(self, sla: PipelineSLA):
        self.slas[sla.pipeline_name] = sla

    def check_all(self) -> List[SLAViolation]:
        violations = []
        for name, sla in self.slas.items():
            violations.extend(self._check_pipeline(name, sla))
        return violations

    def _check_pipeline(
        self, name: str, sla: PipelineSLA
    ) -> List[SLAViolation]:
        violations = []
        # Check freshness
        freshness = self.collector.get_recent(
            name, MetricType.FRESHNESS, minutes=60
        )
        if freshness:
            latest = freshness[-1].value
            if latest > sla.max_staleness_hours:
                violations.append(SLAViolation(
                    pipeline_name=name,
                    sla_type="freshness",
                    expected=sla.max_staleness_hours,
                    actual=latest,
                    message=(
                        f"{name} data is {latest:.1f}h stale "
                        f"(SLA: {sla.max_staleness_hours}h)"
                    ),
                    severity="critical"
                    if latest > sla.max_staleness_hours * 2
                    else "warning",
                ))
        # Check latency (recorded in seconds, SLA in minutes)
        latency = self.collector.get_recent(
            name, MetricType.LATENCY, minutes=120
        )
        if latency:
            max_latency = max(m.value for m in latency) / 60
            if max_latency > sla.max_execution_minutes:
                violations.append(SLAViolation(
                    pipeline_name=name,
                    sla_type="latency",
                    expected=sla.max_execution_minutes,
                    actual=max_latency,
                    message=(
                        f"{name} took {max_latency:.1f}min "
                        f"(SLA: {sla.max_execution_minutes}min)"
                    ),
                    severity="warning",
                ))
        return violations
Alert Dispatcher
Route alerts to the right channels based on severity.
import httpx
import logging

logger = logging.getLogger(__name__)


class AlertDispatcher:
    def __init__(self, slack_webhook: str, pagerduty_key: str = ""):
        self.slack_webhook = slack_webhook
        self.pagerduty_key = pagerduty_key

    async def dispatch(self, violations: List[SLAViolation]):
        for v in violations:
            if v.severity == "critical":
                await self._send_slack(v)
                if self.pagerduty_key:
                    await self._send_pagerduty(v)
            elif v.severity == "warning":
                await self._send_slack(v)
            logger.warning(
                f"SLA violation: {v.message} "
                f"[{v.severity}]"
            )

    async def _send_slack(self, violation: SLAViolation):
        icon = "!!" if violation.severity == "critical" else "!"
        payload = {
            "text": (
                f"{icon} Pipeline SLA Violation\n"
                f"*Pipeline:* {violation.pipeline_name}\n"
                f"*Type:* {violation.sla_type}\n"
                f"*Details:* {violation.message}\n"
                f"*Severity:* {violation.severity}"
            ),
        }
        async with httpx.AsyncClient() as client:
            await client.post(self.slack_webhook, json=payload)

    async def _send_pagerduty(self, violation: SLAViolation):
        payload = {
            "routing_key": self.pagerduty_key,
            "event_action": "trigger",
            "payload": {
                "summary": violation.message,
                "severity": violation.severity,
                "source": violation.pipeline_name,
            },
        }
        async with httpx.AsyncClient() as client:
            await client.post(
                "https://events.pagerduty.com/v2/enqueue",
                json=payload,
            )
Putting It All Together
Run monitoring checks on a schedule and dispatch alerts for any SLA violations.
async def run_monitoring_cycle(
    db_pool, collector, sla_monitor, alerter
):
    # Check freshness across all sources
    freshness_monitor = FreshnessMonitor(db_pool, collector)
    freshness = await freshness_monitor.check_freshness()

    # Check for data drift on key columns
    drift = DriftDetector()
    drift_result = await drift.check_drift(
        db_pool, "knowledge_documents", "word_count"
    )
    if drift_result.get("has_drift"):
        logger.warning(
            f"Data drift detected: {drift_result}"
        )

    # Check SLA compliance
    violations = sla_monitor.check_all()
    if violations:
        await alerter.dispatch(violations)

    return {
        "freshness": freshness,
        "drift": drift_result,
        "violations": len(violations),
    }
FAQ
How often should I run pipeline health checks?
Run freshness checks every 5 to 15 minutes and drift detection hourly. SLA checks should align with your pipeline schedules — if a pipeline runs every 6 hours, check its SLA shortly after each expected completion. Avoid running expensive drift detection queries too frequently as they scan large amounts of data and can impact database performance.
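One simple way to run checks on such a schedule is a long-running asyncio loop that invokes your monitoring entry point at a fixed interval and never lets a single failed cycle kill the scheduler. This is a minimal sketch; `run_check` stands in for your own entry point (such as a cycle function wired to your pools and monitors), and the interval and `max_cycles` demo values are illustrative.

```python
import asyncio


async def monitoring_loop(run_check, interval_seconds, max_cycles=None):
    """Call run_check every interval_seconds; swallow per-cycle errors
    so one failed check never stops the scheduler. max_cycles is for
    testing; pass None to run forever."""
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        try:
            await run_check()
        except Exception as exc:
            # In production, log and alert here instead of printing
            print(f"monitoring check failed: {exc}")
        cycles += 1
        if max_cycles is None or cycles < max_cycles:
            await asyncio.sleep(interval_seconds)


# Demo with a stub check standing in for the real monitoring cycle
results = []


async def fake_check():
    results.append("ok")


asyncio.run(monitoring_loop(fake_check, 0.01, max_cycles=3))
```

In production you would typically run this loop as a background task (or hand the schedule to cron, Airflow, or similar) rather than blocking on `asyncio.run`.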
What is the difference between data drift and concept drift, and which should I monitor?
Data drift means the statistical distribution of input features has changed — for example, document lengths suddenly averaging 2x longer than normal. Concept drift means the relationship between inputs and expected outputs has changed — the same question now has a different correct answer. Monitor data drift with statistical tests on pipeline metrics. Detect concept drift by tracking agent accuracy metrics (thumbs up/down rate, escalation rate) over time.
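As a sketch of that second half, here is a hypothetical rolling approval-rate tracker for concept-drift detection; the class name, the 100-response window, and the 80% alert threshold are illustrative assumptions, not recommendations from the sections above.

```python
from collections import deque


class ApprovalRateTracker:
    """Track a rolling thumbs-up rate as a concept-drift proxy."""

    def __init__(self, window: int = 100, alert_below: float = 0.8):
        # deque(maxlen=...) keeps only the most recent `window` votes
        self.votes = deque(maxlen=window)
        self.alert_below = alert_below

    def record(self, thumbs_up: bool) -> None:
        self.votes.append(1 if thumbs_up else 0)

    @property
    def rate(self) -> float:
        return sum(self.votes) / len(self.votes) if self.votes else 1.0

    def needs_review(self, min_samples: int = 50) -> bool:
        # Only alert once there is enough feedback to be meaningful
        return len(self.votes) >= min_samples and self.rate < self.alert_below


tracker = ApprovalRateTracker(window=100, alert_below=0.8)
for _ in range(60):
    tracker.record(True)
for _ in range(40):
    tracker.record(False)
# Approval rate over the last 100 responses is now 0.6,
# below the 0.8 threshold, so the tracker flags a review.
```

A sustained drop in this rate suggests the agent's answers no longer match what users expect, even if every data-drift check passes.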
How do I set appropriate SLA thresholds for a new pipeline?
Run the pipeline for two to four weeks in observation mode, collecting baseline metrics without alerts. Calculate the mean and standard deviation for freshness, latency, and throughput. Set warning thresholds at mean plus two standard deviations and critical thresholds at mean plus three standard deviations. Adjust based on business requirements — if the agent serves time-sensitive queries, tighten freshness SLAs below the statistical baseline.
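The mean-plus-sigma rule translates directly into code. A minimal sketch, assuming you have collected baseline staleness observations in hours (the `sla_thresholds` helper and sample values are illustrative):

```python
import statistics


def sla_thresholds(baseline: list) -> dict:
    """Derive warning/critical thresholds from baseline observations
    using mean + 2 sigma (warning) and mean + 3 sigma (critical)."""
    mean = statistics.mean(baseline)
    std = statistics.stdev(baseline)
    return {
        "warning": mean + 2 * std,
        "critical": mean + 3 * std,
    }


# Hypothetical staleness observations (hours) from the observation window
baseline = [4.0, 5.0, 6.0, 5.5, 4.5, 5.0, 6.5, 5.5]
thresholds = sla_thresholds(baseline)
# thresholds["warning"] < thresholds["critical"], both above the mean
```

If the statistically derived threshold is looser than the business actually tolerates, use the business limit instead; the baseline only tells you what is normal, not what is acceptable.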
#PipelineMonitoring #DataDrift #Alerting #SLATracking #Observability #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.