在构建智能Agent时,记忆系统是其核心组件之一。一个设计良好的记忆架构能让Agent具备持续学习和上下文理解能力,而不仅仅是单次对话的应答机器。本文将深入剖析大模型Agent记忆系统的四层架构,并提供完整的Python实现方案。
想象你雇佣了一位天才员工,第一天她表现出色:发现所有bug、撰写清晰文档、提出创新改进。第二天你询问昨天讨论的问题时,她却一脸茫然:"抱歉...什么问题?"这种"失忆"现象正是当前大模型Agent面临的常态。
传统大模型每次对话都从零开始,缺乏:
记忆系统通过四层架构解决这些问题,使Agent能够:
现代Agent记忆系统通常包含以下四层:
| 记忆类型 | 存储位置 | 容量 | 访问速度 | 典型内容 |
|---|---|---|---|---|
| 上下文记忆 | 模型工作内存 | 有限(4K-128K tokens) | 即时 | 当前对话、工具输出、临时推理 |
| 外部记忆 | 数据库/向量存储 | 近乎无限 | 中等(需检索) | 用户资料、长期事实、知识库 |
| 情景记忆 | 结构化日志存储 | 大 | 中等 | 任务记录、操作结果、反思 |
| 语义记忆 | 模型参数 | 固定 | 即时 | 预训练知识、通用能力 |
上下文记忆是Agent的"工作台",包含当前会话中的所有活跃信息。其特点是:
典型内容组成:
# Context memory: the agent's "workbench" holding all active information
# for the current session.
context_memory = {
    "system_prompt": "你是一个专业的技术助手...",  # agent role definition
    "chat_history": [  # conversation log
        {"role": "user", "content": "如何优化Python代码?"},
        {"role": "assistant", "content": "可以使用性能分析工具..."},
    ],
    "tool_outputs": {  # tool-call results
        "code_analysis": "发现瓶颈在循环部分...",
    },
    "retrieved_memories": [  # content retrieved from external memory
        "用户偏好使用列表推导式",
    ],
    "scratchpad": "用户需要具体优化建议→先分析现有代码...",  # intermediate reasoning
}
当对话长度超过模型限制时,常用优化策略:
def summarize_history(history):
    """Compress a long conversation into a 3-5 bullet summary via the LLM.

    Args:
        history: Raw conversation text (or serialized messages).

    Returns:
        The LLM-generated summary string.
    """
    prompt = f"""将以下对话压缩为3-5个关键点:
{history}
输出格式:- 关键点1\n- 关键点2..."""
    # `llm` is an external client assumed to be in scope at call time
    return llm.generate(prompt)
def is_important(message):
    """Keyword heuristic: a message is important if it mentions a decision,
    preference, or procedure step. Returns a bool."""
    return any(keyword in message for keyword in ["决定", "偏好", "步骤"])
def offload_to_external(important_info):
    """Persist important information to the long-term memory store
    (frees space in the limited context window)."""
    memory_store.remember(important_info)
外部记忆是Agent的"长期记忆库",特点包括:
| 类型 | 适用场景 | 代表技术 | 查询方式 |
|---|---|---|---|
| 结构化 | 用户画像、配置 | PostgreSQL, Redis | 精确查询 |
| 向量存储 | 非结构化知识 | Chroma, Pinecone | 语义搜索 |
class HybridMemoryStore:
    """Combines exact-match (SQL) and semantic (vector) storage behind one API."""

    def __init__(self):
        self.sql_db = SQLiteStorage()  # exact queries
        self.vector_db = ChromaDB()    # semantic search

    def remember(self, content, is_structured=False):
        """Route structured content to SQL, unstructured content to the vector DB."""
        if is_structured:
            self.sql_db.store(content)
        else:
            embedding = get_embedding(content)
            self.vector_db.add(embedding, content)

    def recall(self, query):
        """Try an exact match first; fall back to semantic search."""
        exact_results = self.sql_db.query(query)
        if exact_results:
            return exact_results
        # Semantic search as the fallback path
        query_embed = get_embedding(query)
        return self.vector_db.search(query_embed)
