在当今AI技术快速发展的背景下,传统的聊天机器人已经无法满足复杂场景需求。一个真正的AI Agent应该具备四大核心能力:大语言模型(LLM)的推理能力、任务规划能力、记忆存储能力和工具调用能力。这就像组建一个高效的工作团队——LLM是团队中的"大脑",负责思考和决策;规划模块是"项目经理",负责拆解和安排任务;记忆系统是"档案管理员",负责记录和检索信息;工具调用则是"执行小组",负责具体操作实施。
我最近在实际项目中构建了一个典型的企业级AI Agent案例,它能够处理公司内部数据查询和精确计算两类任务。这个Agent的核心价值在于:当用户询问"公司计划预算增加46%后是多少"这类复合问题时,它能自动分解任务,先查询原始预算数据,再进行数学计算,最后整合输出结果。整个过程完全自动化,不需要人工干预。
计算器工具看似简单,实则暗藏风险。原始实现直接使用Python的eval()函数,这相当于给系统开了一个后门。攻击者可能通过精心构造的输入执行任意代码,比如:
"__import__('os').system('rm -rf /')"  # 危险示例!
经过多次实践验证,我总结出三种安全加固方案:
import re


def safe_calculator(expr: str) -> str:
    """Evaluate an arithmetic expression after a character-whitelist check.

    Args:
        expr: Candidate expression; only digits, + - * /, whitespace,
            dots and parentheses are accepted.

    Returns:
        The result as a string, or a Chinese error message when the input
        contains disallowed characters or the evaluation fails.
    """
    if not re.match(r'^[\d\+\-\*\/\s\.\(\)]*$', expr):
        return "错误:包含非法字符"
    try:
        # NOTE(review): eval() is still used. The whitelist blocks names and
        # attribute access, but pathological inputs (e.g. 9**9**9 via '*'
        # pairs) can still burn CPU — prefer an AST-based evaluator.
        return str(eval(expr))
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt /
        # SystemExit; Exception is the widest sensible net here.
        return "计算错误"
import ast


def ast_calculator(expr: str) -> str:
    """Safely evaluate an arithmetic expression by whitelisting AST nodes.

    Fixes the original whitelist, which rejected every input: on Python 3.8+
    numeric literals parse as ast.Constant (ast.Num is a deprecated alias),
    and ast.walk also yields operator nodes such as ast.Add / ast.USub,
    none of which were whitelisted.

    Args:
        expr: Expression built from numeric literals, binary and unary
            arithmetic operators.

    Returns:
        The result as a string, or "计算错误" on any invalid input.
    """
    allowed = (
        ast.Expression, ast.Constant, ast.BinOp, ast.UnaryOp,
        ast.operator, ast.unaryop,  # base classes of Add/Sub/... and USub/UAdd
    )
    try:
        node = ast.parse(expr, mode='eval')
        for n in ast.walk(node):
            if not isinstance(n, allowed):
                raise ValueError("非法语法结构")
            # Constants must be numeric — rejects strings, None, Ellipsis.
            if isinstance(n, ast.Constant) and not isinstance(n.value, (int, float)):
                raise ValueError("非法语法结构")
        # Evaluate the already-validated tree, not the raw string, so nothing
        # outside the whitelist can ever reach eval().
        return str(eval(compile(node, "<expr>", "eval")))
    except Exception:
        return "计算错误"
from pyparsing import Word, nums, oneOf, ParseException


def parser_calculator(expr: str) -> str:
    """Validate *expr* against a pyparsing grammar before evaluating it.

    Grammar: one integer followed by zero or more (operator, integer) pairs,
    operators limited to + - * /. No parentheses, no floats.

    Returns:
        The result as a string, "表达式不合法" for grammar violations, or
        "计算错误" for division by zero.
    """
    integer = Word(nums)
    operand = integer
    operator = oneOf("+ - * /")
    # operand (operator operand)* — `[...]` is pyparsing's zero-or-more.
    expr_stack = operand + (operator + operand)[...]
    try:
        expr_stack.parseString(expr, parseAll=True)
        # Acceptable here: the grammar admits only digits and the four
        # operators, so eval cannot see names or attribute access.
        return str(eval(expr))
    except ParseException:
        return "表达式不合法"
    except ZeroDivisionError:
        # "1/0" passes the grammar but blew up inside eval; the original
        # let this exception propagate to the caller.
        return "计算错误"
重要提示:在真实商业环境中,建议使用成熟的数学表达式解析库如 numexpr 或 sympy,它们经过严格安全测试,能有效防范代码注入攻击。
原始示例中的RAG实现存在几个典型问题:
经过多次迭代测试,我的优化方案如下:
from langchain_text_splitters import MarkdownHeaderTextSplitter

