作为一名长期从事AI产品研发的技术人,我深刻理解记忆能力对于智能体的重要性。还记得去年我们团队开发的一个电商客服Agent,用户第一次咨询时说"我想买一台适合编程的笔记本电脑",Agent详细推荐了配置;但当用户十分钟后问"刚才说的那款有什么颜色可选"时,Agent却完全忘记了之前的对话——这种"金鱼脑"式的体验让用户非常失望。
这正是当前大模型应用面临的核心痛点之一:缺乏持续、可靠的记忆能力。传统聊天机器人只需要维护简单的对话上下文,而现代Agent要完成复杂任务,必须建立完善的记忆系统。本文将系统介绍Agent记忆技术的发展现状、实现方案和最佳实践。
在传统对话系统中,记忆往往被简化为"上下文管理"——维护最近几轮对话历史即可。但对于任务型Agent,这种设计存在严重不足:
我们做过一个对比实验:在同一电商场景下,使用相同的大模型基础:
现代Agent通常采用分层记忆设计,模仿人类记忆机制:
当对话长度超过模型上下文限制时,我们需要智能地管理历史信息。以下是经过实战检验的四种方法:
python复制def dynamic_trim(history, max_tokens=8000):
"""
智能裁剪对话历史,保留最关键信息
策略:优先保留最近对话、系统消息和包含实体名的内容
"""
current_length = sum(len(t) for t in history)
while current_length > max_tokens and len(history) > 1:
# 计算每条消息的重要性得分
scores = []
for i, msg in enumerate(history):
score = 0
if i >= len(history)-3: score += 2 # 最近消息
if msg.startswith("系统:"): score += 1 # 系统提示
if any(e in msg for e in ["姓名","地址","订单"]): score += 1 # 关键实体
scores.append(score)
# 移除得分最低的消息
min_idx = scores.index(min(scores))
removed = history.pop(min_idx)
current_length -= len(removed)
return history
我们开发了一种渐进式摘要方法:
python复制def generate_summary(dialogue_chunk):
prompt = f"""请用中文生成对话摘要,要求:
- 保留用户意图、决策和关键事实
- 忽略寒暄和重复内容
- 不超过100字
对话内容:
{dialogue_chunk}
摘要:"""
return llm_completion(prompt)
我们在客服Agent中实现了这样的笔记系统:
python复制class AgentNotebook:
def __init__(self):
self.facts = {} # 已验证的事实 {"产品X库存": "充足"}
self.preferences = {} # 用户偏好 {"配送方式": "次日达"}
self.task_state = { # 任务进度
"current_step": None,
"completed": [],
"pending": []
}
def update_from_dialogue(self, dialogue):
# 使用LLM提取笔记更新项
prompt = f"""从对话中提取:
1. 新确认的事实
2. 用户表达的偏好
3. 任务状态变化
对话:{dialogue}
返回JSON格式"""
updates = json.loads(llm_completion(prompt))
self.facts.update(updates.get("facts", {}))
self.preferences.update(updates.get("preferences", {}))
if "task_state" in updates:
self.task_state = updates["task_state"]
我们测试了三种主流方案:
| 方案 | 写入速度 | 查询延迟 | 内存占用 | 适合场景 |
|---|---|---|---|---|
| Chroma | 快 | 20-50ms | 低 | 开发原型 |
| Weaviate | 中 | 50-100ms | 中 | 生产环境 |
| Pinecone | 慢 | 10-30ms | 高 | 大规模部署 |
单纯的向量搜索在业务场景中往往不够,我们采用分层检索:
python复制def hybrid_search(query, user_id, top_k=5):
# 关键词提取
keywords = extract_keywords(query)
# 第一层:元数据过滤
base_filter = {
"user_id": user_id,
"timestamp": {"$gte": time.time() - 30*24*3600} # 最近30天
}
# 第二层:向量搜索
vector_results = collection.query(
query_texts=[query],
where=base_filter,
n_results=top_k*3 # 扩大召回
)
# 第三层:混合排序
def score_doc(doc):
# 语义分(已由向量数据库提供)
semantic_score = doc["similarity"]
# 关键词分
keyword_score = sum(1 for kw in keywords if kw in doc["text"])
# 时效分(最近7天权重更高)
recency = 1 - min(1, (time.time()-doc["timestamp"])/(7*24*3600))
return 0.6*semantic_score + 0.3*keyword_score + 0.1*recency
sorted_results = sorted(vector_results, key=score_doc, reverse=True)
return sorted_results[:top_k]
长期记忆需要定期维护,我们设计了这样的清洗规则:
python复制def memory_maintenance():
# 清理过期记忆
expired = collection.get(where={
"timestamp": {"$lt": time.time() - 90*24*3600}
})
archive_to_cold_storage(expired)
# 更新权重
all_memories = collection.get()
for mem in all_memories:
last_accessed = mem["last_retrieved"] or mem["timestamp"]
age_factor = 0.9 ** ((time.time()-last_accessed)/(24*3600))
new_weight = mem["weight"] * age_factor
collection.update(
id=mem["id"],
weight=new_weight
)
# 低权重记忆降级
low_weight = collection.get(where={"weight": {"$lt": 0.1}})
for mem in low_weight:
if mem["importance"] < 0.5:
collection.delete(mem["id"])
我们在客服系统中实现了这种方案,关键改进点:
python复制class ObservationMemory:
def __init__(self):
self.observation_stream = [] # 原始观测
self.compressed_views = {
"user_preferences": {},
"ongoing_issues": [],
"resolved_cases": []
}
def add_observation(self, user_input):
self.observation_stream.append(user_input)
# 触发增量压缩
if len(self.observation_stream) % 5 == 0:
self._compress_views()
def _compress_views(self):
# 偏好提取
pref_updates = llm_extract_preferences(self.observation_stream[-5:])
self.compressed_views["user_preferences"].update(pref_updates)
# 问题跟踪
new_issues = llm_detect_issues(self.observation_stream[-5:])
self.compressed_views["ongoing_issues"].extend(new_issues)
# 解决状态更新
resolved = llm_check_resolutions(self.observation_stream[-5:])
self.compressed_views["resolved_cases"].extend(resolved)
self.compressed_views["ongoing_issues"] = [
i for i in self.compressed_views["ongoing_issues"]
if i["id"] not in {r["issue_id"] for r in resolved}
]
对于复杂的客户服务场景,我们构建了知识图谱记忆:
python复制class GraphMemory:
def __init__(self):
self.graph = Graph()
self._init_schema()
def _init_schema(self):
# 定义节点和关系类型
self.graph.schema.add_node_type("User")
self.graph.schema.add_node_type("Issue")
self.graph.schema.add_node_type("Product")
self.graph.schema.add_node_type("Solution")
self.graph.schema.add_relation_type("encounters", ("User", "Issue"))
self.graph.schema.add_relation_type("affects", ("Issue", "Product"))
self.graph.schema.add_relation_type("has_solution", ("Product", "Solution"))
def add_interaction(self, user_id, dialogue):
# 从对话提取实体和关系
extraction_prompt = f"""从对话提取:
- 提及的用户特征
- 反映的问题
- 涉及的产品
- 提出的解决方案
对话:{dialogue}
返回JSON格式"""
entities = json.loads(llm_completion(extraction_prompt))
# 更新图结构
with self.graph.transaction():
user_node = self.graph.get_or_create("User", user_id)
for issue in entities.get("issues", []):
issue_node = self.graph.get_or_create("Issue", issue["id"])
self.graph.create_relation(user_node, "encounters", issue_node)
for product in issue.get("affected_products", []):
product_node = self.graph.get_or_create("Product", product["id"])
self.graph.create_relation(issue_node, "affects", product_node)
for solution in product.get("solutions", []):
solution_node = self.graph.get_or_create("Solution", solution["id"])
self.graph.create_relation(product_node, "has_solution", solution_node)
在真实业务场景中,我们总结了这些关键指标和优化方法:
| 指标 | 达标值 | 优化手段 |
|---|---|---|
| 记忆检索延迟 | <200ms | 分级缓存、预取策略 |
| 记忆更新吞吐量 | >1000次/秒 | 批量写入、异步处理 |
| 记忆一致性 | 最终一致 | 版本控制、冲突解决机制 |
| 存储成本 | <$0.1/用户/月 | 分层存储、压缩算法 |
我们采用的记忆系统高可用方案:
python复制class MemoryService:
def __init__(self):
self.cache = LRUCache(maxsize=10_000) # 本地缓存
self.redis = RedisCluster() # 分布式缓存
self.storage = VectorDB() # 持久化存储
self.write_queue = Queue() # 写入队列
# 启动后台处理线程
self._start_writer_thread()
def _start_writer_thread(self):
def writer():
batch = []
while True:
item = self.write_queue.get()
batch.append(item)
if len(batch) >= 100 or time.time()-last_write > 5:
try:
self.storage.batch_upsert(batch)
batch = []
except Exception as e:
log_error(e)
save_to_wal(batch) # 写入预写日志
Thread(target=writer, daemon=True).start()
def add_memory(self, user_id, memory):
# 先更新缓存
cache_key = f"{user_id}:{memory['id']}"
self.cache[cache_key] = memory
self.redis.set(cache_key, memory)
# 异步持久化
self.write_queue.put(memory)
当前记忆技术正在向三个方向发展:
对于准备实施记忆系统的团队,我的建议是:
记忆系统的建设不是一蹴而就的,我们团队的系统也经历了三次重大重构。关键是要建立持续改进的机制,定期收集用户反馈,分析记忆系统的使用效果。在我的实践中,最有效的优化往往来自于对真实用户痛点的深入理解,而不是单纯的技术升级。