ACT 数据层

发布时间:2026/7/3 7:56:48
ACT 数据层

## record_sim_episodes.py:生成训练数据集

将脚本策略的演示记录成 HDF5 文件,供 ACT 训练使用。

```python
task_name = args['task_name']
dataset_dir = args['dataset_dir']
num_episodes = args['num_episodes']
onscreen_render = args['onscreen_render']
inject_noise = False
render_cam_name = 'angle'
```

从命令行参数字典中提取各字段。`inject_noise` 硬编码为 `False`:脚本策略录制不加噪声,确保高成功率。`render_cam_name` 固定用 `angle` 摄像头预览。

```python
if not os.path.isdir(dataset_dir):
    os.makedirs(dataset_dir, exist_ok=True)

episode_len = SIM_TASK_CONFIGS[task_name]['episode_len']
camera_names = SIM_TASK_CONFIGS[task_name]['camera_names']
if task_name == 'sim_transfer_cube_scripted':
    policy_cls = PickAndTransferPolicy
elif task_name == 'sim_insertion_scripted':
    policy_cls = InsertionPolicy
else:
    raise NotImplementedError
```

设置路径,读取轨迹长度和摄像头的名称,取出对应的打点的类。

`for episode_idx in range(num_episodes):` 为主采集循环。

```python
print(f'{episode_idx=}')
print('Rollout out EE space scripted policy')
# setup the environment
env = make_ee_sim_env(task_name)
ts = env.reset()
episode = [ts]
policy = policy_cls(inject_noise)
# setup plotting
if onscreen_render:
    ax = plt.subplot()
    plt_img = ax.imshow(ts.observation['images'][render_cam_name])
    plt.ion()
for step in range(episode_len):
    action = policy(ts)
    ts = env.step(action)
    episode.append(ts)
    if onscreen_render:
        plt_img.set_data(ts.observation['images'][render_cam_name])
        plt.pause(0.002)
plt.close()
```

设置摄像头渲染一下;执行 `episode_len` 步:策略根据当前观测生成 16 维 EE 动作 → 环境执行 → 追加到 `episode`。

```python
episode_return = np.sum([ts.reward for ts in episode[1:]])
episode_max_reward = np.max([ts.reward for ts in episode[1:]])
if episode_max_reward == env.task.max_reward:
    print(f"{episode_idx=} Successful, {episode_return=}")
else:
    print(f"{episode_idx=} Failed")
```

判断这一个阶段是否成功。

```python
joint_traj = [ts.observation['qpos'] for ts in episode]
# replace gripper pose with gripper control
gripper_ctrl_traj = [ts.observation['gripper_ctrl'] for ts in episode]
for joint, ctrl in zip(joint_traj, gripper_ctrl_traj):
    left_ctrl = PUPPET_GRIPPER_POSITION_NORMALIZE_FN(ctrl[0])
    right_ctrl = PUPPET_GRIPPER_POSITION_NORMALIZE_FN(ctrl[2])
    joint[6] = left_ctrl
    joint[6+7] = right_ctrl
```

EE 环境中夹爪的 `qpos[6]` 是物理模拟的内部值,不等于真实控制命令。用 `gripper_ctrl`,即 MuJoCo actuator 
的实际控制量替换,再经 `PUPPET_GRIPPER_POSITION_NORMALIZE_FN` 归一化。`ctrl[0]` 是左夹爪,`ctrl[2]` 是右夹爪,`ctrl[1]` 是无关项。

```python
print('Replaying joint commands')
env = make_sim_env(task_name)
BOX_POSE[0] = subtask_info  # make sure the sim_env has the same object configurations as ee_sim_env
ts = env.reset()
```

之后是第二阶段:关节空间回放,直接将阶段 1 的关节轨迹作为动作序列回放。阶段 2 的成功判断才真正计入 `success` 列表。只有在关节空间回放中成功的 episode 才算真正有效的数据。