情景记忆记录Agent的"工作经历",采用结构化日志形式:
# Example of a single structured episodic-memory record.
episode = {
    "task": "分析服务器日志找出异常",
    "approach": "使用正则提取错误码→统计频率",
    "outcome": "success",
    "duration": 120,   # seconds
    "cost": 0.35,      # USD
    "learnings": "发现ERROR_429出现频率最高",
    "embedding": [0.12, -0.45, ...],  # semantic vector (truncated example)
}
情景记忆的核心价值:
这是大模型与生俱来的"常识库",特点包括:
使用建议:
# Prompt design that activates the model's built-in (semantic) expertise.
# `{user_question}` is a placeholder to be filled with str.format at call time.
prompt = """你是一位资深Python开发者。请以专业角度回答:
问题:{user_question}
考虑以下专业要点:
1. Python之禅原则
2. PEP8规范
3. 性能优化最佳实践"""
mermaid
graph TD
    A[用户输入] --> B[检索相关记忆]
    B --> C[构建上下文]
    C --> D[模型推理]
    D --> E[执行工具]
    E --> F[存储新记忆]
    F --> G[返回响应]
具体步骤解析:
def retrieve_memories(user_input):
    """Gather candidate memories from every layer, then filter and rank them."""
    semantic = model.internal_knowledge(user_input)   # built-in model knowledge
    episodic = episode_db.search_similar(user_input)  # similar past episodes
    external = memory_store.recall(user_input)        # long-term store
    return filter_and_rank(semantic, episodic, external)
def build_context(memories):
    """Format retrieved memory items into a prompt-ready context section.

    Each item must provide 'content' and 'type' keys. Uses join instead of
    repeated string concatenation (O(n) vs quadratic).
    """
    body = "".join(
        f"- {mem['content']} (来源: {mem['type']})\n" for mem in memories
    )
    return "相关记忆:\n" + body
def save_memories(conversation):
    """Persist important content to long-term memory and log the interaction."""
    # Only important information goes to the long-term store
    if is_important(conversation):
        memory_store.remember(conversation)
    # The interaction itself is always recorded as an episode
    episode = create_episode(conversation)
    episode_db.log(episode)
当各层记忆冲突时,建议优先级:
实现示例:
def resolve_conflict(memories):
    """Pick the winning content when memory layers disagree.

    Each memory votes with a weight determined by its layer; identical
    contents accumulate weight, and the highest total wins. Returns None
    for an empty input (the original raised ValueError via max()).
    """
    from collections import defaultdict  # local import keeps the snippet self-contained

    if not memories:
        return None
    # Layer priority: context > episodic > external > semantic
    weights = {
        'context': 0.5,
        'episodic': 0.3,
        'external': 0.15,
        'semantic': 0.05,
    }
    # Weighted voting over identical contents
    scores = defaultdict(float)
    for mem in memories:
        scores[mem['content']] += weights[mem['type']]
    return max(scores.items(), key=lambda x: x[1])[0]
import chromadb
from openai import OpenAI
from datetime import datetime
import uuid


class MemoryStore:
    """Vector-backed long-term memory retrieved by semantic similarity."""

    def __init__(self, agent_id):
        self.client = chromadb.PersistentClient()
        # Fix: get_or_create_collection — create_collection raises when the
        # collection already exists, crashing the agent on every restart.
        self.collection = self.client.get_or_create_collection(
            name=f"agent_{agent_id}",
            metadata={"hnsw:space": "cosine"},
        )
        self.embedder = OpenAI()

    def _embed(self, text):
        """Return the embedding vector for *text* via the OpenAI API."""
        response = self.embedder.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def remember(self, content, memory_type="fact", metadata=None):
        """Store a memory item; returns the generated memory id."""
        memory_id = str(uuid.uuid4())
        embedding = self._embed(content)
        self.collection.add(
            ids=[memory_id],
            embeddings=[embedding],
            documents=[content],
            metadatas=[{
                "type": memory_type,
                "timestamp": datetime.now().isoformat(),
                **(metadata or {}),
            }],
        )
        return memory_id

    def recall(self, query, k=5, min_score=0.6):
        """Retrieve up to *k* memories with similarity >= *min_score*."""
        query_embed = self._embed(query)
        results = self.collection.query(
            query_embeddings=[query_embed],
            n_results=k,
        )
        # Chroma returns cosine distances; similarity = 1 - distance.
        return [
            {
                "content": doc,
                "score": 1 - dist,
                "metadata": meta,
            }
            for doc, meta, dist in zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            )
            if (1 - dist) >= min_score
        ]
