Skip to content
Learn Agentic AI13 min read0 views

AI Agent for Log Analysis: Automated Error Detection and Root Cause Analysis

Build an AI agent that parses application logs, detects error patterns, identifies anomalies, correlates events across services, and generates root cause analysis reports automatically.

The Log Analysis Challenge

Modern applications generate enormous volumes of logs across multiple services. When an incident occurs, engineers spend most of their time searching through logs, correlating timestamps, and piecing together what happened. An AI log analysis agent automates this process: it ingests logs, detects anomalies, clusters related errors, and produces a root cause analysis that points engineers to the exact sequence of events that triggered the problem.

Structured Log Parsing

The first step is converting raw log lines into structured data the agent can reason about.

import re
from datetime import datetime
from dataclasses import dataclass, field
from collections import Counter
from openai import OpenAI

client = OpenAI()

@dataclass
class LogEntry:
    timestamp: datetime
    level: str
    service: str
    message: str
    raw: str
    metadata: dict = field(default_factory=dict)

class LogParser:
    PATTERNS = [
        re.compile(
            r"(?P<timestamp>[\d-]+ [\d:,.]+)\s+"
            r"(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)\s+"
            r"\[(?P<service>[^\]]+)\]\s+"
            r"(?P<message>.+)"
        ),
        re.compile(
            r"(?P<timestamp>[\d-]+T[\d:.]+Z?)\s+"
            r"(?P<level>\w+)\s+"
            r"(?P<service>[\w.-]+):\s+"
            r"(?P<message>.+)"
        ),
    ]

    def parse(self, raw_logs: str) -> list[LogEntry]:
        entries = []
        for line in raw_logs.strip().split("\n"):
            if not line.strip():
                continue
            entry = self._parse_line(line)
            if entry:
                entries.append(entry)
        entries.sort(key=lambda e: e.timestamp)
        return entries

    def _parse_line(self, line: str) -> LogEntry | None:
        for pattern in self.PATTERNS:
            match = pattern.match(line)
            if match:
                groups = match.groupdict()
                ts_str = groups["timestamp"]
                for fmt in [
                    "%Y-%m-%d %H:%M:%S,%f",
                    "%Y-%m-%dT%H:%M:%S.%fZ",
                    "%Y-%m-%d %H:%M:%S",
                ]:
                    try:
                        ts = datetime.strptime(ts_str, fmt)
                        break
                    except ValueError:
                        continue
                else:
                    ts = datetime.now()
                return LogEntry(
                    timestamp=ts, level=groups["level"],
                    service=groups["service"],
                    message=groups["message"], raw=line,
                )
        return None

Error Pattern Detection

The agent groups errors by message pattern and identifies the most common failures.

class LogAnalysisAgent:
    def __init__(self, model: str = "gpt-4o"):
        self.model = model
        self.parser = LogParser()

    def detect_error_patterns(
        self, entries: list[LogEntry]
    ) -> list[dict]:
        errors = [e for e in entries if e.level in ("ERROR", "CRITICAL")]
        if not errors:
            return []

        normalized = []
        for entry in errors:
            msg = re.sub(r"\b[0-9a-f-]{36}\b", "<UUID>", entry.message)
            msg = re.sub(r"\b\d+\b", "<NUM>", msg)
            msg = re.sub(r"'[^']*'", "'<STR>'", msg)
            normalized.append(msg)

        counter = Counter(normalized)
        patterns = []
        for pattern, count in counter.most_common(20):
            examples = [
                e for e in errors
                if self._matches_pattern(e.message, pattern)
            ][:3]
            patterns.append({
                "pattern": pattern,
                "count": count,
                "first_seen": min(e.timestamp for e in examples).isoformat(),
                "last_seen": max(e.timestamp for e in examples).isoformat(),
                "services": list(set(e.service for e in examples)),
                "examples": [e.raw for e in examples],
            })
        return patterns

The normalization step replaces UUIDs, numbers, and string literals with placeholders, allowing the agent to group errors that differ only in their specific values.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

Root Cause Analysis with the LLM

With structured error patterns, the agent uses the LLM to perform root cause analysis.

import json

def analyze_root_cause(self, raw_logs: str) -> dict:
    entries = self.parser.parse(raw_logs)
    patterns = self.detect_error_patterns(entries)
    timeline = self._build_timeline(entries)

    response = client.chat.completions.create(
        model=self.model,
        messages=[
            {"role": "system", "content": """You are a senior SRE
performing incident root cause analysis. Given error patterns
and a timeline, determine:
1. The primary root cause
2. The chain of events that led to the failure
3. Which services were affected and in what order
4. Recommended immediate actions
5. Long-term fixes to prevent recurrence

Return JSON with:
- "root_cause": one-sentence summary
- "event_chain": ordered list of events
- "affected_services": list with impact description
- "immediate_actions": list of steps to take now
- "prevention": list of long-term improvements
- "severity": "critical", "high", "medium", or "low"
"""},
            {"role": "user", "content": (
                f"Error patterns:\n{json.dumps(patterns, indent=2)}\n\n"
                f"Event timeline:\n{timeline}"
            )},
        ],
        temperature=0,
        response_format={"type": "json_object"},
    )

    return json.loads(response.choices[0].message.content)

def _build_timeline(self, entries: list[LogEntry]) -> str:
    significant = [
        e for e in entries
        if e.level in ("ERROR", "CRITICAL", "WARNING")
    ]
    lines = []
    for entry in significant[:100]:
        lines.append(
            f"[{entry.timestamp.isoformat()}] "
            f"{entry.level} [{entry.service}] {entry.message}"
        )
    return "\n".join(lines)

Generating an Incident Report

def generate_report(self, raw_logs: str) -> str:
    analysis = self.analyze_root_cause(raw_logs)
    entries = self.parser.parse(raw_logs)
    total = len(entries)
    errors = len([e for e in entries if e.level in ("ERROR", "CRITICAL")])

    report = f"""# Incident Analysis Report

## Summary
**Root Cause:** {analysis['root_cause']}
**Severity:** {analysis['severity'].upper()}
**Total log entries analyzed:** {total}
**Error entries:** {errors}

## Event Chain
"""
    for i, event in enumerate(analysis["event_chain"], 1):
        report += f"{i}. {event}\n"

    report += "\n## Affected Services\n"
    for svc in analysis["affected_services"]:
        report += f"- {svc}\n"

    report += "\n## Immediate Actions\n"
    for action in analysis["immediate_actions"]:
        report += f"- [ ] {action}\n"

    return report

FAQ

How does the agent handle logs from multiple services with different formats?

The parser supports multiple format patterns and tries each one in order. For services with completely custom formats, you add a new regex pattern to the PATTERNS list. The service name extracted from each log line allows the agent to correlate events across services by timestamp.

Can the agent process logs in real time?

Yes, by wrapping the parser in a streaming consumer that reads from a log aggregation system like Kafka or a file tail. Buffer entries for a time window (for example 60 seconds), run pattern detection on each window, and trigger a full root cause analysis when error rates spike above a threshold.

How do I handle log volumes that exceed the LLM context window?

Summarize before sending to the LLM. The pattern detection step reduces thousands of error lines to a handful of patterns with counts. The timeline is limited to the most recent 100 significant events. For very large volumes, add a pre-filtering step that focuses on the time window around the incident.


#LogAnalysis #AIAgents #Python #Observability #DevOps #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.