在智能体开发领域,我们常常会遇到这样的困境:训练好的模型面对训练集之外的问题时,表现往往不尽如人意。这就像让一个学生只靠死记硬背应付考试,一旦遇到没背过的题目就束手无策。RAG(Retrieval-Augmented Generation)技术的出现,为这个问题提供了优雅的解决方案。
RAG的核心思想很简单——当Agent遇到问题时,不是仅依赖模型内部的知识,而是先从一个外部知识库中检索相关信息,再基于这些信息生成回答。这就像给Agent配备了一个随时可查阅的"外脑",让它能够突破训练数据的限制,处理更广泛的问题。
我在实际项目中发现,RAG特别适合以下场景:
一个完整的RAG系统通常包含三个关键组件:
检索器(Retriever):负责从知识库中查找相关文档
生成器(Generator):基于检索结果生成最终回答
知识库:存储可供检索的结构化/非结构化数据
工作流程示例:
python复制def rag_pipeline(query):
# 检索阶段
retrieved_docs = retriever.search(query, top_k=5)
# 重排序阶段(可选)
reranked_docs = reranker(query, retrieved_docs)
# 生成阶段
prompt = build_prompt(query, reranked_docs)
response = generator.generate(prompt)
return response
在实际项目中,检索器的性能往往决定了整个系统的上限。经过多次实验,我总结了以下经验:
密集检索 vs 稀疏检索
提示:生产环境中建议使用ColBERT这样的延迟优化方案,它在保持精度的同时将检索延迟降低了3-5倍。
混合检索实践
python复制from pyserini.search import LuceneSearcher
from sentence_transformers import CrossEncoder
# 初始化组件
sparse_retriever = LuceneSearcher('indexes/')
dense_retriever = DPRRetriever()
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
def hybrid_search(query, top_k=10):
# 并行执行两种检索
sparse_results = sparse_retriever.search(query, k=top_k*3)
dense_results = dense_retriever.search(query, k=top_k*3)
# 合并并去重
all_results = merge_results(sparse_results, dense_results)
# 重排序
reranked = reranker.predict([(query, doc.text) for doc in all_results])
final_results = [doc for _, doc in sorted(zip(reranked, all_results), reverse=True)]
return final_results[:top_k]
知识库的质量直接影响检索效果。在最近的一个医疗问答项目中,我们踩过不少坑后总结出以下流程:
文档清洗
文本分块
元数据标注
python复制from langchain.text_splitter import RecursiveCharacterTextSplitter
# 更智能的分块方案
medical_splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=64,
separators=["\n\n## ", "\n\n", "。", "!", "?", "\n", " "],
keep_separator=True
)
# 添加自定义处理
def preprocess_medical_text(text):
# 标准化药品名称
text = standardize_drug_names(text)
# 处理缩写
text = expand_abbreviations(text)
return text
向量检索是RAG的核心,但直接使用原始嵌入效果往往不佳。我们通过以下技巧提升了20%的检索准确率:
嵌入模型选择
索引优化技巧
python复制import faiss
from sentence_transformers import SentenceTransformer
# 构建FAISS索引
encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
embeddings = encoder.encode(docs)
# 降维处理
pca = faiss.PCA_transform(embeddings.shape[1], 256)
embeddings = pca.apply(embeddings)
# 构建HNSW索引
index = faiss.IndexHNSWFlat(256, 32)
index.add(embeddings)
检索到的文档如何有效地输入生成模型?这是我们经过大量实验总结的prompt模板:
基础模板
code复制请基于以下提供的参考资料回答问题。如果资料不足以回答问题,请明确说明。
问题:{query}
参考资料:
1. {doc1}
2. {doc2}
...
进阶技巧
python复制def build_medical_prompt(query, docs):
prompt = f"""你是一位专业的医疗助手。请谨慎基于以下信息回答问题:
问题:{query}
参考资料:
"""
for i, doc in enumerate(docs, 1):
prompt += f"{i}. {doc['text']}\n来源:{doc['metadata']['source']}\n\n"
prompt += """
回答要求:
1. 先判断问题是否属于医疗咨询
2. 说明参考资料是否足够回答
3. 给出专业但易懂的解释
4. 标注引用来源编号
"""
return prompt
在医疗、法律等高风险领域,生成结果的可靠性至关重要。我们设计了多层验证机制:
事实一致性检查
安全过滤
溯源增强
python复制from transformers import pipeline
# 初始化验证组件
nli_checker = pipeline("text-classification", model="roberta-large-mnli")
safety_filter = SafetyFilter()
def validate_response(response, source_docs):
# 事实一致性检查
claims = extract_claims(response)
for claim in claims:
evidence = find_supporting_evidence(claim, source_docs)
if not evidence:
response = add_disclaimer(response)
# 安全过滤
if safety_filter.detect_unsafe_content(response):
response = "出于安全考虑,我无法回答这个问题。"
return response
在生产环境中,RAG系统的响应速度至关重要。通过以下优化,我们将端到端延迟从2.3s降到了680ms:
检索阶段
生成阶段
系统级优化
python复制import torch
from transformers import pipeline
# 量化模型加载
generator = pipeline(
"text-generation",
model="distilgpt3-medical",
device="cuda",
torch_dtype=torch.float16
)
# 带缓存的检索
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_retrieval(query):
return hybrid_search(query)
要全面评估RAG系统,需要关注三个维度的指标:
检索质量
生成质量
系统性能
我们开发了一个自动化评估框架:
python复制class RAGEvaluator:
def __init__(self, test_queries):
self.test_queries = test_queries
def evaluate_retrieval(self, retriever):
recalls = []
for query, gold_docs in self.test_queries:
results = retriever.search(query)
recall = len(set(gold_docs) & set(results)) / len(gold_docs)
recalls.append(recall)
return np.mean(recalls)
def evaluate_generation(self, full_pipeline):
scores = []
for query, gold_answer in self.test_queries:
pred = full_pipeline(query)
scores.append(calculate_rouge(pred, gold_answer))
return np.mean(scores)
在实际部署中,我们遇到过各种意外情况,以下是几个典型案例:
冷启动问题
概念漂移
长尾查询
python复制# 概念漂移检测示例
from sklearn.decomposition import PCA
def detect_concept_drift(new_docs, old_embeddings):
new_embeds = encoder.encode(new_docs)
combined = np.vstack([old_embeddings, new_embeds])
# 降维可视化
pca = PCA(n_components=2)
reduced = pca.fit_transform(combined)
# 计算分布差异
old_center = np.mean(reduced[:len(old_embeddings)], axis=0)
new_center = np.mean(reduced[len(old_embeddings):], axis=0)
distance = np.linalg.norm(old_center - new_center)
return distance > threshold
一个健壮的RAG系统需要完善的监控体系:
性能监控
质量监控
异常检测
我们使用的监控看板包含以下关键指标:
code复制- 实时QPS
- 平均延迟(P50/P95/P99)
- 检索召回率(滑动窗口)
- 生成质量评分(用户反馈)
- 知识库覆盖率
在电商领域,我们扩展了传统RAG以处理图像和文本混合查询:
跨模态检索
多模态生成
python复制import clip
# 初始化多模态模型
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)
# 跨模态检索
def image_to_text_search(query_image, text_db, top_k=3):
image_features = model.encode_image(preprocess(query_image).unsqueeze(0).to(device))
text_features = model.encode_text(clip.tokenize(text_db).to(device))
similarities = (image_features @ text_features.T).squeeze(0)
return [text_db[i] for i in similarities.argsort(descending=True)[:top_k]]
为了让RAG系统能够持续进化,我们设计了以下机制:
用户反馈闭环
自动知识更新
模型迭代
python复制class ContinuousLearner:
def __init__(self, rag_system):
self.system = rag_system
self.feedback_db = FeedbackDatabase()
def process_feedback(self, query, user_feedback):
# 记录反馈
self.feedback_db.add(query, user_feedback)
# 触发重新训练条件
if self.feedback_db.needs_retraining():
self.retrain_system()
def retrain_system(self):
# 准备训练数据
train_data = self.feedback_db.get_training_examples()
# 微调检索器
self.system.retriever.train(train_data)
# 微调生成器
self.system.generator.fine_tune(train_data)
在最近的一个金融客服项目中,通过持续学习机制,系统准确率在3个月内从78%提升到了92%。关键是在每次更新后都保留了完整的版本控制,确保可以快速回滚。