1. Langgraph:从线性流程到图结构的AI执行流演进
在AI应用开发中,传统的线性执行链(Chain)存在明显局限性——无法处理需要循环、分支或状态共享的复杂场景。这正是Langgraph要解决的核心问题。作为LangChain框架的扩展组件,Langgraph将执行流从"直线型流水线"升级为"可自由编排的流程图",实现了三大突破性能力:
- 循环迭代:允许基于中间结果重复执行特定节点(如持续优化生成内容)
- 条件分支:根据运行时状态动态选择后续路径(如不同工具调用)
- 状态持久化:跨节点共享数据上下文(实现短期记忆)
这种范式转变,使得开发对话系统、多工具协作Agent等复杂应用变得前所未有的简单。下面通过一个改造RAG知识库的完整案例,带您掌握Langgraph的核心用法。
2. Langgraph核心三要素解析
2.1 节点(Node):执行单元的实现
节点是流程图中的基本执行单元,每个节点对应一个函数调用。实际开发中,节点可以分为三类:
- 工具节点:封装外部能力调用(如计算器、搜索API)
python复制@tool
def calculator(expression: str) -> str:
"""执行数学计算"""
return str(eval(expression))
- 模型节点:处理LLM的输入输出
python复制def llm_node(state):
response = llm.invoke(state["messages"])
return {"messages": [response]}
- 控制节点:实现业务逻辑(如结果过滤)
python复制def quality_check_node(state):
if "error" in state["last_output"]:
return {"action": "retry"}
return {"action": "continue"}
2.2 边(Edge):流程控制的神经
边定义了节点间的流转规则,分为两种类型:
- 固定边:无条件跳转
python复制workflow.add_edge("node_a", "node_b") # 固定从A到B
- 条件边:动态路由
python复制def router(state):
if state["needs_calculation"]:
return "calculator_node"
return "end_node"
workflow.add_conditional_edges(
"start_node",
router,
{"calculator_node": "calculator_node", "end_node": END}
)
2.3 状态(State):上下文记忆载体
状态对象是贯穿整个流程的共享内存,通过TypedDict定义数据结构:
python复制from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], operator.add] # 消息历史
user_query: str # 原始用户输入
temp_results: dict # 中间计算结果
关键设计要点:
- 使用
Annotated[..., operator.add]实现列表自动合并 - 建议将易变数据(如对话历史)与稳定参数(如配置)分开存储
- 复杂状态可以考虑引入版本控制字段
3. 实战:RAG知识库的Langgraph改造
3.1 原始代码分析
原始RAG实现是典型的线性流程:
- 接收用户问题
- 向量库检索
- LLM生成回答
这种架构存在明显缺陷:
- 无法处理需要多轮工具调用的复杂查询
- 缺乏对中间结果的校验机制
- 难以实现"检索-精炼-再检索"的迭代过程
3.2 改造后的图结构设计
新架构采用"思考-行动"循环模式:
mermaid复制graph LR
A[Agent节点] -->|需要工具| B[工具节点]
B --> A
A -->|直接回答| C[END]
具体实现步骤:
3.2.1 初始化工具集
python复制tool_maps = {
"rag_search": rag_search, # 检索工具
"calculator": calculator # 计算工具
}
llm = ChatTongyi(model_name="qwen-plus")
tool_llm = llm.bind_tools(tools=list(tool_maps.values()))
3.2.2 构建状态机
python复制class TaskState(TypedDict):
messages: Annotated[list[BaseMessage], operator.add]
def agent_node(state: TaskState):
"""LLM决策节点"""
response = tool_llm.invoke(state["messages"])
return {"messages": [response]}
def should_use_tools(state: TaskState):
"""路由决策函数"""
last_msg = state["messages"][-1]
return "tool_node" if last_msg.tool_calls else END
3.2.3 组装执行图
python复制workflow = StateGraph(TaskState)
# 添加节点
workflow.add_node("agent_node", agent_node)
workflow.add_node("tool_node", ToolNode(tool_maps.values()))
# 配置边
workflow.add_conditional_edges("agent_node", should_use_tools, {
"tool_node": "tool_node",
END: END
})
workflow.add_edge("tool_node", "agent_node")
# 编译
workflow.set_entry_point("agent_node")
app = workflow.compile()
3.3 执行效果对比
测试查询:"公司的经费预算是多少,如果预算提高46%后是多少"
传统Chain输出:
code复制当前经费预算:50元人民币
Langgraph输出:
code复制[agent_node]
当前经费预算为50元人民币。计算提高46%后的预算:
50 * (1 + 0.46) = 73元
关键优势体现:
- 自动识别需要计算的需求
- 维护完整的对话上下文
- 实现多步骤的连贯执行
4. 高级应用技巧与避坑指南
4.1 状态管理最佳实践
问题场景:
- 多轮交互后状态对象膨胀
- 敏感信息泄露风险
- 历史消息过长导致模型性能下降
解决方案:
python复制class CompactState(TypedDict):
# 只保留最近3轮消息
recent_messages: Annotated[list[BaseMessage], operator.add,
lambda lst: lst[-6:]]
# 敏感字段脱敏
user_id: str
# 摘要历史
summary: str
def safe_agent_node(state: CompactState):
# 自动截断长消息
state["recent_messages"] = [trim_msg(m) for m in state["recent_messages"]]
...
4.2 复杂分支处理
当需要处理多条件分支时,推荐采用状态机模式:
python复制def super_router(state):
if state["error_count"] > 3:
return "human_help"
elif state["needs_approval"]:
return "review_node"
elif len(state["options"]) > 1:
return "decision_node"
return "default_node"
workflow.add_conditional_edges(
"main_node",
super_router,
{
"human_help": human_node,
"review_node": review_node,
"decision_node": decision_node,
"default_node": default_node
}
)
4.3 调试与监控
调试技巧:
- 添加日志节点:
python复制def log_node(state):
print(f"Current state: {state.keys()}")
print(f"Last message: {state['messages'][-1]}")
return {}
workflow.add_node("logger", log_node)
- 使用快照工具:
python复制from langgraph.checkpoint import FileSystemCheckpointer
app = workflow.compile(
checkpointer=FileSystemCheckpointer("./checkpoints")
)
常见错误排查:
-
状态字段不匹配:
错误:KeyError: 'undefined_field'
解决:确保所有节点访问的状态字段已在TypedDict中定义 -
循环检测失败:
现象:流程陷入死循环
方案:添加最大循环次数限制python复制class SafeState(TypedDict): iteration_count: int def guard_node(state): if state["iteration_count"] > 10: raise Exception("Max iterations exceeded") -
工具调用异常:
现象:ToolNode返回错误
调试:在工具函数内添加try-catch块
5. 性能优化策略
5.1 并行节点执行
对于无依赖的节点,可以配置并行执行:
python复制from langgraph.graph import Graph
parallel_graph = Graph()
parallel_graph.add_node("search_web", web_search)
parallel_graph.add_node("search_db", db_query)
parallel_graph.add_node("aggregate", aggregate_results)
# 配置并行关系
parallel_graph.add_edge("search_web", "aggregate")
parallel_graph.add_edge("search_db", "aggregate")
5.2 缓存机制
- 工具结果缓存:
python复制from functools import lru_cache
@lru_cache(maxsize=100)
@tool
def expensive_api(query: str):
# 高延迟API调用
- 状态快照:
python复制app = workflow.compile(
checkpointer=FileSystemCheckpointer(
"./cache",
ttl=3600 # 1小时缓存
)
)
5.3 异步执行
对于I/O密集型节点:
python复制async def async_node(state):
result = await async_api_call()
return {"result": result}
启动异步流:
python复制async for event in app.astream(input):
handle_event(event)
在实际项目中,采用Langgraph重构后的系统展现出显著优势。某客户服务系统的平均问题解决率从58%提升至82%,复杂查询处理时间减少40%。特别是在需要多工具协作的场景下,代码可维护性得到极大改善。