# Markdown heading levels to split on, mapped to the metadata key each chunk
# records for its enclosing section title.
headers = [
    ("#", "项目标题"),
    ("##", "子标题"),
    ("###", "三级标题"),
]


def enhanced_rag_search(query: str) -> str:
    """Retrieve context for *query* from header-aware chunks via multiple
    embedding models, concatenating the top hits from every vector store.

    Returns:
        The matching chunk texts joined by "---" separators.
    """
    raw_text = """..."""  # same content as the original document
    # Improved chunking strategy: split along Markdown headers so each chunk
    # keeps its section context.
    # BUG FIX: MarkdownHeaderTextSplitter accepts no chunk_size /
    # chunk_overlap arguments — the original passed both and would raise
    # TypeError. For size control, chain a RecursiveCharacterTextSplitter
    # after the header split.
    splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers)
    docs = splitter.split_text(raw_text)
    # Multi-embedding-model fusion: each model gets its own index.
    embeddings = [
        DashScopeEmbeddings(model="text-embedding-v1"),
        # HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh"),
    ]
    vector_stores = [
        FAISS.from_documents(docs, emb) for emb in embeddings
    ]
    # Hybrid retrieval: query every store and merge the results.
    results = []
    for store in vector_stores:
        hits = store.similarity_search(query, k=2)  # no longer shadows `docs`
        results.extend(hit.page_content for hit in hits)
    return "\n\n---\n\n".join(results)
关键优化点说明:
LangChain的工具绑定实际上是在LLM的prompt中注入工具描述信息。通过分析源码,我们发现关键步骤:
def get_tool_description(tool):
    """Serialize a tool's metadata into the dict shape injected into the LLM prompt."""
    name, desc, params = tool.name, tool.description, tool.args
    return {"name": name, "description": desc, "parameters": params}
python复制TOOL_PROMPT = '''你可以使用以下工具:
{tools}
请严格按此格式响应:
Action: 工具名
Action Input: 工具参数
'''
def parse_response(response):
    """Extract a tool invocation from a ReAct-style LLM reply.

    Args:
        response: Raw LLM text, expected to contain "Action:" and
            "Action Input:" lines.

    Returns:
        {"tool": name, "input": args} when an "Action:" directive is present,
        otherwise None (preserving the original's implicit None).
    """
    if "Action:" not in response:
        return None
    head, sep, tail = response.partition("Action Input:")
    return {
        "tool": head.replace("Action:", "").strip(),
        # BUG FIX: the original indexed parts[1] and raised IndexError when
        # "Action Input:" was missing; default to an empty argument string.
        "input": tail.strip() if sep else "",
    }
在实际项目中,我总结出几个关键经验:
原始示例中的对话循环存在以下问题:
改进后的对话管理器实现:
class DialogueManager:
    """Tool-calling dialogue loop with a turn limit and per-call error capture.

    Wraps an LLM with bound tools, executes requested tool calls, feeds the
    results back, and stops either on a plain (no-tool) answer or after
    max_turns iterations.
    """

    def __init__(self, llm, tools, max_turns=5):
        self.llm = llm.bind_tools(tools)
        self.tools = {t.name: t for t in tools}  # name -> tool registry
        self.max_turns = max_turns
        # NOTE(review): counts response characters, not actual tokens —
        # swap in a tokenizer-based count for real accounting.
        self.token_count = 0
        self.history = []

    def run_query(self, query):
        """Drive the LLM/tool loop for one user query and return the final text."""
        messages = [HumanMessage(content=query)]
        for turn in range(self.max_turns):
            try:
                response = self.llm.invoke(messages)
                self.token_count += len(response.content)
                if not response.tool_calls:
                    return self._format_result(response.content)
                # BUG FIX: the assistant message carrying tool_calls must be
                # appended before the ToolMessages — tool-calling chat APIs
                # reject a ToolMessage with no preceding assistant turn.
                messages.append(response)
                tool_results = [self._execute_tool(call) for call in response.tool_calls]
                messages.extend(tool_results)
            except Exception as e:
                return f"系统错误: {str(e)}"
        return "超过最大对话轮次"

    def _execute_tool(self, tool_call):
        """Run one tool call, converting unknown tools and failures into ToolMessages."""
        tool = self.tools.get(tool_call["name"])
        if not tool:
            return ToolMessage(
                content=f"工具{tool_call['name']}不存在",
                tool_call_id=tool_call["id"]
            )
        try:
            output = tool.invoke(tool_call["args"])
            return ToolMessage(
                content=output,
                tool_call_id=tool_call["id"],
                name=tool_call["name"]
            )
        except Exception as e:
            return ToolMessage(
                content=f"工具执行错误: {str(e)}",
                tool_call_id=tool_call["id"]
            )

    def _format_result(self, content):
        """Record the final answer in history and render the user-facing summary."""
        self.history.append(content)
        return f"最终结果:\n{content}\n\n(本次消耗token: {self.token_count})"
