最近在技术社区看到不少关于LangChain Chain组件的讨论,但很多文章要么停留在概念层面,要么示例过于简单。作为一个在实际项目中深度使用LangChain的开发者,我想通过一个完整的AI论文生成案例,带大家真正掌握Chain链的核心用法。这个案例会用到RunnablePassthrough、RunnableParallel等关键组件,并解释为什么选择这种架构。
在自然语言处理项目中,我们经常需要将多个步骤串联起来:输入处理→提示词构建→模型调用→输出解析。传统做法是写一堆嵌套的回调函数,代码很快就会变得难以维护。LangChain的Chain链提供了一种声明式的流水线构建方式,就像Unix的管道(pipe)操作一样优雅。
以论文生成为例,完整流程需要:
如果手动实现,光是处理各步骤之间的数据传递就会让代码变得混乱。而用Chain链,我们可以用清晰的管道符号(|)连接各个环节。
在开始案例前,需要理解LangChain提供的几个基础构建块:
RunnablePassthrough
最简单的数据传递工具,有两种典型用法:
A → B(B直接接收A的输出).assign(new_field=value)(在原有数据上扩展)RunnableParallel
并发执行多个Chain,并将结果合并。比如同时获取天气和新闻:
python复制{
"weather": get_weather_chain,
"news": get_news_chain
}
RunnableLambda
自定义处理函数,适合需要特殊逻辑处理的环节。比如:
python复制def extract_keywords(input):
return {"keywords": input[:100]}
chain = RunnableLambda(extract_keywords)
LangChain提供两种主要Prompt构建方式:
ChatPromptTemplate.from_template:适合简单场景,类似f-stringpython复制"写一个关于{topic}的大纲"
ChatPromptTemplate.from_messages:支持复杂对话历史python复制[
("system", "你是一位专业作家"),
("human", "请写关于{topic}的大纲")
]
在论文生成案例中,我们主要使用from_template,因为不需要维护多轮对话状态。
首先配置通义千问模型(其他模型如GPT-3.5也可类似使用):
python复制import os
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
os.environ["DASHSCOPE_API_KEY"] = "your_api_key" # 替换为真实key
model = ChatTongyi(model="qwen-max")
大纲生成Chain:
python复制outline_prompt = ChatPromptTemplate.from_template(
"请给主题为{topic}的议论文写一个总-递进-总的简短大纲,共5段。"
)
outline_chain = outline_prompt | model | StrOutputParser()
这里使用管道符连接:提示词→模型调用→字符串输出解析
素材模拟Chain:
python复制def mock_search(input_data):
return """案例素材:
1. 利:AI医疗影像诊断准确率达95%
2. 弊:自动驾驶导致卡车司机失业"""
实际项目中可以替换为真实搜索引擎调用
论文生成Chain:
python复制output_prompt = ChatPromptTemplate.from_template(
"你是一位高考作文专家。基于大纲:{outline}\n"
"和案例素材:{data}\n"
"就主题【{topic}】写一篇950字左右的议论文。"
)
output_chain = output_prompt | model | StrOutputParser()
关键部分来了——使用RunnableParallel并发执行:
python复制complex_chain = (
RunnableParallel({
"outline": outline_chain,
"data": mock_search,
"topic": RunnablePassthrough()
})
| output_chain
)
执行结果:
python复制result = complex_chain.invoke({"topic": "AI发展的利与弊"})
print(result)
对比两种实现方式:
顺序执行版(不推荐):
python复制# 需要手动管理中间结果
outline = outline_chain.invoke({"topic": topic})
data = mock_search(topic)
essay = output_chain.invoke({
"outline": outline,
"data": data,
"topic": topic
})
并行执行版(推荐):
实测显示,并行版比顺序版快30%-40%(取决于子任务耗时)。
如果需要查看中间结果:
python复制debug_chain = (
RunnableParallel({
"outline": outline_chain,
"data": mock_search,
"topic": RunnablePassthrough()
})
| RunnablePassthrough.assign(essay=output_chain)
)
response = debug_chain.invoke({"topic": "AI伦理"})
print(response["outline"]) # 查看大纲
print(response["data"]) # 查看素材
print(response["essay"]) # 查看最终论文
问题1:Missing required input keys: 'topic'
问题2:TypeError: unhashable type: 'dict'
问题3:模型响应格式错误
在实际项目中,还需要考虑:
python复制from langchain_core.runnables import RunnableLambda
def safe_invoke(input):
try:
return model.invoke(input)
except Exception as e:
return f"Error: {str(e)}"
safe_chain = prompt | RunnableLambda(safe_invoke)
这个案例展示了如何用Chain链构建真实可用的AI应用。相比零散的API调用,Chain提供了更工程化的解决方案。我在实际项目中发现,合理使用RunnableParallel通常能带来显著的性能提升,特别是在需要聚合多个数据源的场景下。