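I have recently been helping my team build an intelligent message notification system that pushes the decisions made by multiple AI agents (Agents) to Feishu group chats in real time. A traditional single-bot notification setup can no longer cope with complex business scenarios, such as pushing customer inquiries, anomaly alerts, and data reports at the same time. After evaluating the options, we decided to connect multiple Agents to Feishu bots on top of the OpenClaw framework.
The core value of this approach: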
```
[Agent cluster] -> [Message gateway] -> [Feishu bot] -> [Feishu group chat]
↑ ↑
[Task scheduling center] [Message status monitoring]
```
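We use a microservice architecture: each Agent is deployed independently and communicates with the message gateway over gRPC. Based on business needs, we deployed three types of Agent:
Each Agent has its own configuration, including:
```yaml
agent:
  name: "customer_service"
  priority: 1
  rate_limit: 100/minute
  message_types: ["text", "rich_text"]
```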
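As a rough illustration of how a configuration like this might be loaded and validated, here is a minimal sketch. `AgentConfig` and `parse_rate_limit` are hypothetical names, and reading `rate_limit` as `<count>/<unit>` with English unit names is an assumption based on the sample above, not OpenClaw's actual schema.

```python
from dataclasses import dataclass, field

# Seconds per unit; the accepted unit names are an assumption for this sketch.
_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600}


def parse_rate_limit(spec: str) -> tuple[int, int]:
    """Split a '<count>/<unit>' string into (max_requests, period_seconds)."""
    count, _, unit = spec.partition("/")
    unit = unit.strip().lower()
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"unknown rate limit unit: {unit!r}")
    return int(count), _UNIT_SECONDS[unit]


@dataclass
class AgentConfig:
    name: str
    priority: int
    rate_limit: str
    message_types: list[str] = field(default_factory=list)

    @classmethod
    def from_dict(cls, raw: dict) -> "AgentConfig":
        # Expects the mapping stored under the top-level "agent" key.
        return cls(**raw["agent"])


# The dict mirrors what a YAML loader would return for the sample config.
config = AgentConfig.from_dict({
    "agent": {
        "name": "customer_service",
        "priority": 1,
        "rate_limit": "100/minute",
        "message_types": ["text", "rich_text"],
    }
})
max_requests, period = parse_rate_limit(config.rate_limit)
```

Parsing the limit into a request count and a period in seconds gives the two numbers a rate limiter needs, and unknown units fail loudly at startup rather than at send time.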
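The message gateway is the core of the whole system. Its main functions include:
Key code snippet:
```python
class MessageGateway:
    def __init__(self):
        # FeishuRobot, Message, webhook1 and webhook2 are defined elsewhere;
        # webhook1/webhook2 hold the webhook URLs of the two Feishu bots.
        self.robots = {
            'alert': FeishuRobot(webhook1),
            'report': FeishuRobot(webhook2)
        }

    async def dispatch(self, msg: Message):
        # Only alerts with priority above 3 go to the alert bot
        if msg.type == 'alert' and msg.priority > 3:
            await self.robots['alert'].send(msg)
```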
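To exercise this routing rule without touching a real webhook, the gateway can be driven with stand-in bots. The sketch below is an assumption-heavy illustration: `RecordingRobot` is a hypothetical test double, the `Message` fields are guessed from the snippet above, and routing every other message type to the bot registered under its type goes beyond what the original snippet shows.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    type: str
    priority: int
    text: str


class RecordingRobot:
    """Stand-in for FeishuRobot: records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    async def send(self, msg):
        self.sent.append(msg)


class MessageGateway:
    def __init__(self, robots: dict):
        self.robots = robots

    async def dispatch(self, msg: Message) -> bool:
        """Return True if the message was handed to a bot."""
        # Same rule as above: alerts are only forwarded when priority > 3.
        if msg.type == "alert" and msg.priority <= 3:
            return False
        # Assumption: other types go to the bot registered under that type.
        robot = self.robots.get(msg.type)
        if robot is None:
            return False
        await robot.send(msg)
        return True


robots = {"alert": RecordingRobot(), "report": RecordingRobot()}
gateway = MessageGateway(robots)
delivered = asyncio.run(gateway.dispatch(Message("alert", 5, "CPU above 90%")))
```

Injecting the bots through the constructor keeps the routing logic testable and lets the real webhook URLs live in configuration.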
```
https://open.feishu.cn/open-apis/bot/v2/hook/{unique identifier}
```
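Posting to a webhook of this form is a plain JSON `POST`. The following sketch only builds the request with the standard library so it can be inspected offline. `build_webhook_request` is a hypothetical helper and the hook ID in the URL is a placeholder, not a real identifier.

```python
import json
import urllib.request


def build_webhook_request(webhook_url: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for a bot webhook."""
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json; charset=utf-8"},
        method="POST",
    )


# Placeholder URL: replace the last path segment with your bot's identifier.
req = build_webhook_request(
    "https://open.feishu.cn/open-apis/bot/v2/hook/your-hook-id",
    {"msg_type": "text", "content": {"text": "hello"}},
)
# To actually send it: urllib.request.urlopen(req, timeout=10)
```

Keeping request construction separate from sending makes it easy to unit test payloads and to swap in an async HTTP client later.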
Feishu supports several message types; we mainly use the following three:
```json
{
  "msg_type": "text",
  "content": {
    "text": "Monitoring alert: server CPU usage exceeds 90%"
  }
}
```
```json
{
  "msg_type": "post",
  "content": {
    "post": {
      "zh_cn": {
        "title": "Daily report",
        "content": [
          [{"tag": "text", "text": "Today's sales: "}],
          [{"tag": "a", "text": "View details", "href": "https://example.com"}]
        ]
      }
    }
  }
}
```
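Hand-writing these nested payloads is error-prone, so small builder functions can help. This is a minimal sketch that reproduces the two structures shown above. The function names (`build_text_msg`, `build_post_msg`, `text_node`, `link_node`) are hypothetical helpers, not part of any Feishu SDK.

```python
def build_text_msg(text: str) -> dict:
    """Plain text message payload."""
    return {"msg_type": "text", "content": {"text": text}}


def text_node(text: str) -> dict:
    """Inline text segment for a rich text (post) paragraph."""
    return {"tag": "text", "text": text}


def link_node(text: str, href: str) -> dict:
    """Inline hyperlink segment for a rich text (post) paragraph."""
    return {"tag": "a", "text": text, "href": href}


def build_post_msg(title: str, paragraphs: list, lang: str = "zh_cn") -> dict:
    """Rich text payload; each paragraph is a list of inline nodes."""
    return {
        "msg_type": "post",
        "content": {"post": {lang: {"title": title, "content": paragraphs}}},
    }


report = build_post_msg(
    "Daily report",
    [
        [text_node("Today's sales: ")],
        [link_node("View details", "https://example.com")],
    ],
)
```

Each inner list is one paragraph, which matches the list-of-lists layout of the `content` field in the rich text example.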
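Cards support interactive components such as buttons and selectors:
```python
def build_interactive_msg():
    return {
        "msg_type": "interactive",
        "card": {
            "elements": [{
                # Buttons are nested inside an "action" module
                "tag": "action",
                "actions": [{
                    "tag": "button",
                    "text": {"content": "Mark as resolved", "tag": "plain_text"},
                    "type": "primary",
                    "value": {"action": "resolve_alert"}
                }]
            }]
        }
    }
```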
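Because Agents may send the same content more than once, we implemented deduplication based on a content hash:
```python
import hashlib
from datetime import datetime, timedelta

class DedupManager:
    def __init__(self):
        self.cache = {}  # content hash -> expiry time

    def check_duplicate(self, content: str, ttl=300) -> bool:
        content_hash = hashlib.md5(content.encode()).hexdigest()
        now = datetime.now()
        expires_at = self.cache.get(content_hash)
        # Only an entry that has not yet expired counts as a duplicate
        if expires_at is not None and expires_at > now:
            return True
        self.cache[content_hash] = now + timedelta(seconds=ttl)
        return False
```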
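Automatic retries use exponential backoff:
```python
import asyncio

async def send_with_retry(robot, msg, max_retries=3):
    base_delay = 1
    last_error = None
    for attempt in range(max_retries):
        try:
            return await robot.send(msg)
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                # Wait 1s, 2s, 4s, ... before the next attempt
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise Exception(f"Failed after {max_retries} retries") from last_error
```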
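To make the delay schedule concrete, the sketch below computes the waits separately from the sending logic. The upper bound (`max_delay`) and the optional random jitter are additions that the original code does not have; they are common refinements, offered here as assumptions rather than as part of the system described.

```python
import random


def backoff_delays(max_retries: int, base_delay: float = 1.0,
                   max_delay: float = 30.0, jitter: float = 0.0,
                   rng=random.random) -> list:
    """Return the wait (in seconds) that follows each failed attempt.

    jitter is the maximum extra fraction added to each delay: 0.5 means
    up to 50% more, scaled by a random number drawn from rng().
    """
    delays = []
    for attempt in range(max_retries):
        # Double the delay on every attempt, but never exceed max_delay.
        delay = min(base_delay * (2 ** attempt), max_delay)
        delay += delay * jitter * rng()
        delays.append(delay)
    return delays


print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Jitter spreads out retries from many Agents that failed at the same moment, so they do not all hit the webhook again in the same second.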
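Distributed rate limiting with Redis:
```python
import redis.asyncio as redis  # redis_conn should be a redis.asyncio.Redis client
from redis.exceptions import RedisError

class RateLimiter:
    def __init__(self, redis_conn, key, max_requests, period):
        self.redis = redis_conn
        self.key = key
        self.max = max_requests
        self.period = period

    async def acquire(self) -> bool:
        try:
            # Fixed-window counter: INCR is atomic, so concurrent callers
            # cannot both slip in under the limit.
            current = await self.redis.incr(self.key)
            if current == 1:
                # The first request of a window starts the expiry timer
                await self.redis.expire(self.key, self.period)
            return current <= self.max
        except RedisError:
            return True  # fail open when Redis is unavailable
```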
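The windowing logic is easier to follow without Redis in the picture. Below is a single-process sliding-window sketch with an injectable clock. `SlidingWindowLimiter` is a hypothetical class written for illustration only, and it does not coordinate across processes the way the Redis version does.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_requests within any window of `period` seconds."""

    def __init__(self, max_requests: int, period: float, clock=time.monotonic):
        self.max = max_requests
        self.period = period
        self.clock = clock          # injectable so tests can control time
        self.timestamps = deque()   # times of the accepted requests

    def acquire(self) -> bool:
        now = self.clock()
        # Forget requests that have fallen out of the window.
        while self.timestamps and now - self.timestamps[0] >= self.period:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.max:
            return False
        self.timestamps.append(now)
        return True


limiter = SlidingWindowLimiter(max_requests=100, period=60)
allowed = limiter.acquire()
```

A limiter like this could also act as a local fallback when Redis is down, as an alternative to letting every request through, though that is a design choice the original system does not describe.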
Key metrics are collected with Prometheus:
```python
import time
from prometheus_client import Counter, Gauge

# Metric definitions
MESSAGES_SENT = Counter('messages_sent_total', 'Total messages sent')
MESSAGE_LATENCY = Gauge('message_delivery_latency', 'Message delivery latency')

# Record the metrics around the send call
async def send_and_record(robot, msg):
    start_time = time.time()
    await robot.send(msg)
    MESSAGE_LATENCY.set(time.time() - start_time)
    MESSAGES_SENT.inc()
```
Example alerting rules (PromQL):
```yaml
# Message backlog alert
- alert: MessageBacklogTooHigh
  expr: rate(messages_sent_total[5m]) - rate(messages_delivered_total[5m]) > 100
  for: 10m

# High latency alert
- alert: MessageDeliveryHighLatency
  expr: message_delivery_latency > 5
  for: 5m
```
Services are orchestrated with Docker Compose:
```yaml
version: '3'
services:
  gateway:
    image: openclaw-gateway:latest
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
  agent-customer:
    image: customer-agent:1.2
    depends_on:
      - gateway
  redis:
    image: redis:alpine
```
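Connection pool configuration:
```python
import aiohttp

async def create_session():
    return aiohttp.ClientSession(
        # At most 100 simultaneous connections
        connector=aiohttp.TCPConnector(limit=100),
        # 30-second budget for each whole request
        timeout=aiohttp.ClientTimeout(total=30)
    )
```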
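Batch message processing:
```python
import asyncio

async def batch_send(messages):
    semaphore = asyncio.Semaphore(100)  # cap on concurrent sends

    async def send_limited(msg):
        # Each send holds one semaphore slot while it runs
        async with semaphore:
            return await send_message(msg)

    tasks = [send_limited(msg) for msg in messages]
    return await asyncio.gather(*tasks, return_exceptions=True)
```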
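A semaphore only limits concurrency if each individual send acquires it. Wrapping the whole `gather` call in one `async with` takes a single slot and lets every task run at once. The sketch below measures peak concurrency with a fake sender to make that visible. `run_bounded` and `fake_send` are hypothetical names used only for this demonstration.

```python
import asyncio


async def run_bounded(items, worker, limit: int):
    """Run worker(item) for every item with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item):
        async with semaphore:  # one slot per send
            return await worker(item)

    return await asyncio.gather(*(guarded(i) for i in items),
                                return_exceptions=True)


async def demo():
    in_flight = 0
    peak = 0

    async def fake_send(msg):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # simulate network latency
        in_flight -= 1
        if msg == 3:
            raise RuntimeError("simulated failure")
        return msg

    results = await run_bounded(range(10), fake_send, limit=4)
    return results, peak


results, peak = asyncio.run(demo())
```

With `return_exceptions=True`, a failed send shows up as an exception object at its position in the result list, so the caller can retry just the failures.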
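After connecting the customer service Agent to the ticketing system:
Results achieved:
Converting traditional email alerts into interactive Feishu alerts:
Implementation results:
Enhanced intelligent message routing:
Multi-platform support:
Message tracing and analytics: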
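This system has been validated in production for more than half a year. It currently handles over 500,000 messages a day on average, with 99.99% reliability. The biggest lesson for us was that a good messaging system must do more than guarantee delivery: it also has to consider how messages are consumed and how they are handled afterwards.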