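1. Overview of LangGraph Workflows and Agents
In AI application development today, LangGraph, an important part of the LangChain ecosystem, gives developers a powerful toolkit for building complex workflows and agents. Instead of executing a linear script, LangGraph lets you organize model calls and task steps as a graph. This is especially useful when a task involves multi-step decisions, conditional branches, or iterative refinement.
Workflows and agents are the two core patterns in LangGraph, and each suits a different kind of application:
- Workflows: suited to tasks with a well-defined execution path, where the developer fixes the order of steps and the branching conditions in advance. Typical uses include document-processing pipelines, multi-step information extraction, and content generation and refinement.
- Agents: suited to tasks that require dynamic decisions, where the system chooses its next action based on the current state. Typical uses include customer-service chat systems, complex problem solving, and autonomous decision-making tasks.
Tip: whether to choose a workflow or an agent depends mainly on how deterministic the task is. If you can define every possible path up front, a workflow is more efficient. If the system has to handle situations you cannot anticipate, an agent is the better fit.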
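2. Environment Setup and Preparation
2.1 Installing Dependencies
Before you start with LangGraph, install the following Python packages:
```bash
pip install langchain_core langchain-anthropic langgraph
```
This article uses Anthropic's Claude as the underlying model. You can substitute any other model that supports structured output and tool calling.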
2.2 Initializing the Model
```python
import os
import getpass

from langchain_anthropic import ChatAnthropic

def _set_env(var: str):
    # Prompt for the variable only if it is not already set in the environment
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("ANTHROPIC_API_KEY")
llm = ChatAnthropic(model="claude-sonnet-4-5-20250929")
```
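2.3 Augmenting the Model
LangGraph nodes usually build on LangChain chat models, and those models' base capabilities are easy to extend:
```python
from pydantic import BaseModel, Field

# Augmentation 1: structured output
class SearchQuery(BaseModel):
    search_query: str = Field(description="Query that is optimized for web search.")
    justification: str = Field(
        description="Why this query is relevant to the user's request."
    )

structured_llm = llm.with_structured_output(SearchQuery)

# Augmentation 2: tool calling
def multiply(a: int, b: int) -> int:
    """Multiply a and b."""  # the docstring becomes the tool description the model sees
    return a * b

llm_with_tools = llm.bind_tools([multiply])
```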
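3. Core Workflow Patterns
3.1 Prompt Chaining
Prompt chaining is the most basic workflow pattern. It breaks a complex task into small steps that can each be verified, and each step's output becomes the next step's input.
3.1.1 Graph API Implementation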
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

# Graph state shared by all nodes
class State(TypedDict):
    topic: str
    joke: str
    improved_joke: str
    final_joke: str

# Node: generate the initial joke
def generate_joke(state: State):
    msg = llm.invoke(f"Write a short joke about {state['topic']}")
    return {"joke": msg.content}

# Gate: a crude check for whether the joke has a punchline
def check_punchline(state: State):
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Pass"
    return "Fail"

# Node: make the joke funnier with wordplay
def improve_joke(state: State):
    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
    return {"improved_joke": msg.content}

# Node: add a surprising twist
def polish_joke(state: State):
    msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
    return {"final_joke": msg.content}

workflow = StateGraph(State)
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
    "generate_joke",
    check_punchline,
    {"Fail": "improve_joke", "Pass": END},
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)
chain = workflow.compile()
# Usage: chain.invoke({"topic": "cats"})
```
3.1.2 Functional API Implementation
```python
from langgraph.func import entrypoint, task

@task
def generate_joke(topic: str):
    msg = llm.invoke(f"Write a short joke about {topic}")
    return msg.content

# Plain function: a cheap gate that needs no model call
def check_punchline(joke: str):
    if "?" in joke or "!" in joke:
        return "Pass"
    return "Fail"

@task
def improve_joke(joke: str):
    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {joke}")
    return msg.content

@task
def polish_joke(joke: str):
    msg = llm.invoke(f"Add a surprising twist to this joke: {joke}")
    return msg.content

@entrypoint()
def prompt_chaining_workflow(topic: str):
    # Calling a task returns a future; .result() waits for its value
    original_joke = generate_joke(topic).result()
    if check_punchline(original_joke) == "Pass":
        return original_joke
    improved_joke = improve_joke(original_joke).result()
    return polish_joke(improved_joke).result()
```
Note: prompt chaining suits linear tasks, but keep chains short. Every extra step adds latency and another chance for errors to accumulate.
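You can check the gate's control flow without calling a model by running the same chain as plain Python, with a stub in place of `llm`. This is a minimal sketch, not LangGraph code. `fake_llm` is a hypothetical stand-in that returns a deterministic reply. `run_chain` mirrors the Graph API version: it generates, applies the gate, and runs improve and polish only when the gate fails, while recording which steps ran.

```python
from typing import Callable

def fake_llm(prompt: str) -> str:
    """Hypothetical stub: returns a deterministic reply instead of calling a model."""
    return f"[reply to: {prompt}]"

def check_punchline(joke: str) -> str:
    # Same crude gate as the workflow: '?' or '!' counts as a punchline
    return "Pass" if ("?" in joke or "!" in joke) else "Fail"

def run_chain(topic: str, llm_fn: Callable[[str], str] = fake_llm) -> dict:
    steps = ["generate"]
    joke = llm_fn(f"Write a short joke about {topic}")
    if check_punchline(joke) == "Pass":
        return {"output": joke, "steps": steps}
    steps.append("improve")
    improved = llm_fn(f"Make this joke funnier by adding wordplay: {joke}")
    steps.append("polish")
    final = llm_fn(f"Add a surprising twist to this joke: {improved}")
    return {"output": final, "steps": steps}

# The stub's reply contains no '?' or '!', so the gate fails and all three steps run
print(run_chain("cats")["steps"])  # ['generate', 'improve', 'polish']

# A stub whose joke "lands" short-circuits after the first step
print(run_chain("cats", lambda p: "Why? Because!")["steps"])  # ['generate']
```

Passing the model call in as a parameter (`llm_fn`) is what makes the branching testable. The same approach works for node functions inside a graph.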
3.2 Parallel Execution
When a task splits into independent subtasks, running them in parallel can cut end-to-end latency substantially.
3.2.1 Graph API Implementation
```python
class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str

# Three independent model calls
def call_llm_1(state: State):
    msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}

def call_llm_2(state: State):
    msg = llm.invoke(f"Write a story about {state['topic']}")
    return {"story": msg.content}

def call_llm_3(state: State):
    msg = llm.invoke(f"Write a poem about {state['topic']}")
    return {"poem": msg.content}

# Combine the three results into a single output
def aggregator(state: State):
    combined = f"Pieces about {state['topic']}:\n\n"
    combined += f"Story:\n{state['story']}\n\n"
    combined += f"Joke:\n{state['joke']}\n\n"
    combined += f"Poem:\n{state['poem']}"
    return {"combined_output": combined}

parallel_builder = StateGraph(State)
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)
# Fan-out: all three nodes start from START and run in the same step
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
# Fan-in: the aggregator runs once all three branches have finished
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()
```
3.2.2 Functional API Implementation
```python
@task
def call_llm_1(topic: str):
    msg = llm.invoke(f"Write a joke about {topic}")
    return msg.content

@task
def call_llm_2(topic: str):
    msg = llm.invoke(f"Write a story about {topic}")
    return msg.content

@task
def call_llm_3(topic: str):
    msg = llm.invoke(f"Write a poem about {topic}")
    return msg.content

@task
def aggregator(topic: str, joke: str, story: str, poem: str):
    combined = f"Pieces about {topic}:\n\n"
    combined += f"Story:\n{story}\n\n"
    combined += f"Joke:\n{joke}\n\n"
    combined += f"Poem:\n{poem}"
    return combined

@entrypoint()
def parallel_workflow(topic: str):
    # Calling the tasks without .result() starts all three concurrently
    joke_fut = call_llm_1(topic)
    story_fut = call_llm_2(topic)
    poem_fut = call_llm_3(topic)
    # .result() blocks until each task has finished
    return aggregator(
        topic,
        joke_fut.result(),
        story_fut.result(),
        poem_fut.result(),
    ).result()
```
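Practical tip: when running calls in parallel, watch the provider's API rate limits. Throttle concurrency, add delays, or batch requests so that you do not trip them.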
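4. Advanced Workflow Patterns
4.1 Routing
A routing workflow classifies the input first and then sends it down a specialized path. This suits applications that have to handle several kinds of request.
4.1.1 Graph API Implementation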
```python
from typing_extensions import Literal
from langchain_core.messages import HumanMessage, SystemMessage

# Schema for the router's structured output
class Route(BaseModel):
    step: Literal["poem", "story", "joke"] = Field(
        description="The next step in the routing process."
    )

router = llm.with_structured_output(Route)

class State(TypedDict):
    input: str
    decision: str
    output: str

# Specialized branches (in practice each would use its own prompt)
def llm_call_1(state: State):
    """Write a story."""
    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_2(state: State):
    """Write a joke."""
    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_3(state: State):
    """Write a poem."""
    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_router(state: State):
    """Use structured output to classify the request."""
    decision = router.invoke(
        [
            SystemMessage(content="Route the input to story, joke, or poem based on the user's request."),
            HumanMessage(content=state["input"]),
        ]
    )
    return {"decision": decision.step}

# Conditional edge: map the router's decision to the name of the next node
def route_decision(state: State):
    if state["decision"] == "story":
        return "llm_call_1"
    elif state["decision"] == "joke":
        return "llm_call_2"
    elif state["decision"] == "poem":
        return "llm_call_3"

router_builder = StateGraph(State)
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
    "llm_call_router",
    route_decision,
    {
        "llm_call_1": "llm_call_1",
        "llm_call_2": "llm_call_2",
        "llm_call_3": "llm_call_3",
    },
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)
router_workflow = router_builder.compile()
```
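`route_decision` above uses an if/elif chain and implicitly returns `None` for anything unexpected. A table-driven variant keeps the mapping in one place and makes the fallback explicit. The sketch below is plain Python. `ROUTES` and `FALLBACK` are hypothetical names, and sending unknown decisions to the story branch is an arbitrary illustrative choice.

```python
ROUTES = {"story": "llm_call_1", "joke": "llm_call_2", "poem": "llm_call_3"}
FALLBACK = "llm_call_1"  # hypothetical choice: treat anything unrecognized as a story request

def route_decision(state: dict) -> str:
    """Map the router's decision to a node name, normalizing case and whitespace."""
    decision = str(state.get("decision", "")).strip().lower()
    return ROUTES.get(decision, FALLBACK)

print(route_decision({"decision": "joke"}))      # llm_call_2
print(route_decision({"decision": " Poem "}))    # llm_call_3
print(route_decision({"decision": "limerick"}))  # llm_call_1
print(route_decision({}))                        # llm_call_1
```

If you build the path map from the same table, for example `{node: node for node in set(ROUTES.values())}`, the edges and the routing function cannot drift apart.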
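4.2 Orchestrator-Worker
In this pattern an orchestrator breaks a task into subtasks and hands each one to a worker, and a synthesizer merges the workers' results. It suits large, complex tasks whose subtasks are not known in advance, such as a report whose sections are planned by the model.
4.2.1 Graph API Implementation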
```python
import operator
from typing import Annotated, List

from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.types import Send

# Schema for the planner's structured output
class Section(BaseModel):
    name: str = Field(description="Name of this report section.")
    description: str = Field(description="Brief overview of the main topics and concepts in this section.")

class Sections(BaseModel):
    sections: List[Section] = Field(description="Sections of the report.")

planner = llm.with_structured_output(Sections)

# Graph state
class State(TypedDict):
    topic: str
    sections: list[Section]
    # operator.add is a reducer: each worker's list is concatenated instead of overwritten
    completed_sections: Annotated[list, operator.add]
    final_report: str

# Worker state: each worker receives only its own section
class WorkerState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add]

def orchestrator(state: State):
    """Orchestrator: plan the report sections."""
    report_sections = planner.invoke(
        [
            SystemMessage(content="Generate a plan for the report."),
            HumanMessage(content=f"Here is the report topic: {state['topic']}"),
        ]
    )
    return {"sections": report_sections.sections}

def llm_call(state: WorkerState):
    """Worker: write one section."""
    section = llm.invoke(
        [
            SystemMessage(content="Write a report section following the provided name and description. Use Markdown formatting."),
            HumanMessage(content=f"Section name: {state['section'].name}\nDescription: {state['section'].description}"),
        ]
    )
    return {"completed_sections": [section.content]}

def synthesizer(state: State):
    """Synthesizer: join the completed sections into the final report."""
    completed_report = "\n\n---\n\n".join(state["completed_sections"])
    return {"final_report": completed_report}

def assign_workers(state: State):
    # Send one llm_call worker per planned section; the workers run in parallel
    return [Send("llm_call", {"section": s}) for s in state["sections"]]

orchestrator_worker_builder = StateGraph(State)
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator",
    assign_workers,
    ["llm_call"],
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)
orchestrator_worker = orchestrator_worker_builder.compile()
```
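Note: in the orchestrator-worker pattern, avoid spawning too many workers. Too many can cause resource contention and degrade performance. Size the number of workers to the task's complexity and the resources available.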
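You can check two details of this pattern in plain Python. First, the `Annotated[list, operator.add]` reducer combines each worker's `completed_sections` update by list concatenation, and `functools.reduce(operator.add, ...)` below does the same thing. Second, one way to act on the note above is to cap the fan-out. Rather than dropping sections, this sketch spreads them round-robin over at most `MAX_WORKERS` groups, and each group would become one `Send`. `MAX_WORKERS` and `chunk_sections` are hypothetical helpers, not part of LangGraph.

```python
import functools
import operator

MAX_WORKERS = 3  # hypothetical cap; tune to your rate limits

def chunk_sections(sections: list[str], max_workers: int = MAX_WORKERS) -> list[list[str]]:
    """Spread sections round-robin over at most max_workers groups (one group per worker)."""
    n = min(max_workers, len(sections))
    groups = [[] for _ in range(n)]
    for i, section in enumerate(sections):
        groups[i % n].append(section)
    return groups

sections = ["Intro", "Background", "Method", "Results", "Conclusion"]
groups = chunk_sections(sections)
print(groups)  # [['Intro', 'Results'], ['Background', 'Conclusion'], ['Method']]

# Each worker returns a list of written sections; the operator.add reducer concatenates them
worker_updates = [[f"## {s}" for s in group] for group in groups]
merged = functools.reduce(operator.add, worker_updates, [])
print(len(merged))  # 5
```

The merged list follows the worker grouping, not the original section order. If order matters, have the synthesizer sort sections by their planned position.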
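5. Building Agents in Practice
Agents are more autonomous than workflows. They decide on a course of action dynamically, based on the environment and the requirements of the task.
5.1 A Basic Agent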
```python
from typing_extensions import Literal
from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.graph import MessagesState, StateGraph, START, END

@tool
def multiply(a: int, b: int) -> int:
    """Multiply a and b."""
    return a * b

@tool
def add(a: int, b: int) -> int:
    """Add a and b."""
    return a + b

tools = [add, multiply]
tools_by_name = {t.name: t for t in tools}
llm_with_tools = llm.bind_tools(tools)

# Node: the model decides whether to call a tool
def llm_call(state: MessagesState):
    return {
        "messages": [
            llm_with_tools.invoke(
                [SystemMessage(content="You are a helpful assistant that performs arithmetic.")]
                + state["messages"]
            )
        ]
    }

# Node: execute every tool call requested in the last message
def tool_node(state: MessagesState):
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        selected_tool = tools_by_name[tool_call["name"]]
        observation = selected_tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=str(observation), tool_call_id=tool_call["id"]))
    return {"messages": result}

# Edge: go to the tools while the model keeps requesting them, otherwise stop
def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tool_node"
    return END

agent_builder = StateGraph(MessagesState)
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END],
)
agent_builder.add_edge("tool_node", "llm_call")
agent = agent_builder.compile()
# Usage: agent.invoke({"messages": [HumanMessage(content="Add 3 and 4, then multiply by 5.")]})
```
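Without the graph machinery, the agent above is a loop. It calls the model, executes any tool calls the model requests, appends the results, and stops when the model answers without tool calls. The sketch below reproduces that loop with a scripted stand-in for the model. `scripted_model` is hypothetical, and messages are plain dicts rather than LangChain message objects. It also adds a `max_steps` guard, which the graph version does not have. LangGraph's `recursion_limit` run setting plays a similar role there.

```python
def add(a: int, b: int) -> int:
    return a + b

def multiply(a: int, b: int) -> int:
    return a * b

TOOLS = {"add": add, "multiply": multiply}

def scripted_model(messages: list[dict]) -> dict:
    """Hypothetical model stand-in: requests add(3, 4), then multiply(result, 5), then answers."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        return {"role": "assistant", "tool_calls": [{"id": "1", "name": "add", "args": {"a": 3, "b": 4}}]}
    if len(tool_results) == 1:
        prev = int(tool_results[-1]["content"])
        return {"role": "assistant", "tool_calls": [{"id": "2", "name": "multiply", "args": {"a": prev, "b": 5}}]}
    return {"role": "assistant", "content": f"The answer is {tool_results[-1]['content']}.", "tool_calls": []}

def run_agent(question: str, model=scripted_model, max_steps: int = 10) -> list[dict]:
    messages = [{"role": "user", "content": question}]
    for _ in range(max_steps):
        reply = model(messages)            # the "llm_call" node
        messages.append(reply)
        if not reply["tool_calls"]:        # the "should_continue" edge: no tool calls -> END
            return messages
        for call in reply["tool_calls"]:   # the "tool_node": run each requested tool
            result = TOOLS[call["name"]](**call["args"])
            messages.append({"role": "tool", "content": str(result), "tool_call_id": call["id"]})
    raise RuntimeError(f"agent did not finish within {max_steps} steps")

transcript = run_agent("Add 3 and 4, then multiply the result by 5.")
print(transcript[-1]["content"])  # The answer is 35.
```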
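5.2 Tips for Improving Agents
- Tool design principles:
  - Give each tool a single, focused responsibility
  - Write clear, accurate tool descriptions
  - Define input and output types explicitly
- Conversation history management:
  - Limit the number of turns kept so the history does not grow too long
  - Periodically summarize the conversation
  - Restate important information to the agent explicitly
- Error handling:
  - A fallback strategy for failed tool calls
  - A defined way to handle invalid input
  - Timeouts and retries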
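For the history-management tips, a common approach is to keep the system prompt plus only the most recent turns. The sketch below assumes messages are plain `{"role", "content"}` dicts and that a turn starts at each user message. `trim_history` and `max_turns` are hypothetical names. Cutting only at user messages means an assistant tool call is never separated from its tool result. For LangChain message objects, `langchain_core.messages` also provides a `trim_messages` helper.

```python
def trim_history(messages: list[dict], max_turns: int = 2) -> list[dict]:
    """Keep system messages plus the last `max_turns` turns (a turn starts at a user message)."""
    if max_turns < 1:
        raise ValueError("max_turns must be at least 1")
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    # Positions in `rest` where a new user turn begins
    turn_starts = [i for i, m in enumerate(rest) if m["role"] == "user"]
    if len(turn_starts) <= max_turns:
        return system + rest
    return system + rest[turn_starts[-max_turns]:]

history = [
    {"role": "system", "content": "You are an arithmetic assistant."},
    {"role": "user", "content": "1 + 1?"},
    {"role": "assistant", "content": "2"},
    {"role": "user", "content": "2 * 3?"},
    {"role": "assistant", "content": "6"},
    {"role": "user", "content": "10 - 4?"},
    {"role": "assistant", "content": "6"},
]
trimmed = trim_history(history, max_turns=2)
print([m["content"] for m in trimmed])
# ['You are an arithmetic assistant.', '2 * 3?', '6', '10 - 4?', '6']
```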
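```python
# Enhanced agent example
from datetime import datetime, timedelta
from typing import Annotated

from typing_extensions import Literal, TypedDict
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# Phrases in a final answer that count as "uncertain" and trigger a retry
UNCERTAIN_MARKERS = ("I'm not sure", "unable", "cannot")
MAX_RETRIES = 2

class EnhancedAgentState(TypedDict):
    # add_messages appends new messages instead of overwriting the list
    messages: Annotated[list, add_messages]
    last_active: datetime
    retry_count: int
    should_end: bool

def enhanced_llm_call(state: EnhancedAgentState):
    # End the session if it has been idle for more than 5 minutes
    if datetime.now() - state.get("last_active", datetime.now()) > timedelta(minutes=5):
        return {
            "messages": [AIMessage(content="The session has timed out. Please start again.")],
            "should_end": True,
        }
    prompt = [SystemMessage(content="You are an enhanced arithmetic assistant.")] + state["messages"]
    if state.get("retry_count", 0) > 0:
        # Retrying after an uncertain answer: ask again rather than ending on an AI turn
        prompt.append(HumanMessage(content="Please try again, using the available tools if needed."))
    response = llm_with_tools.invoke(prompt)
    uncertain = not response.tool_calls and any(
        marker in str(response.content) for marker in UNCERTAIN_MARKERS
    )
    return {
        "messages": [response],
        "last_active": datetime.now(),
        # Count consecutive uncertain answers so that the retry loop is bounded
        "retry_count": state.get("retry_count", 0) + 1 if uncertain else 0,
        "should_end": False,
    }

def enhanced_tool_node(state: EnhancedAgentState):
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        try:
            selected_tool = tools_by_name[tool_call["name"]]
            observation = selected_tool.invoke(tool_call["args"])
            result.append(ToolMessage(content=str(observation), tool_call_id=tool_call["id"]))
        except Exception as e:
            # Report the failure to the model instead of crashing the graph
            result.append(ToolMessage(content=f"Tool call failed: {e}", tool_call_id=tool_call["id"]))
    return {"messages": result, "last_active": datetime.now(), "retry_count": 0}

def enhanced_should_continue(state: EnhancedAgentState) -> Literal["tool_node", "llm_call", END]:
    if state.get("should_end", False):
        return END
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tool_node"
    # Retry at most MAX_RETRIES times after an uncertain answer
    if 0 < state.get("retry_count", 0) <= MAX_RETRIES:
        return "llm_call"
    return END

enhanced_agent_builder = StateGraph(EnhancedAgentState)
enhanced_agent_builder.add_node("llm_call", enhanced_llm_call)
enhanced_agent_builder.add_node("tool_node", enhanced_tool_node)
enhanced_agent_builder.add_edge(START, "llm_call")
enhanced_agent_builder.add_conditional_edges(
    "llm_call",
    enhanced_should_continue,
    {"tool_node": "tool_node", "llm_call": "llm_call", END: END},
)
enhanced_agent_builder.add_edge("tool_node", "llm_call")
enhanced_agent = enhanced_agent_builder.compile()
# Usage: enhanced_agent.invoke({"messages": [HumanMessage(content="What is 6 * 7?")],
#                               "last_active": datetime.now(), "retry_count": 0, "should_end": False})
```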
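Practical tip: the most common problem in agent development is the model failing to pick the right tool. Remedies include improving tool descriptions, providing more examples, and spelling out the tool-selection logic in the system prompt.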
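6. Performance Optimization and Debugging
6.1 Workflow Performance
- Caching:
  - Cache the results of highly deterministic steps
  - Use stable hash keys (for example, an MD5 of the input arguments)
  - Set a sensible cache expiration time
```python
import hashlib
from functools import lru_cache

def get_cache_key(*args, **kwargs):
    """Build a stable cache key, e.g. for use with an external cache."""
    key = str(args) + str(sorted(kwargs.items()))  # sort kwargs so argument order does not matter
    return hashlib.md5(key.encode()).hexdigest()

# In-process cache keyed on the prompt string. lru_cache has no expiration, and caching
# only makes sense when the call is effectively deterministic (e.g. a low temperature)
@lru_cache(maxsize=100)
def cached_llm_call(prompt: str):
    return llm.invoke(prompt)
```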
- Batching:
  - Combine independent requests into batches
  - Keep batch sizes small enough to avoid timeouts
```python
def batch_llm_call(prompts: list[str], batch_size: int = 5):
    """Process prompts in chunks of batch_size (adjust to your API limits)."""
    results = []
    for i in range(0, len(prompts), batch_size):
        chunk = prompts[i:i + batch_size]
        # Runnable.batch runs the chunk concurrently; max_concurrency caps parallel requests
        results.extend(llm.batch(chunk, config={"max_concurrency": batch_size}))
    return results
```
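`lru_cache` never expires entries, so the cache-expiration bullet needs something else. Below is a minimal in-process TTL cache sketch built on the same MD5-key idea. `TTLCache` here is a hypothetical helper. Libraries such as cachetools offer similar, better-tested classes. The clock is injectable so that expiry can be tested without sleeping.

```python
import hashlib
import time
from typing import Any, Callable

def get_cache_key(*args, **kwargs) -> str:
    """Stable MD5 key from positional and (sorted) keyword arguments."""
    key = str(args) + str(sorted(kwargs.items()))
    return hashlib.md5(key.encode()).hexdigest()

class TTLCache:
    """Hypothetical minimal cache whose entries expire after ttl seconds."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._store: dict[str, tuple[float, Any]] = {}

    def get_or_compute(self, fn: Callable[..., Any], *args, **kwargs) -> Any:
        key = get_cache_key(fn.__name__, *args, **kwargs)
        now = self.clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]                  # fresh entry: reuse it
        value = fn(*args, **kwargs)        # missing or expired: recompute
        self._store[key] = (now, value)
        return value

calls = []

def slow_summary(text: str) -> str:
    calls.append(text)  # record real invocations
    return text[:10]

fake_now = [0.0]
cache = TTLCache(ttl=60, clock=lambda: fake_now[0])
cache.get_or_compute(slow_summary, "LangGraph workflows")
cache.get_or_compute(slow_summary, "LangGraph workflows")  # served from cache
fake_now[0] = 61.0
cache.get_or_compute(slow_summary, "LangGraph workflows")  # expired, recomputed
print(len(calls))  # 2
```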
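6.2 Debugging Tips
- Visualize the workflow:
```python
# Render the compiled graph (get_graph() lives on the compiled graph, not on the StateGraph builder)
png_bytes = chain.get_graph().draw_mermaid_png()  # uses the mermaid.ink web service by default
with open("workflow.png", "wb") as f:
    f.write(png_bytes)
```
- Inspect intermediate results:
```python
# Add a debug node at a key point in the graph
def debug_node(state):
    print(f"Current state: {state}")
    return {}  # no update, so reducer-backed keys are not written a second time

# Insert the debug node ("some_node" and "next_node" are placeholders); recompile afterwards
workflow.add_node("debug", debug_node)
workflow.add_edge("some_node", "debug")
workflow.add_edge("debug", "next_node")
```
- Error handling and retries:
```python
from tenacity import retry, stop_after_attempt, wait_exponential

# Up to 3 attempts, with exponential backoff clamped between 4 and 10 seconds
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def reliable_llm_call(prompt):
    try:
        return llm.invoke(prompt)
    except Exception as e:
        print(f"Call failed: {e}")
        raise  # re-raise so tenacity schedules the next attempt
```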
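Inserting debug nodes changes the graph's topology. An alternative is to wrap node functions in a decorator that logs each node's input keys, returned update, and duration, and then register the wrapped function with `add_node` as usual. `traced` is a hypothetical helper built on the standard `logging` module. It assumes nodes are plain functions that take the state and return an update dict.

```python
import functools
import logging
import time

logger = logging.getLogger("workflow")

def traced(func):
    """Log a node's input keys, returned update, and wall-clock duration."""
    @functools.wraps(func)
    def wrapper(state):
        start = time.perf_counter()
        update = func(state)
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.debug("%s in=%s out=%s (%.1f ms)", func.__name__, sorted(state), update, elapsed_ms)
        return update
    return wrapper

@traced
def shout(state: dict) -> dict:
    return {"joke": state["joke"].upper()}

logging.basicConfig(level=logging.DEBUG)
print(shout({"topic": "cats", "joke": "meow"}))  # {'joke': 'MEOW'}
# Registration would then look like: workflow.add_node("shout", shout)
```

`functools.wraps` keeps the original name and signature visible, so the wrapped function still looks like the plain node to anything that inspects it.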
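7. Deployment and Production Considerations
7.1 Persisting Workflow State
For long-running workflows, persisting state is essential so that a run can be inspected or resumed after a crash or restart. LangGraph's built-in checkpointers, passed via `compile(checkpointer=...)`, persist graph state per thread. The class below is a hand-rolled store for your own run records: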
```python
import json
import sqlite3
from datetime import datetime
from typing import Optional

from typing_extensions import Literal, TypedDict

class WorkflowState(TypedDict):
    state: dict  # the workflow's own state; must be JSON-serializable
    created_at: datetime
    updated_at: datetime
    status: Literal["running", "completed", "failed"]
    error: Optional[str]

class WorkflowPersistence:
    """Stores workflow run records in a SQLite table."""

    def __init__(self, db_path: str = "workflow_states.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS workflow_states (
                id TEXT PRIMARY KEY,
                state TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                status TEXT NOT NULL,
                error TEXT
            )
        """)
        self.conn.commit()

    def save_state(self, workflow_id: str, state: WorkflowState):
        # Insert or overwrite the record for this workflow run
        self.conn.execute(
            "INSERT OR REPLACE INTO workflow_states VALUES (?, ?, ?, ?, ?, ?)",
            (
                workflow_id,
                json.dumps(state["state"]),
                state["created_at"].isoformat(),
                state["updated_at"].isoformat(),
                state["status"],
                state.get("error"),
            ),
        )
        self.conn.commit()

    def load_state(self, workflow_id: str) -> Optional[WorkflowState]:
        cur = self.conn.execute(
            "SELECT state, created_at, updated_at, status, error FROM workflow_states WHERE id = ?",
            (workflow_id,),
        )
        row = cur.fetchone()
        if not row:
            return None
        return {
            "state": json.loads(row[0]),
            "created_at": datetime.fromisoformat(row[1]),
            "updated_at": datetime.fromisoformat(row[2]),
            "status": row[3],
            "error": row[4],
        }
```
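The point of saving state is being able to resume. The sketch below assumes a linear pipeline of plain-Python steps. It stores the merged state after every step in SQLite (an in-memory database here), and on restart it skips the steps already recorded as done. `run_resumable`, the `checkpoints` table, and the step functions are all hypothetical. LangGraph's checkpointers implement a more complete version of this idea for graphs.

```python
import json
import sqlite3

def init_db(conn: sqlite3.Connection) -> None:
    """Create the checkpoint table: one row per run, holding the last completed step."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints "
        "(run_id TEXT PRIMARY KEY, step INTEGER NOT NULL, state TEXT NOT NULL)"
    )

def run_resumable(conn: sqlite3.Connection, run_id: str, steps: list, initial: dict) -> dict:
    """Run steps in order, checkpointing after each; resume after the last saved step."""
    row = conn.execute("SELECT step, state FROM checkpoints WHERE run_id = ?", (run_id,)).fetchone()
    done, state = (row[0], json.loads(row[1])) if row else (0, initial)
    for i in range(done, len(steps)):
        state = {**state, **steps[i](state)}  # merge the step's update into the state
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
            (run_id, i + 1, json.dumps(state)),
        )
        conn.commit()
    return state

executed = []

def outline(state: dict) -> dict:
    executed.append("outline")
    return {"outline": ["intro", "body"]}

pending_crashes = [1]  # the first call to draft() fails once

def draft(state: dict) -> dict:
    executed.append("draft")
    if pending_crashes:
        pending_crashes.pop()
        raise RuntimeError("simulated crash")
    return {"draft": " / ".join(state["outline"])}

conn = sqlite3.connect(":memory:")
init_db(conn)
try:
    run_resumable(conn, "run-1", [outline, draft], {"topic": "LangGraph"})
except RuntimeError:
    pass  # crashed inside draft(); the outline step is already checkpointed
final = run_resumable(conn, "run-1", [outline, draft], {"topic": "LangGraph"})
print(final["draft"])  # intro / body
print(executed)        # ['outline', 'draft', 'draft']
```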
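7.2 Monitoring and Logging
A thorough monitoring setup should cover:
- Performance metrics:
  - Execution time of each node
  - Token usage of model calls
  - Error rates and retry counts
- Business metrics:
  - Workflow completion rate
  - Success rate of key nodes
  - User-satisfaction indicators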
```python
import functools
import time

from prometheus_client import Counter, Histogram, start_http_server

# Metric definitions, labeled by node name (the decorator runs once per node call)
NODE_STARTED = Counter("node_started", "Number of node executions started", ["node"])
NODE_COMPLETED = Counter("node_completed", "Number of node executions completed", ["node"])
NODE_FAILED = Counter("node_failed", "Number of node executions that raised", ["node"])
NODE_EXECUTION_TIME = Histogram("node_execution_seconds", "Time spent in node execution", ["node"])

def monitored_node(func):
    """Monitoring decorator for a node function."""
    node = func.__name__

    @functools.wraps(func)
    def wrapper(state):
        NODE_STARTED.labels(node=node).inc()
        start_time = time.perf_counter()
        try:
            result = func(state)
            NODE_COMPLETED.labels(node=node).inc()
            return result
        except Exception:
            NODE_FAILED.labels(node=node).inc()
            raise
        finally:
            NODE_EXECUTION_TIME.labels(node=node).observe(time.perf_counter() - start_time)

    return wrapper

# Start the metrics server (serves /metrics on port 8000)
start_http_server(8000)
```
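You can collect the token-usage metric from the responses themselves. When the provider reports usage, LangChain chat models attach a `usage_metadata` dict with `input_tokens`, `output_tokens` and `total_tokens` to the `AIMessage` they return. The sketch below sums those fields across calls. `SimpleNamespace` objects stand in for real messages so that it runs without a model, and `summarize_usage` is a hypothetical helper.

```python
from collections import Counter
from types import SimpleNamespace

def summarize_usage(responses) -> dict:
    """Sum usage_metadata token counts across responses, skipping ones without usage data."""
    totals = Counter()
    for r in responses:
        usage = getattr(r, "usage_metadata", None) or {}
        for key in ("input_tokens", "output_tokens", "total_tokens"):
            totals[key] += usage.get(key, 0)
    return dict(totals)

responses = [
    SimpleNamespace(usage_metadata={"input_tokens": 120, "output_tokens": 40, "total_tokens": 160}),
    SimpleNamespace(usage_metadata={"input_tokens": 200, "output_tokens": 80, "total_tokens": 280}),
    SimpleNamespace(usage_metadata=None),  # e.g. a provider that did not report usage
]
print(summarize_usage(responses))
# {'input_tokens': 320, 'output_tokens': 120, 'total_tokens': 440}
```

In production you could feed these totals into a Prometheus `Counter` with `.inc(n)`.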
8. Real-World Examples
8.1 Customer-Support Ticket Handling
```python
from datetime import datetime
from enum import Enum
from typing import Optional

from pydantic import BaseModel
from typing_extensions import Literal, TypedDict
from langgraph.graph import StateGraph, START, END

class TicketType(str, Enum):
    BILLING = "billing"
    TECHNICAL = "technical"
    GENERAL = "general"

class Ticket(TypedDict):
    id: str
    customer_id: str
    type: TicketType
    description: str
    status: Literal["open", "in_progress", "resolved"]
    created_at: datetime
    resolved_at: Optional[datetime]

class TicketState(TypedDict):
    ticket: Ticket
    conversation: list[dict]
    current_step: str
    required_info: dict
    attempted_steps: list[str]

# with_structured_output expects a schema such as a Pydantic model, so wrap the enum in one
class TicketClassification(BaseModel):
    type: TicketType

def classify_ticket(state: TicketState):
    """Classify the ticket."""
    result = llm.with_structured_output(TicketClassification).invoke(
        f"Classify this support ticket: {state['ticket']['description']}"
    )
    return {"ticket": {**state["ticket"], "type": result.type}}

def gather_billing_info(state: TicketState):
    """Collect billing information.

    input() is for demonstration only; a deployed graph would use LangGraph's
    human-in-the-loop support (langgraph.types.interrupt) instead.
    """
    questions = [
        "Please provide the billing date",
        "Please describe the problem",
        "What payment method do you use?",
    ]
    required_info = dict(state.get("required_info", {}))
    for q in questions:
        required_info[q] = input(q + ": ")
    return {"required_info": required_info}

def resolve_technical_ticket(state: TicketState):
    """Propose a solution to a technical problem."""
    solution = llm.invoke(
        f"Technical problem: {state['ticket']['description']}\n"
        f"Steps already tried: {state.get('attempted_steps', [])}\n"
        "Please propose a solution:"
    )
    conversation = state.get("conversation", []) + [
        {"role": "assistant", "content": solution.content}
    ]
    ticket = dict(state["ticket"])
    if input("Was the problem solved? (y/n): ").strip().lower() == "y":
        ticket["status"] = "resolved"
        ticket["resolved_at"] = datetime.now()
    return {"conversation": conversation, "ticket": ticket}

ticket_workflow = StateGraph(TicketState)
ticket_workflow.add_node("classify", classify_ticket)
ticket_workflow.add_node("billing", gather_billing_info)
ticket_workflow.add_node("technical", resolve_technical_ticket)
ticket_workflow.add_edge(START, "classify")
ticket_workflow.add_conditional_edges(
    "classify",
    # Normalize to the enum's string value so it matches the path-map keys
    lambda s: TicketType(s["ticket"]["type"]).value,
    {
        TicketType.BILLING.value: "billing",
        TicketType.TECHNICAL.value: "technical",
        TicketType.GENERAL.value: END,
    },
)
ticket_workflow.add_edge("billing", END)
ticket_workflow.add_edge("technical", END)
ticket_system = ticket_workflow.compile()
```
8.2 Content Moderation Pipeline
```python
class Content(TypedDict):
    id: str
    text: str
    author: str
    created_at: datetime
    flags: list[str]

class ModerationResult(TypedDict):
    content: Content
    decision: Literal["approve", "reject", "human_review"]
    reasons: list[str]
    modified_content: Optional[str]

def detect_sensitive_content(state: ModerationResult):
    """Detect sensitive content with a simple keyword check."""
    sensitive_topics = ["violence", "hate speech", "adult content"]
    text = state["content"]["text"].lower()
    detected = [topic for topic in sensitive_topics if topic in text]
    if detected:
        return {"decision": "reject", "reasons": detected}
    # Nothing detected: approve by default; later steps may escalate to human review
    return {"decision": "approve", "reasons": []}

def check_grammar(state: ModerationResult):
    """Grammar check."""
    if state["decision"] == "reject":
        return {}
    response = llm.invoke(
        f"Check the following text for grammar errors: {state['content']['text']}\n"
        "List every error and suggest a correction."
    )
    errors = response.content.split("\n")
    if len(errors) > 2:  # crude heuristic: assumes the reply starts with a heading line
        return {"decision": "human_review", "reasons": state["reasons"] + ["grammar issues"] + errors}
    return {}

def suggest_improvements(state: ModerationResult):
    """Suggest an improved version of the content."""
    if state["decision"] != "human_review":
        return {}
    response = llm.invoke(
        f"Original text: {state['content']['text']}\n"
        "Issues:\n" + "\n".join(state["reasons"]) + "\n"
        "Please provide an improved version:"
    )
    return {"modified_content": response.content}

moderation_workflow = StateGraph(ModerationResult)
moderation_workflow.add_node("sensitive", detect_sensitive_content)
moderation_workflow.add_node("grammar", check_grammar)
moderation_workflow.add_node("improve", suggest_improvements)
moderation_workflow.add_edge(START, "sensitive")
moderation_workflow.add_edge("sensitive", "grammar")
moderation_workflow.add_edge("grammar", "improve")
moderation_workflow.add_edge("improve", END)
content_moderator = moderation_workflow.compile()
```
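Counting lines in a free-text reply, as `check_grammar` does, breaks as soon as the model changes its formatting. A sturdier approach is to ask for JSON such as `{"errors": [...]}` and parse it, treating any unparseable reply as a reason for human review rather than silently approving. The sketch below assumes that prompt format. `parse_grammar_reply` is a hypothetical helper. Structured output via `with_structured_output` would be another way to get the same guarantee.

```python
import json

def parse_grammar_reply(reply: str) -> tuple[str, list[str]]:
    """Turn a model reply like '{"errors": [...]}' into a (decision, reasons) pair."""
    try:
        data = json.loads(reply)
        errors = data["errors"]
        if not isinstance(errors, list):
            raise TypeError("errors must be a list")
    except (json.JSONDecodeError, KeyError, TypeError):
        # Malformed output: do not guess, escalate to a human
        return "human_review", ["could not parse grammar check"]
    if errors:
        return "human_review", ["grammar issues"] + [str(e) for e in errors]
    return "approve", []

print(parse_grammar_reply('{"errors": []}'))  # ('approve', [])
print(parse_grammar_reply('{"errors": ["their -> there"]}'))
# ('human_review', ['grammar issues', 'their -> there'])
print(parse_grammar_reply("Looks fine to me!"))
# ('human_review', ['could not parse grammar check'])
```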
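9. Common Problems and Solutions
9.1 Workflow Stalls or Loops Forever
Symptom: the workflow keeps re-executing the same node and never makes progress.
Solutions:
- Check that the conditional-edge logic is correct
- Cap the number of retries
- Add a timeout
- Set LangGraph's recursion_limit in the run config, so a runaway loop fails fast instead of running indefinitely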
```python
from datetime import datetime, timedelta

class TimeoutState(TypedDict):
    # ...plus your existing state fields
    start_time: datetime
    max_duration: timedelta

def check_timeout(state: TimeoutState):
    if datetime.now() - state["start_time"] > state["max_duration"]:
        raise TimeoutError("Workflow execution timed out")
    return {}  # no state update

# Insert the timeout check before a key node ("some_node" and "next_node" are placeholders)
workflow.add_node("timeout_check", check_timeout)
workflow.add_edge("some_node", "timeout_check")
workflow.add_edge("timeout_check", "next_node")
```
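Raising an exception aborts the whole run. Another option is to put the guard in the routing function of a retry loop, so that the graph exits cleanly once either limit is reached. In the sketch below, the state keys (`ok`, `attempts`, `started_at`), the route names, and `route_after_check` are all hypothetical. The clock can be passed in so that the logic can be tested deterministically.

```python
import time
from typing import Optional

MAX_ATTEMPTS = 3    # hypothetical retry budget
MAX_SECONDS = 30.0  # hypothetical time budget

def route_after_check(state: dict, now: Optional[float] = None) -> str:
    """Decide whether a retry loop should go around again, finish, or give up."""
    now = time.monotonic() if now is None else now
    if state["ok"]:
        return "done"
    if state["attempts"] >= MAX_ATTEMPTS:
        return "give_up"  # retry budget exhausted
    if now - state["started_at"] > MAX_SECONDS:
        return "give_up"  # time budget exhausted
    return "retry"

print(route_after_check({"ok": False, "attempts": 1, "started_at": 0.0}, now=5.0))   # retry
print(route_after_check({"ok": False, "attempts": 3, "started_at": 0.0}, now=5.0))   # give_up
print(route_after_check({"ok": False, "attempts": 1, "started_at": 0.0}, now=31.0))  # give_up
print(route_after_check({"ok": True, "attempts": 1, "started_at": 0.0}, now=99.0))   # done
```

In a graph, "retry" would map back to the generating node, and "give_up" would map to a fallback node or END. The generating node must increment `attempts`, and `started_at` must come from the same clock (`time.monotonic()` here).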
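9.2 Inconsistent Model Responses
Symptom: the same input produces different outputs, so the workflow behaves unpredictably.
Solutions:
- Lower the temperature to reduce randomness
- Use structured output to constrain the response format
- Add a post-processing validation step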
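```python
# Temperature is set when the model is created (there is no with_temperature method);
# YourSchema stands for your own Pydantic model
consistent_llm = ChatAnthropic(
    model="claude-sonnet-4-5-20250929", temperature=0.3
).with_structured_output(YourSchema)

def validate_output(state):
    """Check that the model output meets expectations."""
    if not is_valid(state["output"]):  # is_valid / fallback_value: your own check and default
        return {"output": fallback_value}
    return {}
```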
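Replacing bad output with a fixed fallback straight away discards a call that might have succeeded on a second try. A common refinement, sketched below, is to validate, ask again up to a fixed number of times, and only then fall back. `generate_with_validation`, the validator, and the scripted stand-in for the model are all hypothetical.

```python
from typing import Callable, Iterator

def generate_with_validation(
    call_model: Callable[[], str],
    is_valid: Callable[[str], bool],
    fallback: str,
    max_attempts: int = 3,
) -> tuple[str, int]:
    """Return (output, attempts_used); use fallback if no attempt passes validation."""
    for attempt in range(1, max_attempts + 1):
        output = call_model()
        if is_valid(output):
            return output, attempt
    return fallback, max_attempts

# Scripted stand-in for a model: two bad answers, then a valid one
replies: Iterator[str] = iter(["maybe?", "not sure", "42"])
result = generate_with_validation(lambda: next(replies), str.isdigit, fallback="0")
print(result)  # ('42', 3)

# Validation never passes: the fallback is returned after max_attempts calls
print(generate_with_validation(lambda: "nope", str.isdigit, fallback="0"))  # ('0', 3)
```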
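9.3 Tool Call Failures
Symptom: the agent fails to call tools correctly or mishandles their responses.
Solutions:
- Improve tool descriptions and parameter documentation
- Add examples of how to call each tool
- Implement retries and fallbacks for tool calls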
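```python
from langchain_core.tools import tool
from pydantic import BaseModel, Field

# Argument schema: the field descriptions (with examples) are shown to the model
class ReliableToolInput(BaseModel):
    param1: str = Field(description="Description of param1, e.g. 'example_value'")
    param2: int = Field(description="Description of param2, e.g. 42")

@tool(args_schema=ReliableToolInput)
def reliable_tool(param1: str, param2: int) -> str:
    """Detailed description of what the tool does, with usage examples:
    - Example 1: use in scenario A
    - Example 2: use in scenario B
    """
    try:
        result = f"{param1}:{param2}"  # placeholder for the real implementation
        return result
    except Exception as e:
        # Return the error as text so the model can see it and adjust
        return f"Tool error: {e}"
```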
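10. Best Practices Summary
- Modular design:
  - Break complex workflows into reusable sub-workflows
  - Give each node a single responsibility
  - Define node interfaces and state structures explicitly
- Incremental complexity:
  - Start with a simple linear workflow
  - Add conditional branches and parallel processing step by step
  - Introduce dynamic decisions and agents last
- Thorough testing:
  - Unit-test each node
  - Integration-test the complete workflow
  - Stress-test critical paths
- Monitoring and maintenance:
  - Record workflow execution history
  - Monitor performance metrics
  - Review and optimize regularly
- Documentation and examples:
  - Write usage instructions for each workflow
  - Provide typical input and output examples
  - Record common problems and their solutions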
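```python
import json

from langgraph.graph import START, END

def document_workflow(graph, name: str, example_input: dict) -> str:
    """Generate Markdown documentation for a compiled graph.

    The title and example input are passed in explicitly; the graph does not store an example input.
    """
    drawable = graph.get_graph()
    doc = f"# {name} Workflow Documentation\n\n"
    doc += "## Nodes\n"
    for node_id in drawable.nodes:
        if node_id not in (START, END):
            doc += f"- {node_id}\n"
    doc += "\n## Typical Flow\n"
    doc += "```mermaid\n"
    doc += drawable.draw_mermaid()
    doc += "\n```\n"
    doc += "\n## Example Input\n```json\n"
    doc += json.dumps(example_input, indent=2, ensure_ascii=False)
    doc += "\n```\n"
    return doc

# Usage: print(document_workflow(chain, "Joke", {"topic": "cats"}))
```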
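In our team's projects, we found that LangGraph is best suited to automating business processes of moderate complexity. For very simple tasks a plain script may be more efficient, and extremely complex, highly dynamic scenarios may call for a more specialized custom solution. Between those two extremes, LangGraph strikes a very good balance: it is flexible enough to handle real-world uncertainty while keeping good structure and maintainability.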