```python
data_dict = {
    '/observations/qpos': [],
    '/observations/qvel': [],
    '/action': [],
}
for cam_name in camera_names:
    data_dict[f'/observations/images/{cam_name}'] = []

# because the replaying, there will be eps_len + 1 actions and eps_len + 2 timesteps
# truncate here to be consistent
joint_traj = joint_traj[:-1]
episode_replay = episode_replay[:-1]

# len(joint_traj) i.e. actions: max_timesteps
# len(episode_replay) i.e. time steps: max_timesteps + 1
max_timesteps = len(joint_traj)
while joint_traj:
    action = joint_traj.pop(0)
    ts = episode_replay.pop(0)
    data_dict['/observations/qpos'].append(ts.observation['qpos'])
    data_dict['/observations/qvel'].append(ts.observation['qvel'])
    data_dict['/action'].append(action)
    for cam_name in camera_names:
        data_dict[f'/observations/images/{cam_name}'].append(ts.observation['images'][cam_name])

# HDF5
t0 = time.time()
dataset_path = os.path.join(dataset_dir, f'episode_{episode_idx}')
with h5py.File(dataset_path + '.hdf5', 'w', rdcc_nbytes=1024 ** 2 * 2) as root:
    root.attrs['sim'] = True
    obs = root.create_group('observations')
    image = obs.create_group('images')
    for cam_name in camera_names:
        _ = image.create_dataset(cam_name, (max_timesteps, 480, 640, 3), dtype='uint8',
                                 chunks=(1, 480, 640, 3), )
    # compression='gzip',compression_opts=2,)
    # compression=32001, compression_opts=(0, 0, 0, 0, 9, 1, 1), shuffle=False)
    qpos = obs.create_dataset('qpos', (max_timesteps, 14))
    qvel = obs.create_dataset('qvel', (max_timesteps, 14))
    action = root.create_dataset('action', (max_timesteps, 14))

    for name, array in data_dict.items():
        root[name][...] = array
print(f'Saving: {time.time() - t0:.1f} secs\n')

print(f'Saved to {dataset_dir}')
print(f'Success: {np.sum(success)} / {len(success)}')
```

最后创建字典并且保存 HDF5 文件。

## util.py

这个文件是 ACT 训练管道的数据加载与工具库,分为四大模块:

- 数据集类(EpisodicDataset)— 从 HDF5 文件加载单个 episode
- 统计计算(get_norm_stats)— 计算数据集的归一化参数
- 数据加载管道(load_data)— 构建训练/验证 DataLoader
- 辅助函数 — 环境采样、字典操作、随机种子

### class EpisodicDataset(torch.utils.data.Dataset):

`def __init__(self, episode_ids, dataset_dir, camera_names, norm_stats):`

- `episode_ids`:要加载的 episode 编号列表,如 `[0, 1, 5, 10, ...]`
- `dataset_dir`:HDF5 文件所在目录
- `camera_names`:摄像头名称列表,如 `[top, angle]`
- `norm_stats`:来自 `get_norm_stats()` 的归一化参数字典

调用父类初始化。

`def __len__(self):` 返回 episode 总数。

`def __getitem__(self, index):` 返回单个样本。

