写点什么

PAI Physical AI Notebook 详解 1:基于 Isaac 仿真的操作动作数据扩增与模仿学习

  • 2025-11-04
    浙江
  • 本文字数:9934 字

    阅读完需:约 33 分钟

Physical AI 是 AI 技术演进的一个热门话题,目的是基于 Transformer、Diffusion 等主流大模型结构,训练得到可以在实际物理空间中指导机器人本体完成各种任务的 AI 模型。


在 Physical AI 模型的开发过程中,需要用到遥操采集、数据合成、数据增强、模仿学习、模型测评等多个过程,也会用到 Isaac Sim、Isaac Lab、Cosmos 等多种工具栈和模型。


基于和 NVIDIA Physical AI 套件的深度整合,阿里云 PAI 人工智能平台整合了上述全套工具栈,并在 Notebook Gallery 中上线了多套开箱即用的最佳实践,供开发者熟悉技术、快速启动 Physical AI 项目使用。



从本期开始,我们会陆续介绍 5 款最佳实践:

  • 基于 Isaac 仿真的操作动作数据扩增与模仿学习

  • 基于 Cosmos 世界模型的操作动作数据扩增与模仿学习

  • 基于 X-Mobility 的通用导航与运动控制

  • GR00T-N1 模型微调

  • 基于 Isaac-Cortex 的软件在环验证


本期我们介绍基于 Isaac 仿真的操作动作数据扩增与模仿学习

Physical AI 模型模仿学习的一般过程


和 LLM 一样,Physical AI 模型的训练也是神经网络模型,也需要大量数据训练,才能学会特定技能。但是与 LLM 使用文本数据训练不同,Physical AI 模型训练使用的数据,需要是针对特定场景、特定对象、特定任务的多模态数据,一般包含视频、触觉传感信息、关节位置数据。


一般来说,遥控真实机器人完成特定任务,即可获取上述数据。但是,由于真实机器人尚未大规模普及,为了特定任务专门搭建演示场景的成本又较高,一般使用人工演示 + 数据扩增 + 数据增强的方式来获取足够的训练数据,其中:


  • 人工演示:由真人在仿真环境,或真实环境中,遥控机器人完成特定任务,从而获取完整的任务数据

  • 数据扩增:基于人工演示数据,引入一定随机化(随机初始位置、随机过程轨迹等),扩增出与人工演示类似但有细微不同的数据

  • 数据增强:在扩增数据的基础上,通过生成式大模型,改善视频数据中的贴图、纹理、光照等细节,使之更符合真实环境

  • 模仿学习:使用扩增并增强后的数据对基础模型进行训练

  • 模型测评:在仿真环境或真实环境中,使用训练得到的模型指导机器人运行,完成任务 A,记录其成功率,以体现模型质量


在 PAI 的 Notebook Gallery 中,我们已经预置了一个最佳实践,就是这个过程的一个具体示例:

https://gallery.pai-ml.com/#/preview/deepLearning/cv/isaac_lab_wf1


下面我们来详细解读这个示例。

人工少量演示

人工少量演示的目的是给后续的数据扩增打下样例。人工演示可以在真实环境下通过遥控真机完成,也可以在仿真环境下通过遥控仿真环境下的本体完成。


在本最佳实践中,我们基于 MimicGen 进行数据扩增,而 MimicGen 需要仿真环境提供的真值数据,因此我们在 Isaac Lab 仿真环境下采集人工演示。以下是在 DSW 中启动 Isaac Lab 仿真环境的一个样例:

# 使用键盘遥操仿真环境进行数据集标注cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/environments/teleoperation/teleop_se3_agent.py --task Isaac-Stack-Cube-Franka-IK-Rel-v0 --num_envs 1 --teleop_device keyboard --livestream 1"    print(f"执行命令: {cmd}")!{cmd}
复制代码


在 DSW 中执行此命令,稍等片刻,即可通过 Isaac Sim WebRTC Streaming Client 连接 DSW 公网 IP 从而启动遥操界面


通过以下键盘操作可以控制机械臂的动作:

    Toggle gripper (open/close): K    Move arm along x-axis: W/S    Move arm along y-axis: A/D    Move arm along z-axis: Q/E    Rotate arm along x-axis: Z/X    Rotate arm along y-axis: T/G    Rotate arm along z-axis: C/V
复制代码


