在信息爆炸的时代,如何高效地从海量文档中提取有价值的信息成为技术团队面临的共同挑战。LlamaIndex作为当前最受欢迎的检索增强生成(RAG)框架之一,其文档连接器(Data Connectors)功能正是解决这一痛点的利器。Simple Directory Reader作为LlamaIndex内置的基础文档连接器,虽然结构简单,却涵盖了文档处理的完整链路,是理解RAG系统数据预处理环节的最佳切入点。
我在多个企业级知识管理系统中实施过RAG方案,发现90%的落地问题都出现在数据接入阶段——格式混乱的PDF、编码各异的HTML、版本繁杂的Office文档,这些"脏数据"直接影响了后续的检索和生成质量。通过剖析这个看似简单的目录读取器,我们可以掌握文档标准化的核心方法论。
Simple Directory Reader的实现基于模块化设计思想,主要包含三个关键组件:
input_files参数精确指定文件列表python复制# 典型初始化配置
from llama_index.core import SimpleDirectoryReader
reader = SimpleDirectoryReader(
input_dir="path/to/docs",
recursive=True,
exclude_hidden=True,
required_exts=[".pdf", ".md"]
)
文件加载器(File Loader):
文档标准化器(Document Normalizer):
多线程加载优化:
当处理包含数百个文件的目录时,串行加载会导致明显延迟。我们在生产环境中通过改造load_data方法实现了并行加载:
python复制from concurrent.futures import ThreadPoolExecutor
def parallel_load(self):
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(self._load_file, file)
for file in self.input_files]
return [future.result() for future in futures]
编码自动检测机制:
对于文本文件,采用chardet库进行编码探测,配合fallback机制确保不会因编码问题中断流程:
python复制def detect_encoding(file_path):
with open(file_path, 'rb') as f:
rawdata = f.read(1024) # 采样前1KB内容
result = chardet.detect(rawdata)
return result['encoding'] or 'utf-8'
内存管理技巧:
处理大型PDF文档时容易引发内存溢出,我们通过流式读取解决:
python复制from pypdf import PdfReader
def stream_pdf(file_path):
with open(file_path, "rb") as f:
reader = PdfReader(f)
for page in reader.pages:
yield page.extract_text() # 逐页生成
缓存策略:
对频繁访问的目录实现本地缓存,减少重复解析开销:
python复制from diskcache import Cache
cache = Cache("~/.llama_cache")
@cache.memoize()
def load_cached(file_path):
return SimpleDirectoryReader._load_file(file_path)
根据线上运维数据,文档处理中最常见的三类问题及解决方案:
python复制try:
doc = loader.load(file_path)
except (PDFSyntaxError, DocxError) as e:
logger.warning(f"损坏文件跳过: {file_path}")
return None
python复制text = raw_text.encode('ascii', errors='ignore').decode('ascii')
python复制import hashlib
def safe_filename(original):
return hashlib.md5(original.encode()).hexdigest()[:8]
通过继承BaseReader类实现自定义处理器:
python复制from llama_index.core.readers.base import BaseReader
class ExcelReader(BaseReader):
def load_data(self, file):
import pandas as pd
df = pd.read_excel(file)
return [Document(text=str(row)) for row in df.itertuples()]
在金融领域文档处理中,我们扩展了元数据字段:
python复制def enrich_metadata(doc):
doc.metadata["department"] = classify_department(doc.text)
doc.metadata["security_level"] = detect_sensitivity(doc.text)
return doc
建议在生产环境实现以下监控指标:
| 指标名称 | 计算方式 | 告警阈值 |
|---|---|---|
| 文件解析成功率 | 成功数/总数 × 100% | < 95% |
| 平均处理时延 | 总耗时/文件数 | > 500ms |
| 内存峰值消耗 | max(rss) during processing | > 1GB |
实现Prometheus监控的示例:
python复制from prometheus_client import Gauge
parse_success = Gauge('doc_parse_success', '文件解析成功数')
parse_failure = Gauge('doc_parse_failure', '文件解析失败数')
def instrumented_load(file):
try:
result = original_load(file)
parse_success.inc()
return result
except Exception:
parse_failure.inc()
raise
我们在银行知识库项目中遇到的真实案例:
问题现象:
PDF中的表格内容提取为乱码
排查过程:
解决方案:
python复制from pdfminer.high_level import extract_text
text = extract_text(
file_path,
codec='utf-8',
laparams={'detect_vertical': True}
)
性能对比数据:
| 方法 | 100页PDF耗时 | 内存占用 |
|---|---|---|
| 原生PyPDF2 | 12.4s | 870MB |
| pdfminer.six优化版 | 8.7s | 1.2GB |
| 我们的流式处理 | 6.2s | 210MB |
根据我们在医疗行业的实施经验,下一步改进可关注:
python复制from transformers import pipeline
classifier = pipeline("text-classification")
def auto_categorize(doc):
result = classifier(doc.text[:512])
doc.metadata["category"] = result[0]['label']
return doc
python复制import pytesseract
def extract_scanned(file):
img = pdf2image.convert_from_path(file)[0]
return pytesseract.image_to_string(img)
python复制def get_file_hash(file):
with open(file, "rb") as f:
return hashlib.md5(f.read()).hexdigest()