Skip to content
Learn Agentic AI14 min read0 views

Webhook and Integration Layer: Connecting AI Agents to CRMs, ERPs, and Third-Party Services

Build a robust integration framework for an AI agent platform that connects agents to Salesforce, HubSpot, Slack, and other third-party services through webhooks, OAuth flows, and a configurable data mapping layer.

Integrations Are the Moat

An AI agent that cannot talk to your CRM, create tickets in your helpdesk, or send notifications to Slack is a toy. Integrations transform agents from chatbots into workflow automation engines. For an agent SaaS platform, the breadth and reliability of your integration layer is often the difference between winning and losing enterprise deals.

The design challenge is building an integration framework that is generic enough to support hundreds of services but reliable enough that a failed webhook does not silently drop a customer's data.

Integration Framework Architecture

The framework has three layers: connection management (OAuth and credentials), execution (making API calls), and data mapping (translating between your schema and the external service):

# integration_models.py — Integration framework data model
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum
from datetime import datetime
import uuid

class AuthType(str, Enum):
    OAUTH2 = "oauth2"
    API_KEY = "api_key"
    BEARER_TOKEN = "bearer_token"
    BASIC_AUTH = "basic_auth"
    WEBHOOK_SECRET = "webhook_secret"

class IntegrationProvider(BaseModel):
    id: str  # "salesforce", "hubspot", "slack"
    name: str
    auth_type: AuthType
    oauth_config: Optional[dict] = None  # authorize_url, token_url, scopes
    base_url: str
    available_actions: list[str]  # "create_contact", "send_message", etc.
    webhook_events: list[str]  # Events this provider can send to us

class TenantIntegration(BaseModel):
    id: uuid.UUID = Field(default_factory=uuid.uuid4)
    tenant_id: uuid.UUID
    provider_id: str
    status: str = "pending"  # "pending", "active", "error", "revoked"
    credentials_encrypted: bytes = b""
    refresh_token_encrypted: Optional[bytes] = None
    token_expires_at: Optional[datetime] = None
    webhook_url: Optional[str] = None  # Our endpoint for receiving their webhooks
    field_mappings: dict = {}  # Maps our fields to their fields
    created_at: datetime = Field(default_factory=datetime.utcnow)

OAuth Flow Implementation

Most enterprise integrations use OAuth2. Here is a complete flow that handles token refresh:

# oauth_service.py — OAuth2 connection management
import httpx
from cryptography.fernet import Fernet
from datetime import datetime, timedelta

class OAuthService:
    def __init__(self, encryption_key: str, db):
        self.cipher = Fernet(encryption_key.encode())
        self.db = db

    def get_authorize_url(self, provider: IntegrationProvider, tenant_id: uuid.UUID) -> str:
        state = self.cipher.encrypt(f"{tenant_id}:{provider.id}".encode()).decode()
        oauth = provider.oauth_config
        return (
            f"{oauth['authorize_url']}?"
            f"client_id={oauth['client_id']}"
            f"&redirect_uri={oauth['redirect_uri']}"
            f"&scope={'+'.join(oauth['scopes'])}"
            f"&response_type=code"
            f"&state={state}"
        )

    async def handle_callback(self, code: str, state: str) -> TenantIntegration:
        decrypted = self.cipher.decrypt(state.encode()).decode()
        tenant_id, provider_id = decrypted.split(":")
        provider = await self.get_provider(provider_id)
        oauth = provider.oauth_config

        async with httpx.AsyncClient() as client:
            resp = await client.post(oauth["token_url"], data={
                "grant_type": "authorization_code",
                "code": code,
                "client_id": oauth["client_id"],
                "client_secret": oauth["client_secret"],
                "redirect_uri": oauth["redirect_uri"],
            })
            resp.raise_for_status()
            tokens = resp.json()

        integration = TenantIntegration(
            tenant_id=uuid.UUID(tenant_id),
            provider_id=provider_id,
            status="active",
            credentials_encrypted=self.cipher.encrypt(tokens["access_token"].encode()),
            refresh_token_encrypted=self.cipher.encrypt(
                tokens.get("refresh_token", "").encode()
            ),
            token_expires_at=datetime.utcnow() + timedelta(seconds=tokens.get("expires_in", 3600)),
        )
        await self.db.save(integration)
        return integration

    async def get_valid_token(self, integration: TenantIntegration) -> str:
        if integration.token_expires_at and integration.token_expires_at > datetime.utcnow():
            return self.cipher.decrypt(integration.credentials_encrypted).decode()

        # Token expired — refresh it
        provider = await self.get_provider(integration.provider_id)
        refresh_token = self.cipher.decrypt(integration.refresh_token_encrypted).decode()

        async with httpx.AsyncClient() as client:
            resp = await client.post(provider.oauth_config["token_url"], data={
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_id": provider.oauth_config["client_id"],
                "client_secret": provider.oauth_config["client_secret"],
            })
            resp.raise_for_status()
            tokens = resp.json()

        integration.credentials_encrypted = self.cipher.encrypt(tokens["access_token"].encode())
        integration.token_expires_at = datetime.utcnow() + timedelta(
            seconds=tokens.get("expires_in", 3600)
        )
        if "refresh_token" in tokens:
            integration.refresh_token_encrypted = self.cipher.encrypt(
                tokens["refresh_token"].encode()
            )
        await self.db.save(integration)
        return tokens["access_token"]

