1. 项目概述
"langgraph的一个例子"这个标题看似简单,却蕴含着丰富的技术内涵。作为一名长期从事语言模型应用开发的工程师,我经常需要处理复杂的对话流程和状态管理。LangGraph正是为解决这类问题而生的工具库,它基于有向图的概念,帮助我们构建和管理复杂的语言模型工作流。
在实际项目中,我发现LangGraph特别适合处理以下场景:
- 多步骤的对话系统
- 需要条件分支的问答流程
- 带状态维护的交互式应用
- 需要回溯或循环的对话管理
2. 核心概念解析
2.1 LangGraph基础架构
LangGraph的核心是构建有向图,其中节点代表处理步骤,边代表控制流。每个节点可以是一个函数或语言模型调用,边则定义了执行路径的条件。
典型的LangGraph应用包含三个关键组件:
- 状态对象:在整个图执行过程中传递和修改的数据结构
- 节点函数:对状态进行操作的具体业务逻辑
- 边定义:决定下一个执行节点的条件逻辑
2.2 状态管理机制
LangGraph的状态管理采用不可变设计,每个节点接收前一个节点的状态副本,修改后传递给下一个节点。这种设计带来了几个优势:
- 避免意外的状态共享
- 便于调试和追踪状态变化
- 支持时间旅行调试(回溯历史状态)
3. 完整实现示例
3.1 环境准备
首先安装必要的依赖:
bash复制pip install langgraph langchain-openai
然后设置环境变量(以OpenAI为例):
python复制import os
os.environ["OPENAI_API_KEY"] = "your-api-key"
3.2 构建对话流程图
下面实现一个简单的客服对话系统:
python复制from langgraph.graph import Graph
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
# 定义状态类型
from typing import TypedDict, List, Annotated
import operator
class State(TypedDict):
messages: Annotated[List[Union[HumanMessage, AIMessage]], operator.add]
# 初始化模型
model = ChatOpenAI(model="gpt-3.5-turbo")
# 定义节点函数
def user_input(state: State):
user_message = input("User: ")
return {"messages": [HumanMessage(content=user_message)]}
def ai_response(state: State):
last_message = state["messages"][-1]
response = model.invoke([last_message])
return {"messages": [response]}
# 构建图
workflow = Graph()
workflow.add_node("user", user_input)
workflow.add_node("ai", ai_response)
workflow.add_edge("user", "ai")
workflow.add_edge("ai", "user")
workflow.set_entry_point("user")
# 编译并运行
app = workflow.compile()
for chunk in app.stream({"messages": []}):
print(chunk)
3.3 条件分支实现
更复杂的场景可以添加条件分支:
python复制from langgraph.graph import END
from langgraph.checkpoint.sqlite import SqliteSaver
def should_continue(state: State):
last_message = state["messages"][-1].content
if "再见" in last_message.lower():
return END
return "ai"
workflow = Graph()
workflow.add_node("user", user_input)
workflow.add_node("ai", ai_response)
workflow.add_conditional_edges("ai", should_continue)
workflow.add_edge("user", "ai")
workflow.set_entry_point("user")
# 添加持久化
memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)
4. 高级应用技巧
4.1 并行执行优化
对于可以并行执行的节点,可以使用add_edge的map参数:
python复制def product_info(state: State):
# 获取产品信息
return {"product": "..."}
def user_profile(state: State):
# 获取用户画像
return {"profile": "..."}
workflow = Graph()
workflow.add_node("get_product", product_info)
workflow.add_node("get_profile", user_profile)
workflow.add_node("generate", ai_response)
workflow.add_edge("get_product", "generate")
workflow.add_edge("get_profile", "generate")
4.2 状态版本控制
利用检查点机制实现状态回滚:
python复制# 从特定步骤继续执行
app = workflow.compile(checkpointer=memory)
thread_id = "user_123"
config = {"configurable": {"thread_id": thread_id}}
# 获取历史记录
history = memory.list(thread_id)
if history:
# 从最后一步继续
app.invoke(None, config)
5. 常见问题排查
5.1 状态不更新问题
症状:节点执行后状态没有变化
排查步骤:
- 检查节点函数是否返回了正确的字典结构
- 确认状态类型定义中的
operator.add是否正确使用 - 验证边是否正确连接
5.2 循环检测问题
症状:图执行陷入无限循环
解决方案:
- 设置最大循环次数
python复制from langgraph.constants import INTERRUPT
def limit_cycles(state: State, cycle_count=0):
if cycle_count > 5:
return INTERRUPT
return "next_node"
- 使用
add_conditional_edges添加中断条件
5.3 性能优化技巧
对于复杂图结构:
- 使用
@node装饰器缓存节点结果 - 对计算密集型节点启用并行执行
- 对LLM调用实现批处理
6. 生产环境最佳实践
在实际部署中,我总结了以下经验:
- 为每个对话线程分配唯一ID
- 实现检查点持久化(如Redis或SQLite)
- 添加监控和日志记录
python复制def logged_node(func):
def wrapper(state: State):
print(f"Executing {func.__name__} with state: {state}")
result = func(state)
print(f"Result: {result}")
return result
return wrapper
@logged_node
def business_node(state: State):
# 业务逻辑
return state
- 实现超时控制
python复制import signal
from contextlib import contextmanager
class TimeoutException(Exception): pass
@contextmanager
def time_limit(seconds):
def signal_handler(signum, frame):
raise TimeoutException("Timed out!")
signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
try:
with time_limit(5):
app.invoke(...)
except TimeoutException:
print("Execution timed out")