Skip to content
Learn Agentic AI10 min read0 views

Building a Security Scanning Agent: Vulnerability Detection and Remediation Suggestions

Learn how to build an AI agent that scans for CVEs in dependencies, audits container images, generates fix suggestions with priority scoring, and tracks remediation progress.

Why Security Scanning Needs an Agent

Security scanners produce hundreds of findings. A typical Node.js project has 50-200 known vulnerabilities in its dependency tree. The problem is not detection but prioritization and remediation. Teams ignore scanner output because it is overwhelming. An AI security agent triages findings by actual exploitability, generates specific fix code, and tracks which vulnerabilities have been addressed.

Dependency Vulnerability Scanning

The agent wraps existing scanning tools and normalizes their output into a common format.

import subprocess
import json
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

class VulnSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

@dataclass
class Vulnerability:
    cve_id: str
    package: str
    installed_version: str
    fixed_version: Optional[str]
    severity: VulnSeverity
    cvss_score: float
    title: str
    description: str
    exploitable: bool = False
    reachable: bool = False
    fix_available: bool = False
    priority_score: float = 0.0
    ecosystem: str = ""  # "npm", "pip", "go", "docker"

class DependencyScanner:
    async def scan_npm(self, project_path: str) -> list[Vulnerability]:
        result = subprocess.run(
            ["npm", "audit", "--json"],
            capture_output=True, text=True, cwd=project_path,
        )
        audit_data = json.loads(result.stdout)
        vulns = []
        for vuln_id, details in audit_data.get("vulnerabilities", {}).items():
            for via in details.get("via", []):
                if isinstance(via, dict):
                    vulns.append(Vulnerability(
                        cve_id=via.get("url", "").split("/")[-1],
                        package=vuln_id,
                        installed_version=details.get("range", "unknown"),
                        fixed_version=details.get("fixAvailable", {}).get("version"),
                        severity=VulnSeverity(details.get("severity", "info")),
                        cvss_score=via.get("cvss", {}).get("score", 0.0),
                        title=via.get("title", "Unknown"),
                        description=via.get("title", ""),
                        fix_available=bool(details.get("fixAvailable")),
                        ecosystem="npm",
                    ))
        return vulns

    async def scan_python(self, project_path: str) -> list[Vulnerability]:
        result = subprocess.run(
            ["pip-audit", "--format=json", "--desc"],
            capture_output=True, text=True, cwd=project_path,
        )
        audit_data = json.loads(result.stdout)
        vulns = []
        for finding in audit_data:
            for vuln in finding.get("vulns", []):
                vulns.append(Vulnerability(
                    cve_id=vuln["id"],
                    package=finding["name"],
                    installed_version=finding["version"],
                    fixed_version=vuln.get("fix_versions", [None])[0],
                    severity=VulnSeverity.HIGH,
                    cvss_score=0.0,
                    title=vuln.get("description", "")[:100],
                    description=vuln.get("description", ""),
                    fix_available=bool(vuln.get("fix_versions")),
                    ecosystem="pip",
                ))
        return vulns

Priority Scoring Engine

Not all CVEs are equal. A critical CVE in a dev-only dependency is less urgent than a medium CVE in a package that handles user input in production.

import openai

async def calculate_priority(
    vuln: Vulnerability, app_context: dict
) -> float:
    """Score 0-100 based on actual risk, not just CVSS."""
    score = 0.0

    # Base: CVSS score (0-40 points)
    score += vuln.cvss_score * 4

    # Fix availability (0-15 points, higher if fix exists)
    if vuln.fix_available:
        score += 15

    # Severity multiplier
    severity_mult = {
        VulnSeverity.CRITICAL: 1.0,
        VulnSeverity.HIGH: 0.8,
        VulnSeverity.MEDIUM: 0.5,
        VulnSeverity.LOW: 0.2,
        VulnSeverity.INFO: 0.1,
    }
    score *= severity_mult[vuln.severity]

    # Context: is this package used in production or just dev?
    if vuln.package in app_context.get("production_deps", []):
        score += 20
    if vuln.package in app_context.get("internet_facing_deps", []):
        score += 15

    return min(100.0, score)

