ETL for AI Agent Training Data: Extracting and Transforming Conversation Logs
Build an ETL pipeline that extracts conversation logs from AI agent systems, anonymizes PII, transforms them into training-ready formats, and filters for quality to improve agent performance.
Why Conversation Logs Are Your Most Valuable Data
Every conversation your AI agent handles is a data point about what users actually ask, how the agent responds, and where it fails. This data is far more valuable than synthetic training sets because it reflects real user language, real edge cases, and real failure modes specific to your domain.
But raw conversation logs are messy. They contain PII that cannot be stored in training sets, they include failed conversations that would teach the model bad habits, and they are in whatever format your logging system uses rather than the format your training pipeline needs. An ETL pipeline transforms raw logs into clean, anonymized, quality-filtered training data.
Extracting Logs from Multiple Sources
Agent conversation logs typically live in multiple places: database tables, JSON log files, and third-party platforms. The extraction layer normalizes all sources into a common format.
```python
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
from enum import Enum
import json


class MessageRole(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"


@dataclass
class Message:
    role: MessageRole
    content: str
    timestamp: Optional[datetime] = None
    tool_name: Optional[str] = None
    tool_args: Optional[dict] = None


@dataclass
class Conversation:
    id: str
    messages: List[Message]
    metadata: dict
    source: str
```
```python
class LogExtractor:
    async def extract_from_db(self, db_pool) -> List[Conversation]:
        """Pull the last 7 days of conversations, one row per
        conversation with its messages aggregated into a JSON array."""
        async with db_pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT
                    c.id,
                    c.created_at,
                    c.metadata,
                    json_agg(
                        json_build_object(
                            'role', m.role,
                            'content', m.content,
                            'timestamp', m.created_at,
                            'tool_name', m.tool_name,
                            'tool_args', m.tool_args
                        ) ORDER BY m.created_at
                    ) AS messages
                FROM conversations c
                JOIN messages m ON m.conversation_id = c.id
                WHERE c.created_at >= NOW() - INTERVAL '7 days'
                GROUP BY c.id, c.created_at, c.metadata
            """)

        conversations = []
        for row in rows:
            # asyncpg returns json_agg columns as strings unless a JSON
            # codec is registered, so decode before iterating.
            raw_messages = row["messages"]
            if isinstance(raw_messages, str):
                raw_messages = json.loads(raw_messages)
            messages = [
                Message(
                    role=MessageRole(m["role"]),
                    content=m["content"],
                    # json_build_object serializes timestamps as ISO strings
                    timestamp=(
                        datetime.fromisoformat(m["timestamp"])
                        if m.get("timestamp") else None
                    ),
                    tool_name=m.get("tool_name"),
                    tool_args=m.get("tool_args"),
                )
                for m in raw_messages
            ]
            conversations.append(Conversation(
                id=str(row["id"]),
                messages=messages,
                metadata=dict(row["metadata"]) if row["metadata"] else {},
                source="database",
            ))
        return conversations
```
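The prose above also mentions JSON log files as a source. A minimal JSONL extractor is sketched below, assuming one conversation object per line with `id`, `messages`, and optional `metadata` keys — that schema is an assumption, so adjust it to whatever your logger actually emits. The dataclass stand-ins at the top mirror the definitions above so the snippet runs on its own:

```python
import json
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional

# Minimal stand-ins for the MessageRole/Message/Conversation types above.
class MessageRole(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"

@dataclass
class Message:
    role: MessageRole
    content: str
    timestamp: Optional[datetime] = None
    tool_name: Optional[str] = None
    tool_args: Optional[dict] = None

@dataclass
class Conversation:
    id: str
    messages: List[Message]
    metadata: dict
    source: str

def extract_from_jsonl(path: str) -> List[Conversation]:
    """Parse one conversation per line; skip lines that fail to decode."""
    conversations = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # tolerate partial writes in live log files
            messages = [
                Message(
                    role=MessageRole(m["role"]),
                    content=m["content"],
                    tool_name=m.get("tool_name"),
                    tool_args=m.get("tool_args"),
                )
                for m in record["messages"]
            ]
            conversations.append(Conversation(
                id=str(record["id"]),
                messages=messages,
                metadata=record.get("metadata", {}),
                source="jsonl",
            ))
    return conversations
```

Skipping undecodable lines (rather than raising) keeps a nightly batch job alive when the log file is being written concurrently; track the skip count if you need to alert on it.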
PII Anonymization
Training data must never contain personally identifiable information. Build a pipeline that detects and replaces PII before any data leaves the extraction stage.
```python
import re
from typing import Dict, Optional, Tuple


class PIIAnonymizer:
    # Patterns run in insertion order; keep more specific formats (SSN)
    # distinct from broader ones (phone) so neither pre-empts the other.
    PATTERNS: Dict[str, Tuple[str, str]] = {
        "email": (
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            "[EMAIL_REDACTED]",
        ),
        "phone": (
            r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "[PHONE_REDACTED]",
        ),
        "ssn": (
            r"\b\d{3}-\d{2}-\d{4}\b",
            "[SSN_REDACTED]",
        ),
        "credit_card": (
            r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
            "[CC_REDACTED]",
        ),
        "ip_address": (
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            "[IP_REDACTED]",
        ),
    }

    def __init__(self, custom_patterns: Optional[Dict[str, Tuple[str, str]]] = None):
        self.patterns = {**self.PATTERNS}
        if custom_patterns:
            self.patterns.update(custom_patterns)
        self.stats = {key: 0 for key in self.patterns}

    def anonymize_text(self, text: str) -> str:
        # re.subn replaces and counts in a single pass
        for name, (pattern, replacement) in self.patterns.items():
            text, count = re.subn(pattern, replacement, text)
            self.stats[name] += count
        return text

    def anonymize_conversation(self, conv: Conversation) -> Conversation:
        clean_messages = []
        for msg in conv.messages:
            clean_messages.append(Message(
                role=msg.role,
                content=self.anonymize_text(msg.content),
                timestamp=msg.timestamp,
                tool_name=msg.tool_name,
                tool_args=(
                    self._anonymize_dict(msg.tool_args)
                    if msg.tool_args else None
                ),
            ))
        return Conversation(
            id=conv.id,
            messages=clean_messages,
            metadata={},  # strip metadata entirely
            source=conv.source,
        )

    def _anonymize_dict(self, d: dict) -> dict:
        result = {}
        for k, v in d.items():
            if isinstance(v, str):
                result[k] = self.anonymize_text(v)
            elif isinstance(v, dict):
                result[k] = self._anonymize_dict(v)
            else:
                result[k] = v
        return result
```
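The `custom_patterns` hook is how domain-specific identifiers get covered. Below is a standalone sketch of the same replacement loop with a hypothetical `ORD-######` order-ID pattern added alongside two of the built-ins (the order-ID format is made up for illustration):

```python
import re

# Two built-in patterns plus a hypothetical domain-specific order-ID pattern.
patterns = {
    "email": (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "[EMAIL_REDACTED]"),
    "phone": (r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "[PHONE_REDACTED]"),
    "order_id": (r"\bORD-\d{6}\b", "[ORDER_REDACTED]"),
}

def anonymize_text(text: str) -> str:
    # Apply every pattern in turn; later patterns see earlier replacements.
    for pattern, replacement in patterns.values():
        text = re.sub(pattern, replacement, text)
    return text

sample = "Reach me at jane@example.com or 555-123-4567 about ORD-123456."
print(anonymize_text(sample))
# -> Reach me at [EMAIL_REDACTED] or [PHONE_REDACTED] about [ORDER_REDACTED].
```

In the class above this would be `PIIAnonymizer(custom_patterns={"order_id": (r"\bORD-\d{6}\b", "[ORDER_REDACTED]")})`.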
Quality Filtering
Not every conversation should become training data. Filter out conversations that are too short, contain errors, or represent edge cases that would confuse the model.
```python
@dataclass
class QualityScore:
    conversation_id: str
    turn_count: int
    avg_response_length: int
    has_tool_use: bool
    has_error: bool
    user_satisfaction: Optional[float]
    passes: bool
    rejection_reason: Optional[str] = None


class QualityFilter:
    def __init__(
        self,
        min_turns: int = 3,
        min_avg_response_length: int = 50,
        max_turns: int = 50,
    ):
        self.min_turns = min_turns
        self.min_avg_response_length = min_avg_response_length
        self.max_turns = max_turns

    def evaluate(self, conv: Conversation) -> QualityScore:
        user_msgs = [m for m in conv.messages if m.role == MessageRole.USER]
        asst_msgs = [m for m in conv.messages if m.role == MessageRole.ASSISTANT]
        turn_count = len(user_msgs)

        avg_length = 0
        if asst_msgs:
            avg_length = sum(len(m.content) for m in asst_msgs) // len(asst_msgs)

        has_tool = any(m.role == MessageRole.TOOL for m in conv.messages)

        error_indicators = [
            "error", "sorry, i cannot", "i don't have access",
            "something went wrong",
        ]
        has_error = any(
            any(ind in m.content.lower() for ind in error_indicators)
            for m in asst_msgs
        )

        passes = True
        reason = None
        if turn_count < self.min_turns:
            passes, reason = False, f"Too few turns: {turn_count}"
        elif turn_count > self.max_turns:
            passes, reason = False, f"Too many turns: {turn_count}"
        elif avg_length < self.min_avg_response_length:
            passes, reason = False, f"Responses too short: {avg_length}"
        elif has_error:
            passes, reason = False, "Contains error responses"

        return QualityScore(
            conversation_id=conv.id,
            turn_count=turn_count,
            avg_response_length=avg_length,
            has_tool_use=has_tool,
            has_error=has_error,
            user_satisfaction=None,
            passes=passes,
            rejection_reason=reason,
        )
```
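Applied over a batch, these scores drive a keep/reject split, and the rejection-reason tally is worth logging so you can see why data is being thrown away. A sketch of that surrounding loop, with `evaluate` stubbed by a plain function so the snippet runs standalone (in the real pipeline it would be `QualityFilter().evaluate` returning a `QualityScore`):

```python
from collections import Counter

def split_by_quality(conversations, evaluate):
    """Partition conversations into (kept, rejected) and tally reasons."""
    kept, rejected = [], []
    reasons = Counter()
    for conv in conversations:
        score = evaluate(conv)
        if score["passes"]:
            kept.append(conv)
        else:
            rejected.append(conv)
            reasons[score["rejection_reason"]] += 1
    return kept, rejected, reasons

# Stub evaluator: rejects conversations with fewer than 3 user turns.
batch = [{"id": "a", "turns": 5}, {"id": "b", "turns": 1}, {"id": "c", "turns": 2}]
stub = lambda c: {
    "passes": c["turns"] >= 3,
    "rejection_reason": None if c["turns"] >= 3 else f"Too few turns: {c['turns']}",
}
kept, rejected, reasons = split_by_quality(batch, stub)
```

A sudden spike in one rejection reason (for example, error responses) usually points at an agent regression rather than a data problem, which makes this tally useful beyond the ETL job itself.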
Format Conversion for Fine-Tuning
Convert filtered conversations to the JSONL format expected by training APIs.
```python
def to_openai_format(conv: Conversation) -> dict:
    messages = []
    for msg in conv.messages:
        if msg.role == MessageRole.TOOL:
            messages.append({
                "role": "tool",
                "content": msg.content,
                # The chat format expects a tool_call_id linking the result
                # to its call; here the tool name is reused as that id.
                "tool_call_id": msg.tool_name,
            })
        else:
            messages.append({
                "role": msg.role.value,
                "content": msg.content,
            })
    return {"messages": messages}


def export_training_data(
    conversations: List[Conversation],
    output_path: str,
):
    # One JSON object per line: the JSONL format fine-tuning APIs expect.
    with open(output_path, "w") as f:
        for conv in conversations:
            f.write(json.dumps(to_openai_format(conv)) + "\n")
```
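Before uploading, it pays to validate the exported file. Here is a minimal sketch; the checks (valid JSON per line, known roles, conversation ending on an assistant turn) are illustrative and not the training API's full validation:

```python
import json

VALID_ROLES = {"system", "user", "assistant", "tool"}

def validate_jsonl(path: str) -> list:
    """Return (line_number, problem) tuples; an empty list means clean."""
    problems = []
    with open(path) as f:
        for i, line in enumerate(f, start=1):
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                problems.append((i, "invalid JSON"))
                continue
            messages = record.get("messages", [])
            if not messages:
                problems.append((i, "no messages"))
                continue
            if any(m.get("role") not in VALID_ROLES for m in messages):
                problems.append((i, "unknown role"))
            if messages[-1].get("role") != "assistant":
                problems.append((i, "does not end with assistant turn"))
    return problems
```

Run it as the last step of the job and fail the export on any problems; a single malformed line can abort an entire fine-tuning run after upload.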
FAQ
How do I handle PII that regex patterns miss, like names and addresses?
Regex catches structured PII like emails and phone numbers. For unstructured PII like names and locations, use a named entity recognition model such as spaCy's en_core_web_lg or a dedicated PII detection service. Run NER as a second pass after regex replacement, and replace detected PERSON, GPE, and LOC entities with placeholders. Street addresses are harder: standard NER models have no ADDRESS label, so they typically need custom patterns or a dedicated detection service.
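That two-pass approach can be sketched as follows. The span-replacement helper is pure and testable; the spaCy call in the usage comment is the assumed NER backend (it requires `python -m spacy download en_core_web_lg`), and any NER service that emits character offsets would slot in the same way:

```python
from typing import Iterable, List, Tuple

def redact_spans(text: str, spans: Iterable[Tuple[int, int, str]]) -> str:
    """Replace (start, end, label) character spans with [LABEL_REDACTED].
    Works right-to-left so earlier offsets stay valid after substitution."""
    for start, end, label in sorted(spans, reverse=True):
        text = text[:start] + f"[{label}_REDACTED]" + text[end:]
    return text

def ner_anonymize(text: str, nlp, labels=("PERSON", "GPE", "LOC", "FAC")) -> str:
    """Second pass: run after the regex anonymizer, so the NER model
    only sees residual free text with structured PII already removed."""
    doc = nlp(text)
    spans = [
        (ent.start_char, ent.end_char, ent.label_)
        for ent in doc.ents
        if ent.label_ in labels
    ]
    return redact_spans(text, spans)

# Usage sketch:
# import spacy
# nlp = spacy.load("en_core_web_lg")
# clean = ner_anonymize(anonymizer.anonymize_text(raw_text), nlp)
```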
How many conversations do I need for effective fine-tuning?
OpenAI's fine-tuning guide accepts as few as 10 examples and suggests 50 to 100 for noticeable improvement, but meaningful gains on real agent tasks typically require 500 to 1,000 high-quality conversations. Quality matters more than quantity: 200 well-filtered conversations outperform 2,000 noisy ones. Start with a small dataset, evaluate the fine-tuned model, and add more data where you see gaps.
Should I include conversations where the agent used tools?
Yes, including tool-use conversations is especially valuable because tool calling is one of the hardest skills for agents to learn. Keep the tool call messages and tool response messages in the training data. This teaches the model when to invoke tools, how to format arguments, and how to synthesize tool outputs into natural responses.
#ETL #TrainingData #ConversationLogs #DataPipelines #PIIAnonymization #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.