Advanced RAG (Retrieval-Augmented Generation) patterns go beyond simple vector search to improve retrieval quality, relevance, and response accuracy. This article covers techniques to build production-grade RAG systems with Amazon Bedrock.
## Advanced RAG Architecture
```mermaid
flowchart TB
    subgraph Query["Query Processing"]
        A["User Query"]
        B["Query Rewriting"]
        C["Query Expansion"]
    end
    subgraph Retrieval["Multi-Stage Retrieval"]
        D["Semantic Search"]
        E["Keyword Search"]
        F["Hybrid Fusion"]
    end
    subgraph Ranking["Post-Processing"]
        G["Re-ranking"]
        H["Context Compression"]
    end
    subgraph Generation["Response"]
        I["LLM Generation"]
        J["Citation"]
    end
    A --> B
    B --> C
    C --> D
    C --> E
    D --> F
    E --> F
    F --> G
    G --> H
    H --> I
    I --> J
    style B fill:#3b82f6,color:#fff
    style G fill:#8b5cf6,color:#fff
```
## Query Rewriting

### HyDE (Hypothetical Document Embeddings)

Questions and the documents that answer them often look quite different in embedding space. HyDE bridges that gap by asking an LLM to draft a hypothetical answer first, then retrieving with the draft instead of the raw query.
```python
import boto3

runtime = boto3.client('bedrock-runtime')
bedrock_agent = boto3.client('bedrock-agent-runtime')

def generate_hypothetical_document(query: str) -> str:
    """Generate a hypothetical answer to use for retrieval."""
    response = runtime.converse(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        messages=[{
            'role': 'user',
            'content': [{
                'text': f"""Write a detailed paragraph that would be a perfect answer to this question: {query}
Write as if this is from a technical document. Do not include any preamble."""
            }]
        }],
        inferenceConfig={'maxTokens': 300}
    )
    return response['output']['message']['content'][0]['text']

def hyde_retrieval(query: str, knowledge_base_id: str) -> list:
    # Generate hypothetical document
    hypothetical_doc = generate_hypothetical_document(query)
    # Use hypothetical document for retrieval
    response = bedrock_agent.retrieve(
        knowledgeBaseId=knowledge_base_id,
        retrievalQuery={'text': hypothetical_doc},
        retrievalConfiguration={
            'vectorSearchConfiguration': {
                'numberOfResults': 5
            }
        }
    )
    return response['retrievalResults']
```
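A quick usage sketch; the Knowledge Base ID and question below are placeholders:

```python
# Placeholder Knowledge Base ID -- substitute your own
results = hyde_retrieval(
    "How does DynamoDB handle hot partitions?",
    knowledge_base_id='your-knowledge-base-id',
)
for r in results:
    print(r['score'], r['location']['s3Location']['uri'])
```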
### Query Expansion

Query expansion generates several paraphrases of the user's query, retrieves with each, and merges the results. It is most useful when the original query is short or ambiguous.
```python
def expand_query(query: str) -> list:
    """Generate multiple query variations."""
    response = runtime.converse(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        messages=[{
            'role': 'user',
            'content': [{
                'text': f"""Generate 3 alternative phrasings for this search query. Return only the queries, one per line.
Original query: {query}"""
            }]
        }],
        inferenceConfig={'maxTokens': 200}
    )
    variations = response['output']['message']['content'][0]['text'].strip().split('\n')
    return [query] + [v.strip() for v in variations if v.strip()]

def multi_query_retrieval(query: str, knowledge_base_id: str) -> list:
    queries = expand_query(query)
    all_results = []
    seen_ids = set()
    for q in queries:
        response = bedrock_agent.retrieve(
            knowledgeBaseId=knowledge_base_id,
            retrievalQuery={'text': q},
            retrievalConfiguration={
                'vectorSearchConfiguration': {'numberOfResults': 3}
            }
        )
        for result in response['retrievalResults']:
            # Deduplicate across queries by source document URI
            result_id = result['location']['s3Location']['uri']
            if result_id not in seen_ids:
                seen_ids.add(result_id)
                all_results.append(result)
    return all_results
```
## Hybrid Search

### Combining Semantic and Keyword Search

Semantic search can miss exact identifiers and error codes that BM25 finds trivially, while BM25 misses paraphrases. A hybrid retriever runs both and merges the ranked lists with Reciprocal Rank Fusion (RRF), which scores each document as a weighted sum of 1 / (rrf_k + rank) across the lists.
```python
import boto3
from opensearchpy import OpenSearch

class HybridSearchRetriever:
    def __init__(self, opensearch_host: str, index_name: str, knowledge_base_id: str):
        self.os_client = OpenSearch(
            hosts=[opensearch_host],
            use_ssl=True
        )
        self.index_name = index_name
        self.knowledge_base_id = knowledge_base_id
        self.bedrock_agent = boto3.client('bedrock-agent-runtime')

    def keyword_search(self, query: str, k: int = 5) -> list:
        """BM25 keyword search via OpenSearch. Returns raw hits (with _id and _source)."""
        response = self.os_client.search(
            index=self.index_name,
            body={
                'query': {
                    'multi_match': {
                        'query': query,
                        'fields': ['content', 'title^2'],  # boost title matches
                        'type': 'best_fields'
                    }
                },
                'size': k
            }
        )
        return response['hits']['hits']

    def semantic_search(self, query: str, k: int = 5) -> list:
        """Vector semantic search via Knowledge Bases."""
        response = self.bedrock_agent.retrieve(
            knowledgeBaseId=self.knowledge_base_id,
            retrievalQuery={'text': query},
            retrievalConfiguration={
                'vectorSearchConfiguration': {'numberOfResults': k}
            }
        )
        return response['retrievalResults']

    def hybrid_search(self, query: str, k: int = 5,
                      semantic_weight: float = 0.7) -> list:
        """Combine keyword and semantic search with weighted Reciprocal Rank Fusion."""
        keyword_results = self.keyword_search(query, k * 2)
        semantic_results = self.semantic_search(query, k * 2)
        # Weighted RRF: score(d) = sum over lists of weight / (rrf_k + rank)
        rrf_scores = {}
        docs_by_id = {}
        rrf_k = 60  # standard RRF damping constant
        for rank, hit in enumerate(keyword_results):
            doc_id = hit['_id']
            docs_by_id[doc_id] = hit['_source']
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + (1 - semantic_weight) / (rrf_k + rank + 1)
        for rank, result in enumerate(semantic_results):
            doc_id = result['location']['s3Location']['uri']
            docs_by_id[doc_id] = result
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + semantic_weight / (rrf_k + rank + 1)
        # Sort by fused score and return the top-k documents
        # (keyword and semantic entries keep their native shapes)
        sorted_ids = sorted(rrf_scores, key=rrf_scores.get, reverse=True)
        return [docs_by_id[doc_id] for doc_id in sorted_ids[:k]]
```
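A minimal usage sketch; the endpoint, index name, and Knowledge Base ID are placeholders:

```python
retriever = HybridSearchRetriever(
    opensearch_host='https://your-opensearch-domain:443',  # placeholder endpoint
    index_name='documents',                                # placeholder index
    knowledge_base_id='your-knowledge-base-id',
)
top_docs = retriever.hybrid_search("Lambda cold start mitigation", k=5)
```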
## Re-ranking

### LLM-based Re-ranking

The first retrieval pass optimizes for recall; re-ranking then re-orders the candidates for precision. The listwise approach below shows the model every candidate at once and asks for a single ordering.
```python
def rerank_with_llm(query: str, documents: list, top_k: int = 3) -> list:
    """Re-rank documents using LLM scoring."""
    # Format documents for scoring
    doc_list = "\n".join([
        f"[{i+1}] {doc['content']['text'][:500]}"
        for i, doc in enumerate(documents)
    ])
    response = runtime.converse(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        messages=[{
            'role': 'user',
            'content': [{
                'text': f"""Given the query and documents below, rank the documents by relevance to the query.
Return only the document numbers in order of relevance, comma-separated.
Query: {query}
Documents:
{doc_list}
Most relevant to least relevant (numbers only):"""
            }]
        }],
        inferenceConfig={'maxTokens': 50}
    )
    # Parse ranking, dropping any out-of-range indices
    ranking_text = response['output']['message']['content'][0]['text']
    rankings = [int(x.strip()) - 1 for x in ranking_text.split(',') if x.strip().isdigit()]
    # Reorder documents
    reranked = [documents[i] for i in rankings if 0 <= i < len(documents)]
    return reranked[:top_k]
```
### Cross-Encoder Re-ranking

Instead of ranking the whole list in one call, this variant scores each query-document pair independently, the way a cross-encoder model would. It costs one model call per document, so reserve it for small candidate sets.
```python
def cross_encoder_rerank(query: str, documents: list, top_k: int = 3) -> list:
    """Score each query-document pair independently."""
    scored_docs = []
    for doc in documents:
        response = runtime.converse(
            modelId='anthropic.claude-3-haiku-20240307-v1:0',
            messages=[{
                'role': 'user',
                'content': [{
                    'text': f"""Rate the relevance of this document to the query on a scale of 0-10.
Return only the number.
Query: {query}
Document: {doc['content']['text'][:1000]}
Relevance score:"""
                }]
            }],
            inferenceConfig={'maxTokens': 10}
        )
        try:
            score = float(response['output']['message']['content'][0]['text'].strip())
        except ValueError:
            score = 0.0  # unparseable answer counts as irrelevant
        scored_docs.append((doc, score))
    # Sort by score, highest first
    scored_docs.sort(key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in scored_docs[:top_k]]
```
## Context Compression

### Extractive Compression

Rather than passing every retrieved chunk to the generator, extractive compression keeps only the sentences that bear on the question. Preserving the exact wording, instead of summarizing, keeps citations verifiable.
```python
def compress_context(query: str, documents: list, max_tokens: int = 2000) -> str:
    """Extract only relevant portions of documents."""
    combined_text = "\n\n---\n\n".join([
        doc['content']['text'] for doc in documents
    ])
    response = runtime.converse(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        messages=[{
            'role': 'user',
            'content': [{
                'text': f"""Extract only the sentences and paragraphs from the following documents that are directly relevant to answering this question: {query}
Preserve exact wording. Do not summarize or paraphrase. Include source markers.
Documents:
{combined_text}
Relevant excerpts:"""
            }]
        }],
        inferenceConfig={'maxTokens': max_tokens}
    )
    return response['output']['message']['content'][0]['text']
```
## Multi-Step Retrieval

### Iterative Retrieval

Some questions cannot be answered from a single retrieval pass. Iterative retrieval asks the model whether the accumulated context is sufficient, and if not, folds its description of the gap into the next query.
```python
def iterative_retrieval(query: str, knowledge_base_id: str, max_iterations: int = 3) -> dict:
    """Iteratively retrieve and refine based on gaps."""
    all_context = []
    current_query = query
    iterations = 0
    for iteration in range(max_iterations):
        iterations = iteration + 1
        # Retrieve
        response = bedrock_agent.retrieve(
            knowledgeBaseId=knowledge_base_id,
            retrievalQuery={'text': current_query},
            retrievalConfiguration={
                'vectorSearchConfiguration': {'numberOfResults': 3}
            }
        )
        new_context = [r['content']['text'] for r in response['retrievalResults']]
        all_context.extend(new_context)
        # Check if we have enough information
        check_response = runtime.converse(
            modelId='anthropic.claude-3-haiku-20240307-v1:0',
            messages=[{
                'role': 'user',
                'content': [{
                    'text': f"""Given this context, can you fully answer the question: {query}
Context: {' '.join(all_context)}
If yes, respond "SUFFICIENT". If no, respond with what additional information is needed."""
                }]
            }],
            inferenceConfig={'maxTokens': 100}
        )
        check_text = check_response['output']['message']['content'][0]['text']
        if 'SUFFICIENT' in check_text.upper():
            break
        # Refine query for next iteration using the model's gap description
        current_query = f"{query} Additional context needed: {check_text}"
    return {
        'context': all_context,
        'iterations': iterations
    }
```
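A usage sketch, assuming the clients defined earlier and a placeholder Knowledge Base ID:

```python
result = iterative_retrieval(
    "How do I configure cross-region replication and what are its limits?",
    knowledge_base_id='your-knowledge-base-id',
)
print(f"Gathered {len(result['context'])} passages in {result['iterations']} iteration(s)")
```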
## Complete Advanced RAG Pipeline

The pipeline below chains the techniques end to end: expansion, multi-query retrieval, deduplication, re-ranking, compression, and generation.
```python
class AdvancedRAGPipeline:
    def __init__(self, knowledge_base_id: str):
        self.knowledge_base_id = knowledge_base_id
        self.runtime = boto3.client('bedrock-runtime')
        self.bedrock_agent = boto3.client('bedrock-agent-runtime')

    def process_query(self, query: str) -> str:
        # Step 1: Query expansion
        expanded_queries = self._expand_query(query)
        # Step 2: Multi-query retrieval
        all_results = []
        for q in expanded_queries:
            all_results.extend(self._retrieve(q))
        # Step 3: Deduplicate
        unique_results = self._deduplicate(all_results)
        # Step 4: Re-rank
        reranked = self._rerank(query, unique_results)
        # Step 5: Compress context
        compressed = self._compress(query, reranked)
        # Step 6: Generate response
        return self._generate(query, compressed)

    def _expand_query(self, query: str) -> list:
        response = self.runtime.converse(
            modelId='anthropic.claude-3-haiku-20240307-v1:0',
            messages=[{
                'role': 'user',
                'content': [{'text': f"Generate 2 alternative search queries for: {query}\nReturn only queries, one per line."}]
            }],
            inferenceConfig={'maxTokens': 100}
        )
        variations = response['output']['message']['content'][0]['text'].strip().split('\n')
        return [query] + [v.strip() for v in variations if v.strip()][:2]

    def _retrieve(self, query: str) -> list:
        response = self.bedrock_agent.retrieve(
            knowledgeBaseId=self.knowledge_base_id,
            retrievalQuery={'text': query},
            retrievalConfiguration={'vectorSearchConfiguration': {'numberOfResults': 5}}
        )
        return response['retrievalResults']

    def _deduplicate(self, results: list) -> list:
        seen = set()
        unique = []
        for r in results:
            uri = r['location']['s3Location']['uri']
            if uri not in seen:
                seen.add(uri)
                unique.append(r)
        return unique

    def _rerank(self, query: str, results: list) -> list:
        if len(results) <= 3:
            return results
        doc_list = "\n".join([f"[{i+1}] {r['content']['text'][:300]}" for i, r in enumerate(results[:10])])
        response = self.runtime.converse(
            modelId='anthropic.claude-3-haiku-20240307-v1:0',
            messages=[{'role': 'user', 'content': [{'text': f"Rank by relevance to '{query}':\n{doc_list}\nReturn numbers only, comma-separated:"}]}],
            inferenceConfig={'maxTokens': 30}
        )
        rankings = [int(x.strip()) - 1 for x in response['output']['message']['content'][0]['text'].split(',') if x.strip().isdigit()]
        reranked = [results[i] for i in rankings[:5] if 0 <= i < len(results)]
        # Fall back to the original order if the model's answer could not be parsed
        return reranked or results[:5]

    def _compress(self, query: str, results: list) -> str:
        context = "\n\n".join([r['content']['text'] for r in results])
        if len(context) < 3000:
            return context  # small enough to pass through unchanged
        response = self.runtime.converse(
            modelId='anthropic.claude-3-haiku-20240307-v1:0',
            messages=[{'role': 'user', 'content': [{'text': f"Extract relevant parts for '{query}':\n{context}"}]}],
            inferenceConfig={'maxTokens': 1500}
        )
        return response['output']['message']['content'][0]['text']

    def _generate(self, query: str, context: str) -> str:
        response = self.runtime.converse(
            modelId='anthropic.claude-3-sonnet-20240229-v1:0',
            messages=[{'role': 'user', 'content': [{'text': f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer based on the context:"}]}],
            inferenceConfig={'maxTokens': 1024}
        )
        return response['output']['message']['content'][0]['text']

# Usage
pipeline = AdvancedRAGPipeline('your-knowledge-base-id')
answer = pipeline.process_query("What are the best practices for error handling?")
```
## Best Practices
| Technique | When to Use |
|---|---|
| Query expansion | Ambiguous or short queries |
| HyDE | Technical/domain-specific queries |
| Hybrid search | Mixed keyword/semantic needs |
| Re-ranking | Large result sets |
| Context compression | Token limit constraints |
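These heuristics can be wired into a simple router. A minimal sketch; the word-count threshold and the caller-supplied flags are illustrative assumptions, not part of any Bedrock API:

```python
def choose_techniques(query: str, has_keyword_index: bool, domain_specific: bool) -> list:
    """Map query traits to techniques per the table above (thresholds are illustrative)."""
    techniques = []
    if len(query.split()) < 5:      # short or ambiguous queries benefit from expansion
        techniques.append('query_expansion')
    if domain_specific:             # caller-supplied flag for technical/domain queries
        techniques.append('hyde')
    if has_keyword_index:           # exact identifiers and error codes need BM25 too
        techniques.append('hybrid_search')
    techniques.append('reranking')  # worthwhile whenever the candidate set is large
    return techniques
```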
## Key Takeaways
- Query preprocessing - Expand and rewrite queries for better retrieval
- Hybrid search - Combine semantic and keyword search
- Re-ranking - Use LLM to improve result ordering
- Context optimization - Compress to fit token limits
- Iterative retrieval - Refine based on information gaps