在当今AI技术快速发展的背景下,我们正见证着一个重要的转变:从简单的脚本执行到真正的智能体系统的演进。作为一名长期从事AI工程实践的开发者,我深刻体会到这个转变背后的技术挑战和工程价值。
传统脚本式AI应用通常表现为:
而真正的生产级AI智能体应该具备:
在竞品分析领域,我们面临的首要挑战是如何从混乱的网页数据中提取有价值的信息。典型问题包括:
传统基于规则(如XPath/CSS选择器)的爬虫方案维护成本极高,每次目标网站改版都需要重新调整提取规则。这正是AI技术可以大显身手的地方。
一个完整的竞品分析流程通常包含多个步骤:
传统实现中,任何一个环节出错都会导致整个流程中断。我们需要构建具备自我修复能力的系统。
现代开发者通常工作在复杂的环境中:
理想的竞品分析工具应该能够无缝融入这些环境,而不是作为一个孤立的系统存在。
我们采用领域驱动设计(DDD)和六边形架构来构建系统,核心目录结构如下:
code复制src/competitor_hunter/
├── core/ # 领域模型和业务逻辑
│ ├── entities/ # 核心业务对象
│ ├── services/ # 领域服务
│ └── ports/ # 抽象接口定义
├── infrastructure/ # 具体实现
│ ├── adapters/ # 外部服务适配器
│ └── clients/ # 第三方客户端
└── interface/ # 对外暴露的接口
├── mcp/ # MCP协议实现
└── cli/ # 命令行界面
这种架构的关键优势在于:
MCP是一种标准化的AI交互协议,我们的实现包含以下关键部分:
python复制class MCPServer:
def __init__(self, agent: CompetitorHunterAgent):
self.agent = agent
async def handle_request(self, request: MCPRequest) -> MCPResponse:
# 解析请求参数
target_url = request.params.get("url")
analysis_type = request.params.get("analysis_type")
# 调用核心业务逻辑
try:
result = await self.agent.analyze_competitor(
url=target_url,
analysis_type=analysis_type
)
return MCPResponse.success(result)
except Exception as e:
return MCPResponse.error(str(e))
通过实现MCP接口,我们的智能体可以:
我们使用LangGraph构建了一个健壮的状态机来处理竞品分析流程:
python复制from langgraph.graph import Graph
from langgraph.nodes import Node, ConditionalEdge
# 定义节点
class ScrapeNode(Node):
async def run(self, state):
# 实现页面抓取逻辑
pass
class ExtractNode(Node):
async def run(self, state):
# 实现信息提取逻辑
pass
class SaveNode(Node):
async def run(self, state):
# 实现数据存储逻辑
pass
class RetryNode(Node):
async def run(self, state):
# 实现重试逻辑
pass
# 构建图
workflow = Graph()
workflow.add_node("scrape", ScrapeNode())
workflow.add_node("extract", ExtractNode())
workflow.add_node("save", SaveNode())
workflow.add_node("retry", RetryNode())
# 定义边
workflow.add_edge("scrape", "extract")
workflow.add_edge("extract", "save")
# 条件边:当抓取失败时转向重试
workflow.add_conditional_edge(
"scrape",
lambda state: "error" in state,
{"retry": True, "extract": False}
)
workflow.add_edge("retry", "scrape") # 重试后回到抓取节点
这种设计使得系统能够:
我们使用Pydantic V2定义严格的数据模型:
python复制from pydantic import BaseModel, Field
from typing import List
class PricingTier(BaseModel):
name: str = Field(description="定价层级名称")
price: float = Field(description="价格数值")
unit: str = Field(description="价格单位")
features: List[str] = Field(description="包含的功能")
class CompetitorProduct(BaseModel):
product_name: str = Field(description="产品名称", min_length=1)
pricing_tiers: List[PricingTier] = Field(description="定价梯度列表", min_items=1)
core_features: List[str] = Field(description="核心功能点", min_items=3)
summary: str = Field(description="Markdown格式的深度总结", min_length=100)
@model_validator(mode="after")
def validate_pricing(self):
# 自定义验证逻辑
if len(self.pricing_tiers) > 5:
raise ValueError("定价梯度不应超过5个")
return self
结合LangChain的结构化输出功能,我们确保LLM生成的数据符合预期:
python复制from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
parser = PydanticOutputParser(pydantic_object=CompetitorProduct)
prompt = ChatPromptTemplate.from_template("""
请分析以下网站内容并提取竞品信息:
{content}
请严格按照要求返回JSON数据:
{format_instructions}
""")
chain = prompt | llm | parser
我们使用Playwright进行网页自动化,并实现了多项优化:
python复制async def scrape_page(url: str, max_retries: int = 3) -> str:
browser = await playwright.chromium.launch()
context = await browser.new_context(
user_agent="Mozilla/5.0...",
viewport={"width": 1920, "height": 1080}
)
for attempt in range(max_retries):
try:
page = await context.new_page()
await page.goto(url, timeout=30000)
# 等待关键元素加载
await page.wait_for_selector(".pricing-section", timeout=5000)
# 处理懒加载内容
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(1)
# 获取完整HTML
content = await page.content()
return content
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # 指数退避
await browser.close()
关键优化点包括:
我们设计了多阶段提示策略:
python复制page_analysis_prompt = """
你是一个专业的网页分析师。请仔细分析以下网页内容:
{page_content}
请回答:
1. 这个页面的主要目的是什么?
2. 关键信息分布在哪些区域?
3. 页面结构有什么特点?
"""
python复制data_extraction_prompt = """
基于之前的分析,请从以下内容中提取结构化信息:
{content}
提取要求:
- 产品名称:从标题或显著位置获取
- 定价信息:包括套餐名称、价格、周期和包含功能
- 核心功能:列出至少5个主要功能点
- 独特卖点:找出与竞品不同的特色
"""
python复制summary_prompt = """
请根据以下数据生成专业的竞品分析摘要:
{structured_data}
要求:
- 使用Markdown格式
- 包含比较分析
- 突出关键差异点
- 保持客观中立
"""
这种分阶段处理方式显著提高了信息提取的准确性和完整性。
我们采用最新的Python工具链:
bash复制# 使用uv替代pip和venv
uv pip install -r requirements.txt
# 使用rye管理项目
rye sync
# 开发模式运行
rye run dev
MCP服务使用FastAPI实现:
python复制from fastapi import FastAPI
from mcp_protocol import MCPRequest, MCPResponse
app = FastAPI()
@app.post("/mcp")
async def handle_mcp(request: MCPRequest):
agent = get_agent() # 获取已配置的智能体实例
return await agent.handle_request(request)
启动命令:
bash复制uvicorn mcp_server:app --host 0.0.0.0 --port 8000 --reload
在Cursor或VSCode的配置中添加:
json复制{
"ai.tools": {
"CompetitorHunter": {
"endpoint": "http://localhost:8000/mcp",
"commands": [
{
"name": "analyze",
"description": "分析竞品网站",
"parameters": ["url"]
}
]
}
}
}
我们实现了多级缓存:
python复制from functools import lru_cache
import diskcache
# 内存缓存
@lru_cache(maxsize=100)
def get_page_structure(url: str) -> str:
...
# 磁盘缓存
cache = diskcache.Cache("tmp/cache")
@cache.memoize()
def analyze_content(content: str) -> dict:
...
使用Prometheus进行指标收集:
python复制from prometheus_client import Counter, Histogram
REQUEST_COUNT = Counter(
"competitor_hunter_requests_total",
"Total number of requests",
["endpoint", "status"]
)
REQUEST_LATENCY = Histogram(
"competitor_hunter_request_latency_seconds",
"Request latency in seconds",
["endpoint"]
)
@app.middleware("http")
async def monitor_requests(request, call_next):
start_time = time.time()
endpoint = request.url.path
try:
response = await call_next(request)
REQUEST_COUNT.labels(endpoint, response.status_code).inc()
return response
finally:
latency = time.time() - start_time
REQUEST_LATENCY.labels(endpoint).observe(latency)
我们定义了详细的异常层次:
python复制class CompetitorHunterError(Exception):
"""基础异常类型"""
pass
class RetryableError(CompetitorHunterError):
"""可重试的临时性错误"""
pass
class ConfigurationError(CompetitorHunterError):
"""配置错误"""
pass
class LLMGenerationError(CompetitorHunterError):
"""LLM生成内容不符合要求"""
pass
def handle_error(error: Exception) -> MCPResponse:
if isinstance(error, RetryableError):
return MCPResponse.retry_later(str(error))
elif isinstance(error, LLMGenerationError):
return MCPResponse.invalid_content(str(error))
else:
return MCPResponse.error(str(error))
对于常见问题,我们实现了自动修复:
python复制async def auto_fix_extraction(
content: str,
schema: Type[BaseModel],
max_attempts: int = 3
) -> BaseModel:
parser = PydanticOutputParser(pydantic_object=schema)
for attempt in range(max_attempts):
try:
result = await llm_chain.arun(content=content)
return result
except ValidationError as e:
if attempt == max_attempts - 1:
raise
# 将验证错误反馈给LLM进行修正
feedback = f"Previous error: {str(e)}\nPlease correct your output."
content = f"{content}\n\nError Feedback: {feedback}"
我们采取了以下措施:
python复制async def anonymize_data(product: CompetitorProduct) -> CompetitorProduct:
"""移除可能包含的敏感信息"""
anonymized = product.copy()
anonymized.product_name = hashlib.sha256(product.product_name.encode()).hexdigest()[:8]
return anonymized
基于API密钥的访问控制:
python复制from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-KEY")
@app.post("/mcp")
async def secured_endpoint(
request: MCPRequest,
api_key: str = Depends(api_key_header)
):
if not validate_api_key(api_key):
raise HTTPException(status_code=403)
...
我们设计了可扩展的插件接口:
python复制class AnalysisPlugin(ABC):
@abstractmethod
async def analyze(self, content: str) -> dict:
pass
class PricingAnalysisPlugin(AnalysisPlugin):
async def analyze(self, content: str) -> dict:
# 实现特定的定价分析逻辑
pass
# 注册插件
agent.register_plugin("pricing", PricingAnalysisPlugin())
支持用户自定义报告格式:
python复制from jinja2 import Template
class ReportGenerator:
def __init__(self, template_path: str):
with open(template_path) as f:
self.template = Template(f.read())
def generate(self, data: dict) -> str:
return self.template.render(**data)
在开发过程中,我们积累了一些宝贵经验:
浏览器自动化最佳实践:
LLM交互的可靠性技巧:
性能优化发现:
错误处理经验:
这个项目展示了如何将现代AI技术与软件工程最佳实践相结合,构建真正可靠的生产级AI应用。通过采用清晰的架构设计、严格的数据验证和健壮的错误处理,我们成功将一个原本脆弱的脚本转变为可投入实际使用的智能体系统。