Webhook Receiver

Your platform needs to receive webhooks from external services. Each tenant gets a unique webhook URL with signature verification:

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

# webhook_receiver.py — Incoming webhook handler
import hmac
import hashlib
from fastapi import APIRouter, Request, HTTPException

router = APIRouter(prefix="/v1/webhooks")

@router.post("/incoming/{integration_id}")
async def receive_webhook(integration_id: str, request: Request):
    integration = await db.get_integration(integration_id)
    if not integration or integration.status != "active":
        raise HTTPException(status_code=404)

    body = await request.body()

    # Verify webhook signature
    provider = await get_provider(integration.provider_id)
    if not verify_signature(provider, integration, request.headers, body):
        raise HTTPException(status_code=401, detail="Invalid signature")

    payload = await request.json()

    # Route to appropriate handler
    event_type = extract_event_type(provider, request.headers, payload)
    await event_router.dispatch(
        tenant_id=integration.tenant_id,
        provider_id=integration.provider_id,
        event_type=event_type,
        payload=payload,
    )
    return {"status": "accepted"}

def verify_signature(provider, integration, headers, body: bytes) -> bool:
    if provider.auth_type == AuthType.WEBHOOK_SECRET:
        secret = cipher.decrypt(integration.credentials_encrypted)
        signature_header = headers.get("x-webhook-signature", "")
        expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
        return hmac.compare_digest(signature_header, expected)
    return True  # Providers without signature verification

Data Mapping Layer

Different CRMs use different field names for the same concept. The mapping layer translates between your canonical schema and each provider:

# field_mapper.py — Configurable field mapping between systems
class FieldMapper:
    DEFAULT_MAPPINGS = {
        "salesforce": {
            "contact.email": "Email",
            "contact.first_name": "FirstName",
            "contact.last_name": "LastName",
            "contact.phone": "Phone",
            "contact.company": "Account.Name",
            "ticket.subject": "Subject",
            "ticket.description": "Description",
            "ticket.priority": "Priority",
        },
        "hubspot": {
            "contact.email": "email",
            "contact.first_name": "firstname",
            "contact.last_name": "lastname",
            "contact.phone": "phone",
            "contact.company": "company",
            "ticket.subject": "subject",
            "ticket.description": "content",
            "ticket.priority": "hs_ticket_priority",
        },
    }

    def __init__(self, provider_id: str, custom_mappings: dict = None):
        self.mappings = {**self.DEFAULT_MAPPINGS.get(provider_id, {})}
        if custom_mappings:
            self.mappings.update(custom_mappings)

    def to_external(self, canonical_data: dict) -> dict:
        result = {}
        for canonical_key, value in canonical_data.items():
            external_key = self.mappings.get(canonical_key)
            if external_key:
                self._set_nested(result, external_key, value)
        return result

    def from_external(self, external_data: dict) -> dict:
        reverse = {v: k for k, v in self.mappings.items()}
        result = {}
        for ext_key, value in self._flatten(external_data).items():
            canonical_key = reverse.get(ext_key)
            if canonical_key:
                result[canonical_key] = value
        return result

    def _set_nested(self, d: dict, key: str, value):
        keys = key.split(".")
        for k in keys[:-1]:
            d = d.setdefault(k, {})
        d[keys[-1]] = value

    def _flatten(self, d: dict, prefix="") -> dict:
        items = {}
        for k, v in d.items():
            new_key = f"{prefix}.{k}" if prefix else k
            if isinstance(v, dict):
                items.update(self._flatten(v, new_key))
            else:
                items[new_key] = v
        return items

FAQ

How do I handle webhook delivery failures from external services?

Implement idempotent webhook processing using the webhook's unique event ID. Store processed event IDs in a set (Redis or database) and skip duplicates. For your outgoing webhooks, implement exponential backoff retry with a dead-letter queue. After 5 failed attempts, mark the integration as "error" and notify the tenant.

Should I build integrations in-house or use a third-party integration platform like Merge or Unified?

Start with 3-5 in-house integrations for the services your customers use most (typically Salesforce, HubSpot, Slack, and Zendesk). This gives you full control over the experience. Evaluate third-party integration platforms once you need to support 20+ services — the abstraction layer they provide saves engineering time at scale, though it adds latency and another point of failure.

How do I let users configure field mappings without writing code?

Build a visual mapping UI with two columns — your canonical fields on the left, the external service's fields on the right. Fetch the external service's field list dynamically via their API (most CRMs have metadata endpoints). Let users draw connections between fields by clicking on pairs. Pre-populate with default mappings so most users only need to add their custom fields.


#Webhooks #Integrations #OAuth #CRM #APIDesign #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.