Real-Time Data Ingestion for AI Agents: Streaming Data from APIs, Webhooks, and Databases
Build a real-time data ingestion system for AI agents using change data capture, webhook receivers, and stream processing to keep agent knowledge bases continuously updated.
Why Batch Pipelines Are Not Enough
Batch ingestion pipelines that run every hour or every day leave AI agents working with stale data. When a customer updates their account, when a support ticket escalates, or when inventory drops below a threshold, your agent needs to know within seconds — not hours.
Real-time ingestion feeds data to agents as events occur. There are three primary patterns: polling APIs on tight intervals, receiving webhook pushes from external systems, and capturing database changes as they happen via change data capture (CDC). Each pattern fits different scenarios, and production systems typically combine all three.
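Polling is the simplest of the three to sketch: keep a cursor at the newest `updated_at` you have seen and ask only for newer records on each cycle. A minimal sketch follows — the `fetch_page` callable and the `updated_at` field are assumptions about the API's shape, not a specific vendor's contract:

```python
import asyncio
from datetime import datetime, timezone

async def poll_once(fetch_page, on_event, cursor: datetime) -> datetime:
    """One polling cycle: fetch records updated after `cursor`,
    hand each to on_event, and return the advanced cursor."""
    records = await fetch_page(cursor)
    for record in records:
        await on_event(record)
        ts = datetime.fromisoformat(record["updated_at"])
        if ts > cursor:
            cursor = ts
    return cursor

async def poll_loop(fetch_page, on_event, interval: float = 10.0):
    """Tight-interval polling: repeat poll_once, sleeping between cycles."""
    cursor = datetime.min.replace(tzinfo=timezone.utc)
    while True:
        cursor = await poll_once(fetch_page, on_event, cursor)
        await asyncio.sleep(interval)
```

Because the cursor only advances, a record delivered in one cycle is not re-fetched in the next — the same at-least-once, no-reprocessing property the other two patterns provide.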
Webhook Receivers
Webhooks are the simplest real-time pattern. External systems push events to your endpoint whenever something changes. The challenge is handling them reliably — verifying signatures, processing asynchronously, and surviving downstream failures.
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
import hashlib
import hmac
import json

app = FastAPI()

WEBHOOK_SECRET = "your-webhook-secret"

def verify_signature(payload: bytes, signature: str) -> bool:
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)

async def store_raw_event(event: dict):
    """Persist the raw payload to a durable queue for replay. Stub —
    back this with Redis Streams, SQS, or a database table."""
    ...

# Replace these stubs with your real per-event handlers
async def handle_ticket_created(data: dict): ...
async def handle_ticket_updated(data: dict): ...
async def handle_customer_updated(data: dict): ...

async def process_event(event: dict):
    """Process a webhook event asynchronously."""
    event_type = event.get("type")
    handlers = {
        "ticket.created": handle_ticket_created,
        "ticket.updated": handle_ticket_updated,
        "customer.updated": handle_customer_updated,
    }
    handler = handlers.get(event_type)
    if handler:
        await handler(event["data"])

@app.post("/webhooks/incoming")
async def receive_webhook(
    request: Request,
    background_tasks: BackgroundTasks,
):
    body = await request.body()
    signature = request.headers.get("X-Signature", "")
    if not verify_signature(body, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    event = json.loads(body)
    # Store the raw event first for replay capability
    await store_raw_event(event)
    # Process asynchronously so the webhook returns 200 fast
    background_tasks.add_task(process_event, event)
    return {"status": "accepted"}
Returning 200 quickly is essential. Webhook senders retry on timeouts, and if your processing is slow, you will receive duplicate events. Store the raw event first, then process in the background.
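Because retries mean duplicate deliveries, deduplicate on an event ID before processing. A minimal in-memory sketch (the `event_id` field and the `EventDeduplicator` class are illustrative — production systems back this with Redis `SETNX` or a unique database constraint so dedup survives restarts):

```python
class EventDeduplicator:
    """Drop webhook deliveries whose event ID was already seen.

    In-memory sketch only; state is lost on restart."""

    def __init__(self, max_entries: int = 100_000):
        self.seen: dict[str, None] = {}  # insertion-ordered, acts as FIFO
        self.max_entries = max_entries

    def first_delivery(self, event_id: str) -> bool:
        if event_id in self.seen:
            return False
        # Evict the oldest entry so memory stays bounded
        if len(self.seen) >= self.max_entries:
            self.seen.pop(next(iter(self.seen)))
        self.seen[event_id] = None
        return True
```

Call `first_delivery(event["id"])` before queuing `process_event`; a retried delivery is then acknowledged with a 200 but not processed twice.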
Change Data Capture from PostgreSQL
CDC captures every INSERT, UPDATE, and DELETE from your database and streams those changes to your ingestion pipeline. This is the most reliable real-time pattern because it captures all changes regardless of which application made them.
import json

import psycopg2
import psycopg2.errors
import psycopg2.extras

class PostgresCDC:
    def __init__(self, dsn: str, slot_name: str = "agent_cdc"):
        self.dsn = dsn
        self.slot_name = slot_name
        self.conn = None

    def setup(self):
        self.conn = psycopg2.connect(
            self.dsn,
            connection_factory=psycopg2.extras.LogicalReplicationConnection,
        )
        cursor = self.conn.cursor()
        try:
            cursor.create_replication_slot(
                self.slot_name,
                output_plugin="wal2json",
            )
        except psycopg2.errors.DuplicateObject:
            pass  # slot already exists

    def stream_changes(self, callback):
        cursor = self.conn.cursor()
        cursor.start_replication(
            slot_name=self.slot_name,
            decode=True,
            options={"include-timestamp": "true"},
        )

        def consume(msg):
            payload = json.loads(msg.payload)
            for change in payload.get("change", []):
                callback({
                    "table": change["table"],
                    "operation": change["kind"],
                    "timestamp": payload.get("timestamp"),
                    "data": self._extract_data(change),
                })
            # Acknowledge so Postgres can recycle WAL up to this point
            msg.cursor.send_feedback(flush_lsn=msg.data_start)

        cursor.consume_stream(consume)  # blocks, consuming the stream

    def _extract_data(self, change: dict) -> dict:
        # Deletes carry only the old key columns; inserts and updates
        # carry the full row
        if change["kind"] == "delete":
            return dict(zip(
                change.get("oldkeys", {}).get("keynames", []),
                change.get("oldkeys", {}).get("keyvalues", []),
            ))
        return dict(zip(
            change.get("columnnames", []),
            change.get("columnvalues", []),
        ))
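To see what the consumer actually receives, here is a representative wal2json message and a standalone parse that mirrors the logic above (the table name and values are invented for illustration):

```python
import json

# Representative wal2json payload covering an UPDATE and a DELETE
SAMPLE = json.dumps({
    "timestamp": "2024-01-01 12:00:00+00",
    "change": [
        {"kind": "update", "table": "tickets",
         "columnnames": ["id", "status"], "columnvalues": [42, "escalated"]},
        {"kind": "delete", "table": "tickets",
         "oldkeys": {"keynames": ["id"], "keyvalues": [7]}},
    ],
})

def parse_wal2json(payload_text: str) -> list[dict]:
    """Flatten a wal2json payload into per-row change events."""
    payload = json.loads(payload_text)
    events = []
    for change in payload.get("change", []):
        if change["kind"] == "delete":
            data = dict(zip(change["oldkeys"]["keynames"],
                            change["oldkeys"]["keyvalues"]))
        else:
            data = dict(zip(change["columnnames"], change["columnvalues"]))
        events.append({"table": change["table"], "operation": change["kind"],
                       "timestamp": payload.get("timestamp"), "data": data})
    return events
```

Note that the server must have `wal_level = logical` set and the wal2json output plugin installed before the replication slot can be created.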
Stream Processing with Materialized Views
Raw change events need transformation before agents can use them. A lightweight stream processor enriches events, aggregates related changes, and updates materialized views.
import asyncio
from collections import defaultdict
from datetime import datetime, timezone

class StreamProcessor:
    def __init__(self, vector_store, embedding_client):
        self.vector_store = vector_store
        self.embedding_client = embedding_client
        self.buffer = defaultdict(list)
        self.flush_interval = 5  # seconds

    async def handle_change(self, event: dict):
        table = event["table"]
        key = f"{table}:{event['data'].get('id', 'unknown')}"
        self.buffer[key].append(event)

    async def flush_loop(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush_once()

    async def _flush_once(self):
        if not self.buffer:
            return
        batch = dict(self.buffer)
        self.buffer.clear()
        for key, events in batch.items():
            # Collapse multiple changes to the same record
            latest = events[-1]
            text = self._to_document(latest)
            embedding = await self.embedding_client.embeddings.create(
                model="text-embedding-3-small",
                input=text,
            )
            await self.vector_store.upsert(
                id=key,
                embedding=embedding.data[0].embedding,
                document=text,
                metadata={
                    "table": latest["table"],
                    "updated_at": datetime.now(timezone.utc).isoformat(),
                    "operation": latest["operation"],
                },
            )

    def _to_document(self, event: dict) -> str:
        data = event["data"]
        parts = [f"{k}: {v}" for k, v in data.items()]
        return f"[{event['table']}] " + " | ".join(parts)
The buffer collapses multiple rapid updates to the same record into a single embedding operation, which saves API costs and avoids unnecessary vector index churn.
FAQ
How do I handle webhook failures and ensure no events are lost?
Store every raw webhook payload to a durable queue (Redis Streams, SQS, or a database table) before attempting to process it. If processing fails, the raw event persists for retry. Implement idempotency keys so reprocessed events do not create duplicate side effects.
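As a concrete sketch of the durable-store-plus-idempotency-key pattern, here is a minimal version backed by SQLite — a stand-in for Redis Streams or SQS, chosen only so the example is self-contained:

```python
import json
import sqlite3

def make_event_store(path: str = ":memory:") -> sqlite3.Connection:
    """Durable raw-event table keyed by an idempotency key."""
    conn = sqlite3.connect(path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS raw_events (
            idempotency_key TEXT PRIMARY KEY,
            payload TEXT NOT NULL,
            processed INTEGER NOT NULL DEFAULT 0
        )""")
    return conn

def store_event(conn: sqlite3.Connection, key: str, event: dict) -> bool:
    """Insert once; a retried delivery with the same key is a no-op."""
    try:
        conn.execute(
            "INSERT INTO raw_events (idempotency_key, payload) VALUES (?, ?)",
            (key, json.dumps(event)),
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery — already stored

def pending_events(conn: sqlite3.Connection) -> list[dict]:
    """Events not yet processed, available for retry after a crash."""
    rows = conn.execute(
        "SELECT payload FROM raw_events WHERE processed = 0").fetchall()
    return [json.loads(r[0]) for r in rows]
```

The primary-key constraint does the deduplication, and unprocessed rows survive a crash for replay.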
What is the difference between CDC and database triggers for real-time ingestion?
CDC reads the write-ahead log (WAL) without adding load to your application queries, while triggers execute inside the transaction and can slow down writes. CDC is also more reliable because it captures changes from all sources including migrations and manual SQL, whereas triggers only fire for standard application writes.
How do I prevent the vector store from becoming inconsistent with the source database?
Run a periodic reconciliation job that compares record counts and checksums between the source database and the vector store. Flag discrepancies and re-ingest affected records. This acts as a safety net for edge cases where CDC events are missed during network partitions or slot overflow.
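A reconciliation pass can be as simple as comparing content checksums stored in the vector store's metadata against the source rows. A sketch — the `record_checksum` and `find_drift` helpers are hypothetical names, not part of any library:

```python
import hashlib

def record_checksum(record: dict) -> str:
    """Stable content hash of a record (key-order independent)."""
    canonical = "|".join(f"{k}={record[k]}" for k in sorted(record))
    return hashlib.sha256(canonical.encode()).hexdigest()

def find_drift(source_records: dict[str, dict],
               indexed_checksums: dict[str, str]) -> dict[str, list[str]]:
    """Compare source-of-truth records against checksums kept in vector
    store metadata; returns ids to re-ingest or delete."""
    missing, stale, orphaned = [], [], []
    for rid, record in source_records.items():
        if rid not in indexed_checksums:
            missing.append(rid)            # never ingested
        elif indexed_checksums[rid] != record_checksum(record):
            stale.append(rid)              # re-ingest
    for rid in indexed_checksums:
        if rid not in source_records:
            orphaned.append(rid)           # delete from the index
    return {"missing": missing, "stale": stale, "orphaned": orphaned}
```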
CallSphere Team