Real-Time Notifications from AI Agents: Push Alerts, Emails, and SMS Triggers
Build a notification pipeline for AI agents that delivers push alerts, emails, and SMS messages based on configurable trigger conditions, with deduplication and user preference management.
Why AI Agents Need Notification Systems
AI agents often perform background work — monitoring data feeds, processing long-running tasks, detecting anomalies, or completing scheduled analyses. Users cannot sit in front of a dashboard waiting for results. A notification system bridges the gap between asynchronous agent work and timely user awareness.
But agent notifications are different from traditional application notifications. An agent might generate hundreds of events per hour, and blindly forwarding all of them as push alerts would overwhelm users. You need smart filtering, trigger conditions, deduplication, and channel routing so users receive the right information at the right time through the right channel.
Notification Pipeline Architecture
The pipeline has four stages: event ingestion from agents, trigger evaluation, deduplication, and multi-channel delivery.
import asyncio
import hashlib
import time
import json
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum


class NotificationChannel(str, Enum):
    PUSH = "push"
    EMAIL = "email"
    SMS = "sms"
    WEBHOOK = "webhook"


class NotificationPriority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class AgentNotification:
    notification_id: str
    agent_id: str
    title: str
    body: str
    priority: NotificationPriority
    channels: list[NotificationChannel]
    user_id: str
    metadata: dict = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)
    dedup_key: Optional[str] = None


@dataclass
class TriggerRule:
    name: str
    agent_id: str
    condition: str  # "error_rate > 0.05", "sentiment < 0.3"
    channels: list[NotificationChannel]
    priority: NotificationPriority
    cooldown_seconds: int = 300  # Minimum time between triggers
    user_ids: list[str] = field(default_factory=list)
Trigger Evaluation Engine
Define trigger rules that evaluate agent metrics against conditions. When a condition is met and the rule is not in its cooldown period, the engine returns the rule so the caller can turn it into notifications.
import operator
import time
from typing import Any


class TriggerEngine:
    OPERATORS = {
        ">": operator.gt,
        "<": operator.lt,
        ">=": operator.ge,
        "<=": operator.le,
        "==": operator.eq,
        "!=": operator.ne,
    }

    def __init__(self):
        self.rules: list[TriggerRule] = []
        # Last fire time, keyed by "rule name:agent id"
        self.last_triggered: dict[str, float] = {}

    def add_rule(self, rule: TriggerRule):
        self.rules.append(rule)

    def evaluate(self, agent_id: str, metrics: dict[str, Any]) -> list[TriggerRule]:
        triggered = []
        now = time.time()
        for rule in self.rules:
            if rule.agent_id != agent_id and rule.agent_id != "*":
                continue

            # Parse condition: "error_rate > 0.05"
            parts = rule.condition.split()
            if len(parts) != 3:
                continue
            metric_name, op_str, threshold_str = parts
            if metric_name not in metrics:
                continue
            if op_str not in self.OPERATORS:
                continue

            value = metrics[metric_name]
            try:
                # float() accepts both "1" and "0.05", whatever the metric's type
                threshold = float(threshold_str)
            except ValueError:
                threshold = threshold_str  # Non-numeric threshold: compare as text

            try:
                matched = self.OPERATORS[op_str](value, threshold)
            except TypeError:
                continue  # Metric and threshold are not comparable

            if matched:
                # Check cooldown per agent so a "*" rule does not let one
                # agent suppress alerts for another
                cooldown_key = f"{rule.name}:{agent_id}"
                last = self.last_triggered.get(cooldown_key, 0)
                if now - last >= rule.cooldown_seconds:
                    self.last_triggered[cooldown_key] = now
                    triggered.append(rule)
        return triggered


# Usage
engine = TriggerEngine()

engine.add_rule(TriggerRule(
    name="high_error_rate",
    agent_id="support-agent",
    condition="error_rate > 0.05",
    channels=[NotificationChannel.PUSH, NotificationChannel.EMAIL],
    priority=NotificationPriority.HIGH,
    cooldown_seconds=600,
    user_ids=["admin-1", "admin-2"],
))

engine.add_rule(TriggerRule(
    name="task_completed",
    agent_id="*",
    condition="completed == 1",
    channels=[NotificationChannel.PUSH],
    priority=NotificationPriority.LOW,
    cooldown_seconds=0,
))
The cooldown mechanism prevents alert storms. With cooldown_seconds=600, the high_error_rate rule fires at most once every 10 minutes while the error rate stays above the threshold, instead of on every metrics update.
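The engine's evaluate method returns the rules that fired rather than ready-made notifications, so some glue code has to expand each rule into one notification per recipient. The following is a minimal sketch of that step, not part of the original pipeline: Rule and Notification are trimmed-down stand-ins for TriggerRule and AgentNotification, and build_notifications, the title format, and the dedup_key format are all illustrative assumptions.

```python
import time
import uuid
from dataclasses import dataclass, field


@dataclass
class Rule:
    # Trimmed-down stand-in for TriggerRule (assumption for this sketch)
    name: str
    channels: list[str]
    priority: str
    user_ids: list[str] = field(default_factory=list)


@dataclass
class Notification:
    # Trimmed-down stand-in for AgentNotification (assumption for this sketch)
    notification_id: str
    agent_id: str
    title: str
    body: str
    priority: str
    channels: list[str]
    user_id: str
    dedup_key: str
    created_at: float = field(default_factory=time.time)


def build_notifications(rule: Rule, agent_id: str, metrics: dict) -> list[Notification]:
    """Create one notification per user listed on the rule."""
    body = ", ".join(f"{k}={v}" for k, v in sorted(metrics.items()))
    return [
        Notification(
            notification_id=uuid.uuid4().hex,
            agent_id=agent_id,
            title=f"Rule '{rule.name}' triggered",
            body=body,
            priority=rule.priority,
            channels=list(rule.channels),
            user_id=user_id,
            # Hypothetical key format: one key per rule, agent, and recipient
            dedup_key=f"{rule.name}:{agent_id}:{user_id}",
        )
        for user_id in rule.user_ids
    ]


rule = Rule("high_error_rate", ["push", "email"], "high", ["admin-1", "admin-2"])
notes = build_notifications(rule, "support-agent", {"error_rate": 0.08})
print([n.user_id for n in notes])
```

Note that a rule with an empty user_ids list, such as task_completed above, produces no notifications under this scheme, so the caller would need another source of recipients for it.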
Deduplication Layer
Deduplication prevents users from receiving the same notification multiple times when events arrive in bursts. Use a content-based hash with a time window.
class NotificationDeduplicator:
    def __init__(self, window_seconds: int = 300):
        self.window = window_seconds
        self.seen: dict[str, float] = {}

    def _compute_key(self, notification: AgentNotification) -> str:
        if notification.dedup_key:
            return notification.dedup_key
        content = f"{notification.agent_id}:{notification.title}:{notification.user_id}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def is_duplicate(self, notification: AgentNotification) -> bool:
        key = self._compute_key(notification)
        now = time.time()

        # Prune expired entries
        expired = [k for k, t in self.seen.items() if now - t > self.window]
        for k in expired:
            del self.seen[k]

        if key in self.seen:
            return True

        self.seen[key] = now
        return False
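To see how the time window behaves, the sketch below reimplements the same logic in a standalone form with the clock passed in explicitly, so the timeline can be simulated without sleeping. WindowedDeduplicator and the explicit now parameter are assumptions made for this illustration and do not appear in the code above.

```python
import hashlib


class WindowedDeduplicator:
    """Suppress repeats of the same key seen within `window_seconds`."""

    def __init__(self, window_seconds: float = 300):
        self.window = window_seconds
        self.seen: dict[str, float] = {}

    @staticmethod
    def content_key(agent_id: str, title: str, user_id: str) -> str:
        raw = f"{agent_id}:{title}:{user_id}"
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    def is_duplicate(self, key: str, now: float) -> bool:
        # Drop entries older than the window
        self.seen = {k: t for k, t in self.seen.items() if now - t <= self.window}
        if key in self.seen:
            return True
        self.seen[key] = now
        return False


dedup = WindowedDeduplicator(window_seconds=300)
key = dedup.content_key("support-agent", "High error rate", "admin-1")

results = [
    dedup.is_duplicate(key, now=0),    # first sighting, delivered
    dedup.is_duplicate(key, now=120),  # inside the window, suppressed
    dedup.is_duplicate(key, now=299),  # still inside the window, suppressed
    dedup.is_duplicate(key, now=301),  # window has expired, delivered again
]
print(results)  # [False, True, True, False]
```

The window is anchored at the first sighting: suppressed repeats do not refresh the timestamp, so a continuously repeating event is delivered again once per window rather than being silenced indefinitely.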
Multi-Channel Delivery
Each delivery channel has its own adapter. The dispatcher routes notifications based on the channels specified in the trigger rule and the user's preferences.
import logging
from abc import ABC, abstractmethod

logger = logging.getLogger(__name__)

# The helpers used below (active_sessions, get_user_email, send_email,
# get_user_notification_prefs) are assumed to be defined elsewhere in the
# application; they are not shown in this article.


class DeliveryAdapter(ABC):
    @abstractmethod
    async def send(self, notification: AgentNotification) -> bool:
        pass


class PushAdapter(DeliveryAdapter):
    async def send(self, notification: AgentNotification) -> bool:
        # WebSocket push to connected clients
        session = active_sessions.get(notification.user_id)
        if session:
            await session.ws.send_json({
                "type": "notification",
                "title": notification.title,
                "body": notification.body,
                "priority": notification.priority.value,
                "agent_id": notification.agent_id,
            })
            return True
        return False  # User has no open session


class EmailAdapter(DeliveryAdapter):
    def __init__(self, smtp_host: str, from_addr: str):
        self.smtp_host = smtp_host
        self.from_addr = from_addr

    async def send(self, notification: AgentNotification) -> bool:
        user_email = await get_user_email(notification.user_id)
        if not user_email:
            return False

        # Use aiosmtplib or similar async email library
        await send_email(
            to=user_email,
            subject=f"[{notification.priority.value.upper()}] {notification.title}",
            body=notification.body,
            from_addr=self.from_addr,
        )
        return True


class NotificationDispatcher:
    def __init__(self):
        self.adapters: dict[NotificationChannel, DeliveryAdapter] = {}
        self.dedup = NotificationDeduplicator()

    def register_adapter(self, channel: NotificationChannel, adapter: DeliveryAdapter):
        self.adapters[channel] = adapter

    async def dispatch(self, notification: AgentNotification):
        if self.dedup.is_duplicate(notification):
            return

        user_prefs = await get_user_notification_prefs(notification.user_id)

        for channel in notification.channels:
            if not user_prefs.get(channel.value, True):
                continue  # User has disabled this channel

            if channel == NotificationChannel.SMS:
                if notification.priority not in (
                    NotificationPriority.HIGH,
                    NotificationPriority.CRITICAL,
                ):
                    continue  # Only send SMS for high/critical

            adapter = self.adapters.get(channel)
            if adapter:
                try:
                    await adapter.send(notification)
                except Exception:
                    # Log with traceback and keep trying the remaining channels
                    logger.exception("Delivery failed on %s", channel.value)
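The dispatcher respects user preferences and applies channel-specific rules. SMS is restricted to high and critical priority to avoid overusing an expensive channel. As written, every enabled channel is attempted independently and the boolean returned by send is ignored; to make a notification fall through to email when push delivery fails (user not connected), the dispatcher would need to check that return value and try the next channel.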
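One way to get that fall-through behavior is to try channels in priority order and stop at the first one that reports success. The sketch below is a simplified illustration under stated assumptions: deliver_with_fallback, the Sender callable type, and the fake_push and fake_email senders are hypothetical and stand in for the adapters above.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# A sender takes (user_id, message) and reports whether delivery succeeded.
Sender = Callable[[str, str], Awaitable[bool]]


async def deliver_with_fallback(
    senders: list[tuple[str, Sender]], user_id: str, message: str
) -> Optional[str]:
    """Try each channel in order; return the first channel that delivers."""
    for channel, send in senders:
        try:
            if await send(user_id, message):
                return channel
        except Exception as exc:
            print(f"{channel} raised {exc!r}, trying next channel")
    return None  # Every channel failed


async def fake_push(user_id: str, message: str) -> bool:
    return False  # Simulates a user with no open WebSocket session


async def fake_email(user_id: str, message: str) -> bool:
    return True  # Simulates a successful send


used = asyncio.run(
    deliver_with_fallback(
        [("push", fake_push), ("email", fake_email)],
        "admin-1",
        "Error rate is 8%",
    )
)
print(used)  # email
```

The trade-off is that fallback delivers through one channel only, whereas the dispatcher above delivers through every enabled channel. Which behavior is right depends on whether channels are meant to be redundant or alternatives.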
FAQ
How do you handle notification preferences across different AI agents?
Store preferences at two levels: global defaults and per-agent overrides. A user might want push notifications for all agents by default, but email-only for a nightly batch processing agent. Store these preferences in a JSON structure like {"default": {"push": true, "email": true, "sms": false}, "agents": {"batch-agent": {"push": false, "email": true}}}. When dispatching, merge the agent-specific settings over the defaults.
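The merge step can be sketched in a few lines. The function name effective_prefs is an assumption for illustration; the preference structure is the one shown above.

```python
def effective_prefs(prefs: dict, agent_id: str) -> dict:
    """Overlay an agent's overrides on top of the user's global defaults."""
    merged = dict(prefs.get("default", {}))
    merged.update(prefs.get("agents", {}).get(agent_id, {}))
    return merged


prefs = {
    "default": {"push": True, "email": True, "sms": False},
    "agents": {"batch-agent": {"push": False, "email": True}},
}

print(effective_prefs(prefs, "batch-agent"))
# {'push': False, 'email': True, 'sms': False}
print(effective_prefs(prefs, "support-agent"))
# {'push': True, 'email': True, 'sms': False}
```

Channels that an agent override does not mention, such as sms for batch-agent here, keep their default value.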
What is the best way to handle notification delivery failures and retries?
Implement a retry queue with increasing backoff for each delivery channel independently. If email delivery fails, retry up to 3 times with delays of, for example, 30 seconds, 2 minutes, and 10 minutes. If all retries fail, log the failure and optionally try a fallback channel (e.g., push instead of email). Keep a delivery status log so you can audit which notifications reached users and which were lost. For critical notifications, require at least two channels to succeed before marking delivery as complete.
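A minimal in-process sketch of that retry schedule follows. It is an illustration rather than a production retry queue: send_with_retries is a hypothetical helper, and the sleep parameter is injectable only so the demo can run without actually waiting.

```python
import asyncio
from typing import Awaitable, Callable

RETRY_DELAYS = (30, 120, 600)  # Seconds: 30 s, 2 min, 10 min


async def send_with_retries(
    send: Callable[[], Awaitable[bool]],
    delays: tuple = RETRY_DELAYS,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Make one attempt, then one retry after each delay, until one succeeds."""
    for delay in (0, *delays):
        if delay:
            await sleep(delay)
        try:
            if await send():
                return True
        except Exception as exc:
            print(f"attempt failed: {exc!r}")
    return False  # Caller should log the loss or try a fallback channel


# Demo: a fake sender that succeeds on its third attempt,
# and a fake sleep that records the delays instead of waiting.
state = {"attempts": 0}
waited: list = []


async def flaky_send() -> bool:
    state["attempts"] += 1
    return state["attempts"] >= 3


async def fake_sleep(seconds: float) -> None:
    waited.append(seconds)


ok = asyncio.run(send_with_retries(flaky_send, sleep=fake_sleep))
print(ok, state["attempts"], waited)  # True 3 [30, 120]
```

Because retries here live in memory, they are lost if the process restarts. A durable queue would be needed for notifications that must survive restarts.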
How do you prevent alert fatigue when AI agents generate many events?
Layer three mechanisms: trigger cooldowns (minimum time between alerts for the same rule), deduplication (content-based suppression within a time window), and digest batching. For low-priority notifications, batch them into a single daily or hourly digest email instead of sending individual alerts. Let users configure their own thresholds — some prefer immediate alerts for everything, while others only want to be notified about critical issues.
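Digest batching can be sketched as a per-user buffer that is flushed on a schedule. DigestBuffer and its digest text format are assumptions for illustration; a scheduler (not shown) would call flush hourly or daily and hand each digest to the email channel.

```python
from collections import defaultdict


class DigestBuffer:
    """Hold low-priority notifications per user until the next digest is sent."""

    def __init__(self):
        self.pending: dict = defaultdict(list)

    def add(self, user_id: str, title: str) -> None:
        self.pending[user_id].append(title)

    def flush(self) -> dict:
        """Return one digest body per user and clear the buffer."""
        digests = {
            user_id: f"{len(titles)} update(s):\n" + "\n".join(f"- {t}" for t in titles)
            for user_id, titles in self.pending.items()
        }
        self.pending.clear()
        return digests


buffer = DigestBuffer()
buffer.add("admin-1", "Nightly report finished")
buffer.add("admin-1", "Index rebuild finished")
buffer.add("admin-2", "Nightly report finished")

digests = buffer.flush()
print(digests["admin-1"])
```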
CallSphere Team