1. 项目概述
在机器人强化学习领域,Unitree RL GYM 是一个基于 Unitree 四足机器人的开源强化学习控制框架。作为该系列文章的第三期,我们将深入解析其中的三个核心组件:OnPolicyRunner、VecEnv 和 RolloutStorage 的实现原理与代码细节。这些组件共同构成了一个完整的强化学习训练系统,支持从仿真到真实世界的策略迁移(sim-to-real)。
1.1 核心需求解析
在强化学习训练过程中,我们需要解决以下几个关键问题:
- 高效数据采集:需要并行运行多个环境实例以加速数据收集
- 轨迹存储与管理:需要有效存储和管理采样得到的轨迹数据
- 策略优化:需要实现PPO等策略优化算法的核心计算逻辑
这三个组件分别对应了这些需求:
- VecEnv:负责多环境并行运行
- RolloutStorage:负责轨迹数据的存储和管理
- OnPolicyRunner:协调整个训练流程
2. 核心组件解析
2.1 观测与特权观测系统
在深入代码前,我们需要理解一个关键概念:Asymmetric Actor-Critic(非对称Actor-Critic)架构中的观测系统设计。
2.1.1 观测(Observation)设计
对于四足机器人(如Go2、H1等),观测通常包含以下信息:
| 类型 | 说明 | 示例维度 |
|---|---|---|
| 关节位置 | 当前各关节角度 | 12 |
| 关节速度 | 各关节角速度 | 12 |
| 基座线速度 | 机器人底盘在x,y,z方向的速度 | 3 |
| 基座角速度 | 机器人底盘绕x,y,z轴的角速度 | 3 |
| IMU加速度 | 加速度传感器读数 | 3 |
| IMU角速度 | 陀螺仪读数 | 3 |
| 末端足接触 | 各足是否接触地面(触觉传感器) | 4 |
| 目标信息 | 目标位置或运动指令 | 3 |
这些观测构成了Actor网络的输入,决定了策略可以看到环境的哪些信息。
2.1.2 特权观测(Privileged Observation)设计
特权观测是Critic网络独有的输入,通常包含更多环境信息:
| 类型 | 说明 | 示例维度 |
|---|---|---|
| 完整机器人状态 | 包含底盘位姿(位置+姿态) | 6 |
| 关节力矩 | 各关节实际力矩 | 12 |
| 地形高度图 | 机器人下方或周围地形信息 | 12~20 |
| 物理全局状态 | 机器人每个关节和连杆的全局位置/速度 | N/A |
| 环境动态信息 | 例如障碍物位置、速度 | 可选 |
2.1.3 非对称架构的优势
这种设计形成了以下数据流:
code复制Environment ──> observation ──> Actor ──> action ──> Environment
└─> privileged_observation ──> Critic ──> value
其优势在于:
- Actor只需依赖真实机器人可获取的传感器数据,确保策略可在真实环境中部署
- Critic利用更多信息计算更准确的价值函数,加速训练过程
- 特别适合sim-to-real迁移学习场景
3. RolloutStorage实现详解
RolloutStorage是PPO算法中用于存储和管理采样轨迹的核心组件。
3.1 类结构与初始化
python复制class RolloutStorage:
class Transition:
def __init__(self):
self.observations = None
self.critic_observations = None
self.actions = None
self.rewards = None
self.dones = None
self.values = None
self.actions_log_prob = None
self.action_mean = None
self.action_sigma = None
self.hidden_states = None
def clear(self):
self.__init__()
def __init__(self, num_envs, num_transitions_per_env, obs_shape, privileged_obs_shape, actions_shape, device='cpu'):
self.device = device
self.obs_shape = obs_shape
self.privileged_obs_shape = privileged_obs_shape
self.actions_shape = actions_shape
# 核心缓冲区
self.observations = torch.zeros(num_transitions_per_env, num_envs, *obs_shape, device=self.device)
if privileged_obs_shape[0] is not None:
self.privileged_observations = torch.zeros(num_transitions_per_env, num_envs, *privileged_obs_shape, device=self.device)
else:
self.privileged_observations = None
self.rewards = torch.zeros(num_transitions_per_env, num_envs, 1, device=self.device)
self.actions = torch.zeros(num_transitions_per_env, num_envs, *actions_shape, device=self.device)
self.dones = torch.zeros(num_transitions_per_env, num_envs, 1, device=self.device).byte()
# PPO专用缓冲区
self.actions_log_prob = torch.zeros(num_transitions_per_env, num_envs, 1, device=self.device)
self.values = torch.zeros(num_transitions_per_env, num_envs, 1, device=self.device)
self.returns = torch.zeros(num_transitions_per_env, num_envs, 1, device=self.device)
self.advantages = torch.zeros(num_transitions_per_env, num_envs, 1, device=self.device)
self.mu = torch.zeros(num_transitions_per_env, num_envs, *actions_shape, device=self.device)
self.sigma = torch.zeros(num_transitions_per_env, num_envs, *actions_shape, device=self.device)
self.num_transitions_per_env = num_transitions_per_env
self.num_envs = num_envs
# RNN状态
self.saved_hidden_states_a = None
self.saved_hidden_states_c = None
self.step = 0
初始化参数说明:
num_envs: 并行环境数量num_transitions_per_env: 每个环境采样的步数obs_shape: 观测空间维度privileged_obs_shape: 特权观测维度actions_shape: 动作空间维度
3.2 核心方法实现
3.2.1 添加过渡数据
python复制def add_transitions(self, transition: Transition):
if self.step >= self.num_transitions_per_env:
raise AssertionError("Rollout buffer overflow")
self.observations[self.step].copy_(transition.observations)
if self.privileged_observations is not None:
self.privileged_observations[self.step].copy_(transition.critic_observations)
self.actions[self.step].copy_(transition.actions)
self.rewards[self.step].copy_(transition.rewards.view(-1, 1))
self.dones[self.step].copy_(transition.dones.view(-1, 1))
self.values[self.step].copy_(transition.values)
self.actions_log_prob[self.step].copy_(transition.actions_log_prob.view(-1, 1))
self.mu[self.step].copy_(transition.action_mean)
self.sigma[self.step].copy_(transition.action_sigma)
self._save_hidden_states(transition.hidden_states)
self.step += 1
该方法将单步交互数据存入缓冲区,包括:
- 观测和特权观测
- 动作、奖励、完成标志
- 价值估计、动作对数概率
- 高斯策略参数(均值和标准差)
- RNN隐藏状态(如果使用)
3.2.2 计算回报和优势
python复制def compute_returns(self, last_values, gamma, lam):
advantage = 0
for step in reversed(range(self.num_transitions_per_env)):
if step == self.num_transitions_per_env - 1:
next_values = last_values
else:
next_values = self.values[step + 1]
next_is_not_terminal = 1.0 - self.dones[step].float()
delta = self.rewards[step] + next_is_not_terminal * gamma * next_values - self.values[step]
advantage = delta + next_is_not_terminal * gamma * lam * advantage
self.returns[step] = advantage + self.values[step]
# 标准化优势函数
self.advantages = self.returns - self.values
self.advantages = (self.advantages - self.advantages.mean()) / (self.advantages.std() + 1e-8)
这段代码实现了广义优势估计(GAE),关键点:
- 反向计算:从最后一步开始向前递推
- 考虑episode终止:通过next_is_not_terminal处理
- 优势函数标准化:提高训练稳定性
数学表达式:
- TD误差:δₜ = rₜ + γV(sₜ₊₁) - V(sₜ)
- 优势估计:Aₜ = δₜ + γλAₜ₊₁
- 回报估计:Rₜ = Aₜ + V(sₜ)
3.2.3 生成mini-batch
python复制def mini_batch_generator(self, num_mini_batches, num_epochs=8):
batch_size = self.num_envs * self.num_transitions_per_env
mini_batch_size = batch_size // num_mini_batches
indices = torch.randperm(num_mini_batches*mini_batch_size, device=self.device)
# 展平所有数据
observations = self.observations.flatten(0, 1)
critic_observations = self.privileged_observations.flatten(0, 1) if self.privileged_observations is not None else observations
actions = self.actions.flatten(0, 1)
values = self.values.flatten(0, 1)
returns = self.returns.flatten(0, 1)
old_actions_log_prob = self.actions_log_prob.flatten(0, 1)
advantages = self.advantages.flatten(0, 1)
old_mu = self.mu.flatten(0, 1)
old_sigma = self.sigma.flatten(0, 1)
for epoch in range(num_epochs):
for i in range(num_mini_batches):
start = i*mini_batch_size
end = (i+1)*mini_batch_size
batch_idx = indices[start:end]
yield (
observations[batch_idx],
critic_observations[batch_idx],
actions[batch_idx],
values[batch_idx],
advantages[batch_idx],
returns[batch_idx],
old_actions_log_prob[batch_idx],
old_mu[batch_idx],
old_sigma[batch_idx],
(None, None), # 隐藏状态
None # mask
)
该方法实现了:
- 数据展平:将(T,N,...)形状的数据变为(T*N,...)
- 随机打乱:生成随机索引
- 分批生成:按指定大小生成mini-batch
4. VecEnv实现解析
VecEnv是向量化环境接口,支持并行运行多个环境实例。
4.1 抽象基类设计
python复制from abc import ABC, abstractmethod
import torch
from typing import Tuple, Union
class VecEnv(ABC):
num_envs: int
num_obs: int
num_privileged_obs: int
num_actions: int
max_episode_length: int
privileged_obs_buf: torch.Tensor
obs_buf: torch.Tensor
rew_buf: torch.Tensor
reset_buf: torch.Tensor
episode_length_buf: torch.Tensor
extras: dict
device: torch.device
@abstractmethod
def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, Union[torch.Tensor, None], torch.Tensor, torch.Tensor, dict]:
pass
@abstractmethod
def reset(self, env_ids: Union[list, torch.Tensor]):
pass
@abstractmethod
def get_observations(self) -> torch.Tensor:
pass
@abstractmethod
def get_privileged_observations(self) -> Union[torch.Tensor, None]:
pass
关键属性和方法:
num_envs: 并行环境数量step(): 执行并行环境步进reset(): 重置指定环境- 各种缓冲区:存储观测、奖励等信息
4.2 典型实现要点
在实际实现中,VecEnv通常会:
- 使用GPU加速:所有缓冲区放在GPU上
- 支持同步/异步重置:可以只重置部分环境
- 提供额外信息:通过extras字典返回调试信息
5. OnPolicyRunner工作流程
OnPolicyRunner协调整个训练流程,其主要工作流程如下:
-
初始化阶段:
- 创建VecEnv实例
- 初始化策略网络和RolloutStorage
- 设置优化器和超参数
-
训练循环:
python复制while not done:
# 数据收集阶段
for _ in range(num_steps_per_env):
actions = actor_critic.act(obs)
next_obs, rewards, dones, infos = vec_env.step(actions)
rollout_storage.add_transitions(Transition(obs, actions, rewards, dones, ...))
obs = next_obs
# 策略优化阶段
last_values = actor_critic.evaluate(last_obs)
rollout_storage.compute_returns(last_values, gamma, lam)
for _ in range(num_epochs):
for batch in rollout_storage.mini_batch_generator(num_mini_batches):
# 计算损失并更新策略
loss = compute_loss(*batch)
optimizer.zero_grad()
loss.backward()
optimizer.step()
rollout_storage.clear()
6. 实际应用中的注意事项
6.1 参数调优经验
-
GAE参数选择:
- γ (折扣因子):通常0.99-0.999
- λ (GAE参数):通常0.95-0.99
- 较高的λ会增加方差但减少偏差
-
批量大小设置:
- 每个mini-batch建议包含128-2048个样本
- 较大的批量可以提高训练稳定性但会降低样本效率
-
优势标准化:
- 务必进行优势标准化(减去均值,除以标准差)
- 添加小常数(1e-8)避免除零错误
6.2 常见问题排查
-
训练不稳定:
- 检查优势计算是否正确
- 验证观测和奖励的尺度是否合理
- 确保随机种子固定以便复现
-
性能下降:
- 监控KL散度,防止策略更新过大
- 检查梯度是否爆炸或消失
- 验证环境重置逻辑是否正确
-
内存问题:
- 合理设置rollout长度和环境数量
- 使用混合精度训练减少内存占用
- 定期检查GPU内存使用情况
7. 性能优化技巧
-
GPU利用率优化:
- 使用CUDA图(cuda graphs)减少内核启动开销
- 启用TensorCore加速矩阵运算
- 使用异步数据传输重叠计算和通信
-
并行化策略:
- 环境仿真和策略更新并行执行
- 使用多进程处理I/O密集型任务
- 考虑分布式训练扩展环境数量
-
内存优化:
- 使用内存池减少分配开销
- 及时释放不需要的中间结果
- 考虑梯度检查点技术节省内存
在实际机器人强化学习项目中,这套实现已经证明了其高效性和稳定性。通过合理调整参数和优化实现,可以在保持训练稳定性的同时获得良好的样本效率。