AI Agent for IoT Devices: Processing Sensor Data with Local Intelligence
Build an AI agent that processes IoT sensor data locally for real-time anomaly detection, with intelligent cloud reporting for aggregated insights and alerts.
Why Local AI for IoT
IoT devices generate massive volumes of sensor data — temperature readings every second, vibration measurements at 1 kHz, camera frames at 30 fps. Sending all of this to the cloud for processing is expensive, slow, and unnecessary. Most data is normal — only anomalies matter.
An edge AI agent sitting on the IoT gateway processes sensor streams locally, detects anomalies in real time, and sends only significant events to the cloud. This reduces bandwidth by 90 to 99 percent, enables sub-second response times, and keeps the system operational during network outages.
Sensor Data Ingestion
Start with a data ingestion layer that reads from multiple sensors and normalizes their output:
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field

@dataclass
class SensorReading:
    sensor_id: str
    metric: str
    value: float
    timestamp: float = field(default_factory=time.time)
    unit: str = ""

class SensorIngester:
    """Collects readings from multiple sensors."""
    def __init__(self, buffer_size: int = 1000):
        self.buffer: deque[SensorReading] = deque(maxlen=buffer_size)
        self.subscribers: list[asyncio.Queue] = []

    def add_reading(self, reading: SensorReading):
        self.buffer.append(reading)
        for queue in self.subscribers:
            try:
                queue.put_nowait(reading)
            except asyncio.QueueFull:
                pass  # Drop if subscriber is too slow

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue(maxsize=100)
        self.subscribers.append(queue)
        return queue

    def get_recent(self, sensor_id: str, count: int = 100) -> list[SensorReading]:
        return [
            r for r in self.buffer
            if r.sensor_id == sensor_id
        ][-count:]
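A sensor driver simply calls add_reading() as samples arrive. Here is a minimal sketch with a simulated temperature sensor; the simulate_temperature_sensor coroutine, its 1 Hz cadence, and the noise parameters are illustrative, not part of the ingestion API:

import random

async def simulate_temperature_sensor(ingester: SensorIngester, sensor_id: str):
    """Push a noisy temperature reading once per second (illustrative driver)."""
    while True:
        ingester.add_reading(SensorReading(
            sensor_id=sensor_id,
            metric="temperature",
            value=22.0 + random.gauss(0, 0.5),
            unit="C",
        ))
        await asyncio.sleep(1.0)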
Local Anomaly Detection
The core of the IoT agent is an anomaly detection model that runs locally. For sensor data, statistical methods combined with a lightweight neural network work well:
import numpy as np
from collections import deque
from dataclasses import dataclass

@dataclass
class AnomalyResult:
    is_anomaly: bool
    score: float
    reason: str
    sensor_id: str
    reading: SensorReading

class AnomalyDetector:
    """Detects anomalies in sensor streams using statistical methods."""
    def __init__(self, window_size: int = 100, z_threshold: float = 3.0):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.histories: dict[str, deque] = {}
        self.baselines: dict[str, dict] = {}

    def update_baseline(self, sensor_key: str):
        """Recalculate baseline statistics for a sensor/metric stream."""
        if sensor_key not in self.histories:
            return
        values = list(self.histories[sensor_key])
        if len(values) < 10:
            return
        arr = np.array(values)
        self.baselines[sensor_key] = {
            "mean": float(np.mean(arr)),
            "std": float(np.std(arr)),
            "min": float(np.min(arr)),
            "max": float(np.max(arr)),
            "q1": float(np.percentile(arr, 25)),
            "q3": float(np.percentile(arr, 75)),
        }

    def check(self, reading: SensorReading) -> AnomalyResult:
        sensor_key = f"{reading.sensor_id}:{reading.metric}"
        # Initialize history if this is a new sensor/metric pair
        if sensor_key not in self.histories:
            self.histories[sensor_key] = deque(maxlen=self.window_size)
        self.histories[sensor_key].append(reading.value)
        # Need minimum data for detection
        if len(self.histories[sensor_key]) < 20:
            return AnomalyResult(
                is_anomaly=False, score=0.0,
                reason="Insufficient data for baseline",
                sensor_id=reading.sensor_id, reading=reading,
            )
        # Recalculate baseline periodically
        if len(self.histories[sensor_key]) % 50 == 0:
            self.update_baseline(sensor_key)
        baseline = self.baselines.get(sensor_key)
        if not baseline:
            self.update_baseline(sensor_key)
            baseline = self.baselines[sensor_key]
        # Z-score anomaly detection
        if baseline["std"] > 0:
            z_score = abs(reading.value - baseline["mean"]) / baseline["std"]
        else:
            z_score = 0.0
        # Rate-of-change detection: flag jumps much larger than recent steps
        recent = list(self.histories[sensor_key])[-5:]
        if len(recent) >= 2:
            rate = abs(recent[-1] - recent[-2])
            avg_rate = np.mean([
                abs(recent[i] - recent[i - 1]) for i in range(1, len(recent))
            ])
            rate_anomaly = rate > avg_rate * 5 if avg_rate > 0 else False
        else:
            rate_anomaly = False
        is_anomaly = z_score > self.z_threshold or rate_anomaly
        reasons = []
        if z_score > self.z_threshold:
            reasons.append(f"Z-score {z_score:.2f} exceeds threshold {self.z_threshold}")
        if rate_anomaly:
            reasons.append("Sudden rate change detected")
        return AnomalyResult(
            is_anomaly=is_anomaly,
            score=min(z_score / self.z_threshold, 1.0),
            reason="; ".join(reasons) if reasons else "Normal",
            sensor_id=reading.sensor_id,
            reading=reading,
        )
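A quick way to sanity-check the detector is to feed it a stable stream and then a spike. The values below are made up; once a baseline exists, the 100.0 reading should trip the z-score rule:

detector = AnomalyDetector(z_threshold=3.0)
for i in range(60):
    # Stable readings cycling through 22.0, 22.1, 22.2
    detector.check(SensorReading("temp-01", "temperature", 22.0 + (i % 3) * 0.1))
result = detector.check(SensorReading("temp-01", "temperature", 100.0))
print(result.is_anomaly, result.reason)  # True, "Z-score ... exceeds threshold 3.0; ..."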
ONNX-Based Neural Anomaly Detection
For more sophisticated detection, deploy a trained autoencoder that learns normal patterns and flags deviations:
import onnxruntime as ort
import numpy as np

class NeuralAnomalyDetector:
    """Uses an autoencoder to detect anomalies in sensor time series."""
    def __init__(self, model_path: str, reconstruction_threshold: float = 0.05):
        self.session = ort.InferenceSession(
            model_path, providers=["CPUExecutionProvider"]
        )
        self.threshold = reconstruction_threshold
        self.window_size = 30  # Model expects 30-step sequences

    def detect(self, sensor_values: list[float]) -> dict:
        if len(sensor_values) < self.window_size:
            return {"is_anomaly": False, "score": 0.0}
        # Take last N values and normalize
        window = np.array(sensor_values[-self.window_size:], dtype=np.float32)
        mean, std = window.mean(), window.std()
        if std > 0:
            normalized = (window - mean) / std
        else:
            normalized = window - mean
        # Run through the autoencoder; "input" must match the input tensor
        # name used when the model was exported
        input_data = normalized.reshape(1, self.window_size, 1)
        reconstructed = self.session.run(None, {"input": input_data})[0]
        # Reconstruction error = anomaly score
        mse = float(np.mean((input_data - reconstructed) ** 2))
        return {
            "is_anomaly": mse > self.threshold,
            "score": min(mse / self.threshold, 1.0),
            "reconstruction_error": mse,
        }
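In practice the two detectors complement each other: run the cheap statistical check on every reading and invoke the autoencoder only where it matters, feeding it recent history from the ingester. A minimal sketch, assuming an ingester populated as in the ingestion section; the model path and sensor ID are placeholders:

neural = NeuralAnomalyDetector("models/autoencoder.onnx")
recent = ingester.get_recent("vibration-01", count=30)
verdict = neural.detect([r.value for r in recent])
if verdict["is_anomaly"]:
    print(f"Neural detector flagged vibration-01 (score {verdict['score']:.2f})")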
Cloud Reporting
Only anomalies and periodic summaries get sent to the cloud:
import aiohttp
from datetime import datetime, timezone

class CloudReporter:
    """Reports anomalies and summaries to cloud backend."""
    def __init__(self, api_url: str, device_id: str, api_key: str):
        self.api_url = api_url
        self.device_id = device_id
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        self.pending_reports: list[tuple[str, dict]] = []

    async def _post(self, path: str, payload: dict):
        """POST a payload; queue it locally if the cloud is unreachable."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.api_url}{path}",
                    json=payload,
                    headers=self.headers,
                ) as resp:
                    if resp.status not in (200, 201):
                        self.pending_reports.append((path, payload))
        except Exception:
            self.pending_reports.append((path, payload))

    async def report_anomaly(self, anomaly: AnomalyResult):
        await self._post("/anomalies", {
            "device_id": self.device_id,
            "sensor_id": anomaly.sensor_id,
            "value": anomaly.reading.value,
            "score": anomaly.score,
            "reason": anomaly.reason,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    async def send_hourly_summary(self, detector: AnomalyDetector):
        summaries = {}
        for key, baseline in detector.baselines.items():
            summaries[key] = {**baseline, "data_points": len(detector.histories[key])}
        await self._post("/summaries", {
            "device_id": self.device_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sensors": summaries,
        })
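Nothing above retries the queued reports, so anomalies recorded during an outage would sit in pending_reports forever. A sketch of a flush_pending method you could add to the class and call periodically, for example alongside the hourly summary; re-queueing on failure is one possible policy:

    async def flush_pending(self):
        """Retry queued reports oldest-first; failures re-queue via _post()."""
        retry, self.pending_reports = self.pending_reports, []
        for path, payload in retry:
            await self._post(path, payload)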
Main Agent Loop
async def run_iot_agent(ingester: SensorIngester):
    # The ingester is created by the caller so sensor drivers can feed it
    detector = AnomalyDetector(z_threshold=3.0)
    reporter = CloudReporter("https://api.example.com", "gateway-01", "key")
    readings_queue = ingester.subscribe()
    while True:
        reading = await readings_queue.get()
        result = detector.check(reading)
        if result.is_anomaly:
            print(f"ANOMALY: {result.sensor_id} = {result.reading.value} ({result.reason})")
            await reporter.report_anomaly(result)
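To run the agent end to end, create the ingester, attach one task per sensor driver, and hand the ingester to the loop. This sketch reuses the illustrative simulate_temperature_sensor from the ingestion section:

async def main():
    ingester = SensorIngester()
    sensor_task = asyncio.create_task(
        simulate_temperature_sensor(ingester, "temp-01")
    )
    try:
        await run_iot_agent(ingester)
    finally:
        sensor_task.cancel()

asyncio.run(main())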
FAQ
What hardware should I use as an IoT AI gateway?
A Raspberry Pi 5 or an NVIDIA Jetson Nano handles most IoT agent workloads. For industrial environments, consider the Jetson Orin Nano, which provides more GPU compute for running larger anomaly detection models. The key requirement is enough RAM to hold your model (1 to 4 GB) plus the sensor data buffer.
How do I train the autoencoder anomaly detection model?
Collect 2 to 4 weeks of normal sensor data. Build a simple autoencoder with an encoder that compresses the input window to a small latent space and a decoder that reconstructs it. Train on normal data only. At inference time, high reconstruction error means the current pattern deviates from learned normal behavior — that is your anomaly signal.
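A minimal sketch of that training loop in PyTorch, assuming 30-step windows to match the detector above; the layer sizes are illustrative and normal_windows stands in for your loader of normalized (batch, 30, 1) tensors:

import torch
import torch.nn as nn

class Autoencoder(nn.Module):
    def __init__(self, window: int = 30, latent: int = 4):
        super().__init__()
        self.encoder = nn.Sequential(nn.Linear(window, 16), nn.ReLU(), nn.Linear(16, latent))
        self.decoder = nn.Sequential(nn.Linear(latent, 16), nn.ReLU(), nn.Linear(16, window))

    def forward(self, x):             # x: (batch, window, 1)
        flat = x.squeeze(-1)
        return self.decoder(self.encoder(flat)).unsqueeze(-1)

model = Autoencoder()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
for epoch in range(50):
    for batch in normal_windows:      # normalized windows of normal data only
        optimizer.zero_grad()
        loss = loss_fn(model(batch), batch)
        loss.backward()
        optimizer.step()

# Export with a dynamic batch axis so windows can be batched at inference time;
# input_names must match the "input" key used in NeuralAnomalyDetector
torch.onnx.export(
    model, torch.randn(1, 30, 1), "autoencoder.onnx",
    input_names=["input"], output_names=["output"],
    dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}},
)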
Can this approach handle hundreds of sensors simultaneously?
Yes. The statistical detector is O(1) per reading — it maintains a sliding window per sensor and computes simple statistics. The neural detector is more expensive but you can batch multiple sensor windows into a single inference call. On a Raspberry Pi 5, you can process about 10,000 readings per second with the statistical detector and 500 to 1,000 with the neural detector.
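A sketch of that batching, assuming the model was exported with a dynamic batch axis and each window is already normalized as in NeuralAnomalyDetector.detect; detect_batch is illustrative, not part of the class above:

import numpy as np
import onnxruntime as ort

def detect_batch(session: ort.InferenceSession, windows: list[np.ndarray],
                 threshold: float = 0.05) -> list[bool]:
    """Score many sensors' 30-step windows in a single inference call."""
    batch = np.stack(windows).reshape(len(windows), 30, 1).astype(np.float32)
    reconstructed = session.run(None, {"input": batch})[0]
    # Per-window reconstruction error across the time and feature axes
    errors = np.mean((batch - reconstructed) ** 2, axis=(1, 2))
    return [bool(e > threshold) for e in errors]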