在完成遥控操作后,还需要使用 Mimic Gen 管线中的 annotate_demos.py 对人工演示数据进行子任务标注,这是确保 Isaac Lab Mimic 功能正常运行的关键步骤。

cmd = f"/workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/isaaclab_mimic/annotate_demos.py \--enable_cameras --task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Mimic-v0 --auto \--input_file {dataset_path} --output_file {annotated_dataset_path} --headless"    print(f"执行命令: {cmd}")!{cmd}
复制代码


完成子任务标注后,可以通过以下命令,查看人工演示的数据:

# 使用LiveStream预览已标注数据集os.environ["ACCEPT_EULA"] = "Y"cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /isaac-sim/python.sh /workspace/isaaclab/scripts/tools/replay_demos.py     --dataset_file /mnt/data/isaac_tmp/dataset/annotated_dataset.hdf5     --task Isaac-Stack-Cube-Franka-IK-Rel-v0     --num_envs 1  --livestream 1"print(f"执行命令: {cmd}")!{cmd}
复制代码

视频演示 >>

数据扩增

使用 Isaac Lab Mimic 功能,从少量已标注的专家演示中自动扩增额外的演示数据。用户可以调节命令中的 num_envs 参数,以增加或减少并行环境数量,充分利用显存。还可以调节 generation_num_trials,以调节希望扩增的演示数据数量。例如,可以设置 num_envs 为 8,generation_num_trials 为 1000,这样可以以 8 份数据为一组,不断重复,直至得到 1000 份数据。

# 使用Isaac Lab Mimic生成数据集os.environ["ACCEPT_EULA"] = "Y"cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/isaaclab_mimic/generate_dataset.py \--enable_cameras --headless --num_envs 8 --generation_num_trials 1000 \--input_file {annotated_dataset_path} --output_file {mimic_dataset_path} \--task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0  --livestream 1"    print(f"执行命令: {cmd}")!{cmd}
复制代码

视频演示 >>


对于规模较大的生成或者 mimic 任务,也可以利用 DLC 拉起离线仿真任务;此外,DLC 也提供了对 Ray 的支持,我们也提供了示例,使用 DLC-Ray 拉起多任务,实现计算资源的编排和充分使用。

# 创建DLC作业。    create_job_resp = dlc_client.create_job(CreateJobRequest().from_map({        'WorkspaceId': workspace_id,        'DisplayName': display_name,        'JobType': 'RayJob',        'ResourceId': resource_quota_id,        'JobSpecs': [            {                #假设我们可用资源是8GPU * 128CPU * 1024G内存,可以按照如下分配head和worker                "Type": "Head",                "Image": image_uri,                "PodCount": 1,                "ResourceConfig": {                    "CPU": "8",       # 指定CPU核心数                    "Memory": "32Gi",   # 指定内存大小(单位:GB)                    "GPU": "0"        # Head节点无需GPU,因此设置为0                }            },            {                "Type": "Worker",                "Image": image_uri,                "PodCount": 2, #模拟两个worker node                "ResourceConfig": {                    "CPU": "56",       # 指定CPU核心数                    "Memory": "448Gi",   # 指定内存大小(单位:GB)                    "GPU": "4"        # 指定GPU数量,与UserCommand中的--num_per_worker一致,以保证一个任务使用一个gpu                }            },        ],        'DataSources': [            {                "DataSourceId": dataset_id,                "MountPath": "/mnt/data",  # 挂载路径            },            {                "DataSourceId": pub_dataset_id,                "MountPath": "/mnt/isaac_assets",  # 挂载路径            }        ],       'UserVpc': {            "VpcId": vpc_id,  # 替换为实际 VPC ID            "SwitchId": switch_id,  # 替换为实际交换机 ID            "SecurityGroupId": security_groupid  # 替换为实际安全组 ID        },                # 根据可用资源以及任务,示例中为每个任务分配1gpu * 14cpu * 96G内存,由于每个worker我们设置了4张卡,因此可以启动4个任务        "UserCommand": f"""        /workspace/isaaclab/isaaclab.sh -p /mnt/data/isaac_tmp/ray_isaac_new.py \        --command "/workspace/isaaclab/isaaclab.sh -p /mnt/data/isaac_tmp/generate_dataset_ray.py \        --enable_cameras --headless --num_envs 8 --generation_num_trials 125 \        --input_file {annotated_dataset_path} --output_file {mimic_dataset_path} \        --task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0 \        --asset_path {asset_root_dir}" \        --gpu 1 \        --cpu 14 \        --memory 96 \        --num_per_worker 4        """    }))        job_id = create_job_resp.body.job_id    wait_for_job_to_terminate(dlc_client, job_id)
复制代码

