AI Agent for IoT Devices: Processing Sensor Data with Local Intelligence

Build an AI agent that processes IoT sensor data locally for real-time anomaly detection, with intelligent cloud reporting for aggregated insights and alerts.

Why Local AI for IoT

IoT devices generate massive volumes of sensor data: temperature readings every second, vibration measurements at 1 kHz, camera frames at 30 fps. Sending all of it to the cloud for processing is expensive, slow, and mostly unnecessary, because most readings are normal and only the anomalies matter.

An edge AI agent sitting on the IoT gateway processes sensor streams locally, detects anomalies in real time, and sends only significant events to the cloud. When anomalies are rare, this can cut upstream bandwidth by 90 to 99 percent. It also enables sub-second response times and keeps detection running during network outages.
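The arithmetic behind that bandwidth range can be sketched with a back-of-the-envelope estimate. The function name `upstream_reduction` and its parameters are illustrative, and the sketch assumes every message (raw reading, anomaly report, or summary) costs roughly the same number of bytes, which will not hold exactly in practice.

```python
def upstream_reduction(readings_per_hour: int,
                       anomaly_fraction: float,
                       summaries_per_hour: int = 1) -> float:
    """Estimate the fraction of messages that no longer go to the cloud.

    Assumption: all message types are about the same size, so counting
    messages is a fair stand-in for counting bytes.
    """
    if readings_per_hour <= 0:
        raise ValueError("readings_per_hour must be positive")
    # Only anomalies and periodic summaries are sent upstream
    sent = readings_per_hour * anomaly_fraction + summaries_per_hour
    return 1.0 - sent / readings_per_hour


# One reading per second, 1% flagged as anomalous, one summary per hour
reduction = upstream_reduction(readings_per_hour=3600, anomaly_fraction=0.01)
print(f"Estimated upstream reduction: {reduction:.1%}")
```

The estimate is only as good as the anomaly fraction you plug in, so measure it on real traffic before relying on the number.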

Sensor Data Ingestion

Start with a data ingestion layer that reads from multiple sensors and normalizes their output:

import asyncio
import time
from collections import deque
from dataclasses import dataclass, field

@dataclass
class SensorReading:
    sensor_id: str
    metric: str
    value: float
    timestamp: float = field(default_factory=time.time)
    unit: str = ""

class SensorIngester:
    """Collects readings from multiple sensors."""

    def __init__(self, buffer_size: int = 1000):
        self.buffer: deque[SensorReading] = deque(maxlen=buffer_size)
        self.subscribers: list[asyncio.Queue] = []

    def add_reading(self, reading: SensorReading):
        self.buffer.append(reading)
        for queue in self.subscribers:
            try:
                queue.put_nowait(reading)
            except asyncio.QueueFull:
                pass  # Drop if subscriber is too slow

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue(maxsize=100)
        self.subscribers.append(queue)
        return queue

    def get_recent(self, sensor_id: str, count: int = 100) -> list[SensorReading]:
        return [
            r for r in self.buffer
            if r.sensor_id == sensor_id
        ][-count:]

Local Anomaly Detection

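The core of the IoT agent is an anomaly detection model that runs locally. For sensor data, statistical methods combined with a lightweight neural network work well. The statistical part checks each reading's z-score against a rolling baseline and also watches for sudden changes between consecutive readings: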

from collections import deque
from dataclasses import dataclass

import numpy as np

@dataclass
class AnomalyResult:
    is_anomaly: bool
    score: float
    reason: str
    sensor_id: str
    reading: SensorReading

class AnomalyDetector:
    """Detects anomalies in sensor data using statistical and ML methods."""

    def __init__(self, window_size: int = 100, z_threshold: float = 3.0,
                 refresh_every: int = 50):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.refresh_every = refresh_every  # readings between baseline updates
        self.histories: dict[str, deque] = {}
        self.baselines: dict[str, dict] = {}
        self.since_refresh: dict[str, int] = {}

    def update_baseline(self, sensor_id: str):
        """Recalculate baseline statistics for one "sensor_id:metric" key."""
        if sensor_id not in self.histories:
            return
        values = list(self.histories[sensor_id])
        if len(values) < 10:
            return
        arr = np.array(values)
        self.baselines[sensor_id] = {
            "mean": float(np.mean(arr)),
            "std": float(np.std(arr)),
            "min": float(np.min(arr)),
            "max": float(np.max(arr)),
            "q1": float(np.percentile(arr, 25)),
            "q3": float(np.percentile(arr, 75)),
        }

    def check(self, reading: SensorReading) -> AnomalyResult:
        sensor_key = f"{reading.sensor_id}:{reading.metric}"

        # Initialize history if new sensor
        if sensor_key not in self.histories:
            self.histories[sensor_key] = deque(maxlen=self.window_size)

        self.histories[sensor_key].append(reading.value)

        # Need minimum data for detection
        if len(self.histories[sensor_key]) < 20:
            return AnomalyResult(
                is_anomaly=False, score=0.0,
                reason="Insufficient data for baseline",
                sensor_id=reading.sensor_id, reading=reading,
            )

        # Recalculate the baseline on first use, then every `refresh_every`
        # readings. A counter is used because len(history) stops growing
        # once the window is full.
        self.since_refresh[sensor_key] = self.since_refresh.get(sensor_key, 0) + 1
        baseline = self.baselines.get(sensor_key)
        if baseline is None or self.since_refresh[sensor_key] >= self.refresh_every:
            self.update_baseline(sensor_key)
            self.since_refresh[sensor_key] = 0
            baseline = self.baselines[sensor_key]

        # Z-score anomaly detection
        if baseline["std"] > 0:
            z_score = abs(reading.value - baseline["mean"]) / baseline["std"]
        else:
            z_score = 0.0

        # Rate-of-change detection: compare the latest step with the average
        # of the steps before it. The latest step is left out of the average
        # so that a spike cannot mask itself.
        recent = list(self.histories[sensor_key])[-6:]
        steps = [abs(recent[i] - recent[i - 1]) for i in range(1, len(recent))]
        if len(steps) >= 2:
            rate = steps[-1]
            avg_rate = float(np.mean(steps[:-1]))
            rate_anomaly = avg_rate > 0 and rate > avg_rate * 5
        else:
            rate_anomaly = False

        is_anomaly = z_score > self.z_threshold or rate_anomaly
        reason = []
        if z_score > self.z_threshold:
            reason.append(f"Z-score {z_score:.2f} exceeds threshold {self.z_threshold}")
        if rate_anomaly:
            reason.append("Sudden rate change detected")

        return AnomalyResult(
            is_anomaly=is_anomaly,
            score=min(z_score / self.z_threshold, 1.0),
            reason="; ".join(reason) if reason else "Normal",
            sensor_id=reading.sensor_id,
            reading=reading,
        )
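To make the z-score check concrete, here is a small standalone sketch using only the standard library. The helper name `z_score` and the sample temperatures are made up for illustration. Unlike the detector above, the window here holds only earlier readings and does not include the value being tested.

```python
import statistics


def z_score(value: float, window: list[float]) -> float:
    """Distance of value from the window mean, in standard deviations."""
    mean = statistics.fmean(window)
    # Population standard deviation, matching np.std's default (ddof=0)
    std = statistics.pstdev(window)
    return abs(value - mean) / std if std > 0 else 0.0


# Hypothetical temperature readings hovering around 20 degrees
baseline_window = [20.0, 20.5, 19.5, 20.0, 20.5, 19.5, 20.0, 20.5, 19.5, 20.0]

for candidate in (20.4, 25.0):
    z = z_score(candidate, baseline_window)
    flagged = z > 3.0  # same default threshold as AnomalyDetector
    print(f"value={candidate} z={z:.2f} anomaly={flagged}")
```

A reading close to the recent mean produces a small z-score, while a jump of several degrees on a sensor that normally moves by half a degree lands far beyond the threshold of 3.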

ONNX-Based Neural Anomaly Detection

For more sophisticated detection, deploy a trained autoencoder that learns normal patterns and flags deviations:

import onnxruntime as ort
import numpy as np

class NeuralAnomalyDetector:
    """Uses an autoencoder to detect anomalies in sensor time series."""

    def __init__(self, model_path: str, reconstruction_threshold: float = 0.05):
        self.session = ort.InferenceSession(
            model_path, providers=["CPUExecutionProvider"]
        )
        self.threshold = reconstruction_threshold
        self.window_size = 30  # Model expects 30-step sequences

    def detect(self, sensor_values: list[float]) -> dict:
        if len(sensor_values) < self.window_size:
            return {"is_anomaly": False, "score": 0.0}

        # Take last N values and normalize
        window = np.array(sensor_values[-self.window_size:], dtype=np.float32)
        mean, std = window.mean(), window.std()
        if std > 0:
            normalized = (window - mean) / std
        else:
            normalized = window - mean

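        # Run through the autoencoder. The input name is read from the model
        # because it depends on how the model was exported.
        input_data = normalized.reshape(1, self.window_size, 1)
        input_name = self.session.get_inputs()[0].name
        reconstructed = self.session.run(None, {input_name: input_data})[0]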

        # Reconstruction error = anomaly score
        mse = float(np.mean((input_data - reconstructed) ** 2))

        return {
            "is_anomaly": mse > self.threshold,
            "score": min(mse / self.threshold, 1.0),
            "reconstruction_error": mse,
        }

Cloud Reporting

Only anomalies and periodic summaries get sent to the cloud:

import asyncio
from datetime import datetime, timezone

import aiohttp

class CloudReporter:
    """Reports anomalies and summaries to cloud backend."""

    def __init__(self, api_url: str, device_id: str, api_key: str):
        self.api_url = api_url
        self.device_id = device_id
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        # Payloads that could not be delivered, kept for a later retry
        self.pending_reports: list[dict] = []

    async def _post(self, path: str, payload: dict) -> bool:
        """POST a JSON payload; return True if the backend accepted it."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.api_url}{path}",
                    json=payload,
                    headers=self.headers,
                ) as resp:
                    # Treat any 2xx as success (201 Created for new anomalies)
                    return 200 <= resp.status < 300
        except (aiohttp.ClientError, asyncio.TimeoutError):
            return False

    async def report_anomaly(self, anomaly: AnomalyResult):
        payload = {
            "device_id": self.device_id,
            "sensor_id": anomaly.sensor_id,
            "value": anomaly.reading.value,
            "score": anomaly.score,
            "reason": anomaly.reason,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # Keep the payload locally if the network or backend is unavailable
        if not await self._post("/anomalies", payload):
            self.pending_reports.append(payload)

    async def send_hourly_summary(self, detector: AnomalyDetector):
        summaries = {}
        for key, baseline in detector.baselines.items():
            summaries[key] = {**baseline, "data_points": len(detector.histories[key])}

        await self._post("/summaries", {
            "device_id": self.device_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sensors": summaries,
        })
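The reporter queues failed payloads in `pending_reports`, but the article does not show how they are retried. One possible approach, sketched here under assumptions, is to flush the queue in order with exponential backoff. `flush_pending` is a hypothetical helper, and `send` stands for any coroutine that returns True on successful delivery.

```python
import asyncio
from typing import Awaitable, Callable


async def flush_pending(
    pending: list[dict],
    send: Callable[[dict], Awaitable[bool]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> int:
    """Retry queued payloads in order; return how many were delivered.

    Stops at the first payload that still fails after max_attempts, so
    undelivered payloads stay in `pending` in their original order.
    """
    delivered = 0
    while pending:
        payload = pending[0]
        for attempt in range(max_attempts):
            if await send(payload):
                break
            if attempt < max_attempts - 1:
                # Exponential backoff: base_delay, 2x, 4x, ...
                await asyncio.sleep(base_delay * 2 ** attempt)
        else:
            # Every attempt failed; the link is probably still down
            return delivered
        pending.pop(0)
        delivered += 1
    return delivered
```

Calling this from a periodic task, for example alongside the hourly summary, would let the gateway catch up after an outage without blocking the main detection loop.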

Main Agent Loop

async def run_iot_agent():
    ingester = SensorIngester()
    detector = AnomalyDetector(z_threshold=3.0)
    # Placeholder URL, device ID, and key; load real values from configuration
    reporter = CloudReporter("https://api.example.com", "gateway-01", "key")
    readings_queue = ingester.subscribe()

    # Sensor drivers (not shown) feed this loop by calling ingester.add_reading()
    while True:
        reading = await readings_queue.get()
        result = detector.check(reading)

        if result.is_anomaly:
            print(f"ANOMALY: {result.sensor_id} = {result.reading.value} ({result.reason})")
            await reporter.report_anomaly(result)

FAQ

What hardware should I use as an IoT AI gateway?

A Raspberry Pi 5 or an NVIDIA Jetson Nano handles most IoT agent workloads. For industrial environments, consider the Jetson Orin Nano, which provides more GPU compute for running larger anomaly detection models. The key requirement is enough RAM to hold your model (1 to 4 GB) plus the sensor data buffer.

How do I train the autoencoder anomaly detection model?

Collect 2 to 4 weeks of normal sensor data. Build a simple autoencoder with an encoder that compresses the input window to a small latent space and a decoder that reconstructs it. Train on normal data only. At inference time, a high reconstruction error means the current pattern deviates from learned normal behavior, and that is your anomaly signal.
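How high counts as "high" depends on the model and the data. One common way to choose the `reconstruction_threshold`, offered here as an assumption rather than something the article prescribes, is to take a high quantile of the reconstruction errors measured on held-out normal windows. The function name and the sample errors are illustrative.

```python
import math


def threshold_from_normal_errors(errors: list[float], quantile: float = 0.99) -> float:
    """Return the error value at or below which `quantile` of normal windows fall."""
    if not errors:
        raise ValueError("need at least one reconstruction error")
    if not 0.0 < quantile <= 1.0:
        raise ValueError("quantile must be in (0, 1]")
    ordered = sorted(errors)
    # Nearest-rank quantile: the smallest observed error that has at least
    # `quantile` of the data at or below it
    rank = max(1, math.ceil(quantile * len(ordered)))
    return ordered[rank - 1]


# Hypothetical reconstruction errors from held-out normal windows
normal_errors = [0.011, 0.008, 0.015, 0.009, 0.012, 0.010, 0.030, 0.013]
threshold = threshold_from_normal_errors(normal_errors, quantile=0.99)
print(f"reconstruction_threshold = {threshold}")
```

A higher quantile produces fewer false alarms but lets more borderline anomalies through, so the value is worth revisiting once real anomalies have been observed.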

Can this approach handle hundreds of sensors simultaneously?

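Yes. The statistical detector does a small, bounded amount of work per reading: it maintains a fixed-size sliding window per sensor and computes simple statistics over it, so the cost depends on the window size and not on how long the sensor has been running. The neural detector is more expensive, but you can batch multiple sensor windows into a single inference call. As a rough guide, a Raspberry Pi 5 can process about 10,000 readings per second with the statistical detector and 500 to 1,000 with the neural detector. Benchmark with your own model and sensors before sizing hardware.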

