去年我在重构公司知识库系统时,第一次真正体会到玩具RAG和工程化RAG的天壤之别。当时我们用LangChain快速搭建的Demo在测试集上表现不错,但上线第一天就遭遇了灾难——高峰期响应时间超过5秒,错误率高达30%,最致命的是有用户反馈系统经常给出与问题完全无关的回答。
我们最初版本的代码与常见的玩具RAG如出一辙:
python复制from langchain.document_loaders import WebBaseLoader
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
loader = WebBaseLoader(["https://example.com/knowledge"])
docs = loader.load()
vectorstore = Chroma.from_documents(
documents=docs,
embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
llm = ChatOpenAI(temperature=0)
def naive_rag(question):
docs = retriever.get_relevant_documents(question)
return llm(f"基于以下上下文回答问题:\n{docs}\n\n问题:{question}")
这个实现存在几个致命问题:
经过三个月的重构,我们的系统最终实现了以下关键能力:
我们的生产系统采用明确的分层架构:
code复制┌───────────────────────────────────────┐
│ API层 │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ REST API │ │ WebSocket │ │
│ └─────────────┘ └─────────────┘ │
└───────────────────────────────────────┘
↓
┌───────────────────────────────────────┐
│ 业务逻辑层 │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ 流水线引擎 │ │ 缓存管理 │ │
│ └─────────────┘ └─────────────┘ │
└───────────────────────────────────────┘
↓
┌───────────────────────────────────────┐
│ 数据访问层 │
│ ┌───────┐ ┌───────┐ ┌─────────────┐ │
│ │向量库 │ │BM25索引│ │ 知识图谱 │ │
│ └───────┘ └───────┘ └─────────────┘ │
└───────────────────────────────────────┘
我们的流水线引擎主要处理流程如下:
python复制class RAGPipeline:
def __init__(self):
self.query_analyzer = QueryAnalyzer()
self.multi_retriever = MultiChannelRetriever()
self.reranker = CrossEncoderReranker()
self.generator = SafeGenerator()
self.validator = AnswerValidator()
async def process(self, question: str) -> RAGResponse:
# 查询分析阶段
parsed_query = await self.query_analyzer.analyze(question)
# 多路召回阶段
retrieved = await self.multi_retriever.retrieve(
parsed_query,
top_k=50
)
# 重排序阶段
reranked = await self.reranker.rerank(
question,
retrieved,
top_k=5
)
# 生成阶段
generated = await self.generator.generate(
question,
reranked
)
# 验证阶段
validated = await self.validator.validate(
question,
generated,
reranked
)
return RAGResponse(
question=question,
answer=validated.answer,
sources=validated.sources,
metrics=validated.metrics
)
我们采用的三路混合检索方案:
python复制class HybridRetriever:
def __init__(self):
self.dense_retriever = DenseRetriever()
self.sparse_retriever = SparseRetriever()
self.kg_retriever = KnowledgeGraphRetriever()
self.fusion_algorithm = ReciprocalRankFusion()
async def retrieve(self, query: ParsedQuery, top_k: int):
# 并行执行三路召回
dense_task = self.dense_retriever.retrieve(query, top_k*3)
sparse_task = self.sparse_retriever.retrieve(query, top_k*3)
kg_task = self.kg_retriever.retrieve(query, top_k)
results = await asyncio.gather(dense_task, sparse_task, kg_task)
# 结果融合
fused = self.fusion_algorithm.fuse(
results[0], results[1], results[2],
weights=[0.4, 0.4, 0.2],
top_k=top_k
)
return fused
我们实现的查询理解模块包含以下关键功能:
python复制class QueryAnalyzer:
def __init__(self):
self.rewriter = T5Rewriter()
self.intent_classifier = BertIntentClassifier()
self.entity_extractor = HybridEntityExtractor()
async def analyze(self, raw_query: str) -> ParsedQuery:
# 并行执行各项分析
rewrite_task = self.rewriter.rewrite(raw_query)
intent_task = self.intent_classifier.classify(raw_query)
entity_task = self.entity_extractor.extract(raw_query)
rewritten, intent, entities = await asyncio.gather(
rewrite_task, intent_task, entity_task
)
return ParsedQuery(
original=raw_query,
rewritten=rewritten,
intent=intent,
entities=entities,
timestamp=time.time()
)
我们开发的分块引擎能够根据文档类型自动选择最佳分块策略:
python复制class SmartChunker:
STRATEGIES = {
"code": CodeChunker(),
"paper": AcademicChunker(),
"legal": LegalChunker(),
"general": RecursiveChunker()
}
def chunk(self, document: Document) -> List[Chunk]:
doc_type = self._detect_type(document)
chunker = self.STRATEGIES.get(doc_type, self.STRATEGIES["general"])
chunks = chunker.chunk(document)
# 添加全局唯一ID和元数据
for i, chunk in enumerate(chunks):
chunk.id = f"{document.id}_{i}"
chunk.metadata.update({
"doc_type": doc_type,
"chunk_strategy": chunker.__class__.__name__,
"position": i
})
return chunks
def _detect_type(self, document: Document) -> str:
content = document.content
if "```" in content or "def " in content:
return "code"
elif "abstract" in content.lower() and "references" in content.lower():
return "paper"
elif any(term in content.lower() for term in ["条款", "第.*条", "法律"]):
return "legal"
return "general"
我们在不同分块策略下的召回率对比:
| 分块策略 | 技术文档 | 法律条文 | 学术论文 | 平均 |
|---|---|---|---|---|
| 固定512字符 | 58% | 62% | 51% | 57% |
| 按段落分块 | 65% | 78% | 63% | 69% |
| 递归分块 | 72% | 75% | 68% | 72% |
| 自适应分块(本方案) | 89% | 92% | 83% | 88% |
我们通过以下措施将P99延迟从3.2s降低到680ms:
分级缓存系统:
异步并行处理:
python复制async def process_query(query):
# 并行执行独立任务
analyze_task = query_analyzer.analyze(query)
cache_task = cache.get(query)
parsed_query, cached = await asyncio.gather(analyze_task, cache_task)
if cached:
return cached
# 继续处理...
python复制# 低效的单条处理
for query in queries:
embedding = model.encode(query)
# 优化的批量处理
batch_embeddings = model.encode(queries)
我们建立的监控看板包含以下核心指标:
检索指标:
生成指标:
系统指标:
我们设计的测试框架包含:
python复制class RAGTestRunner:
def __init__(self):
self.test_cases = self._load_test_cases()
self.evaluator = RAGEvaluator()
async def run_tests(self):
results = []
for case in self.test_cases:
response = await pipeline.process(case.question)
metrics = self.evaluator.evaluate(
question=case.question,
response=response,
expected=case.expected_answer
)
results.append(metrics)
self._generate_report(results)
测试用例覆盖:
我们建立的Bad Case分析机制:
在实际落地过程中,我们积累了以下关键经验:
不要过度依赖单一检索方式:
分块策略需要领域适配:
监控比想象中更重要:
缓存设计需要多级考虑:
测试框架要尽早建立:
在项目上线半年后,我们的系统日均处理查询量达到15万次,平均响应时间保持在800ms以内,用户满意度评分从最初的3.2提升到4.5(满分5分)。这个过程中最深的体会是:工程化RAG不是简单的技术堆砌,而是需要建立完整的系统思维,从检索质量、生成效果、系统性能等多个维度进行综合考量。