数据增强

通过 Isaac Lab 提供的格式转换功能,可以提取 hdf5 文件中记录的视频帧,将其转换为 mp4 文件:

    cmd = f"/workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/tools/hdf5_to_mp4.py \    --input_file {input_file} \    --output_dir {output_dir} \    --input_keys {' '.join(input_keys)}"        print(f"执行命令: {cmd}")    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)    print(result.stdout)
复制代码


不过,仿真得到的 mp4 文件通常不会十分逼真,例如:

视频演示 >>


我们可以通过 cosmos-transer1-7b 模型将其增强。


首先在 Model Gallery 中找到 cosmos-transfer1-7b 模型并将其部署:



在完成部署后,通过以下代码可以调用 cosmos 服务,从而增强扩增的数据:

import jsonfrom pathlib import Pathimport osimport shutilimport requestsimport gradio_client.client as gradio_clientimport gradio_client.utils as gradio_utils

def load_prompts_from_file(prompts_file_path): """从文件中加载提示词。""" if not prompts_file_path.exists(): return [] with open(prompts_file_path, 'r', encoding='utf-8') as f: return [line.strip() for line in f if line.strip()]
def create_cosmos_request(input_video_path, prompt=None, depth_control_path=None, seg_control_path=None): """动态创建Cosmos请求参数。""" request = { "vis": { "control_weight": 0.1, }, "edge": { "control_weight": 0.3, }, "depth": { "control_weight": 0.6, "input_control": depth_control_path, }, "seg": { "control_weight": 0.7, "input_control": seg_control_path, }, "input_video_path": input_video_path, "negative_prompt": "The video captures a game playing, with bad crappy graphics and cartoonish frames. It represents a recording of old outdated games. The lighting looks very fake. The textures are very raw and basic. The geometries are very primitive. The images are very pixelated and of poor CG quality. There are many subtitles in the footage. Overall, the video is unrealistic at all.", "prompt": prompt, "sigma_max": 50.0, } return request

def cosmos_sync_with_upload(client, input_video_path, output_dir, prompt, depth_video_path=None, seg_video_path=None): """上传文件,调用API进行增强,并下载结果。""" def upload_file(filepath): if not filepath or not filepath.exists(): return None print(f" Uploading: {filepath.name}") file_descriptor = gradio_utils.handle_file(str(filepath)) upload_result_str = client.predict(file_descriptor, api_name="/upload_file") return json.loads(upload_result_str).get("path")
remote_main_video_path = upload_file(input_video_path) if not remote_main_video_path: print(f" 主视频文件上传失败: {input_video_path.name}") return False, "主视频上传失败"
remote_depth_path = upload_file(depth_video_path) remote_seg_path = upload_file(seg_video_path) request_dict = create_cosmos_request(remote_main_video_path, prompt, remote_depth_path, remote_seg_path) print(" Sending generation request...") result = client.predict(json.dumps(request_dict), api_name="/generate_video") if result and isinstance(result, tuple) and len(result) >= 2: video_info, message = result print(video_info, message) if isinstance(video_info, dict) and "video" in video_info: video_path = video_info["video"]
if os.path.exists(video_path): output_file = Path(output_dir) / f"{Path(input_video_path).name}" import shutil shutil.copy2(video_path, output_file) return True, str(output_file)
if video_path.startswith(("http://", "https://")): import requests resp = requests.get(video_path, stream=True) output_file = Path(output_dir) / f"cosmos_{Path(input_video_path).name}" with open(output_file, "wb") as f: for chunk in resp.iter_content(chunk_size=8192): if chunk: f.write(chunk) return True, str(output_file)
def run_cosmos_augmentation_direct(mp4_input_dir, cosmos_output_dir, prompts_file_path): """ 主执行函数:遍历输入目录,为每个主视频调用增强服务。 """ input_dir = Path(mp4_input_dir) output_dir = Path(cosmos_output_dir) output_dir.mkdir(parents=True, exist_ok=True)
main_video_files = sorted(list(input_dir.glob("*cam.mp4"))) if not main_video_files: print(f"在目录 {input_dir} 中未找到任何 '*cam.mp4' 文件。") return 0, [] print(f"找到 {len(main_video_files)} 个主视频文件待处理。") prompts = load_prompts_from_file(prompts_file_path) client = gradio_client.Client(COSMOS_SERVICE_URL, hf_token=EAS_TOKEN) success_count = 0 failed_files = []
for idx, main_video_path in enumerate(main_video_files, 1): print("-" * 50) print(f"[{idx}/{len(main_video_files)}] Processing: {main_video_path.name}") base_name = main_video_path.name.replace("_cam.mp4", "") depth_path = main_video_path.with_name(f"{base_name}_cam_depth.mp4") seg_path = main_video_path.with_name(f"{base_name}_cam_shaded_segmentation.mp4")
depth_path = depth_path if depth_path.exists() else None seg_path = seg_path if seg_path.exists() else None prompt = prompts[(idx-1) % len(prompts)]
ok, result_info = cosmos_sync_with_upload( client, main_video_path, output_dir, prompt, depth_video_path=depth_path, seg_video_path=seg_path ) if ok: success_count += 1 print(result_info) else: failed_files.append(main_video_path.name)

