Building a KYC/AML Agent: Identity Verification and Transaction Monitoring
Learn to build an AI agent for Know Your Customer and Anti-Money Laundering that verifies identities, screens against sanctions lists, monitors transactions, and generates risk alerts.
Why KYC/AML Needs AI Agents
Know Your Customer (KYC) and Anti-Money Laundering (AML) compliance is one of the most resource-intensive requirements for financial institutions. Banks and fintechs spend billions annually on compliance teams that manually verify identities, screen customers against sanctions lists, and investigate suspicious transactions. An AI agent can automate the routine checks, reduce false positives, and apply risk rules consistently, so compliance officers can focus on genuinely suspicious cases.
Agent Architecture
The KYC/AML agent has four capabilities:
- Identity Verification — validate customer identity documents
- Sanctions Screening — check against watchlists and PEP databases
- Risk Scoring — compute a composite customer risk score
- Transaction Monitoring — detect suspicious patterns in real time
Step 1: Identity Verification
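The agent compares the fields extracted from an identity document against what the customer declared, then checks the document's expiry date and issuing jurisdiction. Extraction and authenticity checks (OCR, document AI) are stubbed out in this example.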
from datetime import date
from difflib import SequenceMatcher
from enum import Enum

from pydantic import BaseModel


class VerificationStatus(str, Enum):
    VERIFIED = "verified"
    PENDING_REVIEW = "pending_review"
    FAILED = "failed"
    EXPIRED = "expired"


class IdentityDocument(BaseModel):
    document_type: str  # "passport", "drivers_license", "national_id"
    document_number: str
    full_name: str
    date_of_birth: date
    expiry_date: date
    issuing_country: str  # Two-letter country code, e.g. "DE"
    mrz_data: str | None = None  # Machine Readable Zone


class VerificationResult(BaseModel):
    status: VerificationStatus
    document_authentic: bool
    name_match: bool
    dob_match: bool
    document_expired: bool
    risk_flags: list[str]


def verify_identity(
    document: IdentityDocument,
    declared_name: str,
    declared_dob: date,
) -> VerificationResult:
    """Verify identity document against declared information."""
    flags = []

    # Check document expiry
    is_expired = document.expiry_date < date.today()
    if is_expired:
        flags.append("Document is expired")

    # Name matching with fuzzy comparison (case-insensitive)
    name_similarity = SequenceMatcher(
        None,
        document.full_name.lower(),
        declared_name.lower(),
    ).ratio()
    name_match = name_similarity > 0.85
    if not name_match:
        flags.append(
            f"Name mismatch: doc='{document.full_name}' "
            f"vs declared='{declared_name}' "
            f"(similarity: {name_similarity:.0%})"
        )

    # Date of birth verification (exact match required)
    dob_match = document.date_of_birth == declared_dob
    if not dob_match:
        flags.append("Date of birth mismatch")

    # High-risk country check
    high_risk_countries = {"KP", "IR", "SY", "CU", "MM"}
    if document.issuing_country in high_risk_countries:
        flags.append(
            f"High-risk jurisdiction: {document.issuing_country}"
        )

    # Determine overall status: expiry takes precedence, then mismatches,
    # then any remaining flag sends the case to manual review
    if is_expired:
        status = VerificationStatus.EXPIRED
    elif not name_match or not dob_match:
        status = VerificationStatus.FAILED
    elif flags:
        status = VerificationStatus.PENDING_REVIEW
    else:
        status = VerificationStatus.VERIFIED

    return VerificationResult(
        status=status,
        document_authentic=True,  # Placeholder: would use document AI in prod
        name_match=name_match,
        dob_match=dob_match,
        document_expired=is_expired,
        risk_flags=flags,
    )
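The `document_authentic` flag above is hardcoded. One inexpensive check that could feed into it, assuming `mrz_data` holds the raw Machine Readable Zone text, is the ICAO 9303 check-digit calculation: digits keep their value, `A`-`Z` map to 10-35, the filler `<` counts as 0, the values are weighted 7, 3, 1 repeating, and the sum modulo 10 must equal the check digit. The sketch below is illustrative only; the helper names are hypothetical, and passing the check shows that the MRZ is internally consistent, not that the document is genuine.

```python
def mrz_char_value(ch: str) -> int:
    """Map one MRZ character to its numeric value (ICAO 9303)."""
    if "0" <= ch <= "9":
        return int(ch)
    if "A" <= ch <= "Z":
        return ord(ch) - ord("A") + 10
    if ch == "<":
        return 0
    raise ValueError(f"Invalid MRZ character: {ch!r}")


def mrz_check_digit(field: str) -> int:
    """Compute the check digit for an MRZ field using weights 7, 3, 1."""
    weights = (7, 3, 1)
    total = sum(
        mrz_char_value(ch) * weights[i % 3] for i, ch in enumerate(field)
    )
    return total % 10


def field_is_valid(field: str, check_char: str) -> bool:
    """Return True if check_char is the correct check digit for field."""
    if len(check_char) != 1 or not ("0" <= check_char <= "9"):
        return False
    return mrz_check_digit(field) == int(check_char)


# Document number from the ICAO specimen passport
print(field_is_valid("L898902C3", "6"))
```

A failed check digit is a reasonable trigger for `PENDING_REVIEW` rather than an outright failure, since OCR misreads are a common cause.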
Step 2: Sanctions Screening
Screen customers against sanctions lists (OFAC, EU, UN) and PEP (Politically Exposed Persons) databases.
from dataclasses import dataclass
from datetime import date

import httpx
from pydantic import BaseModel


@dataclass
class SanctionsHit:
    list_name: str
    matched_name: str
    match_score: float
    entity_type: str  # "individual", "entity", "vessel"
    sanctions_program: str
    listed_date: str


class ScreeningResult(BaseModel):
    screened_name: str
    total_hits: int
    hits: list[dict]
    risk_level: str  # "clear", "potential_match", "confirmed_match"


async def screen_sanctions(
    name: str,
    dob: date | None = None,
    country: str | None = None,
) -> ScreeningResult:
    """Screen a name against sanctions databases."""
    # Using the OpenSanctions matching API as an example. The match endpoint
    # takes a POST with query-by-example entities; verify the request and
    # response shape against the current API documentation before use.
    properties = {"name": [name]}
    if dob:
        properties["birthDate"] = [dob.isoformat()]
    if country:
        properties["country"] = [country]
    query = {
        "queries": {"q1": {"schema": "Person", "properties": properties}}
    }

    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.post(
            "https://api.opensanctions.org/match/default",
            json=query,
            headers={"Authorization": "ApiKey YOUR_KEY"},
        )
        # Fail loudly: an API error must never be read as a "clear" result
        resp.raise_for_status()
        data = resp.json()

    results = data.get("responses", {}).get("q1", {}).get("results", [])

    hits = []
    for result in results:
        score = result.get("score", 0)
        if score > 0.7:  # Threshold for potential match
            hits.append({
                "matched_name": result.get("caption", ""),
                "score": score,
                "datasets": result.get("datasets", []),
                "properties": result.get("properties", {}),
            })

    # A high score still needs human confirmation before any action is taken
    if not hits:
        risk = "clear"
    elif any(h["score"] > 0.95 for h in hits):
        risk = "confirmed_match"
    else:
        risk = "potential_match"

    return ScreeningResult(
        screened_name=name,
        total_hits=len(hits),
        hits=hits,
        risk_level=risk,
    )
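The threshold logic above is easier to test when it is separated from the network call. The sketch below assumes a hypothetical helper, `classify_hits`, that takes already-fetched results (dicts with `caption` and `score` keys, as in the code above) and applies the same 0.7 and 0.95 cut-offs. The sample names and scores are invented for illustration.

```python
def classify_hits(
    results: list[dict],
    potential: float = 0.7,
    confirmed: float = 0.95,
) -> tuple[list[dict], str]:
    """Filter raw match results and derive a risk level from their scores."""
    hits = [
        {"matched_name": r.get("caption", ""), "score": r.get("score", 0.0)}
        for r in results
        if r.get("score", 0.0) > potential
    ]
    if not hits:
        level = "clear"
    elif any(h["score"] > confirmed for h in hits):
        level = "confirmed_match"
    else:
        level = "potential_match"
    return hits, level


# Invented sample results; real ones would come from the screening API
sample = [
    {"caption": "Jon Example", "score": 0.82},
    {"caption": "J. Exampel", "score": 0.41},
]
hits, level = classify_hits(sample)
print(level, [h["matched_name"] for h in hits])
```

Keeping the thresholds as parameters also makes it simple to tune them per list or per customer segment without touching the HTTP code.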
Step 3: Customer Risk Scoring
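Combine identity, sanctions, geographic, business-type, and transaction-volume factors into an additive score from 0 to 100, then map that score to a risk level and a review schedule.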
from datetime import date, timedelta

from pydantic import BaseModel


class CustomerRiskProfile(BaseModel):
    customer_id: str
    risk_score: int  # 0-100
    risk_level: str  # "low", "medium", "high", "critical"
    factors: list[dict]
    enhanced_due_diligence: bool
    next_review_date: date


def calculate_risk_score(
    verification: VerificationResult,
    screening: ScreeningResult,
    customer_data: dict,
) -> CustomerRiskProfile:
    """Calculate composite KYC risk score."""
    score = 0
    factors = []

    # Identity verification (0-20 points)
    if verification.status == VerificationStatus.FAILED:
        score += 20
        factors.append({"factor": "ID verification failed", "points": 20})
    elif verification.status == VerificationStatus.PENDING_REVIEW:
        score += 10
        factors.append({"factor": "ID pending review", "points": 10})
    elif verification.status == VerificationStatus.EXPIRED:
        # An expired document needs re-verification, so it is scored
        # like a pending review rather than contributing nothing
        score += 10
        factors.append({"factor": "ID document expired", "points": 10})

    # Sanctions screening (0-40 points)
    if screening.risk_level == "confirmed_match":
        score += 40
        factors.append({"factor": "Sanctions match", "points": 40})
    elif screening.risk_level == "potential_match":
        score += 20
        factors.append({"factor": "Potential sanctions hit", "points": 20})

    # Geographic risk (0-15 points)
    high_risk_countries = {"AF", "KP", "IR", "SY", "YE"}
    country = customer_data.get("country", "")
    if country in high_risk_countries:
        score += 15
        factors.append({"factor": f"High-risk country: {country}", "points": 15})

    # Business type risk (0-15 points)
    high_risk_businesses = {"casino", "crypto", "money_service"}
    biz = customer_data.get("business_type", "")
    if biz in high_risk_businesses:
        score += 15
        factors.append({"factor": f"High-risk business: {biz}", "points": 15})

    # Transaction volume risk (0-10 points)
    volume = customer_data.get("monthly_volume", 0)
    if volume > 100000:
        score += 10
        factors.append({"factor": "High transaction volume", "points": 10})

    # Determine risk level
    if score >= 60:
        level = "critical"
    elif score >= 40:
        level = "high"
    elif score >= 20:
        level = "medium"
    else:
        level = "low"

    # Review schedule based on risk (days until next review)
    review_intervals = {
        "critical": 30, "high": 90, "medium": 180, "low": 365
    }
    next_review = date.today() + timedelta(
        days=review_intervals[level]
    )

    return CustomerRiskProfile(
        customer_id=customer_data.get("id", ""),
        risk_score=min(score, 100),
        risk_level=level,
        factors=factors,
        enhanced_due_diligence=score >= 40,
        next_review_date=next_review,
    )
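To make the arithmetic concrete, here is a worked example for a hypothetical customer: a verified identity, a potential sanctions hit (20 points), a crypto business (15 points), and monthly volume above 100,000 (10 points). The snippet re-implements only the level thresholds and review intervals from the code above so it can run on its own. The customer and the fixed assessment date are invented for illustration.

```python
from datetime import date, timedelta

REVIEW_INTERVAL_DAYS = {"critical": 30, "high": 90, "medium": 180, "low": 365}


def risk_level(score: int) -> str:
    """Map a 0-100 score to a level using the thresholds from Step 3."""
    if score >= 60:
        return "critical"
    if score >= 40:
        return "high"
    if score >= 20:
        return "medium"
    return "low"


# Hypothetical customer: points per triggered factor
factors = {
    "Potential sanctions hit": 20,
    "High-risk business: crypto": 15,
    "High transaction volume": 10,
}

score = sum(factors.values())            # 20 + 15 + 10 = 45
level = risk_level(score)                # 45 falls in the 40-59 band
needs_edd = score >= 40                  # enhanced due diligence threshold
assessed_on = date(2025, 1, 1)           # fixed date for reproducibility
next_review = assessed_on + timedelta(days=REVIEW_INTERVAL_DAYS[level])

print(score, level, needs_edd, next_review.isoformat())
```

Note that with these weights a confirmed sanctions match alone scores 40, which lands in "high" rather than "critical". Whether that is acceptable depends on your institution's risk appetite.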
Step 4: Transaction Monitoring
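Detect suspicious transaction patterns with rules, optionally supplemented by ML models. The example below implements one rule, structuring detection, and leaves the remaining rules as stubs.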
import uuid

from pydantic import BaseModel


class SuspiciousAlert(BaseModel):
    alert_id: str
    customer_id: str
    alert_type: str
    severity: str
    description: str
    transactions: list[str]  # Transaction IDs
    recommended_action: str


def monitor_transactions(
    customer_id: str,
    transactions: list[dict],
    risk_profile: CustomerRiskProfile,
) -> list[SuspiciousAlert]:
    """Monitor transactions for suspicious patterns."""
    # risk_profile is not used by Rule 1
    alerts = []

    # Rule 1: Structuring detection (smurfing): three or more same-day
    # transactions, each under 10,000, that together exceed 10,000
    txns_by_day = {}
    for txn in transactions:
        txns_by_day.setdefault(txn["date"], []).append(txn)

    for day, day_txns in txns_by_day.items():
        amounts = [t["amount"] for t in day_txns]
        if (
            len(amounts) >= 3
            and all(a < 10000 for a in amounts)
            and sum(amounts) > 10000
        ):
            alerts.append(
                SuspiciousAlert(
                    alert_id=str(uuid.uuid4()),
                    customer_id=customer_id,
                    alert_type="structuring",
                    severity="high",
                    description=(
                        f"Potential structuring: {len(amounts)} "
                        f"transactions totaling "
                        f"${sum(amounts):,.2f} on {day}"
                    ),
                    transactions=[t["id"] for t in day_txns],
                    recommended_action="File SAR if confirmed",
                )
            )

    # Rule 2: Rapid movement (funds in and out quickly)
    # Rule 3: Unusual geography
    # Additional rules would follow the same pattern
    return alerts
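Rule 2 is left as a comment above. One way it could look is sketched below, under assumptions that go beyond the original code: each transaction dict carries a `direction` field (`"in"` or `"out"`) and a `timestamp` (a `datetime`), and the rule flags any sizeable inbound transfer where at least 90% of the amount leaves again within 24 hours. The function name, field names, and all thresholds are hypothetical. The sketch returns plain dicts so it can run without the models defined earlier.

```python
from datetime import datetime, timedelta


def detect_rapid_movement(
    transactions: list[dict],
    window: timedelta = timedelta(hours=24),
    min_ratio: float = 0.9,
    min_amount: float = 5000.0,
) -> list[dict]:
    """Flag inbound funds that are mostly moved out again within `window`."""
    ordered = sorted(transactions, key=lambda t: t["timestamp"])
    findings = []
    for inbound in ordered:
        if inbound["direction"] != "in" or inbound["amount"] < min_amount:
            continue
        deadline = inbound["timestamp"] + window
        # Outbound transactions strictly after the inbound one, up to the deadline
        outbound = [
            t for t in ordered
            if t["direction"] == "out"
            and inbound["timestamp"] < t["timestamp"] <= deadline
        ]
        out_total = sum(t["amount"] for t in outbound)
        if out_total >= min_ratio * inbound["amount"]:
            findings.append({
                "alert_type": "rapid_movement",
                "inbound_id": inbound["id"],
                "outbound_ids": [t["id"] for t in outbound],
                "inbound_amount": inbound["amount"],
                "outbound_total": out_total,
            })
    return findings


# Invented sample data
txns = [
    {"id": "t1", "direction": "in", "amount": 20000.0,
     "timestamp": datetime(2025, 3, 1, 9, 0)},
    {"id": "t2", "direction": "out", "amount": 12000.0,
     "timestamp": datetime(2025, 3, 1, 15, 0)},
    {"id": "t3", "direction": "out", "amount": 7500.0,
     "timestamp": datetime(2025, 3, 2, 8, 0)},
    {"id": "t4", "direction": "in", "amount": 8000.0,
     "timestamp": datetime(2025, 3, 5, 10, 0)},
    {"id": "t5", "direction": "out", "amount": 1000.0,
     "timestamp": datetime(2025, 3, 5, 12, 0)},
]
for finding in detect_rapid_movement(txns):
    print(finding["inbound_id"], finding["outbound_ids"])
```

This simple version can count the same outbound transaction against more than one inbound transfer. A production rule would likely track balances instead, and could use the customer's risk profile to tighten the window or ratio.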
FAQ
How do you reduce false positives in sanctions screening?
Use a multi-pass approach. First run broad fuzzy matching, then apply filters for date of birth, nationality, and other identifying information. Weight exact field matches higher than name-only matches. Track historical false positives to build a whitelist of known safe names that closely match sanctioned entities.
What regulations govern KYC/AML agent design?
In the US, the Bank Secrecy Act and FinCEN regulations set KYC/AML requirements. The EU has Anti-Money Laundering Directives (currently 6AMLD). The FATF provides international standards. Your agent must maintain a full audit trail, support regulatory examination, and allow human override of every automated decision.
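An audit trail that records both automated decisions and human overrides could be sketched as an append-only log in which each entry includes a hash of the previous one, so that later tampering is detectable. This is one possible design, not a regulatory requirement. The `AuditLog` class and its field names are hypothetical, and the sample entries are invented.

```python
import hashlib
import json
from dataclasses import dataclass, field

GENESIS_HASH = "0" * 64


def _digest(body: dict) -> str:
    """Hash an entry body deterministically (keys sorted before serializing)."""
    payload = json.dumps(body, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


@dataclass
class AuditLog:
    entries: list[dict] = field(default_factory=list)

    def append(
        self, timestamp: str, actor: str, action: str,
        subject_id: str, details: dict,
    ) -> dict:
        """Add an entry chained to the hash of the previous entry."""
        prev_hash = self.entries[-1]["hash"] if self.entries else GENESIS_HASH
        body = {
            "timestamp": timestamp,
            "actor": actor,
            "action": action,
            "subject_id": subject_id,
            "details": details,  # must be JSON-serializable
            "prev_hash": prev_hash,
        }
        entry = {**body, "hash": _digest(body)}
        self.entries.append(entry)
        return entry

    def verify(self) -> bool:
        """Recompute every hash and check that the chain is unbroken."""
        prev_hash = GENESIS_HASH
        for entry in self.entries:
            body = {k: v for k, v in entry.items() if k != "hash"}
            if body["prev_hash"] != prev_hash or _digest(body) != entry["hash"]:
                return False
            prev_hash = entry["hash"]
        return True


log = AuditLog()
log.append("2025-03-01T09:00:00Z", "agent", "risk_scored",
           "cust-1", {"score": 45, "level": "high"})
log.append("2025-03-01T10:30:00Z", "officer:jdoe", "override",
           "cust-1", {"level": "medium", "reason": "Hit cleared on DOB"})
print(log.verify())
```

Recording the override as its own entry, with the officer as the actor, preserves both the automated decision and the human correction for later examination.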
Can the agent file Suspicious Activity Reports (SARs) automatically?
The agent can prepare SAR drafts with all required fields pre-populated, but a compliance officer must review and approve every filing. Automated filing without human review would violate regulatory requirements. The agent should queue SAR drafts for review and track approval status.
CallSphere Team