Building an Agent Analytics Pipeline: Collecting, Storing, and Analyzing Conversation Data
Learn how to build an end-to-end analytics pipeline for AI agents, from event collection and schema design to data warehousing, ETL processing, and query patterns that surface actionable insights.
Why Agent Analytics Requires a Dedicated Pipeline
Most teams deploy AI agents and then rely on application logs to understand what is happening. Application logs were designed for debugging, not analysis: they are unstructured, scattered across services, and costly to aggregate into business metrics.
A dedicated analytics pipeline collects structured events from every agent interaction, stores them in a queryable format, and enables both real-time dashboards and historical analysis. This is the foundation that every other analytics capability builds on.
Defining the Event Schema
The first step is designing an event schema that captures what matters. Every agent interaction produces several types of events: conversation starts, user messages, agent responses, tool calls, handoffs, and conversation endings. Each event needs a consistent structure.
```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
import uuid


@dataclass
class AgentEvent:
    """A single structured event emitted by an agent interaction."""

    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    conversation_id: str = ""
    session_id: str = ""
    event_type: str = ""  # message, tool_call, handoff, error, completion
    timestamp: str = field(
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    agent_name: str = ""
    user_id: str = ""
    payload: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {
            "event_id": self.event_id,
            "conversation_id": self.conversation_id,
            "session_id": self.session_id,
            "event_type": self.event_type,
            "timestamp": self.timestamp,
            "agent_name": self.agent_name,
            "user_id": self.user_id,
            "payload": self.payload,
            "metadata": self.metadata,
        }
```
The payload field holds event-specific data: the message text for a message event, the tool name and arguments for a tool call, or the error details for an error event. The metadata field captures contextual information like model name, token counts, and latency.
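For concreteness, here is how that payload/metadata split might look for a message event versus a tool-call event. The field values (model name, tool name, token counts) are illustrative, not prescribed by the schema:

```python
# Illustrative event bodies showing the payload/metadata split.
# All values below are made up; only the field names follow the schema.
message_event = {
    "event_type": "message",
    "payload": {"role": "assistant", "text": "Your order shipped yesterday."},
    "metadata": {"model": "gpt-4o", "total_tokens": 182, "latency_ms": 640},
}

tool_call_event = {
    "event_type": "tool_call",
    "payload": {"tool_name": "lookup_order", "arguments": {"order_id": "A-123"}},
    "metadata": {"latency_ms": 95},
}
```

Both events share the same top-level columns, so they land in one warehouse table; only the contents of `payload` vary by event type.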
Event Collection Layer
The collection layer instruments your agent code to emit events at every significant point. A lightweight collector class buffers events and flushes them in batches to avoid overwhelming downstream systems.
```python
import asyncio
from collections import deque

import aiohttp


class EventCollector:
    """Buffers agent events in memory and ships them downstream in batches."""

    def __init__(self, endpoint: str, batch_size: int = 50,
                 flush_interval: float = 5.0):
        self.endpoint = endpoint
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: deque[dict] = deque()
        self._running = False

    async def collect(self, event: AgentEvent) -> None:
        self._buffer.append(event.to_dict())
        if len(self._buffer) >= self.batch_size:
            await self._flush()

    async def _flush(self) -> None:
        if not self._buffer:
            return
        batch = []
        while self._buffer and len(batch) < self.batch_size:
            batch.append(self._buffer.popleft())
        async with aiohttp.ClientSession() as session:
            await session.post(
                self.endpoint,
                json={"events": batch},
                headers={"Content-Type": "application/json"},
            )

    async def start_periodic_flush(self) -> None:
        self._running = True
        while self._running:
            await asyncio.sleep(self.flush_interval)
            await self._flush()

    async def stop(self) -> None:
        # Stop the periodic loop and drain whatever is still buffered
        self._running = False
        await self._flush()
```
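The collector only pays off if the agent code actually emits events at the right points. The sketch below shows where those instrumentation points sit in a single agent turn. It is deliberately simplified: `handle_turn` and the tool invocation are hypothetical, and an in-memory list stands in for `EventCollector.collect` so the sketch runs without a network endpoint:

```python
import uuid
from datetime import datetime, timezone

# In-memory stand-in for EventCollector.collect; in production each emit()
# would hand the event to the collector instead of appending to a list.
events: list[dict] = []

def emit(event_type: str, conversation_id: str, **payload) -> None:
    events.append({
        "event_id": str(uuid.uuid4()),
        "conversation_id": conversation_id,
        "event_type": event_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": payload,
    })

def handle_turn(conversation_id: str, user_text: str) -> str:
    # Instrumentation point 1: the incoming user message
    emit("message", conversation_id, role="user", text=user_text)
    # ... model call happens here; suppose it decides to invoke a tool ...
    # Instrumentation point 2: the tool call (hypothetical tool and args)
    emit("tool_call", conversation_id,
         tool_name="lookup_order", arguments={"order_id": "A-123"})
    reply = "Your order shipped yesterday."
    # Instrumentation point 3: the agent's response
    emit("message", conversation_id, role="assistant", text=reply)
    return reply

handle_turn("conv-1", "Where is my order?")
```

Keeping `emit` as a thin helper means adding a new instrumentation point is a one-line change anywhere in the agent code.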
ETL and Data Warehouse Loading
Raw events need transformation before they become useful for analysis. An ETL stage enriches events with computed fields, normalizes values, and loads them into a warehouse table.
```python
import json

import psycopg2
from psycopg2.extras import execute_values


def transform_events(raw_events: list[dict]) -> list[tuple]:
    rows = []
    for event in raw_events:
        # Promote frequently queried metrics to first-class columns
        token_count = event.get("metadata", {}).get("total_tokens", 0)
        latency_ms = event.get("metadata", {}).get("latency_ms", 0)
        rows.append((
            event["event_id"],
            event["conversation_id"],
            event["session_id"],
            event["event_type"],
            event["timestamp"],
            event["agent_name"],
            event["user_id"],
            json.dumps(event["payload"]),
            token_count,
            latency_ms,
        ))
    return rows


def load_to_warehouse(rows: list[tuple], conn_string: str) -> int:
    conn = psycopg2.connect(conn_string)
    try:
        cur = conn.cursor()
        execute_values(
            cur,
            """INSERT INTO agent_events
                   (event_id, conversation_id, session_id, event_type,
                    event_ts, agent_name, user_id, payload, token_count, latency_ms)
               VALUES %s
               ON CONFLICT (event_id) DO NOTHING""",
            rows,
        )
        conn.commit()
        # Caveat: execute_values sends rows in pages, so rowcount reflects
        # only the final page for batches larger than its page_size
        inserted = cur.rowcount
        cur.close()
    finally:
        conn.close()
    return inserted
```
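The `ON CONFLICT (event_id) DO NOTHING` clause is what makes the load idempotent: replaying a batch after a crash cannot create duplicates. To see that behavior end to end without a PostgreSQL instance, here is a sketch using stdlib `sqlite3` as a stand-in warehouse, with `INSERT OR IGNORE` playing the role of the conflict clause (table shape and event values are illustrative):

```python
import json
import sqlite3

# Stand-in warehouse using stdlib sqlite3 so the sketch runs anywhere;
# INSERT OR IGNORE on the primary key mimics ON CONFLICT (event_id) DO NOTHING.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE agent_events (
    event_id TEXT PRIMARY KEY, conversation_id TEXT, event_type TEXT,
    event_ts TEXT, payload TEXT, token_count INTEGER, latency_ms INTEGER)""")

raw = [{"event_id": "e1", "conversation_id": "c1", "event_type": "message",
        "timestamp": "2024-05-01T12:00:00", "payload": {"text": "hi"},
        "metadata": {"total_tokens": 42, "latency_ms": 310}}]

# Same transform shape as transform_events above, minus a few columns
rows = [(e["event_id"], e["conversation_id"], e["event_type"], e["timestamp"],
         json.dumps(e["payload"]),
         e.get("metadata", {}).get("total_tokens", 0),
         e.get("metadata", {}).get("latency_ms", 0)) for e in raw]

sql = "INSERT OR IGNORE INTO agent_events VALUES (?,?,?,?,?,?,?)"
conn.executemany(sql, rows)
conn.executemany(sql, rows)  # replaying the batch is a no-op
count = conn.execute("SELECT COUNT(*) FROM agent_events").fetchone()[0]
```

After both loads, `count` is still 1: the replayed batch was deduplicated on `event_id`.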
Query Patterns for Analysis
With structured data in a warehouse, you can answer critical questions. How many conversations happen per hour? What is the average resolution time? Which agents handle the most volume?
```python
QUERIES = {
    "conversations_per_hour": """
        SELECT date_trunc('hour', event_ts) AS hour,
               COUNT(DISTINCT conversation_id) AS conversations
        FROM agent_events
        WHERE event_type = 'message'
          AND event_ts >= NOW() - INTERVAL '24 hours'
        GROUP BY 1 ORDER BY 1
    """,
    "avg_resolution_time": """
        SELECT agent_name,
               AVG(EXTRACT(EPOCH FROM (max_ts - min_ts))) AS avg_seconds
        FROM (
            SELECT conversation_id, agent_name,
                   MIN(event_ts) AS min_ts, MAX(event_ts) AS max_ts
            FROM agent_events
            GROUP BY conversation_id, agent_name
        ) sub
        GROUP BY agent_name
    """,
    "top_error_types": """
        SELECT payload->>'error_type' AS error_type,
               COUNT(*) AS occurrences
        FROM agent_events
        WHERE event_type = 'error'
        GROUP BY 1 ORDER BY 2 DESC LIMIT 10
    """,
}
```
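A small runner turns this query registry into dashboard-ready rows. The helper below works with any DB-API 2.0 connection, so the Postgres queries above can be passed in unchanged against a psycopg2 connection; since those queries need a live Postgres instance, the demo here uses sqlite3 with a trivial substitute registry:

```python
import sqlite3

def run_named_query(conn, queries: dict[str, str], name: str) -> list[dict]:
    """Execute a named query from a registry, returning rows as dicts.

    Works with any DB-API 2.0 connection (psycopg2, sqlite3, ...)."""
    cur = conn.cursor()
    cur.execute(queries[name])
    columns = [desc[0] for desc in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]

# Demo against sqlite3 with a substitute registry; the Postgres QUERIES
# dict above would be passed the same way to a psycopg2 connection.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE agent_events (event_type TEXT)")
conn.executemany("INSERT INTO agent_events VALUES (?)",
                 [("message",), ("error",)])
demo_queries = {
    "event_counts": """SELECT event_type, COUNT(*) AS n
                       FROM agent_events
                       GROUP BY event_type ORDER BY event_type""",
}
rows = run_named_query(conn, demo_queries, "event_counts")
```

Returning dicts keyed by column name (via `cursor.description`) keeps downstream dashboard code independent of column order.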
FAQ
What database should I use for agent analytics?
PostgreSQL works well for moderate volumes (under 100 million events). For larger scales, columnar stores like ClickHouse or cloud warehouses like BigQuery give significantly faster aggregation queries. Start with PostgreSQL and migrate when query latency becomes a bottleneck.
How do I handle high-volume event collection without slowing down the agent?
Use asynchronous buffered collection as shown above. The collector accumulates events in memory and flushes them in batches, so the agent never blocks waiting for a database write. For very high throughput, add a message queue like Kafka or Redis Streams between the collector and the warehouse loader.
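To illustrate that decoupling, here is a minimal producer/consumer sketch in which an `asyncio.Queue` stands in for a real broker like Kafka or Redis Streams: the agent side enqueues and moves on, while the consumer drains in batches at its own pace (batch size and event names are illustrative):

```python
import asyncio

# asyncio.Queue stands in for a real message broker (Kafka, Redis Streams).
async def main() -> list[list[str]]:
    queue = asyncio.Queue()
    batches: list[list[str]] = []

    async def producer() -> None:
        for i in range(5):
            await queue.put(f"event-{i}")  # agent side: enqueue and move on
        await queue.put(None)  # sentinel: no more events

    async def consumer(batch_size: int = 2) -> None:
        batch: list[str] = []
        while True:
            item = await queue.get()
            if item is None:
                break
            batch.append(item)
            if len(batch) >= batch_size:
                batches.append(batch)  # warehouse load would happen here
                batch = []
        if batch:
            batches.append(batch)  # flush the final partial batch

    await asyncio.gather(producer(), consumer())
    return batches

batches = asyncio.run(main())
```

A real broker adds what the in-process queue cannot: durability across restarts and the ability to replay events into a rebuilt warehouse.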
Should I store raw conversation text in the analytics warehouse?
Store it, but be mindful of PII regulations. The raw text is invaluable for conversation mining and quality analysis. Apply column-level encryption or tokenization for sensitive fields, and implement retention policies that automatically purge data older than your compliance window.
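A retention policy can be as simple as a scheduled job running a parameterized `DELETE`. Sketched below against stdlib `sqlite3` so it runs anywhere; the table shape and the 90-day window are illustrative, and the string comparison works because ISO-8601 timestamps in the same timezone sort lexicographically:

```python
import sqlite3
from datetime import datetime, timedelta, timezone

def purge_old_events(conn, retention_days: int = 90) -> int:
    """Delete events older than the compliance window; returns rows purged."""
    cutoff = (datetime.now(timezone.utc)
              - timedelta(days=retention_days)).isoformat()
    cur = conn.execute("DELETE FROM agent_events WHERE event_ts < ?", (cutoff,))
    conn.commit()
    return cur.rowcount

# Demo: one event inside the window, one outside
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE agent_events (event_id TEXT, event_ts TEXT)")
old_ts = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
new_ts = datetime.now(timezone.utc).isoformat()
conn.executemany("INSERT INTO agent_events VALUES (?, ?)",
                 [("old", old_ts), ("new", new_ts)])
purged = purge_old_events(conn)
```

On a large Postgres table the same policy is usually implemented by dropping whole time-based partitions instead of row-level deletes, which avoids long-running `DELETE` transactions.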