
Time-Series Data for AI Agents: Tracking Metrics, Costs, and Performance Over Time

Learn how to store and analyze AI agent time-series data including token costs, latency, and throughput using TimescaleDB, partitioning, retention policies, and aggregation queries.

Why Time-Series Data Matters for Agents

Every AI agent invocation generates temporal data: how long the LLM took to respond, how many tokens were consumed, what the cost was, whether the tool call succeeded, and how the user rated the response. Stored properly, this data answers critical operational questions. Which model is cheapest per successful conversation? Is latency trending upward? Which tools fail most often during peak hours?

Standard relational tables struggle with time-series workloads because of the write-heavy, append-only access pattern and the need for efficient time-range aggregations. TimescaleDB — a PostgreSQL extension — solves this with automatic partitioning, built-in compression, and time-oriented query functions.

Setting Up TimescaleDB

TimescaleDB runs as an extension inside PostgreSQL. Enable it and create a metrics hypertable:

-- Enable the extension
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Create the metrics table
CREATE TABLE agent_metrics (
    time        TIMESTAMPTZ NOT NULL,
    agent_id    UUID NOT NULL,
    model       TEXT NOT NULL,
    metric_type TEXT NOT NULL,
    value       DOUBLE PRECISION NOT NULL,
    metadata    JSONB DEFAULT '{}'
);

-- Convert to a hypertable partitioned by time into daily chunks
SELECT create_hypertable(
    'agent_metrics',
    by_range('time', INTERVAL '1 day')
);

-- Add indexes for common query patterns
CREATE INDEX idx_agent_metrics_agent_type
    ON agent_metrics (agent_id, metric_type, time DESC);

The create_hypertable call transparently partitions the table into daily chunks. Queries that filter by time range only scan relevant chunks, and old chunks can be compressed or dropped independently.

Recording Agent Metrics

Insert metrics from your agent application after each LLM call:

import asyncpg
from datetime import datetime, timezone


async def record_agent_metrics(
    pool: asyncpg.Pool,
    agent_id: str,
    model: str,
    latency_ms: float,
    input_tokens: int,
    output_tokens: int,
    cost_usd: float,
    success: bool,
):
    now = datetime.now(timezone.utc)
    # asyncpg expects JSON/JSONB parameters as text unless a custom
    # type codec is registered, so pass the metadata as a JSON string.
    empty_meta = "{}"
    records = [
        (now, agent_id, model, "latency_ms", latency_ms, empty_meta),
        (now, agent_id, model, "input_tokens", float(input_tokens), empty_meta),
        (now, agent_id, model, "output_tokens", float(output_tokens), empty_meta),
        (now, agent_id, model, "cost_usd", cost_usd, empty_meta),
        (now, agent_id, model, "success", 1.0 if success else 0.0, empty_meta),
    ]
    await pool.executemany(
        """
        INSERT INTO agent_metrics (time, agent_id, model, metric_type, value, metadata)
        VALUES ($1, $2, $3, $4, $5, $6)
        """,
        records,
    )

Batching multiple metric types into a single executemany call reduces round-trips. For high-throughput systems, buffer metrics in memory and flush in batches every few seconds.
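The buffering approach can be sketched as follows. `MetricsBuffer` and the injected `flush_fn` coroutine are illustrative names, not asyncpg API; in practice `flush_fn` would wrap the `pool.executemany` call shown above.

```python
import asyncio


class MetricsBuffer:
    """Accumulate metric rows in memory and flush them in batches.

    flush_fn is any coroutine that writes a list of rows; it is
    injected so the buffer stays independent of the database driver.
    """

    def __init__(self, flush_fn, max_rows: int = 500):
        self._flush_fn = flush_fn
        self._max_rows = max_rows
        self._rows: list[tuple] = []

    async def add(self, row: tuple) -> None:
        self._rows.append(row)
        # Flush eagerly once the batch threshold is reached.
        if len(self._rows) >= self._max_rows:
            await self.flush()

    async def flush(self) -> None:
        if not self._rows:
            return
        # Swap the buffer out before awaiting, so concurrent adds
        # land in a fresh list rather than the in-flight batch.
        batch, self._rows = self._rows, []
        await self._flush_fn(batch)


async def periodic_flush(buffer: MetricsBuffer, interval_s: float = 5.0):
    """Background task: flush whatever accumulated every few seconds."""
    while True:
        await asyncio.sleep(interval_s)
        await buffer.flush()
```

Run `periodic_flush` as an `asyncio` task alongside the agent so partially filled batches still reach the database within a bounded delay.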

Aggregation Queries

TimescaleDB provides time_bucket for time-based aggregation; unlike standard date_trunc, it accepts arbitrary bucket widths such as '5 minutes' or '90 seconds':


-- Hourly average latency and total cost per model (last 24 hours)
SELECT
    time_bucket('1 hour', time) AS bucket,
    model,
    avg(value) FILTER (WHERE metric_type = 'latency_ms') AS avg_latency,
    sum(value) FILTER (WHERE metric_type = 'cost_usd') AS total_cost,
    avg(value) FILTER (WHERE metric_type = 'success') AS success_rate
FROM agent_metrics
WHERE time > now() - INTERVAL '24 hours'
GROUP BY bucket, model
ORDER BY bucket DESC;

The FILTER clause lets you aggregate different metric types in a single pass over the data. This is far more efficient than running separate queries per metric.
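To make the FILTER semantics concrete, here is the same single-pass logic expressed in plain Python over mixed `(metric_type, value)` rows; `summarize` is a hypothetical helper, not part of any library:

```python
def summarize(rows: list[tuple[str, float]]) -> dict:
    """One pass over mixed metric rows, mirroring the SQL FILTER
    clauses: each row contributes to exactly one aggregate."""
    latency_sum = latency_n = 0.0
    total_cost = 0.0
    success_sum = success_n = 0.0
    for metric_type, value in rows:
        if metric_type == "latency_ms":
            latency_sum += value
            latency_n += 1
        elif metric_type == "cost_usd":
            total_cost += value
        elif metric_type == "success":
            success_sum += value
            success_n += 1
    return {
        "avg_latency": latency_sum / latency_n if latency_n else None,
        "total_cost": total_cost,
        "success_rate": success_sum / success_n if success_n else None,
    }
```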

Continuous Aggregates

For dashboards that query the same aggregations repeatedly, create a continuous aggregate — a materialized view that TimescaleDB refreshes automatically:

CREATE MATERIALIZED VIEW hourly_agent_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    agent_id,
    model,
    metric_type,
    avg(value) AS avg_value,
    max(value) AS max_value,
    min(value) AS min_value,
    count(*) AS sample_count
FROM agent_metrics
GROUP BY bucket, agent_id, model, metric_type;

-- Refresh policy: run hourly, materializing data between 3 hours and 1 hour old
SELECT add_continuous_aggregate_policy('hourly_agent_stats',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

Dashboard queries now read from the pre-computed aggregate, reducing query time from seconds to milliseconds.
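Because each bucket stores both avg_value and sample_count, exact sums and overall averages remain recoverable downstream. A small sketch (`weighted_stats` is an illustrative name):

```python
def weighted_stats(buckets: list[dict]) -> dict:
    """Recombine per-bucket (avg_value, sample_count) pairs into an
    exact overall sum and mean -- the reason sample_count is
    materialized alongside avg_value in the continuous aggregate."""
    total = sum(b["avg_value"] * b["sample_count"] for b in buckets)
    n = sum(b["sample_count"] for b in buckets)
    return {"sum": total, "count": n, "avg": total / n if n else None}
```

A plain average of the per-bucket averages would weight a quiet hour the same as a busy one; weighting by sample_count avoids that bias.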

Retention and Compression

Agent metrics accumulate rapidly. Configure automatic compression and retention:

-- Compress chunks older than 7 days, segmenting by the columns
-- most queries filter on
ALTER TABLE agent_metrics
    SET (
        timescaledb.compress,
        timescaledb.compress_segmentby = 'agent_id, model, metric_type'
    );

SELECT add_compression_policy(
    'agent_metrics',
    compress_after => INTERVAL '7 days'
);

-- Drop raw data older than 90 days
SELECT add_retention_policy(
    'agent_metrics',
    drop_after => INTERVAL '90 days'
);

Compression typically achieves 90-95% space reduction for time-series data. The continuous aggregate retains the hourly summaries even after raw data is dropped.
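A back-of-envelope sizing check under these policies, assuming roughly 100 bytes per raw row (an estimate, not a measured figure):

```python
def storage_estimate_mb(rows_per_day: int, retention_days: int,
                        bytes_per_row: int = 100,
                        compression_ratio: float = 0.95) -> dict:
    """Rough storage estimate: the newest 7 days stay uncompressed
    under the policy above; older chunks shrink by the given ratio."""
    raw = rows_per_day * retention_days * bytes_per_row
    uncompressed_days = min(retention_days, 7)
    compressed_days = max(retention_days - 7, 0)
    total = rows_per_day * bytes_per_row * (
        uncompressed_days + compressed_days * (1 - compression_ratio)
    )
    return {"raw_mb": raw / 1e6, "with_compression_mb": total / 1e6}
```

At one million metric rows per day with 90-day retention, this puts raw storage around 9 GB but compressed storage nearer 1.1 GB, which is why the compression policy matters long before retention kicks in.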

Python Dashboard Query Example

Query the continuous aggregate for a cost dashboard:

async def get_daily_cost_summary(
    pool: asyncpg.Pool, agent_id: str, days: int = 30
) -> list[dict]:
    rows = await pool.fetch(
        """
        SELECT
            time_bucket('1 day', bucket) AS day,
            model,
            sum(avg_value * sample_count) AS total_cost
        FROM hourly_agent_stats
        WHERE agent_id = $1
          AND metric_type = 'cost_usd'
          AND bucket > now() - make_interval(days => $2)
        GROUP BY day, model
        ORDER BY day DESC
        """,
        agent_id,
        days,
    )
    return [dict(r) for r in rows]
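A typical consumer of this result rolls the daily rows up into one headline figure per model; a sketch (`totals_by_model` is an illustrative helper):

```python
from collections import defaultdict


def totals_by_model(rows: list[dict]) -> dict[str, float]:
    """Collapse the daily cost summary into a single total per model,
    e.g. for a headline figure above the dashboard chart."""
    out: dict[str, float] = defaultdict(float)
    for r in rows:
        out[r["model"]] += r["total_cost"]
    return dict(out)
```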

FAQ

Can I use TimescaleDB with SQLAlchemy or Prisma?

Yes. TimescaleDB is a PostgreSQL extension, so any PostgreSQL-compatible ORM works. Define your tables normally in SQLAlchemy or Prisma, then run the create_hypertable call in a migration. The ORM does not need to know about hypertables — they behave like regular tables for inserts and queries.

How does TimescaleDB compare to InfluxDB or Prometheus for agent metrics?

TimescaleDB keeps everything in PostgreSQL, so you can JOIN metrics with your conversation and user tables. InfluxDB and Prometheus run as separate data stores and cannot easily correlate metrics with application data. Reach for a dedicated time-series database only when you need to ingest millions of data points per second.

What chunk interval should I use?

Choose a chunk interval where each chunk fits comfortably in memory. For moderate write volumes (thousands of inserts per minute), daily chunks work well. For very high throughput, use hourly chunks. The TimescaleDB documentation's rule of thumb is that recent chunks, including their indexes, should fit within about 25% of available memory.
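That rule of thumb reduces to simple arithmetic. A hypothetical helper, with bytes_per_row an assumed per-row estimate:

```python
def suggest_chunk_hours(rows_per_hour: int, memory_bytes: int,
                        bytes_per_row: int = 150) -> int:
    """Pick the largest chunk interval (in hours, capped at one day)
    whose rows fit within 25% of the given memory budget."""
    budget = memory_bytes * 0.25
    rows_budget = budget / bytes_per_row
    hours = int(rows_budget // rows_per_hour)
    # Clamp to a sane range: at least hourly, at most daily chunks.
    return max(1, min(hours, 24))
```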


#TimescaleDB #TimeSeries #PostgreSQL #AIAgents #Monitoring #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
