Skip to content
Learn Agentic AI11 min read0 views

Monitoring Data Pipeline Health: Alerting on Ingestion Failures and Data Drift

Build a monitoring system for AI agent data pipelines that tracks ingestion metrics, detects data drift, alerts on failures, and enforces SLAs to keep your agent's knowledge base fresh and reliable.

Why Pipeline Monitoring Is Non-Negotiable

A data pipeline that worked perfectly yesterday can silently break today. An API changes its response format. A database migration drops a column. A rate limit kicks in halfway through processing. Without monitoring, these failures go undetected until a user asks your agent a question and gets a stale or wrong answer.

Pipeline monitoring for AI agents goes beyond traditional ETL monitoring. You need to track not just whether the pipeline ran, but whether the data it produced is fresh, complete, correctly formatted, and statistically consistent with what the agent expects.

Core Pipeline Metrics

Start by tracking four categories of metrics: throughput (how much data is flowing), latency (how long processing takes), quality (how clean the data is), and freshness (how recent the data is).

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from enum import Enum
import time

class MetricType(str, Enum):
    THROUGHPUT = "throughput"
    LATENCY = "latency"
    QUALITY = "quality"
    FRESHNESS = "freshness"

@dataclass
class PipelineMetric:
    pipeline_name: str
    metric_type: MetricType
    value: float
    unit: str
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)

class MetricsCollector:
    def __init__(self):
        self.metrics: List[PipelineMetric] = []

    def record(
        self, pipeline: str, metric_type: MetricType,
        value: float, unit: str, **labels,
    ):
        self.metrics.append(PipelineMetric(
            pipeline_name=pipeline,
            metric_type=metric_type,
            value=value,
            unit=unit,
            timestamp=datetime.utcnow(),
            labels=labels,
        ))

    def get_recent(
        self, pipeline: str, metric_type: MetricType,
        minutes: int = 60,
    ) -> List[PipelineMetric]:
        cutoff = datetime.utcnow() - timedelta(minutes=minutes)
        return [
            m for m in self.metrics
            if (m.pipeline_name == pipeline
                and m.metric_type == metric_type
                and m.timestamp >= cutoff)
        ]

class PipelineTimer:
    """Context manager for timing pipeline stages."""
    def __init__(self, collector: MetricsCollector,
                 pipeline: str, stage: str):
        self.collector = collector
        self.pipeline = pipeline
        self.stage = stage
        self.start = None

    def __enter__(self):
        self.start = time.monotonic()
        return self

    def __exit__(self, *args):
        elapsed = time.monotonic() - self.start
        self.collector.record(
            self.pipeline, MetricType.LATENCY,
            elapsed, "seconds", stage=self.stage,
        )

Data Freshness Monitoring

Data freshness is the most critical metric for AI agents. If the knowledge base is stale, the agent gives outdated answers even though everything else works perfectly.

class FreshnessMonitor:
    def __init__(self, db_pool, collector: MetricsCollector):
        self.db_pool = db_pool
        self.collector = collector

    async def check_freshness(self) -> Dict[str, dict]:
        checks = {}
        async with self.db_pool.acquire() as conn:
            # Check each data source's most recent record
            sources = await conn.fetch("""
                SELECT
                    source,
                    MAX(updated_at) as last_update,
                    COUNT(*) as total_records,
                    COUNT(*) FILTER (
                        WHERE updated_at >= NOW() - INTERVAL '24 hours'
                    ) as recent_records
                FROM knowledge_documents
                GROUP BY source
            """)

        for row in sources:
            source = row["source"]
            last_update = row["last_update"]
            staleness = (
                datetime.utcnow() - last_update
            ).total_seconds() / 3600  # hours

            checks[source] = {
                "last_update": last_update.isoformat(),
                "staleness_hours": round(staleness, 1),
                "total_records": row["total_records"],
                "recent_records": row["recent_records"],
                "is_stale": staleness > 24,
            }

            self.collector.record(
                f"source_{source}",
                MetricType.FRESHNESS,
                staleness, "hours",
            )

        return checks