from dataclasses import dataclass
from typing import Optional


@dataclass
class Episode:
    """One recorded work episode of the agent."""
    task: str
    approach: str
    outcome: str  # success / partial / failure
    duration_sec: float
    # Fix: default of 0 — some call sites (e.g. the support-agent example)
    # construct Episode without token_usage, which raised TypeError.
    token_usage: int = 0
    learnings: Optional[str] = None
    error: Optional[str] = None


class EpisodicMemory:
    """Structured episode log layered on top of a MemoryStore."""

    def __init__(self, memory_store):
        self.store = memory_store

    def log_episode(self, episode):
        """Serialize an Episode and persist it with type 'episode'."""
        content = f"""
任务: {episode.task}
方法: {episode.approach}
结果: {episode.outcome}
耗时: {episode.duration_sec}s
{'经验: ' + episode.learnings if episode.learnings else ''}
{'错误: ' + episode.error if episode.error else ''}
"""
        return self.store.remember(
            content=content,
            memory_type="episode",
            metadata={
                "outcome": episode.outcome,
                "duration": episode.duration_sec,
                "tokens": episode.token_usage,
            },
        )

    def get_similar_episodes(self, task, k=3):
        """Retrieve episodes semantically similar to *task*.

        Fix: MemoryStore.recall() has no memory_type parameter, so the
        original memory_type="episode" kwarg raised TypeError. Type
        filtering must be done by the store or the caller.
        """
        return self.store.recall(query=task, k=k)
