1. 课程概述:构建具备工具调用能力的AI智能体
在AI智能体开发领域,让大语言模型(LLM)具备调用外部工具的能力是突破纯文本交互限制的关键一步。本课程将带您深入实现一个生产级的工具调用系统,让您的AI助手能够:
- 读取和写入本地文件
- 执行系统命令
- 进行网络搜索
- 浏览目录结构
这个实现基于Python 3.10+和OpenAI函数调用API规范,采用了面向对象设计模式和异步编程模型,确保系统既具备良好的扩展性又能高效处理I/O密集型操作。
重要提示:本课程代码需要安装Python 3.10或更高版本,并预先配置好OpenAI API密钥。建议在虚拟环境中进行开发。
2. 工具调用系统架构设计
2.1 核心组件关系图
我们的工具调用系统由三个核心组件构成:
code复制[Tool基类] ← 继承 → [具体工具实现]
↑
| 注册
↓
[ToolRegistry] ← 使用 → [Agent]
2.2 工具调用工作流程
-
初始化阶段:
- 创建ToolRegistry实例
- 注册所有可用工具
- 初始化Agent时传入ToolRegistry
-
运行时交互:
mermaid复制sequenceDiagram participant User participant Agent participant LLM participant Tool User->>Agent: 发送请求 Agent->>LLM: 请求+工具定义 LLM->>Agent: 返回工具调用请求 Agent->>Tool: 执行工具 Tool->>Agent: 返回结果 Agent->>LLM: 发送工具结果 LLM->>Agent: 生成最终回复 Agent->>User: 返回回复
3. 基础类实现详解
3.1 Tool抽象基类
Tool是所有具体工具的基类,定义在ultrabot/tools/base.py中:
python复制from __future__ import annotations
import abc
from typing import Any
class Tool(abc.ABC):
"""所有工具的抽象基类"""
name: str = "" # 工具唯一标识符
description: str = "" # 工具功能描述
parameters: dict[str, Any] = {} # 参数JSON Schema
@abc.abstractmethod
async def execute(self, arguments: dict[str, Any]) -> str:
"""异步执行工具的核心方法"""
def to_definition(self) -> dict[str, Any]:
"""生成OpenAI兼容的工具定义"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
}
}
关键设计考虑:
- 使用
abc.ABC确保子类必须实现execute方法 - 参数使用JSON Schema规范,与OpenAI API兼容
async设计支持异步I/O操作
3.2 ToolRegistry工具注册表
python复制class ToolRegistry:
"""工具管理中心"""
def __init__(self) -> None:
self._tools: dict[str, Tool] = {}
def register(self, tool: Tool) -> None:
"""注册工具实例"""
if not tool.name:
raise ValueError("工具必须设置name属性")
self._tools[tool.name] = tool
def get(self, name: str) -> Tool | None:
"""按名称获取工具"""
return self._tools.get(name)
def get_definitions(self) -> list[dict[str, Any]]:
"""获取所有工具定义"""
return [tool.to_definition() for tool in self._tools.values()]
# 其他方法...
注册表的核心功能:
- 提供工具的单点管理
- 支持动态添加/移除工具
- 生成LLM所需的工具定义列表
4. 内置工具实现
我们实现了5个基础工具,涵盖常见操作场景。
4.1 文件操作工具
ReadFileTool - 文件读取工具
python复制class ReadFileTool(Tool):
name = "read_file"
description = "读取文件内容,可指定起始行和行数限制"
parameters = {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"},
"offset": {"type": "integer", "description": "起始行号(1-based)"},
"limit": {"type": "integer", "description": "最大读取行数"}
},
"required": ["path"]
}
async def execute(self, arguments: dict[str, Any]) -> str:
path = Path(arguments["path"]).expanduser().resolve()
# 安全性检查
if not path.exists():
return f"错误:文件不存在: {path}"
if not path.is_file():
return f"错误:不是常规文件: {path}"
# 读取内容处理
text = path.read_text(errors="replace")
# 行切片处理
if "offset" in arguments or "limit" in arguments:
lines = text.splitlines(keepends=True)
start = max((arguments.get("offset", 1) - 1), 0)
end = start + arguments["limit"] if "limit" in arguments else len(lines)
text = "".join(lines[start:end])
return _truncate(text) # 防止内容过长
安全防护措施:
- 使用
resolve()解析绝对路径 - 检查文件存在性和类型
- 使用
errors="replace"处理编码问题 - 内容截断保护LLM上下文
WriteFileTool - 文件写入工具
python复制class WriteFileTool(Tool):
name = "write_file"
description = "写入内容到文件,自动创建父目录"
parameters = {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"},
"content": {"type": "string", "description": "写入内容"}
},
"required": ["path", "content"]
}
async def execute(self, arguments: dict[str, Any]) -> str:
path = Path(arguments["path"]).expanduser().resolve()
path.parent.mkdir(parents=True, exist_ok=True) # 自动创建目录
try:
path.write_text(arguments["content"])
return f"成功写入 {len(arguments['content'])} 字符到 {path}"
except Exception as e:
return f"写入失败: {type(e).__name__}: {str(e)}"
关键特性:
- 自动处理路径中的
~扩展 - 递归创建父目录
- 详细的成功/错误反馈
4.2 系统操作工具
ListDirectoryTool - 目录浏览工具
python复制class ListDirectoryTool(Tool):
name = "list_directory"
description = "列出目录内容,显示名称、类型和大小"
parameters = {
"type": "object",
"properties": {
"path": {"type": "string", "description": "目录路径"}
},
"required": ["path"]
}
async def execute(self, arguments: dict[str, Any]) -> str:
dirpath = Path(arguments["path"]).expanduser().resolve()
# 验证目录
if not dirpath.exists():
return f"错误:目录不存在: {dirpath}"
if not dirpath.is_dir():
return f"错误:不是目录: {dirpath}"
# 获取并排序条目
entries = sorted(
dirpath.iterdir(),
key=lambda p: (not p.is_dir(), p.name.lower()) # 目录优先,按名称排序
)
# 构建输出
lines = [f"{dirpath} 内容 ({len(entries)} 项):", ""]
for entry in entries:
try:
st = entry.stat()
kind = "目录" if stat.S_ISDIR(st.st_mode) else "文件"
size = f"{st.st_size:,} 字节" if kind == "文件" else ""
lines.append(f" {kind:>5} {entry.name} {size}")
except OSError:
lines.append(f" ??? {entry.name}")
return "\n".join(lines)
输出优化技巧:
- 目录条目优先显示
- 统一大小写排序
- 格式化对齐输出
- 错误处理避免崩溃
ExecCommandTool - 命令执行工具
python复制class ExecCommandTool(Tool):
name = "exec_command"
description = "执行shell命令并返回输出"
parameters = {
"type": "object",
"properties": {
"command": {"type": "string", "description": "要执行的命令"},
"timeout": {"type": "integer", "description": "超时时间(秒)", "default": 60}
},
"required": ["command"]
}
async def execute(self, arguments: dict[str, Any]) -> str:
command = arguments["command"]
timeout = arguments.get("timeout", 60)
# 创建子进程
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
try:
# 带超时的等待
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
output = stdout.decode(errors="replace") if stdout else ""
return _truncate(output) + f"\n[退出码: {proc.returncode}]"
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return f"错误:命令执行超时 ({timeout}秒)"
安全考量:
- 强制超时限制
- 错误解码处理
- 返回退出码信息
- 输出内容截断
4.3 网络工具
WebSearchTool - 网络搜索工具
python复制class WebSearchTool(Tool):
name = "web_search"
description = "使用DuckDuckGo进行网络搜索"
parameters = {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索关键词"},
"max_results": {"type": "integer", "description": "最大结果数", "default": 5}
},
"required": ["query"]
}
async def execute(self, arguments: dict[str, Any]) -> str:
try:
from ddgs import DDGS
except ImportError:
return "错误:需要安装ddgs包,请执行: pip install ddgs"
query = arguments["query"]
max_results = arguments.get("max_results", 5)
# 在独立线程中执行同步搜索
loop = asyncio.get_running_loop()
results = await loop.run_in_executor(
None,
lambda: list(DDGS().text(query, max_results=max_results))
)
# 格式化结果
if not results:
return "未找到相关结果"
lines = []
for idx, r in enumerate(results, 1):
title = r.get("title", "")
url = r.get("href", "")
snippet = r.get("body", "")
lines.append(f"[{idx}] {title}\n URL: {url}\n {snippet}")
return "\n\n".join(lines)
实现要点:
- 使用线程池执行同步网络请求
- 结果结构化展示
- 友好的错误提示
- 可配置的结果数量
5. 工具注册与集成
5.1 注册内置工具
python复制def register_builtin_tools(registry: ToolRegistry) -> None:
"""注册所有内置工具到注册表"""
tools = [
ReadFileTool(),
WriteFileTool(),
ListDirectoryTool(),
ExecCommandTool(),
WebSearchTool()
]
for tool in tools:
registry.register(tool)
5.2 Agent集成工具调用
Agent类需要扩展以支持工具调用循环:
python复制class Agent:
def __init__(self, client: OpenAI, model: str, tool_registry: ToolRegistry = None):
self._tools = tool_registry or ToolRegistry()
# 其他初始化...
async def _execute_tool(self, tool_call: ToolCallRequest) -> str:
"""执行单个工具调用"""
tool = self._tools.get(tool_call.name)
if not tool:
return f"错误:未知工具 '{tool_call.name}'"
try:
return await tool.execute(tool_call.arguments)
except Exception as e:
return f"工具执行错误: {type(e).__name__}: {str(e)}"
def run(self, user_message: str) -> str:
"""处理用户消息的主循环"""
self._messages.append({"role": "user", "content": user_message})
for _ in range(self._max_iterations):
# 获取LLM响应
response = self._chat_stream()
# 处理工具调用
if response.tool_calls:
for call in response.tool_calls:
result = asyncio.run(self._execute_tool(call))
self._messages.append({
"role": "tool",
"content": result,
"tool_call_id": call.id
})
else:
return response.content or ""
return "达到最大迭代次数,请简化您的请求"
工具调用循环的关键点:
- 多轮迭代处理嵌套工具调用
- 严格的迭代次数限制
- 完整的消息上下文维护
- 错误处理和结果反馈
6. 实战测试与验证
6.1 测试工具定义
python复制def test_tool_definition():
"""测试工具定义生成"""
tool = ReadFileTool()
defn = tool.to_definition()
assert defn["type"] == "function"
assert defn["function"]["name"] == "read_file"
assert "parameters" in defn["function"]
assert defn["function"]["parameters"]["required"] == ["path"]
6.2 测试工具执行
python复制def test_read_file_tool(tmp_path):
"""测试文件读取工具"""
test_file = tmp_path / "test.txt"
test_file.write_text("Hello, world!")
tool = ReadFileTool()
result = asyncio.run(tool.execute({"path": str(test_file)}))
assert "Hello, world!" in result
assert "错误" not in result
6.3 端到端测试
python复制def test_agent_with_tools():
"""测试带工具的Agent完整流程"""
registry = ToolRegistry()
register_builtin_tools(registry)
agent = Agent(
client=OpenAI(),
model="gpt-4",
tool_registry=registry
)
# 测试目录列表
response = agent.run("列出当前目录内容")
assert "目录" in response or "文件" in response
# 测试文件操作
agent.run("创建test.txt文件,内容为'hello'")
response = agent.run("读取test.txt内容")
assert "hello" in response
7. 关键Python技术解析
7.1 异步编程模型
本实现大量使用Python的异步特性:
python复制import asyncio
async def example():
# 创建异步子进程
proc = await asyncio.create_subprocess_shell(
"ls -l",
stdout=asyncio.subprocess.PIPE
)
# 带超时的等待
try:
stdout, _ = await asyncio.wait_for(
proc.communicate(),
timeout=10
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
优势:
- 非阻塞I/O操作
- 高效处理并发任务
- 更好的资源利用率
7.2 路径处理最佳实践
使用pathlib进行安全的路径操作:
python复制from pathlib import Path
def safe_file_op(path: str):
# 解析路径
file_path = Path(path).expanduser().resolve()
# 安全检查
if not file_path.exists():
raise FileNotFoundError
if not file_path.is_file():
raise IsADirectoryError
# 读取内容
return file_path.read_text(errors="replace")
关键点:
expanduser()处理~扩展resolve()获取绝对路径- 显式的存在性检查
- 安全的错误处理
7.3 JSON Schema参数定义
工具参数使用标准JSON Schema定义:
python复制parameters = {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件路径"
},
"offset": {
"type": "integer",
"description": "起始行号",
"minimum": 1
}
},
"required": ["path"]
}
好处:
- 机器可读的接口定义
- 自动参数验证
- 与OpenAI API兼容
- 自动生成文档
8. 生产环境注意事项
8.1 安全性增强建议
-
路径安全:
python复制def validate_path(user_path: str, allowed_base: Path) -> Path: path = Path(user_path).expanduser().resolve() if not path.is_relative_to(allowed_base): raise ValueError("路径越界") return path -
命令执行限制:
python复制ALLOWED_COMMANDS = ["ls", "git", "python"] def validate_command(cmd: str) -> bool: return any(cmd.startswith(f"{a} ") for a in ALLOWED_COMMANDS) -
资源限制:
python复制# 在工具执行前检查 if get_system_load() > MAX_LOAD: raise ResourceWarning("系统负载过高")
8.2 性能优化技巧
-
并发工具执行:
python复制async def execute_multiple_tools(tool_calls): tasks = [self._execute_tool(tc) for tc in tool_calls] return await asyncio.gather(*tasks) -
结果缓存:
python复制from functools import lru_cache @lru_cache(maxsize=100) async def cached_read_file(path: str) -> str: return await ReadFileTool().execute({"path": path}) -
批量处理:
python复制class BatchReadTool(Tool): async def execute(self, arguments): paths = arguments["paths"] return "\n".join( await asyncio.gather( *(read_file(p) for p in paths) ) )
8.3 扩展性设计
-
动态工具加载:
python复制def load_tools_from_dir(dir_path: str): for file in Path(dir_path).glob("*.py"): module = import_module(f"tools.{file.stem}") for attr in dir(module): obj = getattr(module, attr) if isinstance(obj, type) and issubclass(obj, Tool) and obj != Tool: registry.register(obj()) -
工具组合:
python复制class PipelineTool(Tool): async def execute(self, arguments): step1 = await tool1.execute(arguments["step1"]) step2 = await tool2.execute({"input": step1, **arguments["step2"]}) return step2 -
权限系统集成:
python复制def check_permission(user: User, tool: Tool) -> bool: required = PERMISSIONS.get(tool.name, []) return all(p in user.permissions for p in required)
9. 常见问题解决方案
9.1 工具调用不触发
问题现象:LLM没有按预期调用工具
排查步骤:
- 检查工具定义是否完整(name、description、parameters)
- 验证工具描述是否清晰表达功能
- 测试系统提示词是否鼓励工具使用
- 检查API调用是否包含工具定义
示例修复:
python复制SYSTEM_PROMPT = """你是一个AI助手,请遵循以下规则:
1. 当任务需要文件操作、命令执行或网络访问时,优先使用工具
2. 工具列表:{tools}
3. 不要猜测无法确定的信息"""
9.2 参数解析失败
问题现象:工具收到无效参数
解决方案:
- 增强参数schema定义:
python复制parameters = { "type": "object", "properties": { "path": { "type": "string", "pattern": "^[a-zA-Z0-9_/-]+$" # 限制字符集 } }, "required": ["path"], "additionalProperties": False # 禁止额外参数 } - 添加参数预处理:
python复制async def execute(self, arguments): path = arguments["path"].strip() # 去除空白字符 if not path: raise ValueError("路径不能为空")
9.3 长时间无响应
问题现象:工具执行卡住
处理方案:
- 全局超时设置:
python复制async def run_with_timeout(coro, timeout=30): try: return await asyncio.wait_for(coro, timeout) except asyncio.TimeoutError: logger.warning("操作超时") raise - 资源监控:
python复制def monitor_resources(): while True: if psutil.cpu_percent() > 90: alert("高CPU使用率") await asyncio.sleep(5)
10. 课程总结与下一步
本课程实现了一个完整的工具调用系统,核心收获包括:
-
架构设计:
- 清晰的Tool抽象定义
- 集中式ToolRegistry管理
- 与Agent的无缝集成
-
关键技术:
- OpenAI函数调用规范
- 异步I/O编程模型
- 安全的子进程执行
- 结构化参数处理
-
生产级特性:
- 全面的错误处理
- 资源使用限制
- 输出内容控制
下一步学习方向:
- 课程4:实现记忆机制,使智能体记住对话历史
- 课程5:添加安全防护层,防止危险操作
- 课程6:构建Web界面,提供可视化交互
项目源码持续更新中,欢迎在GitHub仓库提交Issue和PR:
https://github.com/junfhu/UltrabotStepByStep