Skip to content
Learn Agentic AI14 min read0 views

CRM Integration Agent: Automating Salesforce and HubSpot Updates with AI

Build an AI agent that synchronizes data between CRMs like Salesforce and HubSpot, automatically logs activities, updates pipeline stages, and enriches contact records using intelligent data processing.

Why CRM Data Decays Without Automation

CRM systems are only as valuable as the data inside them. Yet studies show that CRM data decays at roughly 30 percent per year — contacts change jobs, companies merge, deals stall without updates, and sales reps forget to log activities. An AI agent that monitors communication channels, detects relevant events, and pushes updates to the CRM keeps data fresh without burdening the sales team.

In this guide, we build a CRM integration agent that connects to HubSpot and Salesforce APIs, logs activities automatically, updates deal stages based on email signals, and enriches contact records using AI analysis.

Connecting to HubSpot

HubSpot's API uses bearer token authentication. We create a reusable client for common operations:

import httpx
from dataclasses import dataclass
from typing import Any

@dataclass
class CRMContact:
    id: str
    email: str
    first_name: str
    last_name: str
    company: str
    properties: dict[str, Any]

class HubSpotClient:
    BASE_URL = "https://api.hubapi.com"

    def __init__(self, access_token: str):
        self.client = httpx.Client(
            base_url=self.BASE_URL,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=30,
        )

    def search_contacts(self, query: str) -> list[CRMContact]:
        """Search contacts by email, name, or company."""
        response = self.client.post(
            "/crm/v3/objects/contacts/search",
            json={
                "filterGroups": [{
                    "filters": [{
                        "propertyName": "email",
                        "operator": "CONTAINS_TOKEN",
                        "value": query,
                    }]
                }],
                "properties": ["email", "firstname", "lastname", "company"],
            },
        )
        response.raise_for_status()
        results = response.json().get("results", [])
        return [
            CRMContact(
                id=r["id"],
                email=r["properties"].get("email", ""),
                first_name=r["properties"].get("firstname", ""),
                last_name=r["properties"].get("lastname", ""),
                company=r["properties"].get("company", ""),
                properties=r["properties"],
            )
            for r in results
        ]

    def update_contact(self, contact_id: str, properties: dict[str, str]):
        """Update contact properties."""
        response = self.client.patch(
            f"/crm/v3/objects/contacts/{contact_id}",
            json={"properties": properties},
        )
        response.raise_for_status()

    def create_note(self, contact_id: str, body: str):
        """Create a note associated with a contact."""
        note = self.client.post(
            "/crm/v3/objects/notes",
            json={"properties": {"hs_note_body": body, "hs_timestamp": ""}},
        )
        note.raise_for_status()
        note_id = note.json()["id"]

        self.client.put(
            f"/crm/v3/objects/notes/{note_id}/associations/contacts/{contact_id}/note_to_contact",
            json={},
        )

Connecting to Salesforce

Salesforce uses OAuth2 with a connected app. The simple-salesforce library simplifies authentication:

from simple_salesforce import Salesforce

def get_salesforce_client(
    username: str, password: str, security_token: str
) -> Salesforce:
    """Connect to Salesforce using username/password flow."""
    return Salesforce(
        username=username,
        password=password,
        security_token=security_token,
    )

def sf_search_contacts(sf: Salesforce, email: str) -> list[dict]:
    """Search Salesforce contacts by email."""
    query = f"SELECT Id, Email, FirstName, LastName, Account.Name FROM Contact WHERE Email = '{email}'"
    result = sf.query(query)
    return result["records"]

def sf_log_activity(sf: Salesforce, contact_id: str, subject: str, description: str):
    """Log a task/activity against a Salesforce contact."""
    sf.Task.create({
        "WhoId": contact_id,
        "Subject": subject,
        "Description": description,
        "Status": "Completed",
        "Priority": "Normal",
    })

def sf_update_opportunity_stage(sf: Salesforce, opp_id: str, stage: str):
    """Update an opportunity's stage."""
    sf.Opportunity.update(opp_id, {"StageName": stage})

Automatic Activity Logging

The agent monitors email threads and logs interactions to the CRM automatically. It uses an LLM to extract structured activity data from emails:

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

from openai import OpenAI

llm = OpenAI()

