在当今信息爆炸的时代,如何高效管理和利用企业内部知识资产成为关键挑战。基于LangGraph的RAG(检索增强生成)系统提供了一种智能解决方案,能够将企业文档转化为可交互的知识库。这套系统不仅能理解自然语言提问,还能从海量文档中精准定位相关信息,生成专业、准确的回答。
与传统知识库相比,RAG系统具有三大核心优势:
本方案特别适合以下场景:
一个完整的RAG系统由五个关键组件构成:
工作流程如下图所示:
code复制[用户提问] → [检索相关文档] → [组合上下文] → [生成回答] → [返回结果]
在选择各组件技术方案时,需要考虑以下因素:
| 组件 | 选项 | 适用场景 | 优缺点 |
|---|---|---|---|
| 文档加载器 | Unstructured | 复杂文档 | 解析能力强但速度慢 |
| 文本分块 | RecursiveSplitter | 通用场景 | 保持语义连贯性 |
| 向量模型 | OpenAI/text-embedding-3 | 云端方案 | 效果好但需联网 |
| 向量数据库 | Chroma | 轻量级 | 易于部署但规模有限 |
| LLM | GPT-4 | 高质量回答 | 成本较高 |
提示:生产环境中建议根据数据敏感性和规模选择本地或云端方案。金融等敏感行业优先考虑本地部署的开源模型。
首先需要配置Python环境并安装必要依赖:
bash复制# 创建虚拟环境
python -m venv rag_env
source rag_env/bin/activate # Linux/Mac
rag_env\Scripts\activate # Windows
# 安装核心依赖
pip install langgraph langchain langchain-openai chromadb unstructured
对于特定文件格式支持,还需安装额外依赖:
bash复制# PDF处理
pip install pypdf pdf2image
# Office文档处理
pip install python-docx openpyxl
# 网页抓取
pip install beautifulsoup4 html2text
文档加载是RAG系统的第一步,需要根据文件类型选择合适的加载器:
python复制from langchain.document_loaders import (
PyPDFLoader,
UnstructuredWordLoader,
CSVLoader,
WebBaseLoader
)
# PDF文档加载
pdf_loader = PyPDFLoader("产品手册.pdf")
pdf_docs = pdf_loader.load()
# Word文档加载
docx_loader = UnstructuredWordLoader("合同模板.docx")
docx_docs = docx_loader.load()
# 网页内容抓取
web_loader = WebBaseLoader(["https://example.com/faq"])
web_docs = web_loader.load()
注意事项:处理扫描版PDF时需要使用OCR技术,推荐结合pdf2image和pytesseract实现。
合理的文本分块对检索效果至关重要:
python复制from langchain.text_splitter import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 每块约500字符
chunk_overlap=100, # 块间重叠100字符
separators=["\n\n", "\n", "。", " ", ""] # 分割优先级
)
split_docs = text_splitter.split_documents(docs)
分块大小建议:
将文本转化为向量并存入数据库:
python复制from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
# 使用OpenAI的嵌入模型
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 创建向量存储
vector_db = Chroma.from_documents(
documents=split_docs,
embedding=embeddings,
persist_directory="./chroma_db"
)
# 持久化保存
vector_db.persist()
对于本地部署方案,可替换为开源嵌入模型:
python复制from langchain.embeddings import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-small-zh-v1.5",
model_kwargs={'device': 'cuda'} # 使用GPU加速
)
LangGraph通过状态机模型控制流程,首先定义状态结构:
python复制from typing import TypedDict, List
from langchain_core.messages import HumanMessage
class RAGState(TypedDict):
messages: List[HumanMessage] # 对话历史
question: str # 当前问题
context: List[str] # 检索到的上下文
answer: str # 生成的回答
然后实现各个功能节点:
python复制from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
def retrieve_node(state: RAGState):
"""检索节点"""
question = state["question"]
# 执行检索
retriever = vector_db.as_retriever(search_kwargs={"k": 3})
documents = retriever.invoke(question)
# 提取文本内容
context = [doc.page_content for doc in documents]
return {"context": context}
def generate_node(state: RAGState):
"""生成节点"""
prompt = ChatPromptTemplate.from_template("""
你是一个专业的知识库助手,请根据以下上下文回答问题。
如果上下文不包含答案,请回答"我不知道"。
上下文:
{context}
问题:
{question}
""")
chain = (
RunnablePassthrough.assign(context=lambda x: "\n\n".join(x["context"]))
| prompt
| llm
)
response = chain.invoke(state)
return {"answer": response.content}
将节点组合成完整的工作流:
python复制from langgraph.graph import StateGraph, END
# 创建图
workflow = StateGraph(RAGState)
# 添加节点
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
# 设置入口
workflow.set_entry_point("retrieve")
# 定义边
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
# 编译
rag_app = workflow.compile()
LangGraph支持复杂流程控制,例如条件分支:
python复制from langgraph.graph import StateGraph, END
def should_retry(state: RAGState):
"""判断是否需要重新检索"""
return "不清楚" in state["answer"] or "我不知道" in state["answer"]
# 创建带条件分支的图
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("refine_question", refine_question_node)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
# 条件边
workflow.add_conditional_edges(
"generate",
should_retry,
{
True: "refine_question",
False: END
}
)
workflow.add_edge("refine_question", "retrieve")
python复制from langchain.document_loaders import DirectoryLoader
# 批量加载整个目录下的PDF
loader = DirectoryLoader(
"./docs/",
glob="**/*.pdf",
loader_cls=PyPDFLoader,
show_progress=True
)
python复制import asyncio
from langchain.text_splitter import RecursiveCharacterTextSplitter
async def async_split_documents(docs):
splitter = RecursiveCharacterTextSplitter()
return await splitter.atransform_documents(docs)
python复制from langchain.cache import SQLiteCache
import langchain
# 启用SQLite缓存
langchain.llm_cache = SQLiteCache(database_path=".langchain.db")
python复制# 混合检索策略
from langchain.retrievers import BM25Retriever, EnsembleRetriever
bm25_retriever = BM25Retriever.from_documents(docs)
vector_retriever = vector_db.as_retriever()
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, vector_retriever],
weights=[0.4, 0.6]
)
python复制from langchain.retrievers.document_compressors import LLMChainRerank
compressor = LLMChainRerank(
llm=llm,
prompt="对以下文档与问题的相关性进行评分(1-10):\n问题:{query}\n文档:{document}"
)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=vector_retriever
)
python复制def validate_answer(state: RAGState):
"""验证回答质量"""
answer = state["answer"]
if "不确定" in answer or "我不知道" in answer:
return {"valid": False}
return {"valid": True}
python复制import logging
from datetime import datetime
logging.basicConfig(filename='rag_system.log', level=logging.INFO)
def log_interaction(question, answer, context):
logging.info(f"""
[{datetime.now()}]
问题: {question}
上下文: {context}
回答: {answer}
""")
python复制from sklearn.metrics import precision_score, recall_score
def evaluate_retrieval(questions, ground_truth):
"""评估检索效果"""
retrieved = [retriever.invoke(q) for q in questions]
precision = precision_score(ground_truth, retrieved)
recall = recall_score(ground_truth, retrieved)
return {"precision": precision, "recall": recall}
python复制import schedule
import time
def update_knowledge_base():
"""定时更新知识库"""
# 重新加载和处理文档
...
# 每天凌晨3点更新
schedule.every().day.at("03:00").do(update_knowledge_base)
while True:
schedule.run_pending()
time.sleep(60)
问题1:检索结果不相关
python复制retriever = vector_db.as_retriever(
filter={"department": "technical"} # 按元数据过滤
)
问题2:检索速度慢
python复制from langchain.vectorstores import FAISS
faiss_db = FAISS.from_documents(docs, embeddings)
faiss_db.save_local("faiss_index")
python复制db = Chroma.from_documents(
docs,
embeddings,
client_settings=Settings(anonymized_telemetry=False),
collection_metadata={"hnsw:space": "cosine"} # 启用HNSW索引
)
问题1:回答与上下文不符
python复制prompt = """你必须严格基于以下上下文回答,禁止编造信息:
上下文:{context}
问题:{question}
"""
python复制def add_citations(answer, sources):
return f"{answer}\n\n参考资料:\n" + "\n".join(sources)
问题2:回答过于冗长
python复制llm = ChatOpenAI(
model="gpt-4",
max_tokens=300 # 限制回答长度
)
python复制prompt = """请用不超过100字回答以下问题..."""
问题1:内存不足
python复制embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-small-zh-v1.5",
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
python复制from langchain.vectorstores import Milvus
milvus_db = Milvus.from_documents(
docs,
embeddings,
connection_args={"host": "127.0.0.1", "port": "19530"},
collection_name="knowledge_shard1"
)
问题2:API调用限制
python复制from ratelimit import limits, sleep_and_retry
@sleep_and_retry
@limits(calls=30, period=60)
def call_llm_api(prompt):
# API调用代码
...
python复制from langchain_community.llms import Ollama
llm = Ollama(model="llama3", temperature=0)
实现根据问题自动选择知识库:
python复制def route_question(state: RAGState):
"""路由问题到不同知识库"""
question = state["question"]
if "技术" in question:
return {"knowledge_base": "technical"}
elif "财务" in question:
return {"knowledge_base": "financial"}
else:
return {"knowledge_base": "general"}
# 在检索节点中
def retrieve_node(state: RAGState):
kb = state["knowledge_base"]
if kb == "technical":
retriever = tech_db.as_retriever()
elif kb == "financial":
retriever = finance_db.as_retriever()
...
支持多轮对话上下文:
python复制class ConversationState(TypedDict):
messages: List[Union[HumanMessage, AIMessage]]
question: str
context: List[str]
def chat_node(state: ConversationState):
"""处理对话历史"""
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业助手"),
MessagesPlaceholder(variable_name="messages"),
("human", "{question}")
])
chain = prompt | llm
response = chain.invoke(state)
return {"messages": state["messages"] + [response]}
结合外部数据源增强回答:
python复制import requests
def query_product_api(product_id):
"""调用产品API获取实时数据"""
response = requests.get(f"https://api.example.com/products/{product_id}")
return response.json()
def enhance_with_api_data(state: RAGState):
"""用API数据增强回答"""
answer = state["answer"]
# 提取产品ID
product_ids = extract_product_ids(answer)
# 查询API
api_data = [query_product_api(pid) for pid in product_ids]
# 增强回答
enhanced_answer = answer + "\n\n实时数据:\n" + format_api_data(api_data)
return {"answer": enhanced_answer}
在实际部署RAG系统时,建议从简单场景开始,逐步扩展功能。初期可以重点关注检索质量优化,这是整个系统效果的基础。随着系统成熟,再逐步添加对话管理、多知识库集成等高级功能。