1. 项目概述:从零构建ReAct Agent的核心架构
在当今AI应用开发领域,ReAct(Reasoning + Acting)模式已经成为构建智能Agent的主流范式。与传统的单次问答不同,ReAct通过"思考-行动-观察"的循环机制,使AI系统能够像人类一样逐步解决问题。这种模式特别适合需要多步骤推理、工具调用和实时数据获取的复杂场景。
本项目将完全从零开始实现一个功能完整的ReAct Agent系统,不使用LangChain等现成框架,而是基于原生Python和OpenAI API构建。这种底层实现方式能让我们更深入理解Agent的工作原理,为后续定制开发打下坚实基础。
整个系统由三个核心模块组成:
- Agent核心框架:负责与大模型交互,管理对话历史
- 工具函数集合:提供具体功能实现(如计算器、数据查询)
- ReAct执行引擎:驱动"思考-行动-观察"的循环流程
这种架构设计具有以下优势:
- 轻量级:不依赖复杂框架,核心代码不到300行
- 易扩展:工具系统采用插件式设计,新增功能只需添加工具函数
- 可调试:每个推理步骤都清晰可见,便于问题排查
- 低成本:基于开源模型API,无需昂贵基础设施
2. 核心模块实现详解
2.1 Agent核心框架设计
Agent类是整个系统的大脑,负责与大模型API交互并维护对话上下文。其核心实现如下:
python复制# agent.py
import os
import dotenv
from openai import OpenAI
# 环境变量配置
dotenv.load_dotenv()
client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_API_BASE")
)
DEFAULT_MODEL = os.getenv("AI_MODEL", "deepseek-chat")
class Agent:
"""Agent推理框架"""
def __init__(self, system_prompt="", max_history=10):
self._messages = []
self.max_history = max_history
if system_prompt:
self._messages.append({"role": "system", "content": system_prompt})
def invoke(self, query: str) -> str:
"""调用Agent进行推理"""
self._messages.append({"role": "user", "content": query})
result = self.exec()
self._messages.append({"role": "assistant", "content": result})
# 历史消息压缩
if len(self._messages) > self.max_history:
self._messages = self._messages[:1] + self._messages[-self.max_history:]
return result
def exec(self) -> str:
"""执行推理,返回结果"""
completion = client.chat.completions.create(
model=DEFAULT_MODEL,
messages=self._messages,
temperature=0 # 确定性输出
)
return completion.choices[0].message.content
关键技术解析:
-
temperature参数控制:
- 设置为0确保输出确定性,这对工具调用至关重要
- 不同场景下的推荐值:
python复制# 推理任务(工具调用、逻辑推理) temperature = 0 # 创意任务(故事写作、头脑风暴) temperature = 0.7 - 1.0 # 平衡任务(需要一定创造性但也要准确) temperature = 0.3 - 0.5
-
对话历史管理:
- 采用滑动窗口机制控制历史消息长度
- 保留system消息(角色定义)和最近的N条对话
- 避免上下文过长导致的API成本增加和性能下降
-
消息格式规范:
- 严格遵循OpenAI的消息角色系统(system/user/assistant)
- 确保模型能正确理解上下文关系
2.2 工具系统实现
工具是Agent能力的延伸,每个工具都是一个独立的Python函数:
python复制# tools.py
import ast
from functools import lru_cache
@lru_cache(maxsize=100) # 添加缓存提高性能
def calculate(expression: str) -> str:
"""安全计算数学表达式
使用ast.literal_eval替代eval防止代码注入
"""
try:
node = ast.parse(expression, mode='eval')
if not all(isinstance(n, ast.NameConstant) or
isinstance(n, ast.Num) or
isinstance(n, ast.BinOp) for n in ast.walk(node)):
raise ValueError("Unsafe expression")
return str(eval(expression))
except Exception as e:
return f"计算错误: {str(e)}"
def ask_fruit_unit_price(fruit: str) -> str:
"""查询水果单价(模拟数据库查询)"""
price_map = {
"apple": 10,
"banana": 6,
"orange": 8,
"pear": 7
}
fruit = fruit.lower()
if fruit in price_map:
return f"{fruit.capitalize()}单价是 {price_map[fruit]}元/公斤"
return f"未找到{fruit}的价格信息"
工具设计最佳实践:
-
输入验证:
- 对传入参数进行类型和内容检查
- 使用安全函数(如ast.literal_eval)处理用户输入
-
错误处理:
- 捕获所有可能的异常
- 返回友好的错误信息供Agent理解
-
性能优化:
- 对频繁调用的工具添加缓存(如lru_cache)
- 避免重复计算和冗余操作
-
文档规范:
- 每个工具都应有清晰的docstring
- 说明参数、返回值和可能出现的错误
3. ReAct引擎实现
3.1 核心执行流程
ReAct引擎是系统的协调中心,驱动整个推理循环:
python复制# main.py
import re
from agent import Agent
from tools import calculate, ask_fruit_unit_price
# 工具注册表
known_tools = {
"calculate": calculate,
"ask_fruit_unit_price": ask_fruit_unit_price
}
# Action指令正则匹配(增强容错性)
action_re = re.compile(r'^Action:\s*(\w+)\s*:\s*(.*)$')
def react(query: str, max_turns: int = 5) -> str:
"""ReAct执行引擎
Args:
query: 用户问题
max_turns: 最大推理轮次
Returns:
最终答案或错误信息
"""
agent = Agent(system_prompt=build_react_prompt())
history = []
current_question = query
for turn in range(max_turns):
# 获取Agent输出
result = agent.invoke(current_question)
history.append(f"Turn {turn + 1}:\nQuestion: {current_question}\nResponse: {result}")
# 解析Action指令
action_match = None
for line in result.split('\n'):
action_match = action_re.match(line.strip())
if action_match:
break
if not action_match:
return result # 没有Action,直接返回答案
# 执行工具调用
tool_name, params = action_match.groups()
if tool_name not in known_tools:
return f"未知工具: {tool_name}"
try:
observation = known_tools[tool_name](params)
history.append(f"Tool: {tool_name}\nParams: {params}\nObservation: {observation}")
current_question = f"Observation: {observation}"
except Exception as e:
return f"工具执行错误: {str(e)}"
return f"达到最大推理轮次({max_turns})仍未得到答案"
3.2 Prompt工程实践
Prompt质量直接决定Agent的表现。以下是经过优化的Prompt模板:
python复制def build_react_prompt() -> str:
"""构建ReAct提示词模板"""
return """你运行在Thought(思考)、Action(行动)、PAUSE(暂停)、Observation(观察)的循环中。
循环结束时你将输出最终Answer(答案)。
可用工具:
1. calculate: 执行数学计算
示例:
- Action: calculate: 3 * 5 + 2
- 返回: 计算结果
2. ask_fruit_unit_price: 查询水果单价
示例:
- Action: ask_fruit_unit_price: apple
- 返回: 水果单价信息
执行规则:
1. 每个Action后必须跟PAUSE
2. 观察结果后会继续给你上下文
3. 最终必须输出Answer
示例会话1:
Question: 苹果和香蕉哪个更便宜?
Thought: 我需要分别查询苹果和香蕉的价格
Action: ask_fruit_unit_price: apple
PAUSE
Observation: Apple单价是 10元/公斤
Thought: 现在查询香蕉价格
Action: ask_fruit_unit_price: banana
PAUSE
Observation: Banana单价是 6元/公斤
Thought: 比较两个价格
Answer: 香蕉(6元)比苹果(10元)便宜
示例会话2:
Question: 计算(3 + 5) * 2的值
Thought: 需要计算这个表达式
Action: calculate: (3 + 5) * 2
PAUSE
Observation: 16
Answer: (3 + 5) * 2 = 16
重要提示:
- 工具名称必须完全匹配
- 参数不要包含多余符号
- 遇到错误时尝试修正而非放弃
- 最终答案要简洁完整"""
Prompt设计要点:
- 结构化流程:明确Thought→Action→PAUSE→Observation的循环
- 工具文档:每个工具都有名称、示例和返回说明
- Few-shot示例:提供多个完整会话示例
- 错误预防:强调工具调用的准确性要求
- 输出规范:规定最终答案的格式标准
4. 高级功能扩展
4.1 多工具并行执行
对于独立的任务,可以并行执行工具调用提高效率:
python复制from concurrent.futures import ThreadPoolExecutor
def parallel_tool_calls(tasks: list) -> dict:
"""并行执行多个工具调用
Args:
tasks: [(tool_name, params), ...]
Returns:
{tool_name: result, ...}
"""
results = {}
with ThreadPoolExecutor() as executor:
futures = {
executor.submit(known_tools[tool], param): tool
for tool, param in tasks
}
for future in concurrent.futures.as_completed(futures):
tool = futures[future]
try:
results[tool] = future.result()
except Exception as e:
results[tool] = f"{tool}执行错误: {str(e)}"
return results
4.2 长期记忆机制
通过向量数据库实现跨会话记忆:
python复制from sentence_transformers import SentenceTransformer
import numpy as np
class MemorySystem:
def __init__(self):
self.encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
self.memories = []
def add_memory(self, text: str):
"""添加记忆"""
embedding = self.encoder.encode(text)
self.memories.append((text, embedding))
def recall(self, query: str, top_k=3) -> list:
"""回忆相关记忆"""
query_embed = self.encoder.encode(query)
similarities = [
(text, np.dot(query_embed, embed))
for text, embed in self.memories
]
similarities.sort(key=lambda x: x[1], reverse=True)
return [item[0] for item in similarities[:top_k]]
4.3 自动化测试框架
确保Agent行为的稳定性:
python复制import unittest
class TestReActAgent(unittest.TestCase):
def setUp(self):
self.agent = Agent(system_prompt=build_react_prompt())
def test_calculation(self):
result = react("计算(3 + 5) * 2的值")
self.assertIn("16", result)
def test_price_comparison(self):
result = react("苹果和香蕉哪个更贵?")
self.assertIn("苹果", result)
self.assertIn("香蕉", result)
def test_error_handling(self):
result = react("计算1/0")
self.assertIn("错误", result.lower())
if __name__ == '__main__':
unittest.main()
5. 生产环境部署建议
5.1 性能优化方案
-
工具调用缓存:
python复制from functools import lru_cache @lru_cache(maxsize=1000) def cached_calculate(expr: str) -> str: return calculate(expr) -
异步执行:
python复制import asyncio async def async_react(query: str): agent = Agent() result = await asyncio.to_thread(agent.invoke, query) # 处理结果... -
批量处理:
python复制def batch_react(queries: list): with ThreadPoolExecutor() as executor: return list(executor.map(react, queries))
5.2 监控与日志
-
调用日志记录:
python复制import logging logging.basicConfig(filename='agent.log', level=logging.INFO) def log_invocation(query, result): logging.info(f"Input: {query}\nOutput: {result}\n{'='*50}") -
性能指标收集:
python复制from time import perf_counter def timed_invoke(agent, query): start = perf_counter() result = agent.invoke(query) elapsed = perf_counter() - start return result, elapsed -
异常监控:
python复制import sentry_sdk sentry_sdk.init("your_dsn") try: react(query) except Exception as e: sentry_sdk.capture_exception(e)
6. 典型应用场景
6.1 电商价格比较Agent
python复制def compare_prices(products: list) -> str:
"""比较多个商品价格"""
tasks = [("ask_fruit_unit_price", p) for p in products]
prices = parallel_tool_calls(tasks)
valid_prices = {}
for product, result in prices.items():
if "元" in result:
price = float(result.split("元")[0].split()[-1])
valid_prices[product] = price
if not valid_prices:
return "无法获取有效的价格信息"
sorted_prices = sorted(valid_prices.items(), key=lambda x: x[1])
report = "\n".join(f"{p}: {price}元/公斤" for p, price in sorted_prices)
cheapest = sorted_prices[0][0]
return f"价格比较结果:\n{report}\n最便宜的是: {cheapest}"
6.2 数学解题Agent
python复制def solve_math_problem(problem: str) -> str:
"""解决数学应用题"""
react_prompt = """你是一个数学解题助手。按以下步骤工作:
1. 分析题目中的已知条件和要求
2. 列出需要求解的中间步骤
3. 对每个步骤调用calculate工具
4. 综合所有结果给出最终答案"""
agent = Agent(system_prompt=react_prompt)
return agent.invoke(problem)
6.3 智能客服Agent
python复制def customer_service(query: str) -> str:
"""智能客服场景"""
knowledge_base = {
"退货政策": "7天无理由退货",
"运费说明": "满99元包邮",
"支付方式": "支持支付宝、微信、银行卡"
}
tools = {
"query_policy": lambda x: knowledge_base.get(x, "未找到相关信息"),
"search_order": lambda x: f"订单{x}状态: 已发货"
}
prompt = """你是客服助手,可以:
- query_policy: 查询政策条款
- search_order: 查询订单状态"""
return react(query, tools=tools, prompt=prompt)
7. 常见问题解决方案
7.1 Agent输出不符合预期
问题现象:
- 工具名称拼写错误
- 参数格式不规范
- 缺少PAUSE指令
解决方案:
- 增强Prompt中的示例数量和质量
- 添加输出格式校验:
python复制def validate_action(text: str) -> bool: patterns = [ r'^Action:\s*\w+\s*:\s*.+$', r'^PAUSE$' ] return any(re.match(p, text.strip()) for p in patterns) - 实现自动修正机制:
python复制def fix_action(text: str) -> str: text = text.replace(" ", "").lower() if "action" not in text: text = "Action:" + text if ":" not in text: text = text.replace("action", "Action:") return text
7.2 工具调用性能瓶颈
优化方案:
- 异步调用:
python复制async def async_tool_call(tool, param): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, known_tools[tool], param) - 请求批处理:
python复制def batch_tools(calls: list): return [known_tools[t](p) for t, p in calls] - 结果缓存:
python复制from diskcache import Cache cache = Cache('tool_cache') @cache.memoize() def cached_tool(tool, param): return known_tools[tool](param)
7.3 复杂任务规划困难
增强方案:
- 分层任务分解:
python复制def hierarchical_react(query, depth=3): if depth == 0: return "达到最大递归深度" sub_tasks = agent.invoke(f"将任务分解为子步骤:\n{query}") results = [] for task in sub_tasks.split("\n"): if task.strip(): results.append(react(task, depth-1)) return "\n".join(results) - 外部规划器:
python复制def planner_react(query): plan = agent.invoke(f"为以下任务创建执行计划:\n{query}") steps = [s for s in plan.split("\n") if s.strip()] for step in steps: result = react(step) if "失败" in result: return f"步骤失败: {step}" return "所有步骤完成"
8. 性能优化深度实践
8.1 工具调用链路优化
原始流程:
- Agent生成Action文本
- 正则匹配提取工具和参数
- 查找工具函数
- 执行工具
- 返回结果
优化后流程:
python复制# 预编译工具映射
tool_map = {
name: (func, re.compile(fr'^{name}\s*:\s*(.*)$', re.I))
for name, func in known_tools.items()
}
def optimized_react(text: str):
for name, (func, pattern) in tool_map.items():
match = pattern.search(text)
if match:
return func(match.group(1))
return "未匹配到有效工具"
8.2 上下文压缩技术
对于长对话场景,使用摘要技术压缩历史:
python复制from transformers import pipeline
summarizer = pipeline("summarization")
def summarize_history(messages: list) -> str:
"""摘要压缩对话历史"""
text = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
summary = summarizer(text, max_length=150, min_length=30, do_sample=False)
return summary[0]['summary_text']
8.3 自适应temperature调节
根据任务类型动态调整temperature:
python复制def dynamic_temperature_agent(query):
# 分析问题类型
if "计算" in query or "多少" in query:
temp = 0 # 确定性任务
elif "创意" in query or "想法" in query:
temp = 0.7 # 创造性任务
else:
temp = 0.3 # 默认值
agent = Agent(temperature=temp)
return agent.invoke(query)
9. 安全加固方案
9.1 输入消毒处理
python复制import html
def sanitize_input(text: str) -> str:
"""防止XSS等攻击"""
# 移除危险HTML标签
text = re.sub(r'<script.*?>.*?</script>', '', text, flags=re.I|re.S)
# 转义特殊字符
text = html.escape(text)
# 限制长度
return text[:1000]
9.2 工具权限控制
python复制from functools import wraps
def restrict_tools(allowed_tools):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
tool = kwargs.get('tool') or args[0]
if tool not in allowed_tools:
raise PermissionError(f"无权访问工具 {tool}")
return func(*args, **kwargs)
return wrapper
return decorator
@restrict_tools(['calculate'])
def safe_react(query, tool):
return known_tools[tool](query)
9.3 审计日志
python复制import json
from datetime import datetime
def audit_log(action, params, result, user):
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"user": user,
"action": action,
"params": params,
"result": str(result)[:500] # 限制日志长度
}
with open("audit.log", "a") as f:
f.write(json.dumps(log_entry) + "\n")
10. 项目演进路线
10.1 短期优化
-
增强工具集:
- 添加网络搜索工具
- 集成数据库查询
- 支持文件操作
-
UI交互界面:
python复制import gradio as gr def build_ui(): with gr.Blocks() as demo: input_box = gr.Textbox(label="输入问题") output_box = gr.Textbox(label="Agent回答") submit = gr.Button("提交") submit.click(react, inputs=input_box, outputs=output_box) return demo -
性能监控面板:
python复制from prometheus_client import start_http_server, Counter REACT_CALLS = Counter('react_calls', 'Number of react invocations') def monitored_react(query): REACT_CALLS.inc() # ...原有逻辑...
10.2 中期规划
-
多Agent协作系统:
python复制class MultiAgentSystem: def __init__(self): self.agents = { 'math': Agent(math_prompt), 'general': Agent(general_prompt) } def route(self, query): if "计算" in query: return self.agents['math'].invoke(query) return self.agents['general'].invoke(query) -
领域知识增强:
python复制def load_knowledge(domain): with open(f"knowledge/{domain}.json") as f: return json.load(f) def domain_agent(query, domain): knowledge = load_knowledge(domain) tools = {**known_tools, 'query_knowledge': lambda x: knowledge.get(x, "未知")} return react(query, tools=tools) -
自动化测试覆盖:
python复制def generate_test_cases(): return [ ("计算2+2", "4"), ("苹果价格", "元/公斤"), # 自动生成更多测试用例... ] def run_validation(): for input_text, expected in generate_test_cases(): result = react(input_text) assert expected in result, f"测试失败: {input_text}"
10.3 长期愿景
-
自优化系统:
python复制class SelfImprovingAgent: def __init__(self): self.memory = VectorMemory() self.error_log = [] def learn_from_mistakes(self): for error in self.error_log: # 分析错误模式 # 自动调整Prompt # 更新工具配置 pass -
多模态扩展:
python复制def vision_agent(image_path, question): image_desc = vision_model.describe(image_path) return react(f"图片内容: {image_desc}\n问题: {question}") -
分布式执行:
python复制from ray import serve @serve.deployment class AgentWorker: def __init__(self): self.agent = Agent() def invoke(self, query): return self.agent.invoke(query) # 部署多个worker serve.run(AgentWorker.bind(), name="agent_cluster")
通过这个完整的实现方案,我们不仅构建了一个可用的ReAct Agent系统,还建立了一个可扩展、可定制的开发框架。这种底层实现方式虽然需要更多开发工作,但能提供更大的灵活性和更深入的理解,是进阶AI开发的必经之路。