Notification Routing Agent: Intelligent Alert Triage and Delivery Channel Selection
Build an AI agent that classifies incoming alerts by urgency and type, selects the optimal delivery channel for each notification, bundles related alerts to reduce noise, and ensures critical issues reach the right people immediately.
Alert Fatigue Is a Real Problem
Modern systems generate an overwhelming volume of notifications. Monitoring tools fire alerts, CI/CD pipelines report failures, customer support tickets arrive, security scanners flag vulnerabilities, and business dashboards trigger threshold warnings. When everything buzzes, nothing stands out. Alert fatigue leads to missed critical issues because the important signals are buried under noise.
A notification routing agent solves this by classifying each alert, determining its true urgency, selecting the right delivery channel, and bundling related alerts to reduce interruption volume.
Defining the Alert Model
First, we define a structured model for incoming alerts from any source system:
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class Urgency(Enum):
    CRITICAL = "critical"  # Immediate action needed
    HIGH = "high"          # Action needed within 1 hour
    MEDIUM = "medium"      # Action needed today
    LOW = "low"            # Informational, no rush
    NOISE = "noise"        # Can be suppressed


class DeliveryChannel(Enum):
    PHONE_CALL = "phone_call"
    SMS = "sms"
    SLACK_DM = "slack_dm"
    SLACK_CHANNEL = "slack_channel"
    EMAIL = "email"
    DASHBOARD = "dashboard"
    SUPPRESSED = "suppressed"


@dataclass
class Alert:
    id: str
    source: str  # e.g., "prometheus", "jira", "sentry"
    title: str
    body: str
    timestamp: datetime
    raw_severity: str  # Original severity from source system
    metadata: dict = field(default_factory=dict)
    classified_urgency: Urgency | None = None
    delivery_channel: DeliveryChannel | None = None
    routed_to: list[str] = field(default_factory=list)
    bundle_key: str = ""
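As a quick illustration, here is how an incoming Prometheus alert might be normalized into this model. The field values are hypothetical:

# Hypothetical example: normalizing a Prometheus webhook into the model
alert = Alert(
    id="alrt-20240115-0042",
    source="prometheus",
    title="HighErrorRate: api-gateway 5xx above 5%",
    body="5xx error rate on api-gateway has exceeded 5% for 10 minutes.",
    timestamp=datetime.now(),
    raw_severity="critical",
    metadata={"environment": "production", "service": "api-gateway"},
)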
Classifying Alert Urgency with AI
Source systems assign severity levels, but these are often unreliable. A "critical" Prometheus alert for a staging environment is not truly critical. The agent reclassifies urgency based on context:
from openai import OpenAI
import json

client = OpenAI()

CLASSIFICATION_CONTEXT = """
Rules for urgency classification:
- CRITICAL: Production is down, data loss occurring, security breach active
- HIGH: Production degraded, error rate spiking, customer-facing issue
- MEDIUM: Non-production issue, slow degradation, planned attention needed
- LOW: Informational, minor threshold crossed, non-urgent improvement
- NOISE: Duplicate, auto-resolved, known flaky alert, test environment
"""


def classify_alert(alert: Alert, system_context: str = "") -> dict:
    """Classify alert urgency and determine routing."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        response_format={"type": "json_object"},
        messages=[
            {
                "role": "system",
                "content": (
                    "You are an alert triage system. Classify this alert and "
                    "determine routing. Return JSON with:\n"
                    "- urgency: critical, high, medium, low, or noise\n"
                    "- reasoning: one sentence explaining the classification\n"
                    "- team: which team should handle this (engineering, "
                    "security, devops, support, product)\n"
                    "- bundle_key: a short key for grouping related alerts "
                    "(e.g., 'db-connection-pool', 'api-latency')\n\n"
                    f"{CLASSIFICATION_CONTEXT}\n\n"
                    f"System context: {system_context}"
                ),
            },
            {
                "role": "user",
                "content": (
                    f"Source: {alert.source}\n"
                    f"Original Severity: {alert.raw_severity}\n"
                    f"Title: {alert.title}\n"
                    f"Body: {alert.body}\n"
                    f"Metadata: {json.dumps(alert.metadata)}"
                ),
            },
        ],
    )
    return json.loads(response.choices[0].message.content)
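For the hypothetical api-gateway alert above, the returned dict might look like the following. This is illustrative only; actual model output will vary:

{
    "urgency": "critical",
    "reasoning": "Customer-facing 5xx errors are spiking in production.",
    "team": "engineering",
    "bundle_key": "api-gateway-5xx"
}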
Selecting the Delivery Channel
The delivery channel depends on urgency, time of day, and the recipient's preferences. Critical alerts at 3 AM go to phone calls, not Slack:
from datetime import time as dt_time


@dataclass
class RecipientPreferences:
    name: str
    phone: str
    email: str
    slack_id: str
    quiet_hours: tuple[dt_time, dt_time] = (dt_time(22, 0), dt_time(7, 0))
    preferred_channel: DeliveryChannel = DeliveryChannel.SLACK_DM


def select_delivery_channel(
    urgency: Urgency,
    recipient: RecipientPreferences,
    current_time: datetime,
) -> DeliveryChannel:
    """Select the optimal delivery channel based on urgency and context."""
    if urgency == Urgency.NOISE:
        return DeliveryChannel.SUPPRESSED

    current_t = current_time.time()
    start, end = recipient.quiet_hours
    if start <= end:
        # Quiet hours within a single day, e.g., 13:00-14:00
        is_quiet_hours = start <= current_t <= end
    else:
        # Quiet hours wrap past midnight, e.g., the default 22:00-07:00
        is_quiet_hours = current_t >= start or current_t <= end

    channel_priority = {
        Urgency.CRITICAL: [
            DeliveryChannel.PHONE_CALL,
            DeliveryChannel.SMS,
            DeliveryChannel.SLACK_DM,
        ],
        Urgency.HIGH: [
            DeliveryChannel.SMS if is_quiet_hours else DeliveryChannel.SLACK_DM,
            DeliveryChannel.SLACK_DM,
            DeliveryChannel.EMAIL,
        ],
        Urgency.MEDIUM: [
            DeliveryChannel.SLACK_CHANNEL,
            DeliveryChannel.EMAIL,
        ],
        Urgency.LOW: [
            DeliveryChannel.DASHBOARD,
            DeliveryChannel.EMAIL,
        ],
    }
    options = channel_priority.get(urgency, [DeliveryChannel.EMAIL])
    return options[0] if options else DeliveryChannel.EMAIL
Critical alerts always escalate to phone calls regardless of time. High-urgency alerts during quiet hours use SMS instead of Slack since the recipient is likely not checking Slack at 3 AM.
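A minimal sketch of that behavior, assuming the default quiet hours and a hypothetical recipient:

# Hypothetical recipient using the default 22:00-07:00 quiet hours
alice = RecipientPreferences(
    name="Alice",
    phone="+15550100",
    email="alice@example.com",
    slack_id="U01ABCDEF",
)

# Critical at 3 AM still escalates to a phone call
print(select_delivery_channel(
    Urgency.CRITICAL, alice, datetime(2024, 1, 15, 3, 0)
))  # DeliveryChannel.PHONE_CALL

# High urgency at 3 AM uses SMS; at 3 PM it would be a Slack DM
print(select_delivery_channel(
    Urgency.HIGH, alice, datetime(2024, 1, 15, 3, 0)
))  # DeliveryChannel.SMS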
Alert Bundling to Reduce Noise
When the same underlying issue triggers multiple alerts, the agent bundles them into a single notification:
from collections import defaultdict
from datetime import timedelta


class AlertBundler:
    def __init__(self, window_seconds: int = 300):
        self.window = timedelta(seconds=window_seconds)
        self.bundles: dict[str, list[Alert]] = defaultdict(list)
        self.last_sent: dict[str, datetime] = {}

    def should_bundle(self, alert: Alert) -> bool:
        """Check if this alert should be bundled with existing alerts."""
        key = alert.bundle_key
        if not key:
            return False
        last = self.last_sent.get(key)
        if last and (alert.timestamp - last) < self.window:
            self.bundles[key].append(alert)
            return True
        return False

    def add_and_check(self, alert: Alert) -> Alert | None:
        """Add alert. Returns None if bundled, or the alert if it should send."""
        if self.should_bundle(alert):
            return None  # Bundled, will send in digest
        if not alert.bundle_key:
            return alert  # No bundle key: send immediately, skip bookkeeping
        self.bundles[alert.bundle_key].append(alert)
        self.last_sent[alert.bundle_key] = alert.timestamp
        return alert

    def flush_bundle(self, bundle_key: str) -> list[Alert]:
        """Get all bundled alerts for a key and clear the bundle."""
        alerts = self.bundles.pop(bundle_key, [])
        self.last_sent.pop(bundle_key, None)
        return alerts

    def get_bundle_summary(self, bundle_key: str) -> str:
        """Generate a summary for a bundle of related alerts."""
        alerts = self.bundles.get(bundle_key, [])
        if not alerts:
            return ""
        count = len(alerts)
        first = alerts[0]
        return (
            f"{count} related alerts for '{bundle_key}' "
            f"since {first.timestamp.strftime('%H:%M:%S')}. "
            f"Latest: {alerts[-1].title}"
        )
The bundler groups alerts by their bundle_key within a configurable time window. Instead of receiving 15 individual "pod restarting" alerts, the on-call engineer receives one notification saying "15 pod restart alerts in the last 5 minutes."
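The class never sends digests on its own; something has to sweep it periodically. One way to do that, sketched below under the assumption that a scheduler calls it every minute or so, is a hypothetical flush_due_bundles helper (not part of the class above) that sends a summary once a bundle's window has expired:

def flush_due_bundles(
    bundler: AlertBundler,
    dispatcher: NotificationDispatcher,
    recipient: RecipientPreferences,
    now: datetime,
) -> None:
    """Hypothetical periodic sweep: send one digest per expired bundle."""
    for key, last in list(bundler.last_sent.items()):
        if now - last < bundler.window:
            continue  # Window still open; keep collecting
        summary = bundler.get_bundle_summary(key)
        alerts = bundler.flush_bundle(key)
        if not summary or not alerts:
            continue
        digest = alerts[-1]     # Reuse the latest alert as the carrier
        digest.title = summary  # e.g., "15 related alerts for 'pod-restart' ..."
        dispatcher.dispatch(digest, DeliveryChannel.SLACK_DM, recipient)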
Dispatching Notifications
The dispatcher sends alerts through the selected channel:
import httpx
import logging

logger = logging.getLogger("notification_agent")


class NotificationDispatcher:
    def __init__(self, slack_token: str, twilio_sid: str, twilio_token: str):
        self.slack_token = slack_token
        self.twilio_sid = twilio_sid
        self.twilio_token = twilio_token

    def dispatch(
        self,
        alert: Alert,
        channel: DeliveryChannel,
        recipient: RecipientPreferences,
    ):
        """Send a notification through the selected channel."""
        if channel == DeliveryChannel.SUPPRESSED:
            logger.debug(f"Suppressed: {alert.title}")
            return
        if channel == DeliveryChannel.SLACK_DM:
            self._send_slack_dm(recipient.slack_id, alert)
        elif channel == DeliveryChannel.SMS:
            self._send_sms(recipient.phone, alert)
        elif channel == DeliveryChannel.PHONE_CALL:
            self._trigger_phone_call(recipient.phone, alert)
        elif channel == DeliveryChannel.EMAIL:
            self._send_email(recipient.email, alert)
        else:
            logger.info(f"Dashboard only: {alert.title}")

    def _send_slack_dm(self, slack_id: str, alert: Alert):
        httpx.post(
            "https://slack.com/api/chat.postMessage",
            headers={"Authorization": f"Bearer {self.slack_token}"},
            json={
                "channel": slack_id,
                "text": (
                    f"*[{alert.classified_urgency.value.upper()}]* "
                    f"{alert.title}\n{alert.body}"
                ),
            },
        )

    def _send_sms(self, phone: str, alert: Alert):
        httpx.post(
            f"https://api.twilio.com/2010-04-01/Accounts/{self.twilio_sid}/Messages.json",
            auth=(self.twilio_sid, self.twilio_token),
            data={
                "To": phone,
                "From": "+1234567890",
                "Body": f"[{alert.classified_urgency.value.upper()}] {alert.title}",
            },
        )

    def _trigger_phone_call(self, phone: str, alert: Alert):
        logger.critical(f"PHONE CALL triggered for {phone}: {alert.title}")
        # Integration with Twilio voice or PagerDuty for phone escalation

    def _send_email(self, email: str, alert: Alert):
        logger.info(f"Email to {email}: {alert.title}")
        # Integration with SendGrid, SES, or SMTP
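As one sketch of that email integration, _send_email could delegate to plain SMTP via the standard library. The host, port, and sender address below are placeholders:

import smtplib
from email.message import EmailMessage


def send_email_smtp(email: str, alert: Alert) -> None:
    """Minimal SMTP-backed sender (placeholder host and sender address)."""
    msg = EmailMessage()
    msg["Subject"] = f"[{alert.classified_urgency.value.upper()}] {alert.title}"
    msg["From"] = "alerts@example.com"  # Placeholder sender
    msg["To"] = email
    msg.set_content(alert.body)
    with smtplib.SMTP("smtp.example.com", 587) as smtp:  # Placeholder host
        smtp.starttls()
        smtp.send_message(msg)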
Putting It All Together
The main processing loop receives alerts from any source, classifies them, bundles related ones, and dispatches through the appropriate channel:
def process_alert(
    alert: Alert,
    bundler: AlertBundler,
    dispatcher: NotificationDispatcher,
    team_roster: dict[str, RecipientPreferences],
):
    """Process a single alert through the routing pipeline."""
    classification = classify_alert(alert)
    alert.classified_urgency = Urgency(classification["urgency"])
    alert.bundle_key = classification.get("bundle_key", "")
    team = classification.get("team", "engineering")

    # Check bundling
    result = bundler.add_and_check(alert)
    if result is None:
        logger.info(f"Bundled: {alert.title} (key: {alert.bundle_key})")
        return

    # Find recipient from team roster
    recipient = team_roster.get(team)
    if not recipient:
        logger.warning(f"No on-call for team: {team}")
        return

    # Select channel and dispatch
    channel = select_delivery_channel(
        alert.classified_urgency, recipient, alert.timestamp
    )
    alert.delivery_channel = channel
    dispatcher.dispatch(alert, channel, recipient)
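Wiring it together might look like this, using the hypothetical alert from earlier; the tokens and roster entries are placeholders:

# Hypothetical wiring; credentials and roster are placeholders
bundler = AlertBundler(window_seconds=300)
dispatcher = NotificationDispatcher(
    slack_token="xoxb-...",
    twilio_sid="AC...",
    twilio_token="...",
)
team_roster = {
    "engineering": RecipientPreferences(
        name="Alice",
        phone="+15550100",
        email="alice@example.com",
        slack_id="U01ABCDEF",
    ),
}

process_alert(alert, bundler, dispatcher, team_roster)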
FAQ
How do I prevent alert storms from overwhelming the system?
The bundler handles most alert storms by grouping related alerts. Additionally, implement a rate limiter per recipient: no more than 5 notifications per 10-minute window for non-critical alerts. If the rate limit is hit, automatically escalate the situation to critical and send a single summary notification instead of individual alerts.
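A minimal sketch of that per-recipient limiter, assuming a sliding 10-minute window; the class below is illustrative and not part of the pipeline above:

from collections import defaultdict, deque
from datetime import datetime, timedelta


class RecipientRateLimiter:
    """Illustrative sliding-window limiter: max N non-critical sends per window."""

    def __init__(self, max_sends: int = 5, window_seconds: int = 600):
        self.max_sends = max_sends
        self.window = timedelta(seconds=window_seconds)
        self.sent: dict[str, deque] = defaultdict(deque)

    def allow(self, recipient_name: str, urgency: Urgency, now: datetime) -> bool:
        if urgency == Urgency.CRITICAL:
            return True  # Critical alerts are never rate limited
        q = self.sent[recipient_name]
        while q and now - q[0] > self.window:
            q.popleft()  # Drop sends that fell outside the window
        if len(q) >= self.max_sends:
            return False  # Caller should send one summary instead
        q.append(now)
        return True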
How do I handle escalation when nobody responds?
Implement a timeout-based escalation ladder. If a critical alert is not acknowledged within 5 minutes, re-send via the next channel (Slack to SMS to phone). If still unacknowledged after 15 minutes, escalate to the team lead. Track acknowledgment by requiring recipients to click a link or reply with a code.
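One hedged sketch of that ladder, assuming acknowledgment IDs are recorded elsewhere (e.g., by a link-click handler writing into acked_ids):

# Illustrative escalation ladder; a real scheduler should also track the
# last rung fired per alert so it does not re-send on every check.
ESCALATION_LADDER = [
    (timedelta(minutes=0), DeliveryChannel.SLACK_DM),
    (timedelta(minutes=5), DeliveryChannel.SMS),
    (timedelta(minutes=10), DeliveryChannel.PHONE_CALL),
]


def escalate_unacked(
    alert: Alert,
    sent_at: datetime,
    now: datetime,
    acked_ids: set,
    dispatcher: NotificationDispatcher,
    recipient: RecipientPreferences,
) -> None:
    """Re-send an unacknowledged alert on the highest rung that is due."""
    if alert.id in acked_ids:
        return
    for delay, channel in reversed(ESCALATION_LADDER):
        if now - sent_at >= delay:
            dispatcher.dispatch(alert, channel, recipient)
            break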
Can I train the classification model on my organization's alert history?
Yes. Export your historical alerts with their actual urgency outcomes (was action taken, how quickly, was it a false positive). Use this data to fine-tune the classification prompts with few-shot examples specific to your environment. Include examples of alerts your team marked as noise so the model learns your specific suppression patterns.
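One lightweight way to apply that history is to pass labeled examples through the system_context parameter that classify_alert already accepts. The examples below are made up for illustration:

# Hypothetical few-shot examples drawn from historical triage outcomes
FEW_SHOT_EXAMPLES = """
Examples from our environment:
- "Disk 85% full on staging-db-2" -> low (staging, slow-moving)
- "Payment webhook error rate 12%" -> critical (customer-facing revenue path)
- "Nightly backup job retried once" -> noise (auto-resolved, known flaky)
"""

classification = classify_alert(alert, system_context=FEW_SHOT_EXAMPLES)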