Data Drift Detection

Data drift means the statistical properties of incoming data have changed from what the pipeline and agent expect. This can indicate upstream data source problems, schema changes, or real-world shifts that require agent updates.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

import statistics
from typing import Tuple

class DriftDetector:
    def __init__(self, baseline_window_days: int = 30):
        self.baseline_window = baseline_window_days

    async def check_drift(
        self, db_pool, table: str, column: str
    ) -> dict:
        async with db_pool.acquire() as conn:
            baseline = await conn.fetch(f"""
                SELECT {column}
                FROM {table}
                WHERE created_at BETWEEN
                    NOW() - INTERVAL '{self.baseline_window} days'
                    AND NOW() - INTERVAL '1 day'
            """)
            recent = await conn.fetch(f"""
                SELECT {column}
                FROM {table}
                WHERE created_at >= NOW() - INTERVAL '1 day'
            """)

        baseline_values = [r[column] for r in baseline if r[column] is not None]
        recent_values = [r[column] for r in recent if r[column] is not None]

        if not baseline_values or not recent_values:
            return {"status": "insufficient_data"}

        drift_score = self._calculate_drift(
            baseline_values, recent_values
        )

        return {
            "column": column,
            "baseline_mean": statistics.mean(baseline_values),
            "recent_mean": statistics.mean(recent_values),
            "drift_score": drift_score,
            "has_drift": drift_score > 2.0,
            "baseline_count": len(baseline_values),
            "recent_count": len(recent_values),
        }

    def _calculate_drift(
        self,
        baseline: List[float],
        recent: List[float],
    ) -> float:
        """Z-score based drift detection."""
        bl_mean = statistics.mean(baseline)
        bl_std = statistics.stdev(baseline) if len(baseline) > 1 else 1.0
        rc_mean = statistics.mean(recent)
        if bl_std == 0:
            return 0.0
        return abs(rc_mean - bl_mean) / bl_std

SLA Tracking and Alerting

Define SLAs for each pipeline and alert when they are violated. SLAs should cover freshness, completeness, and execution time.

@dataclass
class PipelineSLA:
    pipeline_name: str
    max_staleness_hours: float
    min_daily_records: int
    max_execution_minutes: float
    max_error_rate: float

@dataclass
class SLAViolation:
    pipeline_name: str
    sla_type: str
    expected: float
    actual: float
    message: str
    severity: str
    detected_at: datetime = field(
        default_factory=datetime.utcnow
    )

class SLAMonitor:
    def __init__(self, collector: MetricsCollector):
        self.collector = collector
        self.slas: Dict[str, PipelineSLA] = {}

    def register_sla(self, sla: PipelineSLA):
        self.slas[sla.pipeline_name] = sla

    def check_all(self) -> List[SLAViolation]:
        violations = []
        for name, sla in self.slas.items():
            violations.extend(self._check_pipeline(name, sla))
        return violations

    def _check_pipeline(
        self, name: str, sla: PipelineSLA
    ) -> List[SLAViolation]:
        violations = []

        # Check freshness
        freshness = self.collector.get_recent(
            name, MetricType.FRESHNESS, minutes=60
        )
        if freshness:
            latest = freshness[-1].value
            if latest > sla.max_staleness_hours:
                violations.append(SLAViolation(
                    pipeline_name=name,
                    sla_type="freshness",
                    expected=sla.max_staleness_hours,
                    actual=latest,
                    message=(
                        f"{name} data is {latest:.1f}h stale "
                        f"(SLA: {sla.max_staleness_hours}h)"
                    ),
                    severity="critical"
                    if latest > sla.max_staleness_hours * 2
                    else "warning",
                ))

        # Check latency
        latency = self.collector.get_recent(
            name, MetricType.LATENCY, minutes=120
        )
        if latency:
            max_latency = max(m.value for m in latency) / 60
            if max_latency > sla.max_execution_minutes:
                violations.append(SLAViolation(
                    pipeline_name=name,
                    sla_type="latency",
                    expected=sla.max_execution_minutes,
                    actual=max_latency,
                    message=(
                        f"{name} took {max_latency:.1f}min "
                        f"(SLA: {sla.max_execution_minutes}min)"
                    ),
                    severity="warning",
                ))

        return violations

Alert Dispatcher

Route alerts to the right channels based on severity.

import httpx
import logging

logger = logging.getLogger(__name__)

class AlertDispatcher:
    def __init__(self, slack_webhook: str, pagerduty_key: str = ""):
        self.slack_webhook = slack_webhook
        self.pagerduty_key = pagerduty_key

    async def dispatch(self, violations: List[SLAViolation]):
        for v in violations:
            if v.severity == "critical":
                await self._send_slack(v)
                if self.pagerduty_key:
                    await self._send_pagerduty(v)
            elif v.severity == "warning":
                await self._send_slack(v)
            logger.warning(
                f"SLA violation: {v.message} "
                f"[{v.severity}]"
            )

    async def _send_slack(self, violation: SLAViolation):
        icon = "!!" if violation.severity == "critical" else "!"
        payload = {
            "text": (
                f"{icon} Pipeline SLA Violation\n"
                f"*Pipeline:* {violation.pipeline_name}\n"
                f"*Type:* {violation.sla_type}\n"
                f"*Details:* {violation.message}\n"
                f"*Severity:* {violation.severity}"
            ),
        }
        async with httpx.AsyncClient() as client:
            await client.post(self.slack_webhook, json=payload)

    async def _send_pagerduty(self, violation: SLAViolation):
        payload = {
            "routing_key": self.pagerduty_key,
            "event_action": "trigger",
            "payload": {
                "summary": violation.message,
                "severity": violation.severity,
                "source": violation.pipeline_name,
            },
        }
        async with httpx.AsyncClient() as client:
            await client.post(
                "https://events.pagerduty.com/v2/enqueue",
                json=payload,
            )

Putting It All Together

Run monitoring checks on a schedule and dispatch alerts for any SLA violations.

async def run_monitoring_cycle(
    db_pool, collector, sla_monitor, alerter
):
    # Check freshness across all sources
    freshness_monitor = FreshnessMonitor(db_pool, collector)
    freshness = await freshness_monitor.check_freshness()

    # Check for data drift on key columns
    drift = DriftDetector()
    drift_result = await drift.check_drift(
        db_pool, "knowledge_documents", "word_count"
    )
    if drift_result.get("has_drift"):
        logger.warning(
            f"Data drift detected: {drift_result}"
        )

    # Check SLA compliance
    violations = sla_monitor.check_all()
    if violations:
        await alerter.dispatch(violations)

    return {
        "freshness": freshness,
        "drift": drift_result,
        "violations": len(violations),
    }

FAQ

How often should I run pipeline health checks?

Run freshness checks every 5 to 15 minutes and drift detection hourly. SLA checks should align with your pipeline schedules — if a pipeline runs every 6 hours, check its SLA shortly after each expected completion. Avoid running expensive drift detection queries too frequently as they scan large amounts of data and can impact database performance.

What is the difference between data drift and concept drift, and which should I monitor?

Data drift means the statistical distribution of input features has changed — for example, document lengths suddenly averaging 2x longer than normal. Concept drift means the relationship between inputs and expected outputs has changed — the same question now has a different correct answer. Monitor data drift with statistical tests on pipeline metrics. Detect concept drift by tracking agent accuracy metrics (thumbs up/down rate, escalation rate) over time.

How do I set appropriate SLA thresholds for a new pipeline?

Run the pipeline for two to four weeks in observation mode, collecting baseline metrics without alerts. Calculate the mean and standard deviation for freshness, latency, and throughput. Set warning thresholds at mean plus two standard deviations and critical thresholds at mean plus three standard deviations. Adjust based on business requirements — if the agent serves time-sensitive queries, tighten freshness SLAs below the statistical baseline.


#PipelineMonitoring #DataDrift #Alerting #SLATracking #Observability #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.