AI Agent for Log Analysis: Automated Error Detection and Root Cause Analysis
Build an AI agent that parses application logs, detects error patterns, identifies anomalies, correlates events across services, and generates root cause analysis reports automatically.
The Log Analysis Challenge
Modern applications generate enormous volumes of logs across multiple services. When an incident occurs, engineers spend most of their time searching through logs, correlating timestamps, and piecing together what happened. An AI log analysis agent automates this process: it ingests logs, detects anomalies, clusters related errors, and produces a root cause analysis that points engineers to the exact sequence of events that triggered the problem.
Structured Log Parsing
The first step is converting raw log lines into structured data the agent can reason about.
import re
from datetime import datetime
from dataclasses import dataclass, field
from collections import Counter
from openai import OpenAI
# Shared OpenAI client used for all LLM calls below (root cause analysis).
# NOTE(review): constructed at import time with default settings — presumably
# picks up credentials from the environment; confirm for your deployment.
client = OpenAI()
@dataclass
class LogEntry:
timestamp: datetime
level: str
service: str
message: str
raw: str
metadata: dict = field(default_factory=dict)
class LogParser:
PATTERNS = [
re.compile(
r"(?P<timestamp>[\d-]+ [\d:,.]+)\s+"
r"(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)\s+"
r"\[(?P<service>[^\]]+)\]\s+"
r"(?P<message>.+)"
),
re.compile(
r"(?P<timestamp>[\d-]+T[\d:.]+Z?)\s+"
r"(?P<level>\w+)\s+"
r"(?P<service>[\w.-]+):\s+"
r"(?P<message>.+)"
),
]
def parse(self, raw_logs: str) -> list[LogEntry]:
entries = []
for line in raw_logs.strip().split("\n"):
if not line.strip():
continue
entry = self._parse_line(line)
if entry:
entries.append(entry)
entries.sort(key=lambda e: e.timestamp)
return entries
def _parse_line(self, line: str) -> LogEntry | None:
for pattern in self.PATTERNS:
match = pattern.match(line)
if match:
groups = match.groupdict()
ts_str = groups["timestamp"]
for fmt in [
"%Y-%m-%d %H:%M:%S,%f",
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%d %H:%M:%S",
]:
try:
ts = datetime.strptime(ts_str, fmt)
break
except ValueError:
continue
else:
ts = datetime.now()
return LogEntry(
timestamp=ts, level=groups["level"],
service=groups["service"],
message=groups["message"], raw=line,
)
return None
Error Pattern Detection
The agent groups errors by message pattern and identifies the most common failures.
class LogAnalysisAgent:
    """Analyzes application logs: error-pattern detection plus LLM root cause analysis."""

    def __init__(self, model: str = "gpt-4o"):
        self.model = model          # chat model used for the root cause analysis
        self.parser = LogParser()   # converts raw log text into LogEntry objects

    @staticmethod
    def _normalize(message: str) -> str:
        """Collapse variable parts of a message into placeholders.

        UUIDs, integers and single-quoted string literals are replaced so
        that errors differing only in specific values group together.
        """
        msg = re.sub(r"\b[0-9a-f-]{36}\b", "<UUID>", message)
        msg = re.sub(r"\b\d+\b", "<NUM>", msg)
        msg = re.sub(r"'[^']*'", "'<STR>'", msg)
        return msg

    def detect_error_patterns(
        self, entries: list[LogEntry]
    ) -> list[dict]:
        """Group ERROR/CRITICAL entries by normalized message pattern.

        Returns up to 20 patterns, most frequent first, each with its
        occurrence count, first/last timestamps, the full set of affected
        services, and up to three raw example lines.
        """
        errors = [e for e in entries if e.level in ("ERROR", "CRITICAL")]
        if not errors:
            return []
        # Normalize once and keep the list aligned with `errors` so group
        # membership is an exact string comparison (the original code
        # called an undefined `_matches_pattern` helper here).
        normalized = [self._normalize(e.message) for e in errors]
        counter = Counter(normalized)
        patterns = []
        for pattern, count in counter.most_common(20):
            # Collect ALL entries in this group: first/last seen and the
            # service list must reflect every occurrence, not just the
            # first three examples.
            matching = [
                e for e, norm in zip(errors, normalized) if norm == pattern
            ]
            patterns.append({
                "pattern": pattern,
                "count": count,
                "first_seen": min(e.timestamp for e in matching).isoformat(),
                "last_seen": max(e.timestamp for e in matching).isoformat(),
                # sorted for deterministic output (set order is arbitrary)
                "services": sorted(set(e.service for e in matching)),
                "examples": [e.raw for e in matching[:3]],
            })
        return patterns
