在构建AI智能体(Agent)系统时,中间件作为连接不同组件的桥梁发挥着关键作用。Langchain框架内置的中间件模块为开发者提供了开箱即用的管道处理能力,能够在不修改核心逻辑的情况下增强Agent的功能表现。这类中间件通常运行在核心业务逻辑外层,像过滤器一样对输入输出进行预处理和后处理。
典型应用场景:当需要统一处理多个Agent的日志记录、输入验证或性能监控时,中间件可以避免在每个Agent中重复编写相同代码。
Langchain 9.1版本提供的中间件主要覆盖以下维度:
| 中间件类型 | 功能描述 | 技术实现要点 |
|---|---|---|
| 日志记录 | 记录Agent执行过程的完整轨迹 | 采用装饰器模式包裹执行函数 |
| 异常处理 | 统一捕获和处理运行时异常 | 实现异常过滤器链 |
| 性能监控 | 统计各环节耗时并生成性能报告 | 基于时间戳差值计算 |
| 输入校验 | 验证输入数据格式和业务规则 | 集成Pydantic验证模型 |
| 缓存代理 | 对重复请求返回缓存结果 | 使用LRU缓存策略 |
| 限流控制 | 防止API调用频率过高 | 令牌桶算法实现 |
Langchain采用洋葱模型处理中间件调用链,每个中间件都包含before和after两个处理阶段:
python复制async def middleware_chain(agent, input_data):
# 前置处理
for middleware in reversed(middlewares):
input_data = await middleware.before(agent, input_data)
# 核心逻辑执行
output_data = await agent.execute(input_data)
# 后置处理
for middleware in middlewares:
output_data = await middleware.after(agent, output_data)
return output_data
这种设计使得中间件可以:
以日志中间件为例,其核心实现逻辑包含:
python复制class LoggingMiddleware:
def __init__(self, logger):
self.logger = logger
async def before(self, agent, input_data):
self.logger.info(f"Request received by {agent.name}: {input_data}")
return input_data
async def after(self, agent, output_data):
self.logger.info(f"Response from {agent.name}: {output_data}")
return output_data
在Langchain中启用中间件需要三个步骤:
python复制from langchain.middleware import LoggingMiddleware
logging_middleware = LoggingMiddleware(logger)
python复制middlewares = [
logging_middleware,
# 其他中间件...
]
python复制agent = Agent(
middleware=middlewares,
# 其他配置...
)
不同中间件类型需要关注特定配置项:
缓存中间件:
python复制CacheMiddleware(
max_size=1000, # 缓存条目上限
ttl=3600, # 缓存有效期(秒)
key_builder=lambda x: hashlib.md5(x.encode()).hexdigest() # 缓存键生成器
)
限流中间件:
python复制RateLimitMiddleware(
calls=100, # 单位时间最大调用次数
period=60, # 时间窗口(秒)
storage=RedisStorage() # 计数存储后端
)
通过性能监控中间件收集的数据显示(测试环境):
| 中间件类型 | 平均耗时(ms) | 内存占用(MB) |
|---|---|---|
| 基础日志 | 2.1 | 1.2 |
| 详细调试日志 | 8.7 | 3.5 |
| 输入验证 | 5.3 | 2.1 |
| 缓存查询 | 1.4 | 4.8 |
| 远程调用监控 | 12.6 | 6.2 |
按需加载原则:
执行顺序优化:
python复制# 优化后的中间件顺序
middlewares = [
RateLimitMiddleware(), # 最先执行-快速失败
CacheMiddleware(), # 尽早返回缓存
ValidationMiddleware(), # 验证有效负载
LoggingMiddleware() # 最后记录
]
缓存策略调整:
建议实现分级的异常处理策略:
python复制class ErrorHandlingMiddleware:
async def after(self, agent, output_data):
if isinstance(output_data, Error):
if output_data.code == 429:
return {"error": "请求过于频繁"}
elif output_data.code == 400:
return {"error": "无效输入"}
else:
logger.error(f"未处理异常: {output_data}")
return {"error": "系统繁忙"}
return output_data
结合中间件实现基础熔断:
python复制class CircuitBreaker:
def __init__(self, max_failures=3, reset_timeout=60):
self.failures = 0
self.last_failure = None
async def before(self, agent, input_data):
if self.failures >= max_failures and
time.time() - self.last_failure < reset_timeout:
raise CircuitOpenError()
return input_data
async def after(self, agent, output_data):
if isinstance(output_data, Error):
self.failures += 1
self.last_failure = time.time()
自定义中间件需要实现基础接口:
python复制from typing import Any, Awaitable, Callable
class CustomMiddleware:
async def __call__(
self,
agent: "Agent",
input_data: Any,
next_fn: Callable[[Any], Awaitable[Any]]
) -> Any:
# 前置处理
processed_input = await self.before(agent, input_data)
try:
# 调用下一个中间件或Agent核心逻辑
output_data = await next_fn(processed_input)
# 后置处理
return await self.after(agent, output_data)
except Exception as e:
# 异常处理
return await self.on_error(agent, e)
审计追踪:
数据脱敏:
协议转换:
在实现自定义中间件时,需要注意避免阻塞主线程的操作,对于IO密集型任务建议使用async/await语法。测试阶段应该验证中间件在以下场景的表现: