在LangChain生态中处理文档时,我们经常面临一个典型问题:不同格式的文件(PDF、Markdown、Word等)需要不同的解析逻辑,但它们的加载方式却大同小异。传统做法是为每种文件类型单独实现加载器和解析器,导致大量重复代码。Blob方案的引入正是为了解决这种"加载与解析强耦合"的问题。
Blob的设计灵感源自Web API中的Blob规范,它将数据加载和解析分离为三个核心组件:
这种分层设计带来几个关键优势:
提示:虽然当前LangChain对Blob方案的支持还比较基础,但理解这个设计模式对未来扩展自定义文件处理流程非常有帮助。
Blob类是整个方案的基础,它封装了原始数据及其元信息。关键属性和方法包括:
python复制class Blob:
def __init__(
self,
data: Union[str, bytes], # 原始数据
mimetype: Optional[str] = None, # 文件类型如'application/pdf'
encoding: str = "utf-8", # 文本编码
metadata: Optional[dict] = None # 元数据字典
):
...
@property
def source(self) -> str: # 数据来源标识
...
def as_bytes(self) -> bytes: # 获取字节数据
...
def as_string(self) -> str: # 获取文本数据
...
def as_bytes_io(self) -> BytesIO: # 获取字节流(用于逐行读取)
...
@classmethod
def from_path(cls, path: Union[str, Path]) -> "Blob": # 从文件创建
...
@classmethod
def from_data(cls, data: Union[str, bytes]) -> "Blob": # 从内存数据创建
...
实际使用时,根据数据来源选择适当的构造方式:
python复制# 从文件创建
pdf_blob = Blob.from_path("report.pdf")
# 从内存数据创建
text_blob = Blob(data="Hello\nWorld", mimetype="text/plain")
# 访问数据内容
with pdf_blob.as_bytes_io() as stream:
first_line = stream.readline() # 流式读取
BaseBlobParser是所有解析器的基类,核心是实现lazy_parse方法:
python复制from typing import Iterator
from langchain_core.documents import Document
class BaseBlobParser(ABC):
@abstractmethod
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
"""流式解析Blob数据为Document迭代器"""
自定义解析器示例 - 处理CSV文件:
python复制import csv
class CSVParser(BaseBlobParser):
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
with blob.as_bytes_io() as stream:
reader = csv.reader(TextIOWrapper(stream, encoding=blob.encoding))
headers = next(reader) # 读取表头
for i, row in enumerate(reader):
yield Document(
page_content="\n".join(f"{h}:{v}" for h,v in zip(headers, row)),
metadata={
"source": blob.source,
"row_num": i,
"headers": headers
}
)
BlobLoader抽象类定义了数据加载接口:
python复制class BlobLoader(ABC):
@abstractmethod
def yield_blobs(self) -> Iterable[Blob]:
"""生成Blob对象的迭代器"""
文件系统加载器示例:
python复制from pathlib import Path
class FileSystemBlobLoader(BlobLoader):
def __init__(self, path: str, glob_pattern: str = "*"):
self.path = Path(path)
self.glob_pattern = glob_pattern
def yield_blobs(self) -> Iterable[Blob]:
for file_path in self.path.glob(self.glob_pattern):
if file_path.is_file():
yield Blob.from_path(file_path)
假设我们需要解析Markdown文件,并提取所有二级标题及其后续内容:
python复制import re
from typing import Iterator
class MarkdownSectionParser(BaseBlobParser):
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
content = blob.as_string()
pattern = r"(## .+?)(?=## |\Z)"
for i, match in enumerate(re.finditer(pattern, content, re.DOTALL)):
yield Document(
page_content=match.group(1).strip(),
metadata={
"source": blob.source,
"section_num": i,
"type": "markdown_section"
}
)
使用组合:
python复制loader = FileSystemBlobLoader("./docs", "*.md")
parser = MarkdownSectionParser()
for blob in loader.yield_blobs():
for doc in parser.lazy_parse(blob):
print(f"提取章节: {doc.metadata['section_num']}")
print(doc.page_content[:50] + "...")
从MySQL数据库加载BLOB字段的示例:
python复制import mysql.connector
from mysql.connector import Error
class DatabaseBlobLoader(BlobLoader):
def __init__(self, query: str, db_config: dict):
self.query = query
self.db_config = db_config
def yield_blobs(self) -> Iterable[Blob]:
try:
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor()
cursor.execute(self.query)
for (blob_data, filename) in cursor:
yield Blob(
data=blob_data,
metadata={"source": f"db://{filename}"}
)
except Error as e:
print(f"数据库错误: {e}")
finally:
cursor.close()
conn.close()
LangChain提供了GenericLoader作为快捷方式:
python复制from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.blob_loaders import FileSystemBlobLoader
# 加载所有PDF文件
pdf_loader = GenericLoader.from_filesystem(
path="./documents",
glob="*.pdf",
parser=PDFParser() # 假设已实现
)
# 加载并立即解析所有文档
docs = list(pdf_loader.lazy_load())
处理大文件时需要特别注意内存使用:
始终优先使用流式处理:
python复制# 推荐 - 流式读取
with blob.as_bytes_io() as f:
for line in f:
process(line)
# 避免 - 全量读取
content = blob.as_string() # 大文件会占用大量内存
分批处理文档:
python复制for i, doc in enumerate(parser.lazy_parse(blob)):
process(doc)
if i % 100 == 0:
gc.collect() # 定期垃圾回收
| 问题现象 | 可能原因 | 解决方案 |
|---|---|---|
| 编码错误 | 文件实际编码与声明不符 | 使用chardet检测实际编码:blob = Blob(..., encoding=detected_encoding) |
| 解析部分内容丢失 | 解析器未处理二进制格式 | 确保先调用as_bytes()或as_bytes_io()处理二进制数据 |
| 内存溢出 | 同时加载过多大文件 | 使用yield_blobs()逐个处理,避免列表缓存所有Blob |
| 元数据缺失 | 未正确设置metadata参数 | 在Blob构造时或解析器中补充完整元数据 |
检查Blob原始内容:
python复制blob = Blob.from_path("data.bin")
print(f"Size: {len(blob.as_bytes())} bytes")
print(f"First 100 bytes: {blob.as_bytes()[:100]}")
验证解析器逻辑:
python复制class DebugParser(BaseBlobParser):
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
print(f"Parsing blob from {blob.source}")
yield Document(
page_content="DEBUG",
metadata={"raw_size": len(blob.as_bytes())}
)
性能分析:
python复制from time import time
start = time()
loader = FileSystemBlobLoader("./large_dir")
for blob in loader.yield_blobs():
parse_time = time()
list(parser.lazy_parse(blob))
print(f"{blob.source}: {time() - parse_time:.2f}s")
print(f"Total: {time() - start:.2f}s")
根据文件类型自动选择解析器:
python复制class SmartParser(BaseBlobParser):
def __init__(self):
self.pdf_parser = PDFParser()
self.txt_parser = TextParser()
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
if blob.mimetype == "application/pdf":
yield from self.pdf_parser.lazy_parse(blob)
else:
# 默认文本处理
yield from self.txt_parser.lazy_parse(blob)
结合Celery实现分布式解析:
python复制from celery import Celery
app = Celery('parser_worker')
@app.task
def parse_blob(blob_data: bytes, source: str) -> list[dict]:
blob = Blob(data=blob_data, metadata={"source": source})
parser = get_appropriate_parser(blob)
return [
{"content": doc.page_content, "meta": doc.metadata}
for doc in parser.lazy_parse(blob)
]
# 主节点
loader = S3BlobLoader(bucket="my-docs")
for blob in loader.yield_blobs():
parse_blob.delay(blob.as_bytes(), blob.source)
将Blob处理流程接入LangChain整体架构:
python复制from langchain.chains import AnalyzeDocumentChain
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 自定义处理管道
def blob_processing_pipeline(blob: Blob) -> list[Document]:
# 1. 解析原始内容
raw_docs = list(MarkdownParser().lazy_parse(blob))
# 2. 文本分割
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
# 3. 返回处理后的文档
return splitter.split_documents(raw_docs)
# 创建分析链
analyze_chain = AnalyzeDocumentChain(
document_loader=FileSystemBlobLoader("./reports"),
document_processor=blob_processing_pipeline,
analysis_chain=load_qa_chain(...)
)
在实际项目中,Blob方案特别适合以下场景:
随着LangChain对Blob方案的持续完善,预计未来会出现更多内置的BlobLoader和BlobParser实现,进一步简化文档处理流程的开发。对于当前版本,理解这些核心概念和设计模式,能够帮助开发者构建更灵活、高效的文档处理组件。