Self-Correcting AI Agents: Reflection, Retry, and Validation Loop Patterns

How to build AI agents that catch and fix their own errors through output validation, reflection prompting, retry with feedback, and graceful escalation when self-correction fails.

Why Agents Need Self-Correction

LLMs make mistakes. They hallucinate facts, produce malformed JSON, write code that does not compile, and misinterpret ambiguous instructions. In a single-shot interaction, these errors surface as a bad response that the user manually corrects. In an agentic system, errors compound: a wrong tool call produces wrong data, which feeds into wrong reasoning, which triggers more wrong actions. Without self-correction, reliability drops quickly as tasks get longer: if each step succeeds independently with probability p, a task of n steps succeeds with probability p^n, so even small per-step error rates add up.
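
To make the compounding concrete, here is a back-of-the-envelope sketch. It assumes every step succeeds independently with the same probability, which is a simplification (real failures are often correlated), and the function name is illustrative rather than part of any library.

```python
def chain_success_rate(per_step_success: float, steps: int) -> float:
    """Probability that every step in a chain succeeds.

    Assumes steps fail independently with the same probability,
    which is a simplifying assumption for illustration only.
    """
    if not 0.0 <= per_step_success <= 1.0:
        raise ValueError("per_step_success must be in [0, 1]")
    if steps < 0:
        raise ValueError("steps must be non-negative")
    return per_step_success ** steps


# Print end-to-end success for a few chain lengths at 95% per step
for steps in (1, 5, 10, 20):
    rate = chain_success_rate(0.95, steps)
    print(f"{steps:>2} steps at 95% per step: {rate:.1%} end-to-end")
```

Under these assumptions, a step that is 95% reliable, repeated ten times, succeeds end to end only about 60% of the time.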

Self-correcting agents implement a closed feedback loop: generate output, validate it against explicit criteria, and if validation fails, reflect on the error and retry with corrective feedback. This pattern can increase task completion rates from 60% to 90%+ on complex multi-step tasks.

Output Validation Patterns

The first line of defense is validating the agent's output before it is used or returned to the user. Validation should be as specific and automated as possible — never rely on the LLM to validate its own output in the same call that generated it.

from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum
import json

class ValidationResult(Enum):
    PASS = "pass"
    FAIL = "fail"
    WARN = "warn"

@dataclass
class ValidationCheck:
    name: str
    check_fn: Callable[[Any], bool]
    error_message: str
    severity: str = "error"  # "error" or "warning"

@dataclass
class ValidationReport:
    passed: bool
    checks: list[dict] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

class OutputValidator:
    """Validates agent outputs against a set of rules."""

    def __init__(self):
        self.checks: list[ValidationCheck] = []

    def add_check(
        self,
        name: str,
        check_fn: Callable[[Any], bool],
        error_message: str,
        severity: str = "error",
    ):
        self.checks.append(ValidationCheck(
            name=name,
            check_fn=check_fn,
            error_message=error_message,
            severity=severity,
        ))

    def validate(self, output: Any) -> ValidationReport:
        report = ValidationReport(passed=True)

        for check in self.checks:
            try:
                result = check.check_fn(output)
                report.checks.append({
                    "name": check.name,
                    "result": "pass" if result else "fail",
                })
                if not result:
                    if check.severity == "error":
                        report.passed = False
                        report.errors.append(check.error_message)
                    else:
                        report.warnings.append(check.error_message)
            except Exception as e:
                report.passed = False
                report.errors.append(
                    f"{check.name} raised exception: {e}"
                )

        return report


# Example: Validate JSON output from an agent
json_validator = OutputValidator()

json_validator.add_check(
    name="valid_json",
    check_fn=lambda x: isinstance(json.loads(x) if isinstance(x, str) else x, dict),
    error_message="Output is not valid JSON",
)

json_validator.add_check(
    name="has_required_fields",
    check_fn=lambda x: all(
        k in (json.loads(x) if isinstance(x, str) else x)
        for k in ["action", "reasoning", "confidence"]
    ),
    error_message="Missing required fields: action, reasoning, confidence",
)

json_validator.add_check(
    name="confidence_in_range",
    check_fn=lambda x: 0 <= (json.loads(x) if isinstance(x, str) else x).get("confidence", -1) <= 1,
    error_message="Confidence must be between 0 and 1",
)
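
Each lambda above parses the string again. One alternative, sketched here under the same field requirements, is to parse once and run the field checks on the resulting dict. The names `parse_agent_json` and `check_agent_output` are hypothetical helpers, not part of the validator class.

```python
import json
from typing import Any

REQUIRED_FIELDS = ("action", "reasoning", "confidence")


def parse_agent_json(raw: Any) -> dict:
    """Return the output as a dict, parsing it first if it is a string.

    Raises ValueError if the output is not a JSON object.
    """
    try:
        data = json.loads(raw) if isinstance(raw, str) else raw
    except json.JSONDecodeError as e:
        raise ValueError(f"Output is not valid JSON: {e.msg}") from e
    if not isinstance(data, dict):
        raise ValueError("Output must be a JSON object")
    return data


def check_agent_output(raw: Any) -> list[str]:
    """Return error messages; an empty list means the output passed."""
    try:
        data = parse_agent_json(raw)
    except ValueError as e:
        return [str(e)]

    errors = []
    missing = [k for k in REQUIRED_FIELDS if k not in data]
    if missing:
        errors.append(f"Missing required fields: {', '.join(missing)}")

    if "confidence" in data:
        confidence = data["confidence"]
        # bool is a subclass of int, so exclude it explicitly
        is_number = (
            isinstance(confidence, (int, float))
            and not isinstance(confidence, bool)
        )
        if not is_number or not 0 <= confidence <= 1:
            errors.append("Confidence must be a number between 0 and 1")
    return errors


print(check_agent_output(
    '{"action": "search", "reasoning": "need data", "confidence": 0.8}'
))
print(check_agent_output('{"action": "search", "confidence": 1.7}'))
print(check_agent_output("not json"))
```

Returning specific messages (which field is missing, what was wrong with the confidence value) gives the retry step more to work with than a single pass/fail flag.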

Code Output Validation

When agents generate code, static analysis provides stronger validation than string matching:

import ast
import subprocess
import tempfile
from pathlib import Path

class CodeValidator:
    """Validates Python code generated by an agent."""

    async def validate_python(self, code: str) -> ValidationReport:
        report = ValidationReport(passed=True)

        # Check 1: Syntax validity
        try:
            ast.parse(code)
            report.checks.append({
                "name": "syntax", "result": "pass"
            })
        except SyntaxError as e:
            report.passed = False
            report.errors.append(
                f"Syntax error at line {e.lineno}: {e.msg}"
            )
            report.checks.append({
                "name": "syntax", "result": "fail"
            })
            return report  # No point checking further

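        # Check 2: Type checking with mypy (assumes mypy is installed and on PATH)
        with tempfile.NamedTemporaryFile(
            suffix=".py", mode="w", delete=False
        ) as f:
            f.write(code)
        tmp_path = Path(f.name)  # file is closed here, so mypy can open it
        try:
            # Note: subprocess.run blocks the event loop while mypy runs
            result = subprocess.run(
                ["mypy", "--ignore-missing-imports", str(tmp_path)],
                capture_output=True,
                text=True,
                timeout=30,
            )
        finally:
            # delete=False means the temp file must be removed by hand
            tmp_path.unlink(missing_ok=True)

        if result.returncode != 0:
            report.warnings.append(
                f"Type errors: {result.stdout.strip()}"
            )
            report.checks.append({
                "name": "type_check", "result": "warn"
            })
        else:
            report.checks.append({
                "name": "type_check", "result": "pass"
            })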

        # Check 3: Naive security scan. This is a substring match, so it can
        # flag comments and string literals and will miss aliased imports.
        dangerous_imports = [
            "os.system", "subprocess.call", "eval(", "exec(",
            "__import__", "pickle.loads",
        ]
        for danger in dangerous_imports:
            if danger in code:
                report.passed = False
                report.errors.append(
                    f"Security risk: {danger} found in code"
                )

        return report
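
The security check can also be done on the syntax tree rather than on raw text. The following is a minimal sketch, assuming a small illustrative deny-list; `dotted_name` and `find_dangerous_calls` are hypothetical helpers. It only inspects call sites by their written name, so it still misses aliases such as `from os import system`.

```python
import ast
from typing import Optional

# Illustrative deny-list; a real policy would be broader and project-specific.
DANGEROUS_CALLS = {
    "eval", "exec", "__import__",
    "os.system", "subprocess.call", "pickle.loads",
}


def dotted_name(node: ast.AST) -> Optional[str]:
    """Return 'a.b.c' for Name/Attribute chains, else None."""
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if isinstance(node, ast.Name):
        parts.append(node.id)
        return ".".join(reversed(parts))
    return None


def find_dangerous_calls(code: str) -> list[str]:
    """List 'name (line N)' for each call on the deny-list, in line order."""
    hits = []
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Call):
            name = dotted_name(node.func)
            if name in DANGEROUS_CALLS:
                hits.append((node.lineno, name))
    return [f"{name} (line {lineno})" for lineno, name in sorted(hits)]


sample = '''
import os
note = "never call eval( on user input"
os.system("ls")
result = eval("1 + 1")
'''
print(find_dangerous_calls(sample))
```

The string literal on line 3 of the sample contains `eval(` but is not reported, because only actual call nodes are inspected.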

Reflection Prompting

When validation fails, the agent needs to understand what went wrong and how to fix it. Reflection prompting asks the LLM to analyze its own failed output and identify specific errors — then uses that analysis to generate a corrected output.

from dataclasses import dataclass
from typing import Optional

@dataclass
class ReflectionResult:
    original_output: str
    errors_identified: list[str]
    root_cause: str
    corrected_output: str
    correction_confidence: float

class ReflectionAgent:
    """Uses reflection to self-correct agent outputs."""

    REFLECTION_PROMPT = """You made an error in your previous output.

ORIGINAL OUTPUT:
{original_output}

VALIDATION ERRORS:
{errors}

Analyze what went wrong:
1. Identify each specific error
2. Determine the root cause
3. Generate a corrected output that fixes ALL errors

Format:
ERRORS IDENTIFIED:
- [error 1]
- [error 2]

ROOT CAUSE: [why these errors occurred]

CORRECTED OUTPUT:
[your corrected output]

CONFIDENCE: [0.0-1.0]"""

    def __init__(self, llm_client, validator: OutputValidator):
        self.llm = llm_client
        self.validator = validator

    async def generate_with_reflection(
        self,
        prompt: str,
        max_retries: int = 3,
    ) -> dict:
        # Initial generation
        response = await self.llm.chat(
            messages=[{"role": "user", "content": prompt}]
        )
        output = response.content

        attempts = [{"output": output, "attempt": 1}]

        for attempt in range(2, max_retries + 2):
            # Validate
            report = self.validator.validate(output)
            if report.passed:
                return {
                    "output": output,
                    "attempts": len(attempts),
                    "final_validation": report,
                }

            # Reflect and retry
            reflection = await self._reflect(
                output, report.errors
            )
            output = reflection.corrected_output
            attempts.append({
                "output": output,
                "attempt": attempt,
                "reflection": reflection,
            })

        # Final validation
        final_report = self.validator.validate(output)
        return {
            "output": output,
            "attempts": len(attempts),
            "final_validation": final_report,
            "fully_corrected": final_report.passed,
        }

    async def _reflect(
        self, original: str, errors: list[str]
    ) -> ReflectionResult:
        error_text = "\n".join(f"- {e}" for e in errors)

        response = await self.llm.chat(messages=[{
            "role": "user",
            "content": self.REFLECTION_PROMPT.format(
                original_output=original,
                errors=error_text,
            ),
        }])

        return self._parse_reflection(original, response.content)

    def _parse_reflection(
        self, original: str, text: str
    ) -> ReflectionResult:
        errors = []
        root_cause = ""
        corrected = ""
        confidence = 0.5

        sections = text.split("\n")
        current_section = None

        for raw_line in sections:
            line = raw_line.strip()
            if "ERRORS IDENTIFIED" in line:
                current_section = "errors"
            elif "ROOT CAUSE" in line:
                current_section = "root_cause"
                root_cause = line.split(":", 1)[1].strip() if ":" in line else ""
            elif "CORRECTED OUTPUT" in line:
                current_section = "corrected"
            elif "CONFIDENCE" in line:
                try:
                    confidence = float(
                        line.split(":", 1)[1].strip()
                    )
                except (ValueError, IndexError):
                    pass
            elif current_section == "errors" and line.startswith("-"):
                errors.append(line[1:].strip())
            elif current_section == "corrected":
                # Append the unstripped line so indentation in
                # corrected code or nested JSON is preserved
                corrected += raw_line + "\n"

        return ReflectionResult(
            original_output=original,
            errors_identified=errors,
            root_cause=root_cause,
            corrected_output=corrected.strip(),
            correction_confidence=confidence,
        )
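
When the corrected output is code, its indentation has to survive parsing. One alternative to walking the response line by line is to split it on the section headers with a regular expression and keep each section's raw text. This is a sketch that assumes the model follows the header format in the prompt exactly; `split_sections` is a hypothetical helper.

```python
import re

# Matches the four section headers from the reflection prompt at line start
SECTION_RE = re.compile(
    r"^(ERRORS IDENTIFIED|ROOT CAUSE|CORRECTED OUTPUT|CONFIDENCE):[ \t]*",
    re.MULTILINE,
)


def split_sections(text: str) -> dict[str, str]:
    """Map each section header to the raw text that follows it."""
    matches = list(SECTION_RE.finditer(text))
    sections = {}
    for i, m in enumerate(matches):
        # A section runs until the next header, or to the end of the text
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        # Strip only newlines so leading spaces inside the section survive
        sections[m.group(1)] = text[m.end():end].strip("\n")
    return sections


sample = (
    "ERRORS IDENTIFIED:\n"
    "- missing return\n"
    "\n"
    "ROOT CAUSE: forgot the return statement\n"
    "\n"
    "CORRECTED OUTPUT:\n"
    "def add(a, b):\n"
    "    return a + b\n"
    "\n"
    "CONFIDENCE: 0.9\n"
)
parts = split_sections(sample)
print(parts["CORRECTED OUTPUT"])
print(float(parts["CONFIDENCE"]))
```

If a header is missing from the response, its key is simply absent from the dict, so the caller can treat that as a parse failure instead of silently using an empty correction.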

Retry with Backoff and Escalating Feedback

For transient errors (API timeouts, rate limits, non-deterministic LLM failures), a structured retry mechanism with increasing detail in feedback improves success rates without wasting tokens on reflection for every failure.

import asyncio
import random
from typing import TypeVar, Callable, Awaitable

T = TypeVar("T")

class RetryWithFeedback:
    """Retries agent operations with escalating feedback detail."""

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    async def execute(
        self,
        operation: Callable[..., Awaitable[T]],
        validator: Callable[[T], ValidationReport],
        feedback_escalation: list[str],
        **kwargs,
    ) -> dict:
        """Execute with retry, escalating feedback on each failure.

        feedback_escalation: list of increasingly specific hints.
        Example:
            ["Ensure output is valid JSON",
             "The 'status' field must be 'success' or 'error'",
             "Here is an example of correct output: {...}"]
        """
        errors_so_far = []

        for attempt in range(self.max_retries + 1):
            # Add feedback from previous attempts
            extra_context = ""
            if errors_so_far:
                extra_context = "\n\nPREVIOUS ERRORS:\n"
                extra_context += "\n".join(
                    f"Attempt {i+1}: {e}"
                    for i, e in enumerate(errors_so_far)
                )
                if attempt - 1 < len(feedback_escalation):
                    extra_context += (
                        f"\n\nHINT: {feedback_escalation[attempt - 1]}"
                    )

            try:
                result = await operation(
                    extra_context=extra_context, **kwargs
                )
                report = validator(result)

                if report.passed:
                    return {
                        "result": result,
                        "attempts": attempt + 1,
                        "success": True,
                    }

                errors_so_far.append(
                    "; ".join(report.errors)
                )

            except Exception as e:
                errors_so_far.append(str(e))

            # Exponential backoff with jitter
            if attempt < self.max_retries:
                delay = min(
                    self.base_delay * (2 ** attempt)
                    + random.uniform(0, 1),
                    self.max_delay,
                )
                await asyncio.sleep(delay)

        return {
            "result": None,
            "attempts": self.max_retries + 1,
            "success": False,
            "errors": errors_so_far,
        }
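
To see what the retry schedule looks like in practice, the sketch below recomputes the delays and hint selection from the class above as two small pure functions. Jitter is left out so the numbers are deterministic; `backoff_delays` and `hint_for_attempt` are illustrative names, not part of the class.

```python
from typing import Optional


def backoff_delays(max_retries: int = 3, base_delay: float = 1.0,
                   max_delay: float = 30.0) -> list[float]:
    """Delay after each failed attempt, jitter omitted: base * 2**attempt, capped."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]


def hint_for_attempt(attempt: int, hints: list[str]) -> Optional[str]:
    """Hint attached to a 0-based attempt; the first attempt gets none."""
    if attempt >= 1 and attempt - 1 < len(hints):
        return hints[attempt - 1]
    return None


print(backoff_delays())               # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]

hints = ["Ensure output is valid JSON", "Here is an example: {...}"]
for attempt in range(4):
    print(attempt, hint_for_attempt(attempt, hints))
```

With the defaults, a task that fails every attempt waits roughly 1, 2, and 4 seconds (plus up to a second of jitter each) before giving up, and the hints run out once there are more retries than entries in `feedback_escalation`.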

Graceful Escalation

When self-correction fails after multiple attempts, the agent must escalate gracefully rather than producing a bad result. The escalation strategy depends on the context: in a user-facing chat, ask the user for clarification. In an automated pipeline, create a ticket for human review. In a critical system, fail safely with a meaningful error.

from enum import Enum
from dataclasses import dataclass
from typing import Optional

class EscalationLevel(Enum):
    RETRY = "retry"              # Try again with more context
    SIMPLIFY = "simplify"        # Break into smaller sub-tasks
    ASK_USER = "ask_user"        # Request clarification
    HUMAN_REVIEW = "human_review"  # Queue for human
    FAIL_SAFE = "fail_safe"      # Return safe default

@dataclass
class EscalationDecision:
    level: EscalationLevel
    reason: str
    suggested_action: str
    context: dict

class EscalationManager:
    """Decides how to handle agent failures."""

    def __init__(self, llm_client):
        self.llm = llm_client

    async def decide(
        self,
        task: str,
        errors: list[str],
        attempts: int,
        is_user_facing: bool,
        is_critical: bool,
    ) -> EscalationDecision:
        if attempts <= 1:
            return EscalationDecision(
                level=EscalationLevel.RETRY,
                reason="First failure — retry with more context",
                suggested_action="Add error details to prompt",
                context={"errors": errors},
            )

        if attempts <= 2 and not is_critical:
            return EscalationDecision(
                level=EscalationLevel.SIMPLIFY,
                reason="Multiple failures — task may be too complex",
                suggested_action=(
                    "Decompose into simpler sub-tasks"
                ),
                context={"original_task": task},
            )

        if is_user_facing and attempts <= 3:
            # Generate a clarification question
            clarification = await self._generate_clarification(
                task, errors
            )
            return EscalationDecision(
                level=EscalationLevel.ASK_USER,
                reason="Unable to complete — need user input",
                suggested_action=clarification,
                context={"errors": errors},
            )

        if is_critical:
            return EscalationDecision(
                level=EscalationLevel.FAIL_SAFE,
                reason="Critical task failed — returning safe default",
                suggested_action="Return safe default and alert team",
                context={"errors": errors, "attempts": attempts},
            )

        return EscalationDecision(
            level=EscalationLevel.HUMAN_REVIEW,
            reason=f"Failed after {attempts} attempts",
            suggested_action="Create ticket for human review",
            context={
                "task": task,
                "errors": errors,
                "attempts": attempts,
            },
        )

    async def _generate_clarification(
        self, task: str, errors: list[str]
    ) -> str:
        response = await self.llm.chat(messages=[{
            "role": "user",
            "content": (
                f"I tried to complete this task but encountered "
                f"errors. Generate a clear, specific question to "
                f"ask the user that would help me succeed.\n\n"
                f"Task: {task}\n"
                f"Errors: {errors}\n\n"
                f"Question for user:"
            ),
        }])
        return response.content.strip()
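
The branch order in `decide` matters, so it can help to tabulate the outcomes. The sketch below copies the same thresholds into a synchronous function with no LLM call (`escalation_level` is a hypothetical stand-in) and prints the level for each combination of inputs.

```python
from itertools import product


def escalation_level(attempts: int, is_user_facing: bool,
                     is_critical: bool) -> str:
    """Same branch order as EscalationManager.decide, without the LLM call."""
    if attempts <= 1:
        return "retry"
    if attempts <= 2 and not is_critical:
        return "simplify"
    if is_user_facing and attempts <= 3:
        return "ask_user"
    if is_critical:
        return "fail_safe"
    return "human_review"


# Print one row per combination of attempt count and flags
print(f"{'attempts':<9}{'user_facing':<12}{'critical':<9}level")
for attempts, user_facing, critical in product(
    range(1, 5), (True, False), (True, False)
):
    level = escalation_level(attempts, user_facing, critical)
    print(f"{attempts:<9}{str(user_facing):<12}{str(critical):<9}{level}")
```

Two things stand out in the table. A critical, user-facing task with two or three attempts asks the user before it fails safe, and once the attempt count passes 3 a user-facing task is no longer sent to `ask_user`. Callers that report four or more attempts and still want a clarification question would need to adjust the thresholds.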

Putting It All Together: Self-Correcting Agent Pipeline

Here is how validation, reflection, and escalation combine into a single self-correction pipeline (RetryWithFeedback is not used in this class):

class SelfCorrectingAgent:
    """Complete self-correcting agent with validation,
    reflection, retry, and escalation."""

    def __init__(
        self,
        llm_client,
        validator: OutputValidator,
        escalation: EscalationManager,
        max_retries: int = 3,
    ):
        self.llm = llm_client
        self.validator = validator
        self.reflection = ReflectionAgent(llm_client, validator)
        self.escalation = escalation
        self.max_retries = max_retries

    async def execute(
        self,
        task: str,
        is_user_facing: bool = True,
        is_critical: bool = False,
    ) -> dict:
        # Step 1: Generate with reflection-based self-correction
        result = await self.reflection.generate_with_reflection(
            prompt=task,
            max_retries=self.max_retries,
        )

        if result.get("fully_corrected", result["final_validation"].passed):
            return {
                "status": "success",
                "output": result["output"],
                "attempts": result["attempts"],
            }

        # Step 2: Self-correction failed — escalate
        errors = result["final_validation"].errors
        decision = await self.escalation.decide(
            task=task,
            errors=errors,
            attempts=result["attempts"],
            is_user_facing=is_user_facing,
            is_critical=is_critical,
        )

        return {
            "status": "escalated",
            "escalation": decision,
            "partial_output": result["output"],
            "attempts": result["attempts"],
        }

FAQ

How many retry attempts should a self-correcting agent make before escalating?

Three retries is the empirical sweet spot for most tasks. Data from production agent deployments shows that if the agent cannot produce a valid output in 3 attempts with reflection feedback, additional retries have diminishing returns (less than 5% improvement per attempt). The exception is code generation tasks, where 4-5 retries can be worthwhile because compile errors provide very specific feedback that the model can act on directly.

Does reflection prompting work with smaller models?

Reflection requires the model to accurately identify errors in its own output, which is a meta-cognitive task that scales with model capability. Models with 13B+ parameters can do basic reflection (identifying syntax errors, missing fields), but nuanced reflection (identifying logical errors, subtle hallucinations) requires 70B+ or frontier-class models. A practical compromise is to use a smaller model for generation and a larger model for reflection/evaluation.

How do you prevent infinite correction loops?

Three mechanisms: (1) a hard maximum retry count that triggers escalation regardless of what the reflection suggests, (2) a diversity check that ensures each retry attempt is meaningfully different from the previous one (if the model is producing the same wrong output repeatedly, escalate immediately), and (3) a cost budget that tracks total tokens consumed and escalates when the correction cost exceeds the value of the task.
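
The three mechanisms can be combined in one small guard object, sketched below. The similarity threshold and token budget are assumed values chosen for illustration, and `CorrectionGuard` is a hypothetical class. It uses `difflib.SequenceMatcher` from the standard library as a rough diversity measure, which can be slow on very long outputs.

```python
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Optional


@dataclass
class CorrectionGuard:
    """Tracks attempts, output similarity, and token spend for one task."""
    max_retries: int = 3
    similarity_threshold: float = 0.95  # assumed cutoff for "same output again"
    token_budget: int = 20_000          # assumed per-task budget
    outputs: list[str] = field(default_factory=list)
    tokens_used: int = 0

    def should_escalate(self, output: str, tokens: int) -> Optional[str]:
        """Record one attempt; return a reason to stop, or None to continue."""
        self.tokens_used += tokens
        previous = self.outputs[-1] if self.outputs else None
        self.outputs.append(output)

        # (2) Diversity check: near-identical to the previous attempt
        if previous is not None:
            ratio = SequenceMatcher(None, previous, output).ratio()
            if ratio >= self.similarity_threshold:
                return "repeated_output"
        # (3) Cost budget: total tokens spent on this task
        if self.tokens_used > self.token_budget:
            return "budget_exceeded"
        # (1) Hard cap: the initial attempt plus max_retries retries are used up
        if len(self.outputs) > self.max_retries:
            return "max_retries"
        return None


guard = CorrectionGuard(token_budget=1000)
print(guard.should_escalate('{"action": "search"', tokens=100))
print(guard.should_escalate('{"action": "search"', tokens=100))
```

The caller would invoke `should_escalate` after each failed validation and hand any non-None reason to the escalation step instead of retrying again.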

Can self-correction fix hallucinations?

Self-correction can catch hallucinations that contradict verifiable facts (e.g., the agent says "Python was created in 2005" and a fact-checking tool catches it). It cannot catch hallucinations that are plausible but wrong, because the same model that generated the hallucination will likely validate it during reflection. For hallucination-sensitive applications, ground all outputs in retrieved documents (RAG) and validate factual claims against external sources rather than relying on the model's self-assessment.


#SelfCorrection #Reflection #Validation #ErrorHandling #AgentPatterns #AIReliability

Written by

CallSphere Team
