AI Agent for Log Analysis at Scale: Pattern Detection Across Millions of Log Lines
Build an AI agent that aggregates logs from multiple services, extracts patterns using clustering and LLM analysis, correlates events across systems, and generates actionable alerts.
The Problem with Logs at Scale
Modern microservice architectures generate millions of log lines per hour. A single user request can touch 15 services, producing logs in different formats, at different verbosity levels, and across different time zones. No human can read all of this. Traditional log tools let you search for patterns you already know about, but they cannot discover problems you have not yet defined. An AI log analysis agent fills this gap by surfacing patterns you did not know to look for.
Log Ingestion and Normalization
The agent reads from your existing log aggregation system (Elasticsearch, Loki, CloudWatch) and normalizes entries into a common structure.
```python
import re
import httpx
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class LogEntry:
    timestamp: datetime
    service: str
    level: str  # ERROR, WARN, INFO, DEBUG
    message: str
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    raw: str = ""


class LokiLogReader:
    def __init__(self, base_url: str = "http://loki:3100"):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=60)

    async def query(
        self, logql: str, limit: int = 5000, hours_back: int = 1
    ) -> list[LogEntry]:
        end_ns = int(datetime.now(timezone.utc).timestamp() * 1e9)
        start_ns = end_ns - (hours_back * 3600 * int(1e9))
        resp = await self.client.get(
            f"{self.base_url}/loki/api/v1/query_range",
            params={
                "query": logql,
                "start": str(start_ns),
                "end": str(end_ns),
                "limit": limit,
            },
        )
        resp.raise_for_status()
        entries = []
        for stream in resp.json()["data"]["result"]:
            service = stream["stream"].get("app", "unknown")
            for ts, line in stream["values"]:
                entries.append(self._parse_line(service, ts, line))
        return entries

    def _parse_line(self, service: str, ts: str, line: str) -> LogEntry:
        level = "INFO"
        # Check FATAL before ERROR so a line containing both maps to the
        # more severe level.
        for lvl in ["FATAL", "ERROR", "WARN", "DEBUG"]:
            if lvl in line.upper():
                level = lvl
                break
        trace_match = re.search(r'trace_id[=:]\s*([a-f0-9]{32})', line)
        return LogEntry(
            # Loki returns nanosecond timestamps; keep everything in UTC to
            # match the query window above.
            timestamp=datetime.fromtimestamp(int(ts) / 1e9, tz=timezone.utc),
            service=service,
            level=level,
            message=line,
            trace_id=trace_match.group(1) if trace_match else None,
            raw=line,
        )
```
Pattern Extraction with Log Clustering
Instead of sending millions of raw log lines to an LLM, the agent first clusters similar log messages and then asks the LLM to analyze representative samples from each cluster.
```python
import re

from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer


class LogClusterer:
    def __init__(self, eps: float = 0.3, min_samples: int = 5):
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            stop_words="english",
            token_pattern=r"[a-zA-Z_]+",
        )
        self.clusterer = DBSCAN(eps=eps, min_samples=min_samples, metric="cosine")

    def cluster_logs(self, entries: list[LogEntry]) -> dict[int, list[LogEntry]]:
        messages = [self._normalize(e.message) for e in entries]
        tfidf_matrix = self.vectorizer.fit_transform(messages)
        labels = self.clusterer.fit_predict(tfidf_matrix)
        clusters: dict[int, list[LogEntry]] = {}
        for label, entry in zip(labels, entries):
            if label == -1:
                continue  # DBSCAN noise: one-off messages with no cluster
            clusters.setdefault(label, []).append(entry)
        # Largest clusters first: the most frequent patterns matter most.
        return dict(sorted(
            clusters.items(),
            key=lambda x: len(x[1]),
            reverse=True,
        ))

    def _normalize(self, message: str) -> str:
        """Replace variable parts with placeholders so lines that differ
        only in IPs, UUIDs, or numbers cluster together."""
        msg = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', 'IP_ADDR', message)
        msg = re.sub(
            r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b',
            'UUID', msg,
        )
        msg = re.sub(r'\b\d{10,13}\b', 'TIMESTAMP', msg)
        msg = re.sub(r'\b\d+\b', 'NUM', msg)
        return msg
```
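The normalize-then-cluster step can be seen on toy data. This standalone sketch uses two invented message templates; after placeholder substitution, each group collapses to a single template and DBSCAN finds exactly two dense clusters:

```python
import re

from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer


def normalize(message: str) -> str:
    # Same placeholder idea as LogClusterer._normalize, trimmed for the demo.
    msg = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', 'IP_ADDR', message)
    msg = re.sub(r'\b\d+\b', 'NUM', msg)
    return msg


# Two synthetic patterns: timeouts to varying hosts, and slow-query warnings.
messages = (
    [f"connection timeout to 10.0.0.{i} after {i * 100} ms" for i in range(8)]
    + [f"slow query took {i * 7} ms on shard {i}" for i in range(8)]
)
vectorizer = TfidfVectorizer(token_pattern=r"[a-zA-Z_]+")
matrix = vectorizer.fit_transform([normalize(m) for m in messages])
labels = DBSCAN(eps=0.3, min_samples=5, metric="cosine").fit_predict(matrix)
print(labels.tolist())  # two clusters, no -1 noise points
```

Sixteen superficially distinct lines reduce to two templates, which is the whole point: the LLM later sees two cluster summaries instead of sixteen raw messages.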
LLM-Powered Pattern Analysis
The agent sends cluster summaries rather than raw logs to the LLM, keeping costs low while getting intelligent analysis.
```python
import json

import openai


async def analyze_clusters(clusters: dict[int, list[LogEntry]]) -> list[dict]:
    client = openai.AsyncOpenAI()
    findings = []
    # Clusters arrive sorted by size, so this analyzes the 20 largest.
    for cluster_id, entries in list(clusters.items())[:20]:
        sample = [e.message for e in entries[:10]]
        services = sorted(set(e.service for e in entries))
        levels = sorted(set(e.level for e in entries))
        # Entries are not guaranteed to be time-sorted, so take min/max
        # rather than first/last.
        start = min(e.timestamp for e in entries)
        end = max(e.timestamp for e in entries)
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Analyze this log pattern cluster.
Cluster size: {len(entries)} log lines
Services affected: {services}
Log levels: {levels}
Time range: {start} to {end}
Sample messages:
{chr(10).join(sample)}
Determine:
1. What this pattern represents
2. Whether it indicates a problem (critical/warning/info)
3. Likely root cause if it is a problem
4. Recommended action
Return JSON: pattern_summary, severity, root_cause, recommended_action.""",
            }],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        finding = json.loads(response.choices[0].message.content)
        finding["cluster_id"] = cluster_id
        finding["count"] = len(entries)
        finding["services"] = services
        findings.append(finding)
    return findings
```
Cross-Service Correlation
The agent uses trace IDs to correlate errors across services and build a dependency graph of failures.
```python
from collections import defaultdict


def correlate_errors_by_trace(entries: list[LogEntry]) -> list[dict]:
    """Group error entries by trace_id to find cross-service failure chains."""
    trace_groups: dict[str, list[LogEntry]] = defaultdict(list)
    for entry in entries:
        if entry.trace_id and entry.level in ("ERROR", "FATAL"):
            trace_groups[entry.trace_id].append(entry)
    chains = []
    for trace_id, error_entries in trace_groups.items():
        if len(error_entries) < 2:
            continue  # single-service error, not a chain
        services_hit = list(set(e.service for e in error_entries))
        sorted_entries = sorted(error_entries, key=lambda e: e.timestamp)
        chains.append({
            "trace_id": trace_id,
            "services": services_hit,
            "origin_service": sorted_entries[0].service,
            "first_error": sorted_entries[0].message,
            "last_error": sorted_entries[-1].message,
            "propagation_time_ms": (
                sorted_entries[-1].timestamp - sorted_entries[0].timestamp
            ).total_seconds() * 1000,
        })
    # Chains touching the most services are the most interesting.
    return sorted(chains, key=lambda c: len(c["services"]), reverse=True)
```
FAQ
How do I handle the cost of sending logs to an LLM?
Never send raw logs to an LLM. Always cluster first, then send representative samples. With DBSCAN clustering, you can reduce millions of log lines down to 20-50 clusters. The LLM only analyzes the cluster summaries, keeping costs under a few cents per analysis cycle.
What about structured logs versus unstructured logs?
The clustering approach handles both. For structured JSON logs, extract the message field for clustering and keep the structured fields as metadata. For unstructured logs, the TF-IDF vectorizer and normalization step handle variable content like IPs, UUIDs, and timestamps by replacing them with placeholders before clustering.
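For structured logs, the extraction step is a few lines. The field names below (`msg`, `level`) are assumptions; adjust them to your log schema:

```python
import json

raw = '{"ts": "2024-05-01T12:00:00Z", "level": "error", "msg": "connection refused", "trace_id": "abc"}'
record = json.loads(raw)

message = record.get("msg", "")  # cluster on this field only
metadata = {k: v for k, v in record.items() if k != "msg"}  # keep the rest as-is

print(message)  # connection refused
```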
How does the agent distinguish between a new error pattern and a known recurring issue?
Maintain a pattern fingerprint database. After each analysis cycle, hash the cluster centroid and store it with its analysis result. When the same fingerprint appears again, reference the existing analysis instead of re-analyzing. Only escalate patterns that are new or have significantly increased in frequency.
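A minimal sketch of that fingerprint store, in-memory for illustration (a real deployment would persist it; hashing the normalized template is used here as a simple stand-in for hashing the cluster centroid, and the 2x escalation ratio is an invented threshold):

```python
import hashlib


class PatternStore:
    """In-memory fingerprint store: skip re-analysis of known patterns."""

    def __init__(self, escalation_ratio: float = 2.0):
        self.seen: dict[str, dict] = {}  # fingerprint -> {"count", "finding"}
        self.escalation_ratio = escalation_ratio

    def fingerprint(self, template: str) -> str:
        return hashlib.sha256(template.encode()).hexdigest()[:16]

    def should_analyze(self, template: str, count: int) -> bool:
        """True for new patterns, or known ones whose volume jumped sharply."""
        prev = self.seen.get(self.fingerprint(template))
        if prev is None:
            return True
        return count >= prev["count"] * self.escalation_ratio

    def record(self, template: str, count: int, finding: dict) -> None:
        self.seen[self.fingerprint(template)] = {"count": count, "finding": finding}


store = PatternStore()
template = "connection timeout to IP_ADDR after NUM ms"
first = store.should_analyze(template, 120)   # new pattern -> analyze
store.record(template, 120, {"severity": "warning"})
repeat = store.should_analyze(template, 130)  # known, similar volume -> skip
spike = store.should_analyze(template, 500)   # frequency spike -> re-escalate
```

Known patterns at steady volume reuse the stored finding, so LLM calls are spent only on genuinely new or growing problems.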
#LogAnalysis #Observability #DevOps #PatternDetection #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team