1. 项目概述:构建带人工审批的智能体流水线
在AI应用开发领域,我们经常面临一个核心矛盾:如何平衡自动化效率与业务风险控制。传统聊天机器人要么完全自动化(风险高),要么完全人工(效率低)。LangGraph提供了一种创新解决方案,通过状态机模型实现"智能体流水线",在关键节点引入人工审批机制。
这个实战项目将展示如何使用Python+LangChain构建一个电商退款审批系统。系统特点包括:
- 自动处理小额退款(<1000元)
- 大额退款自动触发人工审批流程
- 审批前后状态持久化
- 可中断/恢复的执行流程
关键价值:相比传统自动化脚本,这种架构既保持了AI的处理效率,又在风险点设置人工检查站,特别适合金融、电商、客服等对准确性要求高的场景。
2. 核心架构解析
2.1 状态机模型设计
LangGraph的核心是状态机(State Machine)模式。在我们的退款系统中,状态定义如下:
python复制from typing import TypedDict, Annotated, List, Optional
import operator
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add] # 消息累积
order_id: Optional[str] # 订单号
amount: Optional[float] # 金额
refund_status: str # 状态标识
状态流转路径设计:
code复制pending → waiting_approval → approved/rejected
↗
pending → approved (金额<1000直接通过)
2.2 关键节点实现
退款处理节点
python复制def node_process_refund(state: AgentState):
# 模拟数据库查询
def get_order_amount(order_id):
return 1200.0 # 测试数据
amount = get_order_amount(state['order_id'])
# 业务规则引擎
if amount > 1000:
return {"amount": amount, "refund_status": "waiting_approval"}
else:
return {"amount": amount, "refund_status": "approved"}
人工审批节点
python复制def node_human_approval(state: AgentState):
# 实际逻辑由人工决策后更新状态
pass
2.3 条件路由设计
python复制def route_process(state):
if state['refund_status'] == "waiting_approval":
return "human_approval"
return END
3. 完整实现步骤
3.1 环境准备
安装依赖:
bash复制pip install langgraph langchain-openai langchain-core
langgraph:核心状态机库langchain-openai:GPT模型接入langchain-core:基础数据类型
3.2 构建工作流
python复制from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("process", node_process_refund)
workflow.add_node("human_approval", node_human_approval)
# 设置路由
workflow.set_entry_point("process")
workflow.add_conditional_edges("process", route_process)
workflow.add_edge("human_approval", END)
# 编译工作流
memory = MemorySaver()
app = workflow.compile(
checkpointer=memory,
interrupt_before=["human_approval"] # 关键拦截点
)
3.3 运行与交互
python复制config = {"configurable": {"thread_id": "thread-1"}}
# 初始执行
for event in app.stream({"order_id": "BIG888"}, config=config):
pass
# 检查中断状态
state = app.get_state(config)
if state.next and "human_approval" in state.next:
print(f"待审批金额: {state.values['amount']}")
# 模拟人工输入
user_input = input("批准?(yes/no): ")
# 更新状态
new_status = "approved" if user_input == "yes" else "rejected"
app.update_state(config, {"refund_status": new_status})
# 恢复执行
for event in app.stream(None, config=config):
pass
4. 生产级优化建议
4.1 状态持久化方案
内存存储(MemorySaver)仅适合演示,生产环境建议:
python复制from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string(":memory:") # 可替换为真实数据库
4.2 审批流程增强
实际业务中需要:
- 添加审批理由字段
- 实现多级审批(经理→总监)
- 设置审批超时机制
python复制class EnhancedAgentState(AgentState):
approve_reason: Optional[str]
approver: Optional[str]
expire_time: Optional[float]
4.3 安全防护措施
关键安全实践:
- 金额查询接口添加防SQL注入
- 审批操作需要身份验证
- 操作日志审计
python复制def safe_get_order_amount(order_id: str) -> float:
if not re.match(r'^[A-Z0-9]{6}$', order_id):
raise ValueError("非法订单号格式")
# 后续数据库查询...
5. 常见问题排查
5.1 状态不更新问题
现象:调用update_state后流程不继续
检查点:
- 确认thread_id一致
- 检查checkpointer是否配置正确
- 验证返回的状态字典格式
5.2 条件路由失效
调试方法:
python复制def debug_route(state):
print("[DEBUG] Current state:", state)
if state['refund_status'] == "waiting_approval":
return "human_approval"
return END
5.3 性能优化技巧
对于高频场景:
- 使用Redis作为checkpointer
- 批量处理小额退款
- 实现异步审批通知
6. 扩展应用场景
6.1 客服工单系统
- 自动分类工单类型
- 敏感问题转人工
- 知识库自动回复
6.2 财务报销流程
- 发票自动识别
- 规则校验
- 超标审批
6.3 内容审核流水线
- 初筛违规内容
- 疑似案例人工复核
- 记录审核轨迹
这种模式的核心优势在于将确定性的规则判断与需要人类智慧的决策点明确分离,既保持自动化效率,又确保关键决策的可控性。在实际项目中,我们可以通过增加更多状态属性和条件分支,构建出非常复杂的业务工作流。