def extract_activity_from_email(
    sender: str, subject: str, body: str
) -> dict:
    """Extract structured activity data from an email."""
    response = llm.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        response_format={"type": "json_object"},
        messages=[
            {
                "role": "system",
                "content": (
                    "Extract CRM activity data from this email. Return JSON with:\n"
                    "- activity_type: one of (email_sent, email_received, meeting_scheduled, "
                    "  proposal_sent, contract_signed, follow_up_needed)\n"
                    "- summary: one sentence describing the interaction\n"
                    "- deal_signal: one of (positive, negative, neutral) indicating "
                    "  whether this moves a deal forward\n"
                    "- suggested_stage: if deal_signal is positive, suggest a pipeline "
                    "  stage (qualification, proposal, negotiation, closed_won)\n"
                    "- action_items: list of follow-up actions detected"
                ),
            },
            {
                "role": "user",
                "content": f"From: {sender}\nSubject: {subject}\n\n{body}",
            },
        ],
    )
    import json
    return json.loads(response.choices[0].message.content)

Pipeline Management with Deal Signals

The agent detects deal signals in communications and updates pipeline stages. A positive signal like "We are ready to move forward" triggers a stage advancement:

import logging

logger = logging.getLogger("crm_agent")

STAGE_ORDER = [
    "qualification",
    "proposal",
    "negotiation",
    "closed_won",
]

def process_email_for_crm(
    hubspot: HubSpotClient,
    sender_email: str,
    subject: str,
    body: str,
):
    """Process an email and update CRM accordingly."""
    # Find contact in CRM
    contacts = hubspot.search_contacts(sender_email)
    if not contacts:
        logger.info(f"No CRM contact found for {sender_email}")
        return

    contact = contacts[0]

    # Extract activity data
    activity = extract_activity_from_email(sender_email, subject, body)

    # Log the activity as a note
    note_body = (
        f"<b>{activity['activity_type']}</b><br>"
        f"Subject: {subject}<br>"
        f"Signal: {activity['deal_signal']}<br><br>"
        f"{activity['summary']}"
    )
    hubspot.create_note(contact.id, note_body)
    logger.info(f"Logged activity for {contact.email}: {activity['summary']}")

    # Update pipeline if positive deal signal detected
    if activity["deal_signal"] == "positive" and activity.get("suggested_stage"):
        logger.info(
            f"Positive deal signal detected. Suggested stage: {activity['suggested_stage']}"
        )

    # Flag action items for follow-up
    if activity.get("action_items"):
        for item in activity["action_items"]:
            logger.info(f"Action item: {item}")

Data Enrichment

The agent enriches sparse contact records by analyzing available data and filling in missing fields:

def enrich_contact(hubspot: HubSpotClient, contact: CRMContact):
    """Enrich a contact record with AI-analyzed data."""
    if contact.company and not contact.properties.get("industry"):
        response = llm.chat.completions.create(
            model="gpt-4o-mini",
            temperature=0,
            response_format={"type": "json_object"},
            messages=[
                {
                    "role": "system",
                    "content": "Given a company name, return JSON with: industry, company_size_estimate, likely_headquarters_country.",
                },
                {"role": "user", "content": f"Company: {contact.company}"},
            ],
        )
        import json
        enrichment = json.loads(response.choices[0].message.content)
        hubspot.update_contact(contact.id, {
            "industry": enrichment.get("industry", ""),
        })
        logger.info(f"Enriched {contact.email} with industry: {enrichment.get('industry')}")

FAQ

How do I handle rate limits from CRM APIs?

HubSpot allows 100 requests per 10 seconds on most plans. Salesforce limits vary by edition. Implement exponential backoff with the tenacity library: decorate API calls with @retry(wait=wait_exponential(min=1, max=30), stop=stop_after_attempt(5)). Batch operations using the CRM's bulk APIs when processing more than 50 records.

Should I sync data bidirectionally between Salesforce and HubSpot?

Bidirectional sync is significantly more complex due to conflict resolution. Designate one system as the source of truth for each data type. For example, HubSpot owns marketing data while Salesforce owns deal data. The agent syncs unidirectionally for each data type, avoiding merge conflicts.

How do I prevent duplicate records when the agent creates contacts?

Always search by email before creating a new contact. Use the CRM's deduplication APIs if available — HubSpot's /crm/v3/objects/contacts/search with email filters handles this well. Maintain a local cache of recently created contacts to catch rapid duplicates that might not appear in search results immediately due to indexing delays.


#CRMAutomation #AIAgents #Salesforce #HubSpot #WorkflowAutomation #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.