Webhook and Integration Layer: Connecting AI Agents to CRMs, ERPs, and Third-Party Services
Build a robust integration framework for an AI agent platform that connects agents to Salesforce, HubSpot, Slack, and other third-party services through webhooks, OAuth flows, and a configurable data mapping layer.
Integrations Are the Moat
An AI agent that cannot talk to your CRM, create tickets in your helpdesk, or send notifications to Slack is a toy. Integrations transform agents from chatbots into workflow automation engines. For an agent SaaS platform, the breadth and reliability of your integration layer are often the difference between winning and losing enterprise deals.
The design challenge is building an integration framework that is generic enough to support hundreds of services but reliable enough that a failed webhook does not silently drop a customer's data.
Integration Framework Architecture
The framework has three layers: connection management (OAuth and credentials), execution (making API calls), and data mapping (translating between your schema and the external service):
# integration_models.py: integration framework data model
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum
from datetime import datetime
import uuid


class AuthType(str, Enum):
    OAUTH2 = "oauth2"
    API_KEY = "api_key"
    BEARER_TOKEN = "bearer_token"
    BASIC_AUTH = "basic_auth"
    WEBHOOK_SECRET = "webhook_secret"


class IntegrationProvider(BaseModel):
    id: str  # "salesforce", "hubspot", "slack"
    name: str
    auth_type: AuthType
    oauth_config: Optional[dict] = None  # authorize_url, token_url, scopes
    base_url: str
    available_actions: list[str]  # "create_contact", "send_message", etc.
    webhook_events: list[str]  # Events this provider can send to us


class TenantIntegration(BaseModel):
    id: uuid.UUID = Field(default_factory=uuid.uuid4)
    tenant_id: uuid.UUID
    provider_id: str
    status: str = "pending"  # "pending", "active", "error", "revoked"
    credentials_encrypted: bytes = b""
    refresh_token_encrypted: Optional[bytes] = None
    token_expires_at: Optional[datetime] = None
    webhook_url: Optional[str] = None  # Our endpoint for receiving their webhooks
    field_mappings: dict = {}  # Maps our fields to their fields
    created_at: datetime = Field(default_factory=datetime.utcnow)
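To illustrate how the execution layer might consult these models, here is a minimal sketch of a provider registry that rejects unsupported actions before any API call is made. It uses plain dataclasses as stand-ins for the Pydantic models, and the `PROVIDERS` registry, the `resolve_action` helper, and the sample entries are assumptions for illustration, not part of the framework above.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Provider:
    """Simplified stand-in for IntegrationProvider (assumption for this sketch)."""
    id: str
    base_url: str
    available_actions: tuple[str, ...] = field(default_factory=tuple)


# Hypothetical registry; a real platform would load this from config or a database
PROVIDERS = {
    "slack": Provider("slack", "https://slack.com/api", ("send_message",)),
    "hubspot": Provider(
        "hubspot", "https://api.hubapi.com", ("create_contact", "create_ticket")
    ),
}


def resolve_action(provider_id: str, action: str) -> Provider:
    """Return the provider if it supports the action, otherwise raise."""
    provider = PROVIDERS.get(provider_id)
    if provider is None:
        raise KeyError(f"Unknown provider: {provider_id}")
    if action not in provider.available_actions:
        raise ValueError(f"{provider_id} does not support {action}")
    return provider
```

Failing early like this keeps a misconfigured agent from sending a request the provider would reject anyway.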
OAuth Flow Implementation
Most enterprise integrations use OAuth2. The service below covers the authorization redirect, the callback code exchange, and token refresh:
# oauth_service.py: OAuth2 connection management
import uuid
from datetime import datetime, timedelta
from urllib.parse import urlencode

import httpx
from cryptography.fernet import Fernet

from integration_models import IntegrationProvider, TenantIntegration


class OAuthService:
    # get_provider(provider_id) is assumed to be defined elsewhere on this
    # class and to return the matching IntegrationProvider.

    def __init__(self, encryption_key: str, db):
        self.cipher = Fernet(encryption_key.encode())
        self.db = db

    def get_authorize_url(self, provider: IntegrationProvider, tenant_id: uuid.UUID) -> str:
        # The encrypted state ties the callback to this tenant and provider
        state = self.cipher.encrypt(f"{tenant_id}:{provider.id}".encode()).decode()
        oauth = provider.oauth_config
        # urlencode escapes the redirect URI, the scopes, and the "=" padding in state
        query = urlencode({
            "client_id": oauth["client_id"],
            "redirect_uri": oauth["redirect_uri"],
            "scope": " ".join(oauth["scopes"]),
            "response_type": "code",
            "state": state,
        })
        return f"{oauth['authorize_url']}?{query}"

    async def handle_callback(self, code: str, state: str) -> TenantIntegration:
        decrypted = self.cipher.decrypt(state.encode()).decode()
        # Split once so a provider id containing ":" stays intact
        tenant_id, provider_id = decrypted.split(":", 1)
        provider = await self.get_provider(provider_id)
        oauth = provider.oauth_config

        async with httpx.AsyncClient() as client:
            resp = await client.post(oauth["token_url"], data={
                "grant_type": "authorization_code",
                "code": code,
                "client_id": oauth["client_id"],
                "client_secret": oauth["client_secret"],
                "redirect_uri": oauth["redirect_uri"],
            })
            resp.raise_for_status()
            tokens = resp.json()

        # Not every provider returns a refresh token; store None rather than an
        # encrypted empty string so the refresh path can detect its absence
        refresh_token = tokens.get("refresh_token")
        integration = TenantIntegration(
            tenant_id=uuid.UUID(tenant_id),
            provider_id=provider_id,
            status="active",
            credentials_encrypted=self.cipher.encrypt(tokens["access_token"].encode()),
            refresh_token_encrypted=(
                self.cipher.encrypt(refresh_token.encode()) if refresh_token else None
            ),
            token_expires_at=datetime.utcnow() + timedelta(seconds=tokens.get("expires_in", 3600)),
        )
        await self.db.save(integration)
        return integration

    async def get_valid_token(self, integration: TenantIntegration) -> str:
        if integration.token_expires_at and integration.token_expires_at > datetime.utcnow():
            return self.cipher.decrypt(integration.credentials_encrypted).decode()

        # Token expired: refresh it
        if not integration.refresh_token_encrypted:
            raise RuntimeError("Access token expired and no refresh token is stored")
        provider = await self.get_provider(integration.provider_id)
        refresh_token = self.cipher.decrypt(integration.refresh_token_encrypted).decode()

        async with httpx.AsyncClient() as client:
            resp = await client.post(provider.oauth_config["token_url"], data={
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_id": provider.oauth_config["client_id"],
                "client_secret": provider.oauth_config["client_secret"],
            })
            resp.raise_for_status()
            tokens = resp.json()

        integration.credentials_encrypted = self.cipher.encrypt(tokens["access_token"].encode())
        integration.token_expires_at = datetime.utcnow() + timedelta(
            seconds=tokens.get("expires_in", 3600)
        )
        # Some providers rotate the refresh token on every refresh
        if "refresh_token" in tokens:
            integration.refresh_token_encrypted = self.cipher.encrypt(
                tokens["refresh_token"].encode()
            )
        await self.db.save(integration)
        return tokens["access_token"]
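One refinement worth considering: a token that is valid when checked can still expire while the API request is in flight. The sketch below shows an expiry check with a safety margin. The `needs_refresh` helper and the 60-second `REFRESH_SKEW` are assumptions for illustration, and it uses timezone-aware datetimes, unlike the naive UTC values in the service above, so the two should not be mixed without converting.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed safety margin; tune it to your request latency and clock drift
REFRESH_SKEW = timedelta(seconds=60)


def needs_refresh(
    expires_at: Optional[datetime],
    now: Optional[datetime] = None,
    skew: timedelta = REFRESH_SKEW,
) -> bool:
    """Return True when the expiry is unknown or falls within `skew` of now."""
    if expires_at is None:
        return True
    if now is None:
        now = datetime.now(timezone.utc)
    return expires_at - skew <= now


now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(needs_refresh(now + timedelta(minutes=30), now))  # False
print(needs_refresh(now + timedelta(seconds=30), now))  # True
print(needs_refresh(None, now))                         # True
```

Treating an unknown expiry as "refresh now" matches the behavior of `get_valid_token`, which falls through to the refresh path when `token_expires_at` is unset.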
Webhook Receiver
Your platform needs to receive webhooks from external services. Each tenant gets a unique webhook URL with signature verification:
# webhook_receiver.py: incoming webhook handler
import hmac
import hashlib
from fastapi import APIRouter, Request, HTTPException

from integration_models import AuthType

# db, cipher, event_router, get_provider, and extract_event_type are assumed
# to be provided elsewhere in the application.

router = APIRouter(prefix="/v1/webhooks")


@router.post("/incoming/{integration_id}")
async def receive_webhook(integration_id: str, request: Request):
    integration = await db.get_integration(integration_id)
    if not integration or integration.status != "active":
        raise HTTPException(status_code=404)

    body = await request.body()

    # Verify the signature against the raw bytes, before parsing
    provider = await get_provider(integration.provider_id)
    if not verify_signature(provider, integration, request.headers, body):
        raise HTTPException(status_code=401, detail="Invalid signature")

    payload = await request.json()

    # Route to appropriate handler
    event_type = extract_event_type(provider, request.headers, payload)
    await event_router.dispatch(
        tenant_id=integration.tenant_id,
        provider_id=integration.provider_id,
        event_type=event_type,
        payload=payload,
    )
    return {"status": "accepted"}


def verify_signature(provider, integration, headers, body: bytes) -> bool:
    if provider.auth_type == AuthType.WEBHOOK_SECRET:
        secret = cipher.decrypt(integration.credentials_encrypted)
        signature_header = headers.get("x-webhook-signature", "")
        expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
        # Compare as bytes: compare_digest raises TypeError on non-ASCII str input
        return hmac.compare_digest(signature_header.encode(), expected.encode())
    # Other auth types are accepted unverified here; add provider-specific
    # checks before relying on this in production
    return True
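A plain HMAC over the body proves who sent a request but not when, so a captured request could be replayed later. Below is a minimal sketch of a timestamped variant using only the standard library. The message layout (`timestamp.body`), the `sign` and `verify` helpers, and the 300-second tolerance are hypothetical choices for illustration; each real provider documents its own scheme, which should take precedence.

```python
import hashlib
import hmac
import time
from typing import Optional

TOLERANCE_SECONDS = 300  # assumed replay window


def sign(secret: bytes, timestamp: int, body: bytes) -> str:
    """HMAC-SHA256 over "<timestamp>.<body>", hex encoded."""
    message = str(timestamp).encode() + b"." + body
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify(
    secret: bytes,
    timestamp: int,
    body: bytes,
    signature: str,
    now: Optional[float] = None,
) -> bool:
    """Reject stale timestamps first, then compare signatures in constant time."""
    if now is None:
        now = time.time()
    if abs(now - timestamp) > TOLERANCE_SECONDS:
        return False
    expected = sign(secret, timestamp, body)
    return hmac.compare_digest(expected.encode(), signature.encode())


secret = b"test-secret"
body = b'{"event": "contact.created"}'
sent_at = 1_700_000_000
signature = sign(secret, sent_at, body)

print(verify(secret, sent_at, body, signature, now=sent_at + 10))    # True
print(verify(secret, sent_at, body, signature, now=sent_at + 3600))  # False
```

The same `sign` helper is handy in tests for producing valid requests against the receiver.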
Data Mapping Layer
Different CRMs use different field names for the same concept. The mapping layer translates between your canonical schema and each provider:
# field_mapper.py: configurable field mapping between systems
from typing import Optional


class FieldMapper:
    DEFAULT_MAPPINGS = {
        "salesforce": {
            "contact.email": "Email",
            "contact.first_name": "FirstName",
            "contact.last_name": "LastName",
            "contact.phone": "Phone",
            "contact.company": "Account.Name",
            "ticket.subject": "Subject",
            "ticket.description": "Description",
            "ticket.priority": "Priority",
        },
        "hubspot": {
            "contact.email": "email",
            "contact.first_name": "firstname",
            "contact.last_name": "lastname",
            "contact.phone": "phone",
            "contact.company": "company",
            "ticket.subject": "subject",
            "ticket.description": "content",
            "ticket.priority": "hs_ticket_priority",
        },
    }

    def __init__(self, provider_id: str, custom_mappings: Optional[dict] = None):
        # Copy the defaults so tenant overrides never mutate the class-level table
        self.mappings = {**self.DEFAULT_MAPPINGS.get(provider_id, {})}
        if custom_mappings:
            self.mappings.update(custom_mappings)

    def to_external(self, canonical_data: dict) -> dict:
        # canonical_data uses flat dotted keys, e.g. {"contact.email": "..."}
        result = {}
        for canonical_key, value in canonical_data.items():
            external_key = self.mappings.get(canonical_key)
            if external_key:
                self._set_nested(result, external_key, value)
        return result

    def from_external(self, external_data: dict) -> dict:
        # Invert the mapping, then look up each flattened external key
        reverse = {v: k for k, v in self.mappings.items()}
        result = {}
        for ext_key, value in self._flatten(external_data).items():
            canonical_key = reverse.get(ext_key)
            if canonical_key:
                result[canonical_key] = value
        return result

    def _set_nested(self, d: dict, key: str, value):
        # "Account.Name" becomes {"Account": {"Name": value}}
        keys = key.split(".")
        for k in keys[:-1]:
            d = d.setdefault(k, {})
        d[keys[-1]] = value

    def _flatten(self, d: dict, prefix="") -> dict:
        # {"Account": {"Name": value}} becomes {"Account.Name": value}
        items = {}
        for k, v in d.items():
            new_key = f"{prefix}.{k}" if prefix else k
            if isinstance(v, dict):
                items.update(self._flatten(v, new_key))
            else:
                items[new_key] = v
        return items
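To make the round trip concrete, here is a small standalone sketch of the same mapping logic written as plain functions, using a subset of the Salesforce defaults. The function names and sample contact are illustrative assumptions; the behavior mirrors `to_external` and `from_external` above.

```python
SALESFORCE_MAPPINGS = {
    "contact.email": "Email",
    "contact.first_name": "FirstName",
    "contact.company": "Account.Name",
}


def to_external(mappings: dict, canonical: dict) -> dict:
    """Translate flat canonical keys into a (possibly nested) provider payload."""
    result: dict = {}
    for key, value in canonical.items():
        external_key = mappings.get(key)
        if external_key is None:
            continue  # unmapped canonical fields are dropped
        *parents, leaf = external_key.split(".")
        target = result
        for part in parents:
            target = target.setdefault(part, {})
        target[leaf] = value
    return result


def flatten(data: dict, prefix: str = "") -> dict:
    """Collapse nested dicts into dotted keys."""
    items: dict = {}
    for key, value in data.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            items.update(flatten(value, dotted))
        else:
            items[dotted] = value
    return items


def from_external(mappings: dict, external: dict) -> dict:
    """Translate a provider payload back to flat canonical keys."""
    reverse = {v: k for k, v in mappings.items()}
    return {reverse[k]: v for k, v in flatten(external).items() if k in reverse}


canonical = {
    "contact.email": "ada@example.com",
    "contact.company": "Analytical Engines",
    "contact.notes": "not mapped",
}
payload = to_external(SALESFORCE_MAPPINGS, canonical)
print(payload)
# {'Email': 'ada@example.com', 'Account': {'Name': 'Analytical Engines'}}
print(from_external(SALESFORCE_MAPPINGS, payload))
# {'contact.email': 'ada@example.com', 'contact.company': 'Analytical Engines'}
```

Note that `contact.notes` disappears silently because it has no mapping. Logging dropped fields is one way to surface gaps in a tenant's configuration.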
FAQ
How do I handle webhook delivery failures from external services?
External services retry deliveries that fail or time out, so the same event can arrive more than once. Make incoming webhook processing idempotent: key each event on its unique event ID, store processed IDs in Redis or a database, and skip duplicates. For your outgoing webhooks, retry with exponential backoff and send exhausted deliveries to a dead-letter queue. After 5 failed attempts, mark the integration as "error" and notify the tenant.
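Both halves of that answer can be sketched in a few lines. The example below uses an in-memory set and list as stand-ins for Redis and a real dead-letter queue, and the `handle_once` and `deliver_with_retry` helpers, the base delay, and the jitter are assumptions for illustration rather than a production design.

```python
import asyncio
import random


async def handle_once(seen: set, event_id: str, handler, payload) -> bool:
    """Process an incoming event unless its ID was already handled."""
    if event_id in seen:
        return False  # duplicate delivery, skip it
    await handler(payload)
    seen.add(event_id)  # record only after the handler succeeds
    return True


async def deliver_with_retry(
    send,
    payload,
    dead_letters: list,
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep=asyncio.sleep,
) -> bool:
    """Send an outgoing webhook, backing off exponentially between attempts."""
    for attempt in range(max_attempts):
        try:
            await send(payload)
            return True
        except Exception:
            if attempt < max_attempts - 1:
                delay = base_delay * (2 ** attempt)  # 0.5s, 1s, 2s, 4s
                # Jitter spreads out retries from many tenants failing together
                await sleep(delay + random.uniform(0, delay / 2))
    dead_letters.append(payload)  # exhausted: park it for inspection or replay
    return False
```

With a shared store, the check and the write in `handle_once` need to be a single atomic operation (for example Redis `SET` with the `NX` option), otherwise two concurrent deliveries of the same event can both pass the check. The caller of `deliver_with_retry` would mark the integration as "error" when it returns `False`.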
Should I build integrations in-house or use a third-party integration platform like Merge or Unified?
Start with 3-5 in-house integrations for the services your customers use most (typically Salesforce, HubSpot, Slack, and Zendesk). This gives you full control over the experience. Evaluate third-party integration platforms once you need to support 20+ services — the abstraction layer they provide saves engineering time at scale, though it adds latency and another point of failure.
How do I let users configure field mappings without writing code?
Build a visual mapping UI with two columns — your canonical fields on the left, the external service's fields on the right. Fetch the external service's field list dynamically via their API (most CRMs have metadata endpoints). Let users draw connections between fields by clicking on pairs. Pre-populate with default mappings so most users only need to add their custom fields.
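Whatever the UI looks like, the mappings it produces are worth validating before they are saved. The sketch below assumes the canonical field list and the provider's field list have already been fetched as sets; the `validate_mappings` helper is a hypothetical name. It also flags two canonical fields pointing at the same external field, because the reverse lookup in `from_external` could only keep one of them.

```python
def validate_mappings(
    canonical_fields: set, external_fields: set, mappings: dict
) -> list:
    """Return human-readable problems; an empty list means the mapping is usable."""
    errors = []
    seen_targets: dict = {}
    for canonical_key, external_key in mappings.items():
        if canonical_key not in canonical_fields:
            errors.append(f"Unknown canonical field: {canonical_key}")
        if external_key not in external_fields:
            errors.append(f"Unknown external field: {external_key}")
        if external_key in seen_targets:
            errors.append(
                f"{external_key} is mapped from both "
                f"{seen_targets[external_key]} and {canonical_key}"
            )
        seen_targets.setdefault(external_key, canonical_key)
    return errors


canonical = {"contact.email", "contact.phone"}
external = {"email", "phone", "mobilephone"}

print(validate_mappings(canonical, external, {"contact.email": "email"}))
# []
print(validate_mappings(canonical, external, {"contact.email": "fax"}))
# ['Unknown external field: fax']
```

Returning every problem at once, rather than failing on the first, lets the UI highlight all invalid rows in a single pass.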
CallSphere Team