
AI Agent for Log Analysis at Scale: Pattern Detection Across Millions of Log Lines

Build an AI agent that aggregates logs from multiple services, extracts patterns using clustering and LLM analysis, correlates events across systems, and generates actionable alerts.

The Problem with Logs at Scale

Modern microservice architectures generate millions of log lines per hour. A single user request can touch 15 services, producing logs in different formats, at different verbosity levels, across different time zones. No human can read all of this. Traditional log tools let you search for known patterns, but they cannot discover unknown problems. An AI log analysis agent fills this gap by finding patterns you did not know to look for.

Log Ingestion and Normalization

The agent reads from your existing log aggregation system (Elasticsearch, Loki, CloudWatch) and normalizes entries into a common structure.

import re
import httpx
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class LogEntry:
    timestamp: datetime
    service: str
    level: str  # ERROR, WARN, INFO, DEBUG
    message: str
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    raw: str = ""

class LokiLogReader:
    def __init__(self, base_url: str = "http://loki:3100"):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=60)

    async def query(
        self, logql: str, limit: int = 5000, hours_back: int = 1
    ) -> list[LogEntry]:
        # note: utcnow() is naive, so .timestamp() would misread it as local time
        end_ns = int(datetime.now().timestamp() * 1e9)
        start_ns = end_ns - hours_back * 3600 * int(1e9)

        resp = await self.client.get(
            f"{self.base_url}/loki/api/v1/query_range",
            params={
                "query": logql,
                "start": str(start_ns),
                "end": str(end_ns),
                "limit": limit,
            },
        )
        resp.raise_for_status()
        entries = []
        for stream in resp.json()["data"]["result"]:
            service = stream["stream"].get("app", "unknown")
            for ts, line in stream["values"]:
                entries.append(self._parse_line(service, ts, line))
        return entries

    def _parse_line(self, service: str, ts: str, line: str) -> LogEntry:
        level = "INFO"
        # check most severe first so a FATAL line is not mislabeled as ERROR
        for lvl in ["FATAL", "ERROR", "WARN", "DEBUG"]:
            if lvl in line.upper():
                level = lvl
                break

        trace_match = re.search(r'trace_id[=:]\s*([a-f0-9]{32})', line)
        return LogEntry(
            # Loki timestamps are UTC nanoseconds; keep entries in UTC
            timestamp=datetime.utcfromtimestamp(int(ts) / 1e9),
            service=service,
            level=level,
            message=line,
            trace_id=trace_match.group(1) if trace_match else None,
            raw=line,
        )

Pattern Extraction with Log Clustering

Instead of sending millions of raw log lines to an LLM, the agent first clusters similar log messages and then asks the LLM to analyze representative samples from each cluster.

import re  # needed by _normalize below

from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer

class LogClusterer:
    def __init__(self, eps: float = 0.3, min_samples: int = 5):
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            stop_words="english",
            token_pattern=r"[a-zA-Z_]+",
        )
        self.clusterer = DBSCAN(eps=eps, min_samples=min_samples, metric="cosine")

    def cluster_logs(self, entries: list[LogEntry]) -> dict[int, list[LogEntry]]:
        messages = [self._normalize(e.message) for e in entries]
        tfidf_matrix = self.vectorizer.fit_transform(messages)
        labels = self.clusterer.fit_predict(tfidf_matrix)

        clusters: dict[int, list[LogEntry]] = {}
        for label, entry in zip(labels, entries):
            if label == -1:
                continue  # noise
            clusters.setdefault(label, []).append(entry)

        return dict(sorted(
            clusters.items(),
            key=lambda x: len(x[1]),
            reverse=True,
        ))

    def _normalize(self, message: str) -> str:
        """Replace variable parts with placeholders."""
        msg = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', 'IP_ADDR', message)
        msg = re.sub(r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', 'UUID', msg)
        msg = re.sub(r'\b\d{10,13}\b', 'TIMESTAMP', msg)
        msg = re.sub(r'\b\d+\b', 'NUM', msg)
        return msg
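As a quick sanity check, the normalization step can be exercised on its own. This standalone sketch repeats the same regexes as `_normalize` so two lines that differ only in their variable parts collapse to one template:

```python
import re

def normalize(message: str) -> str:
    """Replace variable tokens so structurally identical lines compare equal."""
    msg = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', 'IP_ADDR', message)
    msg = re.sub(r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', 'UUID', msg)
    msg = re.sub(r'\b\d{10,13}\b', 'TIMESTAMP', msg)
    msg = re.sub(r'\b\d+\b', 'NUM', msg)
    return msg

a = normalize("timeout connecting to 10.0.3.17 after 5000 ms")
b = normalize("timeout connecting to 10.0.9.42 after 3000 ms")
print(a == b)  # True: two different raw lines collapse to the same template
```

Without this step, TF-IDF would treat every unique IP or request ID as a distinct token and the clusters would fragment.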

LLM-Powered Pattern Analysis

The agent sends cluster summaries rather than raw logs to the LLM, keeping costs low while getting intelligent analysis.

import openai
import json

async def analyze_clusters(clusters: dict[int, list[LogEntry]]) -> list[dict]:
    client = openai.AsyncOpenAI()
    findings = []

    for cluster_id, entries in list(clusters.items())[:20]:
        sample = [e.message for e in entries[:10]]
        services = list(set(e.service for e in entries))
        levels = list(set(e.level for e in entries))

        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Analyze this log pattern cluster.

Cluster size: {len(entries)} log lines
Services affected: {services}
Log levels: {levels}
Time range: {min(e.timestamp for e in entries)} to {max(e.timestamp for e in entries)}
Sample messages:
{chr(10).join(sample)}

Determine:
1. What this pattern represents
2. Whether it indicates a problem (critical/warning/info)
3. Likely root cause if it is a problem
4. Recommended action

Return JSON: pattern_summary, severity, root_cause, recommended_action."""
            }],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        finding = json.loads(response.choices[0].message.content)
        finding["cluster_id"] = cluster_id
        finding["count"] = len(entries)
        finding["services"] = services
        findings.append(finding)

    return findings

Cross-Service Correlation

The agent uses trace IDs to correlate errors across services and build a dependency graph of failures.

from collections import defaultdict

def correlate_errors_by_trace(entries: list[LogEntry]) -> list[dict]:
    """Group error entries by trace_id to find cross-service failure chains."""
    trace_groups: dict[str, list[LogEntry]] = defaultdict(list)

    for entry in entries:
        if entry.trace_id and entry.level in ("ERROR", "FATAL"):
            trace_groups[entry.trace_id].append(entry)

    chains = []
    for trace_id, error_entries in trace_groups.items():
        if len(error_entries) < 2:
            continue  # single-service error, not a chain

        services_hit = list(set(e.service for e in error_entries))
        sorted_entries = sorted(error_entries, key=lambda e: e.timestamp)

        chains.append({
            "trace_id": trace_id,
            "services": services_hit,
            "origin_service": sorted_entries[0].service,
            "first_error": sorted_entries[0].message,
            "last_error": sorted_entries[-1].message,
            "propagation_time_ms": (
                sorted_entries[-1].timestamp - sorted_entries[0].timestamp
            ).total_seconds() * 1000,
        })

    return sorted(chains, key=lambda c: len(c["services"]), reverse=True)
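The findings from `analyze_clusters` and the chains from `correlate_errors_by_trace` can then be folded into actionable alerts. A minimal sketch (the severity names match the prompt above; the alert dict shape is an illustrative assumption, not a fixed schema):

```python
def generate_alerts(findings: list[dict], chains: list[dict]) -> list[dict]:
    """Turn LLM findings and trace correlation chains into alert payloads,
    most severe first. Only critical/warning findings become alerts."""
    severity_rank = {"critical": 0, "warning": 1}
    alerts = []
    for f in findings:
        if f.get("severity") not in severity_rank:
            continue  # informational patterns do not page anyone
        # attach any failure chain that touches the same services
        related = [
            c for c in chains
            if set(c["services"]) & set(f.get("services", []))
        ]
        alerts.append({
            "title": f.get("pattern_summary", "unknown pattern"),
            "severity": f["severity"],
            "count": f.get("count", 0),
            "services": f.get("services", []),
            "action": f.get("recommended_action", ""),
            "related_traces": [c["trace_id"] for c in related],
        })
    return sorted(alerts, key=lambda a: severity_rank[a["severity"]])

alerts = generate_alerts(
    [{"pattern_summary": "DB pool exhausted", "severity": "critical",
      "count": 1200, "services": ["orders"], "recommended_action": "scale pool"},
     {"pattern_summary": "slow cache", "severity": "info", "count": 40,
      "services": ["cart"]}],
    [{"trace_id": "abc", "services": ["orders", "payments"]}],
)
print(alerts[0]["related_traces"])  # ['abc']
```

Attaching the trace chains gives the on-call engineer an entry point: the origin service of the chain is usually where to start debugging.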

FAQ

How do I handle the cost of sending logs to an LLM?

Never send raw logs to an LLM. Always cluster first, then send representative samples. With DBSCAN clustering, you can reduce millions of log lines down to 20-50 clusters. The LLM only analyzes the cluster summaries, keeping costs under a few cents per analysis cycle.

What about structured logs versus unstructured logs?

The clustering approach handles both. For structured JSON logs, extract the message field for clustering and keep the structured fields as metadata. For unstructured logs, the TF-IDF vectorizer and normalization step handle variable content like IPs, UUIDs, and timestamps by replacing them with placeholders before clustering.
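For structured logs, a small pre-step can pull out the message field before clustering. A sketch under the assumption that the logger emits `message` or `msg` (field names vary between logging libraries):

```python
import json

def extract_message(line: str) -> tuple[str, dict]:
    """Return (message_for_clustering, metadata) for one log line.
    Falls back to the raw line when it is not a JSON object."""
    try:
        obj = json.loads(line)
    except (json.JSONDecodeError, TypeError):
        return line, {}
    if not isinstance(obj, dict):
        return line, {}
    msg = obj.get("message") or obj.get("msg") or line
    meta = {k: v for k, v in obj.items() if k not in ("message", "msg")}
    return msg, meta

msg, meta = extract_message('{"level": "error", "msg": "db timeout", "user_id": 42}')
print(msg)   # db timeout
print(meta)  # {'level': 'error', 'user_id': 42}
```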

How does the agent distinguish between a new error pattern and a known recurring issue?

Maintain a pattern fingerprint database. After each analysis cycle, hash the cluster centroid and store it with its analysis result. When the same fingerprint appears again, reference the existing analysis instead of re-analyzing. Only escalate patterns that are new or have significantly increased in frequency.
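One way to sketch this fingerprinting (hashing the cluster's most common normalized template rather than the centroid vector itself, which is simpler and stable across runs; the growth threshold is an assumed tunable):

```python
import hashlib
from collections import Counter

def cluster_fingerprint(normalized_messages: list[str]) -> str:
    """Hash the most common normalized template in the cluster."""
    template, _ = Counter(normalized_messages).most_common(1)[0]
    return hashlib.sha256(template.encode()).hexdigest()[:16]

class PatternMemory:
    """Remembers analyzed patterns; escalates only new or growing ones."""
    def __init__(self, growth_factor: float = 2.0):
        self.seen: dict[str, int] = {}  # fingerprint -> last seen count
        self.growth_factor = growth_factor

    def should_escalate(self, fingerprint: str, count: int) -> bool:
        previous = self.seen.get(fingerprint)
        self.seen[fingerprint] = count
        if previous is None:
            return True  # brand-new pattern
        return count >= previous * self.growth_factor

memory = PatternMemory()
fp = cluster_fingerprint(["db timeout after NUM ms"] * 3)
print(memory.should_escalate(fp, 100))  # True: first sighting
print(memory.should_escalate(fp, 110))  # False: roughly stable
print(memory.should_escalate(fp, 500))  # True: ~5x growth since last cycle
```

In production you would back `seen` with a persistent store so the memory survives agent restarts.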


#LogAnalysis #Observability #DevOps #PatternDetection #Python #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.