
Real-Time Notifications from AI Agents: Push Alerts, Emails, and SMS Triggers

Build a notification pipeline for AI agents that delivers push alerts, emails, and SMS messages based on configurable trigger conditions, with deduplication and user preference management.

Why AI Agents Need Notification Systems

AI agents often perform background work — monitoring data feeds, processing long-running tasks, detecting anomalies, or completing scheduled analyses. Users cannot sit in front of a dashboard waiting for results. A notification system bridges the gap between asynchronous agent work and timely user awareness.

But agent notifications are different from traditional application notifications. An agent might generate hundreds of events per hour, and blindly forwarding all of them as push alerts would overwhelm users. You need smart filtering, trigger conditions, deduplication, and channel routing so users receive the right information at the right time through the right channel.

Notification Pipeline Architecture

The pipeline has four stages: event ingestion from agents, trigger evaluation, deduplication, and multi-channel delivery.

import asyncio
import hashlib
import time
import json
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

class NotificationChannel(str, Enum):
    PUSH = "push"
    EMAIL = "email"
    SMS = "sms"
    WEBHOOK = "webhook"

class NotificationPriority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class AgentNotification:
    notification_id: str
    agent_id: str
    title: str
    body: str
    priority: NotificationPriority
    channels: list[NotificationChannel]
    user_id: str
    metadata: dict = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)
    dedup_key: Optional[str] = None

@dataclass
class TriggerRule:
    name: str
    agent_id: str
    condition: str  # "error_rate > 0.05", "sentiment < 0.3"
    channels: list[NotificationChannel]
    priority: NotificationPriority
    cooldown_seconds: int = 300  # Minimum time between triggers
    user_ids: list[str] = field(default_factory=list)

Trigger Evaluation Engine

Define trigger rules that evaluate agent metrics against conditions. When a condition is met and the rule is not in cooldown, the engine returns that rule so the caller can generate the corresponding notification.

import operator
from typing import Any

class TriggerEngine:
    OPERATORS = {
        ">": operator.gt,
        "<": operator.lt,
        ">=": operator.ge,
        "<=": operator.le,
        "==": operator.eq,
        "!=": operator.ne,
    }

    def __init__(self):
        self.rules: list[TriggerRule] = []
        self.last_triggered: dict[str, float] = {}

    def add_rule(self, rule: TriggerRule):
        self.rules.append(rule)

    def evaluate(self, agent_id: str, metrics: dict[str, Any]) -> list[TriggerRule]:
        triggered = []
        now = time.time()

        for rule in self.rules:
            if rule.agent_id != agent_id and rule.agent_id != "*":
                continue

            # Parse condition: "error_rate > 0.05"
            parts = rule.condition.split()
            if len(parts) != 3:
                continue

            metric_name, op_str, threshold_str = parts
            if metric_name not in metrics:
                continue
            if op_str not in self.OPERATORS:
                continue

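            value = metrics[metric_name]
            # Compare numerically; skip rules whose threshold or metric value
            # is not a number (bool metrics compare as 0 and 1)
            if not isinstance(value, (int, float)):
                continue
            try:
                threshold = float(threshold_str)
            except ValueError:
                continue
            op_func = self.OPERATORS[op_str]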

            if op_func(value, threshold):
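                # Check cooldown per rule and agent, so a wildcard ("*") rule
                # firing for one agent does not silence the others
                cooldown_key = f"{rule.name}:{agent_id}"
                last = self.last_triggered.get(cooldown_key, 0)
                if now - last >= rule.cooldown_seconds:
                    self.last_triggered[cooldown_key] = now
                    triggered.append(rule)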

        return triggered

# Usage
engine = TriggerEngine()
engine.add_rule(TriggerRule(
    name="high_error_rate",
    agent_id="support-agent",
    condition="error_rate > 0.05",
    channels=[NotificationChannel.PUSH, NotificationChannel.EMAIL],
    priority=NotificationPriority.HIGH,
    cooldown_seconds=600,
    user_ids=["admin-1", "admin-2"],
))
engine.add_rule(TriggerRule(
    name="task_completed",
    agent_id="*",
    condition="completed == 1",
    channels=[NotificationChannel.PUSH],
    priority=NotificationPriority.LOW,
    cooldown_seconds=0,
))

The cooldown mechanism prevents alert storms. With cooldown_seconds=600, the high_error_rate rule fires at most once every 10 minutes while the error rate stays above the threshold, instead of on every metrics update.
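The engine returns the rules that fired rather than finished notifications, so some step has to turn a fired rule into one notification per recipient. The following is a minimal sketch of that step, not part of the original pipeline: `Rule` and `Notification` are trimmed stand-ins for `TriggerRule` and `AgentNotification`, and `build_notifications`, the title and body wording, and the dedup key format are assumptions made for illustration.

```python
import time
import uuid
from dataclasses import dataclass, field


@dataclass
class Rule:
    # Stand-in for TriggerRule with only the fields this sketch needs
    name: str
    condition: str
    channels: list[str]
    priority: str
    user_ids: list[str] = field(default_factory=list)


@dataclass
class Notification:
    # Stand-in for AgentNotification
    notification_id: str
    agent_id: str
    title: str
    body: str
    priority: str
    channels: list[str]
    user_id: str
    dedup_key: str
    created_at: float = field(default_factory=time.time)


def build_notifications(rule: Rule, agent_id: str, metrics: dict) -> list[Notification]:
    """Create one notification per recipient of a triggered rule."""
    metric_name = rule.condition.split()[0]
    observed = metrics.get(metric_name)
    return [
        Notification(
            notification_id=uuid.uuid4().hex,
            agent_id=agent_id,
            title=f"{rule.name} triggered for {agent_id}",
            body=f"Condition '{rule.condition}' met ({metric_name}={observed}).",
            priority=rule.priority,
            channels=list(rule.channels),
            user_id=user_id,
            # Hypothetical key format: one alert per rule, agent, and user
            dedup_key=f"{rule.name}:{agent_id}:{user_id}",
        )
        for user_id in rule.user_ids
    ]


rule = Rule(
    name="high_error_rate",
    condition="error_rate > 0.05",
    channels=["push", "email"],
    priority="high",
    user_ids=["admin-1", "admin-2"],
)
notifications = build_notifications(rule, "support-agent", {"error_rate": 0.08})
```

A rule with an empty user_ids list, such as task_completed above, produces no notifications in this sketch; a real system would need some way to resolve its recipients.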

Deduplication Layer

Deduplication prevents users from receiving the same notification multiple times when events arrive in bursts. Use a content-based hash with a time window.

class NotificationDeduplicator:
    def __init__(self, window_seconds: int = 300):
        self.window = window_seconds
        self.seen: dict[str, float] = {}

    def _compute_key(self, notification: AgentNotification) -> str:
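        # A caller-supplied key is used as-is, so it should include the user ID
        # when the same event is sent to several users
        if notification.dedup_key:
            return notification.dedup_key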

        content = f"{notification.agent_id}:{notification.title}:{notification.user_id}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def is_duplicate(self, notification: AgentNotification) -> bool:
        key = self._compute_key(notification)
        now = time.time()

        # Prune expired entries
        expired = [k for k, t in self.seen.items() if now - t > self.window]
        for k in expired:
            del self.seen[k]

        if key in self.seen:
            return True

        self.seen[key] = now
        return False
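To see how the time window behaves without waiting five minutes, the same idea can be written with an injectable clock. This is a hedged sketch rather than a replacement for the class above: `WindowedDeduplicator` and its `clock` parameter are hypothetical names, and the key is built from plain strings instead of an `AgentNotification`.

```python
import hashlib
import time
from typing import Callable


class WindowedDeduplicator:
    def __init__(self, window_seconds: float = 300,
                 clock: Callable[[], float] = time.time):
        self.window = window_seconds
        self.clock = clock  # Injected so tests can control the time
        self.seen: dict[str, float] = {}

    def is_duplicate(self, agent_id: str, title: str, user_id: str) -> bool:
        content = f"{agent_id}:{title}:{user_id}"
        key = hashlib.sha256(content.encode()).hexdigest()[:16]
        now = self.clock()

        # Drop entries older than the window
        self.seen = {k: t for k, t in self.seen.items() if now - t <= self.window}

        if key in self.seen:
            return True
        self.seen[key] = now
        return False


fake_now = [1000.0]
dedup = WindowedDeduplicator(window_seconds=300, clock=lambda: fake_now[0])

first = dedup.is_duplicate("support-agent", "High error rate", "admin-1")
fake_now[0] += 60    # One minute later: still inside the window
second = dedup.is_duplicate("support-agent", "High error rate", "admin-1")
fake_now[0] += 300   # Six minutes after the first event: window has expired
third = dedup.is_duplicate("support-agent", "High error rate", "admin-1")
```

The window is measured from the first occurrence. A suppressed duplicate does not refresh the timestamp, so a persistent condition is reported again once the window expires.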

Multi-Channel Delivery

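Each delivery channel has its own adapter. The dispatcher routes notifications based on the channels carried over from the trigger rule and on the user's preferences. Push and email adapters are shown here; an SMS or webhook adapter would implement the same send interface.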

from abc import ABC, abstractmethod

class DeliveryAdapter(ABC):
    @abstractmethod
    async def send(self, notification: AgentNotification) -> bool:
        pass

class PushAdapter(DeliveryAdapter):
    async def send(self, notification: AgentNotification) -> bool:
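        # WebSocket push to connected clients; active_sessions is assumed to be
        # an application-level registry mapping user IDs to open sessions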
        session = active_sessions.get(notification.user_id)
        if session:
            await session.ws.send_json({
                "type": "notification",
                "title": notification.title,
                "body": notification.body,
                "priority": notification.priority.value,
                "agent_id": notification.agent_id,
            })
            return True
        return False

class EmailAdapter(DeliveryAdapter):
    def __init__(self, smtp_host: str, from_addr: str):
        self.smtp_host = smtp_host
        self.from_addr = from_addr

    async def send(self, notification: AgentNotification) -> bool:
        user_email = await get_user_email(notification.user_id)
        if not user_email:
            return False

        # get_user_email and send_email are application helpers defined
        # elsewhere; send_email can wrap aiosmtplib or a similar async library
        await send_email(
            to=user_email,
            subject=f"[{notification.priority.value.upper()}] {notification.title}",
            body=notification.body,
            from_addr=self.from_addr,
        )
        return True

class NotificationDispatcher:
    def __init__(self):
        self.adapters: dict[NotificationChannel, DeliveryAdapter] = {}
        self.dedup = NotificationDeduplicator()

    def register_adapter(self, channel: NotificationChannel, adapter: DeliveryAdapter):
        self.adapters[channel] = adapter

    async def dispatch(self, notification: AgentNotification):
        if self.dedup.is_duplicate(notification):
            return

        user_prefs = await get_user_notification_prefs(notification.user_id)

        for channel in notification.channels:
            if not user_prefs.get(channel.value, True):
                continue  # User has disabled this channel

            if channel == NotificationChannel.SMS:
                if notification.priority not in (
                    NotificationPriority.HIGH,
                    NotificationPriority.CRITICAL,
                ):
                    continue  # Only send SMS for high/critical

            adapter = self.adapters.get(channel)
            if adapter:
                try:
                    await adapter.send(notification)
                except Exception as e:
                    print(f"Delivery failed on {channel}: {e}")

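The dispatcher respects user preferences and applies channel-specific rules. SMS is restricted to high and critical priority to avoid overusing an expensive channel. Note that the dispatcher as written ignores the boolean returned by send, so a push that fails because the user is not connected does not automatically fall through to email. To get that behavior, check the return value and try the next channel.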
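One way to implement the fall-through from push to email is to try channels in order and stop at the first one that reports success. The sketch below is illustrative only: `deliver_with_fallback`, the `Sender` callable type, and the two fake senders are hypothetical and stand in for the adapters above.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# A sender takes (user_id, title, body) and returns True if delivery succeeded
Sender = Callable[[str, str, str], Awaitable[bool]]


async def deliver_with_fallback(
    senders: dict[str, Sender],
    channel_order: list[str],
    user_id: str,
    title: str,
    body: str,
) -> Optional[str]:
    """Try each channel in order; return the first channel that delivered."""
    for channel in channel_order:
        sender = senders.get(channel)
        if sender is None:
            continue
        try:
            if await sender(user_id, title, body):
                return channel
        except Exception as exc:
            print(f"Delivery failed on {channel}: {exc}")
    return None


async def offline_push(user_id: str, title: str, body: str) -> bool:
    return False  # Simulates a user with no open WebSocket session


async def fake_email(user_id: str, title: str, body: str) -> bool:
    return True  # Simulates a successful email send


delivered_via = asyncio.run(
    deliver_with_fallback(
        {"push": offline_push, "email": fake_email},
        ["push", "email"],
        "admin-1",
        "High error rate",
        "error_rate is 0.08",
    )
)
```

Here delivered_via ends up as "email" because the push sender reports that the user is offline.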

FAQ

How do you handle notification preferences across different AI agents?

Store preferences at two levels: global defaults and per-agent overrides. A user might want push notifications for all agents by default, but email-only for a nightly batch processing agent. Store these preferences in a JSON structure like {"default": {"push": true, "email": true, "sms": false}, "agents": {"batch-agent": {"push": false, "email": true}}}. When dispatching, merge the agent-specific settings over the defaults.
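The merge step can be sketched in a few lines, assuming the preferences are already loaded into a dict with the shape shown above. `effective_prefs` is a hypothetical helper name.

```python
def effective_prefs(prefs: dict, agent_id: str) -> dict:
    """Overlay the agent-specific settings on top of the global defaults."""
    merged = dict(prefs.get("default", {}))
    merged.update(prefs.get("agents", {}).get(agent_id, {}))
    return merged


prefs = {
    "default": {"push": True, "email": True, "sms": False},
    "agents": {"batch-agent": {"push": False, "email": True}},
}

batch = effective_prefs(prefs, "batch-agent")
other = effective_prefs(prefs, "support-agent")
```

For batch-agent the override disables push while the sms default is inherited; an agent with no overrides simply gets the defaults.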

What is the best way to handle notification delivery failures and retries?

Implement a retry queue with exponential backoff for each delivery channel independently. If email delivery fails, retry 3 times with delays of 30 seconds, 2 minutes, and 10 minutes. If all retries fail, log the failure and optionally try a fallback channel (e.g., push instead of email). Keep a delivery status log so you can audit which notifications reached users and which were lost. For critical notifications, require at least two channels to succeed before marking delivery as complete.
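A simplified, in-process version of the retry schedule might look like the sketch below. It is not a durable retry queue: `send_with_retries` is a hypothetical helper, the `sleep` parameter exists only so the example can run without real delays, and a production system would persist pending retries instead of holding them in a coroutine.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def send_with_retries(
    send: Callable[[], Awaitable[bool]],
    delays: Sequence[float] = (30, 120, 600),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Attempt delivery once, then retry after each delay until one succeeds."""
    for delay in (0, *delays):
        if delay:
            await sleep(delay)
        try:
            if await send():
                return True
        except Exception as exc:
            print(f"Delivery attempt failed: {exc}")
    return False


state = {"attempts": 0}
slept: list[float] = []


async def flaky_send() -> bool:
    # Fails twice, then succeeds on the third attempt
    state["attempts"] += 1
    return state["attempts"] >= 3


async def fake_sleep(seconds: float) -> None:
    slept.append(seconds)  # Record the delay instead of waiting


ok = asyncio.run(send_with_retries(flaky_send, sleep=fake_sleep))
```

With the default delays this makes one initial attempt plus up to three retries, matching the 30 second, 2 minute, and 10 minute schedule.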

How do you prevent alert fatigue when AI agents generate many events?

Layer three mechanisms: trigger cooldowns (minimum time between alerts for the same rule), deduplication (content-based suppression within a time window), and digest batching. For low-priority notifications, batch them into a single daily or hourly digest email instead of sending individual alerts. Let users configure their own thresholds — some prefer immediate alerts for everything, while others only want to be notified about critical issues.
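Digest batching is the one mechanism of the three not shown earlier. A minimal in-memory sketch follows; `DigestBuffer` and the digest wording are assumptions for illustration. Calling flush on a schedule (hourly or daily) and handing each body to the email adapter is left to the surrounding application.

```python
from collections import defaultdict


class DigestBuffer:
    def __init__(self):
        # Pending notification titles, grouped by user
        self._pending: dict[str, list[str]] = defaultdict(list)

    def add(self, user_id: str, title: str) -> None:
        self._pending[user_id].append(title)

    def flush(self) -> dict[str, str]:
        """Return one digest body per user and clear the buffer."""
        digests = {
            user_id: f"{len(titles)} agent updates:\n"
            + "\n".join(f"- {t}" for t in titles)
            for user_id, titles in self._pending.items()
        }
        self._pending.clear()
        return digests


buffer = DigestBuffer()
buffer.add("admin-1", "Task A completed")
buffer.add("admin-1", "Task B completed")
buffer.add("admin-2", "Task C completed")

digests = buffer.flush()
empty = buffer.flush()
```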


CallSphere Team
