在构建基于大语言模型的应用时,我们常常会遇到这样的困境:单个Prompt往往难以处理复杂的业务逻辑。就像搭积木一样,我们需要把不同的功能模块有机组合起来——这正是LangChain框架中"Chain"概念的精髓所在。
我去年为某金融客户开发智能投研助手时就深有体会。单纯让GPT分析财报数据效果有限,但当我们把数据提取、指标计算、风险预警等环节串联成链后,系统突然变得"聪明"起来。今天要详解的三种基础链式结构(基础链、顺序链、分支链),就是构建这类复杂应用的乐高积木。
作为最基础的链结构,LLMChain本质上是一个Prompt模板与大模型的组合体。但它的精妙之处在于动态变量绑定机制。比如这个股票分析链:
python复制from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
analysis_template = """作为资深分析师,请根据以下{report_type}报告:
{report_content}
提取三个关键指标并给出投资建议"""
prompt = PromptTemplate(
input_variables=["report_type", "report_content"],
template=analysis_template
)
analysis_chain = LLMChain(llm=llm, prompt=prompt)
实际使用时,report_type和report_content会被动态注入。这种设计使得同一个链可以处理季报、年报等不同场景。
实战经验:模板中的变量名建议采用下划线命名法,避免使用空格或特殊字符,否则在复杂链组合时容易引发解析错误。
基础链的输入输出需要严格定义。良好的接口设计应该像这样:
python复制# 输入规范示例
input_schema = {
"report_type": {
"type": "string",
"enum": ["quarterly", "annual", "special"],
"description": "报告类型"
},
"report_content": {
"type": "string",
"minLength": 100,
"description": "报告全文内容"
}
}
# 输出处理函数
def output_parser(raw_output):
import re
pattern = r"指标\d+:(.*?)\n建议:(.*)"
return dict(zip(["indicators", "advice"], re.findall(pattern, raw_output)))
这种设计能确保链与链之间的数据交互不会出现"鸡同鸭讲"的情况。我在实际项目中曾因忽略这点导致下游链接收到意外格式的数据,排查了整整两天。
当处理像文档摘要→情感分析→分类这样的线性流程时,SimpleSequentialChain是最佳选择。它的特点是:
python复制from langchain.chains import SimpleSequentialChain
summary_chain = LLMChain(...)
sentiment_chain = LLMChain(...)
classify_chain = LLMChain(...)
pipeline = SimpleSequentialChain(
chains=[summary_chain, sentiment_chain, classify_chain],
verbose=True
)
调试技巧:设置verbose=True可以看到每个步骤的输入输出,这在链出现意外行为时非常有用。不过生产环境记得关闭,否则会泄露敏感信息。
对于需要保留中间结果的场景,就要用SequentialChain。比如这个电商评论处理流程:
python复制processing_chain = SequentialChain(
chains=[feature_chain, sentiment_chain, urgency_chain],
input_variables=["user_review"],
output_variables=["product_features", "sentiment_score", "urgency_level"],
verbose=True
)
这里每个链的输出都会被保留,最终返回包含所有结果的字典。关键点在于:
我遇到过一个典型错误:两个链都输出名为"result"的变量,导致后者覆盖前者。解决方法是用业务语义明确的变量名,比如"sentiment_result"和"feature_result"。
在进入主处理流程前,经常需要对输入数据进行清洗和转换。TransformChain就是专门处理这种需求的工具:
python复制def clean_text(inputs):
import re
text = inputs["raw_text"]
return {
"clean_text": re.sub(r'[^\w\s]', '', text).strip()
}
preprocess_chain = TransformChain(
input_variables=["raw_text"],
output_variables=["clean_text"],
transform=clean_text
)
这种链的特点是不调用LLM,只做数据预处理。常见用途包括:
当需要根据输入内容动态选择处理路径时,路由链就派上用场了。完整配置示例:
python复制finance_prompt = PromptTemplate(...)
tech_prompt = PromptTemplate(...)
default_prompt = PromptTemplate(...)
destinations = [
{
"name": "finance",
"description": "适合处理财经类问题",
"prompt": finance_prompt
},
{
"name": "tech",
"description": "处理技术问题",
"prompt": tech_prompt
}
]
router_chain = RouterChain.from_destinations(
llm=llm,
destinations=destinations,
default_chain=LLMChain(llm=llm, prompt=default_prompt)
)
路由决策过程完全由LLM根据问题内容自动完成。为了提高准确性,需要注意:
让我们用组合链构建一个能处理多种请求的智能客服系统:
code复制用户输入 → 路由链 →
├─ 产品咨询 → 产品链 → 推荐链
├─ 投诉处理 → 情感分析链 → 补偿建议链
└─ 技术支持 → 问题分类链 → 解决方案链
对应的代码结构:
python复制router = RouterChain(...)
product_flow = SequentialChain(...)
complaint_flow = SequentialChain(...)
tech_flow = SequentialChain(...)
full_system = {
"router": router,
"product": product_flow,
"complaint": complaint_flow,
"tech": tech_flow
}
def handle_request(user_input):
route = router.route(user_input)
return full_system[route.destination].run(user_input)
在复杂链式结构中,必须考虑错误处理:
python复制try:
result = handle_request(input)
except Exception as e:
logger.error(f"Chain failed: {str(e)}")
if isinstance(e, RouterError):
return default_chain.run(input)
elif isinstance(e, LLMTimeout):
return {"error": "处理超时,请稍后再试"}
else:
return {"error": "系统繁忙"}
建议为每个关键链设置超时和重试机制:
python复制from langchain.chains import TimeoutChain
safe_chain = TimeoutChain(
chain=main_chain,
timeout=30,
callback=timeout_handler
)
为了避免重复计算,可以添加缓存层:
python复制from langchain.cache import SQLiteCache
import langchain
langchain.llm_cache = SQLiteCache(database_path=".langchain.db")
# 带版本控制的缓存
def hashed_input(input_dict):
import hashlib
return hashlib.md5(str(sorted(input_dict.items())).encode()).hexdigest()
class VersionedCache(SQLiteCache):
def __init__(self, version):
self.version = version
super().__init__()
def lookup(self, prompt, llm_string):
original_key = super().lookup(prompt, llm_string)
return f"{self.version}:{original_key}"
使用回调系统记录关键指标:
python复制from langchain.callbacks import FileCallbackHandler
class AnalyticsCallback(FileCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
start_time = time.time()
self.chain_times[serialized["id"]] = start_time
def on_chain_end(self, outputs, **kwargs):
end_time = time.time()
duration = end_time - self.chain_times.pop(serialized["id"])
log_metric("chain_duration", duration)
对于需要迭代处理的任务,可以设计循环链:
python复制max_retries = 3
def refine_loop(inputs):
current = inputs["initial"]
for _ in range(max_retries):
feedback = analyzer_chain.run(current)
if feedback["satisfactory"]:
break
current = refiner_chain.run(
{"draft": current, "feedback": feedback}
)
return {"final": current}
利用Python的并发机制加速处理:
python复制from concurrent.futures import ThreadPoolExecutor
def parallel_execute(chains, input_data):
with ThreadPoolExecutor() as executor:
futures = {
name: executor.submit(chain.run, input_data)
for name, chain in chains.items()
}
return {
name: future.result()
for name, future in futures.items()
}
这种模式特别适合需要同时调用多个API或处理多个独立子任务的场景。但要注意: