最近在尝试用LangChain结合LangGraph构建复杂任务处理智能体时,发现这两个工具的组合简直就像给AI装上了"思维导航系统"。传统LangChain虽然能处理线性任务流,但在需要动态决策的复杂场景下就显得力不从心。而LangGraph的引入,让智能体真正具备了根据上下文调整执行路径的能力。
这个项目特别适合三类开发者:
LangGraph在LangChain基础上主要增加了三个关键能力:
这种设计使得智能体可以处理像这样的复杂逻辑:
python复制def should_continue(state):
if state["iteration"] > 5:
return "end"
return "continue"
在实际项目中,我常用以下三种架构模式:
中心调度模式:
链式反应模式:
黑板架构模式:
先看一个最简单的客服对话智能体实现:
python复制from langgraph.graph import Graph
from langchain_core.messages import HumanMessage
workflow = Graph()
def receive_input(state):
return {"user_input": state["last_message"]}
def generate_response(state):
# 这里接入实际的LLM调用
return {"response": f"已处理:{state['user_input']}"}
workflow.add_node("receive", receive_input)
workflow.add_node("respond", generate_response)
workflow.set_entry_point("receive")
workflow.add_edge("receive", "respond")
workflow.add_edge("respond", END)
# 使用示例
result = workflow.invoke({"last_message": "我的订单状态?"})
下面是一个电商售后处理的增强版实现:
python复制def check_order_status(state):
# 模拟订单系统查询
return {"status": "shipped" if random.random() > 0.5 else "processing"}
def handle_shipped(state):
return {"response": "您的订单已发货,预计3天内送达"}
def handle_processing(state):
return {"response": "订单正在处理中,请耐心等待"}
workflow = Graph()
workflow.add_node("check_status", check_order_status)
workflow.add_node("shipped", handle_shipped)
workflow.add_node("processing", handle_processing)
workflow.set_entry_point("check_status")
workflow.add_conditional_edges(
"check_status",
lambda x: "shipped" if x["status"] == "shipped" else "processing",
{"shipped": "shipped", "processing": "processing"}
)
workflow.add_edge("shipped", END)
workflow.add_edge("processing", END)
在处理高并发请求时,我总结了这些优化点:
节点缓存:
python复制@functools.lru_cache(maxsize=128)
def expensive_operation(param):
# 耗时计算
return result
异步执行:
python复制async def async_node(state):
# 异步调用LLM
return await llm.ainvoke(...)
批量处理:
python复制def batch_node(state):
inputs = state["batch_inputs"]
return {"outputs": [process(x) for x in inputs]}
调试复杂工作流时,这些工具特别有用:
可视化追踪:
python复制from langgraph.graph import draw_graph
draw_graph(workflow).show()
执行日志:
python复制class DebugNode:
def __call__(self, state):
print(f"Current state: {state}")
return state
workflow.add_node("debug", DebugNode())
性能分析:
python复制from line_profiler import LineProfiler
profiler = LineProfiler()
profiler.add_function(workflow.invoke)
profiler.run('workflow.invoke(input)')
健壮的生产系统需要完善的错误处理:
节点级重试:
python复制from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(3))
def unreliable_node(state):
# 可能失败的操作
return result
工作流级回退:
python复制def fallback_node(state):
return {"response": "系统繁忙,请稍后再试"}
workflow.add_node("fallback", fallback_node)
workflow.add_edge("fallback", END)
超时控制:
python复制import timeout_decorator
@timeout_decorator.timeout(5)
def time_sensitive_node(state):
# 必须在5秒内完成
return result
在处理用户输入时特别注意:
输入净化:
python复制def sanitize_input(text):
return text.replace("<", "<").replace(">", ">")
权限控制:
python复制def check_permission(state):
if not state["user"].has_permission():
raise PermissionError("操作未授权")
敏感数据过滤:
python复制def filter_output(state):
if "credit_card" in state["response"]:
state["response"] = "[REDACTED]"
return state
| 错误现象 | 可能原因 | 解决方案 |
|---|---|---|
| 工作流卡死 | 循环条件未终止 | 检查continue边的终止条件 |
| 节点未执行 | 边配置错误 | 验证add_edge调用顺序 |
| 状态丢失 | 节点未返回正确字段 | 确保每个节点返回完整状态 |
| 性能低下 | 节点计算密集 | 添加缓存或异步处理 |
最小复现法:
状态快照:
python复制def debug_node(state):
with open("state_snapshot.json", "w") as f:
json.dump(state, f)
return state
断点调试:
python复制import pdb
def debug_node(state):
pdb.set_trace() # 交互式调试
return state
在实际项目中,我发现最难调试的是循环条件设置不当导致的工作流卡死。一个实用的技巧是在循环节点添加计数器:
python复制def loop_node(state):
state["iteration"] = state.get("iteration", 0) + 1
if state["iteration"] > MAX_ITER:
raise ValueError("循环次数超出限制")
return state