
Building AI Data Import Agents: Mapping, Cleaning, and Validating Uploaded Data

Create an AI-powered data import pipeline that detects file formats, maps columns to your schema automatically, cleans messy data, and validates records before insertion.

The Data Import Problem in SaaS

Every SaaS product eventually faces the CSV import problem. Users upload spreadsheets with inconsistent column names, mixed date formats, duplicate rows, and missing required fields. Traditional import tools show users a mapping screen with 30 dropdowns, and the failure rate is high — wrong mappings, rejected rows, and frustrated users who give up.

An AI data import agent solves this by automatically detecting the file format, mapping columns to your schema, cleaning problematic values, and validating everything before a single row is written.

Format Detection

Start by identifying the file type and parsing it into a normalized structure.

import csv
import io
import json
from pathlib import Path
from dataclasses import dataclass

@dataclass
class ParsedFile:
    columns: list[str]
    rows: list[dict]
    original_filename: str
    detected_format: str
    row_count: int

def detect_and_parse(file_content: bytes, filename: str) -> ParsedFile:
    suffix = Path(filename).suffix.lower()

    if suffix == ".csv":
        return parse_csv(file_content, filename)
    elif suffix in (".xls", ".xlsx"):
        return parse_excel(file_content, filename)
    elif suffix == ".json":
        return parse_json(file_content, filename)
    elif suffix == ".tsv":
        return parse_csv(file_content, filename, delimiter="\t")
    else:
        # Try CSV as default
        return parse_csv(file_content, filename)

def parse_csv(content: bytes, filename: str,
              delimiter: str = ",") -> ParsedFile:
    # Detect encoding
    text = try_decode(content)
    reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
    rows = list(reader)
    columns = reader.fieldnames or []

    return ParsedFile(
        columns=columns,
        rows=rows,
        original_filename=filename,
        detected_format="csv",
        row_count=len(rows),
    )

def try_decode(content: bytes) -> str:
    # utf-8-sig handles plain UTF-8 too and strips a BOM if present
    # (plain "utf-8" would leave "\ufeff" glued to the first column name).
    # latin-1 decodes any byte sequence, so it stays last as a catch-all.
    for encoding in ["utf-8-sig", "cp1252", "latin-1"]:
        try:
            return content.decode(encoding)
        except UnicodeDecodeError:
            continue
    raise ValueError("Could not detect file encoding.")  # unreachable in practice

def parse_json(content: bytes, filename: str) -> ParsedFile:
    text = try_decode(content)
    data = json.loads(text)

    if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
        columns = list(data[0].keys())
        return ParsedFile(
            columns=columns, rows=data, original_filename=filename,
            detected_format="json", row_count=len(data),
        )
    raise ValueError("JSON must be an array of objects.")
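The dispatcher above calls parse_excel, which is worth sketching as well. A minimal version using openpyxl (an assumption; any spreadsheet library works, and note openpyxl reads .xlsx only, so legacy .xls needs a different library such as xlrd). It reads the first sheet, treats row one as the header, and normalizes cells to strings:

```python
import io
from dataclasses import dataclass

from openpyxl import load_workbook

@dataclass
class ParsedFile:
    columns: list[str]
    rows: list[dict]
    original_filename: str
    detected_format: str
    row_count: int

def parse_excel(content: bytes, filename: str) -> ParsedFile:
    # read_only streams rows; data_only resolves formulas to cached values
    wb = load_workbook(io.BytesIO(content), read_only=True, data_only=True)
    ws = wb.worksheets[0]  # first sheet; multi-sheet handling is up to you
    rows_iter = ws.iter_rows(values_only=True)
    header = next(rows_iter, None) or ()
    columns = [str(c) if c is not None else f"Column {i + 1}"
               for i, c in enumerate(header)]
    rows = [
        {col: ("" if cell is None else str(cell))
         for col, cell in zip(columns, row)}
        for row in rows_iter
    ]
    return ParsedFile(columns=columns, rows=rows,
                      original_filename=filename,
                      detected_format="excel", row_count=len(rows))
```

Converting every cell to a string keeps the downstream cleaning code uniform across CSV, JSON, and Excel inputs.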

AI-Powered Column Mapping

The AI examines the uploaded column names and sample data to map them to your schema fields.

@dataclass
class ColumnMapping:
    source_column: str
    target_field: str
    confidence: float
    transform: str | None  # e.g., "date_parse", "phone_normalize"

@dataclass
class TargetField:
    name: str
    data_type: str
    required: bool
    description: str
    examples: list[str]

# Define your schema fields
CONTACT_FIELDS = [
    TargetField("first_name", "string", True, "Contact first name",
                ["John", "Jane", "Ahmed"]),
    TargetField("last_name", "string", True, "Contact last name",
                ["Smith", "Doe", "Khan"]),
    TargetField("email", "email", True, "Email address",
                ["john@example.com"]),
    TargetField("phone", "phone", False, "Phone number",
                ["+1-555-123-4567"]),
    TargetField("company", "string", False, "Company name",
                ["Acme Corp", "Globex"]),
    TargetField("created_date", "date", False, "Record creation date",
                ["2026-01-15"]),
]


async def map_columns(parsed: ParsedFile, target_fields: list[TargetField],
                       llm_client) -> list[ColumnMapping]:
    # Extract sample values for each source column
    samples = {}
    for col in parsed.columns:
        values = [row.get(col, "") for row in parsed.rows[:5] if row.get(col)]
        samples[col] = values

    schema_desc = "\n".join([
        f"- {f.name} ({f.data_type}, {'required' if f.required else 'optional'}): "
        f"{f.description}. Examples: {f.examples}"
        for f in target_fields
    ])

    prompt = f"""Map the source CSV columns to the target schema fields.

Source columns and sample values:
{json.dumps(samples, indent=2)}

Target schema:
{schema_desc}

Return JSON array of mappings:
[{{"source": "source_col", "target": "target_field", "confidence": 0.0-1.0,
   "transform": null or "date_parse" or "phone_normalize" or "email_lowercase"}}]

If a source column does not match any target field, set target to null.
If a target field has no matching source column, omit it."""

    response = await llm_client.chat(
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
    )

    mappings_data = json.loads(response.content)
    return [
        ColumnMapping(
            source_column=m["source"],
            target_field=m["target"],
            confidence=m["confidence"],
            transform=m.get("transform"),
        )
        for m in mappings_data
        if m.get("target")
    ]

Data Cleaning and Transformation

Apply transformations detected during mapping and clean common data quality issues.


from datetime import datetime
import re
import phonenumbers

class DataCleaner:
    def clean_row(self, row: dict, mappings: list[ColumnMapping]) -> dict:
        cleaned = {}
        for mapping in mappings:
            raw_value = row.get(mapping.source_column, "")
            if not raw_value or str(raw_value).strip() == "":
                cleaned[mapping.target_field] = None
                continue

            value = str(raw_value).strip()

            if mapping.transform == "date_parse":
                value = self.parse_date(value)
            elif mapping.transform == "phone_normalize":
                value = self.normalize_phone(value)
            elif mapping.transform == "email_lowercase":
                value = value.lower()

            # Transforms may return None on failure (e.g., an unparseable
            # date); only run general cleaning on values that survived.
            if value is not None:
                value = self.general_clean(value, mapping.target_field)
            cleaned[mapping.target_field] = value

        return cleaned

    def parse_date(self, value: str) -> str | None:
        formats = [
            "%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y", "%m-%d-%Y",
            "%d-%m-%Y", "%B %d, %Y", "%b %d, %Y", "%Y/%m/%d",
        ]
        for fmt in formats:
            try:
                dt = datetime.strptime(value, fmt)
                return dt.strftime("%Y-%m-%d")
            except ValueError:
                continue
        return None

    def normalize_phone(self, value: str) -> str | None:
        try:
            parsed = phonenumbers.parse(value, "US")
            if phonenumbers.is_valid_number(parsed):
                return phonenumbers.format_number(
                    parsed, phonenumbers.PhoneNumberFormat.E164
                )
        except phonenumbers.NumberParseException:
            pass
        # Fallback: strip non-digits
        digits = re.sub(r"[^\d+]", "", value)
        return digits if len(digits) >= 7 else None

    def general_clean(self, value: str, field_name: str) -> str:
        # Remove excess whitespace
        value = " ".join(value.split())
        # Title case for names
        if field_name in ("first_name", "last_name"):
            value = value.title()
        return value

Validation Pipeline

Validate every row before insertion and report errors by row and field.

@dataclass
class ValidationError:
    row_number: int
    field: str
    value: str | None
    error: str
    severity: str  # "error" or "warning"

class DataValidator:
    def __init__(self, target_fields: list[TargetField]):
        self.fields = {f.name: f for f in target_fields}

    def validate_batch(self, rows: list[dict]) -> tuple[list[dict], list[ValidationError]]:
        valid_rows = []
        errors = []

        for i, row in enumerate(rows):
            row_errors = self.validate_row(row, i + 1)
            has_fatal = any(e.severity == "error" for e in row_errors)
            errors.extend(row_errors)
            if not has_fatal:
                valid_rows.append(row)

        return valid_rows, errors

    def validate_row(self, row: dict, row_num: int) -> list[ValidationError]:
        errors = []

        # Check required fields
        for field_name, field_def in self.fields.items():
            value = row.get(field_name)
            if field_def.required and not value:
                errors.append(ValidationError(
                    row_number=row_num, field=field_name,
                    value=None, error="Required field is missing",
                    severity="error",
                ))
                continue

            if value and field_def.data_type == "email":
                if not re.match(r"^[^@]+@[^@]+\.[^@]+$", str(value)):
                    errors.append(ValidationError(
                        row_number=row_num, field=field_name,
                        value=str(value), error="Invalid email format",
                        severity="error",
                    ))

        return errors

Import API

Tie everything together in an API that handles upload, preview, and commit.

from fastapi import FastAPI, UploadFile, Depends
from pydantic import BaseModel

app = FastAPI()

class ImportPreview(BaseModel):
    row_count: int
    column_mappings: list[dict]
    validation_errors: list[dict]
    valid_row_count: int
    sample_rows: list[dict]

@app.post("/api/import/preview", response_model=ImportPreview)
async def preview_import(
    file: UploadFile,
    entity_type: str,
    tenant_id: str = Depends(get_current_tenant),
    llm_client = Depends(get_llm_client),
):
    content = await file.read()
    # UploadFile.filename may be None; fall back to the CSV default path
    parsed = detect_and_parse(content, file.filename or "")

    target_fields = get_target_fields(entity_type)
    mappings = await map_columns(parsed, target_fields, llm_client)

    cleaner = DataCleaner()
    cleaned_rows = [cleaner.clean_row(row, mappings) for row in parsed.rows]

    validator = DataValidator(target_fields)
    valid_rows, errors = validator.validate_batch(cleaned_rows)

    return ImportPreview(
        row_count=parsed.row_count,
        column_mappings=[vars(m) for m in mappings],
        validation_errors=[vars(e) for e in errors[:100]],
        valid_row_count=len(valid_rows),
        sample_rows=valid_rows[:5],
    )

FAQ

How do I handle CSV files with no header row?

Detect headerless files by checking if the first row contains values that look like data rather than labels (e.g., they contain numbers, email addresses, or dates). If no header is detected, generate synthetic column names ("Column 1", "Column 2") and pass the sample data to the LLM for mapping. The AI can often infer the correct mapping from data patterns alone.
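The heuristic above can be sketched in a few lines of stdlib Python. The regex patterns (numbers, emails, ISO dates) are illustrative assumptions; extend them to match the data shapes your imports actually see:

```python
import re

# Patterns that suggest a cell is data, not a column label
DATA_LIKE = re.compile(
    r"^\d+([.,]\d+)?$"              # numbers
    r"|^[^@\s]+@[^@\s]+\.[^@\s]+$"  # email addresses
    r"|^\d{4}-\d{2}-\d{2}$"         # ISO dates
)

def looks_like_header(first_row: list[str]) -> bool:
    """If a majority of first-row cells look like data, assume no header."""
    data_cells = sum(1 for cell in first_row if DATA_LIKE.match(cell.strip()))
    return data_cells < len(first_row) / 2

def synthetic_columns(width: int) -> list[str]:
    return [f"Column {i + 1}" for i in range(width)]
```

When looks_like_header returns False, prepend the first row back into the data, attach synthetic_columns names, and let the LLM map from the sample values alone.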

What if the AI maps columns incorrectly?

Always show the user a mapping preview before committing the import. Display source column names, sample values, the AI's suggested target field, and a confidence score. Let users change any mapping with a dropdown. Log the user's corrections as training data to improve future mapping accuracy for that tenant.

How do I handle duplicate detection during import?

Before insertion, check for duplicates using a combination of key fields (e.g., email for contacts, name + company for deals). Present duplicates to the user with three options: skip, overwrite, or merge. For merge, use the AI to combine fields intelligently — for example, keeping the longer notes field and the more recent phone number.
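The key-based check can be sketched like this. The per-entity key fields are assumptions to adjust for your schema; existing_keys would come from a database query over the tenant's current records:

```python
def dedupe_key(row: dict, entity_type: str) -> str:
    # Key fields per entity are assumptions; adjust for your schema
    if entity_type == "contact":
        return (row.get("email") or "").strip().lower()
    return f"{row.get('name', '')}|{row.get('company', '')}".strip().lower()

def find_duplicates(rows: list[dict], existing_keys: set[str],
                    entity_type: str = "contact") -> tuple[list[dict], list[dict]]:
    fresh, dupes = [], []
    seen = set(existing_keys)   # duplicates vs. the DB and within the file
    for row in rows:
        key = dedupe_key(row, entity_type)
        if key and key in seen:
            dupes.append(row)   # present to the user: skip / overwrite / merge
        else:
            seen.add(key)
            fresh.append(row)
    return fresh, dupes
```

Normalizing the key (strip, lowercase) before comparison catches the common case of the same email with different casing; the skip/overwrite/merge choice then only fires on true collisions.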


#AIDataImport #DataCleaning #ColumnMapping #SaaS #Python #ETL #AgenticAI #LearnAI #AIEngineering


CallSphere Team
