Building production generative AI applications requires understanding common issues and their solutions. This guide covers troubleshooting techniques for Amazon Bedrock.
## Common Error Categories

```mermaid
flowchart TB
    subgraph API["API Errors"]
        A["Authentication"]
        B["Rate Limits"]
        C["Validation"]
    end
    subgraph Model["Model Issues"]
        D["Quality"]
        E["Hallucination"]
        F["Context"]
    end
    subgraph Infra["Infrastructure"]
        G["Latency"]
        H["Timeouts"]
        I["Availability"]
    end
    style A fill:#ef4444,color:#fff
    style D fill:#f59e0b,color:#fff
    style G fill:#3b82f6,color:#fff
```
## API Error Handling

### Common Error Codes

| Error Code | Cause | Solution |
|---|---|---|
| `AccessDeniedException` | Missing IAM permissions | Update IAM policy |
| `ValidationException` | Invalid request parameters | Check request format |
| `ThrottlingException` | Rate limit exceeded | Implement backoff |
| `ModelTimeoutException` | Request took too long | Reduce input/output size |
| `ServiceUnavailableException` | Service issue | Retry with backoff |
### Error Handling Implementation

```python
import time

import boto3
from botocore.exceptions import ClientError


class BedrockClient:
    def __init__(self):
        self.runtime = boto3.client('bedrock-runtime')
        self.max_retries = 3
        self.base_delay = 1  # seconds; doubled on each retry

    def invoke_with_retry(self, model_id: str, messages: list) -> dict:
        """Invoke the Converse API, retrying transient errors with exponential backoff."""
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                response = self.runtime.converse(
                    modelId=model_id,
                    messages=messages,
                    inferenceConfig={'maxTokens': 1024}
                )
                return response
            except ClientError as e:
                error_code = e.response['Error']['Code']
                last_exception = e
                if error_code == 'ThrottlingException':
                    # Transient: back off exponentially and retry
                    delay = self.base_delay * (2 ** attempt)
                    print(f"Rate limited. Retrying in {delay}s...")
                    time.sleep(delay)
                    continue
                elif error_code == 'ModelTimeoutException':
                    raise Exception("Model timeout. Consider reducing input size.")
                elif error_code == 'ValidationException':
                    raise Exception(f"Validation error: {e.response['Error']['Message']}")
                elif error_code == 'AccessDeniedException':
                    raise Exception("Access denied. Check IAM permissions.")
                elif error_code in ['ServiceUnavailableException', 'InternalServerException']:
                    # Transient service-side errors: also retry with backoff
                    delay = self.base_delay * (2 ** attempt)
                    print(f"Service error. Retrying in {delay}s...")
                    time.sleep(delay)
                    continue
                else:
                    raise
        raise last_exception
```
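For reference, a minimal invocation might look like the following; the model ID is an assumption, so substitute any Converse-compatible model enabled in your account and Region:

```python
client = BedrockClient()
messages = [{'role': 'user', 'content': [{'text': 'Explain exponential backoff in one sentence.'}]}]

# Model ID is an assumption -- use any model enabled in your account
response = client.invoke_with_retry('anthropic.claude-3-haiku-20240307-v1:0', messages)
print(response['output']['message']['content'][0]['text'])
```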
### Rate Limit Management

```python
import threading
import time
from collections import deque


class RateLimiter:
    """Sliding-window limiter: allows at most N requests per 60-second window."""

    def __init__(self, requests_per_minute: int):
        self.requests_per_minute = requests_per_minute
        self.request_times = deque()
        self.lock = threading.Lock()

    def wait_if_needed(self):
        with self.lock:
            now = time.time()
            # Drop timestamps that have aged out of the 60-second window
            while self.request_times and now - self.request_times[0] > 60:
                self.request_times.popleft()
            if len(self.request_times) >= self.requests_per_minute:
                # Sleep until the oldest request leaves the window
                sleep_time = 60 - (now - self.request_times[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)
            self.request_times.append(time.time())


rate_limiter = RateLimiter(requests_per_minute=60)

def call_bedrock(prompt):
    rate_limiter.wait_if_needed()
    # Invoke the model here, e.g. via BedrockClient.invoke_with_retry
```
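One design note: this sliding window smooths bursts but sleeps while holding the lock, which serializes waiting callers. For highly concurrent workloads, a token-bucket limiter, or releasing the lock before sleeping and re-checking afterward, avoids that contention.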
## Model Quality Issues

### Diagnosing Poor Responses

```python
def diagnose_response_quality(prompt: str, response: str) -> dict:
    """Analyze response quality issues."""
    issues = []

    # Very short responses usually signal an underspecified prompt
    if len(response.strip()) < 10:
        issues.append("Response too short - may need more specific prompt")

    # A low unique-word ratio indicates repetitive output
    words = response.lower().split()
    unique_ratio = len(set(words)) / len(words) if words else 0
    if unique_ratio < 0.5:
        issues.append("High repetition detected - lower temperature or revise prompt")

    # Common refusal phrasing
    refusal_patterns = ["I cannot", "I'm unable", "I don't have", "As an AI"]
    if any(pattern.lower() in response.lower() for pattern in refusal_patterns):
        issues.append("Model may be refusing - check for restricted content")

    if len(prompt) > 4000 and len(response) < 100:
        issues.append("Long prompt with short response - model may be truncating")

    return {
        'issues': issues,
        'word_count': len(words),
        'unique_ratio': unique_ratio,
        'has_issues': len(issues) > 0
    }
```
### Improving Response Quality

```python
def improve_prompt(original_prompt: str, issue_type: str) -> str:
    """Generate an improved prompt based on the diagnosed issue type."""
    improvements = {
        'too_short': f"""Please provide a detailed and comprehensive response.

{original_prompt}

Be thorough in your explanation and include relevant examples.""",
        'repetitive': f"""Please provide a varied and diverse response without repetition.

{original_prompt}""",
        'off_topic': f"""Focus specifically on answering this question. Stay on topic.

{original_prompt}

Only discuss information directly relevant to the question.""",
        'hallucination': f"""Answer based only on factual information. If you're uncertain about something, say so.

{original_prompt}

Important: Do not make up information. Only state what you know to be accurate."""
    }
    return improvements.get(issue_type, original_prompt)
```
### Handling Hallucinations

```python
import re

def detect_potential_hallucination(response: str, context: str = None) -> dict:
    """Flag response patterns that warrant fact-checking."""
    indicators = []

    # Specific factual claims worth verifying against the source context
    specific_patterns = [
        r'\d{4}',              # four-digit numbers (often years)
        r'\d+%',               # percentages
        r'\$\d+',              # dollar amounts
        r'according to',
        r'studies show',
        r'research indicates'
    ]
    for pattern in specific_patterns:
        if re.search(pattern, response) and context:
            if not re.search(pattern, context):
                indicators.append(f"Specific claim ({pattern}) not found in context")

    # Absolute language is a weak signal of overconfident claims
    high_confidence_phrases = [
        'definitely', 'certainly', 'always', 'never', 'all', 'none'
    ]
    for phrase in high_confidence_phrases:
        if phrase in response.lower():
            indicators.append(f"High confidence phrase: '{phrase}'")

    return {
        'potential_hallucination': len(indicators) > 0,
        'indicators': indicators,
        'recommendation': 'Verify claims against authoritative sources'
    }
```
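In a RAG pipeline, the check is most useful when the retrieved context is passed alongside the response. The strings below are purely illustrative:

```python
context = "Our Q3 report covers regional sales trends."
response = "According to the report, revenue grew 47% in 2023."

result = detect_potential_hallucination(response, context)
if result['potential_hallucination']:
    # Both "47%" and "2023" appear in the response but not the context
    for indicator in result['indicators']:
        print(f"- {indicator}")
```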
## Knowledge Bases Troubleshooting

### Common Issues

| Issue | Cause | Solution |
|---|---|---|
| No results | Poor chunking or query | Adjust chunk size (see sketch below), rephrase query |
| Irrelevant results | Embedding mismatch | Improve document quality |
| Missing documents | Sync failure | Check sync status, S3 permissions |
| Slow retrieval | Large index | Optimize vector store |
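Chunking is set when the data source is created, so adjusting it means recreating the data source and re-syncing. A minimal sketch using the `bedrock-agent` `create_data_source` API; the name, IDs, ARN, and token sizes are placeholder assumptions:

```python
import boto3

bedrock_agent = boto3.client('bedrock-agent')

# Placeholder IDs and ARN -- substitute your own
bedrock_agent.create_data_source(
    knowledgeBaseId='KB_ID',
    name='docs-fixed-chunks',
    dataSourceConfiguration={
        'type': 'S3',
        's3Configuration': {'bucketArn': 'arn:aws:s3:::my-docs-bucket'}
    },
    vectorIngestionConfiguration={
        'chunkingConfiguration': {
            'chunkingStrategy': 'FIXED_SIZE',
            'fixedSizeChunkingConfiguration': {
                'maxTokens': 300,          # smaller chunks -> more precise matches
                'overlapPercentage': 20    # overlap preserves context across chunk edges
            }
        }
    }
)
```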
### Debugging Retrieval

```python
def debug_knowledge_base_retrieval(knowledge_base_id: str, query: str) -> dict:
    """Debug Knowledge Base retrieval issues."""
    bedrock_agent = boto3.client('bedrock-agent-runtime')
    bedrock = boto3.client('bedrock-agent')

    # Step 1: confirm the Knowledge Base itself is ACTIVE
    try:
        kb_response = bedrock.get_knowledge_base(knowledgeBaseId=knowledge_base_id)
        kb_status = kb_response['knowledgeBase']['status']
        print(f"Knowledge Base status: {kb_status}")
        if kb_status != 'ACTIVE':
            return {'error': f'Knowledge Base not active: {kb_status}'}
    except Exception as e:
        return {'error': f'Failed to get Knowledge Base: {str(e)}'}

    # Step 2: run the query and inspect scores and previews
    try:
        response = bedrock_agent.retrieve(
            knowledgeBaseId=knowledge_base_id,
            retrievalQuery={'text': query},
            retrievalConfiguration={
                'vectorSearchConfiguration': {
                    'numberOfResults': 10
                }
            }
        )
        results = response['retrievalResults']
        print(f"Retrieved {len(results)} results")

        analysis = {
            'total_results': len(results),
            'results': []
        }
        for i, result in enumerate(results):
            score = result.get('score', 0)
            content_preview = result['content']['text'][:200]
            analysis['results'].append({
                'rank': i + 1,
                'score': score,
                'preview': content_preview,
                # Non-S3 sources may lack s3Location, so access it defensively
                'location': result.get('location', {}).get('s3Location', {}).get('uri', 'unknown')
            })
            print(f"Result {i+1}: Score={score:.4f}")
            print(f"  Preview: {content_preview[:100]}...")
        return analysis
    except Exception as e:
        return {'error': f'Retrieval failed: {str(e)}'}
```
### Sync Issues

```python
def check_data_source_sync(knowledge_base_id: str, data_source_id: str) -> dict:
    """Check and debug data source sync status."""
    bedrock = boto3.client('bedrock-agent')

    response = bedrock.get_data_source(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id
    )
    data_source = response['dataSource']
    print(f"Data source status: {data_source['status']}")

    # Inspect the most recent ingestion (sync) jobs
    jobs_response = bedrock.list_ingestion_jobs(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id,
        maxResults=5
    )

    sync_info = {
        'data_source_status': data_source['status'],
        'recent_jobs': []
    }
    for job in jobs_response['ingestionJobSummaries']:
        job_info = {
            'job_id': job['ingestionJobId'],
            'status': job['status'],
            'started': str(job.get('startedAt', 'N/A')),
            'updated': str(job.get('updatedAt', 'N/A'))
        }
        # Finished jobs expose document-level statistics (scanned, indexed, failed)
        if job['status'] in ['COMPLETE', 'FAILED']:
            detail_response = bedrock.get_ingestion_job(
                knowledgeBaseId=knowledge_base_id,
                dataSourceId=data_source_id,
                ingestionJobId=job['ingestionJobId']
            )
            job_info['statistics'] = detail_response['ingestionJob'].get('statistics', {})
        sync_info['recent_jobs'].append(job_info)
        print(f"Job {job['ingestionJobId']}: {job['status']}")
    return sync_info
```
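If the latest job failed (for example, after fixing S3 permissions), you can trigger a fresh sync with the `bedrock-agent` `start_ingestion_job` API. The IDs below are placeholders:

```python
# Placeholder IDs -- substitute your own
bedrock = boto3.client('bedrock-agent')
job = bedrock.start_ingestion_job(
    knowledgeBaseId='KB_ID',
    dataSourceId='DS_ID'
)
print(job['ingestionJob']['status'])  # typically STARTING
```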
## Latency Troubleshooting

### Measuring Latency

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class LatencyMetrics:
    total_ms: float
    time_to_first_token_ms: Optional[float]
    input_tokens: int
    output_tokens: int
    tokens_per_second: float


def measure_latency(model_id: str, messages: list, stream: bool = False) -> LatencyMetrics:
    """Measure detailed latency metrics."""
    runtime = boto3.client('bedrock-runtime')
    start_time = time.time()
    first_token_time = None
    total_output_tokens = 0
    usage = {}

    if stream:
        response = runtime.converse_stream(
            modelId=model_id,
            messages=messages,
            inferenceConfig={'maxTokens': 1024}
        )
        for event in response['stream']:
            if 'contentBlockDelta' in event:
                if first_token_time is None:
                    first_token_time = time.time()
                total_output_tokens += 1  # approximate: counts deltas, not tokens
            elif 'metadata' in event:
                # The final metadata event carries the authoritative token counts
                usage = event['metadata'].get('usage', {})
        end_time = time.time()
        ttft = (first_token_time - start_time) * 1000 if first_token_time else None
        total_output_tokens = usage.get('outputTokens', total_output_tokens)
    else:
        response = runtime.converse(
            modelId=model_id,
            messages=messages,
            inferenceConfig={'maxTokens': 1024}
        )
        end_time = time.time()
        ttft = None
        usage = response['usage']
        total_output_tokens = usage['outputTokens']

    total_ms = (end_time - start_time) * 1000
    elapsed = end_time - start_time
    tokens_per_second = total_output_tokens / elapsed if total_output_tokens > 0 else 0

    return LatencyMetrics(
        total_ms=total_ms,
        time_to_first_token_ms=ttft,
        input_tokens=usage.get('inputTokens', 0),
        output_tokens=total_output_tokens,
        tokens_per_second=tokens_per_second
    )
```
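Comparing the two modes on the same prompt shows why streaming improves perceived latency even when total time is similar. The model ID is again an assumption:

```python
messages = [{'role': 'user', 'content': [{'text': 'List five uses for a paperclip.'}]}]
model_id = 'anthropic.claude-3-haiku-20240307-v1:0'  # assumption: any streaming-capable model

for use_stream in (False, True):
    m = measure_latency(model_id, messages, stream=use_stream)
    print(f"stream={use_stream}: total={m.total_ms:.0f}ms, "
          f"TTFT={m.time_to_first_token_ms}, {m.tokens_per_second:.1f} tok/s")
```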
### Latency Optimization Checklist

```python
def latency_optimization_checklist(metrics: LatencyMetrics) -> list:
    """Generate latency optimization recommendations."""
    recommendations = []

    if metrics.total_ms > 5000:
        recommendations.append({
            'issue': 'High total latency',
            'suggestions': [
                'Consider using a faster model (e.g., Haiku instead of Sonnet)',
                'Reduce input prompt size',
                'Use Provisioned Throughput for consistent latency'
            ]
        })

    if metrics.time_to_first_token_ms and metrics.time_to_first_token_ms > 2000:
        recommendations.append({
            'issue': 'Slow time to first token',
            'suggestions': [
                'Reduce system prompt length',
                'Consider streaming for better perceived performance',
                'Check for network latency to Bedrock endpoint'
            ]
        })

    if metrics.tokens_per_second < 20:
        recommendations.append({
            'issue': 'Low throughput',
            'suggestions': [
                'Check if hitting rate limits',
                'Consider Provisioned Throughput',
                'Review concurrent request patterns'
            ]
        })

    if metrics.input_tokens > 10000:
        recommendations.append({
            'issue': 'Large input size',
            'suggestions': [
                'Summarize or compress context',
                'Use RAG to retrieve only relevant content',
                'Split into multiple requests'
            ]
        })

    return recommendations
```
## Debugging Checklist

```python
def comprehensive_debug(model_id: str, prompt: str) -> dict:
    """Run comprehensive debugging checks."""
    results = {
        'checks': [],
        'issues': [],
        'recommendations': []
    }
    runtime = boto3.client('bedrock-runtime')

    # Check 1: is the model available in this account/Region?
    try:
        bedrock = boto3.client('bedrock')
        bedrock.get_foundation_model(modelIdentifier=model_id.split(':')[0])
        results['checks'].append({'model_available': True})
    except Exception as e:
        results['issues'].append(f'Model not available: {str(e)}')
        return results

    # Check 2: does a trivial invocation succeed?
    try:
        start = time.time()
        runtime.converse(
            modelId=model_id,
            messages=[{'role': 'user', 'content': [{'text': 'Hello'}]}],
            inferenceConfig={'maxTokens': 10}
        )
        latency = (time.time() - start) * 1000
        results['checks'].append({'basic_invocation': True, 'latency_ms': latency})
    except Exception as e:
        results['issues'].append(f'Basic invocation failed: {str(e)}')
        return results

    # Check 3: run the real prompt and analyze the response
    try:
        start = time.time()
        response = runtime.converse(
            modelId=model_id,
            messages=[{'role': 'user', 'content': [{'text': prompt}]}],
            inferenceConfig={'maxTokens': 1024}
        )
        latency = (time.time() - start) * 1000
        results['checks'].append({
            'full_prompt': True,
            'latency_ms': latency,
            'input_tokens': response['usage']['inputTokens'],
            'output_tokens': response['usage']['outputTokens']
        })
        output = response['output']['message']['content'][0]['text']
        quality = diagnose_response_quality(prompt, output)
        if quality['has_issues']:
            results['issues'].extend(quality['issues'])
    except Exception as e:
        results['issues'].append(f'Full prompt test failed: {str(e)}')

    return results
```
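Running the checks and pretty-printing the result gives a quick triage report; the model ID is again a placeholder:

```python
import json

report = comprehensive_debug(
    'anthropic.claude-3-haiku-20240307-v1:0',  # placeholder model ID
    'Summarize the key risks of deploying LLMs in production.'
)
print(json.dumps(report, indent=2))
```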
## Best Practices

| Category | Practice |
|---|---|
| Error handling | Always implement retry with backoff |
| Monitoring | Track latency, errors, and token usage |
| Testing | Test with edge cases and long inputs |
| Logging | Log requests and responses for debugging |
| Alerts | Set up CloudWatch alarms for anomalies (see sketch below) |
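For the monitoring and alerting rows, publishing custom metrics gives CloudWatch alarms something to watch. A minimal sketch using `put_metric_data`; the namespace and metric names are assumptions:

```python
import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_invocation_metrics(latency_ms: float, input_tokens: int, output_tokens: int):
    """Publish per-invocation metrics; alarm on latency or token-usage anomalies."""
    cloudwatch.put_metric_data(
        Namespace='MyApp/Bedrock',  # assumed namespace
        MetricData=[
            {'MetricName': 'InvocationLatency', 'Value': latency_ms, 'Unit': 'Milliseconds'},
            {'MetricName': 'InputTokens', 'Value': input_tokens, 'Unit': 'Count'},
            {'MetricName': 'OutputTokens', 'Value': output_tokens, 'Unit': 'Count'},
        ]
    )
```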
## Key Takeaways

- **Implement robust error handling** - Retry with exponential backoff
- **Monitor response quality** - Detect and address hallucinations
- **Debug Knowledge Bases** - Check sync status and retrieval scores
- **Measure latency** - Track TTFT and total response time
- **Use checklists** - Systematic debugging approach