1. LangChain 中的 Runnable 概念解析
在 LangChain 生态系统中,Runnable 是最基础也是最重要的抽象接口之一。简单来说,任何可以被调用(invoke)或流式处理(stream)的组件都应该实现 Runnable 接口。这就像是我们日常使用的电器插头标准 - 无论什么品牌的电器,只要符合插头标准就能接入电网工作。
Runnable 的设计哲学体现了几个关键特性:
-
统一接口规范:所有实现了 Runnable 的组件都提供一致的调用方法(invoke/ainvoke)和流式处理方法(stream/astream)。这种一致性使得不同组件可以无缝衔接,就像乐高积木一样能够自由组合。
-
链式组合能力:通过
|运算符(管道操作符),多个 Runnable 可以轻松连接成处理链。这种设计借鉴了 Unix 管道的思想,让数据能够像流水线一样在不同组件间传递。 -
配置灵活性:每个 Runnable 都支持运行时配置(RunnableConfig),这让我们可以在不修改代码的情况下调整组件行为,比如设置超时时间、添加重试逻辑等。
实际开发中,我经常遇到需要临时开启调试日志的情况。通过 RunnableConfig 的 metadata 功能,可以动态控制日志级别,而不需要重启应用。
2. 为什么需要自定义 Runnable
虽然 LangChain 提供了丰富的内置组件,但在实际业务场景中,我们总会遇到需要定制化处理的情况。根据我的项目经验,以下五种场景最常需要自定义 Runnable:
2.1 数据预处理/后处理
大型语言模型(LLM)的输入输出往往需要特殊处理。比如:
- 清理用户输入中的敏感信息
- 将模型输出的非结构化文本转换为结构化数据
- 处理模型返回的包含多余字符的 JSON
2.2 业务逻辑集成
每个业务领域都有独特的规则需要嵌入到处理流程中。例如:
- 电商场景下的价格计算规则
- 客服系统中的话术合规检查
- 内容审核流程中的敏感词过滤
2.3 格式转换中介
当链中的前后组件使用不同数据格式时,需要转换适配器。常见的有:
- 将数据库查询结果转换为自然语言描述
- 把 API 响应重新组织为模型需要的格式
- 不同模型输出之间的标准化处理
2.4 增强的错误处理
内置组件的错误处理往往比较基础,我们需要:
- 添加业务特定的错误恢复机制
- 实现重试策略(如指数退避)
- 提供用户友好的错误信息转换
2.5 性能优化
针对高并发场景的优化手段:
- 实现结果缓存(特别是对相同提示词的响应)
- 批量处理请求以减少 I/O 开销
- 并行执行独立任务
在我的一个电商推荐系统项目中,就曾通过自定义 Runnable 实现了以下功能:
- 清理用户历史行为数据中的噪声
- 将推荐结果按业务规则排序
- 添加 AB 测试分流逻辑
- 缓存热门商品的推荐结果
3. Runnable 核心接口详解
要实现一个功能完整的自定义 Runnable,需要理解以下几个核心方法:
3.1 基础同步/异步方法
python复制from langchain_core.runnables import Runnable
from typing import Any, Optional
class BasicRunnable(Runnable):
def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
"""
同步调用方法 - 必须实现
:param input: 输入数据
:param config: 运行时配置
:return: 处理结果
"""
# 实现你的处理逻辑
processed = self._process_input(input)
return processed
async def ainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
"""
异步调用方法 - 强烈建议实现
默认实现直接调用同步方法,对于I/O密集型操作应该重写
"""
return self.invoke(input, config)
3.2 流式处理方法
python复制from typing import Iterator, AsyncIterator
class StreamRunnable(Runnable):
def stream(self, input: Any, config: Optional[RunnableConfig] = None) -> Iterator[Any]:
"""
同步流式处理
:return: 生成器,逐个产出处理结果
"""
for chunk in self._split_input(input):
yield self._process_chunk(chunk)
async def astream(self, input: Any, config: Optional[RunnableConfig] = None) -> AsyncIterator[Any]:
"""
异步流式处理
"""
async for chunk in self._async_split_input(input):
yield self._process_chunk(chunk)
3.3 批量处理方法
python复制from typing import List
class BatchRunnable(Runnable):
def batch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
"""
同步批量处理
:param inputs: 输入列表
:return: 结果列表
"""
return [self.invoke(item, config) for item in inputs]
async def abatch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
"""
异步批量处理
"""
import asyncio
tasks = [self.ainvoke(item, config) for item in inputs]
return await asyncio.gather(*tasks)
3.4 配置参数处理
RunnableConfig 包含多个有用的运行时信息:
python复制config = {
'callbacks': [callback1, callback2], # 回调函数列表
'tags': ['production', 'v2'], # 标签用于分类和监控
'metadata': {'user_id': '123'}, # 业务元数据
'configurable': { # 可配置参数
'timeout': 30,
'retry_times': 3
}
}
4. 实现自定义 Runnable 的完整步骤
让我们通过一个实际的 JSON 处理示例,详细讲解实现过程。
4.1 定义需求
假设我们需要一个能够:
- 从 LLM 输出的杂乱文本中提取 JSON 内容
- 验证 JSON 结构是否符合预期
- 支持自定义清理规则
- 提供严格/宽松两种处理模式
4.2 基础实现
python复制import re
import json
from typing import Any, Optional, Callable, Type
from langchain_core.runnables import Runnable
from langchain_core.pydantic_v1 import BaseModel
class JSONProcessor(Runnable):
def __init__(self,
schema: Optional[Type[BaseModel]] = None,
strict: bool = False,
cleaner: Optional[Callable[[str], str]] = None):
"""
:param schema: Pydantic 模型用于验证
:param strict: 严格模式会抛出验证异常
:param cleaner: 自定义文本清理函数
"""
self.schema = schema
self.strict = strict
self.cleaner = cleaner or self._default_cleaner
def _default_cleaner(self, text: str) -> str:
"""默认清理函数:提取第一个有效JSON对象"""
try:
json.loads(text)
return text
except json.JSONDecodeError:
matches = re.findall(r'\{[^{}]*\}', text)
for match in matches:
try:
json.loads(match)
return match
except json.JSONDecodeError:
continue
return text
def _validate(self, json_str: str) -> bool:
"""验证JSON结构"""
if not self.schema:
return True
try:
data = json.loads(json_str)
self.schema(**data)
return True
except (json.JSONDecodeError, ValueError) as e:
if self.strict:
raise ValueError(f"Validation failed: {str(e)}")
return False
def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
cleaned = self.cleaner(str(input))
if not self._validate(cleaned):
return {"error": "Invalid JSON", "original": cleaned} if not self.strict else None
return json.loads(cleaned)
4.3 添加高级功能
流式处理支持
python复制class StreamingJSONProcessor(JSONProcessor):
def stream(self, input: Any, config: Optional[RunnableConfig] = None) -> Iterator[Any]:
buffer = ""
for chunk in input:
buffer += chunk
try:
cleaned = self.cleaner(buffer)
if self._validate(cleaned):
yield json.loads(cleaned)
buffer = ""
except ValueError:
continue
批量处理优化
python复制class BatchJSONProcessor(JSONProcessor):
def batch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
return list(executor.map(lambda x: self.invoke(x, config), inputs))
5. 实战:构建生产级 JSON 后处理器
让我们完善之前的 JSONProcessor,使其达到生产环境要求。
5.1 增强的错误处理
python复制class RobustJSONProcessor(JSONProcessor):
def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
try:
# 记录开始时间
start_time = time.time()
# 调用父类处理逻辑
result = super().invoke(input, config)
# 记录成功日志
if config and config.get('callbacks'):
duration = time.time() - start_time
for cb in config['callbacks']:
if hasattr(cb, 'on_chain_end'):
cb.on_chain_end(
{"output": result, "duration": duration},
metadata={"operation": "json_processing"}
)
return result
except Exception as e:
# 错误处理和日志记录
if config and config.get('callbacks'):
for cb in config['callbacks']:
if hasattr(cb, 'on_chain_error'):
cb.on_chain_error(
error=e,
metadata={"input": str(input)[:100]}
)
# 根据模式决定是抛出还是降级处理
if self.strict:
raise
return {
"status": "error",
"error_type": type(e).__name__,
"message": str(e)
}
5.2 性能优化技巧
- 缓存机制:对相同的输入直接返回缓存结果
python复制from functools import lru_cache
class CachedJSONProcessor(RobustJSONProcessor):
@lru_cache(maxsize=1024)
def _process_with_cache(self, input_str: str) -> Any:
return super().invoke(input_str)
def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
input_str = str(input)
return self._process_with_cache(input_str)
- 预处理优化:使用更高效的正则表达式
python复制def _optimized_cleaner(self, text: str) -> str:
# 编译正则表达式提高性能
if not hasattr(self, '_json_pattern'):
self._json_pattern = re.compile(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}')
# 先尝试直接解析
try:
json.loads(text)
return text
except json.JSONDecodeError:
pass
# 使用预编译的正则
match = self._json_pattern.search(text)
return match.group(0) if match else text
6. 最佳实践与经验分享
6.1 错误处理经验
在实际项目中,我发现以下几个错误处理策略特别有效:
-
分级错误处理:根据错误严重程度采取不同策略
- 语法错误:尝试自动修复
- 验证错误:返回结构化错误信息
- 系统错误:触发告警并降级
-
上下文保留:始终在错误信息中包含原始输入的摘要,方便调试但不会泄露敏感信息
-
重试策略:对于暂时性错误(如网络问题),实现指数退避重试
python复制def invoke_with_retry(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
max_retries = config.get('max_retries', 3) if config else 3
for attempt in range(max_retries):
try:
return self.invoke(input, config)
except TemporaryError as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数退避
6.2 性能优化技巧
-
批量处理:对于数据库或API调用,尽可能使用批量接口
-
异步优化:I/O密集型操作一定要实现异步版本
-
选择性验证:在链式调用中,可以跳过中间步骤的完整验证
-
延迟加载:对于重型资源(如模型),实现按需加载
python复制class LazyLoadProcessor(Runnable):
def __init__(self):
self._model = None
@property
def model(self):
if self._model is None:
self._model = load_heavy_model()
return self._model
def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
return self.model.process(input)
6.3 测试策略建议
- 单元测试:对每个方法单独测试,特别是边界条件
- 集成测试:验证与其他组件的兼容性
- 性能测试:确保在高负载下表现稳定
- 模糊测试:用随机输入测试鲁棒性
python复制# 示例单元测试
def test_json_extractor():
processor = JSONProcessor()
messy_input = "Some text before {\"name\":\"John\", \"age\":30} and after"
result = processor.invoke(messy_input)
assert result == {"name": "John", "age": 30}
def test_error_handling():
processor = JSONProcessor(strict=False)
result = processor.invoke("invalid{json")
assert "error" in result
7. 常见问题解决方案
7.1 类型不匹配问题
问题现象:当把自定义 Runnable 接入现有链时,出现类型错误
解决方案:
- 明确声明输入输出类型
- 实现类型转换适配器
- 添加中间验证步骤
python复制from typing import Dict
class TypeAdapter(Runnable):
def invoke(self, input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> str:
"""将字典转换为JSON字符串"""
import json
return json.dumps(input)
7.2 配置传递问题
问题现象:链式调用中配置信息丢失
解决方案:
- 确保正确传递 config 参数
- 合并多个配置源
- 提供配置默认值
python复制def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
# 合并默认配置和传入配置
final_config = {
'timeout': 30,
'retries': 3,
**(config or {})
}
# 使用final_config处理逻辑
7.3 流式处理中断
问题现象:流式响应中途断开
解决方案:
- 实现心跳机制
- 添加超时控制
- 提供恢复能力
python复制class ResilientStreamer(Runnable):
def stream(self, input: Any, config: Optional[RunnableConfig] = None) -> Iterator[Any]:
timeout = config.get('timeout', 30) if config else 30
last_active = time.time()
for chunk in input:
last_active = time.time()
yield self.process(chunk)
# 检查超时
if time.time() - last_active > timeout:
raise TimeoutError("Stream inactive for too long")
7.4 内存泄漏问题
问题现象:长时间运行后内存占用持续增长
解决方案:
- 定期清理缓存
- 使用弱引用
- 实现资源清理接口
python复制class MemorySafeRunnable(Runnable):
def __init__(self):
self._cache = {}
self._last_cleanup = time.time()
def _cleanup_cache(self):
now = time.time()
if now - self._last_cleanup > 3600: # 每小时清理一次
self._cache = {k: v for k, v in self._cache.items() if v.valid}
self._last_cleanup = now
8. 项目实战经验分享
在最近的一个智能客服项目中,我们通过自定义 Runnable 实现了以下功能架构:
code复制用户输入 → [输入清洗] → [意图识别] → [业务处理] → [回复生成] → [输出格式化]
其中每个方括号都是一个自定义 Runnable,具体实现要点:
-
输入清洗:
- 去除特殊字符
- 识别并处理附件
- 敏感信息脱敏
-
业务处理:
- 与CRM系统集成
- 查询订单状态
- 处理退货请求
-
输出格式化:
- 根据渠道适配格式(短信/邮件/网页)
- 添加免责声明
- 多语言支持
关键收获:
- 每个 Runnable 保持单一职责
- 通过组合实现复杂逻辑
- 统一错误处理框架
- 完善的监控和日志
这种架构使我们能够:
- 快速迭代单个组件
- 方便进行AB测试
- 灵活调整处理流程
- 容易定位性能瓶颈