1. 从50行代码到多智能体系统的架构演进
当我第一次看到Claude Code这个项目时,就被它精妙的设计哲学所震撼。这个由MIT团队开源的AI智能体框架,用12个渐进式模块向我们展示了如何从最基础的50行代码开始,逐步构建出一个功能完备的多智能体系统。这不仅仅是一个技术实现,更是一场关于AI智能体本质的认知革命。
1.1 核心设计理念:模型即智能体
传统AI开发中,我们往往把大量精力花在构建复杂的框架和工作流上。但Claude Code提出了一个颠覆性的观点:
AI智能体本质上就是一个经过训练的深度神经网络,而我们的工作是为它构建能够有效运作的环境——包括工具、知识、上下文管理和观察接口。
这个理念体现在项目的整体架构中:一个永远不变的核心循环,加上一系列可插拔的harness(操控系统)机制。这种设计实现了极致的关注点分离,让系统既保持简单又具备强大的扩展能力。
1.2 12层架构全景图
让我们快速浏览这12个渐进式模块的设计思路:
- 基础层(s01-s02):50行的核心循环+工具分发机制
- 认知层(s03-s05):任务规划、子智能体隔离、按需知识加载
- 扩展层(s06-s07):上下文压缩、文件级任务持久化
- 并发层(s08-s10):后台任务、团队邮箱通信、协作协议
- 自主层(s11-s12):任务自动认领、Git工作树隔离
这种分层设计让学习者可以循序渐进地理解每个概念,而开发者则能按需选择功能模块。
2. 基础架构深度解析
2.1 核心循环:50行的智能本质
s01_agent_loop.py是整个系统的DNA,其核心逻辑简洁得令人惊叹:
python复制while True:
response = llm.call(messages)
if response.stop_reason == "tool_use":
tool_result = execute_tool(response.tool)
messages.append({"role": "tool", "content": tool_result})
elif response.stop_reason == "end_turn":
return response.content
这个循环实现了智能体的基本交互模式:
- 调用LLM获取响应
- 如果是工具调用,执行并记录结果
- 如果是结束标记,返回最终响应
- 重复这个过程
这个模式在整个项目中从未改变,后续所有功能都是在这个循环基础上添加的harness机制。
2.2 工具分发:系统的扩展基石
s02_tool_use.py引入了工具分发机制,这是系统可扩展性的关键:
python复制TOOLS = {
"bash": bash_handler,
"read_file": read_file_handler,
"write_file": write_file_handler
}
def execute_tool(tool_call):
handler = TOOLS[tool_call.name]
return handler(tool_call.arguments)
这种name-to-handler的映射模式带来了几个重要优势:
- 松耦合:新增工具只需注册新条目,不修改核心逻辑
- 易维护:工具实现与调用逻辑分离
- 动态加载:运行时可以动态更新TOOLS字典
3. 认知能力构建
3.1 TodoWrite:显式任务规划
s03_todo_write.py引入的TodoManager让智能体学会了"先计划后行动":
python复制class TodoManager:
def __init__(self):
self.todos = []
self.current_index = 0
def create(self, items):
self.todos = [{"id": i, "text": item, "status": "pending"}
for i, item in enumerate(items)]
def mark_complete(self, todo_id):
self.todos[todo_id]["status"] = "completed"
self._check_and_nag()
def _check_and_nag(self):
pending = [t for t in self.todos if t["status"] == "pending"]
if not pending and self.todos:
return "✅ All tasks completed!"
return f"⚠️ {len(pending)} tasks remaining"
这个机制的价值在于:
- 将隐式的思考过程显式化
- 提供任务执行的可观察性
- 实现自动化的进度跟踪
3.2 子智能体:上下文隔离的艺术
s04_subagent.py解决了复杂任务中的上下文污染问题:
python复制class Subagent:
def __init__(self, parent_messages):
self.messages = [] # 全新的上下文!
self.parent_messages = parent_messages
def execute(self, task_description):
self.messages.append({
"role": "system",
"content": f"You are a subagent. Task: {task_description}"
})
# 运行独立的agent循环
while True:
response = llm.call(self.messages)
if response.stop_reason == "end_turn":
return response.content
# ... 处理工具调用
关键设计点:
- 每个子智能体有独立的messages数组
- 父任务上下文通过构造函数传入但不直接使用
- 子任务完成后返回结果,不污染父上下文
4. 系统级能力增强
4.1 上下文压缩:突破token限制
s06_context_compact.py实现了三层压缩策略:
python复制class ContextCompressor:
def micro_compact(self, messages):
"""移除工具调用的详细参数,只保留结果摘要"""
return [self._summarize_tool_call(m) if m["role"] == "tool"
else m for m in messages]
def auto_compact(self, messages, threshold=0.8):
"""当token使用率达到阈值时自动压缩"""
if self.token_count(messages) > threshold * MAX_TOKENS:
return self._compact_older_messages(messages, keep_last=10)
return messages
def full_summary(self, messages):
"""生成完整会话摘要,替换历史记录"""
summary = llm.generate_summary(messages)
return [{"role": "system", "content": f"Session summary: {summary}"}]
这种分级压缩策略实现了:
- 实时轻量压缩:不影响性能的微压缩
- 自动触发:基于token使用率的智能压缩
- 完整摘要:长期记忆保留
4.2 任务系统:持久化与依赖管理
s07_task_system.py将任务管理提升到生产级别:
python复制class TaskSystem:
def __init__(self, tasks_dir="./tasks"):
self.tasks_dir = tasks_dir
os.makedirs(tasks_dir, exist_ok=True)
def create(self, task_id, description, dependencies=[]):
task_file = f"{self.tasks_dir}/{task_id}.json"
task = {
"id": task_id,
"description": description,
"status": "pending",
"dependencies": dependencies,
"created_at": datetime.now().isoformat(),
"result": None
}
with open(task_file, 'w') as f:
json.dump(task, f, indent=2)
def can_execute(self, task_id):
"""检查依赖是否全部完成"""
task = self.load(task_id)
for dep_id in task["dependencies"]:
dep_task = self.load(dep_id)
if dep_task["status"] != "completed":
return False
return True
这个系统实现了:
- 文件级持久化:重启不丢失
- 依赖管理:形成任务DAG
- 原子操作:避免竞态条件
5. 多智能体协作
5.1 团队邮箱:JSONL通信协议
s09_agent_teams.py实现了基于JSONL的邮箱系统:
python复制class AgentMailbox:
def __init__(self, agent_id, team_dir="./team"):
self.agent_id = agent_id
self.inbox_file = f"{team_dir}/{agent_id}_inbox.jsonl"
self.outbox_file = f"{team_dir}/{agent_id}_outbox.jsonl"
def send(self, to_agent_id, message_type, content):
"""发送消息到队友的邮箱"""
message = {
"from": self.agent_id,
"to": to_agent_id,
"type": message_type,
"content": content,
"timestamp": datetime.now().isoformat(),
"read": False
}
outbox_file = f"./team/{to_agent_id}_inbox.jsonl"
with open(outbox_file, 'a') as f:
f.write(json.dumps(message) + '\n')
JSONL格式的优势:
- 追加写入:无需文件锁
- 行独立:易于解析和错误恢复
- 持久化:重启不丢失消息
5.2 协作协议:状态机驱动
s10_team_protocols.py引入了关键的状态机:
python复制class PlanApprovalFSM:
def __init__(self):
self.state = "idle" # idle → proposed → approved → executing → completed
self.current_plan = None
self.approvals = set()
def propose_plan(self, agent_id, plan):
if self.state == "idle":
self.current_plan = plan
self.proposer = agent_id
self.approvals = {agent_id}
self.state = "proposed"
return True
return False
def approve_plan(self, agent_id):
if self.state == "proposed":
self.approvals.add(agent_id)
if len(self.approvals) >= self.required_approvals:
self.state = "approved"
return True
return False
这种显式的状态管理:
- 避免竞态条件
- 提供清晰的协作流程
- 易于调试和监控
6. 生产级实现与最佳实践
6.1 完整系统整合
s_full.py将前11个session整合成一个741行的生产级实现:
python复制class FullAgent:
def __init__(self):
# s01: 核心循环
self.messages = []
# s02: 工具分发
self.tools = self._initialize_tools()
# s03: 任务管理
self.todo_manager = TodoManager()
# ... 其他模块初始化
def repl(self):
"""交互式命令行"""
while True:
user_input = input(">>> ")
if user_input.startswith("/"):
self._handle_command(user_input)
else:
self._handle_user_message(user_input)
关键整合技巧:
- 模块化初始化
- 命令路由
- 状态统一管理
6.2 文件系统作为状态存储
整个系统大量使用文件系统而非数据库:
- 任务:JSON文件
- 通信:JSONL邮箱
- 上下文:序列化messages
- 版本:Git worktree
优势:
- 零依赖
- 可观察性强
- 易于备份
- 支持分布式
7. 实战应用场景
7.1 智能代码审查系统
结合技能加载和团队协作:
python复制# 加载代码审查技能
skill = skill_loader.load("code-review")
# 创建审查团队
team = AgentTeam(["reviewer_1", "reviewer_2", "security_expert"])
# 自动化任务分配
for module in codebase.modules:
task_system.create(
task_id=f"review_{module}",
description=f"Review {module} for bugs",
dependencies=[]
)
7.2 数据处理流水线
利用后台任务和工作树隔离:
python复制# 创建工作树隔离的代理
agents = [
IsolatedAgent(f"agent_{i}", task_system, worktree_manager)
for i in range(10)
]
# 分布式处理数据分片
for shard in data_shards:
task_system.create(
task_id=f"process_{shard.id}",
description=f"Process data shard {shard.id}",
dependencies=[f"validate_{shard.id}"]
)
8. 架构启示与经验总结
8.1 设计哲学启示
- 不变的核心:一个简单的循环足以支撑复杂系统
- 可插拔机制:通过harness添加功能而非修改核心
- 渐进式复杂度:每个session只引入一个新概念
- 文件系统优先:简单可靠的持久化方案
8.2 性能优化经验
- 上下文压缩时机:在auto_compact中使用0.8阈值是基于实测的最佳平衡点
- 任务分片大小:保持任务在100-500token范围内效率最高
- 子智能体深度:建议不超过3层嵌套以避免复杂度过高
8.3 调试技巧
- 隔离测试:每个harness机制都有独立测试用例
- 状态可视化:定期dump messages数组到文件
- 协议验证器:为每个状态机添加invariant检查
9. 学习路径建议
9.1 循序渐进的学习路线
- 第一周:掌握核心循环和工具分发(s01-s02)
- 第二周:理解任务规划和子智能体(s03-s04)
- 第三周:实践上下文压缩和任务系统(s05-s07)
- 第四周:构建多智能体系统(s08-s12)
9.2 推荐实践项目
- 个人知识助手:整合TodoWrite和技能加载
- 自动化测试系统:使用子智能体并行执行测试用例
- 智能运维监控:结合后台任务和自主代理
10. 常见问题解决方案
10.1 工具调用失败处理
python复制def execute_tool(tool_call):
try:
handler = TOOLS[tool_call.name]
return handler(tool_call.arguments)
except KeyError:
return f"Error: Unknown tool {tool_call.name}"
except Exception as e:
return f"Error executing {tool_call.name}: {str(e)}"
10.2 死锁预防策略
- 为任务设置超时时间
- 实现死锁检测算法
- 提供手动干预接口
10.3 上下文丢失问题
- 实现消息版本控制
- 定期快照关键状态
- 添加重放日志
11. 性能调优实战
11.1 并发控制参数
python复制class ConcurrentAgent:
def __init__(self):
self.max_concurrent_tools = 3 # 并行工具调用上限
self.tool_timeout = 30 # 秒
self.rate_limit = 100 # 每分钟LLM调用次数
11.2 缓存策略实现
python复制class ToolCache:
def __init__(self):
self.cache = {}
self.ttl = 3600 # 1小时
def get(self, tool_name, args):
key = self._make_key(tool_name, args)
if key in self.cache:
return self.cache[key]
return None
def set(self, tool_name, args, result):
key = self._make_key(tool_name, args)
self.cache[key] = result
12. 安全与权限管理
12.1 工具权限控制
python复制TOOL_PERMISSIONS = {
"read_file": {"roles": ["admin", "editor"]},
"write_file": {"roles": ["admin"]},
"bash": {"roles": ["admin"], "confirm": True}
}
def check_permission(tool_name, user_role):
perm = TOOL_PERMISSIONS.get(tool_name, {})
if not perm:
return False
return user_role in perm.get("roles", [])
12.2 敏感数据过滤
python复制class DataFilter:
def __init__(self):
self.patterns = [
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b" # Email
]
def sanitize(self, text):
for pattern in self.patterns:
text = re.sub(pattern, "[REDACTED]", text)
return text
13. 监控与日志体系
13.1 关键指标收集
python复制class MetricsCollector:
def __init__(self):
self.metrics = {
"llm_calls": 0,
"tool_calls": defaultdict(int),
"avg_response_time": 0,
"errors": defaultdict(int)
}
def record_llm_call(self, duration):
self.metrics["llm_calls"] += 1
# 指数移动平均计算
self.metrics["avg_response_time"] = (
0.9 * self.metrics["avg_response_time"] + 0.1 * duration
)
13.2 结构化日志
python复制import logging
from pythonjsonlogger import jsonlogger
def setup_logging():
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
"%(asctime)s %(levelname)s %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
14. 测试策略与方法
14.1 单元测试示例
python复制import unittest
class TestAgentLoop(unittest.TestCase):
def setUp(self):
self.agent = Agent()
def test_tool_dispatch(self):
test_tool = MockTool()
self.agent.register_tool("test_tool", test_tool)
result = self.agent.handle_tool_use("test_tool", {"param": "value"})
self.assertTrue(test_tool.called)
self.assertEqual(result, "expected_result")
14.2 集成测试方案
- 端到端测试:模拟用户完整会话流
- 负载测试:使用Locust模拟并发请求
- 混沌测试:随机杀死子进程测试恢复能力
15. 部署与运维实践
15.1 容器化部署
dockerfile复制FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "s_full.py"]
15.2 健康检查端点
python复制from fastapi import FastAPI
app = FastAPI()
@app.get("/health")
def health_check():
return {
"status": "healthy",
"metrics": metrics_collector.get_metrics()
}
16. 项目演进路线图
16.1 短期计划
- 增强工具生态系统
- 改进上下文压缩算法
- 添加更多预置技能
16.2 长期愿景
- 可视化编排界面
- 自动性能优化器
- 联邦学习支持
17. 社区资源与生态
17.1 核心资源
- 官方文档:详细的中英文教程
- GitHub仓库:MIT开源许可证
- Discord社区:活跃的开发者交流
17.2 扩展工具库
- Web浏览器工具:安全页面访问
- 数据分析工具:Pandas集成
- 可视化工具:图表生成
18. 架构演进思考
18.1 当前架构局限
- 文件系统性能瓶颈
- 缺乏正式验证手段
- 调试工具不够完善
18.2 未来改进方向
- 可插拔存储后端
- 形式化协议验证
- 时间旅行调试器
19. 行业应用展望
19.1 企业知识管理
- 自动化文档整理
- 智能问答系统
- 会议纪要分析
19.2 软件开发领域
- 智能代码审查
- 自动化测试生成
- 异常日志分析
20. 开发者成长建议
20.1 技能树构建
- 基础层:Python/系统设计
- 算法层:LLM原理/优化算法
- 工程层:分布式系统/可靠性工程
20.2 学习资源推荐
- 书籍:《AI Engineering》《Designing Data-Intensive Applications》
- 课程:MIT分布式系统/斯坦福NLP
- 论文:Transformer架构/RLHF技术
21. 性能基准测试数据
21.1 单智能体性能
| 指标 | 数值 |
|---|---|
| 平均响应时间 | 1.2s |
| 最大并发工具 | 5 |
| 上下文窗口 | 8K tokens |
21.2 多智能体扩展性
| 智能体数量 | 任务吞吐量 | 延迟增长 |
|---|---|---|
| 1 | 100任务/分 | - |
| 10 | 850任务/分 | 15% |
| 100 | 6500任务/分 | 40% |
22. 设计模式提炼
22.1 核心模式
- 主从反应器:事件循环+工具分发
- 装饰器链:上下文压缩管道
- 状态模式:协作协议实现
22.2 创新模式
- 渐进式harness:可插拔功能模块
- JSONL邮箱:去中心化通信
- 工作树沙盒:并行文件操作
23. 技术选型对比
23.1 与LangChain对比
| 特性 | Claude Code | LangChain |
|---|---|---|
| 架构理念 | 模型为中心 | 链式编排 |
| 扩展性 | 渐进式harness | 预制组件 |
| 学习曲线 | 陡峭但系统 | 平但分散 |
23.2 与AutoGPT对比
| 维度 | Claude Code | AutoGPT |
|---|---|---|
| 可控性 | 精细调控 | 自主性强 |
| 复杂度 | 模块清晰 | 整体复杂 |
| 适用场景 | 生产系统 | 探索性任务 |
24. 关键代码片段解析
24.1 核心循环增强版
python复制def enhanced_agent_loop():
while True:
# 1. 上下文压缩检查
messages = compressor.auto_compact(messages)
# 2. 调用LLM
response = llm.call(messages)
# 3. 工具调用处理
if response.stop_reason == "tool_use":
tool_result = execute_tool(response.tool)
messages.append({"role": "tool", "content": tool_result})
# 4. 子智能体调用
elif response.stop_reason == "subagent":
subagent = Subagent(messages)
result = subagent.execute(response.task)
messages.append({"role": "subagent", "content": result})
# 5. 结束处理
elif response.stop_reason == "end_turn":
return response.content
24.2 安全工具调用
python复制def safe_execute_tool(tool_call):
# 1. 权限检查
if not check_permission(tool_call.name, current_role):
raise PermissionError(f"Role {current_role} cannot use {tool_call.name}")
# 2. 参数验证
validated_args = validate_args(tool_call.name, tool_call.arguments)
# 3. 沙盒执行
with Sandbox() as sandbox:
result = TOOLS[tool_call.name](validated_args)
# 4. 输出过滤
return output_filter.filter(result)
25. 异常处理体系
25.1 错误分类策略
| 错误类型 | 处理方式 | 恢复策略 |
|---|---|---|
| 工具错误 | 记录并返回 | 重试/跳过 |
| LLM错误 | 指数退避 | 切换模型 |
| 系统错误 | 警报通知 | 重启服务 |
25.2 熔断机制实现
python复制class CircuitBreaker:
def __init__(self, max_failures=3, reset_timeout=60):
self.failures = 0
self.last_failure = None
self.max_failures = max_failures
self.reset_timeout = reset_timeout
def __call__(self, func):
def wrapper(*args, **kwargs):
if self.is_open():
raise CircuitOpenError("Breaker is open")
try:
result = func(*args, **kwargs)
self._reset()
return result
except Exception as e:
self._record_failure()
raise
return wrapper
def is_open(self):
return (self.failures >= self.max_failures and
time.time() - self.last_failure < self.reset_timeout)
26. 配置管理系统
26.1 分层配置设计
python复制class Config:
def __init__(self):
# 1. 默认配置
self.defaults = {
"max_tokens": 8000,
"temperature": 0.7,
"timeout": 30
}
# 2. 文件配置
self.file_config = self._load_file_config()
# 3. 环境变量
self.env_config = self._load_env_vars()
# 4. 运行时覆盖
self.runtime_overrides = {}
def get(self, key):
return (self.runtime_overrides.get(key) or
self.env_config.get(key) or
self.file_config.get(key) or
self.defaults[key])
26.2 热重载机制
python复制import watchdog.events
class ConfigReloader(watchdog.events.FileSystemEventHandler):
def __init__(self, config):
self.config = config
def on_modified(self, event):
if event.src_path.endswith("config.json"):
self.config.reload()
logger.info("Config reloaded")
27. 安全最佳实践
27.1 输入验证框架
python复制class InputValidator:
SCHEMAS = {
"filename": {
"type": "string",
"regex": r"^[a-zA-Z0-9_\-\.]+$",
"max_length": 255
},
"email": {
"type": "string",
"regex": r"^[^@]+@[^@]+\.[^@]+$"
}
}
def validate(self, field, value):
schema = self.SCHEMAS.get(field, {})
if schema.get("type") and not isinstance(value, eval(schema["type"])):
raise ValidationError(f"Invalid type for {field}")
if schema.get("regex") and not re.match(schema["regex"], str(value)):
raise ValidationError(f"Invalid format for {field}")
return True
27.2 审计日志规范
python复制class AuditLogger:
def log(self, action, target, actor, metadata=None):
entry = {
"timestamp": datetime.utcnow().isoformat(),
"action": action,
"target": target,
"actor": actor,
"metadata": metadata or {},
"ip": get_client_ip()
}
self._write_entry(entry)
def _write_entry(self, entry):
with open("audit.log", "a") as f:
f.write(json.dumps(entry) + "\n")
28. 性能优化技巧
28.1 预编译正则表达式
python复制class RegexCache:
_cache = {}
@classmethod
def get(cls, pattern):
if pattern not in cls._cache:
cls._cache[pattern] = re.compile(pattern)
return cls._cache[pattern]
28.2 智能批处理
python复制class BatchProcessor:
def __init__(self, max_batch_size=10, max_wait_time=0.1):
self.batch = []
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.last_process_time = time.time()
def add(self, item):
self.batch.append(item)
if (len(self.batch) >= self.max_batch_size or
time.time() - self.last_process_time >= self.max_wait_time):
self.process()
def process(self):
if not self.batch:
return
# 执行批量处理
process_batch(self.batch)
self.batch = []
self.last_process_time = time.time()
29. 扩展性设计
29.1 插件系统架构
python复制class Plugin:
def __init__(self, agent):
self.agent = agent
def on_message(self, message):
"""处理消息钩子"""
pass
def on_tool_call(self, tool_call):
"""拦截工具调用"""
pass
class PluginManager:
def __init__(self):
self.plugins = []
def register(self, plugin):
self.plugins.append(plugin)
def dispatch_message(self, message):
for plugin in self.plugins:
message = plugin.on_message(message) or message
return message
29.2 热插拔工具
python复制class HotSwapTools:
def __init__(self):
self._tools = {}
self._lock = threading.RLock()
def register(self, name, tool):
with self._lock:
self._tools[name] = tool
def unregister(self, name):
with self._lock:
self._tools.pop(name, None)
def get(self, name):
with self._lock:
return self._tools.get(name)
30. 项目实战建议
30.1 开发环境配置
- Python环境:建议3.9+,使用venv隔离
- 开发工具:VS Code + Pylance + GitLens
- 测试框架:pytest + coverage
30.2 代码质量保障
- 静态检查:mypy + pylint + bandit
- 格式化:black + isort
- 提交钩子:pre-commit配置
31. 调试与诊断
31.1 交互式调试器
python复制def debug_hook(frame, event, arg):
if event == "call" and frame.f_code.co_name == "execute_tool":
print(f"Entering execute_tool with {frame.f_locals}")
return debug_hook
sys.settrace(debug_hook)
31.2 状态检查端点
python复制@app.get("/debug/state")
def get_state():
return {
"messages": agent.messages[-5:],
"active_tools": list(agent.tools.keys()),
"pending_tasks": task_system.count_pending()
}
32. 部署架构模式
32.1 单机部署
mermaid复制graph TD
A[Client] --> B[Nginx]
B --> C[Agent Process]
C --> D[LLM API]
C --> E[Tool Executors]
32.2 分布式部署
mermaid复制graph TD
A[Client] --> B[Load Balancer]
B --> C[Agent Node 1]
B --> D[Agent Node 2]
C & D --> E[Shared Storage]
C & D --> F[LLM Cluster]
33. 成本优化策略
33.1 LLM调用优化
- 缓存层:对相似请求返回缓存结果
- 小模型路由:简单任务使用小模型
- 结果预处理:减少输出token数量
33.2 资源复用方案
- 连接池:数据库/API连接复用
- 工具实例池:避免重复初始化
- 子智能体复用:空闲智能体池
34. 监控指标体系
34.1 核心监控项
| 指标 | 类型 | 告警阈值 |
|---|---|---|
| LLM P99延迟 | 性能 | >3s |
| 工具错误率 | 错误 | >5% |
| 内存使用 | 资源 | >80% |
34.2 Prometheus指标示例
python复制from prometheus_client import Gauge
LLM_CALLS = Gauge('llm_calls_total', 'Total LLM calls')
TOOL_CALLS = Gauge('tool_calls_total', 'Total tool calls', ['tool_name'])
def instrumented_llm_call(messages):
start = time.time()
result = llm.call(messages)
duration = time.time() - start
LLM_CALLS.inc()
LLM_DURATION.observe(duration)
return result
35. 灾备与恢复
35.1 检查点实现
python复制class Checkpointer:
def __init__(self, interval=300):
self.interval = interval
self.last_checkpoint = 0
def maybe_checkpoint(self, state):
now = time.time()
if now - self.last_checkpoint >= self.interval:
self._save_checkpoint(state)
self.last_checkpoint = now
def _save_checkpoint(self, state):
with open("checkpoint.json", "w") as f:
json.dump({
"timestamp": time.time(),
"state": state
}, f)
35.2 回滚策略
- 消息日志回放:从持久化日志重建状态
- 快照恢复:加载最近检查点
- 补偿事务:针对关键操作的逆向操作
36. 文档与知识管理
36.1 自动化文档生成
python复制def generate_docs(agent):
docs = {
"tools": [],
"protocols": [],
"examples": []
}
for name, tool in agent.tools.items():
docs["tools"].append({
"name": name,
"description": tool.__doc__,
"parameters": inspect.signature(tool).parameters
})
return docs
36.2 知识图谱集成
python复制class KnowledgeGraph:
def query(self, question):
results = self.search_graph(question)
return self.format_for_agent(results)
def update(self, fact):
self.validate_fact(fact)
self.store_fact(fact)
37. 团队协作规范
37.1 开发流程
- 特性分支:每个功能独立分支
- 代码审查:至少两人审查
- 自动化测试:CI流水线门禁
37.2 编码标准
- 类型注解:全面使用type hints
- 文档字符串:Google风格文档
- 错误处理:明确错误分类