最近在开发基于大语言模型的RAG(检索增强生成)系统时,深刻体会到文档处理环节的重要性。就像盖房子需要打好地基一样,TextLoader和文本分割器的选择直接决定了后续检索和生成的质量。今天想和大家分享我在LangChain框架下使用TextLoader加载文档,以及用RecursiveCharacterTextSplitter进行文本分割的实战经验。
这个教程特别适合两类开发者:一是刚接触RAG技术栈,正在搭建第一个知识库应用的初学者;二是已经实现基础流程,但想优化文档处理效果的中级开发者。通过合理配置文本分割器,我们能让大模型更精准地理解文档上下文,显著提升问答系统的准确率。
TextLoader是LangChain中最基础的文档加载器之一,它就像个智能文件读取器。我常用它来处理.txt、.md等纯文本文件。实际使用时发现几个关键点:
编码自动检测:当遇到中文文档时,建议显式指定encoding参数(如'utf-8'),避免自动检测失败。上周处理一批GBK编码的历史档案时就踩过坑。
元数据保留:虽然加载的是纯文本,但可以通过metadata参数添加文档来源、作者等信息。这些元数据在后续的检索环节非常有用。
python复制from langchain.document_loaders import TextLoader
# 最佳实践:显式指定编码并添加元数据
loader = TextLoader("金融报告.txt", encoding="utf-8",
metadata={"source": "2023年报", "department": "财务部"})
documents = loader.load()
原始文档往往过长(比如100页的PDF),直接喂给大模型会有几个问题:
这就好比让你背诵整本百科全书来回答问题,不如直接查阅相关章节高效。文本分割就是帮大模型"划重点"的过程。
这个分割器的设计非常巧妙,就像用不同尺寸的筛子层层过滤:
这种递归方式能最大限度保持语义完整性。我在处理技术文档时,发现它比普通字符分割器保留的代码片段更完整。
经过20+项目的实践,总结出这些黄金参数组合:
python复制from langchain.text_splitter import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 每个片段的目标字符数
chunk_overlap=100, # 片段间重叠字符数
length_function=len, # 长度计算函数
separators=["\n\n", "\n", "。", ";", ",", " ", ""] # 中文需添加特定分隔符
)
特别说明几个易错点:
下面是我在金融问答系统中使用的完整处理流程:
python复制# 1. 文档加载
financial_loader = TextLoader("2023_Q4_earnings.txt",
encoding="utf-8",
metadata={"report_type": "quarterly"})
docs = financial_loader.load()
# 2. 文本分割
financial_splitter = RecursiveCharacterTextSplitter(
chunk_size=400,
chunk_overlap=80,
separators=["\n\n", "\n", "。", ":", ";", " ", ""]
)
splits = financial_splitter.split_documents(docs)
# 3. 向量化存储(以Chroma为例)
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
vectorstore = Chroma.from_documents(
documents=splits,
embedding=OpenAIEmbeddings()
)
分割后一定要人工检查样本:
我在电商知识库项目中就发现,产品参数表格被错误分割会导致价格信息丢失。后来通过调整separators参数,添加了"|"作为表格分隔符。
对于复杂文档(含代码、表格、文字),可以采用:
python复制from langchain.text_splitter import MarkdownHeaderTextSplitter
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[("#", "Header 1"), ("##", "Header 2")]
)
md_splits = markdown_splitter.split_text(markdown_content)
# 二次分割
final_splits = []
for split in md_splits:
final_splits += recursive_splitter.split_documents([split])
根据内容类型自动调整参数:
可以通过文档的metadata字段实现条件判断:
python复制def dynamic_splitter(doc):
if "contract" in doc.metadata.get("doc_type", ""):
return small_splitter.split_documents([doc])
else:
return large_splitter.split_documents([doc])
症状:中文句子被拦腰截断
解决方案:
症状:Python代码失去缩进
处理方法:
症状:CSV数据分散在不同chunk
优化方案:
当处理1000+文档时:
python复制from multiprocessing import Pool
def process_file(file_path):
loader = TextLoader(file_path)
# ...处理逻辑...
with Pool(4) as p:
results = p.map(process_file, file_list)
大文件处理时的注意事项:
python复制def batch_split(docs, batch_size=100):
for i in range(0, len(docs), batch_size):
batch = docs[i:i + batch_size]
yield from splitter.split_documents(batch)
经过这些优化,我们在处理10GB规模的金融报告时,处理时间从8小时缩短到45分钟。