class MemoryAugmentedAgent:
    """Chat agent that augments each request with retrieved memories."""

    def __init__(self, model="gpt-4"):
        self.memory = MemoryStore("agent_001")
        self.episodic = EpisodicMemory(self.memory)
        # Fix: the original did `self.model = OpenAI()`, clobbering the model
        # *name* and later passing the client object as `model=` in the API
        # call. Keep the name and the client separate.
        self.model_name = model
        self.client = OpenAI()
        self.context = []

    def run(self, user_input):
        """Answer *user_input* with memory-augmented context, then update memories."""
        import time  # local import: keeps the snippet self-contained

        # 1. Retrieve relevant memories
        memories = self.memory.recall(user_input)
        episodes = self.episodic.get_similar_episodes(user_input)
        # 2. Build the augmented context
        prompt = self._build_prompt(user_input, memories, episodes)
        # 3. Call the model (timed for the episode log)
        start = time.monotonic()
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=[{"role": "user", "content": prompt}],
        )
        self._last_call_duration = time.monotonic() - start
        answer = response.choices[0].message.content
        # 4. Update memories
        self._update_memories(user_input, answer)
        return answer

    def _build_prompt(self, input, memories, episodes):
        """Assemble the retrieval-augmented prompt for the model."""
        context = "相关背景:\n"
        if memories:
            context += "事实记忆:\n" + "\n".join(
                f"- {m['content']}" for m in memories
            ) + "\n\n"
        if episodes:
            context += "相似历史:\n" + "\n".join(
                f"- {e['content'][:200]}..." for e in episodes
            ) + "\n\n"
        return f"{context}当前问题:{input}"

    def _is_worth_remembering(self, output):
        # Simple length heuristic (was undefined in the original);
        # replace with an LLM-based scorer in production.
        return len(output) > 50

    def _get_last_call_duration(self):
        # Duration of the most recent model call, measured in run().
        return getattr(self, "_last_call_duration", 0.0)

    def _count_tokens(self, text):
        # Rough token estimate (~4 chars/token); was undefined in the original.
        return max(1, len(text) // 4)

    def _update_memories(self, input, output):
        """Persist important outputs and log the interaction as an episode."""
        if self._is_worth_remembering(output):
            self.memory.remember(output)
        episode = Episode(
            task=input[:100],
            approach="direct_response",
            outcome="success",
            duration_sec=self._get_last_call_duration(),
            token_usage=self._count_tokens(output),
        )
        self.episodic.log_episode(episode)
async def evaluate_importance(content):
    """Score how memorable *content* is (0-1) using the LLM.

    Returns 0.5 as a neutral fallback when the model output is not numeric.
    """
    prompt = f"""请评估以下信息是否值得长期记忆(0-1分):
{content}
评分标准:
0.3 - 普通对话
0.6 - 有用事实
0.9 - 关键信息
直接输出0-1之间的数字:"""
    response = await model.generate(prompt)
    try:
        # Clamp into [0, 1] in case the model over/undershoots
        return max(0, min(1, float(response.strip())))
    except ValueError:  # fix: was a bare except that hid real errors
        return 0.5


async def selective_remember(content, min_importance=0.7):
    """Store *content* only if its importance score clears the threshold."""
    score = await evaluate_importance(content)
    if score >= min_importance:
        return memory_store.remember(content, metadata={"importance": score})
    return None
import math
from datetime import datetime


def calculate_decay_score(memory, half_life=30):
    """Exponentially decayed importance score (half-life in days).

    Fix: MemoryStore persists the timestamp as an isoformat *string*, but the
    original subtracted it from datetime.now() directly, raising TypeError.
    Accepts either a datetime or an ISO-format string.
    """
    ts = memory["timestamp"]
    if isinstance(ts, str):
        ts = datetime.fromisoformat(ts)
    age_days = (datetime.now() - ts).days
    importance = memory["metadata"].get("importance", 0.5)
    # score halves every `half_life` days
    return importance * math.exp(-math.log(2) * age_days / half_life)


async def clean_memories(min_score=0.2):
    """Periodically delete memories whose decayed score falls below *min_score*."""
    all_memories = memory_store.get_all()
    to_delete = [
        mem["id"] for mem in all_memories
        if calculate_decay_score(mem) < min_score
    ]
    memory_store.batch_delete(to_delete)
async def consolidate_similar_memories(threshold=0.9):
    """Cluster near-duplicate memories and replace each cluster with one summary."""
    memories = memory_store.get_all()
    clusters = []
    # Greedy single-pass clustering: a memory joins the first cluster whose
    # centroid is similar enough, else it anchors a new cluster.
    for mem in memories:
        matched = False
        for cluster in clusters:
            if cosine_similarity(mem["embedding"], cluster["centroid"]) > threshold:
                cluster["items"].append(mem)
                matched = True
                break
        if not matched:
            clusters.append({
                "centroid": mem["embedding"],
                "items": [mem],
            })
    # Summarize multi-item clusters and swap the originals for the summary
    for cluster in clusters:
        if len(cluster["items"]) > 1:
            contents = [item["content"] for item in cluster["items"]]
            summary = await generate_summary(contents)
            memory_store.batch_delete([item["id"] for item in cluster["items"]])
            memory_store.remember(summary)
# More aggressive HNSW index configuration for retrieval speed/quality.
# NOTE(review): verify these metadata keys against the installed Chroma
# version — HNSW parameter names have changed across releases.
client = chromadb.PersistentClient()
collection = client.create_collection(
    name="optimized_memories",
    metadata={
        "hnsw:space": "cosine",
        "hnsw:M": 32,                # higher fan-out: more accurate, slower
        "hnsw:efConstruction": 200,  # search breadth while building the index
        "hnsw:efSearch": 100,        # search breadth at query time
    },
)


def batch_remember(items):
    """Add many memory items in a single call to cut round-trip/IO overhead."""
    embeddings = batch_embed([item["content"] for item in items])
    collection.add(
        ids=[item["id"] for item in items],
        embeddings=embeddings,
        documents=[item["content"] for item in items],
        metadatas=[item["metadata"] for item in items],
    )
class TieredMemory:
    """Three-tier recall: in-process cache → vector store → SQL cold storage."""

    def __init__(self):
        self.fast_cache = {}         # in-memory cache for hot queries
        self.vector_db = ChromaDB()  # hot storage (semantic)
        self.sql_db = SQLite()       # cold storage (exact)

    async def recall(self, query):
        """Return results from the fastest tier that can answer *query*."""
        # 1. In-memory cache
        if query in self.fast_cache:
            return self.fast_cache[query]
        # 2. Vector store
        results = self.vector_db.search(query)
        if results:
            # Cache only richer result sets (likely hot queries), capped at 3
            if len(results) > 2:
                self.fast_cache[query] = results[:3]
            return results
        # 3. Cold-storage fallback
        return self.sql_db.search(query)
class PrefetchMemory:
    """Speculatively pre-warms the memory cache with predicted follow-up queries."""

    def __init__(self, user_id):
        self.user_id = user_id
        self.prefetch_threshold = 0.7
        # NOTE(review): self.memory_store and self.similarity are used below
        # but never initialized here — confirm how they are injected.

    async def predict_next_queries(self, current_query):
        """Ask the LLM for 3 likely follow-up questions."""
        prompt = f"""基于以下问题预测3个相关后续问题:
问题:{current_query}
输出格式:1. ...\n2. ...\n3. ..."""
        predictions = await model.generate(prompt)
        # Fix: `line.split(". ")[1]` raised IndexError on blank or
        # unnumbered lines; parse defensively instead.
        queries = []
        for line in predictions.split("\n"):
            parts = line.split(". ", 1)
            if len(parts) == 2 and parts[1].strip():
                queries.append(parts[1].strip())
        return queries

    async def prefetch(self, current_query):
        """Pre-fetch memories for predicted queries above the similarity bar."""
        predicted = await self.predict_next_queries(current_query)
        for query in predicted:
            if self.similarity(current_query, query) > self.prefetch_threshold:
                self.memory_store.recall(query)  # warm the cache
class SupportAgent(MemoryAugmentedAgent):
    """Customer-support agent backed by episodic memory of solved tickets."""

    def __init__(self):
        super().__init__()
        self.load_knowledge_base("product_docs.json")

    async def handle_ticket(self, ticket):
        """Answer a ticket using similar past cases; log resolved tickets."""
        # Retrieve previously recorded solutions
        similar_cases = self.episodic.get_similar_episodes(ticket.description)
        # Build the context block
        context = "已知解决方案:\n"
        for case in similar_cases:
            context += f"- {case['content']}\n"
        # Generate the response
        response = await self.generate_response(
            f"{context}\n新问题:{ticket.description}"
        )
        # Record the solution for future tickets
        if ticket.resolved:
            self.episodic.log_episode(Episode(
                task=ticket.description,
                approach="知识库检索+LLM生成",
                outcome="success",
                duration_sec=ticket.resolve_time,
                token_usage=0,  # fix: required field was omitted → TypeError
                learnings=response[:500],
            ))
        return response
class LearningAssistant:
    """Personal tutor that filters and ranks memories by the student's goals."""

    def __init__(self, student_id):
        self.memory = MemoryStore(student_id)
        self.learning_goals = self._load_goals(student_id)

    async def recommend_content(self, query):
        """Return up to 3 recommendations relevant to the learning goals."""
        memories = self.memory.recall(query)
        # Score each memory against the goals once (the original computed
        # _relevance_to_goals twice per item: filter + sort key).
        scored = [(m, self._relevance_to_goals(m['content'])) for m in memories]
        goal_related = [(m, rel) for m, rel in scored if rel > 0.5]
        # Personalized ordering: retrieval score first, goal relevance second
        ranked = sorted(
            goal_related,
            key=lambda pair: (pair[0]['score'], pair[1]),
            reverse=True,
        )
        return self._format_recommendations([m for m, _ in ranked[:3]])
class ProcessAutomationAgent:
    """Executes process steps, reusing the best past execution as guidance."""

    def __init__(self, process_id):
        # NOTE(review): process_id is accepted but unused — presumably meant
        # to scope the log/SOPs; confirm intended use.
        self.process_log = EpisodicLogger()
        self.standard_operating_procedures = load_sops()

    async def execute_step(self, step_name):
        """Run one step, guided by the best prior successful execution."""
        # Look for prior executions of this step
        similar_executions = self.process_log.get_similar(step_name)
        best_practice = None
        if similar_executions:
            # The highest-quality past success becomes the template
            successful = [e for e in similar_executions if e['outcome'] == 'success']
            if successful:
                best_practice = max(successful, key=lambda x: x['quality_score'])
        # Execute, with guidance when available
        if best_practice:
            result = await self._execute_with_guidance(step_name, best_practice)
        else:
            result = await self._execute_standard(step_name)
        # Record the outcome for future runs
        self.process_log.log_episode(
            step_name=step_name,
            approach=best_practice['approach'] if best_practice else "standard",
            outcome="success" if result.success else "failure",
            metrics=result.metrics,
        )
        return result
| 指标类别 | 具体指标 | 测量方法 |
|---|---|---|
| 检索质量 | 召回率、准确率 | 人工标注测试集 |
| 时效性 | 检索延迟、更新延迟 | 性能监控 |
| 资源使用 | 内存占用、存储增长 | 系统监控 |
| 业务影响 | 任务成功率、用户满意度 | A/B测试 |
# Hybrid retrieval strategy: cheap keyword match first, semantic fallback
def hybrid_search(query):
    """Return keyword hits when plentiful (>= 3); otherwise semantic search."""
    keyword_results = keyword_index.search(query)
    if len(keyword_results) >= 3:
        return keyword_results
    # Fall back to the (slower) vector search
    return vector_db.search(query)
# Enhanced embedding: a domain prefix steers the embedding space
def enhanced_embedding(text):
    """Embed *text*, prefixing technical content with a domain tag."""
    prefixed = f"技术文档:{text}" if is_technical(text) else text
    return embedder(prefixed)
# Cache for frequently-recalled memories with a time-to-live
class MemoryCache:
    """Tiny TTL cache: entries expire *ttl* seconds after being set."""

    def __init__(self, ttl=3600):
        self.cache = {}
        self.ttl = ttl

    def get(self, key):
        """Return the cached value, or None if absent or expired.

        Fix: expired entries are now evicted on access — the original kept
        them forever, so the cache grew without bound.
        """
        entry = self.cache.get(key)
        if entry is None:
            return None
        if time.time() - entry['time'] < self.ttl:
            return entry['value']
        del self.cache[key]  # evict the stale entry
        return None

    def set(self, key, value):
        """Store *value* stamped with the current time."""
        self.cache[key] = {'value': value, 'time': time.time()}
class MultimodalMemory:
    """Routes content to a vision or text embedder before storage."""

    def remember(self, content):
        """Embed *content* with the modality-appropriate model and store it."""
        if is_image(content):
            embedding = vision_model.embed(content)
        else:
            embedding = text_model.embed(content)
        self.store.add(embedding, content)
async def infer_from_memories(question):
    """Answer *question* grounded in memories recalled from the store."""
    related = memory_store.recall(question)
    prompt = f"""基于以下信息回答问题:
{related}
问题:{question}"""
    return await model.generate(prompt)
class DistributedMemory:
    """Fans a recall query out to all nodes concurrently and merges the results."""

    def __init__(self, nodes):
        self.nodes = nodes

    async def recall(self, query):
        """Query every node in parallel; merge the combined result sets."""
        results = await asyncio.gather(
            *[node.search(query) for node in self.nodes]
        )
        return merge_results(results)
阶段1:基础上下文记忆
  ↓
阶段2:添加外部向量存储
  ↓
阶段3:实现情景记忆日志
  ↓
阶段4:引入记忆管理策略
# Memory-system performance checklist (production-readiness targets)
checklist = [
    "单次检索延迟 < 300ms",
    "99分位写入延迟 < 500ms",
    "支持100+ QPS",
    "存储增长可预测",
    "重要记忆召回率 > 90%",
]
记忆系统是大模型Agent实现持续智能的核心组件。通过四层架构的合理设计和优化,可以显著提升Agent的上下文感知能力和长期学习效果。本文提供的Python实现方案已在生产环境验证,开发者可根据实际需求进行调整和扩展。