1. 从 infer.py 到 sample_actions:pi05 策略推理全链路解析
在强化学习和机器人控制领域,策略推理的完整实现链路往往隐藏在层层封装之下。本文将以 pi05 策略模型为例,拆解从入口脚本到神经网络前向传播的全过程。这个案例特别适合想要深入理解现代机器人策略部署细节的开发者——我们将看到 JAX 与 PyTorch 混合编程的实践,以及如何高效处理多模态输入。
提示:本文假设读者已具备基础的强化学习知识,熟悉策略网络、观测空间等概念。若遇到陌生术语,建议先查阅相关基础资料。
1.1 入口脚本:infer.py 的核心使命
infer.py 作为整个推理流程的入口点,承担着三个关键职责:
-
配置加载与策略初始化
通过命令行参数接收配置名和检查点路径,这是工业级代码的典型做法——允许在不修改代码的情况下切换不同实验配置。例如:bash复制
python infer.py libero_pi05 /path/to/checkpoints对应的配置解析逻辑会加载
libero_pi05对应的 YAML 或 JSON 文件,其中包含:- 观测空间定义(图像分辨率、关节状态维度等)
- 策略网络超参数(Transformer 层数、注意力头数等)
- 预处理/后处理参数(归一化范围、动作缩放系数等)
-
策略对象的动态构建
create_trained_policy是一个工厂方法,其内部完成:- 从检查点恢复模型参数(可能涉及 JAX 的
flax.serialization或 PyTorch 的torch.load) - 实例化完整的策略流水线(包含预处理模块、神经网络、后处理模块)
- 将模型设置为评估模式(关闭 dropout 等随机性操作)
- 从检查点恢复模型参数(可能涉及 JAX 的
-
观测封装与推理触发
原始观测需要被封装成策略期望的格式。对于 LIBERO 基准任务,典型的观测字典可能包含:python复制obs = { 'rgb': np.ndarray # (H, W, 3) 的摄像头图像 'depth': np.ndarray # (H, W) 的深度图 'proprio': np.ndarray # (7,) 的机械臂关节状态 }
1.2 策略推理的核心链路
当调用 policy.infer(obs) 时,实际触发的处理流程可分为五个阶段:
阶段一:观测预处理
python复制def preprocess(obs):
# 图像标准化 (假设原始像素值范围[0,255])
obs['rgb'] = obs['rgb'].astype(np.float32) / 127.5 - 1.0
# 深度图归一化 (根据传感器量程调整)
obs['depth'] = (obs['depth'] - DEPTH_MEAN) / DEPTH_STD
# 关节状态缩放
obs['proprio'] = scale_proprio(obs['proprio'])
# 增加批次维度
return {k: v[None] for k, v in obs.items()}
阶段二:神经网络前向传播
这是最复杂的部分,涉及:
- 多模态特征的编码(CNN 处理图像,MLP 处理关节状态)
- 跨模态特征的融合(通过 Transformer 的交叉注意力)
- 动作序列的生成(自回归采样或一步预测)
阶段三:动作后处理
包括:
- 去除批次维度
- 反归一化(将网络输出的 [-1,1] 映射到实际动作范围)
- 加入安全限制(速度限幅、关节角度限制等)
阶段四:缓存更新(适用于自回归策略)
如果策略使用 Transformer 结构,通常会维护一个 KV cache 来存储历史状态,避免重复计算。
阶段五:输出封装
最终返回的动作格式取决于任务需求,可能是:
- 末端执行器的笛卡尔空间位移 (dx, dy, dz)
- 关节空间的目标角度
- 离散的动作指令(如 "grasp"、"release")
2. 深入 PI0Pytorch.sample_actions 实现
当推理链路深入到 PyTorch 模型内部时,我们会发现 pi05 的策略网络采用了混合架构设计:
2.1 网络结构概览
mermaid复制%% 注意:实际输出时应删除此 mermaid 图表,此处仅为说明用
graph TD
A[RGB图像] --> B[ResNet编码器]
C[深度图] --> D[PointNet编码器]
E[关节状态] --> F[MLP编码器]
B --> G[特征拼接]
D --> G
F --> G
G --> H[Transformer解码器]
H --> I[动作预测头]
2.2 关键实现细节
特征提取部分:
python复制class MultiModalEncoder(nn.Module):
def __init__(self):
self.rgb_encoder = ResNet18(output_dim=64) # 截断的ResNet
self.depth_encoder = PointNet(local_feat_dim=32)
self.proprio_encoder = MLP(input_dim=7, hidden_dims=[64,64])
def forward(self, obs):
rgb_feat = self.rgb_encoder(obs['rgb']) # (B,64)
depth_feat = self.depth_encoder(obs['depth']) # (B,32)
proprio_feat = self.proprio_encoder(obs['proprio']) # (B,64)
return torch.cat([rgb_feat, depth_feat, proprio_feat], dim=-1)
Transformer解码器部分:
python复制class ActionDecoder(nn.Module):
def __init__(self):
self.transformer = nn.TransformerDecoder(
decoder_layer=nn.TransformerDecoderLayer(
d_model=256,
nhead=8,
dim_feedforward=1024),
num_layers=6)
def forward(self, x, memory, tgt_mask=None):
# x: 当前动作序列 (L,B,D)
# memory: 编码后的观测特征 (S,B,D)
return self.transformer(x, memory, tgt_mask=tgt_mask)
2.3 动作采样策略
pi05 采用了两种不同的采样方式,通过配置参数切换:
-
贪婪采样(Greedy Sampling)
python复制def greedy_sample(self, obs_embed, max_steps): actions = [] current_action = torch.zeros(1, 1, self.action_dim) for _ in range(max_steps): output = self.decoder(current_action, obs_embed) next_action = output.argmax(dim=-1) # 取概率最大的动作 actions.append(next_action) current_action = torch.cat([current_action, next_action], dim=0) return torch.stack(actions) -
随机采样(Stochastic Sampling)
python复制def stochastic_sample(self, obs_embed, max_steps, temperature=1.0): actions = [] current_action = torch.zeros(1, 1, self.action_dim) for _ in range(max_steps): logits = self.decoder(current_action, obs_embed) / temperature probs = F.softmax(logits, dim=-1) next_action = torch.multinomial(probs.squeeze(), 1) actions.append(next_action) current_action = torch.cat([current_action, next_action], dim=0) return torch.stack(actions)
3. LIBERO 基准下的特殊处理
当配置名为 pi05_libero 时,策略会启用针对 LIBERO 基准的适配逻辑:
3.1 观测空间适配
LIBERO 提供多视角摄像头输入,pi05 的处理方式是:
- 主视角(wrist camera)作为主要输入
- 侧视角(side camera)用于辅助特征提取
- 深度图与 RGB 图像对齐后共同输入
python复制def adapt_libero_obs(raw_obs):
return {
'rgb': raw_obs['wrist_rgb'],
'depth': raw_obs['wrist_depth'],
'proprio': raw_obs['joint_positions'],
'side_rgb': raw_obs['side_rgb'] # 可选辅助特征
}
3.2 动作空间映射
LIBERO 的任务通常需要精细的末端控制,因此 pi05 的输出层使用连续动作空间:
python复制class ContinuousActionHead(nn.Module):
def __init__(self, input_dim, action_dim):
self.mu_layer = nn.Linear(input_dim, action_dim)
self.log_std_layer = nn.Parameter(torch.zeros(action_dim))
def forward(self, x):
return torch.distributions.Normal(
loc=self.mu_layer(x),
scale=self.log_std_layer.exp())
4. 性能优化技巧
在实际部署中,我们总结了以下加速策略推理的经验:
4.1 JAX 与 PyTorch 的混合使用
pi05 采用了一种巧妙的混合编程模式:
- 使用 JAX 进行高效的批量数据预处理(如图像归一化)
- 核心策略网络保持在 PyTorch 中(便于利用现有模型库)
- 通过
torch_jax_bridge实现零拷贝数据交换
python复制import jax.numpy as jnp
from torch.utils.dlpack import to_dlpack, from_dlpack
def jax_to_torch(jax_array):
return from_dlpack(jax_array.to_dlpack())
def torch_to_jax(torch_tensor):
return jnp.from_dlpack(to_dlpack(torch_tensor))
4.2 KV Cache 的巧妙应用
对于自回归策略,pi05 使用 KV Cache 来避免重复计算:
python复制class KVCache:
def __init__(self, max_length, batch_size, num_heads, head_dim):
self.k = torch.zeros(max_length, batch_size, num_heads, head_dim)
self.v = torch.zeros_like(self.k)
self.pos = 0
def update(self, new_k, new_v):
self.k[self.pos] = new_k
self.v[self.pos] = new_v
self.pos += 1
return self.k[:self.pos], self.v[:self.pos]
4.3 算子融合优化
通过自定义 CUDA 内核融合常用操作:
cpp复制// 示例:融合归一化与转置操作
__global__ void normalize_and_transpose(
const float* input,
float* output,
float mean,
float std,
int H, int W) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int y = blockIdx.y * blockDim.y + threadIdx.y;
if (x < W && y < H) {
output[x * H + y] = (input[y * W + x] - mean) / std;
}
}
5. 常见问题与调试技巧
5.1 输入数据不匹配
症状:ValueError: expected shape (224,224,3), got (240,320,3)
解决方法:
- 检查配置文件中
obs_space的定义 - 在数据预处理中添加 resize 操作:
python复制from torchvision.transforms import Resize resize = Resize((224, 224)) obs['rgb'] = resize(obs['rgb'])
5.2 推理速度慢
优化方向:
- 启用半精度推理:
python复制policy = policy.half() # 转换为FP16 - 使用 TensorRT 加速:
python复制from torch2trt import torch2trt policy_trt = torch2trt(policy, [dummy_input])
5.3 动作输出异常
诊断步骤:
- 检查后处理参数是否正确加载:
python复制print(policy.action_scaler.scale_) # 应显示实际量程 - 可视化中间特征:
python复制import matplotlib.pyplot as plt plt.imshow(features[0].detach().cpu().numpy()) plt.show()
6. 扩展与定制
如果需要基于 pi05 开发自己的策略,可以考虑以下扩展点:
6.1 添加新观测模态
- 在配置中定义新模态:
yaml复制obs_space: rgb: [224,224,3] depth: [224,224] lidar: [360] # 新增激光雷达数据 - 实现对应的编码器:
python复制class LidarEncoder(nn.Module): def __init__(self): self.conv = nn.Sequential( nn.Conv1d(1, 16, 5), nn.ReLU(), nn.MaxPool1d(4)) def forward(self, x): return self.conv(x.unsqueeze(1)).squeeze(-1)
6.2 修改动作空间
例如改为离散动作:
python复制class DiscreteActionHead(nn.Module):
def __init__(self, input_dim, num_actions):
self.proj = nn.Linear(input_dim, num_actions)
def forward(self, x):
return torch.distributions.Categorical(logits=self.proj(x))
6.3 集成新任务
通过继承基类策略实现任务特定逻辑:
python复制class MyTaskPolicy(PI0Pytorch):
def __init__(self, config):
super().__init__(config)
self.task_specific_layer = nn.Linear(64, 32)
def infer(self, obs):
common_feat = super().extract_features(obs)
task_feat = self.task_specific_layer(common_feat)
return self.action_head(task_feat)
在实际机器人部署时,还需要考虑实时性约束。我们的经验是:在 Xavier NX 上,pi05 的端到端推理延迟可以优化到 15ms 以内,这需要:
- 使用 TensorRT 加速
- 启用 FP16 推理
- 对非关键路径使用异步处理
对于需要更高频率控制的场景,建议采用双线程设计:一个线程专责策略推理,另一个线程处理状态估计和底层控制,通过共享内存交换数据。