在构建智能助手类应用时,我们经常会遇到这样的用户请求:"帮我查一下明天北京到上海的高铁,顺便看看上海天气"。这类复合型请求包含多个独立子任务,传统单一Agent架构面临两个核心痛点:
我在实际项目中发现,当系统需要处理3个以上并行任务时,串行架构的响应时间会呈线性增长。例如查询"高铁班次+目的地天气+当地防疫政策",串行处理可能需要6-8秒,而并行方案可以压缩到2-3秒。
本项目采用Supervisor-Worker架构,包含三类核心组件:
Supervisor节点:负责意图识别和任务分发
Worker节点:专业Agent处理具体任务
Synthesizer节点:结果聚合与呈现
| 技术栈 | 选型理由 |
|---|---|
| LangGraph | 原生支持并行执行和状态管理,提供Send API和Reducer机制 |
| FastAPI | 异步框架完美支持SSE流式传输,与LangGraph天然契合 |
| Pydantic v2 | 强类型验证确保系统各模块间的数据契约 |
| 通义千问 | 中文场景性能优异,API兼容OpenAI标准,便于后续切换模型 |
并行系统最棘手的问题是状态竞争。我们通过自定义Reducer实现安全的状态管理:
python复制def _reset_on_empty(existing: list, new: list) -> list:
"""智能列表Reducer实现"""
if not new: # 收到空列表时重置状态
return []
return existing + new # 非空时追加新结果
class AgentState(TypedDict):
agent_outputs: Annotated[list, _reset_on_empty] # 并行安全的结果收集
messages: Annotated[Sequence[BaseMessage], add_messages] # 对话历史管理
这种设计带来三个优势:
路由模块采用分级处理策略,兼顾性能和准确率:
第一层:关键词快速路由
第二层:LLM精确路由
实测数据显示,这种混合路由策略相比纯LLM路由方案,整体响应速度提升40%,同时保持95%+的意图识别准确率。
LangGraph的Send API是实现真正并发的关键:
python复制def route_after_supervisor(state: AgentState):
next_agents = state.get("next_agents", [])
if len(next_agents) > 1: # 多任务并行分支
return [Send(agent, state) for agent in next_agents]
elif next_agents: # 单任务直连
return next_agents[0]
else: # 无任务结束
return END
这种设计使得N个独立任务的总耗时从ΣTn降低到max(Tn),在天气查询+车票查询的典型场景下,响应时间从1.2s降至0.7s。
根据任务类型采用不同的输出策略:
| 场景 | 处理方式 | 技术实现 | 用户体验优势 |
|---|---|---|---|
| 单任务 | 实时流式输出 | astream_events捕获子图内部token | 首token时间优化到200ms内 |
| 多任务 | 完整收集后统一流式汇总 | ainvoke收集+LLM流式合成 | 避免结果碎片化 |
定制化的事件流协议包含6种核心事件:
thinking_start:思考链开始thinking_stream:实时推送思考过程content_stream:内容流式输出thinking_end:思考链结束response_end:完整响应结束error:错误处理前端通过状态机精确控制每种事件的渲染方式,实现终端风格的交互体验。
新增Agent只需三步:
python复制class HotelSearchTool(BaseTool):
name = "hotel_search"
description = "酒店查询工具"
args_schema = HotelSearchInput
def _run(self, city: str, checkin: str) -> str:
# 调用酒店API
return f"{city}的酒店信息..."
python复制registry.register(
name="hotel_search",
keywords=["酒店", "住宿"],
tool_class=HotelSearchTool,
priority=2
)
这种设计使得系统功能扩展完全不影响核心工作流,符合开闭原则。
采用pydantic-settings统一管理配置项:
python复制class Settings(BaseSettings):
LLM_API_KEY: str
LLM_API_BASE: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
LLM_MODEL: str = "qwen-max"
API_PORT: int = 8002
class Config:
env_file = ".env"
这种集中式配置带来三个好处:
经过优化后,系统在典型负载下(4核8G云主机)的表现:
| 指标 | 数值 |
|---|---|
| 平均响应时间 | 1.2s |
| 99分位响应时间 | 2.5s |
| 单机QPS | 120 |
| 错误率 | <0.5% |
现象:Worker节点的思考链丢失
根因:contextvars在协程切换时未正确传播
解决方案:
python复制def ensure_config_context(config: RunnableConfig):
if config and not var_child_runnable_config.get(None):
var_child_runnable_config.set(config)
现象:并行Agent结果互相覆盖
解决:使用Annotated Reducer实现原子化追加
python复制agent_outputs: Annotated[list, _reset_on_empty]
现象:前端收到工具调用的JSON片段
解决:过滤tool_call_chunks
python复制if content and not getattr(chunk, "tool_call_chunks", []):
writer.send_content_update(content)
推荐使用Docker Compose部署:
yaml复制services:
backend:
image: plan-multiagent:latest
ports:
- "8002:8002"
environment:
- LLM_API_KEY=${LLM_API_KEY}
frontend:
image: nginx:alpine
ports:
- "5173:80"
volumes:
- ./frontend/dist:/usr/share/nginx/html
建议配置以下监控指标:
在实际使用中,我们发现以下优化方向值得关注:
这个架构经过三个月的生产验证,日均处理请求量超过50万次,证明了其稳定性和扩展性。最让我惊喜的是LangGraph的Reducer设计,用声明式的方法解决了并行编程中最棘手的共享状态问题。