Agent Analytics for Marketplace Providers: Understanding Usage and Revenue
Build an analytics system for agent marketplace publishers that tracks usage patterns, revenue metrics, user satisfaction, and optimization opportunities. Learn event collection, dashboard design, and how to generate actionable insights.
Why Marketplace Analytics Are Different
Agent marketplace analytics serve two audiences: the marketplace operator needs platform-level metrics (total GMV, active publishers, consumer retention), and individual publishers need agent-level metrics (install count, usage patterns, revenue, satisfaction scores). The analytics system must aggregate raw telemetry into actionable insights for both audiences.
Traditional SaaS analytics track page views and clicks. Agent analytics track conversations, tool usage patterns, error rates, cost efficiency, and outcome quality. These agent-specific metrics require purpose-built collection and aggregation pipelines.
Event Collection Pipeline
Every agent interaction generates a stream of events. A structured event schema ensures consistent collection across all agents in the marketplace:
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
import uuid


class EventType(Enum):
    AGENT_INVOKED = "agent_invoked"
    AGENT_COMPLETED = "agent_completed"
    AGENT_ERRORED = "agent_errored"
    TOOL_CALLED = "tool_called"
    TOOL_FAILED = "tool_failed"
    USER_FEEDBACK = "user_feedback"
    INSTALL = "install"
    UNINSTALL = "uninstall"


@dataclass
class AnalyticsEvent:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: EventType = EventType.AGENT_INVOKED
    agent_id: str = ""
    publisher_id: str = ""
    tenant_id: str = ""
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    properties: dict = field(default_factory=dict)


class EventCollector:
    """Buffers analytics events onto an async queue for downstream ingestion."""

    def __init__(self, event_queue):
        self.queue = event_queue

    async def track_invocation(
        self,
        agent_id: str,
        publisher_id: str,
        tenant_id: str,
        input_tokens: int,
        output_tokens: int,
        tool_calls: list[str],
        duration_ms: int,
        success: bool,
        cost_usd: float,
    ):
        # Success and failure are distinct event types so dashboards
        # can compute error rates without inspecting properties.
        event = AnalyticsEvent(
            event_type=(
                EventType.AGENT_COMPLETED
                if success
                else EventType.AGENT_ERRORED
            ),
            agent_id=agent_id,
            publisher_id=publisher_id,
            tenant_id=tenant_id,
            properties={
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "tool_calls": tool_calls,
                "duration_ms": duration_ms,
                "cost_usd": cost_usd,
            },
        )
        await self.queue.enqueue(event)

    async def track_feedback(
        self,
        agent_id: str,
        publisher_id: str,
        tenant_id: str,
        rating: int,
        comment: Optional[str] = None,
    ):
        event = AnalyticsEvent(
            event_type=EventType.USER_FEEDBACK,
            agent_id=agent_id,
            publisher_id=publisher_id,
            tenant_id=tenant_id,
            properties={
                "rating": rating,
                "comment": comment,
            },
        )
        await self.queue.enqueue(event)
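The collector is storage-agnostic: anything with an async enqueue method works. As a minimal sketch of how it wires up (the InMemoryQueue class and the IDs here are illustrative stand-ins; in production the queue would wrap a durable buffer such as Kafka or SQS):

import asyncio


class InMemoryQueue:
    # Hypothetical stand-in for a durable event buffer.
    def __init__(self):
        self._queue: asyncio.Queue = asyncio.Queue()

    async def enqueue(self, event):
        await self._queue.put(event)


async def main():
    collector = EventCollector(InMemoryQueue())
    await collector.track_invocation(
        agent_id="agent-123",
        publisher_id="pub-456",
        tenant_id="tenant-789",
        input_tokens=820,
        output_tokens=310,
        tool_calls=["crm_lookup", "send_email"],
        duration_ms=2400,
        success=True,
        cost_usd=0.0114,
    )


asyncio.run(main())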
Publisher Dashboard Metrics
Publishers need metrics that help them understand how their agent performs and where to invest improvement effort:
from dataclasses import dataclass, field


@dataclass
class PublisherDashboardMetrics:
    # Usage
    total_invocations: int = 0
    unique_tenants: int = 0
    active_installs: int = 0
    invocations_trend: list[dict] = field(default_factory=list)
    # Quality
    avg_satisfaction: float = 0.0
    error_rate: float = 0.0
    avg_response_time_ms: int = 0
    p95_response_time_ms: int = 0
    # Revenue
    total_revenue: float = 0.0
    revenue_trend: list[dict] = field(default_factory=list)
    avg_revenue_per_tenant: float = 0.0
    # Tool usage
    tool_usage_breakdown: dict[str, int] = field(default_factory=dict)
    tool_failure_rates: dict[str, float] = field(default_factory=dict)


class PublisherAnalyticsService:
    def __init__(self, event_store):
        self.events = event_store

    async def get_dashboard(
        self, publisher_id: str, period_days: int = 30
    ) -> PublisherDashboardMetrics:
        raw_events = await self.events.query(
            publisher_id=publisher_id,
            days=period_days,
        )
        completions = [
            e for e in raw_events
            if e.event_type == EventType.AGENT_COMPLETED
        ]
        errors = [
            e for e in raw_events
            if e.event_type == EventType.AGENT_ERRORED
        ]
        feedback = [
            e for e in raw_events
            if e.event_type == EventType.USER_FEEDBACK
        ]

        total = len(completions) + len(errors)
        unique_tenants = len({
            e.tenant_id for e in completions + errors
        })

        # Tool usage breakdown
        tool_counts: dict[str, int] = {}
        for event in completions:
            for tool in event.properties.get("tool_calls", []):
                tool_counts[tool] = tool_counts.get(tool, 0) + 1

        # Revenue: assumes usage-based billing, where the amount
        # billed per invocation is recorded as cost_usd.
        total_revenue = sum(
            e.properties.get("cost_usd", 0)
            for e in completions
        )

        # Satisfaction
        ratings = [
            e.properties["rating"]
            for e in feedback
            if "rating" in e.properties
        ]
        avg_sat = sum(ratings) / len(ratings) if ratings else 0.0

        # Response times: sort once, then take mean and 95th percentile
        durations = sorted(
            e.properties["duration_ms"]
            for e in completions
            if "duration_ms" in e.properties
        )
        avg_duration = (
            sum(durations) // len(durations) if durations else 0
        )
        p95_duration = (
            durations[int(len(durations) * 0.95)] if durations else 0
        )

        return PublisherDashboardMetrics(
            total_invocations=total,
            unique_tenants=unique_tenants,
            avg_satisfaction=round(avg_sat, 2),
            error_rate=(
                round(len(errors) / total, 4) if total > 0 else 0.0
            ),
            avg_response_time_ms=avg_duration,
            p95_response_time_ms=p95_duration,
            total_revenue=round(total_revenue, 2),
            avg_revenue_per_tenant=(
                round(total_revenue / unique_tenants, 2)
                if unique_tenants > 0
                else 0.0
            ),
            tool_usage_breakdown=tool_counts,
        )
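Note that tool_failure_rates is declared on the dataclass and consumed by the insight engine below, but get_dashboard never populates it. A minimal sketch of that computation, assuming TOOL_CALLED and TOOL_FAILED events carry a tool_name property and that every tool attempt emits TOOL_CALLED (with failures additionally emitting TOOL_FAILED) -- both naming and semantics assumptions, since the collector above does not show tool-level events:

def compute_tool_failure_rates(
    raw_events: list[AnalyticsEvent],
) -> dict[str, float]:
    # Count calls and failures per tool, then derive a failure ratio.
    calls: dict[str, int] = {}
    failures: dict[str, int] = {}
    for e in raw_events:
        tool = e.properties.get("tool_name")
        if tool is None:
            continue
        if e.event_type == EventType.TOOL_CALLED:
            calls[tool] = calls.get(tool, 0) + 1
        elif e.event_type == EventType.TOOL_FAILED:
            failures[tool] = failures.get(tool, 0) + 1
    return {
        tool: round(failures.get(tool, 0) / count, 4)
        for tool, count in calls.items()
        if count > 0
    }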
Insight Generation
Raw metrics are useful, but actionable insights drive improvement. An insight engine analyzes patterns and generates recommendations:
from dataclasses import dataclass


@dataclass
class Insight:
    severity: str  # "critical", "warning", "info"
    category: str
    title: str
    description: str
    recommendation: str


class InsightEngine:
    async def generate_insights(
        self, metrics: PublisherDashboardMetrics
    ) -> list[Insight]:
        insights = []

        if metrics.error_rate > 0.05:
            insights.append(Insight(
                severity="critical",
                category="reliability",
                title="High Error Rate",
                description=(
                    f"Error rate is {metrics.error_rate:.1%}, "
                    f"above the 5% threshold."
                ),
                recommendation=(
                    "Review error logs for the most common "
                    "failure patterns. Check tool integrations "
                    "and add retry logic for transient failures."
                ),
            ))

        if metrics.p95_response_time_ms > 10000:
            insights.append(Insight(
                severity="warning",
                category="performance",
                title="Slow p95 Response Time",
                description=(
                    f"p95 latency is "
                    f"{metrics.p95_response_time_ms}ms."
                ),
                recommendation=(
                    "Consider using a faster model for simple "
                    "queries or adding response streaming."
                ),
            ))

        # A satisfaction of 0.0 means no ratings yet, not
        # dissatisfaction, so require at least one rating.
        if 0 < metrics.avg_satisfaction < 3.5:
            insights.append(Insight(
                severity="warning",
                category="quality",
                title="Low User Satisfaction",
                description=(
                    f"Average rating is "
                    f"{metrics.avg_satisfaction}/5.0."
                ),
                recommendation=(
                    "Review low-rated conversations to identify "
                    "common frustration patterns. Improve the system "
                    "prompt or add missing tool capabilities."
                ),
            ))

        # Tool failure analysis
        for tool, rate in metrics.tool_failure_rates.items():
            if rate > 0.1:
                insights.append(Insight(
                    severity="warning",
                    category="reliability",
                    title=f"Tool '{tool}' Failing Often",
                    description=f"Failure rate: {rate:.1%}",
                    recommendation=(
                        f"Check the '{tool}' integration "
                        f"configuration and API health."
                    ),
                ))

        return insights
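Tying the pieces together, a dashboard endpoint can fetch metrics and derive insights in one pass. A sketch (the event_store argument is whatever backs PublisherAnalyticsService; the view function and its return shape are illustrative):

async def publisher_dashboard_view(
    event_store, publisher_id: str
) -> dict:
    # Compute 30-day metrics, then derive insights from them.
    service = PublisherAnalyticsService(event_store)
    metrics = await service.get_dashboard(
        publisher_id, period_days=30
    )
    insights = await InsightEngine().generate_insights(metrics)
    return {
        "metrics": metrics,
        "insights": [
            {"severity": i.severity, "title": i.title}
            for i in insights
        ],
    }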
FAQ
What are the most important metrics for a marketplace publisher to track?
Focus on three pillars: adoption (install count, active tenants, retention), quality (satisfaction rating, error rate, response latency), and revenue (total revenue, revenue per tenant, churn rate). Adoption without quality leads to uninstalls. Quality without revenue tracking leads to unsustainable pricing.
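The event schema above already carries what the adoption pillar needs. As a rough sketch, period churn can be derived from UNINSTALL events against an install count snapshotted at the period's start (starting_installs is assumed to come from such a snapshot, which is not shown here):

def churn_rate(
    raw_events: list[AnalyticsEvent],
    starting_installs: int,
) -> float:
    # Fraction of period-start installs that uninstalled during it.
    uninstalls = sum(
        1 for e in raw_events
        if e.event_type == EventType.UNINSTALL
    )
    if starting_installs == 0:
        return 0.0
    return round(uninstalls / starting_installs, 4)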
How do you handle analytics data privacy across tenants?
Never expose one tenant's conversation content to another tenant or to the publisher. Aggregate metrics — counts, averages, distributions — are safe to share. Individual conversation logs should only be visible to the tenant who owns them. Publishers see aggregate statistics about how their agent performs across all tenants without seeing any specific tenant's data.
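One way to enforce that boundary is to redact events before they reach any publisher-facing query path. A minimal sketch, assuming an allowlist of numeric properties and a hashed tenant key (both the allowlist contents and the pseudonymization scheme are assumptions, not part of the pipeline above):

import hashlib

# Properties safe to surface in publisher-facing aggregates;
# content-bearing fields like feedback comments are dropped.
SAFE_PROPERTIES = {
    "input_tokens", "output_tokens", "duration_ms",
    "cost_usd", "rating", "tool_calls",
}


def redact_for_publisher(event: AnalyticsEvent) -> AnalyticsEvent:
    # Pseudonymize the tenant: unique-tenant counts still work,
    # but the real tenant identity is never exposed.
    tenant_key = hashlib.sha256(
        event.tenant_id.encode()
    ).hexdigest()[:16]
    return AnalyticsEvent(
        id=event.id,
        event_type=event.event_type,
        agent_id=event.agent_id,
        publisher_id=event.publisher_id,
        tenant_id=tenant_key,
        timestamp=event.timestamp,
        properties={
            k: v for k, v in event.properties.items()
            if k in SAFE_PROPERTIES
        },
    )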
How frequently should analytics be updated?
Real-time for operational metrics like error rate and latency — publishers need to catch issues immediately. Hourly for usage and revenue metrics — this balances freshness with compute cost. Daily for trend analysis and insights — these require enough data to be statistically meaningful.
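A simple way to implement those tiers is a scheduler that refreshes each metric group on its own interval. A sketch using asyncio (the three refresh callables are assumed to exist and encapsulate the actual recomputation):

import asyncio


async def run_refresh_loop(name: str, interval_s: int, refresh_fn):
    # Periodically recompute one metric tier; log failures
    # instead of crashing the loop.
    while True:
        try:
            await refresh_fn()
        except Exception as exc:
            print(f"{name} refresh failed: {exc}")
        await asyncio.sleep(interval_s)


async def start_schedulers(
    refresh_operational, refresh_usage, refresh_insights
):
    await asyncio.gather(
        run_refresh_loop("operational", 60, refresh_operational),
        run_refresh_loop("usage", 3600, refresh_usage),
        run_refresh_loop("insights", 86400, refresh_insights),
    )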