Agent architectures are becoming the new paradigm for enterprise AI applications. Unlike traditional single-turn chat models, an Agent system can plan autonomously, call tools, and execute iteratively, which lets it handle complex multi-step tasks. Imagine an e-commerce support Agent that not only answers product questions but also looks up order status, kicks off after-sales workflows, and tracks shipments: that is the kind of change Agent architectures bring.
In real business scenarios we repeatedly run into three kinds of problems: complex workflows that span multiple systems (such as full order-lifecycle management); decisions that depend on real-time data (such as inventory alerts and restocking suggestions); and personalized service that needs long-term memory (such as customer-preference analysis). Plain LLM calls fall short in these scenarios, whereas an Agent architecture addresses them systematically through four core modules:
Perception module: like the human senses, it receives and interprets incoming information. In practice this covers natural-language understanding, structured-data parsing, and even multimedia input. For example, when a user uploads a product photo, the perception module must turn it into feature vectors the Agent can work with.
Planning module: the Agent's "brain", responsible for task decomposition and strategy. Using the LLM's chain-of-thought capability, a complex task such as "handle a customer complaint" can be broken into an executable sequence of sub-tasks: "verify order details", "classify the issue", "generate a resolution".
Memory module: split into short-term working memory and long-term knowledge storage. Short-term memory is typically backed by a fast cache such as Redis and holds the current session context; long-term memory lives in a vector database (such as Pinecone) and stores historical interactions and domain knowledge. This layering keeps responses fast while still accumulating knowledge.
Execution module: the Agent's "hands and feet", responsible for calling external tools and APIs. The key is a standardized tool-interface contract, for example describing tools with the OpenAPI spec so the Agent can discover and invoke new tools dynamically.
Tip: at the initial design stage, follow the "high cohesion, low coupling" modular principle. Each module talks to the others through clearly defined interfaces, which makes components easy to optimize in isolation and easy to swap out (for example, switching vector-database vendors).
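The interface-first principle above can be sketched as a set of minimal Python protocols. All class and method names here are illustrative, not taken from any particular framework:

```python
from typing import Any, List, Protocol, runtime_checkable

@runtime_checkable
class Perception(Protocol):
    def parse(self, raw_input: str) -> dict: ...

@runtime_checkable
class Planner(Protocol):
    def plan(self, goal: str) -> List[str]: ...

@runtime_checkable
class Memory(Protocol):
    def store(self, item: Any) -> None: ...
    def recall(self, query: str) -> List[Any]: ...

@runtime_checkable
class Executor(Protocol):
    def call_tool(self, name: str, args: dict) -> Any: ...

class RulePlanner:
    """Toy planner: decomposes a complaint-handling goal into fixed sub-tasks."""
    def plan(self, goal: str) -> List[str]:
        return [
            f"verify order details for: {goal}",
            f"classify the issue in: {goal}",
            f"draft a resolution for: {goal}",
        ]
```

Because each module only depends on a protocol, swapping one backing store for another only requires a new class that satisfies `Memory`; the rest of the system does not change.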
A fairly mature toolchain has formed around modern Agent development. We recommend the following stack:

```bash
# Core framework
pip install langchain==0.1.0 openai==1.12.0
# Vector store
pip install faiss-cpu==1.7.4
# Utilities
pip install python-dotenv==1.0.0 loguru==0.7.2
```

Developers in mainland China may want a mirror to speed up downloads:

```bash
pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
```
The most critical part of environment setup is API-key management. We suggest tiered credentials per environment:

```ini
# .env file
# Development key
DEV_OPENAI_KEY=sk-your-dev-key
# Production key (never hard-code it in source)
PROD_OPENAI_KEY=${VAULT:prod_openai_key}
```
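Assuming the `.env` layout above, runtime key lookup might look like the following sketch (the helper name is ours; in practice python-dotenv's `load_dotenv()` would populate the environment from `.env` first):

```python
import os

def get_openai_key(env: str = "dev") -> str:
    """Return the API key for the given environment, failing loudly if absent."""
    # load_dotenv() from python-dotenv would fill os.environ from .env beforehand
    var = "DEV_OPENAI_KEY" if env == "dev" else "PROD_OPENAI_KEY"
    key = os.environ.get(var)
    if key is None:
        raise RuntimeError(f"{var} is not set; check your .env file or secret vault")
    return key
```

Failing at startup when a key is missing is deliberate: it surfaces misconfiguration immediately instead of at the first LLM call.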
The quality of tool definitions directly affects the Agent's decision accuracy. The following pattern has held up in production:

```python
from pydantic import BaseModel, Field
from typing import Optional, List
from langchain.tools import tool

class OrderQueryInput(BaseModel):
    order_id: str = Field(
        ...,
        description="16-digit order number starting with 2024",
        regex=r"^2024\d{12}$",
    )

@tool("query_order", args_schema=OrderQueryInput)
def query_order(order_id: str) -> str:
    r"""
    Advanced order-lookup interface.
    Capabilities:
    - Query basic order information
    - Return key fields such as payment status and tracking number
    - Cover orders from the last 3 months
    Parameter rules:
    - order_id must match ^2024\d{12}$
    - The test environment only accepts designated test order numbers
    Example return value:
    {
        "status": "shipped",
        "logistics_no": "SF123456789",
        "last_update": "2024-05-01 14:00:00"
    }
    """
    # Implementation...
```
Key design points: give every parameter a precise description and a format constraint (the regex above), document the return shape with a concrete example, and state environment-specific limits in the docstring. The model relies on exactly this text to decide when and how to call the tool.
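The format constraint can also be checked standalone with plain `re`, using the same pattern as the schema above:

```python
import re

# 16 digits total: the literal prefix 2024 followed by 12 more digits
ORDER_ID_PATTERN = re.compile(r"^2024\d{12}$")

def is_valid_order_id(order_id: str) -> bool:
    """True iff the id matches the format query_order expects."""
    return ORDER_ID_PATTERN.fullmatch(order_id) is not None
```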
The memory module's design must balance retrieval efficiency against storage cost. We recommend the following tiered scheme:

```python
from langchain.memory import (
    RedisChatMessageHistory,
    VectorStoreRetrieverMemory,
    ConversationBufferMemory,
    CombinedMemory,
)
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings

# Short-term memory: Redis-backed session history
short_term_history = RedisChatMessageHistory(
    session_id="user_123",
    url="redis://:password@localhost:6379/0",
    ttl=3600,  # expire after 1 hour
)
# CombinedMemory expects BaseMemory objects, so wrap the raw message history
short_term_memory = ConversationBufferMemory(
    chat_memory=short_term_history,
    memory_key="chat_history",
)

# Long-term memory: vector store
vector_store = FAISS.from_texts(
    texts=["historical conversation data..."],
    embedding=OpenAIEmbeddings(),
)
long_term_memory = VectorStoreRetrieverMemory(
    retriever=vector_store.as_retriever(search_kwargs={"k": 3}),
    memory_key="long_term",  # distinct keys are required when combining memories
)

# Combined memory system
memory = CombinedMemory(memories=[
    short_term_memory,
    long_term_memory,
])
```
Note: in production, always configure persistence and backups for Redis. For the vector database, prefer a managed cloud service (such as Alibaba Cloud's vector search service) to avoid the operational burden of self-hosting.
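As a concrete starting point, the Redis persistence mentioned above maps to a few redis.conf directives (values are illustrative and should be tuned per workload):

```conf
# Append-only file: persist every write, fsync once per second
appendonly yes
appendfsync everysec
# RDB snapshot as a second layer: dump if >= 1000 keys change within 60s
save 60 1000
# Data directory; put it on durable, backed-up storage
dir /data
```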
A bare-bones Agent initialization rarely meets production needs. Here is a hardened configuration:

```python
from langchain.agents import AgentExecutor, ConversationalAgent
from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

# Customized prompt template
agent_prompt = PromptTemplate.from_template("""
You are a professional e-commerce support Agent and must follow these rules:
1. Always reply in Chinese
2. For anything uncertain, answer "I need to double-check that"
3. Verify the user's identity before discussing order details
Conversation history:
{chat_history}
Available tools:
{tools}
User input: {input}
Respond in the following format:
Thought: <your reasoning>
Action: <tool to call>
Action Input: <JSON-formatted input>
Observation: <tool result>
...(repeat Thought/Action/Observation until the task is done)
Final Answer: <reply to the user>
""")

# Build the customized Agent
llm = OpenAI(temperature=0.3, max_tokens=2000)
agent = ConversationalAgent(
    llm_chain=LLMChain(llm=llm, prompt=agent_prompt),
    tools=tools,
    verbose=True,
)

# Configure the executor
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    memory=memory,
    max_iterations=5,                   # guard against infinite loops
    early_stopping_method="generate",   # try to produce a sensible reply on timeout
    handle_parsing_errors=True,         # recover from parsing errors automatically
)
```
Key optimizations: cap max_iterations so the Agent cannot loop forever, use early_stopping_method="generate" so it still answers when it hits the cap, and enable handle_parsing_errors so malformed model output is retried instead of crashing the run.
Tool invocation is the main risk point in an Agent system and needs the following safety mechanisms:

```python
from tenacity import retry, stop_after_attempt, wait_exponential
from circuitbreaker import circuit

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True,
)
@circuit(
    failure_threshold=5,
    recovery_timeout=30,
)
def safe_tool_call(tool_func, *args, **kwargs):
    """
    Tool-call wrapper with retry and circuit breaking.
    Features:
    - Exponential-backoff retries (at most 3)
    - Opens the circuit when the failure count crosses the threshold
    - Call timeout control (5 seconds by default)
    - Input validation
    """
    # Parameter validation (validate_params is an application-level helper)
    validate_params(tool_func, *args, **kwargs)
    # Call with a timeout; `timeout` stands in for any timeout decorator,
    # e.g. func_timeout or a signal-based implementation
    try:
        return timeout(5)(tool_func)(*args, **kwargs)
    except Exception as e:
        log_error(e)
        raise
```
The accompanying monitoring should cover, at minimum: per-tool call volume and failure rate, retry counts, circuit-breaker state changes, and call-latency percentiles.
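As a sketch of those indicators, here is a dependency-free counter that a real deployment would export through a metrics library such as prometheus_client (all names below are ours):

```python
from collections import defaultdict

class ToolCallMetrics:
    """In-process tally of tool-call health; export via your metrics stack."""
    def __init__(self):
        self.calls = defaultdict(int)          # tool name -> total calls
        self.failures = defaultdict(int)       # tool name -> failed calls
        self.latency_sum = defaultdict(float)  # tool name -> cumulative seconds

    def observe(self, tool: str, latency_s: float, ok: bool) -> None:
        self.calls[tool] += 1
        self.latency_sum[tool] += latency_s
        if not ok:
            self.failures[tool] += 1

    def failure_rate(self, tool: str) -> float:
        total = self.calls[tool]
        return self.failures[tool] / total if total else 0.0

    def avg_latency(self, tool: str) -> float:
        total = self.calls[tool]
        return self.latency_sum[tool] / total if total else 0.0
```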
The usual performance bottleneck in the memory module is vector-retrieval efficiency. The following optimizations have proven out:

```python
# Build separate indexes per data type
# (assumes the corpora and `embeddings` are defined as earlier)
product_index = FAISS.from_texts(product_descriptions, embeddings)
order_index = FAISS.from_texts(order_records, embeddings)

# Retrieve from both in parallel at query time
from concurrent.futures import ThreadPoolExecutor

def retrieve_memory(query):
    with ThreadPoolExecutor() as executor:
        product_future = executor.submit(product_index.similarity_search, query, k=1)
        order_future = executor.submit(order_index.similarity_search, query, k=1)
        return {
            "product": product_future.result(),
            "order": order_future.result(),
        }
```

```python
from cachetools import TTLCache

# Cache with a time-to-live
memory_cache = TTLCache(maxsize=1000, ttl=300)

def cached_retrieve(query):
    if query in memory_cache:
        return memory_cache[query]
    result = vector_store.similarity_search(query)
    memory_cache[query] = result
    return result
```

```python
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    length_function=len,
    separators=["\n\n", "\n", "。", "?", "!"],
)

def preprocess_text(text):
    # Strip special characters (keep word characters, whitespace, CJK)
    cleaned = re.sub(r'[^\w\s\u4e00-\u9fa5]', '', text)
    # Chunk intelligently
    chunks = text_splitter.split_text(cleaned)
    # Extract keywords (extract_keywords is an application-level helper)
    keywords = extract_keywords(cleaned)
    return {
        "chunks": chunks,
        "keywords": keywords,
    }
```
The current best practice for AI systems is containerized deployment. A recommended Dockerfile:

```dockerfile
# Base image
FROM python:3.9-slim

# Time zone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    python3-dev \
    && rm -rf /var/lib/apt/lists/*

# Working directory
WORKDIR /app
COPY . .

# Python dependencies
RUN pip install --no-cache-dir -r requirements.txt \
    && pip install gunicorn==20.1.0

# Exposed port
EXPOSE 8000

# Entry point (note: each CLI argument is a separate list element)
CMD ["gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "app:agent_server"]
```
The accompanying docker-compose.yml should include these services:

```yaml
version: '3.8'
services:
  agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis
      - vector_db
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  vector_db:
    image: milvusdb/milvus:v2.3.0
    ports:
      - "19530:19530"
    volumes:
      - milvus_data:/var/lib/milvus
volumes:
  redis_data:
  milvus_data:
```
A production deployment needs a thorough monitoring system. The core indicators fall into three groups:
Performance metrics: for example end-to-end latency, per-tool call duration, and token throughput.
Quality metrics: for example answer accuracy, task completion rate, and parsing-failure rate.
Business metrics: for example issue resolution rate, escalation rate, and user satisfaction.
We recommend Prometheus plus Grafana for the dashboards:
```yaml
# prometheus.yml example
scrape_configs:
  - job_name: 'agent'
    metrics_path: '/metrics'
    static_configs:
      - targets: ['agent:8000']
  - job_name: 'redis'
    static_configs:
      - targets: ['redis:6379']
```
Agent updates call for a cautious release strategy. A canary-routing middleware:

```python
import zlib
from fastapi import Request

@app.middleware("http")
async def route_by_version(request: Request, call_next):
    user_id = request.headers.get("X-User-ID")
    # Use a stable hash so a given user always lands in the same bucket
    # (Python's built-in hash() varies between processes)
    if user_id and zlib.crc32(user_id.encode()) % 100 < 10:  # 10% of traffic to v2
        request.scope["path"] = "/v2" + request.scope["path"]
    return await call_next(request)
```
A/B evaluation of the two versions:

```python
def evaluate_agent_performance(agent_a, agent_b, test_cases):
    results = []
    for case in test_cases:
        try:
            res_a = agent_a.run(case)
            res_b = agent_b.run(case)
            results.append({
                "case": case,
                "a": res_a,
                "b": res_b,
                "winner": human_evaluate(res_a, res_b),
            })
        except Exception as e:
            log_error(e)
    return results
```

And a quick rollback path:

```bash
# Roll back to the previous stable revision
kubectl rollout undo deployment/agent --to-revision=1
```
Symptom: the Agent kept replying "I can't complete this request", and logs showed a tool-call failure rate as high as 40%.
Investigation traced the failures to the query_order tool.
Solutions:
Add pagination to the tool:

```python
@tool("query_order_v2")
def query_order_v2(order_id: str, page: int = 1, page_size: int = 50):
    """
    Order-lookup interface with pagination support.
    """
    start = (page - 1) * page_size
    end = start + page_size
    return get_order_details(order_id)[start:end]
```

Cache the underlying query:

```python
@cache.memoize(ttl=300)
def get_order_details(order_id):
    # Database query logic
    ...
```

Add retries at the executor level:

```python
# Illustrative retry settings; the exact parameter names depend on your
# framework version (LangChain's AgentExecutor does not accept these directly)
agent_executor = AgentExecutor(
    # ...other parameters
    max_retries=2,
    retry_wait=1,
)
```
Result: tool-call success rate rose to 98% and average response time dropped to 1.2 seconds.
Symptom: the Agent frequently returned historical information irrelevant to the current question, derailing the conversation.
Root cause: retrieval ranked on query similarity alone, ignoring conversation context and domain boundaries.
Optimizations:
Context-aware re-ranking of retrieved memories:

```python
from sentence_transformers import CrossEncoder

# Assumes a cross-encoder checkpoint fine-tuned for Chinese relevance ranking
reranker = CrossEncoder('bert-base-chinese')

def enhanced_retrieve(query, context, k=3):
    # Initial recall
    candidates = vector_store.similarity_search(query, k=10)
    # Re-rank together with the conversation context
    pairs = [(f"{context}\n{query}", doc.page_content) for doc in candidates]
    scores = reranker.predict(pairs)
    # Take the top k (sort by score only, so ties never compare documents)
    ranked = sorted(zip(scores, candidates), key=lambda x: x[0], reverse=True)
    return [doc for _, doc in ranked[:k]]
```

Domain-partitioned memory:

```python
class LayeredMemory:
    def __init__(self):
        self.product_mem = FAISS.from_texts([...])
        self.order_mem = FAISS.from_texts([...])
        self.general_mem = FAISS.from_texts([...])

    def retrieve(self, query, domain_hint=None):
        if domain_hint == "product":
            return self.product_mem.similarity_search(query)
        elif domain_hint == "order":
            return self.order_mem.similarity_search(query)
        else:
            return self.general_mem.similarity_search(query)
```
Result: memory-retrieval accuracy improved by 65% and conversation coherence visibly improved.
Symptom: identical inputs produced wildly different outputs, especially at the task-decomposition step.
Solutions:
Constrain the decomposition format:

```python
task_decomposition_prompt = """
You are a task-planning expert. Decompose the task strictly by these rules:
1. Every sub-task must be an executable atomic operation
2. Mark tools to call with <tool_name>
3. Output format:
Step 1: <description> <tool_name>
Step 2: <description> <tool_name>
...
Current task: {task}
Relevant history: {history}
"""
```

Validate plans and retry until one conforms:

```python
def validate_plan(plan):
    required_fields = ["Step", "tool"]
    for step in plan.split("\n"):
        if not all(field in step for field in required_fields):
            return False
    return True

def get_reliable_plan(task, max_attempts=3):
    for _ in range(max_attempts):
        # Both template variables must be supplied, or format() raises KeyError
        plan = llm(task_decomposition_prompt.format(task=task, history=""))
        if validate_plan(plan):
            return plan
    return DEFAULT_PLAN
```

Ensemble across models by majority vote:

```python
def ensemble_plan(task, models):
    plans = [model.generate_plan(task) for model in models]
    return max(set(plans), key=plans.count)
```
Result: task-decomposition consistency improved by 80% and tool-call accuracy by 45%.
A production-grade Agent must implement layered security defenses: input sanitization, tool permissions, output safety checks, and audit logging.

Input sanitization:

```python
# `llm_security` stands in for whatever input-sanitization library you use
from llm_security import Sanitizer

sanitizer = Sanitizer(
    blocklist=["信用卡", "密码"],  # "credit card", "password"
    regex_patterns=[r"\d{16}", r"\d{3}-\d{2}-\d{4}"],  # card numbers, SSNs, etc.
    replacement="[REDACTED]",
)
safe_input = sanitizer.sanitize(user_input)
```

Tool-level permissions:

```python
def check_tool_permission(tool_name, user_context):
    permissions = {
        "query_order": ["customer", "cs_agent"],
        "refund_order": ["cs_manager"],
    }
    return user_context.role in permissions.get(tool_name, [])
```

Output safety checks:

```python
from transformers import pipeline

# The model id below is a placeholder for your output-safety classifier
safety_checker = pipeline("text-classification", model="llm-security/safety-checker")

def is_safe_output(text):
    result = safety_checker(text)
    return result[0]["label"] == "safe"
```

Audit logging:

```python
# AuditLogger is an application-level component, shown for its configuration shape
audit_logger = AuditLogger(
    storage="s3://audit-logs",
    retention_days=180,
    alert_rules={
        "sensitive_data_leak": {"pattern": r"\[\w{16}\]", "threshold": 1},
    },
)
```
LLM application costs come mostly from token consumption. The following reduction techniques have proven out.

Compress conversation history with a cheaper model:

```python
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferWindowMemory

# Keep only the last k turns, then summarize them
compressed_memory = ConversationBufferWindowMemory(
    k=3,
    human_prefix="User",
    ai_prefix="AI",
    memory_key="history",
    output_key="response",
)
compressor_chain = ConversationChain(
    llm=small_llm,
    memory=compressed_memory,
    prompt=COMPRESS_PROMPT,
)

def compress_history(full_history):
    return compressor_chain.run(
        input="Compress the following conversation, keeping the key facts",
        history=full_history,
    )
```

Cache LLM responses:

```python
import langchain
from langchain.cache import SQLiteCache

# Enable caching globally
langchain.llm_cache = SQLiteCache(database_path=".langchain.db")

# Cache keyed by semantic similarity (Redis-backed)
from langchain.cache import RedisSemanticCache
langchain.llm_cache = RedisSemanticCache(
    embedding=OpenAIEmbeddings(),
    redis_url="redis://localhost:6379/1",
)
```

Route simple queries to a cheaper model:

```python
class ModelRouter:
    def __init__(self):
        self.small_llm = ChatOpenAI(model="gpt-3.5-turbo")
        self.large_llm = ChatOpenAI(model="gpt-4")

    def route(self, query):
        complexity = self.estimate_complexity(query)
        if complexity < 0.5:
            return self.small_llm
        else:
            return self.large_llm

    def estimate_complexity(self, text):
        # Length-based heuristic; extend with entity counts, intent class, etc.
        return min(len(text) / 1000, 1.0)
```

Track token consumption per call:

```python
from langchain.callbacks import get_openai_callback

with get_openai_callback() as cb:
    result = agent.run("user query")
    print(f"This call consumed {cb.total_tokens} tokens")
    if cb.total_tokens > 1000:
        alert("high token consumption")
```
As the business grows, the Agent needs a dynamically extensible tool ecosystem:

```python
# Plugin registry
class PluginRegistry:
    def __init__(self):
        self.plugins = {}

    def register(self, name, metadata, func):
        self.plugins[name] = {
            "metadata": metadata,
            "function": func,
        }

    def discover(self, query):
        # Find plugins via semantic search
        return semantic_search(query, self.plugins)

# Plugin-development SDK
class PluginSDK:
    def __init__(self, registry):
        self.registry = registry

    def create_tool(self, name, description, params):
        def decorator(func):
            tool_metadata = {
                "name": name,
                "description": description,
                "parameters": params,
            }
            self.registry.register(name, tool_metadata, func)
            return func
        return decorator

# Usage
sdk = PluginSDK(global_registry)

@sdk.create_tool(
    name="check_inventory",
    description="Query product inventory",
    params={"product_id": "string", "warehouse": "string"},
)
def check_inventory(product_id, warehouse):
    # Implementation...
```
Complex business scenarios require multiple Agents working together:

```python
from typing import List, Dict
from langchain.agents import Agent

class Coordinator:
    def __init__(self, agents: Dict[str, Agent]):
        self.agents = agents
        self.conversation = []

    def delegate(self, task: str) -> str:
        # Pick the best-suited Agent
        agent = self.select_agent(task)
        # Run the task
        result = agent.run(task)
        # Record the exchange
        self.conversation.append({
            "task": task,
            "agent": agent.name,
            "result": result,
        })
        return result

    def select_agent(self, task: str) -> Agent:
        # Route by task type ("订单" = order, "产品" = product)
        if "订单" in task:
            return self.agents["order_agent"]
        elif "产品" in task:
            return self.agents["product_agent"]
        else:
            return self.agents["general_agent"]

# Initialize the collaboration system
# (a general_agent must also be registered, or the fallback branch raises KeyError)
order_agent = create_order_agent()
product_agent = create_product_agent()
coordinator = Coordinator({
    "order_agent": order_agent,
    "product_agent": product_agent,
})
```
Letting the Agent learn continuously from its interactions:

```python
class OnlineLearner:
    def __init__(self, agent, learning_rate=0.1):
        self.agent = agent
        self.learning_rate = learning_rate
        self.feedback_buffer = []

    def receive_feedback(self, feedback):
        self.feedback_buffer.append(feedback)
        if len(self.feedback_buffer) >= 10:
            self.update_model()

    def update_model(self):
        # Prepare training data
        training_data = self.process_feedback()
        # Fine-tune the model (fine_tune abstracts whatever tuning pipeline you use)
        self.agent.llm.fine_tune(training_data, lr=self.learning_rate)
        # Clear the buffer
        self.feedback_buffer = []

    def process_feedback(self):
        # Turn feedback into training samples
        return [
            (fb["input"], fb["ideal_output"])
            for fb in self.feedback_buffer
        ]

# Usage
learner = OnlineLearner(agent)
while True:
    user_input = get_user_input()
    output = agent.run(user_input)
    feedback = get_human_feedback(output)
    learner.receive_feedback(feedback)
```
Build a rigorous performance-evaluation framework:

```python
import time
from statistics import mean, stdev
from langchain.callbacks import get_openai_callback

class Benchmark:
    def __init__(self, agent):
        self.agent = agent
        self.metrics = {
            "latency": [],
            "accuracy": [],
            "cost": [],
        }

    def run_test_case(self, test_case, expected):
        start_time = time.time()
        with get_openai_callback() as cb:
            response = self.agent.run(test_case)
            cost = cb.total_cost
        latency = time.time() - start_time
        accuracy = self.calculate_accuracy(response, expected)
        self.metrics["latency"].append(latency)
        self.metrics["accuracy"].append(accuracy)
        self.metrics["cost"].append(cost)
        return {
            "test_case": test_case,
            "response": response,
            "latency": latency,
            "accuracy": accuracy,
            "cost": cost,
        }

    def calculate_accuracy(self, response, expected):
        # Plug in your own accuracy metric here
        return similarity_score(response, expected)

    def generate_report(self):
        return {
            "avg_latency": mean(self.metrics["latency"]),
            "latency_stdev": stdev(self.metrics["latency"]),
            "avg_accuracy": mean(self.metrics["accuracy"]),
            "accuracy_stdev": stdev(self.metrics["accuracy"]),
            "total_cost": sum(self.metrics["cost"]),
        }

# Usage
benchmark = Benchmark(agent)
for case in test_cases:
    benchmark.run_test_case(case["input"], case["expected"])
report = benchmark.generate_report()
```
Optimization techniques for high-concurrency serving:

```python
import asyncio
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import StreamingResponse
from langchain.callbacks import AsyncIteratorCallbackHandler

app = FastAPI()

@app.post("/chat")
async def chat_endpoint(request: Request):
    user_input = await request.json()

    # Async streaming response
    async def generate_response():
        callback = AsyncIteratorCallbackHandler()
        task = asyncio.create_task(
            run_in_threadpool(
                agent.run,
                user_input,
                callbacks=[callback],
            )
        )
        async for token in callback.aiter():
            yield token
        await task

    return StreamingResponse(generate_response())
```

```python
# Rate-limiting middleware; apply the limiter to the /chat route above
# (define the route only once, with the decorator attached)
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/chat")
@limiter.limit("10/minute")
async def chat_endpoint(request: Request):
    # Same implementation as above
    ...
```
Hardware-specific optimizations for different deployment environments.

CPU-only servers:

```python
from langchain.llms import LlamaCpp

llm = LlamaCpp(
    model_path="models/llama-2-7b-chat.gguf",
    n_ctx=2048,
    n_threads=8,     # tune to the CPU core count
    n_gpu_layers=0,  # pure-CPU mode
)
```

GPU-equipped servers:

```python
llm = LlamaCpp(
    model_path="models/llama-2-7b-chat.gguf",
    n_gpu_layers=40,  # offload layers to the GPU
    n_threads=4,      # keep some CPU threads in reserve
)
```

TensorRT engines:

```python
# TensorRTLLM is not part of core LangChain; this assumes a TensorRT-LLM
# integration exposing an equivalent wrapper class
from langchain.llms import TensorRTLLM

llm = TensorRTLLM(
    engine_dir="trt_engines/llama-2-7b-chat",
    tokenizer_name="meta-llama/Llama-2-7b-chat-hf",
)
```

Memory-constrained hosts:

```python
llm = LlamaCpp(
    model_path="models/llama-2-7b-chat.Q4_K_M.gguf",  # 4-bit quantized model
    n_ctx=2048,
)
```
E-commerce-specific optimizations:

```python
class EcommerceAgent:
    def __init__(self):
        # Core components
        self.llm = self.init_llm()
        self.tools = self.load_tools()
        self.memory = self.setup_memory()
        # E-commerce-specific modules
        self.product_catalog = ProductCatalog()
        self.promotion_engine = PromotionEngine()
        self.sentiment_analyzer = SentimentAnalyzer()

    def init_llm(self):
        return ChatOpenAI(
            model="gpt-4",
            temperature=0.2,
            max_tokens=1000,
            model_kwargs={
                # Stop sequences for the dialogue format ("customer:" / "agent:")
                "stop": ["\n客户:", "\n客服:"],
            },
        )

    def process_input(self, user_input):
        # Sentiment analysis
        sentiment = self.sentiment_analyzer.analyze(user_input)
        # Entity extraction
        entities = extract_entities(user_input)
        # Inject promotion information when price comes up
        if "price" in entities:
            promotions = self.promotion_engine.get_relevant_promotions(entities)
            user_input += f"\nCurrent promotions: {promotions}"
        return {
            "processed_input": user_input,
            "sentiment": sentiment,
            "entities": entities,
        }

    def run(self, user_input):
        # Preprocess
        context = self.process_input(user_input)
        # Build the prompt
        prompt = self.build_prompt(context)
        # Run the Agent
        response = self.agent_executor.run(
            input=prompt,
            memory=self.memory,
        )
        # Postprocess
        return self.post_process(response, context)
```
An Agent design for automated technical support:

```python
class SupportTicketAgent:
    def __init__(self):
        self.llm = OpenAI(temperature=0)
        self.knowledge_base = FAISS.load_local("kb_index")
        self.ticket_system = TicketSystemAPI()
        self.tools = [
            self.create_search_kb_tool(),
            self.create_create_ticket_tool(),
            self.create_escalate_tool(),
        ]
        self.agent = initialize_agent(
            tools=self.tools,
            llm=self.llm,
            agent="conversational-react-description",
            verbose=True,
        )

    def create_search_kb_tool(self):
        @tool
        def search_knowledge_base(query: str) -> str:
            """Search knowledge-base articles"""
            docs = self.knowledge_base.similarity_search(query, k=3)
            return "\n\n".join(doc.page_content for doc in docs)
        return search_knowledge_base

    def run(self, user_query):
        # Pre-classify the problem type
        problem_type = self.classify_problem(user_query)
        # Run the Agent
        result = self.agent.run(
            input=f"Problem type: {problem_type}\nUser question: {user_query}"
        )
        # File a ticket automatically when unresolved
        if "无法解决" in result:  # "cannot resolve"
            ticket_id = self.ticket_system.create_ticket(
                description=user_query,
                category=problem_type,
            )
            result += f"\nTicket #{ticket_id} created; the engineering team will follow up shortly"
        return result
```
A specialized Agent designed for data analysts:

```python
class DataAnalysisAgent:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4-0613")
        self.tools = self.load_data_tools()
        self.visualization_tools = self.load_viz_tools()
        self.agent = initialize_agent(
            tools=self.tools + self.visualization_tools,
            llm=self.llm,
            agent=AgentType.OPENAI_FUNCTIONS,
            verbose=True,
        )

    def load_data_tools(self):
        return [
            self.create_sql_query_tool(),
            self.create_data_clean_tool(),
            self.create_stat_analysis_tool(),
        ]

    def create_sql_query_tool(self):
        @tool
        def query_database(query: str) -> str:
            """Run a SQL query and return the results"""
            # Connect to the database and execute the query
            conn = create_connection()
            df = pd.read_sql(query, conn)
            return df.to_json(orient="records")
        return query_database

    def run_analysis(self, task_description):
        # Sanity-check the task first
        if not self.validate_task(task_description):
            return "The analysis request is unclear; please describe it more specifically"
        # Run the analysis
        result = self.agent.run(
            f"As a data-analysis assistant, complete the following task: {task_description}"
        )
        #
```