CRM Integration Agent: Automating Salesforce and HubSpot Updates with AI
Build an AI agent that synchronizes data between CRMs like Salesforce and HubSpot, automatically logs activities, updates pipeline stages, and enriches contact records using intelligent data processing.
Why CRM Data Decays Without Automation
CRM systems are only as valuable as the data inside them. Yet studies show that CRM data decays at roughly 30 percent per year — contacts change jobs, companies merge, deals stall without updates, and sales reps forget to log activities. An AI agent that monitors communication channels, detects relevant events, and pushes updates to the CRM keeps data fresh without burdening the sales team.
In this guide, we build a CRM integration agent that connects to HubSpot and Salesforce APIs, logs activities automatically, updates deal stages based on email signals, and enriches contact records using AI analysis.
Connecting to HubSpot
HubSpot's API uses bearer token authentication. We create a reusable client for common operations:
import httpx
from dataclasses import dataclass
from typing import Any
@dataclass
class CRMContact:
id: str
email: str
first_name: str
last_name: str
company: str
properties: dict[str, Any]
class HubSpotClient:
BASE_URL = "https://api.hubapi.com"
def __init__(self, access_token: str):
self.client = httpx.Client(
base_url=self.BASE_URL,
headers={"Authorization": f"Bearer {access_token}"},
timeout=30,
)
def search_contacts(self, query: str) -> list[CRMContact]:
"""Search contacts by email, name, or company."""
response = self.client.post(
"/crm/v3/objects/contacts/search",
json={
"filterGroups": [{
"filters": [{
"propertyName": "email",
"operator": "CONTAINS_TOKEN",
"value": query,
}]
}],
"properties": ["email", "firstname", "lastname", "company"],
},
)
response.raise_for_status()
results = response.json().get("results", [])
return [
CRMContact(
id=r["id"],
email=r["properties"].get("email", ""),
first_name=r["properties"].get("firstname", ""),
last_name=r["properties"].get("lastname", ""),
company=r["properties"].get("company", ""),
properties=r["properties"],
)
for r in results
]
def update_contact(self, contact_id: str, properties: dict[str, str]):
"""Update contact properties."""
response = self.client.patch(
f"/crm/v3/objects/contacts/{contact_id}",
json={"properties": properties},
)
response.raise_for_status()
def create_note(self, contact_id: str, body: str):
"""Create a note associated with a contact."""
note = self.client.post(
"/crm/v3/objects/notes",
json={"properties": {"hs_note_body": body, "hs_timestamp": ""}},
)
note.raise_for_status()
note_id = note.json()["id"]
self.client.put(
f"/crm/v3/objects/notes/{note_id}/associations/contacts/{contact_id}/note_to_contact",
json={},
)
Connecting to Salesforce
Salesforce uses OAuth2 with a connected app. The simple-salesforce library simplifies authentication:
from simple_salesforce import Salesforce
def get_salesforce_client(
username: str, password: str, security_token: str
) -> Salesforce:
"""Connect to Salesforce using username/password flow."""
return Salesforce(
username=username,
password=password,
security_token=security_token,
)
def sf_search_contacts(sf: Salesforce, email: str) -> list[dict]:
"""Search Salesforce contacts by email."""
query = f"SELECT Id, Email, FirstName, LastName, Account.Name FROM Contact WHERE Email = '{email}'"
result = sf.query(query)
return result["records"]
def sf_log_activity(sf: Salesforce, contact_id: str, subject: str, description: str):
"""Log a task/activity against a Salesforce contact."""
sf.Task.create({
"WhoId": contact_id,
"Subject": subject,
"Description": description,
"Status": "Completed",
"Priority": "Normal",
})
def sf_update_opportunity_stage(sf: Salesforce, opp_id: str, stage: str):
"""Update an opportunity's stage."""
sf.Opportunity.update(opp_id, {"StageName": stage})
Automatic Activity Logging
The agent monitors email threads and logs interactions to the CRM automatically. It uses an LLM to extract structured activity data from emails:
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
from openai import OpenAI
llm = OpenAI()
def extract_activity_from_email(
sender: str, subject: str, body: str
) -> dict:
"""Extract structured activity data from an email."""
response = llm.chat.completions.create(
model="gpt-4o-mini",
temperature=0,
response_format={"type": "json_object"},
messages=[
{
"role": "system",
"content": (
"Extract CRM activity data from this email. Return JSON with:\n"
"- activity_type: one of (email_sent, email_received, meeting_scheduled, "
" proposal_sent, contract_signed, follow_up_needed)\n"
"- summary: one sentence describing the interaction\n"
"- deal_signal: one of (positive, negative, neutral) indicating "
" whether this moves a deal forward\n"
"- suggested_stage: if deal_signal is positive, suggest a pipeline "
" stage (qualification, proposal, negotiation, closed_won)\n"
"- action_items: list of follow-up actions detected"
),
},
{
"role": "user",
"content": f"From: {sender}\nSubject: {subject}\n\n{body}",
},
],
)
import json
return json.loads(response.choices[0].message.content)
Pipeline Management with Deal Signals
The agent detects deal signals in communications and updates pipeline stages. A positive signal like "We are ready to move forward" triggers a stage advancement:
import logging
logger = logging.getLogger("crm_agent")
STAGE_ORDER = [
"qualification",
"proposal",
"negotiation",
"closed_won",
]
def process_email_for_crm(
hubspot: HubSpotClient,
sender_email: str,
subject: str,
body: str,
):
"""Process an email and update CRM accordingly."""
# Find contact in CRM
contacts = hubspot.search_contacts(sender_email)
if not contacts:
logger.info(f"No CRM contact found for {sender_email}")
return
contact = contacts[0]
# Extract activity data
activity = extract_activity_from_email(sender_email, subject, body)
# Log the activity as a note
note_body = (
f"<b>{activity['activity_type']}</b><br>"
f"Subject: {subject}<br>"
f"Signal: {activity['deal_signal']}<br><br>"
f"{activity['summary']}"
)
hubspot.create_note(contact.id, note_body)
logger.info(f"Logged activity for {contact.email}: {activity['summary']}")
# Update pipeline if positive deal signal detected
if activity["deal_signal"] == "positive" and activity.get("suggested_stage"):
logger.info(
f"Positive deal signal detected. Suggested stage: {activity['suggested_stage']}"
)
# Flag action items for follow-up
if activity.get("action_items"):
for item in activity["action_items"]:
logger.info(f"Action item: {item}")
Data Enrichment
The agent enriches sparse contact records by analyzing available data and filling in missing fields:
def enrich_contact(hubspot: HubSpotClient, contact: CRMContact):
"""Enrich a contact record with AI-analyzed data."""
if contact.company and not contact.properties.get("industry"):
response = llm.chat.completions.create(
model="gpt-4o-mini",
temperature=0,
response_format={"type": "json_object"},
messages=[
{
"role": "system",
"content": "Given a company name, return JSON with: industry, company_size_estimate, likely_headquarters_country.",
},
{"role": "user", "content": f"Company: {contact.company}"},
],
)
import json
enrichment = json.loads(response.choices[0].message.content)
hubspot.update_contact(contact.id, {
"industry": enrichment.get("industry", ""),
})
logger.info(f"Enriched {contact.email} with industry: {enrichment.get('industry')}")
FAQ
How do I handle rate limits from CRM APIs?
HubSpot allows 100 requests per 10 seconds on most plans. Salesforce limits vary by edition. Implement exponential backoff with the tenacity library: decorate API calls with @retry(wait=wait_exponential(min=1, max=30), stop=stop_after_attempt(5)). Batch operations using the CRM's bulk APIs when processing more than 50 records.
Should I sync data bidirectionally between Salesforce and HubSpot?
Bidirectional sync is significantly more complex due to conflict resolution. Designate one system as the source of truth for each data type. For example, HubSpot owns marketing data while Salesforce owns deal data. The agent syncs unidirectionally for each data type, avoiding merge conflicts.
How do I prevent duplicate records when the agent creates contacts?
Always search by email before creating a new contact. Use the CRM's deduplication APIs if available — HubSpot's /crm/v3/objects/contacts/search with email filters handles this well. Maintain a local cache of recently created contacts to catch rapid duplicates that might not appear in search results immediately due to indexing delays.
#CRMAutomation #AIAgents #Salesforce #HubSpot #WorkflowAutomation #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.