Claude API Error Handling: Building Resilient AI Applications
Comprehensive guide to handling every error type in the Claude API. Covers HTTP status codes, SDK exceptions, retry strategies, circuit breakers, graceful degradation, and production monitoring patterns.
Why Error Handling Matters More for AI APIs
Traditional API error handling is straightforward: retry on 5xx, fix on 4xx. AI APIs introduce additional complexity:
- Responses are non-deterministic, so retries may produce different results
- Token-based billing means partial failures can still incur costs
- Long-running requests (streaming, extended thinking) have more failure modes
- Rate limits are more aggressive due to compute-intensive processing
- Context window limits create a class of errors unique to LLM APIs
A production application that calls the Claude API without robust error handling will fail in unpredictable and expensive ways.
Claude API Error Types
HTTP Status Codes
| Code | Error | Cause | Action |
|---|---|---|---|
| 400 | Invalid Request | Malformed request, bad parameters | Fix the request; do not retry |
| 401 | Authentication | Invalid or missing API key | Check API key configuration |
| 403 | Permission Denied | Key lacks permission for the resource | Check API key permissions |
| 404 | Not Found | Invalid endpoint or model | Verify model name and endpoint |
| 413 | Request Too Large | Input exceeds maximum size | Reduce input size |
| 429 | Rate Limited | Too many requests or tokens | Retry with backoff |
| 500 | Internal Server Error | Anthropic server issue | Retry with backoff |
| 529 | Overloaded | API is temporarily overloaded | Retry with longer backoff |
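The retry decisions in the table above can be distilled into a small helper. This is a sketch of the mapping, not part of the SDK; the function name is our own:

```python
def is_retryable(status_code: int) -> bool:
    """Return True if a request with this HTTP status is worth retrying."""
    # 429 (rate limited) and 5xx (including 529 Overloaded) are transient;
    # other 4xx client errors must be fixed, not retried.
    return status_code == 429 or status_code >= 500
```

Keeping this decision in one place makes it easy to audit and to extend if Anthropic adds new status codes.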
SDK Exception Hierarchy (Python)
from anthropic import (
    APIError,               # Base class for all API errors
    APIConnectionError,     # Network/connection failures
    RateLimitError,         # 429 responses
    APIStatusError,         # All non-2xx responses
    AuthenticationError,    # 401 responses
    PermissionDeniedError,  # 403 responses
    NotFoundError,          # 404 responses
    BadRequestError,        # 400 responses
    InternalServerError,    # 500 responses
)
Comprehensive Error Handler
import time
import random
import logging

from anthropic import (
    Anthropic, APIConnectionError, RateLimitError,
    APIStatusError, BadRequestError
)

logger = logging.getLogger(__name__)
client = Anthropic()


class ClaudeAPIError(Exception):
    """Custom exception with context for Claude API failures."""

    def __init__(self, message: str, retryable: bool, original_error: Exception = None):
        super().__init__(message)
        self.retryable = retryable
        self.original_error = original_error


def call_claude(
    messages: list,
    model: str = "claude-sonnet-4-5-20250514",
    max_tokens: int = 4096,
    max_retries: int = 3,
    base_delay: float = 1.0,
    **kwargs,
):
    """Call Claude API with comprehensive error handling and retry logic."""
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            response = client.messages.create(
                model=model,
                max_tokens=max_tokens,
                messages=messages,
                **kwargs,
            )
            return response

        except BadRequestError as e:
            # 400: Client error -- do not retry
            logger.error(f"Bad request: {e.message}")
            if "prompt is too long" in str(e).lower():
                raise ClaudeAPIError(
                    "Input exceeds context window. Reduce input size.",
                    retryable=False, original_error=e,
                )
            if "invalid model" in str(e).lower():
                raise ClaudeAPIError(
                    f"Invalid model: {model}",
                    retryable=False, original_error=e,
                )
            raise ClaudeAPIError(str(e), retryable=False, original_error=e)

        except RateLimitError as e:
            retry_after = int(e.response.headers.get("retry-after", 60))
            logger.warning(
                f"Rate limited (attempt {attempt + 1}). "
                f"Waiting {retry_after}s."
            )
            last_error = e
            if attempt < max_retries:
                time.sleep(retry_after + random.uniform(0, 5))
                continue

        except APIConnectionError as e:
            logger.warning(f"Connection error (attempt {attempt + 1}): {e}")
            last_error = e
            if attempt < max_retries:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)
                continue

        except APIStatusError as e:
            if e.status_code >= 500:
                logger.warning(f"Server error {e.status_code} (attempt {attempt + 1})")
                last_error = e
                if attempt < max_retries:
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    time.sleep(delay)
                    continue
            else:
                raise ClaudeAPIError(
                    f"API error {e.status_code}: {e.message}",
                    retryable=False, original_error=e,
                )

    raise ClaudeAPIError(
        f"Failed after {max_retries + 1} attempts",
        retryable=True, original_error=last_error,
    )
Circuit Breaker Pattern
When the Claude API is experiencing sustained issues, a circuit breaker prevents your application from wasting resources on requests that will fail:
import time
from enum import Enum
from threading import Lock


class CircuitState(Enum):
    CLOSED = "closed"          # Normal operation
    OPEN = "open"              # All requests rejected
    HALF_OPEN = "half_open"    # Testing if service recovered


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60,
        success_threshold: int = 2,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0
        self._lock = Lock()

    def can_execute(self) -> bool:
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True
            elif self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                    return True
                return False
            else:  # HALF_OPEN
                return True

    def record_success(self):
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            else:
                self.failure_count = 0

    def record_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            elif self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN


breaker = CircuitBreaker()

def call_with_circuit_breaker(messages: list, **kwargs):
    if not breaker.can_execute():
        raise ClaudeAPIError("Circuit breaker is OPEN. Service unavailable.", retryable=True)
    try:
        result = call_claude(messages, **kwargs)
        breaker.record_success()
        return result
    except ClaudeAPIError as e:
        if e.retryable:
            breaker.record_failure()
        raise
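To see the state machine in action without calling the API, here is a condensed, self-contained version of the same breaker driven through a failure-and-recovery cycle. The class is a stripped-down illustration (no locking; tiny thresholds and timeouts for demonstration), not the production `CircuitBreaker` above:

```python
import time
from enum import Enum

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class MiniBreaker:
    """Condensed sketch of the circuit-breaker state machine."""
    def __init__(self, failure_threshold=2, recovery_timeout=0.05, success_threshold=1):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = State.CLOSED
        self.failures = 0
        self.successes = 0
        self.last_failure = 0.0

    def can_execute(self) -> bool:
        # After the recovery timeout, an OPEN breaker lets one probe through.
        if self.state == State.OPEN and time.time() - self.last_failure > self.recovery_timeout:
            self.state = State.HALF_OPEN
            self.successes = 0
        return self.state != State.OPEN

    def record_failure(self):
        self.failures += 1
        self.last_failure = time.time()
        if self.failures >= self.failure_threshold or self.state == State.HALF_OPEN:
            self.state = State.OPEN

    def record_success(self):
        if self.state == State.HALF_OPEN:
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = State.CLOSED
                self.failures = 0

# Two consecutive failures trip the breaker...
b = MiniBreaker()
b.record_failure(); b.record_failure()
assert b.state == State.OPEN and not b.can_execute()
# ...after the recovery timeout, one probe is allowed (half-open)...
time.sleep(0.06)
assert b.can_execute() and b.state == State.HALF_OPEN
# ...and a successful probe closes the circuit again.
b.record_success()
assert b.state == State.CLOSED
```

The key property to notice: while OPEN, requests fail instantly with no network traffic, which is exactly what you want during a sustained outage.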
Graceful Degradation
When Claude is unavailable, your application should degrade gracefully rather than crash:
class AIService:
    def __init__(self):
        self.client = Anthropic()
        self.breaker = CircuitBreaker()

    def get_cached_response(self, user_message: str):
        # Application-specific cache lookup (e.g., Redis keyed on a message
        # hash); returns None on a miss. Stubbed here for brevity.
        return None

    def generate_response(self, user_message: str) -> dict:
        """Generate AI response with fallback chain."""
        # Attempt 1: Primary model (skipped entirely when the breaker is open)
        try:
            if self.breaker.can_execute():
                response = call_claude(
                    messages=[{"role": "user", "content": user_message}],
                    model="claude-sonnet-4-5-20250514",
                )
                self.breaker.record_success()
                return {"source": "claude-sonnet", "text": response.content[0].text}
        except Exception:
            self.breaker.record_failure()

        # Attempt 2: Fallback to cheaper model
        try:
            response = call_claude(
                messages=[{"role": "user", "content": user_message}],
                model="claude-haiku-4-5-20250514",
                max_retries=1,
            )
            return {"source": "claude-haiku-fallback", "text": response.content[0].text}
        except Exception:
            pass

        # Attempt 3: Cached/static response
        cached = self.get_cached_response(user_message)
        if cached:
            return {"source": "cache", "text": cached}

        # Attempt 4: Human handoff
        return {
            "source": "fallback",
            "text": "I am currently unable to process your request. "
                    "A team member will follow up shortly.",
        }
Handling Streaming Errors
Streaming introduces mid-stream failure modes. The connection might drop after partial content has been delivered:
def stream_with_recovery(messages: list, max_retries: int = 2):
    """Stream with automatic recovery from mid-stream failures."""
    base_messages = list(messages)
    collected_text = ""

    for attempt in range(max_retries + 1):
        try:
            with client.messages.stream(
                model="claude-sonnet-4-5-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    collected_text += text
                    yield text
            return  # Stream completed successfully
        except APIConnectionError:
            if attempt < max_retries:
                logger.warning(f"Stream interrupted at {len(collected_text)} chars. Retrying...")
                # On retry, ask Claude to continue from where it left off.
                # Rebuild from the original messages so earlier partial text
                # is not duplicated, and only prefill if something arrived.
                if collected_text:
                    messages = base_messages + [
                        {"role": "assistant", "content": collected_text},
                        {"role": "user", "content": "Continue from where you left off."},
                    ]
                continue
            raise
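The resumption step can be isolated into a helper that builds the continuation request. The empty-text guard matters because an assistant turn with no content is an invalid request; the helper name is illustrative:

```python
def build_continuation(base_messages: list, collected_text: str) -> list:
    """Build the message list for resuming an interrupted stream."""
    if not collected_text:
        # Nothing was delivered before the drop: just retry the original request.
        return list(base_messages)
    # Prefill the partial output as an assistant turn, then ask for the rest.
    return base_messages + [
        {"role": "assistant", "content": collected_text},
        {"role": "user", "content": "Continue from where you left off."},
    ]
```

Note that the resumed output may not splice together seamlessly (the model can restart a sentence), so consider trimming overlap before showing the combined text to users.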
Monitoring and Alerting
Track error metrics to detect issues before they cascade:
from dataclasses import dataclass, field
from collections import defaultdict
import time

def alert(message: str) -> None:
    # Placeholder: route to your paging/alerting system (PagerDuty, Slack, etc.)
    logger.critical(message)


@dataclass
class ErrorMetrics:
    errors_by_type: dict = field(default_factory=lambda: defaultdict(int))
    errors_by_minute: dict = field(default_factory=lambda: defaultdict(int))
    total_requests: int = 0   # successful requests only
    total_errors: int = 0

    def record_error(self, error_type: str):
        self.total_errors += 1
        self.errors_by_type[error_type] += 1
        minute_key = int(time.time() / 60)
        self.errors_by_minute[minute_key] += 1

    def record_success(self):
        self.total_requests += 1

    @property
    def error_rate(self) -> float:
        total = self.total_requests + self.total_errors
        return self.total_errors / total if total > 0 else 0

    def check_alerts(self):
        if self.error_rate > 0.10:
            alert("HIGH: Claude API error rate exceeds 10%")
        if self.errors_by_type.get("rate_limit", 0) > 50:
            alert("WARN: Excessive rate limiting detected")


metrics = ErrorMetrics()
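A quick numeric check of the thresholds above: with 9 successes and 1 error, the rate is exactly 10%, which sits at (not above) the alert trigger. The miniature bookkeeping here mirrors ErrorMetrics but is our own illustration:

```python
import time
from collections import defaultdict

# Miniature error-rate arithmetic mirroring ErrorMetrics above.
successes, errors_total = 9, 1
errors_by_type = defaultdict(int)
errors_by_type["rate_limit"] += 1

error_rate = errors_total / (successes + errors_total)  # 1 / 10 = 0.1
minute_key = int(time.time() / 60)   # errors bucketed per wall-clock minute
should_alert = error_rate > 0.10     # False: exactly at, not above, threshold
```

One more error at the same success count (9 successes, 2 errors) pushes the rate to ~18% and trips the alert.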
Idempotency for Retries
When retrying requests that have side effects (tool use, data modification), ensure idempotency by using unique request identifiers and checking for duplicate processing on the server side. The Claude API itself is stateless, but your tool implementations may not be.
Always design tool execution to be idempotent -- running the same tool call twice with the same input should produce the same result without unwanted side effects.
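One way to achieve this is to key each tool call on a hash of its name and input, and replay the stored result for duplicates. A minimal in-memory sketch (a production system would use a shared store such as Redis; all names here are illustrative):

```python
import hashlib
import json

_processed: dict = {}  # idempotency key -> cached result

def run_tool_idempotent(tool_name: str, tool_input: dict, execute):
    """Execute a tool call at most once per unique (name, input) pair."""
    key = hashlib.sha256(
        json.dumps([tool_name, tool_input], sort_keys=True).encode()
    ).hexdigest()
    if key in _processed:
        return _processed[key]  # Duplicate retry: replay the first result
    result = execute(tool_input)
    _processed[key] = result
    return result

calls = []
def send_email(inp):
    calls.append(inp)  # the side effect happens only once
    return "sent"

run_tool_idempotent("send_email", {"to": "a@b.c"}, send_email)
run_tool_idempotent("send_email", {"to": "a@b.c"}, send_email)  # replayed, no new send
```

`sort_keys=True` keeps the key stable regardless of dict ordering, so a retried request with logically identical input always hashes to the same key.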