async def assess_exploitability(vuln: Vulnerability) -> dict:
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"""Assess the real-world exploitability of this vulnerability.

CVE: {vuln.cve_id}
Package: {vuln.package} v{vuln.installed_version}
Severity: {vuln.severity.value}
CVSS: {vuln.cvss_score}
Description: {vuln.description}

Return JSON with:
- exploitable_remotely: boolean
- requires_user_interaction: boolean
- attack_complexity: "low" or "high"
- known_exploits_in_wild: boolean
- realistic_risk: "critical", "high", "medium", "low"
- reasoning: brief explanation"""
        }],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    return json.loads(response.choices[0].message.content)

Automated Fix Generation

The agent generates specific fix instructions based on the vulnerability and ecosystem.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

async def generate_fix(vuln: Vulnerability, project_path: str) -> dict:
    if vuln.ecosystem == "npm" and vuln.fix_available:
        return {
            "type": "command",
            "command": f"npm install {vuln.package}@{vuln.fixed_version}",
            "description": f"Update {vuln.package} to {vuln.fixed_version}",
            "breaking_risk": "low" if _is_patch_update(
                vuln.installed_version, vuln.fixed_version
            ) else "medium",
        }

    if vuln.ecosystem == "pip" and vuln.fixed_version:
        return {
            "type": "file_edit",
            "file": "requirements.txt",
            "find": f"{vuln.package}=={vuln.installed_version}",
            "replace": f"{vuln.package}=={vuln.fixed_version}",
            "description": f"Pin {vuln.package} to {vuln.fixed_version}",
            "breaking_risk": "low",
        }

    # For complex cases, use LLM to generate a workaround
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"""No direct fix is available for {vuln.cve_id} in
{vuln.package} v{vuln.installed_version}.

Suggest a workaround or mitigation:
- Can the vulnerable function be avoided?
- Can the package be replaced with an alternative?
- What WAF rules could mitigate the attack vector?

Return JSON: workaround, alternative_package, waf_rule, effort_estimate."""
        }],
        response_format={"type": "json_object"},
        temperature=0.2,
    )
    return json.loads(response.choices[0].message.content)

def _is_patch_update(current: str, target: str) -> bool:
    curr_parts = current.split(".")
    target_parts = target.split(".")
    if len(curr_parts) >= 2 and len(target_parts) >= 2:
        return curr_parts[0] == target_parts[0] and curr_parts[1] == target_parts[1]
    return False

Container Image Scanning

async def scan_container_image(image: str) -> list[Vulnerability]:
    result = subprocess.run(
        ["trivy", "image", "--format=json", "--severity=CRITICAL,HIGH", image],
        capture_output=True, text=True, timeout=300,
    )
    data = json.loads(result.stdout)
    vulns = []
    for target in data.get("Results", []):
        for v in target.get("Vulnerabilities", []):
            vulns.append(Vulnerability(
                cve_id=v["VulnerabilityID"],
                package=v["PkgName"],
                installed_version=v["InstalledVersion"],
                fixed_version=v.get("FixedVersion"),
                severity=VulnSeverity(v["Severity"].lower()),
                cvss_score=v.get("CVSS", {}).get("nvd", {}).get("V3Score", 0),
                title=v.get("Title", ""),
                description=v.get("Description", ""),
                fix_available=bool(v.get("FixedVersion")),
                ecosystem="docker",
            ))
    return vulns

FAQ

How do I prevent the agent from creating breaking changes when auto-fixing vulnerabilities?

The agent should never push fixes directly to the main branch. Instead, it creates a pull request for each fix with the specific version change. The PR runs the full test suite in CI. If tests fail, the agent annotates the PR with the failure details and marks the fix as requiring manual review.

Should I scan every commit or on a schedule?

Both. Scan on every commit in CI to catch newly introduced vulnerabilities. Run a full scheduled scan daily to catch newly disclosed CVEs against your existing dependencies. The scheduled scan uses the latest CVE database while the CI scan uses the database snapshot from your last update.

How do I handle false positives that keep reappearing?

Maintain a suppression list with expiration dates and justifications. The agent stores suppressed CVE-package pairs with a reason and a review date. When the review date passes, the suppression expires and the finding reappears. This prevents indefinite suppression of real issues.


#Security #VulnerabilityScanning #CVE #DevSecOps #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.