最近半年,大模型应用开发领域最火的趋势莫过于Agent架构的兴起。不同于传统单次问答的prompt工程,Agent通过任务分解、工具调用和自主决策能力,让大模型真正具备了处理复杂工作流的能力。但在实际开发中,我们发现很多团队都面临这样的困境:已有的业务逻辑代码如何快速适配Agent架构?这就是Agent转换工具的价值所在。
我最近在金融行业知识管理系统改造中,就用这类工具将传统RAG系统升级为智能问答Agent,开发效率提升了3倍以上。本文将分享从工具选型到落地实践的全套经验,包含可直接复用的代码模板和避坑指南。无论你是刚接触LLM开发的新手,还是正在寻找架构升级方案的老手,都能找到对应的解决方案。
传统大模型应用通常采用"用户输入-模型响应"的直线式交互,而Agent系统则具备三个核心特征:
python复制# 传统RAG系统典型流程
def rag_query(question):
docs = retrieve(question) # 检索文档
return generate_response(question, docs) # 生成回答
# Agent系统典型流程
class QA_Agent:
def __init__(self):
self.memory = ConversationMemory()
self.tools = [WebSearch(), Calculator()]
def run(self, task):
while not task.completed:
plan = self.analyze_task(task) # 任务分析
for step in plan:
tool = self.select_tool(step) # 工具选择
result = tool.execute(step) # 执行
self.memory.store(result) # 记忆存储
return self.compile_results() # 结果整合
优秀Agent转换工具通常提供以下关键能力:
| 功能模块 | 实现方式 | 典型代表 |
|---|---|---|
| 代码包装器 | 将现有函数转为Tool对象 | LangChain Tool装饰器 |
| 协议适配器 | 转换不同Agent框架的接口 | AutoGPT兼容层 |
| 流程分析器 | 自动识别可并行化步骤 | CrewAI任务分解器 |
| 异常处理器 | 错误重试和降级策略 | LlamaIndex回调系统 |
实践建议:选择工具时重点考察其对异步任务的支持程度,这是实现高效Agent系统的关键。我在项目中就曾因忽略这点导致工具链整体重构。
LangChain的Tool接口是目前最成熟的转换方案,其核心优势在于:
python复制from langchain.tools import tool
import requests
@tool
def get_stock_price(symbol: str) -> float:
"""查询实时股票价格,输入为股票代码"""
api_url = f"https://api.example.com/stock/{symbol}"
try:
response = requests.get(api_url, timeout=5)
return response.json()['price']
except Exception as e:
raise ValueError(f"查询失败: {str(e)}")
# 转换后的工具可直接被Agent使用
agent_tools = [get_stock_price]
实战踩坑记录:
对于已有AutoGPT插件的系统,可采用适配器模式进行转换:
python复制class LegacyPluginAdapter:
def __init__(self, plugin):
self.plugin = plugin
def __call__(self, input_str):
# 转换AutoGPT的JSON输入输出格式
try:
params = json.loads(input_str)
result = self.plugin.execute(**params)
return json.dumps({"status": "success", "data": result})
except Exception as e:
return json.dumps({"status": "error", "message": str(e)})
# 使用示例
legacy_plugin = StockAnalysisPlugin()
adapted_tool = LegacyPluginAdapter(legacy_plugin)
当需要处理复杂输入输出时,可借助Pydantic模型增强鲁棒性:
python复制from pydantic import BaseModel
from typing import List
class GeoCoordinates(BaseModel):
lat: float
lng: float
class NearbyPOIsInput(BaseModel):
location: GeoCoordinates
radius: int = 1000
types: List[str] = ["restaurant"]
@tool(args_schema=NearbyPOIsInput)
def find_nearby_pois(data: NearbyPOIsInput) -> List[dict]:
"""查找周边兴趣点"""
# 实现代码...
推荐使用Poetry管理依赖:
bash复制poetry add langchain openai pydantic requests
poetry add --group dev pytest pytest-asyncio
假设已有天气预报查询函数:
python复制def get_weather(city: str, date: str) -> dict:
# 原有实现...
改造步骤:
python复制@tool
def weather_tool(city: str, date: str = None) -> dict:
"""查询城市天气情况,支持历史数据查询
Args:
city: 城市名称(中文)
date: 日期(YYYY-MM-DD),默认为当天
Returns:
{
"temperature": 25,
"conditions": "晴",
"humidity": 0.6
}
"""
try:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return get_weather(city, date)
except Exception as e:
raise ValueError(f"天气查询失败: {str(e)}")
使用OpenAI Assistant API进行集成:
python复制from openai import OpenAI
client = OpenAI()
# 创建具备天气查询能力的Agent
assistant = client.beta.assistants.create(
name="天气助手",
tools=[{
"type": "function",
"function": weather_tool.schema()
}],
model="gpt-4-turbo"
)
# 测试对话
thread = client.beta.threads.create(messages=[
{"role": "user", "content": "上海明天会下雨吗?"}
])
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id
)
并行化改造示例:
python复制import asyncio
async def async_weather_tool(cities: List[str]) -> dict:
"""批量查询多城市天气"""
tasks = [asyncio.create_task(get_weather_async(city)) for city in cities]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {city: result for city, result in zip(cities, results)}
使用Redis进行结果缓存:
python复制from redis import Redis
from functools import wraps
redis = Redis(host='localhost', port=6379)
def cache_weather(func):
@wraps(func)
def wrapper(city: str, date: str):
cache_key = f"weather:{city}:{date}"
cached = redis.get(cache_key)
if cached:
return json.loads(cached)
result = func(city, date)
redis.setex(cache_key, 3600, json.dumps(result)) # 缓存1小时
return result
return wrapper
@cache_weather
@tool
def weather_tool_with_cache(city: str, date: str) -> dict:
# 实现...
推荐使用OpenTelemetry实现可观测性:
python复制from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
@tracer.start_as_current_span("weather_tool")
def weather_tool(city: str, date: str) -> dict:
# 工具实现...
症状:Agent无法识别已注册的工具
实现工具级别的访问控制:
python复制from functools import wraps
def require_role(role: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
if current_user.role != role:
raise PermissionError(f"需要{role}权限")
return func(*args, **kwargs)
return wrapper
return decorator
@require_role("finance")
@tool
def process_payment(amount: float) -> str:
# 支付处理逻辑...
处理包含多个选项的输入:
python复制class AnalysisConfig(BaseModel):
method: Literal["regression", "classification"]
parameters: dict
sensitivity: float = 0.5
@tool(args_schema=AnalysisConfig)
def data_analysis(config: AnalysisConfig) -> dict:
"""执行数据分析任务"""
# 实现...
实现工具链式调用:
python复制from langchain.agents import Tool
class ToolChain(Tool):
def __init__(self, tools: List[Tool]):
self.tools = {t.name: t for t in tools}
def run(self, input_str: str) -> str:
# 解析输入格式:tool1|input1 -> tool2|input2
steps = input_str.split("->")
result = None
for step in steps:
tool_name, tool_input = step.split("|")
result = self.tools[tool_name].run(tool_input)
return result
实现按需加载工具:
python复制import importlib
class DynamicToolLoader:
def __init__(self, tool_config: dict):
self.config = tool_config
def get_tool(self, name: str) -> Tool:
if name not in self.config:
raise ValueError(f"未知工具: {name}")
module_path, class_name = self.config[name].rsplit(".", 1)
module = importlib.import_module(module_path)
tool_class = getattr(module, class_name)
return tool_class()
创建自定义工具描述语言:
python复制from lark import Lark
tool_grammar = """
start: command+
command: NAME "(" args ")"
args: arg ("," arg)*
arg: NAME "=" value
value: NUMBER | STRING
"""
class ToolDSL:
def __init__(self, tools: List[Tool]):
self.parser = Lark(tool_grammar)
self.tools = {t.name: t for t in tools}
def execute(self, dsl_str: str):
tree = self.parser.parse(dsl_str)
# 解析并执行DSL指令...
在实际项目部署时,建议先用小流量验证工具转换效果。我在电商客服系统改造中,就通过A/B测试发现转换后的退货处理Agent比原系统处理速度快40%,且用户满意度提升15个百分点。