Building a CRM Event Agent: Reacting to New Leads, Updates, and Deal Closures
Build an AI agent that reacts to CRM webhook events to score leads, automate follow-ups, and trigger notifications when deals progress through your sales pipeline.
Why CRM Events Need AI Agents
Sales teams miss opportunities every day because they cannot react fast enough. A new lead fills out a form at midnight, and nobody follows up until the next afternoon. A deal that has been stalled for two weeks goes unnoticed. A high-value customer downgrades their plan, and the account manager finds out three days later.
A CRM event agent solves this by monitoring every change in your CRM in real time and taking intelligent action. It scores incoming leads and routes them to the right salesperson. It detects stalled deals and nudges the assigned rep. It drafts personalized follow-up emails based on the prospect's industry and interaction history.
CRM Webhook Architecture
Most modern CRMs — HubSpot, Salesforce, Pipedrive — support webhooks. The pattern is consistent: you register a URL, select which object changes to listen for, and the CRM sends POST requests when those changes occur.
import os
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
from datetime import datetime

app = FastAPI()
llm = AsyncOpenAI()


class CRMEvent(BaseModel):
    event_type: str  # e.g., "contact.created", "deal.updated"
    object_id: str
    object_type: str  # "contact", "deal", "company"
    properties: dict
    timestamp: str
    previous_properties: dict | None = None


@app.post("/crm/webhook")
async def crm_webhook(
    request: Request, background_tasks: BackgroundTasks
):
    payload = await request.json()
    events = payload if isinstance(payload, list) else [payload]
    for event_data in events:
        event = CRMEvent(**event_data)
        background_tasks.add_task(process_crm_event, event)
    return {"status": "accepted", "count": len(events)}
Many CRMs batch multiple events into a single webhook delivery. The handler unpacks lists and processes each event individually.
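To make the batching behavior concrete, here is a minimal, framework-free sketch of the same normalization step. The helper name `normalize_payload` and the sample payloads are hypothetical, and filtering out non-object entries is an added assumption rather than something the handler above does.

```python
from typing import Any


def normalize_payload(payload: Any) -> list[dict]:
    """Return a list of event dicts whether the CRM sent one event or a batch."""
    events = payload if isinstance(payload, list) else [payload]
    # Assumption: skip entries that are not JSON objects so one malformed
    # item does not cause the whole delivery to be rejected.
    return [e for e in events if isinstance(e, dict)]


# Illustrative payloads only; real field names depend on the CRM.
single = {"event_type": "contact.created", "object_id": "101"}
batch = [
    {"event_type": "deal.updated", "object_id": "201"},
    {"event_type": "deal.stage_changed", "object_id": "202"},
]

print(len(normalize_payload(single)))  # 1
print(len(normalize_payload(batch)))   # 2
```

Keeping this step as a pure function makes it easy to unit test without starting the web server.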
AI-Powered Lead Scoring
When a new contact enters the CRM, the agent scores them based on their profile data and enrichment signals.
async def process_crm_event(event: CRMEvent):
    # Map event types to handlers. handle_deal_update and
    # handle_contact_update follow the same pattern and are not shown here.
    handlers = {
        "contact.created": handle_new_lead,
        "deal.updated": handle_deal_update,
        "deal.stage_changed": handle_deal_stage_change,
        "contact.updated": handle_contact_update,
    }
    handler = handlers.get(event.event_type)
    if handler:
        await handler(event)


async def handle_new_lead(event: CRMEvent):
    props = event.properties
    company = props.get("company", "Unknown")
    title = props.get("job_title", "Unknown")
    source = props.get("lead_source", "Unknown")
    # Guard against a missing or null email before extracting the domain.
    email_domain = (props.get("email") or "").split("@")[-1]

    prompt = f"""Score this incoming lead from 1-100 and classify them.
Company: {company}
Job Title: {title}
Lead Source: {source}
Email Domain: {email_domain}
Respond in this exact format:
SCORE: [number]
TIER: [hot/warm/cold]
REASON: [one sentence explanation]
RECOMMENDED_ACTION: [immediate-call/email-sequence/nurture-campaign]"""

    response = await llm.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    analysis = response.choices[0].message.content
    # parse_lead_score extracts the fields from the formatted reply.
    score_data = parse_lead_score(analysis)

    # Write the score back to the CRM so reps can sort and filter on it.
    await update_crm_contact(event.object_id, {
        "lead_score": score_data["score"],
        "lead_tier": score_data["tier"],
    })

    if score_data["tier"] == "hot":
        await assign_to_senior_rep(event.object_id)
        await send_slack_alert(
            f"Hot lead detected: {props.get('name')} at {company} "
            f"(Score: {score_data['score']})"
        )
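The handler relies on a `parse_lead_score` helper that the article does not show. The following is one possible sketch, assuming the model mostly follows the requested `KEY: value` format. The fallback values (score 1, tier "cold") are assumptions chosen to fail conservatively, not part of the original design.

```python
import re

VALID_TIERS = {"hot", "warm", "cold"}


def parse_lead_score(analysis: str) -> dict:
    """Parse 'KEY: value' lines from the model reply into a dict with safe defaults."""
    fields = {}
    for line in analysis.splitlines():
        match = re.match(r"^\s*([A-Z_]+)\s*:\s*(.+?)\s*$", line)
        if match:
            fields[match.group(1)] = match.group(2)

    # Take the first integer in the SCORE field and clamp it to 1-100.
    digits = re.search(r"\d+", fields.get("SCORE", ""))
    score = min(100, max(1, int(digits.group()))) if digits else 1

    # Tolerate bracketed or capitalized values such as "[Hot]".
    tier = fields.get("TIER", "").strip("[]").lower()
    if tier not in VALID_TIERS:
        tier = "cold"  # assumed conservative fallback for unparseable replies

    return {
        "score": score,
        "tier": tier,
        "reason": fields.get("REASON", ""),
        "recommended_action": fields.get("RECOMMENDED_ACTION", "").strip("[]").lower(),
    }


reply = """SCORE: 87
TIER: hot
REASON: VP-level contact at a mid-market SaaS company.
RECOMMENDED_ACTION: immediate-call"""

print(parse_lead_score(reply))
```

Because free-text replies can drift from the requested format, defaulting to the lowest tier keeps a malformed reply from paging a senior rep by accident.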
Deal Pipeline Monitoring
Track deal progression and detect when deals stall or regress through stages.
async def handle_deal_stage_change(event: CRMEvent):
    current_stage = event.properties.get("stage")
    previous_stage = (event.previous_properties or {}).get("stage")
    # Some CRMs send amounts as strings or null, so coerce before formatting.
    deal_value = float(event.properties.get("amount") or 0)
    deal_name = event.properties.get("name", "Unknown Deal")
    owner = event.properties.get("owner_name", "Unassigned")

    if is_regression(previous_stage, current_stage):
        prompt = f"""A deal has regressed in the pipeline.
Deal: {deal_name} (Value: ${deal_value:,.2f})
Moved from: {previous_stage} -> {current_stage}
Owner: {owner}
Draft a brief internal alert explaining why this might be concerning
and suggest 2-3 recovery actions the sales rep could take."""

        response = await llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        await send_slack_alert(
            f"Deal regression alert: {deal_name}\n\n"
            f"{response.choices[0].message.content}"
        )

    if current_stage == "closed_won":
        await handle_deal_won(event)


# Pipeline stages in forward order; adjust to match your CRM's stage names.
STAGE_ORDER = [
    "qualification", "discovery", "proposal",
    "negotiation", "closed_won", "closed_lost",
]


def is_regression(old_stage: str | None, new_stage: str) -> bool:
    if old_stage is None:
        return False
    if old_stage == "closed_lost":
        # Reopening a lost deal is a recovery, not a regression.
        return False
    try:
        old_idx = STAGE_ORDER.index(old_stage)
        new_idx = STAGE_ORDER.index(new_stage)
        return new_idx < old_idx
    except ValueError:
        # Unknown stage names are ignored rather than flagged.
        return False
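Stage-change webhooks catch regressions, but a stalled deal produces no event at all, so stall detection needs a periodic time-based check. The sketch below shows one way this could work. The `stage_entered_at` field name and the 14-day threshold are assumptions for illustration; use whatever timestamp your CRM exposes for the last stage change.

```python
from datetime import datetime, timedelta, timezone

STALL_THRESHOLD = timedelta(days=14)  # assumed threshold; tune per pipeline
OPEN_STAGES = {"qualification", "discovery", "proposal", "negotiation"}


def find_stalled_deals(deals: list[dict], now: datetime) -> list[dict]:
    """Return open deals whose stage has not changed within the threshold."""
    stalled = []
    for deal in deals:
        if deal.get("stage") not in OPEN_STAGES:
            continue  # closed deals cannot stall
        # "stage_entered_at" is a hypothetical ISO 8601 field name.
        entered = datetime.fromisoformat(deal["stage_entered_at"])
        if now - entered >= STALL_THRESHOLD:
            stalled.append(deal)
    return stalled


now = datetime(2024, 6, 30, tzinfo=timezone.utc)
deals = [
    {"name": "Acme renewal", "stage": "proposal",
     "stage_entered_at": "2024-06-01T09:00:00+00:00"},
    {"name": "Globex pilot", "stage": "negotiation",
     "stage_entered_at": "2024-06-25T09:00:00+00:00"},
    {"name": "Initech", "stage": "closed_won",
     "stage_entered_at": "2024-05-01T09:00:00+00:00"},
]
print([d["name"] for d in find_stalled_deals(deals, now)])  # ['Acme renewal']
```

A scheduled job could run this check daily and pass each stalled deal to the same alerting path used for regressions.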
Automated Follow-Up Generation
When a deal closes as won, generate a congratulations message for the sales rep and a welcome email for the customer, using the deal and contact details already stored in the CRM.
async def handle_deal_won(event: CRMEvent):
    deal = event.properties
    contact_id = deal.get("contact_id")
    contact = await fetch_crm_contact(contact_id)
    # Some CRMs send amounts as strings or null, so coerce before formatting.
    amount = float(deal.get("amount") or 0)

    prompt = f"""A deal has been won! Generate two messages:
1. A congratulations message for the sales rep
2. A customer welcome email for onboarding
Deal: {deal.get('name')}
Value: ${amount:,.2f}
Customer: {contact.get('name')} at {contact.get('company')}
Industry: {contact.get('industry', 'Unknown')}
Keep both messages professional and warm."""

    response = await llm.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    # The reply contains both messages in a single string.
    messages = response.choices[0].message.content

    await send_slack_alert(f"Deal won: {deal.get('name')} - ${amount:,.2f}")
    await queue_onboarding_email(contact_id, messages)
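Note that `handle_deal_won` passes the entire model reply to `queue_onboarding_email`, so the internal congratulations note would travel with the customer email. One possible remedy, sketched here, is to ask the model to wrap each message in a delimiter and split the reply before queuing. The marker strings and the function name are hypothetical and would need to be added to the prompt.

```python
REP_MARKER = "===REP==="            # hypothetical delimiter added to the prompt
CUSTOMER_MARKER = "===CUSTOMER==="  # hypothetical delimiter added to the prompt


def split_deal_won_messages(reply: str) -> tuple[str, str]:
    """Split a delimited model reply into (rep_message, customer_email)."""
    _, found_rep, after_rep = reply.partition(REP_MARKER)
    rep_part, found_customer, customer_part = after_rep.partition(CUSTOMER_MARKER)
    if not found_rep or not found_customer:
        # Fail loudly instead of sending an internal note to the customer.
        raise ValueError("reply is missing one or both section markers")
    return rep_part.strip(), customer_part.strip()


sample_reply = """===REP===
Great work closing the Acme deal!
===CUSTOMER===
Welcome aboard! Here is what to expect during onboarding."""

rep_message, customer_email = split_deal_won_messages(sample_reply)
print(rep_message)
print(customer_email)
```

With this split in place, only `customer_email` would go to the onboarding queue, and `rep_message` could be appended to the Slack alert.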
CRM API Integration Helpers
Abstract the CRM API calls so swapping between providers requires minimal changes.
import os

import httpx

CRM_API_BASE = os.environ["CRM_API_BASE"]
CRM_API_KEY = os.environ["CRM_API_KEY"]


async def update_crm_contact(contact_id: str, properties: dict):
    async with httpx.AsyncClient() as client:
        resp = await client.patch(
            f"{CRM_API_BASE}/contacts/{contact_id}",
            headers={"Authorization": f"Bearer {CRM_API_KEY}"},
            json={"properties": properties},
        )
        # Surface 4xx/5xx responses instead of silently dropping the update.
        resp.raise_for_status()


async def fetch_crm_contact(contact_id: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{CRM_API_BASE}/contacts/{contact_id}",
            headers={"Authorization": f"Bearer {CRM_API_KEY}"},
        )
        resp.raise_for_status()
        return resp.json()
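One way to keep provider swaps cheap is to have the agent logic depend on a small interface rather than on concrete HTTP calls. The sketch below uses `typing.Protocol` for this. The names `CRMClient`, `InMemoryCRM`, and `tag_hot_lead` are hypothetical, and the in-memory class stands in for a real provider so the example runs without network access.

```python
import asyncio
from typing import Protocol


class CRMClient(Protocol):
    """Minimal interface the agent needs from any CRM provider."""

    async def update_contact(self, contact_id: str, properties: dict) -> None: ...
    async def fetch_contact(self, contact_id: str) -> dict: ...


class InMemoryCRM:
    """Stand-in provider for tests; a real one would wrap the HTTP calls."""

    def __init__(self) -> None:
        self.contacts: dict[str, dict] = {}

    async def update_contact(self, contact_id: str, properties: dict) -> None:
        self.contacts.setdefault(contact_id, {}).update(properties)

    async def fetch_contact(self, contact_id: str) -> dict:
        return dict(self.contacts.get(contact_id, {}))


async def tag_hot_lead(crm: CRMClient, contact_id: str) -> dict:
    # Agent logic depends only on the protocol, not on a specific provider.
    await crm.update_contact(contact_id, {"lead_tier": "hot"})
    return await crm.fetch_contact(contact_id)


crm = InMemoryCRM()
print(asyncio.run(tag_hot_lead(crm, "101")))  # {'lead_tier': 'hot'}
```

A provider-specific class exposing the same two methods could then be passed in without touching the handlers.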
FAQ
How do I handle CRMs that do not support webhooks natively?
Use a polling approach. Schedule a background task that queries the CRM API every 30-60 seconds for recently modified records. Compare timestamps with the last poll to identify new changes. Libraries like APScheduler or Celery Beat work well for this.
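The timestamp comparison at the heart of that polling loop can be sketched as a pure function. This assumes each record exposes an ISO 8601 `modified_at` value, which is a hypothetical field name; the scheduling itself (APScheduler, Celery Beat) is left out.

```python
from datetime import datetime


def select_changes(
    records: list[dict], last_poll: datetime
) -> tuple[list[dict], datetime]:
    """Return records modified after last_poll and the new cursor value."""
    changed = [
        r for r in records
        if datetime.fromisoformat(r["modified_at"]) > last_poll
    ]
    # Advance the cursor to the newest timestamp seen rather than to "now",
    # so records modified while the query was running are not skipped.
    new_cursor = max(
        (datetime.fromisoformat(r["modified_at"]) for r in changed),
        default=last_poll,
    )
    return changed, new_cursor


records = [
    {"id": "1", "modified_at": "2024-06-01T10:00:00"},
    {"id": "2", "modified_at": "2024-06-01T10:05:00"},
    {"id": "3", "modified_at": "2024-06-01T10:07:30"},
]
last_poll = datetime(2024, 6, 1, 10, 2, 0)
changed, cursor = select_changes(records, last_poll)
print([r["id"] for r in changed])  # ['2', '3']
print(cursor.isoformat())          # 2024-06-01T10:07:30
```

Each changed record can then be converted into the same event shape the webhook handler expects, so the downstream handlers stay identical.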
How do I avoid circular updates when the agent writes back to the CRM?
Add a flag like updated_by: "ai-agent" to every CRM update your agent makes. In your webhook handler, check for this flag and skip events that your agent triggered. This prevents infinite loops where the agent reacts to its own updates.
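A minimal sketch of that check, assuming the marker is stored as an `updated_by` property on the record and echoed back in the webhook's `properties`. The helper names are hypothetical.

```python
AGENT_MARKER = "ai-agent"


def with_agent_marker(properties: dict) -> dict:
    """Return a copy of the update payload stamped with the agent marker."""
    return {**properties, "updated_by": AGENT_MARKER}


def is_agent_originated(event: dict) -> bool:
    """True when the event's properties carry the agent's own marker."""
    properties = event.get("properties") or {}
    return properties.get("updated_by") == AGENT_MARKER


update = with_agent_marker({"lead_score": 87})
print(update)  # {'lead_score': 87, 'updated_by': 'ai-agent'}

echoed_event = {"event_type": "contact.updated", "properties": update}
human_event = {"event_type": "contact.updated", "properties": {"phone": "555-0100"}}
print(is_agent_originated(echoed_event))  # True
print(is_agent_originated(human_event))   # False
```

One caveat: if the property persists on the record, a later human edit may still carry the marker, so you may need the CRM to reset it or compare it against a change timestamp.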
What lead scoring accuracy should I expect from an LLM?
LLM-based lead scoring is best used as a first-pass prioritization, not a replacement for historical conversion data. Combine the LLM score with rule-based signals like company size, industry, and engagement history. Regularly audit scores against actual conversion outcomes to refine your prompts.
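As an illustration of combining the two, the sketch below blends the LLM score with a simple rule-based score. Every threshold, weight, and signal name here is a made-up placeholder; in practice these would be derived from your own conversion data.

```python
def blended_score(llm_score: int, signals: dict, llm_weight: float = 0.5) -> int:
    """Blend an LLM score (1-100) with a rule-based score into one 1-100 value."""
    rule_score = 0
    # Placeholder rules: replace with signals that predict conversion for you.
    if signals.get("employee_count", 0) >= 200:
        rule_score += 40
    if signals.get("industry") in {"software", "finance"}:
        rule_score += 30
    if signals.get("email_opens", 0) >= 3:
        rule_score += 30
    combined = llm_weight * llm_score + (1 - llm_weight) * rule_score
    return max(1, min(100, round(combined)))


signals = {"employee_count": 500, "industry": "software", "email_opens": 1}
print(blended_score(80, signals))  # 75
```

Logging both component scores alongside the eventual deal outcome gives you the data needed to audit and rebalance the weights over time.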
CallSphere Team
Expert insights on AI voice agents and customer communication automation.