The normalization step replaces UUIDs, numbers, and string literals with placeholders, allowing the agent to group errors that differ only in their specific values.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
Root Cause Analysis with the LLM
With structured error patterns, the agent uses the LLM to perform root cause analysis.
import json
def analyze_root_cause(self, raw_logs: str) -> dict:
    """Run the full pipeline on *raw_logs* and return the LLM's analysis.

    Parses the logs, condenses them into error patterns and a timeline,
    then asks the model for a structured root cause analysis.  Returns
    the model's JSON response parsed into a dict.
    """
    parsed_entries = self.parser.parse(raw_logs)
    error_patterns = self.detect_error_patterns(parsed_entries)
    timeline = self._build_timeline(parsed_entries)
    system_prompt = """You are a senior SRE
performing incident root cause analysis. Given error patterns
and a timeline, determine:
1. The primary root cause
2. The chain of events that led to the failure
3. Which services were affected and in what order
4. Recommended immediate actions
5. Long-term fixes to prevent recurrence
Return JSON with:
- "root_cause": one-sentence summary
- "event_chain": ordered list of events
- "affected_services": list with impact description
- "immediate_actions": list of steps to take now
- "prevention": list of long-term improvements
- "severity": "critical", "high", "medium", or "low"
"""
    user_prompt = (
        f"Error patterns:\n{json.dumps(error_patterns, indent=2)}\n\n"
        f"Event timeline:\n{timeline}"
    )
    completion = client.chat.completions.create(
        model=self.model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0,  # deterministic output for repeatable analyses
        response_format={"type": "json_object"},  # force a valid JSON reply
    )
    return json.loads(completion.choices[0].message.content)
def _build_timeline(self, entries: list[LogEntry]) -> str:
significant = [
e for e in entries
if e.level in ("ERROR", "CRITICAL", "WARNING")
]
lines = []
for entry in significant[:100]:
lines.append(
f"[{entry.timestamp.isoformat()}] "
f"{entry.level} [{entry.service}] {entry.message}"
)
return "\n".join(lines)
Generating an Incident Report
def generate_report(self, raw_logs: str) -> str:
analysis = self.analyze_root_cause(raw_logs)
entries = self.parser.parse(raw_logs)
total = len(entries)
errors = len([e for e in entries if e.level in ("ERROR", "CRITICAL")])
report = f"""# Incident Analysis Report
## Summary
**Root Cause:** {analysis['root_cause']}
**Severity:** {analysis['severity'].upper()}
**Total log entries analyzed:** {total}
**Error entries:** {errors}
## Event Chain
"""
for i, event in enumerate(analysis["event_chain"], 1):
report += f"{i}. {event}\n"
report += "\n## Affected Services\n"
for svc in analysis["affected_services"]:
report += f"- {svc}\n"
report += "\n## Immediate Actions\n"
for action in analysis["immediate_actions"]:
report += f"- [ ] {action}\n"
return report
FAQ
How does the agent handle logs from multiple services with different formats?
The parser supports multiple format patterns and tries each one in order. For services with completely custom formats, you add a new regex pattern to the PATTERNS list. The service name extracted from each log line allows the agent to correlate events across services by timestamp.
Can the agent process logs in real time?
Yes, by wrapping the parser in a streaming consumer that reads from a log aggregation system like Kafka or a file tail. Buffer entries for a time window (for example 60 seconds), run pattern detection on each window, and trigger a full root cause analysis when error rates spike above a threshold.
How do I handle log volumes that exceed the LLM context window?
Summarize before sending to the LLM. The pattern detection step reduces thousands of error lines to a handful of patterns with counts. The timeline is limited to the most recent 100 significant events. For very large volumes, add a pre-filtering step that focuses on the time window around the incident.
#LogAnalysis #AIAgents #Python #Observability #DevOps #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.