Building a KYC/AML Agent: Identity Verification and Transaction Monitoring

Learn to build an AI agent for Know Your Customer and Anti-Money Laundering that verifies identities, screens against sanctions lists, monitors transactions, and generates risk alerts.

Why KYC/AML Needs AI Agents

Know Your Customer (KYC) and Anti-Money Laundering (AML) compliance is one of the most resource-intensive requirements for financial institutions. Banks and fintechs spend billions annually on compliance teams that manually verify identities, screen customers against sanctions lists, and investigate suspicious transactions. An AI agent can automate the routine checks, reduce false positives, and ensure consistent application of risk rules — letting compliance officers focus on genuinely suspicious cases.

Agent Architecture

The KYC/AML agent has four capabilities:

  1. Identity Verification — validate customer identity documents
  2. Sanctions Screening — check against watchlists and PEP databases
  3. Risk Scoring — compute a composite customer risk score
  4. Transaction Monitoring — detect suspicious patterns in real time
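
One way to picture how these capabilities fit together is a small routing function that turns their outputs into an onboarding decision. This is only a sketch: `route_customer` and its decision labels are hypothetical, and the status strings are assumed to mirror the ones used in the steps that follow.

```python
def route_customer(
    verification_status: str,  # "verified", "pending_review", "failed", "expired"
    screening_level: str,      # "clear", "potential_match", "confirmed_match"
    risk_level: str,           # "low", "medium", "high", "critical"
) -> str:
    """Return a hypothetical onboarding decision from the KYC outputs."""
    # Hard stops first: a strong sanctions hit always goes to compliance.
    if screening_level == "confirmed_match":
        return "escalate_to_compliance"
    if verification_status in ("failed", "expired"):
        return "request_new_document"

    # Anything ambiguous or high-risk is routed to a human reviewer.
    if (
        verification_status == "pending_review"
        or screening_level == "potential_match"
        or risk_level in ("high", "critical")
    ):
        return "manual_review"

    # Clean customers are approved and handed to transaction monitoring.
    return "approve_and_monitor"
```

Ordering the checks from most to least severe keeps the routing easy to audit, since the first matching rule explains the decision.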

Step 1: Identity Verification

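The agent validates an identity document by comparing its extracted fields against what the customer declared: it checks the expiry date, fuzzy-matches the name, compares dates of birth exactly, and flags high-risk issuing countries.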

from pydantic import BaseModel
from datetime import date
from enum import Enum


class VerificationStatus(str, Enum):
    VERIFIED = "verified"
    PENDING_REVIEW = "pending_review"
    FAILED = "failed"
    EXPIRED = "expired"


class IdentityDocument(BaseModel):
    document_type: str  # "passport", "drivers_license", "national_id"
    document_number: str
    full_name: str
    date_of_birth: date
    expiry_date: date
    issuing_country: str
    mrz_data: str | None = None  # Machine Readable Zone


class VerificationResult(BaseModel):
    status: VerificationStatus
    document_authentic: bool
    name_match: bool
    dob_match: bool
    document_expired: bool
    risk_flags: list[str]


def verify_identity(
    document: IdentityDocument,
    declared_name: str,
    declared_dob: date,
) -> VerificationResult:
    """Verify identity document against declared information."""
    flags = []

    # Check document expiry
    is_expired = document.expiry_date < date.today()
    if is_expired:
        flags.append("Document is expired")

    # Name matching with fuzzy comparison
    from difflib import SequenceMatcher

    name_similarity = SequenceMatcher(
        None,
        document.full_name.lower(),
        declared_name.lower(),
    ).ratio()
    name_match = name_similarity > 0.85
    if not name_match:
        flags.append(
            f"Name mismatch: doc='{document.full_name}' "
            f"vs declared='{declared_name}' "
            f"(similarity: {name_similarity:.0%})"
        )

    # Date of birth verification
    dob_match = document.date_of_birth == declared_dob
    if not dob_match:
        flags.append("Date of birth mismatch")

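    # High-risk country check. The ISO 3166-1 alpha-2 codes here are
    # illustrative; in production, load the list from a maintained source.
    high_risk_countries = {"KP", "IR", "SY", "CU", "MM"}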
    if document.issuing_country in high_risk_countries:
        flags.append(
            f"High-risk jurisdiction: {document.issuing_country}"
        )

    # Determine overall status
    if is_expired:
        status = VerificationStatus.EXPIRED
    elif not name_match or not dob_match:
        status = VerificationStatus.FAILED
    elif flags:
        status = VerificationStatus.PENDING_REVIEW
    else:
        status = VerificationStatus.VERIFIED

    return VerificationResult(
        status=status,
        document_authentic=True,  # Would use document AI in prod
        name_match=name_match,
        dob_match=dob_match,
        document_expired=is_expired,
        risk_flags=flags,
    )
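
The `mrz_data` field is not used above, and `document_authentic` is hard-coded. One inexpensive authenticity signal is the check digit that ICAO Doc 9303 defines for MRZ fields such as the document number and dates. The sketch below assumes the MRZ has already been split into fields by some upstream OCR step; the helper names `mrz_check_digit` and `mrz_field_is_valid` are hypothetical.

```python
def mrz_check_digit(field: str) -> int:
    """Compute the ICAO 9303 check digit for a single MRZ field."""
    weights = (7, 3, 1)  # Repeating weight pattern defined by the standard
    total = 0
    for i, ch in enumerate(field.upper()):
        if ch in "0123456789":
            value = int(ch)
        elif "A" <= ch <= "Z":
            value = ord(ch) - ord("A") + 10  # A=10 ... Z=35
        elif ch == "<":
            value = 0  # Filler character
        else:
            raise ValueError(f"Invalid MRZ character: {ch!r}")
        total += value * weights[i % 3]
    return total % 10


def mrz_field_is_valid(field: str, check_digit: str) -> bool:
    """Return True when the printed check digit matches the computed one."""
    return check_digit in "0123456789" and len(check_digit) == 1 and (
        mrz_check_digit(field) == int(check_digit)
    )


# Values from the ICAO specimen passport
print(mrz_check_digit("L898902C3"))  # → 6
print(mrz_check_digit("740812"))     # → 2
```

A failed check digit does not prove forgery (OCR errors are common), so it would more naturally add a risk flag than set the status to failed outright.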

Step 2: Sanctions Screening

Screen customers against sanctions lists (OFAC, EU, UN) and PEP (Politically Exposed Persons) databases.

from dataclasses import dataclass


@dataclass
class SanctionsHit:
    list_name: str
    matched_name: str
    match_score: float
    entity_type: str  # "individual", "entity", "vessel"
    sanctions_program: str
    listed_date: str


class ScreeningResult(BaseModel):
    screened_name: str
    total_hits: int
    hits: list[dict]
    risk_level: str  # "clear", "potential_match", "confirmed_match"


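async def screen_sanctions(
    name: str,
    dob: date | None = None,
    country: str | None = None,
) -> ScreeningResult:
    """Screen a name against sanctions databases."""
    import httpx

    # Using the OpenSanctions matching API as an example. The /match
    # endpoint expects a POST with one or more query entities; check the
    # current API docs before relying on this request/response shape.
    properties = {"name": [name]}
    if dob:
        properties["birthDate"] = [dob.isoformat()]
    if country:
        properties["country"] = [country]
    payload = {
        "queries": {"q1": {"schema": "Person", "properties": properties}}
    }

    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.post(
            "https://api.opensanctions.org/match/default",
            json=payload,
            headers={"Authorization": "ApiKey YOUR_KEY"},
        )
        # Fail loudly: an API error must never be read as "clear"
        resp.raise_for_status()
        data = resp.json()

    hits = []
    results = data.get("responses", {}).get("q1", {}).get("results", [])
    for result in results: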
        score = result.get("score", 0)
        if score > 0.7:  # Threshold for potential match
            hits.append({
                "matched_name": result.get("caption", ""),
                "score": score,
                "datasets": result.get("datasets", []),
                "properties": result.get("properties", {}),
            })

    if not hits:
        risk = "clear"
    elif any(h["score"] > 0.95 for h in hits):
        risk = "confirmed_match"
    else:
        risk = "potential_match"

    return ScreeningResult(
        screened_name=name,
        total_hits=len(hits),
        hits=hits,
        risk_level=risk,
    )
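
The threshold logic in this step is easier to test when it lives in a pure function that never touches the network. The sketch below is one way to factor it out; `classify_hits` is a hypothetical helper, and the 0.7 and 0.95 cutoffs are simply the values used above, not recommended settings.

```python
def classify_hits(
    results: list[dict],
    potential_threshold: float = 0.7,
    confirmed_threshold: float = 0.95,
) -> tuple[list[dict], str]:
    """Filter raw match results and derive a risk level from their scores."""
    # Keep only results above the potential-match threshold
    hits = [r for r in results if r.get("score", 0) > potential_threshold]
    # Strongest match first, so reviewers see it at the top
    hits.sort(key=lambda r: r["score"], reverse=True)

    if not hits:
        level = "clear"
    elif hits[0]["score"] > confirmed_threshold:
        level = "confirmed_match"
    else:
        level = "potential_match"
    return hits, level


sample = [
    {"caption": "A", "score": 0.65},
    {"caption": "B", "score": 0.80},
    {"caption": "C", "score": 0.97},
]
hits, level = classify_hits(sample)
print([h["caption"] for h in hits], level)  # → ['C', 'B'] confirmed_match
```

Even a "confirmed_match" label here only means a very high similarity score; a compliance officer would still need to confirm the match against the underlying record.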

Step 3: Customer Risk Scoring

Combine multiple risk factors into a composite score.

class CustomerRiskProfile(BaseModel):
    customer_id: str
    risk_score: int  # 0-100
    risk_level: str  # "low", "medium", "high", "critical"
    factors: list[dict]
    enhanced_due_diligence: bool
    next_review_date: date


def calculate_risk_score(
    verification: VerificationResult,
    screening: ScreeningResult,
    customer_data: dict,
) -> CustomerRiskProfile:
    """Calculate composite KYC risk score."""
    score = 0
    factors = []

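    # Identity verification (0-20 points)
    # EXPIRED is scored like FAILED: verify_identity() reports EXPIRED
    # ahead of any name or DOB mismatch, so it must not score zero.
    if verification.status in (
        VerificationStatus.FAILED,
        VerificationStatus.EXPIRED,
    ):
        score += 20
        factors.append({"factor": "ID verification failed or expired", "points": 20})
    elif verification.status == VerificationStatus.PENDING_REVIEW:
        score += 10
        factors.append({"factor": "ID pending review", "points": 10})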

    # Sanctions screening (0-40 points)
    if screening.risk_level == "confirmed_match":
        score += 40
        factors.append({"factor": "Sanctions match", "points": 40})
    elif screening.risk_level == "potential_match":
        score += 20
        factors.append({"factor": "Potential sanctions hit", "points": 20})

    # Geographic risk (0-15 points)
    high_risk_countries = {"AF", "KP", "IR", "SY", "YE"}
    country = customer_data.get("country", "")
    if country in high_risk_countries:
        score += 15
        factors.append({"factor": f"High-risk country: {country}", "points": 15})

    # Business type risk (0-15 points)
    high_risk_businesses = {"casino", "crypto", "money_service"}
    biz = customer_data.get("business_type", "")
    if biz in high_risk_businesses:
        score += 15
        factors.append({"factor": f"High-risk business: {biz}", "points": 15})

    # Transaction volume risk (0-10 points)
    volume = customer_data.get("monthly_volume", 0)
    if volume > 100000:
        score += 10
        factors.append({"factor": "High transaction volume", "points": 10})

    # Determine risk level
    if score >= 60:
        level = "critical"
    elif score >= 40:
        level = "high"
    elif score >= 20:
        level = "medium"
    else:
        level = "low"

    # Review schedule based on risk
    from datetime import timedelta
    review_intervals = {
        "critical": 30, "high": 90, "medium": 180, "low": 365
    }
    next_review = date.today() + timedelta(
        days=review_intervals[level]
    )

    return CustomerRiskProfile(
        customer_id=customer_data.get("id", ""),
        risk_score=min(score, 100),
        risk_level=level,
        factors=factors,
        enhanced_due_diligence=score >= 40,
        next_review_date=next_review,
    )
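
To make the arithmetic concrete, here is a worked example for a hypothetical customer: an ID pending review, a potential sanctions hit, and a crypto business. The point values, bands, and review intervals are copied from the function above; `risk_band` and the fixed start date are only there to keep the example self-contained and reproducible.

```python
from datetime import date, timedelta

REVIEW_DAYS = {"critical": 30, "high": 90, "medium": 180, "low": 365}


def risk_band(score: int) -> str:
    """Map a 0-100 score to the bands used in calculate_risk_score."""
    if score >= 60:
        return "critical"
    if score >= 40:
        return "high"
    if score >= 20:
        return "medium"
    return "low"


# Factors that apply to the hypothetical customer
points = {
    "ID pending review": 10,
    "Potential sanctions hit": 20,
    "High-risk business: crypto": 15,
}

score = sum(points.values())
level = risk_band(score)
needs_edd = score >= 40  # Enhanced due diligence threshold
next_review = date(2025, 1, 1) + timedelta(days=REVIEW_DAYS[level])

print(score, level, needs_edd, next_review)  # → 45 high True 2025-04-01
```

Note that no single factor here reaches the "high" band on its own; it is the combination that triggers enhanced due diligence.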

Step 4: Transaction Monitoring

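Detect suspicious transaction patterns with rule-based checks, optionally supplemented by ML models. The function below implements one rule, structuring detection, and leaves placeholders for the others.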

class SuspiciousAlert(BaseModel):
    alert_id: str
    customer_id: str
    alert_type: str
    severity: str
    description: str
    transactions: list[str]  # Transaction IDs
    recommended_action: str


def monitor_transactions(
    customer_id: str,
    transactions: list[dict],
    risk_profile: CustomerRiskProfile,
) -> list[SuspiciousAlert]:
    """Monitor transactions for suspicious patterns."""
    alerts = []
    import uuid

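    # Rule 1: Structuring detection (smurfing)
    # Group transactions by calendar day; txn["date"] is assumed to be
    # day-granular and all amounts to be in the same currency.
    txns_by_day = {}
    for txn in transactions:
        day = txn["date"]
        txns_by_day.setdefault(day, []).append(txn)

    for day, day_txns in txns_by_day.items():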
        amounts = [t["amount"] for t in day_txns]
        if (
            len(amounts) >= 3
            and all(a < 10000 for a in amounts)
            and sum(amounts) > 10000
        ):
            alerts.append(
                SuspiciousAlert(
                    alert_id=str(uuid.uuid4()),
                    customer_id=customer_id,
                    alert_type="structuring",
                    severity="high",
                    description=(
                        f"Potential structuring: {len(amounts)} "
                        f"transactions totaling "
                        f"${sum(amounts):,.2f} on {day}"
                    ),
                    transactions=[t["id"] for t in day_txns],
                    recommended_action="File SAR if confirmed",
                )
            )

    # Rule 2: Rapid movement (funds in and out quickly)
    # Rule 3: Unusual geography
    # Additional rules would follow the same pattern

    return alerts
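
Rule 2 is left as a placeholder above. A minimal sketch of a rapid-movement check might look like the following. It assumes each transaction dict also carries a `direction` ("in" or "out") and a `timestamp` (`datetime`), neither of which appears in the original example, and the window, minimum amount, and outflow ratio are arbitrary illustrative defaults.

```python
from datetime import datetime, timedelta


def find_rapid_movement(
    transactions: list[dict],
    window: timedelta = timedelta(hours=24),
    min_amount: float = 5000.0,
    outflow_ratio: float = 0.9,
) -> list[dict]:
    """Find large inbound transfers that are mostly paid out again quickly."""
    ordered = sorted(transactions, key=lambda t: t["timestamp"])
    findings = []
    for i, inbound in enumerate(ordered):
        if inbound["direction"] != "in" or inbound["amount"] < min_amount:
            continue
        cutoff = inbound["timestamp"] + window
        # Outbound transactions sorted after this inbound one, within the window
        outflows = [
            t for t in ordered[i + 1:]
            if t["direction"] == "out" and t["timestamp"] <= cutoff
        ]
        total_out = sum(t["amount"] for t in outflows)
        if total_out >= outflow_ratio * inbound["amount"]:
            findings.append({
                "inbound_id": inbound["id"],
                "outbound_ids": [t["id"] for t in outflows],
                "inbound_amount": inbound["amount"],
                "outbound_total": total_out,
            })
    return findings
```

Each finding could then be wrapped in a `SuspiciousAlert` with `alert_type="rapid_movement"`, following the same pattern as the structuring rule.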

FAQ

How do you reduce false positives in sanctions screening?

Use a multi-pass approach. First run broad fuzzy matching, then apply filters for date of birth, nationality, and other identifying information. Weight exact field matches higher than name-only matches. Track historical false positives to build a whitelist of known safe names that closely match sanctioned entities.
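
The multi-pass idea can be sketched roughly as follows. Everything here is illustrative: the candidate dict shape (`entity_id`, `name`, `dob`), the allowlist of reviewer-cleared pairs, and the 0.8 name threshold are assumptions, not part of any particular screening product.

```python
import unicodedata
from datetime import date
from difflib import SequenceMatcher


def normalize_name(name: str) -> str:
    """Lowercase, strip accents, and sort tokens so word order is ignored."""
    decomposed = unicodedata.normalize("NFKD", name)
    no_accents = "".join(c for c in decomposed if not unicodedata.combining(c))
    tokens = no_accents.lower().replace(",", " ").split()
    return " ".join(sorted(tokens))


def name_similarity(a: str, b: str) -> float:
    return SequenceMatcher(None, normalize_name(a), normalize_name(b)).ratio()


def filter_candidates(
    customer: dict,
    candidates: list[dict],
    allowlist: set[tuple[str, str]],
    name_threshold: float = 0.8,
) -> list[dict]:
    """Apply name, date-of-birth, and allowlist passes in that order."""
    kept = []
    for cand in candidates:
        # Pass 1: broad fuzzy match on normalized names
        score = name_similarity(customer["name"], cand["name"])
        if score < name_threshold:
            continue
        # Pass 2: drop the candidate only when both DOBs are known and differ
        if customer.get("dob") and cand.get("dob") and customer["dob"] != cand["dob"]:
            continue
        # Pass 3: skip pairs a reviewer has already cleared
        if (customer["id"], cand["entity_id"]) in allowlist:
            continue
        kept.append({**cand, "name_score": round(score, 2)})
    return kept
```

Keying the allowlist on the customer and entity pair, rather than on the name alone, avoids clearing a different customer who happens to share the same name.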

What regulations govern KYC/AML agent design?

In the US, the Bank Secrecy Act and FinCEN regulations set KYC/AML requirements. The EU has Anti-Money Laundering Directives (currently 6AMLD). The FATF provides international standards. Your agent must maintain a full audit trail, support regulatory examination, and allow human override of every automated decision.
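
As a rough illustration of an audit trail that also records human overrides, the sketch below chains each entry to the previous one with a SHA-256 hash so that later edits are detectable. The `AuditLog` class, its field names, and the hash-chaining approach are assumptions made for this example; regulations do not prescribe a specific mechanism.

```python
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone


def _digest(body: dict) -> str:
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()


@dataclass
class AuditLog:
    entries: list[dict] = field(default_factory=list)

    def record(self, actor: str, action: str, subject_id: str, details: dict) -> dict:
        """Append an entry linked to the previous entry's hash."""
        body = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "actor": actor,            # e.g. "agent" or an officer's user ID
            "action": action,          # e.g. "risk_scored", "human_override"
            "subject_id": subject_id,
            "details": details,        # Must be JSON-serializable
            "prev_hash": self.entries[-1]["hash"] if self.entries else "",
        }
        entry = {**body, "hash": _digest(body)}
        self.entries.append(entry)
        return entry

    def verify(self) -> bool:
        """Recompute every hash and check that the chain is unbroken."""
        prev = ""
        for entry in self.entries:
            body = {k: v for k, v in entry.items() if k != "hash"}
            if entry["prev_hash"] != prev or entry["hash"] != _digest(body):
                return False
            prev = entry["hash"]
        return True


log = AuditLog()
log.record("agent", "risk_scored", "cust-1", {"level": "high", "score": 45})
log.record("officer-7", "human_override", "cust-1", {"level": "medium", "reason": "false positive"})
print(log.verify())  # → True
```

Recording the override as a new entry, instead of editing the automated decision, preserves both the original output and the human's reasoning for examiners.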

Can the agent file Suspicious Activity Reports (SARs) automatically?

The agent can prepare SAR drafts with all required fields pre-populated, but a compliance officer should review and approve every filing. The institution remains accountable for what it files, so fully automated filing with no human review is not a defensible design. The agent should queue SAR drafts for review and track approval status.



CallSphere Team