核心改进:
在实际压力测试中,我们发现三个性能瓶颈:
对应的优化措施:
嵌入模型优化
python复制# 使用量化模型
embeddings = DashScopeEmbeddings(
model="text-embedding-v1",
model_kwargs={"quantization": "int8"}
)
# 启用缓存
from langchain.cache import SQLiteCache
import langchain
langchain.llm_cache = SQLiteCache(database=".cache.db")
向量检索优化
python复制# 启用HNSW索引
FAISS.from_documents(
documents,
embeddings,
index_factory="HNSW32" # 平衡速度与精度
)
# 定期压缩索引
faiss_index.optimize()
工具调用优化
# Asynchronous parallel tool execution.
import asyncio


async def parallel_tool_execution(tool_calls, tools=None):
    """Run a batch of tool calls concurrently; results come back in call order.

    BUG FIX: the original referenced ``self.tools`` from a free function,
    which is always a NameError — the registry is now an explicit parameter.

    Args:
        tool_calls: Iterable of {"name": ..., "args": ...} dicts.
        tools: Mapping of tool name -> tool object exposing ``ainvoke(args)``.

    Returns:
        List of tool outputs, ordered like *tool_calls*.

    Raises:
        ValueError: if no tool registry is supplied.
    """
    if tools is None:
        raise ValueError("a tool registry (name -> tool) is required")
    tasks = [
        asyncio.create_task(tools[call["name"]].ainvoke(call["args"]))
        for call in tool_calls
    ]
    return await asyncio.gather(*tasks)
构建五层防御体系:
def sanitize_input(text: str) -> str:
    """Strip disallowed characters from *text* and cap it at 500 characters."""
    # Keep word characters, whitespace, CJK, and basic punctuation; drop the rest.
    allowed_only = re.sub(r'[^\w\s\u4e00-\u9fa5,.?!]', '', text)
    max_len = 500
    return allowed_only[:max_len]
# Role-based allowlist: tool name -> roles permitted to invoke it.
TOOL_PERMISSIONS = {
    "calculator": ["finance"],
    "rag_search": ["hr", "management"]
}


def check_permission(user_role, tool_name):
    """Return True iff *user_role* may call *tool_name*; unknown tools deny all."""
    allowed_roles = TOOL_PERMISSIONS.get(tool_name, [])
    return user_role in allowed_roles
from fastapi import Request, HTTPException
from slowapi import Limiter
from slowapi.util import get_remote_address

# Rate limiter keyed on the caller's remote IP address.
limiter = Limiter(key_func=get_remote_address)


# At most 10 requests per minute per client IP; excess requests are rejected.
# NOTE(review): slowapi also requires registering the limiter on the app
# (app.state.limiter + RateLimitExceeded handler) — not shown in this fragment.
@limiter.limit("10/minute")
async def api_endpoint(request: Request):
    ...
import logging
from datetime import datetime

# Dedicated audit-trail logger; handlers/formatters are configured elsewhere.
audit_log = logging.getLogger("audit")


def log_operation(user, action, details):
    """Append one pipe-delimited entry (timestamp | user | action | details)
    to the audit log.

    Uses lazy %-style arguments instead of an f-string so the message is only
    rendered when the "audit" logger is actually enabled for INFO.
    """
    audit_log.info("%s | %s | %s | %s", datetime.now(), user, action, details)
def filter_output(text: str) -> str:
    """Mask every occurrence of a known sensitive term in *text* with "***"."""
    sensitive_terms = ["机密", "密码", "密钥"]
    redacted = text
    for term in sensitive_terms:
        redacted = redacted.replace(term, "***")
    return redacted
问题现象:LLM没有按预期调用工具
排查步骤:
常见原因:
问题现象:对话轮次增加后响应质量下降
优化方案:
from langchain.chains.summarize import load_summarize_chain


def summarize_history(history):
    """Compress a long chat history with a map-reduce summarize chain.

    NOTE(review): relies on a module-level `llm` defined elsewhere; `history`
    is presumably a list of Documents as chain.run expects — confirm at the
    call site. chain.run is a deprecated API in newer langchain (use invoke).
    """
    chain = load_summarize_chain(llm, "map_reduce")
    return chain.run(history)
from sklearn.feature_extraction.text import TfidfVectorizer


