Building an Agent Analytics Pipeline: Collecting, Storing, and Analyzing Conversation Data

Learn how to build an end-to-end analytics pipeline for AI agents, from event collection and schema design to data warehousing, ETL processing, and query patterns that surface actionable insights.

Why Agent Analytics Requires a Dedicated Pipeline

Most teams deploy AI agents and then rely on application logs to understand what is happening. Application logs were designed for debugging, not analysis. They are loosely structured, scattered across services, and hard to aggregate into business metrics without significant parsing and joining effort.

A dedicated analytics pipeline collects structured events from every agent interaction, stores them in a queryable format, and enables both real-time dashboards and historical analysis. This is the foundation that every other analytics capability builds on.

Defining the Event Schema

The first step is designing an event schema that captures what matters. Every agent interaction produces several types of events: conversation starts, user messages, agent responses, tool calls, handoffs, and conversation endings. Each event needs a consistent structure.

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
import uuid
import json

@dataclass
class AgentEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    conversation_id: str = ""
    session_id: str = ""
    event_type: str = ""  # message, tool_call, handoff, error, completion
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    agent_name: str = ""
    user_id: str = ""
    payload: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {
            "event_id": self.event_id,
            "conversation_id": self.conversation_id,
            "session_id": self.session_id,
            "event_type": self.event_type,
            "timestamp": self.timestamp,
            "agent_name": self.agent_name,
            "user_id": self.user_id,
            "payload": self.payload,
            "metadata": self.metadata,
        }

The payload field holds event-specific data: the message text for a message event, the tool name and arguments for a tool call, or the error details for an error event. The metadata field captures contextual information like model name, token counts, and latency.
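
Because the payload shape depends on the event type, it can help to check each event against the keys its type is expected to carry before it leaves the agent. The sketch below is one possible way to do that on the dictionaries produced by to_dict(). The key names in REQUIRED_PAYLOAD_KEYS and the sample values are illustrative assumptions; only error_type appears elsewhere in this article.

```python
from typing import Any

# Hypothetical payload keys per event type. Only "error_type" is used by the
# article's queries; every other key name here is an illustrative assumption.
REQUIRED_PAYLOAD_KEYS: dict[str, set[str]] = {
    "message": {"role", "text"},
    "tool_call": {"tool_name", "arguments"},
    "handoff": {"from_agent", "to_agent"},
    "error": {"error_type", "message"},
    "completion": {"outcome"},
}


def missing_payload_keys(event: dict[str, Any]) -> list[str]:
    """Return the payload keys an event lacks, sorted for stable output."""
    event_type = event.get("event_type", "")
    if event_type not in REQUIRED_PAYLOAD_KEYS:
        return [f"unknown event_type: {event_type!r}"]
    payload = event.get("payload") or {}
    return sorted(REQUIRED_PAYLOAD_KEYS[event_type] - set(payload))


# A tool call with its contextual metadata kept separate from the payload.
tool_event = {
    "event_type": "tool_call",
    "payload": {"tool_name": "lookup_order", "arguments": {"order_id": "A-17"}},
    "metadata": {"model": "example-model", "total_tokens": 412, "latency_ms": 830},
}
incomplete_error = {"event_type": "error", "payload": {"error_type": "timeout"}}

print(missing_payload_keys(tool_event))        # []
print(missing_payload_keys(incomplete_error))  # ['message']
```

Rejecting or flagging incomplete events at the source is usually cheaper than discovering null fields later in warehouse queries.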

Event Collection Layer

The collection layer instruments your agent code to emit events at every significant point. A lightweight collector class buffers events and flushes them in batches to avoid overwhelming downstream systems.

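import asyncio
from collections import deque
import aiohttp

class EventCollector:
    def __init__(self, endpoint: str, batch_size: int = 50, flush_interval: float = 5.0):
        self.endpoint = endpoint
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: deque[dict] = deque()
        self._running = False

    async def collect(self, event: AgentEvent) -> None:
        self._buffer.append(event.to_dict())
        # Size-based trigger: send as soon as a full batch is available
        if len(self._buffer) >= self.batch_size:
            await self._flush()

    async def _flush(self) -> None:
        if not self._buffer:
            return
        # Drain at most one batch; there is no await here, so other tasks cannot interleave
        batch = []
        while self._buffer and len(batch) < self.batch_size:
            batch.append(self._buffer.popleft())
        try:
            async with aiohttp.ClientSession() as session:
                # json= already sets the Content-Type header
                async with session.post(self.endpoint, json={"events": batch}) as resp:
                    resp.raise_for_status()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # Delivery failed: put the batch back in its original order for the next attempt
            self._buffer.extendleft(reversed(batch))

    async def start_periodic_flush(self) -> None:
        # Time-based trigger: flush partial batches every flush_interval seconds
        self._running = True
        while self._running:
            await asyncio.sleep(self.flush_interval)
            await self._flush()

    async def stop(self) -> None:
        # End the periodic loop and make one final delivery attempt
        self._running = False
        await self._flush()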
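
The periodic flush is a long-running coroutine, so it is typically started as a background task rather than awaited directly. The sketch below illustrates that lifecycle under simplifying assumptions: InMemoryCollector is a hypothetical stand-in with the same collect and flush shape that appends batches to a list instead of posting over HTTP, and the small batch size and interval are chosen only to keep the demo short.

```python
import asyncio


class InMemoryCollector:
    """Hypothetical stand-in: same collect/flush shape, but writes to a list."""

    def __init__(self, batch_size: int = 50, flush_interval: float = 5.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: list[dict] = []
        self.delivered: list[list[dict]] = []  # each entry is one flushed batch

    async def collect(self, event: dict) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self.batch_size:
            await self._flush()

    async def _flush(self) -> None:
        if self._buffer:
            batch = self._buffer[: self.batch_size]
            self._buffer = self._buffer[self.batch_size:]
            self.delivered.append(batch)

    async def start_periodic_flush(self) -> None:
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush()


async def run_agent_session() -> list[list[dict]]:
    collector = InMemoryCollector(batch_size=3, flush_interval=0.05)
    # Run the timer-based flush alongside the agent instead of awaiting it.
    flush_task = asyncio.create_task(collector.start_periodic_flush())
    try:
        for i in range(4):
            await collector.collect({"event_type": "message", "seq": i})
        # Leave time for the timer to pick up the partial batch.
        await asyncio.sleep(0.15)
    finally:
        # Cancel the background loop on shutdown and wait for it to finish.
        flush_task.cancel()
        try:
            await flush_task
        except asyncio.CancelledError:
            pass
    return collector.delivered


batches = asyncio.run(run_agent_session())
print([len(b) for b in batches])  # [3, 1]
```

The first batch is sent by the size trigger and the leftover event by the timer, which is why both triggers are worth keeping: without the timer, a quiet agent could hold a partial batch indefinitely.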

ETL and Data Warehouse Loading

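Raw events need transformation before they become useful for analysis. An ETL stage can enrich events with computed fields and normalize values before loading them into a warehouse table. The version below does the minimum: it flattens each event into a row, promotes the token count and latency from metadata into their own columns, and inserts the rows idempotently.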

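import json

import psycopg2
from psycopg2.extras import execute_values

def transform_events(raw_events: list[dict]) -> list[tuple]:
    """Flatten raw event dicts into tuples in the INSERT column order."""
    rows = []
    for event in raw_events:
        metadata = event.get("metadata") or {}
        rows.append((
            event["event_id"],
            event["conversation_id"],
            event["session_id"],
            event["event_type"],
            event["timestamp"],
            event["agent_name"],
            event["user_id"],
            json.dumps(event["payload"]),
            metadata.get("total_tokens", 0),
            metadata.get("latency_ms", 0),
        ))
    return rows

def load_to_warehouse(rows: list[tuple], conn_string: str) -> int:
    """Insert rows idempotently and return how many were newly inserted."""
    if not rows:
        return 0
    conn = psycopg2.connect(conn_string)
    try:
        with conn.cursor() as cur:
            # ON CONFLICT needs a primary key or unique constraint on event_id.
            # execute_values sends rows in pages, so cur.rowcount would only
            # reflect the last page; RETURNING with fetch=True (psycopg2 2.8+)
            # returns one row per actual insert across all pages.
            inserted = execute_values(
                cur,
                """INSERT INTO agent_events
                   (event_id, conversation_id, session_id, event_type,
                    event_ts, agent_name, user_id, payload, token_count, latency_ms)
                   VALUES %s
                   ON CONFLICT (event_id) DO NOTHING
                   RETURNING event_id""",
                rows,
                fetch=True,
            )
        conn.commit()
        return len(inserted)
    finally:
        # Closing without a commit rolls back, so a failed batch leaves no partial rows
        conn.close()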
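
The normalization step mentioned above could look something like the following sketch, run on each raw event before transform_events. It is one possible approach, not part of the pipeline as published: normalize_event is a hypothetical helper, and treating timestamps without an offset as UTC is an assumption that should match how your collectors actually stamp events.

```python
from datetime import datetime, timezone
from typing import Any


def normalize_event(event: dict[str, Any]) -> dict[str, Any]:
    """Return a copy with a canonical event_type, UTC timestamp, and integer metrics."""
    out = dict(event)
    out["event_type"] = str(event.get("event_type", "")).strip().lower()

    # Parse the ISO 8601 timestamp; naive values are assumed to be UTC.
    ts = datetime.fromisoformat(event["timestamp"])
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    out["timestamp"] = ts.astimezone(timezone.utc).isoformat()

    metadata = dict(event.get("metadata") or {})
    for key in ("total_tokens", "latency_ms"):
        # Missing or null metrics become 0, matching the defaults in transform_events.
        metadata[key] = int(metadata.get(key) or 0)
    out["metadata"] = metadata
    return out


raw = {
    "event_type": " Tool_Call ",
    "timestamp": "2024-05-01T12:30:00+02:00",
    "metadata": {"total_tokens": "412", "latency_ms": None},
}
clean = normalize_event(raw)
print(clean["event_type"])  # tool_call
print(clean["timestamp"])   # 2024-05-01T10:30:00+00:00
print(clean["metadata"])    # {'total_tokens': 412, 'latency_ms': 0}
```

Storing every timestamp in UTC keeps date_trunc and interval arithmetic in the warehouse consistent regardless of where an event was produced.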

Query Patterns for Analysis

With structured data in a warehouse, you can answer critical questions. How many conversations happen per hour? What is the average resolution time for each agent? Which error types occur most often?

QUERIES = {
    "conversations_per_hour": """
        SELECT date_trunc('hour', event_ts) AS hour,
               COUNT(DISTINCT conversation_id) AS conversations
        FROM agent_events
        WHERE event_type = 'message'
          AND event_ts >= NOW() - INTERVAL '24 hours'
        GROUP BY 1 ORDER BY 1
    """,
    "avg_resolution_time": """
        SELECT agent_name,
               AVG(EXTRACT(EPOCH FROM (max_ts - min_ts))) AS avg_seconds
        FROM (
            SELECT conversation_id, agent_name,
                   MIN(event_ts) AS min_ts, MAX(event_ts) AS max_ts
            FROM agent_events
            GROUP BY conversation_id, agent_name
        ) sub
        GROUP BY agent_name
    """,
    "top_error_types": """
        SELECT payload->>'error_type' AS error_type,
               COUNT(*) AS occurrences
        FROM agent_events
        WHERE event_type = 'error'
        GROUP BY 1 ORDER BY 2 DESC LIMIT 10
    """,
}
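
To make the nested avg_resolution_time query easier to reason about, here is a rough pure-Python equivalent of the same two-step aggregation over event dictionaries. It is meant only as an illustration of what the SQL computes; avg_resolution_seconds and the sample events are hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def avg_resolution_seconds(events: list[dict]) -> dict[str, float]:
    """Average time between first and last event per conversation, grouped by agent."""
    # Inner query: first and last timestamp per (conversation, agent) pair.
    spans: dict[tuple[str, str], list[datetime]] = {}
    for e in events:
        key = (e["conversation_id"], e["agent_name"])
        ts = datetime.fromisoformat(e["timestamp"])
        if key not in spans:
            spans[key] = [ts, ts]
        else:
            spans[key][0] = min(spans[key][0], ts)
            spans[key][1] = max(spans[key][1], ts)

    # Outer query: average the span length per agent.
    durations: dict[str, list[float]] = defaultdict(list)
    for (_, agent), (first, last) in spans.items():
        durations[agent].append((last - first).total_seconds())
    return {agent: sum(d) / len(d) for agent, d in durations.items()}


sample = [
    {"conversation_id": "c1", "agent_name": "billing", "timestamp": "2024-05-01T10:00:00+00:00"},
    {"conversation_id": "c1", "agent_name": "billing", "timestamp": "2024-05-01T10:02:00+00:00"},
    {"conversation_id": "c2", "agent_name": "billing", "timestamp": "2024-05-01T11:00:00+00:00"},
    {"conversation_id": "c2", "agent_name": "billing", "timestamp": "2024-05-01T11:01:00+00:00"},
    {"conversation_id": "c3", "agent_name": "support", "timestamp": "2024-05-01T12:00:00+00:00"},
]
print(avg_resolution_seconds(sample))  # {'billing': 90.0, 'support': 0.0}
```

Two properties of the query become visible here. A conversation with a single event contributes a duration of zero, which pulls the average down. And because the grouping includes agent_name, a conversation that is handed off between agents is measured once per agent rather than end to end.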

FAQ

What database should I use for agent analytics?

PostgreSQL works well for moderate volumes (under 100 million events). For larger scales, columnar stores like ClickHouse or cloud warehouses like BigQuery give significantly faster aggregation queries. Start with PostgreSQL and migrate when query latency becomes a bottleneck.

How do I handle high-volume event collection without slowing down the agent?

Use asynchronous buffered collection as shown above. The collector accumulates events in memory and sends them in batches, so most collect calls return immediately; only the call that fills a batch waits for the HTTP request to finish. For very high throughput, add a message queue like Kafka or Redis Streams between the collector and the warehouse loader.
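
The decoupling a queue provides can be sketched in-process with asyncio.Queue, which here stands in for Kafka or Redis Streams; a real broker adds durability and cross-process delivery that this example does not have. The names drain_batches and main, the list used as a sink, and the queue size are all illustrative assumptions.

```python
import asyncio


async def drain_batches(queue: asyncio.Queue, batch_size: int, sink: list) -> None:
    """Consumer: pull events off the queue and hand them to the sink in batches."""
    while True:
        batch = [await queue.get()]            # wait for at least one event
        while len(batch) < batch_size and not queue.empty():
            batch.append(queue.get_nowait())   # take whatever else is ready
        sink.append(batch)                     # stand-in for a warehouse load
        for _ in batch:
            queue.task_done()


async def main() -> list[list[int]]:
    # Bounded queue: memory stays capped even if the consumer falls behind.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    loaded: list[list[int]] = []
    consumer = asyncio.create_task(drain_batches(queue, batch_size=4, sink=loaded))

    for i in range(10):
        # Producer side never awaits; put_nowait raises QueueFull when saturated.
        queue.put_nowait(i)

    await queue.join()  # wait until every event has been handed to the sink
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return loaded


loaded = asyncio.run(main())
print(loaded)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

With this shape the agent only pays for an in-memory enqueue, and the consumer can batch, retry, or slow down without affecting response latency.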

Should I store raw conversation text in the analytics warehouse?

Store it, but be mindful of PII regulations. The raw text is invaluable for conversation mining and quality analysis. Apply column-level encryption or tokenization for sensitive fields, and implement retention policies that automatically purge data older than your compliance window.
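
As a rough illustration of tokenization, the sketch below replaces user_id with a keyed HMAC-SHA256 token and masks email addresses in message text before an event is stored. It is a minimal example under stated assumptions, not a complete PII solution: the helper names, the payload key text, and the email pattern are hypothetical, and a real deployment would load the key from a secrets manager and cover more identifier types.

```python
import hashlib
import hmac
import re

# Deliberately simple pattern for illustration; it will not catch every address format.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def tokenize(value: str, secret_key: bytes) -> str:
    """Deterministic keyed token: the same input and key always give the same token."""
    return hmac.new(secret_key, value.encode("utf-8"), hashlib.sha256).hexdigest()


def redact_emails(text: str) -> str:
    return EMAIL_RE.sub("[EMAIL]", text)


def scrub_event(event: dict, secret_key: bytes) -> dict:
    """Return a copy of the event with user_id tokenized and emails masked."""
    out = dict(event)
    out["user_id"] = tokenize(event["user_id"], secret_key)
    payload = dict(event.get("payload") or {})
    if isinstance(payload.get("text"), str):   # "text" is an assumed payload key
        payload["text"] = redact_emails(payload["text"])
    out["payload"] = payload
    return out


key = b"demo-key-do-not-use-in-production"
event = {
    "user_id": "user-1842",
    "payload": {"text": "Reach me at jane.doe@example.com please"},
}
scrubbed = scrub_event(event, key)
print(scrubbed["payload"]["text"])  # Reach me at [EMAIL] please
print(len(scrubbed["user_id"]))     # 64
```

Because the token is deterministic, per-user aggregations still work on the tokenized column, while the original identifier cannot be recovered without the key.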


#Analytics #DataPipeline #ETL #Python #AIAgents #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
