
Webhook Integration for AI Agents: Event-Driven Notifications and Callbacks

Build robust webhook systems for AI agent services, covering webhook design patterns, delivery guarantees with retries, payload signature verification, and event-driven architectures that keep agents reactive and loosely coupled.

Why Webhooks Matter for AI Agent Systems

Polling is the enemy of responsive AI systems. When an agent finishes a long-running task, the orchestrator should not be looping every second asking "are you done yet?" Webhooks flip this model: the agent pushes a notification to a registered URL the moment something happens.

In AI agent architectures, webhooks enable event-driven workflows. An agent completes a task and fires a webhook. A conversation reaches a sentiment threshold and triggers an alert. A tool call fails and notifies the error tracking system. These push-based notifications keep your system reactive without wasting resources on polling.

Designing the Webhook Registration System

Let consumers register webhooks for specific event types. Each registration specifies the target URL, which events to subscribe to, and a secret for signature verification:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from uuid import uuid4

app = FastAPI(title="AI Agent Webhook Service")

class WebhookRegistration(BaseModel):
    url: HttpUrl
    events: list[str]  # e.g., ["task.completed", "task.failed", "conversation.ended"]
    secret: str
    description: str = ""

class WebhookRecord(BaseModel):
    id: str
    url: str
    events: list[str]
    active: bool = True
    failure_count: int = 0

webhooks_db: dict[str, dict] = {}

@app.post("/webhooks", status_code=201)
async def register_webhook(body: WebhookRegistration) -> WebhookRecord:
    webhook_id = str(uuid4())
    record = {
        "id": webhook_id,
        "url": str(body.url),
        "events": body.events,
        "secret": body.secret,
        "active": True,
        "failure_count": 0,
    }
    webhooks_db[webhook_id] = record
    return WebhookRecord(**record)

@app.get("/webhooks")
async def list_webhooks():
    return {"webhooks": [
        WebhookRecord(**{k: v for k, v in w.items() if k != "secret"})
        for w in webhooks_db.values()
    ]}

@app.delete("/webhooks/{webhook_id}", status_code=204)
async def delete_webhook(webhook_id: str):
    if webhook_id not in webhooks_db:
        raise HTTPException(status_code=404, detail="Webhook not found")
    del webhooks_db[webhook_id]
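The registration model above accepts any string as an event type, so a typo such as "task.complete" would register successfully and then never fire. One possible guard, sketched here with a hypothetical KNOWN_EVENTS catalog and a hypothetical validate_events helper (neither is part of the service above), is to check subscriptions against the events the service actually emits:

```python
# Hypothetical catalog of events this service emits (an assumption for illustration)
KNOWN_EVENTS = frozenset({"task.completed", "task.failed", "conversation.ended"})


def validate_events(events: list[str]) -> list[str]:
    """Reject unknown event types and drop duplicates, preserving order."""
    if not events:
        raise ValueError("at least one event type is required")
    unknown = sorted(set(events) - KNOWN_EVENTS)
    if unknown:
        raise ValueError(f"unknown event types: {', '.join(unknown)}")
    # dict.fromkeys keeps the first occurrence of each event, in order
    return list(dict.fromkeys(events))
```

In the FastAPI handler, a ValueError from this helper could be translated into a 422 response before the record is stored.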

Payload Signature Verification

Every webhook delivery must be signed so that receivers can verify the payload came from your service and was not tampered with. Use HMAC-SHA256:

import hmac
import hashlib
import json
from datetime import datetime, timezone

def sign_payload(payload: dict, secret: str, timestamp: str) -> str:
    # Sign "<timestamp>.<JSON with sorted keys>"; the receiver must serialize
    # the payload the same way (sort_keys=True) to reproduce the signature
    message = f"{timestamp}.{json.dumps(payload, sort_keys=True)}"
    return hmac.new(
        secret.encode(),
        message.encode(),
        hashlib.sha256,
    ).hexdigest()

def build_webhook_headers(payload: dict, secret: str) -> dict:
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    timestamp = datetime.now(timezone.utc).isoformat()
    signature = sign_payload(payload, secret, timestamp)
    return {
        "X-Webhook-Signature": signature,
        "X-Webhook-Timestamp": timestamp,
        "X-Webhook-Event": payload.get("event", "unknown"),
        "Content-Type": "application/json",
    }

On the receiving side, verify the signature before processing:

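import os

from fastapi import Request, HTTPException

# Shared secret chosen at registration; read it from the environment in production
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "my-webhook-secret")

@app.post("/my-webhook-receiver")
async def receive_webhook(request: Request):
    body = await request.body()
    try:
        payload = json.loads(body)
    except ValueError:
        # Malformed JSON or invalid encoding: reject instead of returning a 500
        raise HTTPException(status_code=400, detail="Invalid JSON")
    timestamp = request.headers.get("X-Webhook-Timestamp", "")
    received_sig = request.headers.get("X-Webhook-Signature", "")

    expected_sig = sign_payload(payload, WEBHOOK_SECRET, timestamp)

    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input
    if not hmac.compare_digest(received_sig.encode(), expected_sig.encode()):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Process the event
    return {"received": True}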

The hmac.compare_digest function prevents timing attacks by comparing in constant time.
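The receiver above re-serializes the parsed payload to rebuild the signed message, which only works if both sides serialize JSON identically. A variant worth considering is to sign the exact bytes that go over the wire, so the receiver can verify before parsing anything. The sketch below illustrates that idea using only the standard library; sign_raw_body and verify_raw_body are hypothetical names, not functions from the service above:

```python
import hashlib
import hmac
import json


def sign_raw_body(body: bytes, secret: str, timestamp: str) -> str:
    """HMAC-SHA256 over b"<timestamp>.<raw body>", hex-encoded."""
    message = timestamp.encode() + b"." + body
    return hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()


def verify_raw_body(body: bytes, secret: str, timestamp: str, received_sig: str) -> bool:
    """Recompute the signature from the raw bytes and compare in constant time."""
    expected = sign_raw_body(body, secret, timestamp)
    # Comparing bytes avoids the TypeError that str inputs with non-ASCII characters raise
    return hmac.compare_digest(expected.encode(), received_sig.encode())


# Usage: the sender serializes once, signs those bytes, and sends them unchanged
body = json.dumps({"event": "task.completed", "data": {"task_id": "t-1"}}).encode()
timestamp = "2024-01-01T00:00:00+00:00"
signature = sign_raw_body(body, "example-secret", timestamp)
```

With this variant the sender has to transmit the same bytes it signed (for example by passing them as the request body rather than letting the HTTP client re-encode the dict), and any change to the body, even a single trailing space, invalidates the signature.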


Delivery with Retry Logic

Webhook deliveries fail. The receiver might be temporarily down, the network might hiccup, or the server might return a 500. Implement exponential backoff retries:

import httpx
import asyncio

MAX_RETRIES = 5
BASE_DELAY = 1  # seconds

# log_delivery() and mark_webhook_failing() are async persistence helpers
# that this article does not show; supply your own.

async def deliver_webhook(
    url: str, payload: dict, secret: str, webhook_id: str
) -> bool:
    # Reuse one client (and its connection pool) across all attempts
    async with httpx.AsyncClient(timeout=10.0) as client:
        for attempt in range(MAX_RETRIES):
            # Re-sign on every attempt so the timestamp header stays fresh
            headers = build_webhook_headers(payload, secret)
            try:
                response = await client.post(url, json=payload, headers=headers)
            except httpx.RequestError:
                pass  # Network failure or timeout: fall through to the backoff
            else:
                if 200 <= response.status_code < 300:
                    await log_delivery(webhook_id, "success", attempt + 1)
                    return True

                if response.status_code < 500:
                    # 3xx or 4xx: repeating the same request will not help
                    await log_delivery(webhook_id, "client_error", attempt + 1)
                    return False
                # 5xx: server error, fall through to the backoff

            if attempt < MAX_RETRIES - 1:
                # Exponential backoff between attempts: 1, 2, 4, 8 seconds
                await asyncio.sleep(BASE_DELAY * (2 ** attempt))

    # All retries exhausted
    await mark_webhook_failing(webhook_id)
    return False
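
To make the backoff arithmetic concrete, the sketch below computes the waits between five attempts with a base delay of one second. The cap and the optional "full jitter" mode (a random wait between zero and the computed delay, which spreads out retries from many senders) are additions for illustration and are not part of the delivery function above; backoff_delay is a hypothetical helper name.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter: bool = False) -> float:
    """Delay in seconds before the retry that follows attempt number `attempt` (0-based)."""
    delay = min(cap, base * (2 ** attempt))
    if jitter:
        # Full jitter: pick uniformly in [0, delay] to avoid synchronized retries
        return random.uniform(0, delay)
    return delay


MAX_RETRIES = 5
# Four waits separate five attempts
schedule = [backoff_delay(a) for a in range(MAX_RETRIES - 1)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0]
```

In the worst case a delivery therefore spends 15 seconds waiting, plus up to 10 seconds of timeout per attempt.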

After all retries are exhausted, disable the webhook and notify the owner. Many systems auto-disable webhooks after a streak of failures (e.g., 10 consecutive failures over 24 hours) to avoid wasting resources.
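
One way the auto-disable rule might be tracked is sketched below. It reads "10 consecutive failures over 24 hours" as: the failure streak has reached ten deliveries and has lasted at least a day. That reading, the WebhookHealth class, and the record_result function are all assumptions for illustration; the mark_webhook_failing helper called earlier is not shown in this article and may work differently.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

FAILURE_THRESHOLD = 10               # consecutive failed deliveries
FAILURE_WINDOW = timedelta(hours=24)  # minimum age of the streak


@dataclass
class WebhookHealth:
    active: bool = True
    failure_count: int = 0
    first_failure_at: Optional[datetime] = None


def record_result(health: WebhookHealth, success: bool,
                  now: Optional[datetime] = None) -> WebhookHealth:
    """Update the streak after a delivery and disable the webhook if it is stale."""
    now = now or datetime.now(timezone.utc)
    if success:
        # Any success ends the streak
        health.failure_count = 0
        health.first_failure_at = None
        return health
    if health.first_failure_at is None:
        health.first_failure_at = now
    health.failure_count += 1
    streak_age = now - health.first_failure_at
    if health.failure_count >= FAILURE_THRESHOLD and streak_age >= FAILURE_WINDOW:
        health.active = False  # this is also the point to notify the owner
    return health
```

Requiring both conditions keeps a short outage with many rapid retries from disabling an otherwise healthy webhook.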

Dispatching Events from Agent Logic

Wire the webhook dispatcher into your agent event system:

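from datetime import datetime, timezone

async def dispatch_event(event_type: str, data: dict):
    payload = {
        "event": event_type,
        "data": data,
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "id": str(uuid4()),
    }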
    matching = [
        w for w in webhooks_db.values()
        if w["active"] and event_type in w["events"]
    ]
    tasks = [
        deliver_webhook(w["url"], payload, w["secret"], w["id"])
        for w in matching
    ]
    await asyncio.gather(*tasks, return_exceptions=True)

# Usage in agent code
async def on_task_completed(task_id: str, result: dict):
    await dispatch_event("task.completed", {
        "task_id": task_id,
        "result": result,
    })

The dispatcher filters registered webhooks by event type and delivers to all matches concurrently.
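
Because the dispatcher passes return_exceptions=True, a delivery that raises does not cancel the others; the exception comes back as a value in the results list, in the same order as the tasks. The dispatcher above discards those results, so the sketch below shows how they could be inspected instead. fake_deliver and dispatch_outcomes are hypothetical stand-ins that simulate delivery without any network access.

```python
import asyncio


async def fake_deliver(url: str) -> bool:
    """Stand-in for a real delivery: fails for URLs containing 'broken'."""
    if "broken" in url:
        raise RuntimeError(f"cannot reach {url}")
    return True


async def dispatch_outcomes(urls: list[str]) -> dict:
    # gather preserves input order, so results line up with urls
    results = await asyncio.gather(
        *(fake_deliver(u) for u in urls), return_exceptions=True
    )
    return {
        url: f"error: {res}" if isinstance(res, Exception) else res
        for url, res in zip(urls, results)
    }


outcome = asyncio.run(dispatch_outcomes([
    "https://a.example/hook",
    "https://broken.example/hook",
]))
```

Logging the failed entries at this point gives visibility into deliveries that crashed rather than returned False.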

FAQ

How do I prevent webhook replay attacks?

Include the timestamp in the signature and reject any delivery where the timestamp is more than five minutes old. This prevents an attacker from capturing a valid webhook payload and replaying it later. The receiver checks abs(now - webhook_timestamp) < 300 before accepting.
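
A minimal sketch of that freshness check follows, assuming the X-Webhook-Timestamp header carries an ISO 8601 string as produced by isoformat(). The is_fresh helper is a hypothetical name, and treating timestamps without a UTC offset as UTC is an assumption about the sender.

```python
from datetime import datetime, timezone
from typing import Optional

MAX_SKEW_SECONDS = 300  # five minutes


def is_fresh(timestamp_header: str, now: Optional[datetime] = None,
             max_skew: int = MAX_SKEW_SECONDS) -> bool:
    """Return True if the timestamp is within max_skew seconds of now."""
    try:
        sent_at = datetime.fromisoformat(timestamp_header)
    except ValueError:
        return False  # missing or malformed header
    if sent_at.tzinfo is None:
        # Assumption: a timestamp without an offset is UTC
        sent_at = sent_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # abs() also rejects timestamps too far in the future
    return abs((now - sent_at).total_seconds()) < max_skew
```

Run this check only after the signature has been verified, since the timestamp is trustworthy only once it is known to be covered by a valid signature. Storing recently seen event IDs and rejecting duplicates closes the remaining five-minute window.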

What should I do when a webhook receiver is consistently slow?

Set a hard timeout on delivery (10 seconds is a common choice, and matches the client timeout used above) and track response times. If a receiver consistently takes more than 5 seconds, consider notifying the webhook owner and suggesting that they offload processing to a background queue and return 200 immediately.
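
On the receiver side, the "acknowledge first, process later" pattern can be sketched with an in-process asyncio.Queue. The accept, worker, and demo functions are hypothetical, and asyncio.sleep(0) stands in for slow processing; a production receiver would more likely use a durable queue so events survive a restart.

```python
import asyncio


async def worker(queue: asyncio.Queue, handled: list) -> None:
    """Process events in the background, one at a time."""
    while True:
        event = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for slow processing
            handled.append(event["id"])
        finally:
            queue.task_done()


async def accept(queue: asyncio.Queue, event: dict) -> dict:
    """What the webhook endpoint does: enqueue and acknowledge immediately."""
    queue.put_nowait(event)
    return {"received": True}


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []
    task = asyncio.create_task(worker(queue, handled))
    acks = [await accept(queue, {"id": f"evt-{i}"}) for i in range(3)]
    await queue.join()  # wait until the worker has drained the queue
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return acks, handled


acks, handled = asyncio.run(demo())
```

The endpoint's response time then depends only on the enqueue step, not on how long each event takes to process.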

How do I let webhook consumers debug failed deliveries?

Provide a webhook delivery log endpoint that shows recent deliveries, their HTTP status codes, response bodies, and retry attempts. Include a "redeliver" button or API endpoint that replays a specific delivery, which is invaluable when consumers are debugging their receiver logic.
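
A bounded in-memory version of such a log might look like the sketch below. DeliveryAttempt and DeliveryLog are hypothetical names, and a real service would persist these records in a database rather than a deque.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DeliveryAttempt:
    delivery_id: str
    webhook_id: str
    payload: dict
    status_code: Optional[int]  # None when the request never got a response
    response_body: str
    attempt: int


class DeliveryLog:
    def __init__(self, max_entries: int = 1000) -> None:
        # deque(maxlen=...) silently drops the oldest entry when full
        self._entries: deque = deque(maxlen=max_entries)

    def record(self, entry: DeliveryAttempt) -> None:
        self._entries.append(entry)

    def recent(self, webhook_id: str, limit: int = 20) -> list:
        """Newest-first attempts for one webhook."""
        matches = [e for e in reversed(self._entries) if e.webhook_id == webhook_id]
        return matches[:limit]

    def payload_for_redelivery(self, delivery_id: str) -> Optional[dict]:
        """Look up the original payload so it can be sent again."""
        for e in reversed(self._entries):
            if e.delivery_id == delivery_id:
                return e.payload
        return None
```

A redeliver endpoint could fetch the stored payload with payload_for_redelivery and pass it back through the normal delivery path, so the replay is signed with a fresh timestamp.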



CallSphere Team