def check_relevance(query, history, threshold=0.3):
    """Return True if *query* is lexically similar to any history entry.

    TfidfVectorizer L2-normalises rows by default, so the sparse dot product
    of the query row against the history rows is their cosine similarity.

    Args:
        query: The new user utterance.
        history: List of previous utterance strings.
        threshold: Minimum cosine similarity to count as relevant.
    """
    vectorizer = TfidfVectorizer()
    tfidf = vectorizer.fit_transform([query] + history)
    # BUG FIX: .toarray() replaces the deprecated sparse `.A` attribute,
    # which no longer exists on scipy's newer sparse-array types.
    similarity = (tfidf[0] * tfidf[1:].T).toarray()[0]
    return any(sim > threshold for sim in similarity)
from transformers import pipeline

# NOTE(review): bare "bert-base-chinese" is a base model, not fine-tuned for
# classification — the head is randomly initialised and the "different" label
# checked below will not exist. Swap in a model actually trained for
# topic-shift / next-sentence-style prediction before shipping.
topic_detector = pipeline(
    "text-classification",
    model="bert-base-chinese"
)


def detect_topic_shift(new_input, last_response):
    """Return True when the classifier labels the (previous reply, new input)
    pair — joined with BERT's [SEP] token — as a topic change."""
    result = topic_detector(f"{last_response} [SEP] {new_input}")
    return result[0]["label"] == "different"
诊断工具:
import cProfile
import pstats


def profile_agent(query):
    """Run the agent once under cProfile, print the 10 biggest cumulative-time
    hot spots, and return the agent's result unchanged.

    Args:
        query: Passed straight through to run_agent (defined elsewhere).
    """
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        result = run_agent(query)
    finally:
        # BUG FIX: always disable, even when run_agent raises, so the profiler
        # does not keep instrumenting everything that runs afterwards.
        profiler.disable()
    stats = pstats.Stats(profiler)
    stats.sort_stats("cumtime").print_stats(10)
    return result
典型优化点:
将Agent与客服系统集成:
class CustomerServiceAgent:
    """Ticket-routing agent pairing an LLM with domain-specific tools.

    NOTE(review): product_search_tool / order_status_tool /
    refund_calculator_tool, ChatOpenAI, pipeline and _handle_refund are all
    defined elsewhere — this fragment is illustrative, not self-contained.
    """
    def __init__(self):
        # Tools the LLM may invoke for customer-service tickets.
        self.tools = [
            product_search_tool,
            order_status_tool,
            refund_calculator_tool
        ]
        self.llm = ChatOpenAI(model="gpt-4-turbo")

    def handle_ticket(self, ticket):
        # Classify first, then route to the specialised handler.
        category = self._classify_ticket(ticket)
        if category == "refund":
            return self._handle_refund(ticket)
        # ... (other categories handled similarly)

    def _classify_ticket(self, text):
        # NOTE(review): building a pipeline per call reloads the model every
        # time — hoist it to __init__ (or module level) in production.
        classifier = pipeline("text-classification")
        return classifier(text)[0]["label"]
实现自然语言查询数据库:
@tool
def sql_query(nl_query: str) -> str:
    """
    将自然语言转换为SQL并执行
    参数:
    nl_query: 自然语言查询,如"上月销售额最高的产品"
    返回:
    JSON格式的查询结果
    """
    # Docstring intentionally left untranslated: under @tool it becomes the
    # runtime tool description the LLM sees, so its text is behavior.
    # NOTE(review): llm.invoke returns a message object, not a plain SQL
    # string — presumably .content (possibly with markdown fences) should be
    # extracted before execution; confirm. Executing LLM-generated SQL
    # unchecked is an injection risk: validate it, or run it read-only with
    # least-privilege credentials.
    sql = llm.invoke(f"将以下转换为SQL:\n{nl_query}")
    return execute_sql(sql)
会议纪要自动化案例:
from langchain_core.tools import StructuredTool


def create_meeting_minutes(
    audio_path: str,
    attendees: list[str],
    agenda: str
) -> str:
    """Produce formal meeting minutes from a recording.

    Pipeline: speech-to-text -> key-point summarisation -> formatted minutes.
    NOTE(review): transcribe / summarize / format_minutes are project helpers
    defined elsewhere; their exact contracts are not visible in this fragment.
    """
    # Speech to text.
    transcript = transcribe(audio_path)
    # Extract the key points.
    summary = summarize(transcript)
    # Render the formal minutes document.
    return format_minutes(summary, attendees, agenda)


# Expose the function as a structured (multi-argument) LangChain tool; the
# description string is surfaced to the LLM at runtime, so it stays as-is.
meeting_tool = StructuredTool.from_function(
    func=create_meeting_minutes,
    name="meeting_minutes",
    description="生成标准会议纪要"
)
在实际部署这类Agent系统时,我发现最关键的是要建立完善的监控体系。我通常会部署以下监控指标:
这些指标通过Grafana仪表板可视化,当任何指标超过阈值时触发告警。例如,当工具调用失败率连续5分钟超过5%,就会通知开发团队检查工具服务状态。