```python
def __getitem__(self, index):
    sample_full_episode = False  # hardcode

    episode_id = self.episode_ids[index]
    dataset_path = os.path.join(self.dataset_dir, f'episode_{episode_id}.hdf5')
    with h5py.File(dataset_path, 'r') as root:
        is_sim = root.attrs['sim']
        original_action_shape = root['/action'].shape
        episode_len = original_action_shape[0]
        if sample_full_episode:
            start_ts = 0
        else:
            start_ts = np.random.choice(episode_len)
        # get observation at start_ts only
        qpos = root['/observations/qpos'][start_ts]
        qvel = root['/observations/qvel'][start_ts]
        image_dict = dict()
        for cam_name in self.camera_names:
            image_dict[cam_name] = root[f'/observations/images/{cam_name}'][start_ts]
        # get all actions after and including start_ts
        if is_sim:
            action = root['/action'][start_ts:]
            action_len = episode_len - start_ts
        else:
            action = root['/action'][max(0, start_ts - 1):]  # hack, to make timesteps more aligned
            action_len = episode_len - max(0, start_ts - 1)  # hack, to make timesteps more aligned

    self.is_sim = is_sim
    padded_action = np.zeros(original_action_shape, dtype=np.float32)
    padded_action[:action_len] = action
    is_pad = np.zeros(episode_len)
    is_pad[action_len:] = 1

    # new axis for different cameras
    all_cam_images = []
    for cam_name in self.camera_names:
        all_cam_images.append(image_dict[cam_name])
    all_cam_images = np.stack(all_cam_images, axis=0)

    # construct observations
    image_data = torch.from_numpy(all_cam_images)
    qpos_data = torch.from_numpy(qpos).float()
    action_data = torch.from_numpy(padded_action).float()
    is_pad = torch.from_numpy(is_pad).bool()

    # channel last
    image_data = torch.einsum('k h w c -> k c h w', image_data)

    # normalize image and change dtype to float
    image_data = image_data / 255.0
    action_data = (action_data - self.norm_stats["action_mean"]) / self.norm_stats["action_std"]
    qpos_data = (qpos_data - self.norm_stats["qpos_mean"]) / self.norm_stats["qpos_std"]

    return image_data, qpos_data, action_data, is_pad
```

`sample_full_episode = False` 表示不采样完整 episode,而是随机截取片段。之后根据 `index` 获取对应的 episode ID,构造 HDF5 文件路径。打开文件并读取,选取起始时刻 `start_ts`(此处为随机采样),之后读取出该时刻的 observation 以及从该时刻起的动作;把没有动作的地方进行掩码,得到 `is_pad`;最后对多个摄像头的图像进行堆叠,颜色值缩放到 0 到 1,动作和 qpos 减去均值、除以标准差进行归一化。

`def get_norm_stats(dataset_dir, num_episodes):` 扫描整个数据集,计算训练时要用的归一化统计量,也就是 `action_mean` / `action_std`、`qpos_mean` / `qpos_std`。

`def load_data(dataset_dir, num_episodes, camera_names, batch_size_train, batch_size_val):`

```python
def load_data(dataset_dir, num_episodes, camera_names, batch_size_train, batch_size_val):
    print(f'\nData from: {dataset_dir}\n')
    # obtain train test split
    train_ratio = 0.8
    shuffled_indices = np.random.permutation(num_episodes)
    train_indices = shuffled_indices[:int(train_ratio * num_episodes)]
    val_indices = shuffled_indices[int(train_ratio * num_episodes):]

    # obtain normalization stats for qpos and action
    norm_stats = get_norm_stats(dataset_dir, num_episodes)

    # construct dataset and dataloader
    train_dataset = EpisodicDataset(train_indices, dataset_dir, camera_names, norm_stats)
    val_dataset = EpisodicDataset(val_indices, dataset_dir, camera_names, norm_stats)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size_train, shuffle=True, pin_memory=True, num_workers=1, prefetch_factor=1)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size_val, shuffle=True, pin_memory=True, num_workers=1, prefetch_factor=1)

    return train_dataloader, val_dataloader, norm_stats, train_dataset.is_sim
```

- `dataset_dir`:数据目录,里面放着 `episode_0.hdf5` 之类的文件
- `num_episodes`:总共有多少个 episode
- `camera_names`:要读取哪些摄像头图像
- `batch_size_train`:训练集 batch size
- `batch_size_val`:验证集 batch 
size

先设定 80% 为训练集:对轨迹进行打乱之后,选取 80% 为训练集、20% 为验证集;然后计算均值和标准差,并据此得到归一化之后的数据集;最后使用 DataLoader 构建训练/验证数据加载器。

`def sample_box_pose():` 这个函数用于抓取/搬运方块任务,随机生成一个方块的初始位姿。

`def compute_dict_mean(epoch_dicts):` 计算字典平均值。

## visualize_episodes.py

用途是生成视频:

读取 HDF5 episode
↓
提取 qpos / qvel / action / images
↓
保存多摄像头视频
↓
绘制关节状态和动作曲线