In the world of LLM application development, LangChain has become the de facto standard framework for building agents and complex workflows. LCEL (LangChain Expression Language), the core abstraction layer introduced with LangChain 0.1+, is fundamentally changing how developers build LLM applications.
Before LCEL, developers typically built applications with the traditional Chain classes, for example:
```python
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI

qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(),
    retriever=retriever,
    chain_type="stuff"
)
```
This approach is simple and direct, but it has several obvious limitations: each Chain class hides its prompt and control flow, making customization difficult; streaming, batching, and async support vary from one chain type to another; and composing chains into larger pipelines is cumbersome.
LCEL changes this paradigm by introducing a single core abstraction, the "Runnable":
```python
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt_template
    | llm
    | StrOutputParser()
)
```
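Because the composed pipeline is itself a Runnable, every standard entry point works on it directly. A usage sketch, assuming the retriever, prompt_template, and llm above are defined:

```python
# Hypothetical invocations of the composed chain
answer = rag_chain.invoke("What is LCEL?")        # single call
answers = rag_chain.batch(["q1", "q2"])           # batched calls
for chunk in rag_chain.stream("What is LCEL?"):   # incremental output
    print(chunk, end="")
```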
LCEL's core innovations include: a unified Runnable interface shared by every component, declarative composition via the `|` (pipe) operator, and built-in streaming, batching, and async execution across the whole pipeline.
Runnable is the cornerstone of LCEL and is defined in the langchain_core.runnables module. It specifies the standard interface that every composable component must implement:
```python
class Runnable(Generic[Input, Output]):
    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        ...

    def stream(self, input: Input, config: Optional[RunnableConfig] = None) -> Iterator[Output]:
        ...

    def batch(self, inputs: List[Input], config: Optional[RunnableConfig] = None) -> List[Output]:
        ...

    # Async counterparts
    async def ainvoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        ...

    async def astream(self, input: Input, config: Optional[RunnableConfig] = None) -> AsyncIterator[Output]:
        ...

    async def abatch(self, inputs: List[Input], config: Optional[RunnableConfig] = None) -> List[Output]:
        ...
```
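In practice these methods rarely need to be implemented by hand: wrapping any plain function in RunnableLambda provides all of them for free. A minimal illustration:

```python
from langchain_core.runnables import RunnableLambda

double = RunnableLambda(lambda x: x * 2)

print(double.invoke(3))          # 6
print(double.batch([1, 2, 3]))   # [2, 4, 6]
# In async code, `await double.ainvoke(3)` works as well
```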
LCEL ships with a set of powerful built-in components that cover the vast majority of LLM application scenarios:

- RunnablePassthrough: passes input through unchanged, often used to preserve the original input
- RunnableAssign: adds or modifies input fields on the fly
- StrOutputParser / JsonOutputParser: normalize output formats
- RunnableParallel: runs multiple Runnables in parallel
- RunnableBranch: implements conditional branching logic
- RunnableLambda: wraps a plain function as a Runnable
- RunnableWithMessageHistory: manages conversation history automatically
- RunnableBinding: binds fixed arguments or configuration

Several of these compose naturally, as in the sketch below.
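A small, purely illustrative sketch combining RunnableParallel, RunnableLambda, and RunnablePassthrough (the transformations are made up):

```python
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

# Three branches run side by side on the same input
analysis = RunnableParallel(
    original=RunnablePassthrough(),               # keep the raw input
    upper=RunnableLambda(lambda s: s.upper()),    # one transformation
    length=RunnableLambda(lambda s: len(s)),      # another transformation
)

print(analysis.invoke("hello lcel"))
# {'original': 'hello lcel', 'upper': 'HELLO LCEL', 'length': 10}
```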
LCEL streaming is not just streaming of the LLM's output: the entire processing pipeline streams end to end:

```python
async def progressive_rag_stream(question):
    # Retrieve context first, then yield tokens as soon as the LLM produces them;
    # retriever and llm are assumed to be defined elsewhere
    docs = await retriever.ainvoke(question)
    prompt = build_prompt(docs, question)  # build_prompt: a prompt-assembly helper assumed to exist
    async for chunk in llm.astream(prompt):
        yield chunk
```
Key points of streaming: any chain composed with `|` exposes stream/astream automatically; output parsers such as StrOutputParser operate on partial chunks, so the first tokens reach the caller without waiting for the full response; and blocking steps such as retrieval should complete before token streaming begins, as in the example above. A consumption sketch follows.
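A minimal way to consume the generator above; note that `.content` assumes a chat model (plain LLMs yield strings):

```python
import asyncio

async def main():
    # Print tokens as they arrive instead of waiting for the full answer
    async for chunk in progressive_rag_stream("What is LCEL?"):
        print(chunk.content, end="", flush=True)

asyncio.run(main())
```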
Error-handling strategies that must be considered in production:

```python
from langchain_openai import ChatOpenAI

# Retry with exponential backoff and jitter;
# chat_model is any chat model instance defined elsewhere
robust_llm = chat_model.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

# Fall back to a cheaper model when the primary model errors out
fallback_llm = ChatOpenAI(model="gpt-3.5-turbo")
primary_llm = ChatOpenAI(model="gpt-4-turbo")
safe_llm = primary_llm.with_fallbacks([fallback_llm])
```
Key fault-tolerance techniques: with_retry for exponential backoff (with optional jitter), and with_fallbacks for degrading to a backup model. Because both methods return ordinary Runnables, they apply to whole chains as well as to individual models, as sketched below.
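A minimal sketch of chain-level fallbacks, assuming prompt is a ChatPromptTemplate defined elsewhere and primary_llm / fallback_llm come from the snippet above:

```python
from langchain_core.output_parsers import StrOutputParser

# If any step of the primary pipeline fails, the fallback pipeline runs instead
primary_chain = prompt | primary_llm | StrOutputParser()
fallback_chain = prompt | fallback_llm | StrOutputParser()
safe_chain = primary_chain.with_fallbacks([fallback_chain])
```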
Deep monitoring can be implemented through the callback mechanism:

```python
import time
from langchain_core.callbacks import BaseCallbackHandler

class TimingHandler(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        self.start_time = time.time()

    def on_chain_end(self, outputs, **kwargs):
        print(f"Chain took {time.time() - self.start_time:.2f}s")

chain.invoke("hello", config={"callbacks": [TimingHandler()]})
```
Suggested monitoring dimensions: per-step latency (as above), token usage and cost, error and retry rates, and cache hit rate.
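Token usage can be tracked the same way; a sketch assuming an OpenAI chat model, since the shape of llm_output is provider-specific:

```python
from langchain_core.callbacks import BaseCallbackHandler

class TokenUsageHandler(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        # llm_output is provider-specific; OpenAI models report token_usage here
        usage = (response.llm_output or {}).get("token_usage", {})
        print(f"prompt={usage.get('prompt_tokens')} "
              f"completion={usage.get('completion_tokens')}")
```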
The complete example below ties the pieces above together: retrieval, retries, prompt construction, and conversation history.

```python
from operator import itemgetter

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableWithMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.utils import ConfigurableFieldSpec

# Initialize components (recent versions of FAISS.load_local may also
# require allow_dangerous_deserialization=True)
vectorstore = FAISS.load_local("my_index", OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
llm = ChatOpenAI(model="gpt-3.5-turbo").with_retry(stop_after_attempt=2)

def format_docs(docs):
    # Collapse retrieved documents into a single context string
    return "\n\n".join(doc.page_content for doc in docs)

# Build the prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant. Use the following context: {context}"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])

# Base RAG chain; the incoming payload is a dict with "input" and "history"
# keys, so each field is extracted with itemgetter before further processing
rag_chain = (
    {
        "context": itemgetter("input") | retriever | format_docs,
        "input": itemgetter("input"),
        "history": itemgetter("history"),
    }
    | prompt
    | llm
    | StrOutputParser()
)

# History management
store = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

# Final chain
chain_with_history = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
    history_factory_config=[
        ConfigurableFieldSpec(
            id="session_id",
            annotation=str,
            name="Session ID",
            description="Unique session identifier",
            default="",
            is_shared=True,
        ),
    ],
)
```
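Calling the history-aware chain requires threading session_id through the configurable config; a usage sketch with made-up values:

```python
# First turn of a hypothetical session
reply = chain_with_history.invoke(
    {"input": "What does our refund policy say?"},
    config={"configurable": {"session_id": "user-42"}},
)
print(reply)
```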
For simple queries that do not need retrieval, the flow can be optimized with a branch:

```python
from operator import itemgetter
from langchain_core.runnables import RunnableBranch

def needs_retrieval(question: str) -> bool:
    # Naive heuristic: greetings skip retrieval entirely
    greetings = ["hi", "hello", "hey", "good morning"]
    return not any(g in question.lower() for g in greetings)

# retriever, format_docs, prompt, and llm come from the example above
branch = RunnableBranch(
    # (condition, runnable) pair: full RAG path when retrieval is needed
    (
        lambda x: needs_retrieval(x["input"]),
        {
            "context": itemgetter("input") | retriever | format_docs,
            "input": itemgetter("input"),
            "history": itemgetter("history"),
        }
        | prompt
        | llm
        | StrOutputParser(),
    ),
    # Default path: answer directly without retrieval
    ChatPromptTemplate.from_messages([
        MessagesPlaceholder(variable_name="history"),
        ("human", "{input}"),
    ])
    | llm
    | StrOutputParser(),
)
```
```python
final_chain = RunnableWithMessageHistory(
    branch,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
    # history_factory_config omitted here; same as in chain_with_history above
)
```
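A quick, hypothetical check that both branch paths behave as intended:

```python
cfg = {"configurable": {"session_id": "demo"}}

# Greeting: takes the default path, no retrieval
print(final_chain.invoke({"input": "hello"}, config=cfg))

# Substantive question: takes the RAG path
print(final_chain.invoke({"input": "How do I configure retries?"}, config=cfg))
```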
Async concurrency: make full use of async execution to raise throughput

```python
async def process_queries(queries):
    # abatch runs the whole list concurrently on the event loop
    return await chain.abatch(queries)
```
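And a corresponding usage sketch (query strings are made up):

```python
import asyncio

results = asyncio.run(process_queries(["query1", "query2", "query3"]))
```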
Caching: cache the results of frequently repeated queries

```python
from langchain_core.caches import InMemoryCache
from langchain.globals import set_llm_cache

# Identical LLM calls after the first are served from memory
set_llm_cache(InMemoryCache())
```
Batching: use the batch method to reduce per-call API overhead

```python
# max_concurrency optionally caps the number of parallel requests
results = chain.batch(["query1", "query2", "query3"], config={"max_concurrency": 5})
```
Inspecting intermediate results: `.stream()` only yields chunks of the final output, while astream_events surfaces events from every step in the chain:

```python
import asyncio

# run_name labels this chain in traces and event metadata
debug_chain = chain.with_config(run_name="debug_chain")

async def inspect():
    # Each event reports which component started, streamed, or finished
    async for event in debug_chain.astream_events("input", version="v2"):
        print(event["event"], event.get("name"))

asyncio.run(inspect())
```
LangSmith integration:

```python
import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
```
Error-isolation testing: test each component of the chain separately to confirm it works on its own, as sketched below
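A minimal sketch using components from the RAG example above (the test strings are made up):

```python
# Every LCEL component is itself a Runnable, so each can be invoked alone
docs = retriever.invoke("test query")                                   # retrieval only
msgs = prompt.invoke({"context": "ctx", "history": [], "input": "hi"})  # prompt only
text = StrOutputParser().invoke(llm.invoke("ping"))                     # model + parser only
```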
Input filtering: strictly filter and sanitize user input

```python
def sanitize_input(user_input: str) -> str:
    # Placeholder logic: strip non-printable characters and cap the length;
    # real filtering rules depend on your threat model
    cleaned = "".join(ch for ch in user_input if ch.isprintable())
    return cleaned[:2000]
```
Output review: check model output for compliance

```python
def validate_output(output: str) -> bool:
    # Placeholder logic: block a list of banned terms; production checks
    # usually combine rules with a moderation model
    banned_terms = ["BANNED"]
    return not any(term in output for term in banned_terms)
```
Access control: enforce role-based permissions

```python
def check_access(user: "User", chain: "Runnable") -> bool:
    # Placeholder logic: a hypothetical User carries a set of role strings,
    # and required_role is a made-up attribute attached to the chain
    required_role = getattr(chain, "required_role", None)
    return required_role is None or required_role in user.roles
```
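These guards can also be wired into the pipeline itself by wrapping them as Runnables; a sketch built on the two helpers above, where `chain` stands for any Runnable that accepts the sanitized string:

```python
from langchain_core.runnables import RunnableLambda

def guard_output(text: str) -> str:
    # Fail loudly instead of returning non-compliant output
    if not validate_output(text):
        raise ValueError("output failed compliance check")
    return text

guarded_chain = RunnableLambda(sanitize_input) | chain | RunnableLambda(guard_output)
```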
In real engineering practice, LCEL's value lies not only in faster development but, more importantly, in making LLM applications properly engineerable. With its standardized interface design, flexible composition, and strong production-readiness features, LCEL is becoming the foundation for building reliable, efficient, and maintainable agent systems.