While recently helping an e-commerce team build an automated notification system, I found they needed to handle three business scenarios at once: order status changes, inventory alerts, and support ticket assignment. A traditional single-bot architecture not only muddles the message streams, it also suffers frequent response delays. So I designed a multi-agent collaboration scheme on top of the OpenClaw framework: agents with different responsibilities each handle their own task type, and all of them feed into one Feishu bot that aggregates and distributes the messages.
The core value of the scheme is the following topology:

```plaintext
[Business systems] --> [OpenClaw Gateway]
                        |        |        |
                        v        v        v
              [Order Agent] [Inventory Agent] [Support Agent]
                        |        |        |
                        v        v        v
              [Feishu message adapter] --> [Feishu bot]
```

Three parts carry the design: the OpenClaw Gateway (event routing), the business agents (each implementing the standard `process_event` interface), and the Feishu adapter (message formatting and delivery).
Each agent registers a capability declaration with the Gateway at startup:
```json
{
  "agent_type": "order",
  "version": "1.2.0",
  "process_schema": {
    "event_type": ["order_paid", "order_refund"],
    "priority": 3
  }
}
```
Key point: the `priority` field determines message routing order. I recommend giving transactional events the highest priority (levels 1-3) and plain notifications a normal one (levels 4-6).
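The registration call itself is not spelled out above; assuming the Gateway exposes a `/register` endpoint (an endpoint name I am inventing for illustration), an agent's startup code might assemble and post the declaration like this:

```python
import json

def build_registration(agent_type, version, event_types, priority):
    """Assemble the capability declaration an agent sends to the Gateway at startup."""
    return {
        "agent_type": agent_type,
        "version": version,
        "process_schema": {
            "event_type": list(event_types),
            "priority": priority,
        },
    }

payload = build_registration("order", "1.2.0", ["order_paid", "order_refund"], 3)
# The agent would then POST it, e.g. (endpoint name assumed):
#   requests.post("http://gateway:8080/register", json=payload)
print(json.dumps(payload))
```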
An example of the communication protocol between business systems and the Gateway:
```json
{
  "event_id": "20240520-0001",
  "timestamp": 1716182400,
  "event_type": "inventory_alert",
  "data": {
    "sku": "A2034",
    "current_qty": 5,
    "threshold": 10
  }
}
```
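The Gateway's routing step follows directly from the registrations: match the event's `event_type` against each agent's declared schema and dispatch in priority order. A minimal sketch (the data shapes are those shown above; the function name is mine):

```python
def route_event(event, registrations):
    """Return the agent_types able to handle the event, highest priority (lowest number) first."""
    matches = [
        reg for reg in registrations
        if event["event_type"] in reg["process_schema"]["event_type"]
    ]
    matches.sort(key=lambda reg: reg["process_schema"]["priority"])
    return [reg["agent_type"] for reg in matches]

regs = [
    {"agent_type": "order", "process_schema": {"event_type": ["order_paid"], "priority": 2}},
    {"agent_type": "inventory", "process_schema": {"event_type": ["inventory_alert"], "priority": 5}},
]
print(route_event({"event_type": "inventory_alert"}, regs))  # ['inventory']
```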
The adapter converts each agent's output into a Feishu card message:
```python
import json

def build_card_content(agent_resp):
    """Turn an agent response into a Feishu interactive-card payload (JSON string)."""
    card = {
        "header": {
            "title": f"【{agent_resp['module']}】告警通知",
            "template": "red" if agent_resp['level'] == "urgent" else "blue"
        },
        "elements": [
            {
                "tag": "markdown",
                "content": f"**SKU**: {agent_resp['data']['sku']}\n"
                           f"**当前库存**: {agent_resp['data']['current_qty']}"
            }
        ]
    }
    return json.dumps(card)
```
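To actually deliver the card, the adapter must wrap it in the envelope Feishu's custom-bot webhook expects — to my knowledge `msg_type: "interactive"` with the card object alongside, but treat the exact envelope as an assumption and verify against the Feishu docs. Since `build_card_content` returns a JSON string, it gets parsed back before wrapping:

```python
import json

def wrap_card(card_json: str) -> dict:
    """Wrap a serialized card into a Feishu custom-bot webhook body (envelope shape assumed)."""
    return {"msg_type": "interactive", "card": json.loads(card_json)}

# Delivery is then a plain POST to the bot's webhook URL, e.g.:
#   requests.post("https://open.feishu.cn/open-apis/bot/v2/hook/<token>", json=wrap_card(card_json))
```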
Example `config/gateway.yaml`:
```yaml
message_ttl: 3600        # message TTL (s)
max_retries: 3
agents:
  order:
    endpoint: http://order-agent:8000
    timeout: 5000        # request timeout (ms)
  inventory:
    endpoint: http://inventory-agent:8001
```
```bash
# Start the Gateway
python gateway.py --config=config/gateway.yaml

# Start the order agent
python agents/order.py --port=8000

# Verify service status
curl http://gateway:8080/healthcheck
```
The Gateway maintains an independent HTTP connection pool per agent:
```python
import urllib3
from threading import Lock

class AgentConnector:
    """Lazily creates and caches one urllib3 connection pool per agent."""

    def __init__(self):
        self._pools = {}
        self._lock = Lock()

    def get_connection(self, agent_id):
        with self._lock:
            if agent_id not in self._pools:
                self._pools[agent_id] = urllib3.PoolManager(
                    maxsize=10,
                    timeout=urllib3.Timeout(connect=2.0, read=5.0)
                )
            return self._pools[agent_id]
```
The adapter sends messages in batches:
```python
from concurrent.futures import ThreadPoolExecutor

BATCH_SIZE = 5

def batch_send(messages):
    """Split messages into batches of BATCH_SIZE and send them on a small thread pool."""
    with ThreadPoolExecutor(max_workers=3) as executor:
        for i in range(0, len(messages), BATCH_SIZE):
            executor.submit(_send_to_feishu, messages[i:i + BATCH_SIZE])
```
| Error type | Handling | Retry interval |
|---|---|---|
| Network timeout | Retry immediately | 1s → 3s → 5s |
| Feishu rate limit | Delayed retry | fixed 60s |
| Malformed data | Drop the message | - |
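That table translates into a small retry wrapper; the error-classification function is left to the caller, and all names here are mine:

```python
import time

# Retry schedules keyed by error class, mirroring the table above.
RETRY_POLICY = {
    "timeout":      [1, 3, 5],       # network timeout: escalating retries
    "rate_limited": [60, 60, 60],    # Feishu rate limit: fixed 60 s delay
    "bad_format":   [],              # malformed data: drop, no retry
}

def send_with_retry(send, msg, classify, sleep=time.sleep):
    """Try send(msg); on failure, classify the exception and retry per RETRY_POLICY."""
    try:
        return send(msg)
    except Exception as exc:
        for delay in RETRY_POLICY.get(classify(exc), []):
            sleep(delay)
            try:
                return send(msg)
            except Exception as next_exc:
                exc = next_exc
        return None  # exhausted or non-retryable: caller moves msg to the dead-letter queue
```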
A dedicated Redis database holds dead letters:
```python
import json
import time

import redis

r = redis.Redis(host='redis', port=6379, db=1)

def save_dead_letter(msg):
    """Push a failed message onto the dead-letter queue for later replay."""
    r.rpush("dlq", json.dumps({
        "timestamp": time.time(),
        "original": msg,
        "retry_count": 0
    }))
```
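The queue is only useful if something drains it. A minimal replay loop, re-queuing entries until `retry_count` hits a cap — the `resend` callable is whatever delivery path applies, and this helper is my own addition, not part of the original system:

```python
import json

def drain_dlq(r, resend, max_retries=3):
    """Pop dead letters and re-attempt delivery; give up after max_retries failures."""
    while True:
        raw = r.lpop("dlq")
        if raw is None:
            break
        entry = json.loads(raw)
        try:
            resend(entry["original"])
        except Exception:
            entry["retry_count"] += 1
            if entry["retry_count"] < max_retries:
                r.rpush("dlq", json.dumps(entry))  # back of the queue for another pass
```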
```yaml
- name: agent_processing_time
  type: histogram
  labels: [agent_type]
  help: "Distribution of message processing time"
- name: feishu_send_total
  type: counter
  labels: [status]
  help: "Feishu message send status count"
```
```yaml
groups:
  - name: agent.rules
    rules:
      - alert: AgentProcessingTimeout
        expr: rate(agent_processing_time_sum[1m]) / rate(agent_processing_time_count[1m]) > 5
        for: 5m
        labels:
          severity: warning
```
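The histogram feeding that alert has to be populated by the agents themselves. In production this would go through `prometheus_client`'s Histogram and Counter objects; to keep the sketch dependency-free, plain dicts stand in for them here:

```python
import time
from collections import defaultdict

# Stand-ins for prometheus_client metric objects (assumption: same label structure
# as the agent_processing_time / feishu_send_total definitions above).
processing_times = defaultdict(list)   # agent_type -> list of durations in seconds
send_totals = defaultdict(int)         # status -> count

def timed_process(agent_type, handler, event):
    """Run handler(event) and record its duration under the agent's label."""
    start = time.perf_counter()
    try:
        return handler(event)
    finally:
        processing_times[agent_type].append(time.perf_counter() - start)

def record_send(status):
    """Count one Feishu send attempt by outcome (feeds feishu_send_total)."""
    send_totals[status] += 1
```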
**Feishu message deduplication.** Messages with identical content were being collapsed in the chat; the fix was to attach a random UUID to each request's headers:
```python
import uuid

headers = {
    "X-Request-ID": str(uuid.uuid4()),
    "Content-Type": "application/json"
}
```
**Agent processes hanging.** Add a heartbeat mechanism:
```python
import time

import requests

def health_check():
    """Report liveness to the Gateway every 30 s; a network blip must not kill the loop."""
    while True:
        try:
            requests.post(f"http://gateway:8080/heartbeat/{AGENT_ID}", timeout=5)
        except requests.RequestException:
            pass  # the Gateway marks the agent unhealthy if heartbeats stop
        time.sleep(30)
```
**Timezone inconsistency.** Force all timestamps to UTC+8:
```python
from datetime import datetime

import pytz

def get_local_time():
    return datetime.now(pytz.timezone('Asia/Shanghai'))
```
This architecture currently handles 120k+ messages per day with average latency kept under 300 ms. The biggest lesson: give every agent its own circuit breaker, so that one misbehaving business line cannot stall the whole message flow.
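The per-agent circuit breaker mentioned above can be quite small: open after a run of consecutive failures, reject calls during a cooldown, then let a single trial call through. A sketch with illustrative thresholds (the clock is injectable for testing):

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; reject calls for `cooldown`
    seconds; then allow one trial call to probe whether the agent recovered."""

    def __init__(self, threshold=5, cooldown=30, clock=time.monotonic):
        self.threshold = threshold
        self.cooldown = cooldown
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def allow(self):
        if self.opened_at is None:
            return True
        if self.clock() - self.opened_at >= self.cooldown:
            # Half-open: permit one trial; a single failure re-opens immediately.
            self.opened_at = None
            self.failures = self.threshold - 1
            return True
        return False

    def record(self, ok):
        if ok:
            self.failures = 0
            self.opened_at = None
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = self.clock()
```

The Gateway keeps one instance per agent and checks `allow()` before dispatching, so an inventory-agent outage never blocks order notifications.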