print("\n" + "="*20 + " 处理完成统计 " + "="*20) print(f"成功: {success_count}/{len(main_video_files)}") print(f"失败: {len(failed_files)}") if failed_files: print("失败文件列表:", failed_files) return success_count, failed_files
# --- 程序入口 ---if __name__ == "__main__": if not mp4_output_dir.exists(): print(f"错误:输入目录 {mp4_output_dir} 不存在!") else: print("开始进行Cosmos视觉增强...") success_count, _ = run_cosmos_augmentation_direct( # mp4_output_dir, "/mnt/data/isaac_tmp/dataset/mimic_dataset_1k_mp4_backup", cosmos_output_dir, prompts_file ) print(f"Cosmos增强流程结束!共成功处理 {success_count} 个文件。")
复制代码


完成增强后的视频文件可以得到更逼真的视觉效果:

视频演示 >>

模仿学习

在本最佳实践中,使用简单的 BC-RNN 模型作为模仿学习的示例:

def train_model(task_name, dataset_path, model_name):    """    训练视觉运动BC代理    """    cmd = f"cd {workspace_dir} && /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/train.py \    --task {task_name} --algo bc \    --dataset {dataset_path} \    --name {model_name}"        print(f"训练模型: {cmd}")    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)    print(result.stdout)        return f"logs/robomimic/{task_name}/{model_name}"
# 选择要使用的数据集if merged_dataset_path.exists(): training_dataset = merged_dataset_path print(f"使用合并数据集进行训练: {training_dataset}")elif cosmos_dataset_path.exists(): training_dataset = cosmos_dataset_path print(f"使用Cosmos数据集进行训练: {training_dataset}")elif mimic_dataset_path.exists(): training_dataset = mimic_dataset_path print(f"使用Mimic数据集进行训练: {training_dataset}")else: print("没有可用的训练数据集") training_dataset = None
if training_dataset: model_dir = train_model(task_name, training_dataset, model_name) print(f"模型训练完成,保存在: {model_dir}")
复制代码


同样,也可以使用 DLC 启动分布式任务来提高训练效率:

# 创建DLC作业。    create_job_resp = dlc_client.create_job(CreateJobRequest().from_map({        'WorkspaceId': workspace_id,        'DisplayName': display_name,        'JobType': 'PyTorchJob',        'ResourceId': resource_quota_id,        'JobSpecs': [            {                "Type": "Master",                "Image": image_uri,                "PodCount": 1,                # "EcsSpec": ecs_spec,                "ResourceConfig": {                    "CPU": "48",       # 指定CPU核心数                    "Memory": "256Gi",   # 指定内存大小(单位:GB)                    "GPU": "2"        # 指定GPU数量,与UserCommand中的--num_per_worker一致,以保证一个任务使用一个gpu                }            },        ],        'DataSources': [            {                "DataSourceId": dataset_id,                "MountPath": "/mnt/data",  # 挂载路径            },            {                "DataSourceId": pub_dataset_id,                "MountPath": "/mnt/isaac_assets",  # 挂载路径            }        ],       'UserVpc': {            "VpcId": vpc_id,  # 替换为实际 VPC ID            "SwitchId": switch_id,  # 替换为实际交换机 ID            "SecurityGroupId": security_groupid  # 替换为实际安全组 ID        },        "UserCommand": f"cd {workspace_dir} && \        /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/train.py \        --task {task_name} --algo bc \        --dataset {dataset_path} \        --name {model_name} && \        sleep 30",    }))    job_id = create_job_resp.body.job_id
wait_for_job_to_terminate(dlc_client, job_id)
复制代码


完成训练后,BC-RNN 模型即可获得模仿人工演示任务的能力:

模型测评

可以使用以下代码,在不同的环境条件变化下,测试不同模型模仿人工演示任务的成功率

import glob
def evaluate_model(task_name, model_dir, log_dir, num_rollouts=15): """ 评估最新的训练权重 """ timestamp_dirs = glob.glob(f"{model_dir}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]") timestamp_dirs.sort(key=lambda x: os.path.getmtime(x), reverse=True) latest_dir = timestamp_dirs[0] model_dir_path = f"{latest_dir}/models" cmd = f"cd {workspace_dir} && /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/robust_eval.py \ --task {task_name} \ --input_dir {model_dir_path} \ --log_dir {log_dir} \ --log_file result \ --enable_cameras \ --livestream 1 \ --seeds 0 \ --num_rollouts {num_rollouts} \ --headless" print(f"评估模型: {cmd}") print("\n 注意: 评估过程可能需要很长时间(超过一天),这是正常现象。") result = subprocess.run(cmd, shell=True, capture_output=True, text=True) print(result.stdout)
# 评估设置说明evaluation_settings = { "Vanilla": "与Mimic数据生成时完全相同的设置", "Light Intensity": "光照强度/亮度变化,其他方面保持不变", "Light Color": "光照颜色变化,其他方面保持不变", "Light Texture (Background)": "光照纹理/背景变化,其他方面保持不变", "Table Texture": "桌面视觉纹理变化,其他方面保持不变", "Robot Arm Texture": "机器人手臂视觉纹理变化,其他方面保持不变"}
print("评估设置说明:")for setting, description in evaluation_settings.items(): print(f"- {setting}: {description}")
# 执行评估(如果模型已训练)# 假如您希望使用预训练的模型,请取消下列路径其中之一的注释# model_name = "franka_stack_mimic_1k_table_only"# model_name = "franka_stack_mimic_2k_table_only"# model_name = "franka_stack_mimic_cosmos_2k_table_only"model_dir = f"{workspace_dir}/logs/robomimic/{task_name}/{model_name}"if Path(model_dir).exists(): log_dir = f"robust_results/{model_name}" evaluate_model(task_name, model_dir, log_dir)else: print("模型目录不存在,跳过评估步骤")
复制代码


如果一切正常,可以得到以下测试结果:


测试结果显示,在大部分环境条件变化下,使用数据扩增+数据增强训练得到的模型,总能得到最好的结果。尤其在光照强度、光照颜色和光照纹理变化的条件下,使用数据扩增+数据增强得到的模型,成功率得到了大幅提升。

总结

在本最佳实践中,基于阿里云 PAI 平台的特性,我们实现了基于 Isaac 仿真的操作动作数据扩增与模仿学习,包含从人工少量演示、数据扩增、数据增强、模仿学习再到模型测评的端到端实现:


  • 人工演示:在仿真环境下遥控机械臂本体,进行任务演示数据的记录

  • 数据扩增:使用 Isaac Lab Mimic 从少量专家演示生成大规模仿真数据集

  • 数据增强:通过 Cosmos-Transfer1 对视频数据进行增强,提升数据多样性

  • 模仿学习:基于 robomimic 训练 BC-RNN 视觉运动策略模型


Cosmos 数据增强后的训练模型在各个场景下的成功率均有较高提升,这一工作流程为具身智能的机器人视觉操作提供了一套完整的技术解决方案,在 sim2real 的训练过程中提高了模型在复杂视觉环境下的泛化能力。

用户头像

还未添加个人签名 2020-10-15 加入

分享阿里云计算平台的大数据和AI方向的技术创新和趋势、实战案例、经验总结。

评论

发布
暂无评论
PAI Physical AI Notebook详解1:基于Isaac仿真的操作动作数据扩增与模仿学习_阿里云_阿里云大数据AI技术_InfoQ写作社区