Object Based Synthetic Dataset Generation#
Example of using Isaac Sim and Replicator to generate object-centric synthetic datasets. The script spawns labeled and distractor assets in a predefined area (closed off with invisible collision walls) and captures the scene from multiple camera viewpoints. It also demonstrates how to randomize the camera poses, apply random velocities to the objects, and trigger custom events to randomize the scene. The randomizers can be Replicator-based or custom Isaac Sim/USD API-based, and can be triggered at specific times.
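As a rough illustration of triggering randomizers at specific times, the sketch below reproduces only the frame-interval logic of the capture loop in the full script, in plain Python without Isaac Sim. The dictionary `RANDOMIZER_INTERVALS` and the helper `events_for_frame` are hypothetical names introduced for this example; the interval values are taken from the `i % N == 0` checks in the script.

```python
# Frame intervals at which each randomization is triggered in the capture loop.
# The keys are illustrative labels, not Replicator event names.
RANDOMIZER_INTERVALS = {
    "camera_poses": 3,
    "lights": 5,
    "velocities_towards_origin": 10,
    "shape_distractor_colors": 15,
    "floating_distractor_velocities": 17,
    "dome_background": 25,
}


def events_for_frame(frame_idx, intervals=RANDOMIZER_INTERVALS):
    """Return the randomizations that would fire on the given frame index."""
    return [name for name, every in intervals.items() if frame_idx % every == 0]


if __name__ == "__main__":
    # Print the schedule for the first few frames
    for frame_idx in range(6):
        print(frame_idx, events_for_frame(frame_idx))
```

In the actual script, each Replicator-based entry corresponds to a `rep.utils.send_og_event(...)` call, while the camera pose and velocity randomizations are plain Python functions called directly.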
Learning Objectives#
The goal of this tutorial is to demonstrate how to combine Isaac Sim and Replicator randomizers in a hybrid way in simulated environments. The tutorial covers the following topics:
How to create a custom USD stage and add rigid-body enabled assets with colliders.
How to spawn and add colliders and rigid body dynamics to assets.
How to create a collision box area around the assets to prevent them from drifting away.
How to add a physics scene and set custom physics settings.
How to create custom randomizers and trigger them at specific times.
How to randomize the camera poses to look at a random target asset.
How to randomize the colors of the shape distractors and apply random velocities to the floating shape distractors.
How to randomize the lights in the working area and the dome background.
How to capture motion blur by combining multiple path-traced subframe samples simulated over a given duration.
How to enable motion blur and set the number of sub samples to render for motion blur in PathTracing mode.
How to set the render mode to PathTracing.
How to create a custom synthetic dataset generation pipeline.
How to optimize performance by enabling rendering and data processing only for the frames to be captured.
How to use custom writers to export the data.
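To make the motion blur topic more concrete, the following minimal sketch shows the arithmetic the full script uses to decide whether the physics rate must be raised temporarily so that objects move between every sub sample. It is plain Python and does not touch any Isaac Sim settings; the function names are hypothetical, and the default of 60 steps per second matches the value the script sets on its physics scene.

```python
def required_physics_fps(duration, num_samples):
    """Physics steps per second needed for one step per motion blur sub sample."""
    # Same formula as in the script: 1 / duration * num_samples
    return 1 / duration * num_samples


def physics_fps_for_capture(duration, num_samples, current_fps=60.0):
    """Return the physics rate to use during the capture (never lower than the current one)."""
    target_fps = required_physics_fps(duration, num_samples)
    return target_fps if target_fps > current_fps else current_fps


if __name__ == "__main__":
    # A 0.025 s capture split into 8 sub samples needs a higher rate than 60
    print(physics_fps_for_capture(duration=0.025, num_samples=8))
    # A long capture with few sub samples keeps the current rate
    print(physics_fps_for_capture(duration=0.5, num_samples=8))
```

After the capture, the script restores the original physics rate and render mode, so the higher rate only applies to the motion blur frames.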
Prerequisites#
Familiarity with USD / Isaac Sim APIs for creating and manipulating USD stages.
Familiarity with omni.replicator, its writers, and randomizers.
Basic understanding of OmniGraph for the Replicator randomization and trigger pipeline.
Familiarity with rigid-body dynamics and physics simulation in Isaac Sim.
Running simulations as Standalone Applications or via the Script Editor.
Getting Started#
The main script of the tutorial is located at <install_path>/standalone_examples/replicator/object_based_sdg/object_based_sdg.py, with its utility functions at <install_path>/standalone_examples/replicator/object_based_sdg/object_based_sdg_utils.py.
The script can be run as a standalone application using:
./python.sh standalone_examples/replicator/object_based_sdg/object_based_sdg.py
To override the default configuration parameters, provide a custom config file as a command-line argument to the script using --config <path/to/file.json/yaml>. Example config files are stored in <install_path>/standalone_examples/replicator/object_based_sdg/config/*.
Example of running the script with a custom config file:
./python.sh standalone_examples/replicator/object_based_sdg/object_based_sdg.py \
--config standalone_examples/replicator/object_based_sdg/config/<example_config>.yaml
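The standalone script's own argument handling is not shown on this page, so the following is only a sketch of how a `--config` file could be merged over default parameters. It assumes a JSON file and a shallow `dict.update` merge; `DEFAULT_CONFIG` and `load_config` are hypothetical names, and YAML files would additionally need a YAML parser such as PyYAML.

```python
import argparse
import json
import os

# Hypothetical subset of default parameters, for illustration only
DEFAULT_CONFIG = {"num_frames": 10, "num_cameras": 2, "rt_subframes": 4}


def load_config(argv, defaults=DEFAULT_CONFIG):
    """Return the defaults, overridden by the JSON file given via --config (if any)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default=None, help="Path to a JSON config file")
    # Ignore unknown arguments so other launcher flags do not raise errors
    args, _unknown = parser.parse_known_args(argv)

    config = dict(defaults)
    if args.config and os.path.isfile(args.config):
        with open(args.config, "r") as f:
            overrides = json.load(f)
        # Shallow merge: top-level keys in the file replace the defaults
        config.update(overrides)
    return config


if __name__ == "__main__":
    import sys

    print(load_config(sys.argv[1:]))
```

With a shallow merge, a nested entry such as `writer_kwargs` is replaced as a whole rather than merged key by key, so a custom file would need to specify the complete nested dictionary.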
Full code example for the Script Editor:
import asyncio
import os
import random
import time
from itertools import chain
import carb.settings
import numpy as np
import omni.kit.app
import omni.replicator.core as rep
import omni.timeline
import omni.usd
import usdrt
from omni.isaac.core.utils.semantics import add_update_semantics, remove_all_semantics
from omni.isaac.nucleus import get_assets_root_path
from omni.kit.viewport.utility import get_active_viewport
from omni.physx import get_physx_interface, get_physx_scene_query_interface
from pxr import Gf, PhysxSchema, Sdf, Usd, UsdGeom, UsdPhysics
# Add transformation properties to the prim (if not already present)
def set_transform_attributes(prim, location=None, orientation=None, rotation=None, scale=None):
    if location is not None:
        if not prim.HasAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        prim.GetAttribute("xformOp:translate").Set(location)
    if orientation is not None:
        if not prim.HasAttribute("xformOp:orient"):
            UsdGeom.Xformable(prim).AddOrientOp()
        prim.GetAttribute("xformOp:orient").Set(orientation)
    if rotation is not None:
        if not prim.HasAttribute("xformOp:rotateXYZ"):
            UsdGeom.Xformable(prim).AddRotateXYZOp()
        prim.GetAttribute("xformOp:rotateXYZ").Set(rotation)
    if scale is not None:
        if not prim.HasAttribute("xformOp:scale"):
            UsdGeom.Xformable(prim).AddScaleOp()
        prim.GetAttribute("xformOp:scale").Set(scale)
# Enables collisions with the asset (without rigid body dynamics the asset will be static)
def add_colliders(root_prim):
    # Iterate descendant prims (including root) and add colliders to mesh or primitive types
    for desc_prim in Usd.PrimRange(root_prim):
        if desc_prim.IsA(UsdGeom.Mesh) or desc_prim.IsA(UsdGeom.Gprim):
            # Physics
            if not desc_prim.HasAPI(UsdPhysics.CollisionAPI):
                collision_api = UsdPhysics.CollisionAPI.Apply(desc_prim)
            else:
                collision_api = UsdPhysics.CollisionAPI(desc_prim)
            collision_api.CreateCollisionEnabledAttr(True)
            # PhysX
            if not desc_prim.HasAPI(PhysxSchema.PhysxCollisionAPI):
                physx_collision_api = PhysxSchema.PhysxCollisionAPI.Apply(desc_prim)
            else:
                physx_collision_api = PhysxSchema.PhysxCollisionAPI(desc_prim)
            # Set PhysX specific properties
            physx_collision_api.CreateContactOffsetAttr(0.001)
            physx_collision_api.CreateRestOffsetAttr(0.0)
        # Add mesh specific collision properties only to mesh types
        if desc_prim.IsA(UsdGeom.Mesh):
            # Add mesh collision properties to the mesh (e.g. collider approximation type)
            if not desc_prim.HasAPI(UsdPhysics.MeshCollisionAPI):
                mesh_collision_api = UsdPhysics.MeshCollisionAPI.Apply(desc_prim)
            else:
                mesh_collision_api = UsdPhysics.MeshCollisionAPI(desc_prim)
            mesh_collision_api.CreateApproximationAttr().Set("convexHull")
# Check if prim (or its descendants) has colliders
def has_colliders(root_prim):
    for desc_prim in Usd.PrimRange(root_prim):
        if desc_prim.HasAPI(UsdPhysics.CollisionAPI):
            return True
    return False
# Enables rigid body dynamics (physics simulation) on the prim
def add_rigid_body_dynamics(prim, disable_gravity=False, angular_damping=None):
    if has_colliders(prim):
        # Physics
        if not prim.HasAPI(UsdPhysics.RigidBodyAPI):
            rigid_body_api = UsdPhysics.RigidBodyAPI.Apply(prim)
        else:
            rigid_body_api = UsdPhysics.RigidBodyAPI(prim)
        rigid_body_api.CreateRigidBodyEnabledAttr(True)
        # PhysX
        if not prim.HasAPI(PhysxSchema.PhysxRigidBodyAPI):
            physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI.Apply(prim)
        else:
            physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI(prim)
        physx_rigid_body_api.GetDisableGravityAttr().Set(disable_gravity)
        if angular_damping is not None:
            physx_rigid_body_api.CreateAngularDampingAttr().Set(angular_damping)
    else:
        print(f"Prim '{prim.GetPath()}' has no colliders. Skipping rigid body dynamics properties.")
# Add dynamics properties to the prim (if mesh or primitive) (rigid body to root + colliders to the meshes)
# https://docs.omniverse.nvidia.com/extensions/latest/ext_physics/rigid-bodies.html#rigid-body-simulation
def add_colliders_and_rigid_body_dynamics(prim, disable_gravity=False):
    # Add colliders to mesh or primitive types of the descendants of the prim (including root)
    add_colliders(prim)
    # Add rigid body dynamics properties (to the root only) only if it has colliders
    add_rigid_body_dynamics(prim, disable_gravity=disable_gravity)
# Create a collision box area wrapping the given working area with origin in (0, 0, 0) with thickness towards outside
def create_collision_box_walls(stage, path, width, depth, height, thickness=0.5, visible=False):
    # Define the walls (name, location, size) with thickness towards outside of the working area
    walls = [
        ("floor", (0, 0, (height + thickness) / -2.0), (width, depth, thickness)),
        ("ceiling", (0, 0, (height + thickness) / 2.0), (width, depth, thickness)),
        ("left_wall", ((width + thickness) / -2.0, 0, 0), (thickness, depth, height)),
        ("right_wall", ((width + thickness) / 2.0, 0, 0), (thickness, depth, height)),
        ("front_wall", (0, (depth + thickness) / 2.0, 0), (width, thickness, height)),
        ("back_wall", (0, (depth + thickness) / -2.0, 0), (width, thickness, height)),
    ]
    for name, location, size in walls:
        prim = stage.DefinePrim(f"{path}/{name}", "Cube")
        scale = (size[0] / 2.0, size[1] / 2.0, size[2] / 2.0)
        set_transform_attributes(prim, location=location, scale=scale)
        add_colliders(prim)
        if not visible:
            UsdGeom.Imageable(prim).MakeInvisible()
# Create random transformation values for location, rotation, and scale
def get_random_transform_values(
    loc_min=(0, 0, 0), loc_max=(1, 1, 1), rot_min=(0, 0, 0), rot_max=(360, 360, 360), scale_min_max=(0.1, 1.0)
):
    location = (
        random.uniform(loc_min[0], loc_max[0]),
        random.uniform(loc_min[1], loc_max[1]),
        random.uniform(loc_min[2], loc_max[2]),
    )
    rotation = (
        random.uniform(rot_min[0], rot_max[0]),
        random.uniform(rot_min[1], rot_max[1]),
        random.uniform(rot_min[2], rot_max[2]),
    )
    # Uniform scale: the same random value is used for all three axes
    scale = tuple([random.uniform(scale_min_max[0], scale_min_max[1])] * 3)
    return location, rotation, scale
# Generate a random pose on a sphere looking at the origin
# https://docs.omniverse.nvidia.com/isaacsim/latest/reference_conventions.html
def get_random_pose_on_sphere(origin, radius, camera_forward_axis=(0, 0, -1)):
    origin = Gf.Vec3f(origin)
    camera_forward_axis = Gf.Vec3f(camera_forward_axis)
    # Generate random angles for spherical coordinates
    theta = np.random.uniform(0, 2 * np.pi)
    phi = np.arcsin(np.random.uniform(-1, 1))
    # Spherical to Cartesian conversion
    x = radius * np.cos(theta) * np.cos(phi)
    y = radius * np.sin(phi)
    z = radius * np.sin(theta) * np.cos(phi)
    location = origin + Gf.Vec3f(x, y, z)
    # Calculate direction vector from camera to look_at point
    direction = origin - location
    direction_normalized = direction.GetNormalized()
    # Calculate rotation from forward direction (rotateFrom) to direction vector (rotateTo)
    rotation = Gf.Rotation(Gf.Vec3d(camera_forward_axis), Gf.Vec3d(direction_normalized))
    orientation = Gf.Quatf(rotation.GetQuat())
    return location, orientation
# Enable or disable the render products and viewport rendering
def set_render_products_updates(render_products, enabled, include_viewport=False):
    for rp in render_products:
        rp.hydra_texture.set_updates_enabled(enabled)
    if include_viewport:
        get_active_viewport().updates_enabled = enabled
async def run_example_async(config):
    # Isaac nucleus assets root path
    assets_root_path = get_assets_root_path()
    stage = None
    # ENVIRONMENT
    # Create an empty or load a custom stage (clearing any previous semantics)
    env_url = config.get("env_url", "")
    if env_url:
        env_path = env_url if env_url.startswith("omniverse://") else assets_root_path + env_url
        omni.usd.get_context().open_stage(env_path)
        stage = omni.usd.get_context().get_stage()
        # Remove any previous semantics in the loaded stage
        for prim in stage.Traverse():
            remove_all_semantics(prim)
    else:
        omni.usd.get_context().new_stage()
        stage = omni.usd.get_context().get_stage()
        # Add a distant light to the empty stage
        distant_light = stage.DefinePrim("/World/Lights/DistantLight", "DistantLight")
        distant_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(400.0)
        if not distant_light.HasAttribute("xformOp:rotateXYZ"):
            UsdGeom.Xformable(distant_light).AddRotateXYZOp()
        distant_light.GetAttribute("xformOp:rotateXYZ").Set((0, 60, 0))
    # Get the working area size and bounds (width=x, depth=y, height=z)
    working_area_size = config.get("working_area_size", (3, 3, 3))
    working_area_min = (working_area_size[0] / -2, working_area_size[1] / -2, working_area_size[2] / -2)
    working_area_max = (working_area_size[0] / 2, working_area_size[1] / 2, working_area_size[2] / 2)
    # Create a collision box area around the assets to prevent them from drifting away
    create_collision_box_walls(
        stage, "/World/CollisionWalls", working_area_size[0], working_area_size[1], working_area_size[2]
    )
    # Create a physics scene to add or modify custom physics settings
    usdrt_stage = usdrt.Usd.Stage.Attach(omni.usd.get_context().get_stage_id())
    physics_scenes = usdrt_stage.GetPrimsWithAppliedAPIName("PhysxSceneAPI")
    if physics_scenes:
        physics_scene = physics_scenes[0]
    else:
        physics_scene = UsdPhysics.Scene.Define(stage, "/PhysicsScene")
    physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))
    physx_scene.GetTimeStepsPerSecondAttr().Set(60)
    # TRAINING ASSETS
    # Add the objects to be trained in the environment with their labels and properties
    labeled_assets_and_properties = config.get("labeled_assets_and_properties", [])
    floating_labeled_prims = []
    falling_labeled_prims = []
    labeled_prims = []
    for obj in labeled_assets_and_properties:
        obj_url = obj.get("url", "")
        label = obj.get("label", "unknown")
        count = obj.get("count", 1)
        floating = obj.get("floating", False)
        # The key must match the one used in the config ("scale_min_max")
        scale_min_max = obj.get("scale_min_max", (1, 1))
        for i in range(count):
            # Create a prim and add the asset reference
            rand_loc, rand_rot, rand_scale = get_random_transform_values(
                loc_min=working_area_min, loc_max=working_area_max, scale_min_max=scale_min_max
            )
            prim_path = omni.usd.get_stage_next_free_path(stage, f"/World/Labeled/{label}", False)
            prim = stage.DefinePrim(prim_path, "Xform")
            asset_path = obj_url if obj_url.startswith("omniverse://") else assets_root_path + obj_url
            prim.GetReferences().AddReference(asset_path)
            set_transform_attributes(prim, location=rand_loc, rotation=rand_rot, scale=rand_scale)
            add_colliders(prim)
            add_rigid_body_dynamics(prim, disable_gravity=floating)
            # Label the asset (any previous 'class' label will be overwritten)
            add_update_semantics(prim, label)
            if floating:
                floating_labeled_prims.append(prim)
            else:
                falling_labeled_prims.append(prim)
    labeled_prims = floating_labeled_prims + falling_labeled_prims
    # DISTRACTORS
    # Add shape distractors to the environment as floating or falling objects
    shape_distractors_types = config.get("shape_distractors_types", ["capsule", "cone", "cylinder", "sphere", "cube"])
    shape_distractors_scale_min_max = config.get("shape_distractors_scale_min_max", (0.02, 0.2))
    shape_distractors_num = config.get("shape_distractors_num", 350)
    shape_distractors = []
    floating_shape_distractors = []
    falling_shape_distractors = []
    for i in range(shape_distractors_num):
        rand_loc, rand_rot, rand_scale = get_random_transform_values(
            loc_min=working_area_min, loc_max=working_area_max, scale_min_max=shape_distractors_scale_min_max
        )
        rand_shape = random.choice(shape_distractors_types)
        prim_path = omni.usd.get_stage_next_free_path(stage, f"/World/Distractors/{rand_shape}", False)
        prim = stage.DefinePrim(prim_path, rand_shape.capitalize())
        set_transform_attributes(prim, location=rand_loc, rotation=rand_rot, scale=rand_scale)
        add_colliders(prim)
        disable_gravity = random.choice([True, False])
        add_rigid_body_dynamics(prim, disable_gravity)
        if disable_gravity:
            floating_shape_distractors.append(prim)
        else:
            falling_shape_distractors.append(prim)
        shape_distractors.append(prim)
    # Add mesh distractors to the environment as floating or falling objects
    mesh_distactors_urls = config.get("mesh_distractors_urls", [])
    mesh_distactors_scale_min_max = config.get("mesh_distractors_scale_min_max", (0.1, 2.0))
    mesh_distactors_num = config.get("mesh_distractors_num", 10)
    mesh_distractors = []
    floating_mesh_distractors = []
    falling_mesh_distractors = []
    for i in range(mesh_distactors_num):
        rand_loc, rand_rot, rand_scale = get_random_transform_values(
            loc_min=working_area_min, loc_max=working_area_max, scale_min_max=mesh_distactors_scale_min_max
        )
        mesh_url = random.choice(mesh_distactors_urls)
        prim_name = os.path.basename(mesh_url).split(".")[0]
        prim_path = omni.usd.get_stage_next_free_path(stage, f"/World/Distractors/{prim_name}", False)
        prim = stage.DefinePrim(prim_path, "Xform")
        asset_path = mesh_url if mesh_url.startswith("omniverse://") else assets_root_path + mesh_url
        prim.GetReferences().AddReference(asset_path)
        set_transform_attributes(prim, location=rand_loc, rotation=rand_rot, scale=rand_scale)
        add_colliders(prim)
        disable_gravity = random.choice([True, False])
        add_rigid_body_dynamics(prim, disable_gravity=disable_gravity)
        if disable_gravity:
            floating_mesh_distractors.append(prim)
        else:
            falling_mesh_distractors.append(prim)
        mesh_distractors.append(prim)
        # Remove any previous semantics on the mesh distractor
        remove_all_semantics(prim, recursive=True)
    # REPLICATOR
    # Disable capturing every frame (capture will be triggered manually using the step function)
    rep.orchestrator.set_capture_on_play(False)
    # Create the camera prims and their properties
    cameras = []
    num_cameras = config.get("num_cameras", 1)
    camera_properties_kwargs = config.get("camera_properties_kwargs", {})
    for i in range(num_cameras):
        # Create camera and add its properties (focal length, focus distance, f-stop, clipping range, etc.)
        cam_prim = stage.DefinePrim(f"/World/Cameras/cam_{i}", "Camera")
        for key, value in camera_properties_kwargs.items():
            if cam_prim.HasAttribute(key):
                cam_prim.GetAttribute(key).Set(value)
            else:
                print(f"Unknown camera attribute with {key}:{value}")
        cameras.append(cam_prim)
    # Add collision spheres (disabled by default) to cameras to avoid objects overlapping with the camera view
    camera_colliders = []
    camera_collider_radius = config.get("camera_collider_radius", 0)
    if camera_collider_radius > 0:
        for cam in cameras:
            cam_path = cam.GetPath()
            cam_collider = stage.DefinePrim(f"{cam_path}/CollisionSphere", "Sphere")
            cam_collider.GetAttribute("radius").Set(camera_collider_radius)
            add_colliders(cam_collider)
            collision_api = UsdPhysics.CollisionAPI(cam_collider)
            collision_api.GetCollisionEnabledAttr().Set(False)
            UsdGeom.Imageable(cam_collider).MakeInvisible()
            camera_colliders.append(cam_collider)
    # Wait an app update to ensure the prim changes are applied
    await omni.kit.app.get_app().next_update_async()
    # Create render products using the cameras
    render_products = []
    resolution = config.get("resolution", (640, 480))
    for cam in cameras:
        rp = rep.create.render_product(cam.GetPath(), resolution)
        render_products.append(rp)
    # Enable rendering only at capture time
    disable_render_products_between_captures = config.get("disable_render_products_between_captures", True)
    if disable_render_products_between_captures:
        set_render_products_updates(render_products, False, include_viewport=False)
    # Create the writer and attach the render products
    writer_type = config.get("writer_type", None)
    writer_kwargs = config.get("writer_kwargs", {})
    # If not an absolute path, set it relative to the current working directory
    if out_dir := writer_kwargs.get("output_dir"):
        if not os.path.isabs(out_dir):
            out_dir = os.path.join(os.getcwd(), out_dir)
            writer_kwargs["output_dir"] = out_dir
        print(f"[SDG] Writing data to: {out_dir}")
    if writer_type is not None and len(render_products) > 0:
        writer = rep.writers.get(writer_type)
        writer.initialize(**writer_kwargs)
        writer.attach(render_products)
    # RANDOMIZERS
    # Apply a random (mostly) upwards velocity to the objects overlapping the 'bounce' area
    def on_overlap_hit(hit):
        prim = stage.GetPrimAtPath(hit.rigid_body)
        # Skip the camera collision spheres
        if prim not in camera_colliders:
            rand_vel = (random.uniform(-2, 2), random.uniform(-2, 2), random.uniform(4, 8))
            prim.GetAttribute("physics:velocity").Set(rand_vel)
        return True  # return True to continue the query
    # Area to check for overlapping objects (above the bottom collision box)
    overlap_area_thickness = 0.1
    overlap_area_origin = (0, 0, (-working_area_size[2] / 2) + (overlap_area_thickness / 2))
    overlap_area_extent = (
        working_area_size[0] / 2 * 0.99,
        working_area_size[1] / 2 * 0.99,
        overlap_area_thickness / 2 * 0.99,
    )
    # Triggered every physics update step to check for overlapping objects
    def on_physics_step(dt: float):
        hit_info = get_physx_scene_query_interface().overlap_box(
            carb.Float3(overlap_area_extent),
            carb.Float3(overlap_area_origin),
            carb.Float4(0, 0, 0, 1),
            on_overlap_hit,
            False,  # pass 'False' to indicate an 'overlap multiple' query.
        )
    # Subscribe to the physics step events to check for objects overlapping the 'bounce' area
    physx_sub = get_physx_interface().subscribe_physics_step_events(on_physics_step)
    # Pull assets towards the working area center by applying a random velocity towards the given target
    def apply_velocities_towards_target(assets, target=(0, 0, 0)):
        for prim in assets:
            loc = prim.GetAttribute("xformOp:translate").Get()
            strength = random.uniform(0.1, 1.0)
            pull_vel = (
                (target[0] - loc[0]) * strength,
                (target[1] - loc[1]) * strength,
                (target[2] - loc[2]) * strength,
            )
            prim.GetAttribute("physics:velocity").Set(pull_vel)
    # Randomize camera poses to look at a random target asset (random distance and center offset)
    camera_distance_to_target_min_max = config.get("camera_distance_to_target_min_max", (0.1, 0.5))
    camera_look_at_target_offset = config.get("camera_look_at_target_offset", 0.2)
    def randomize_camera_poses():
        for cam in cameras:
            # Get a random target asset to look at
            target_asset = random.choice(labeled_prims)
            # Add a look_at offset so the target is not always in the center of the camera view
            loc_offset = (
                random.uniform(-camera_look_at_target_offset, camera_look_at_target_offset),
                random.uniform(-camera_look_at_target_offset, camera_look_at_target_offset),
                random.uniform(-camera_look_at_target_offset, camera_look_at_target_offset),
            )
            target_loc = target_asset.GetAttribute("xformOp:translate").Get() + loc_offset
            # Get a random distance to the target asset
            distance = random.uniform(camera_distance_to_target_min_max[0], camera_distance_to_target_min_max[1])
            # Get a random pose of the camera looking at the target asset from the given distance
            cam_loc, quat = get_random_pose_on_sphere(origin=target_loc, radius=distance)
            set_transform_attributes(cam, location=cam_loc, orientation=quat)
    # Temporarily enable camera colliders and simulate for the given number of frames to push out any overlapping objects
    async def simulate_camera_collision_async(num_frames=1):
        for cam_collider in camera_colliders:
            collision_api = UsdPhysics.CollisionAPI(cam_collider)
            collision_api.GetCollisionEnabledAttr().Set(True)
        if not timeline.is_playing():
            timeline.play()
        for _ in range(num_frames):
            await omni.kit.app.get_app().next_update_async()
        for cam_collider in camera_colliders:
            collision_api = UsdPhysics.CollisionAPI(cam_collider)
            collision_api.GetCollisionEnabledAttr().Set(False)
    # Create a randomizer for the shape distractors colors, manually triggered at custom events
    with rep.trigger.on_custom_event(event_name="randomize_shape_distractor_colors"):
        shape_distractors_paths = [
            prim.GetPath() for prim in chain(floating_shape_distractors, falling_shape_distractors)
        ]
        shape_distractors_group = rep.create.group(shape_distractors_paths)
        with shape_distractors_group:
            rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
    # Create a randomizer to apply random velocities to the floating shape distractors
    with rep.trigger.on_custom_event(event_name="randomize_floating_distractor_velocities"):
        shape_distractors_paths = [
            prim.GetPath() for prim in chain(floating_shape_distractors, floating_mesh_distractors)
        ]
        shape_distractors_group = rep.create.group(shape_distractors_paths)
        with shape_distractors_group:
            rep.physics.rigid_body(
                velocity=rep.distribution.uniform((-2.5, -2.5, -2.5), (2.5, 2.5, 2.5)),
                angular_velocity=rep.distribution.uniform((-45, -45, -45), (45, 45, 45)),
            )
    # Create a randomizer for lights in the working area, manually triggered at custom events
    with rep.trigger.on_custom_event(event_name="randomize_lights"):
        lights = rep.create.light(
            light_type="Sphere",
            color=rep.distribution.uniform((0, 0, 0), (1, 1, 1)),
            temperature=rep.distribution.normal(6500, 500),
            intensity=rep.distribution.normal(35000, 5000),
            position=rep.distribution.uniform(working_area_min, working_area_max),
            scale=rep.distribution.uniform(0.1, 1),
            count=3,
        )
    # Create a randomizer for the dome background, manually triggered at custom events
    with rep.trigger.on_custom_event(event_name="randomize_dome_background"):
        dome_textures = [
            assets_root_path + "/NVIDIA/Assets/Skies/Indoor/autoshop_01_4k.hdr",
            assets_root_path + "/NVIDIA/Assets/Skies/Indoor/carpentry_shop_01_4k.hdr",
            assets_root_path + "/NVIDIA/Assets/Skies/Indoor/hotel_room_4k.hdr",
            assets_root_path + "/NVIDIA/Assets/Skies/Indoor/wooden_lounge_4k.hdr",
        ]
        dome_light = rep.create.light(light_type="Dome")
        with dome_light:
            rep.modify.attribute("inputs:texture:file", rep.distribution.choice(dome_textures))
            rep.randomizer.rotation()
    # Capture motion blur by combining multiple path-traced subframe samples simulated over the given duration
    async def capture_with_motion_blur_and_pathtracing_async(duration=0.05, num_samples=8, spp=64):
        # For small step sizes the physics FPS needs to be temporarily increased to provide movements every sub sample
        orig_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get()
        target_physics_fps = 1 / duration * num_samples
        if target_physics_fps > orig_physics_fps:
            print(f"[SDG] Changing physics FPS from {orig_physics_fps} to {target_physics_fps}")
            physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps)
        # Enable motion blur (if not enabled)
        is_motion_blur_enabled = carb.settings.get_settings().get("/omni/replicator/captureMotionBlur")
        if not is_motion_blur_enabled:
            carb.settings.get_settings().set("/omni/replicator/captureMotionBlur", True)
        # Number of sub samples to render for motion blur in PathTracing mode
        carb.settings.get_settings().set("/omni/replicator/pathTracedMotionBlurSubSamples", num_samples)
        # Set the render mode to PathTracing
        prev_render_mode = carb.settings.get_settings().get("/rtx/rendermode")
        carb.settings.get_settings().set("/rtx/rendermode", "PathTracing")
        carb.settings.get_settings().set("/rtx/pathtracing/spp", spp)
        carb.settings.get_settings().set("/rtx/pathtracing/totalSpp", spp)
        carb.settings.get_settings().set("/rtx/pathtracing/optixDenoiser/enabled", 0)
        # Make sure the timeline is playing
        if not timeline.is_playing():
            timeline.play()
        # Capture the frame by advancing the simulation for the given duration and combining the sub samples
        await rep.orchestrator.step_async(delta_time=duration, pause_timeline=False)
        # Restore the original physics FPS
        if target_physics_fps > orig_physics_fps:
            print(f"[SDG] Restoring physics FPS from {target_physics_fps} to {orig_physics_fps}")
            physx_scene.GetTimeStepsPerSecondAttr().Set(orig_physics_fps)
        # Restore the previous render and motion blur settings
        carb.settings.get_settings().set("/omni/replicator/captureMotionBlur", is_motion_blur_enabled)
        print(f"[SDG] Restoring render mode from 'PathTracing' to '{prev_render_mode}'")
        carb.settings.get_settings().set("/rtx/rendermode", prev_render_mode)
    # Update the app until a given simulation duration has passed (simulate the world between captures)
    async def run_simulation_loop_async(duration):
        timeline = omni.timeline.get_timeline_interface()
        if not timeline.is_playing():
            timeline.play()
        elapsed_time = 0.0
        previous_time = timeline.get_current_time()
        app_updates_counter = 0
        while elapsed_time <= duration:
            await omni.kit.app.get_app().next_update_async()
            elapsed_time += timeline.get_current_time() - previous_time
            previous_time = timeline.get_current_time()
            app_updates_counter += 1
            print(
                f"\t Simulation loop at {timeline.get_current_time():.2f}, current elapsed time: {elapsed_time:.2f}, counter: {app_updates_counter}"
            )
        print(
            f"[SDG] Simulation loop finished in {elapsed_time:.2f} seconds at {timeline.get_current_time():.2f} with {app_updates_counter} app updates."
        )
    # SDG
    # Number of frames to capture
    num_frames = config.get("num_frames", 10)
    # Increase subframes if materials are not loaded on time, or ghosting artifacts appear on moving objects,
    # see: https://docs.omniverse.nvidia.com/extensions/latest/ext_replicator/subframes_examples.html
    rt_subframes = config.get("rt_subframes", -1)
    # Amount of simulation time to wait between captures
    sim_duration_between_captures = config.get("simulation_duration_between_captures", 0.025)
    # Initial trigger for randomizers before the SDG loop with several app updates (ensures materials/textures are loaded)
    rep.utils.send_og_event(event_name="randomize_shape_distractor_colors")
    rep.utils.send_og_event(event_name="randomize_dome_background")
    for _ in range(5):
        await omni.kit.app.get_app().next_update_async()
    # Set the timeline parameters (start, end, no looping) and start the timeline
    timeline = omni.timeline.get_timeline_interface()
    timeline.set_start_time(0)
    timeline.set_end_time(1000000)
    timeline.set_looping(False)
    # If no custom physx scene is created, a default one will be created by the physics engine once the timeline starts
    timeline.play()
    timeline.commit()
    await omni.kit.app.get_app().next_update_async()
    # Store the wall start time for stats
    wall_time_start = time.perf_counter()
    # Run the simulation and capture data triggering randomizations and actions at custom frame intervals
    for i in range(num_frames):
        # Cameras will be moved to a random position and look at a randomly selected labeled asset
        if i % 3 == 0:
            print(f"\t Randomizing camera poses")
            randomize_camera_poses()
            # Temporarily enable camera colliders and simulate for a few frames to push out any overlapping objects
            if camera_colliders:
                await simulate_camera_collision_async(num_frames=4)
        # Apply a random velocity towards the origin of the working area to pull the assets closer to the center
        if i % 10 == 0:
            print(f"\t Applying velocity towards the origin")
            apply_velocities_towards_target(chain(labeled_prims, shape_distractors, mesh_distractors))
        # Randomize lights locations and colors
        if i % 5 == 0:
            print(f"\t Randomizing lights")
            rep.utils.send_og_event(event_name="randomize_lights")
        # Randomize the colors of the primitive shape distractors
        if i % 15 == 0:
            print(f"\t Randomizing shape distractors colors")
            rep.utils.send_og_event(event_name="randomize_shape_distractor_colors")
        # Randomize the texture of the dome background
        if i % 25 == 0:
            print(f"\t Randomizing dome background")
            rep.utils.send_og_event(event_name="randomize_dome_background")
        # Apply a random velocity on the floating distractors (shapes and meshes)
        if i % 17 == 0:
            print(f"\t Randomizing shape distractors velocities")
            rep.utils.send_og_event(event_name="randomize_floating_distractor_velocities")
        # Enable render products only at capture time
        if disable_render_products_between_captures:
            set_render_products_updates(render_products, True, include_viewport=False)
        # Capture the current frame
        print(f"[SDG] Capturing frame {i}/{num_frames}, at simulation time: {timeline.get_current_time():.2f}")
        if i % 5 == 0:
            await capture_with_motion_blur_and_pathtracing_async(duration=0.025, num_samples=8, spp=128)
        else:
            await rep.orchestrator.step_async(delta_time=0.0, rt_subframes=rt_subframes, pause_timeline=False)
        # Disable render products between captures
        if disable_render_products_between_captures:
            set_render_products_updates(render_products, False, include_viewport=False)
        # Run the simulation for a given duration between frame captures
        if sim_duration_between_captures > 0:
            await run_simulation_loop_async(sim_duration_between_captures)
        else:
            await omni.kit.app.get_app().next_update_async()
    # Wait for the data to be written (default writer backends are asynchronous)
    await rep.orchestrator.wait_until_complete_async()
    # Get the stats
    wall_duration = time.perf_counter() - wall_time_start
    sim_duration = timeline.get_current_time()
    avg_frame_fps = num_frames / wall_duration
    num_captures = num_frames * num_cameras
    avg_capture_fps = num_captures / wall_duration
    print(
        f"[SDG] Captured {num_frames} frames, {num_captures} entries (frames * cameras) in {wall_duration:.2f} seconds.\n"
        f"\t Simulation duration: {sim_duration:.2f}\n"
        f"\t Simulation duration between captures: {sim_duration_between_captures:.2f}\n"
        f"\t Average frame FPS: {avg_frame_fps:.2f}\n"
        f"\t Average capture entries (frames * cameras) FPS: {avg_capture_fps:.2f}\n"
    )
    # Unsubscribe the physics overlap checks and stop the timeline
    physx_sub.unsubscribe()
    physx_sub = None
    await omni.kit.app.get_app().next_update_async()
    timeline.stop()
config = {
"launch_config": {
"renderer": "RayTracedLighting",
"headless": False,
},
"env_url": "",
"working_area_size": (5, 5, 3),
"rt_subframes": 4,
"num_frames": 10,
"num_cameras": 2,
"camera_collider_radius": 1.25,
"disable_render_products_between_captures": False,
"simulation_duration_between_captures": 0.05,
"resolution": (640, 480),
"camera_properties_kwargs": {
"focalLength": 24.0,
"focusDistance": 400,
"fStop": 0.0,
"clippingRange": (0.01, 10000),
},
"camera_look_at_target_offset": 0.15,
"camera_distance_to_target_min_max": (0.25, 0.75),
"writer_type": "PoseWriter",
"writer_kwargs": {
"output_dir": "_out_obj_based_sdg_pose_writer",
"format": None,
"use_subfolders": False,
"write_debug_images": True,
"skip_empty_frames": False,
},
"labeled_assets_and_properties": [
{
"url": "/Isaac/Props/YCB/Axis_Aligned/008_pudding_box.usd",
"label": "pudding_box",
"count": 5,
"floating": True,
"scale_min_max": (0.85, 1.25),
},
{
"url": "/Isaac/Props/YCB/Axis_Aligned_Physics/006_mustard_bottle.usd",
"label": "mustard_bottle",
"count": 7,
"floating": False,
"scale_min_max": (0.85, 3.25),
},
],
"shape_distractors_types": ["capsule", "cone", "cylinder", "sphere", "cube"],
"shape_distractors_scale_min_max": (0.015, 0.15),
"shape_distractors_num": 150,
"mesh_distractors_urls": [
"/Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxD_04_1847.usd",
"/Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxA_01_414.usd",
"/Isaac/Environments/Simple_Warehouse/Props/S_TrafficCone.usd",
"/Isaac/Environments/Simple_Warehouse/Props/S_WetFloorSign.usd",
"/Isaac/Environments/Simple_Warehouse/Props/SM_BarelPlastic_B_03.usd",
"/Isaac/Environments/Office/Props/SM_Board.usd",
"/Isaac/Environments/Office/Props/SM_Book_03.usd",
"/Isaac/Environments/Office/Props/SM_Book_34.usd",
"/Isaac/Environments/Office/Props/SM_BookOpen_01.usd",
"/Isaac/Environments/Office/Props/SM_Briefcase.usd",
"/Isaac/Environments/Office/Props/SM_Extinguisher.usd",
"/Isaac/Environments/Hospital/Props/SM_GasCart_01b.usd",
"/Isaac/Environments/Hospital/Props/SM_MedicalBag_01a.usd",
"/Isaac/Environments/Hospital/Props/SM_MedicalBox_01g.usd",
"/Isaac/Environments/Hospital/Props/SM_Toweldispenser_01a.usd",
],
"mesh_distractors_scale_min_max": (0.35, 1.35),
"mesh_distractors_num": 175,
}
asyncio.ensure_future(run_example_async(config))
Implementation#
The following section provides an implementation overview of the script. It includes details regarding the configuration parameters, scene generation helper functions, randomizations (Isaac Sim and Replicator), and data capture loop.
Config#
The script has the following main configuration parameters:
launch_config (dict): Configuration for the launch settings, such as the renderer and headless mode.
env_url (str): The URL of the environment to load. If empty, a new empty stage is created.
working_area_size (tuple): The size of the area (width, depth, height) in which the objects will be placed. This area is surrounded by invisible collision walls to prevent objects from drifting away.
num_frames (int): The number of frames to capture (the total number of entries will be num_frames * num_cameras).
num_cameras (int): The number of cameras to use for capturing the frames. The cameras are randomized and moved to look at different targets.
disable_render_products_between_captures (bool): If True, the render products will be disabled between captures to save resources.
simulation_duration_between_captures (float): The amount of simulation time to run between data captures.
camera_properties_kwargs (dict): The camera properties to set for the cameras (focal length, focus distance, f-stop, clipping range).
writer_type (str): The writer type to use to write the data to disk. (e.g. PoseWriter or BasicWriter)
writer_kwargs (dict): The writer parameters to use when initializing the writer. (e.g. output_dir, format, use_subfolders)
labeled_assets_and_properties (list): A list of dictionaries with the labeled assets to add to the environment with their properties.
shape_distractors_types (list): A list of shape types to use for the distractors (capsule, cone, cylinder, sphere, cube).
shape_distractors_num (int): The number of shape distractors to add to the environment.
mesh_distractors_urls (list): A list of mesh URLs to use for the distractors (e.g. /Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxD_04_1847.usd or omniverse://...).
mesh_distractors_num (int): The number of mesh distractors to add to the environment.
Built-in (Default) Config Dict
The following is the default configuration used in the script. It can be extended and modified to suit the specific requirements of the SDG task:
config = {
"launch_config": {
"renderer": "RayTracedLighting",
"headless": False,
},
"env_url": "",
"working_area_size": (4, 4, 3),
"rt_subframes": 4,
"num_frames": 10,
"num_cameras": 3,
"camera_collider_radius": 1.25,
"disable_render_products_between_captures": False,
"simulation_duration_between_captures": 0.05,
"resolution": (640, 480),
"camera_properties_kwargs": {
"focalLength": 24.0,
"focusDistance": 400,
"fStop": 0.0,
"clippingRange": (0.01, 10000),
},
"camera_look_at_target_offset": 0.15,
"camera_distance_to_target_min_max": (0.25, 0.75),
"writer_type": "PoseWriter",
"writer_kwargs": {
"output_dir": "_out_obj_based_sdg_pose_writer",
"format": None,
"use_subfolders": False,
"write_debug_images": True,
"skip_empty_frames": False,
},
"labeled_assets_and_properties": [
{
"url": "/Isaac/Props/YCB/Axis_Aligned/008_pudding_box.usd",
"label": "pudding_box",
"count": 5,
"floating": True,
"scale_min_max": (0.85, 1.25),
},
{
"url": "/Isaac/Props/YCB/Axis_Aligned_Physics/006_mustard_bottle.usd",
"label": "mustard_bottle",
"count": 7,
"floating": True,
"scale_min_max": (0.85, 1.25),
},
],
"shape_distractors_types": ["capsule", "cone", "cylinder", "sphere", "cube"],
"shape_distractors_scale_min_max": (0.015, 0.15),
"shape_distractors_num": 50,
"mesh_distractors_urls": [
"/Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxD_04_1847.usd",
"/Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxA_01_414.usd",
"/Isaac/Environments/Simple_Warehouse/Props/S_TrafficCone.usd",
],
"mesh_distractors_scale_min_max": (0.35, 1.35),
"mesh_distractors_num": 75,
}
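Since the default configuration is meant to be extended, one way to apply overrides without editing the defaults is a recursive dictionary merge. The sketch below is a minimal illustration only: merge_config and the override values are hypothetical names that are not part of the script, and the default dictionary is trimmed to a few keys from the listing above.

```python
import copy


def merge_config(defaults, overrides):
    """Return a new dict with `overrides` recursively merged into `defaults`."""
    merged = copy.deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Merge nested dictionaries (e.g. writer_kwargs) key by key
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Trimmed copy of the default config (only the keys needed for the illustration)
default_config = {
    "num_frames": 10,
    "num_cameras": 3,
    "writer_type": "PoseWriter",
    "writer_kwargs": {
        "output_dir": "_out_obj_based_sdg_pose_writer",
        "format": None,
        "use_subfolders": False,
    },
}

# Hypothetical overrides: a longer run written to per-camera subfolders
overrides = {"num_frames": 100, "writer_kwargs": {"use_subfolders": True}}

config = merge_config(default_config, overrides)
print(config["num_frames"], config["writer_kwargs"])
```

Nested keys that are not overridden (such as output_dir) keep their default values, and the default dictionary itself is left unmodified.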
Util Functions#
The following helper functions are used throughout the script to set various prim attributes (colliders, rigid-body dynamics, transforms, etc.) or to generate randomized values (locations, rotations, scales, etc.).
Set 3D Transform Values
The following helper function is used to check and add 3D transform attributes (location, rotation/orientation, scale) to the prim.
# Add transformation properties to the prim (if not already present)
def set_transform_attributes(prim, location=None, orientation=None, rotation=None, scale=None):
    if location is not None:
        if not prim.HasAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        prim.GetAttribute("xformOp:translate").Set(location)
    if orientation is not None:
        if not prim.HasAttribute("xformOp:orient"):
            UsdGeom.Xformable(prim).AddOrientOp()
        prim.GetAttribute("xformOp:orient").Set(orientation)
    if rotation is not None:
        if not prim.HasAttribute("xformOp:rotateXYZ"):
            UsdGeom.Xformable(prim).AddRotateXYZOp()
        prim.GetAttribute("xformOp:rotateXYZ").Set(rotation)
    if scale is not None:
        if not prim.HasAttribute("xformOp:scale"):
            UsdGeom.Xformable(prim).AddScaleOp()
        prim.GetAttribute("xformOp:scale").Set(scale)
Example usage:
import object_based_sdg_utils
[..]
for i in range(shape_distractors_num):
    rand_shape = random.choice(shape_distractors_types)
    prim_path = omni.usd.get_stage_next_free_path(stage, f"/World/Distractors/{rand_shape}", False)
    prim = stage.DefinePrim(prim_path, rand_shape.capitalize())
    object_based_sdg_utils.set_transform_attributes(prim, location=rand_loc, rotation=rand_rot, scale=rand_scale)
Generate 3D Transform Values
The following functions are used to generate random 3D transform values for various scenarios.
# Create random transformation values for location, rotation, and scale
def get_random_transform_values(
    loc_min=(0, 0, 0), loc_max=(1, 1, 1), rot_min=(0, 0, 0), rot_max=(360, 360, 360), scale_min_max=(0.1, 1.0)
):
    location = (
        random.uniform(loc_min[0], loc_max[0]),
        random.uniform(loc_min[1], loc_max[1]),
        random.uniform(loc_min[2], loc_max[2]),
    )
    rotation = (
        random.uniform(rot_min[0], rot_max[0]),
        random.uniform(rot_min[1], rot_max[1]),
        random.uniform(rot_min[2], rot_max[2]),
    )
    scale = tuple([random.uniform(scale_min_max[0], scale_min_max[1])] * 3)
    return location, rotation, scale
# Generate a random pose on a sphere looking at the origin
# https://docs.omniverse.nvidia.com/isaacsim/latest/reference_conventions.html
def get_random_pose_on_sphere(origin, radius, camera_forward_axis=(0, 0, -1)):
    origin = Gf.Vec3f(origin)
    camera_forward_axis = Gf.Vec3f(camera_forward_axis)
    # Generate random angles for spherical coordinates
    theta = np.random.uniform(0, 2 * np.pi)
    phi = np.arcsin(np.random.uniform(-1, 1))
    # Spherical to Cartesian conversion
    x = radius * np.cos(theta) * np.cos(phi)
    y = radius * np.sin(phi)
    z = radius * np.sin(theta) * np.cos(phi)
    location = origin + Gf.Vec3f(x, y, z)
    # Calculate direction vector from camera to look_at point
    direction = origin - location
    direction_normalized = direction.GetNormalized()
    # Calculate rotation from forward direction (rotateFrom) to direction vector (rotateTo)
    rotation = Gf.Rotation(Gf.Vec3d(camera_forward_axis), Gf.Vec3d(direction_normalized))
    orientation = Gf.Quatf(rotation.GetQuat())
    return location, orientation
Example usage:
import object_based_sdg_utils
[..]
for i in range(shape_distractors_num):
    rand_loc, rand_rot, rand_scale = object_based_sdg_utils.get_random_transform_values(
        loc_min=working_area_min, loc_max=working_area_max, scale_min_max=shape_distractors_scale_min_max
    )
    [..]
    object_based_sdg_utils.set_transform_attributes(prim, location=rand_loc, rotation=rand_rot, scale=rand_scale)
[..]
for cam in cameras:
    [..]
    # Get a random pose of the camera looking at the target asset from the given distance
    cam_loc, quat = object_based_sdg_utils.get_random_pose_on_sphere(origin=target_loc, radius=distance)
    object_based_sdg_utils.set_transform_attributes(cam, location=cam_loc, orientation=quat)
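The spherical sampling in get_random_pose_on_sphere can be checked outside of Isaac Sim. The following is a minimal sketch using only the Python standard library (plain tuples instead of Gf vectors; the function names random_point_on_sphere and look_at_direction are hypothetical). Drawing sin(phi) uniformly from [-1, 1] and taking the arcsine, rather than drawing phi itself uniformly, spreads the points evenly over the sphere surface instead of clustering them near the poles.

```python
import math
import random


def random_point_on_sphere(origin, radius):
    # Same angle sampling as the helper above, with the math module instead of numpy
    theta = random.uniform(0, 2 * math.pi)
    phi = math.asin(random.uniform(-1, 1))
    x = radius * math.cos(theta) * math.cos(phi)
    y = radius * math.sin(phi)
    z = radius * math.sin(theta) * math.cos(phi)
    return (origin[0] + x, origin[1] + y, origin[2] + z)


def look_at_direction(location, origin):
    # Unit vector pointing from the sampled location back to the origin (look-at target)
    delta = tuple(o - l for o, l in zip(origin, location))
    length = math.sqrt(sum(c * c for c in delta))
    return tuple(c / length for c in delta)


random.seed(0)
origin = (1.0, 2.0, 0.5)
radius = 0.75
location = random_point_on_sphere(origin, radius)
direction = look_at_direction(location, origin)
distance = math.dist(location, origin)
print(f"location={location}, distance={distance:.3f}")
```

The sampled location always lies at the requested distance from the target, and moving from it along the normalized direction by that distance lands back on the target.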
Rigid-body Dynamics
The following helper functions are used to check and add colliders and rigid-body dynamics on the root of the prims. This enables physics simulation with collisions on the assets.
# Enables collisions with the asset (without rigid body dynamics the asset will be static)
def add_colliders(root_prim):
    # Iterate descendant prims (including root) and add colliders to mesh or primitive types
    for desc_prim in Usd.PrimRange(root_prim):
        if desc_prim.IsA(UsdGeom.Mesh) or desc_prim.IsA(UsdGeom.Gprim):
            # Physics
            if not desc_prim.HasAPI(UsdPhysics.CollisionAPI):
                collision_api = UsdPhysics.CollisionAPI.Apply(desc_prim)
            else:
                collision_api = UsdPhysics.CollisionAPI(desc_prim)
            collision_api.CreateCollisionEnabledAttr(True)
            # PhysX
            if not desc_prim.HasAPI(PhysxSchema.PhysxCollisionAPI):
                physx_collision_api = PhysxSchema.PhysxCollisionAPI.Apply(desc_prim)
            else:
                physx_collision_api = PhysxSchema.PhysxCollisionAPI(desc_prim)
            # Set PhysX specific properties
            physx_collision_api.CreateContactOffsetAttr(0.001)
            physx_collision_api.CreateRestOffsetAttr(0.0)
        # Add mesh specific collision properties only to mesh types
        if desc_prim.IsA(UsdGeom.Mesh):
            # Add mesh collision properties to the mesh (e.g. collider approximation type)
            if not desc_prim.HasAPI(UsdPhysics.MeshCollisionAPI):
                mesh_collision_api = UsdPhysics.MeshCollisionAPI.Apply(desc_prim)
            else:
                mesh_collision_api = UsdPhysics.MeshCollisionAPI(desc_prim)
            mesh_collision_api.CreateApproximationAttr().Set("convexHull")
# Check if prim (or its descendants) has colliders
def has_colliders(root_prim):
    for desc_prim in Usd.PrimRange(root_prim):
        if desc_prim.HasAPI(UsdPhysics.CollisionAPI):
            return True
    return False
# Enables rigid body dynamics (physics simulation) on the prim
def add_rigid_body_dynamics(prim, disable_gravity=False, angular_damping=None):
    if has_colliders(prim):
        # Physics
        if not prim.HasAPI(UsdPhysics.RigidBodyAPI):
            rigid_body_api = UsdPhysics.RigidBodyAPI.Apply(prim)
        else:
            rigid_body_api = UsdPhysics.RigidBodyAPI(prim)
        rigid_body_api.CreateRigidBodyEnabledAttr(True)
        # PhysX
        if not prim.HasAPI(PhysxSchema.PhysxRigidBodyAPI):
            physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI.Apply(prim)
        else:
            physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI(prim)
        physx_rigid_body_api.GetDisableGravityAttr().Set(disable_gravity)
        if angular_damping is not None:
            physx_rigid_body_api.CreateAngularDampingAttr().Set(angular_damping)
    else:
        print(f"Prim '{prim.GetPath()}' has no colliders. Skipping rigid body dynamics properties.")
# Add dynamics properties to the prim (if mesh or primitive) (rigid body to root + colliders to the meshes)
# https://docs.omniverse.nvidia.com/extensions/latest/ext_physics/rigid-bodies.html#rigid-body-simulation
def add_colliders_and_rigid_body_dynamics(prim, disable_gravity=False):
    # Add colliders to mesh or primitive types of the descendants of the prim (including root)
    add_colliders(prim)
    # Add rigid body dynamics properties (to the root only) only if it has colliders
    add_rigid_body_dynamics(prim, disable_gravity=disable_gravity)
Example usage:
import object_based_sdg_utils
[..]
for i in range(shape_distractors_num):
    rand_shape = random.choice(shape_distractors_types)
    prim_path = omni.usd.get_stage_next_free_path(stage, f"/World/Distractors/{rand_shape}", False)
    prim = stage.DefinePrim(prim_path, rand_shape.capitalize())
    [..]
    object_based_sdg_utils.add_colliders(prim)
    disable_gravity = random.choice([True, False])
    object_based_sdg_utils.add_rigid_body_dynamics(prim, disable_gravity)
Randomizers#
The following snippets show the various randomizations used throughout the script. Isaac Sim/USD based: bounce randomizer, randomizing camera poses, applying custom velocities to assets; and Replicator based: randomizing lights, shape distractors colors, dome background, and floating distractors velocities.
Overlap Triggered Velocity Randomizer
The following snippet simulates a bouncing area above the bottom collision box. The function checks for overlapping objects in the area and applies a random velocity to the objects. The function is triggered every physics update step to check for objects overlapping the ‘bounce’ area.
# Apply a random (mostly) upwards velocity to the objects overlapping the 'bounce' area
def on_overlap_hit(hit):
    prim = stage.GetPrimAtPath(hit.rigid_body)
    # Skip the camera collision spheres
    if prim not in camera_colliders:
        rand_vel = (random.uniform(-2, 2), random.uniform(-2, 2), random.uniform(4, 8))
        prim.GetAttribute("physics:velocity").Set(rand_vel)
    return True  # return True to continue the query
# Area to check for overlapping objects (above the bottom collision box)
overlap_area_thickness = 0.1
overlap_area_origin = (0, 0, (-working_area_size[2] / 2) + (overlap_area_thickness / 2))
overlap_area_extent = (
    working_area_size[0] / 2 * 0.99,
    working_area_size[1] / 2 * 0.99,
    overlap_area_thickness / 2 * 0.99,
)
# Triggered every physics update step to check for overlapping objects
def on_physics_step(dt: float):
    hit_info = get_physx_scene_query_interface().overlap_box(
        carb.Float3(overlap_area_extent),
        carb.Float3(overlap_area_origin),
        carb.Float4(0, 0, 0, 1),
        on_overlap_hit,
        False,  # pass 'False' to indicate an 'overlap multiple' query.
    )
# Subscribe to the physics step events to check for objects overlapping the 'bounce' area
physx_sub = get_physx_interface().subscribe_physics_step_events(on_physics_step)
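As a worked example of the bounce area arithmetic, the following sketch evaluates the same expressions for the default working_area_size of (4, 4, 3). It assumes, as the expressions above suggest, that the working area is centered at the origin and that the extent values are half sizes of the box; the variable names box_bottom_z, box_top_z, and floor_z are added here for illustration only.

```python
working_area_size = (4, 4, 3)  # default (width, depth, height) from the config

# Same expressions as in the snippet above
overlap_area_thickness = 0.1
overlap_area_origin = (0, 0, (-working_area_size[2] / 2) + (overlap_area_thickness / 2))
overlap_area_extent = (
    working_area_size[0] / 2 * 0.99,
    working_area_size[1] / 2 * 0.99,
    overlap_area_thickness / 2 * 0.99,
)

# Vertical span covered by the overlap box (center +/- half extent)
box_bottom_z = overlap_area_origin[2] - overlap_area_extent[2]
box_top_z = overlap_area_origin[2] + overlap_area_extent[2]
floor_z = -working_area_size[2] / 2

print(f"origin={overlap_area_origin}, extent={overlap_area_extent}")
print(f"box spans z in [{box_bottom_z:.4f}, {box_top_z:.4f}], floor at z={floor_z}")
```

With these values the box is a thin slab centered at about z = -1.45 that stays just above the floor at z = -1.5. The 0.99 factor keeps it slightly inside the surrounding collision walls.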
Camera Randomization
The camera randomization function uses Isaac Sim/USD API to look at a randomly chosen labeled asset from a randomized distance together with an offset to avoid always looking at the center of the asset. If camera colliders are enabled, the function will temporarily enable them and simulate for a few frames to push out any overlapping objects.
# Create the camera prims and their properties
cameras = []
num_cameras = config.get("num_cameras", 1)
camera_properties_kwargs = config.get("camera_properties_kwargs", {})
for i in range(num_cameras):
    # Create camera and add its properties (focal length, focus distance, f-stop, clipping range, etc.)
    cam_prim = stage.DefinePrim(f"/World/Cameras/cam_{i}", "Camera")
    for key, value in camera_properties_kwargs.items():
        if cam_prim.HasAttribute(key):
            cam_prim.GetAttribute(key).Set(value)
        else:
            print(f"Unknown camera attribute with {key}:{value}")
    cameras.append(cam_prim)
# Add collision spheres (disabled by default) to cameras to avoid objects overlapping with the camera view
camera_colliders = []
camera_collider_radius = config.get("camera_collider_radius", 0)
if camera_collider_radius > 0:
    for cam in cameras:
        cam_path = cam.GetPath()
        cam_collider = stage.DefinePrim(f"{cam_path}/CollisionSphere", "Sphere")
        cam_collider.GetAttribute("radius").Set(camera_collider_radius)
        add_colliders(cam_collider)
        collision_api = UsdPhysics.CollisionAPI(cam_collider)
        collision_api.GetCollisionEnabledAttr().Set(False)
        UsdGeom.Imageable(cam_collider).MakeInvisible()
        camera_colliders.append(cam_collider)
[..]
def randomize_camera_poses():
    for cam in cameras:
        # Get a random target asset to look at
        target_asset = random.choice(labeled_prims)
        # Add a look_at offset so the target is not always in the center of the camera view
        loc_offset = (
            random.uniform(-camera_look_at_target_offset, camera_look_at_target_offset),
            random.uniform(-camera_look_at_target_offset, camera_look_at_target_offset),
            random.uniform(-camera_look_at_target_offset, camera_look_at_target_offset),
        )
        target_loc = target_asset.GetAttribute("xformOp:translate").Get() + loc_offset
        # Get a random distance to the target asset
        distance = random.uniform(camera_distance_to_target_min_max[0], camera_distance_to_target_min_max[1])
        # Get a random pose of the camera looking at the target asset from the given distance
        cam_loc, quat = object_based_sdg_utils.get_random_pose_on_sphere(origin=target_loc, radius=distance)
        object_based_sdg_utils.set_transform_attributes(cam, location=cam_loc, orientation=quat)
# Temporarily enable camera colliders and simulate for the given number of frames to push out any overlapping objects
def simulate_camera_collision(num_frames=1):
    for cam_collider in camera_colliders:
        collision_api = UsdPhysics.CollisionAPI(cam_collider)
        collision_api.GetCollisionEnabledAttr().Set(True)
    if not timeline.is_playing():
        timeline.play()
    for _ in range(num_frames):
        simulation_app.update()
    for cam_collider in camera_colliders:
        collision_api = UsdPhysics.CollisionAPI(cam_collider)
        collision_api.GetCollisionEnabledAttr().Set(False)
Apply Velocities Towards a Target
The following function applies velocities with a random magnitude to the prims, directed towards the given target (center of the working area). In the example scenario, this ensures that the objects do not drift away and are occasionally pulled towards the center to clutter the scene.
# Pull assets towards the working area center by applying a random velocity towards the given target
def apply_velocities_towards_target(assets, target=(0, 0, 0)):
    for prim in assets:
        loc = prim.GetAttribute("xformOp:translate").Get()
        strength = random.uniform(0.1, 1.0)
        pull_vel = ((target[0] - loc[0]) * strength, (target[1] - loc[1]) * strength, (target[2] - loc[2]) * strength)
        prim.GetAttribute("physics:velocity").Set(pull_vel)
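The pull velocity is the displacement from the asset to the target scaled by a random strength, so assets that are farther away are pulled faster. The following sketch reproduces the arithmetic with plain tuples; the helper name pull_velocity and its optional fixed strength argument are hypothetical and only added to make the result reproducible.

```python
import random


def pull_velocity(loc, target=(0, 0, 0), strength=None):
    # Use a random strength as in the function above unless a fixed one is given
    if strength is None:
        strength = random.uniform(0.1, 1.0)
    # Velocity = (target - location) * strength, per axis
    return tuple((t - l) * strength for t, l in zip(target, loc))


# Asset at (2.0, -1.0, 0.5) pulled towards the origin with a fixed strength of 0.5
vel = pull_velocity((2.0, -1.0, 0.5), strength=0.5)
print(vel)  # (-1.0, 0.5, -0.25)

# With a random strength the velocity still points from the asset towards the target
random.seed(1)
rand_vel = pull_velocity((2.0, -1.0, 0.5))
print(rand_vel)
```

Ignoring gravity and collisions, an asset moving at this velocity would reach the target after 1 / strength seconds, that is, between 1 and 10 seconds for the strength range used above.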
Randomize Sphere Lights
The following snippet creates the given number of lights and adds them to a Replicator randomization graph that randomizes the light attributes (color, temperature, intensity, position, scale) when manually triggered (rep.utils.send_og_event(event_name="randomize_lights")).
# Create a randomizer for lights in the working area, manually triggered at custom events
with rep.trigger.on_custom_event(event_name="randomize_lights"):
    lights = rep.create.light(
        light_type="Sphere",
        color=rep.distribution.uniform((0, 0, 0), (1, 1, 1)),
        temperature=rep.distribution.normal(6500, 500),
        intensity=rep.distribution.normal(35000, 5000),
        position=rep.distribution.uniform(working_area_min, working_area_max),
        scale=rep.distribution.uniform(0.1, 1),
        count=3,
    )
Randomize Shape Distractors Colors
The following snippet creates a randomizer graph for the shape distractor colors, manually triggered at custom events (rep.utils.send_og_event(event_name="randomize_shape_distractor_colors")). The paths of the shape distractor prims are used to create a graph node representing the distractor prims, which is then used by the built-in Replicator color randomizer (rep.randomizer.color).
# Create a randomizer for the shape distractors colors, manually triggered at custom events
with rep.trigger.on_custom_event(event_name="randomize_shape_distractor_colors"):
    shape_distractors_paths = [prim.GetPath() for prim in chain(floating_shape_distractors, falling_shape_distractors)]
    shape_distractors_group = rep.create.group(shape_distractors_paths)
    with shape_distractors_group:
        rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
Randomize Floating Distractors Velocities
The following randomizer uses the built-in Replicator rigid-body randomizer (rep.physics.rigid_body) to apply random linear and angular velocities to the floating distractors (shapes and meshes). The distractors are wrapped into a Replicator group node using their prim paths. The randomizer is then manually triggered at custom events (rep.utils.send_og_event(event_name="randomize_floating_distractor_velocities")).
# Create a randomizer to apply random velocities to the floating distractors (shapes and meshes)
with rep.trigger.on_custom_event(event_name="randomize_floating_distractor_velocities"):
    shape_distractors_paths = [prim.GetPath() for prim in chain(floating_shape_distractors, floating_mesh_distractors)]
    shape_distractors_group = rep.create.group(shape_distractors_paths)
    with shape_distractors_group:
        rep.physics.rigid_body(
            velocity=rep.distribution.uniform((-2.5, -2.5, -2.5), (2.5, 2.5, 2.5)),
            angular_velocity=rep.distribution.uniform((-45, -45, -45), (45, 45, 45)),
        )
SDG Loop#
The following snippet shows the main data capture loop that runs the simulation for a given number of frames and captures the data at custom intervals. The loop triggers the randomizations and actions at custom frame intervals (e.g. randomizing camera poses, applying velocities towards the origin, randomizing lights, shape distractors colors, dome background, and floating distractors velocities).
SDG Loop
# Run the simulation and capture data triggering randomizations and actions at custom frame intervals
for i in range(num_frames):
    # Cameras will be moved to a random position and look at a randomly selected labeled asset
    if i % 3 == 0:
        print(f"\t Randomizing camera poses")
        randomize_camera_poses()
        # Temporarily enable camera colliders and simulate for a few frames to push out any overlapping objects
        if camera_colliders:
            simulate_camera_collision(num_frames=4)
    # Apply a random velocity towards the origin to the working area to pull the assets closer to the center
    if i % 10 == 0:
        print(f"\t Applying velocity towards the origin")
        apply_velocities_towards_target(chain(labeled_prims, shape_distractors, mesh_distractors))
    # Randomize lights locations and colors
    if i % 5 == 0:
        print(f"\t Randomizing lights")
        rep.utils.send_og_event(event_name="randomize_lights")
    # Randomize the colors of the primitive shape distractors
    if i % 15 == 0:
        print(f"\t Randomizing shape distractors colors")
        rep.utils.send_og_event(event_name="randomize_shape_distractor_colors")
    # Randomize the texture of the dome background
    if i % 25 == 0:
        print(f"\t Randomizing dome background")
        rep.utils.send_og_event(event_name="randomize_dome_background")
    # Apply a random velocity on the floating distractors (shapes and meshes)
    if i % 17 == 0:
        print(f"\t Randomizing shape distractors velocities")
        rep.utils.send_og_event(event_name="randomize_floating_distractor_velocities")
    # Enable render products only at capture time
    if disable_render_products_between_captures:
        object_based_sdg_utils.set_render_products_updates(render_products, True, include_viewport=False)
    # Capture the current frame
    print(f"[SDG] Capturing frame {i}/{num_frames}, at simulation time: {timeline.get_current_time():.2f}")
    if i % 5 == 0:
        capture_with_motion_blur_and_pathtracing(duration=0.025, num_samples=8, spp=128)
    else:
        rep.orchestrator.step(delta_time=0.0, rt_subframes=rt_subframes, pause_timeline=False)
    # Disable render products between captures
    if disable_render_products_between_captures:
        object_based_sdg_utils.set_render_products_updates(render_products, False, include_viewport=False)
    # Run the simulation for a given duration between frame captures
    if sim_duration_between_captures > 0:
        run_simulation_loop(duration=sim_duration_between_captures)
    else:
        simulation_app.update()
# Wait for the data to be written to disk
rep.orchestrator.wait_until_complete()
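To see which randomizations actually fire for a given run length, the modulo conditions of the loop can be evaluated on their own. The sketch below is plain Python with no Isaac Sim dependency; the dictionary keys are descriptive labels chosen here, while the intervals are the ones used in the loop above.

```python
# Frame intervals taken from the `i % N == 0` checks in the SDG loop
intervals = {
    "camera_poses": 3,
    "velocities_towards_origin": 10,
    "lights": 5,
    "shape_distractor_colors": 15,
    "dome_background": 25,
    "floating_distractor_velocities": 17,
    "motion_blur_capture": 5,
}


def frames_triggered(interval, num_frames):
    # Frames on which an action with the given interval runs
    return [i for i in range(num_frames) if i % interval == 0]


num_frames = 10  # default value from the config
schedule = {name: frames_triggered(interval, num_frames) for name, interval in intervals.items()}
for name, frames in schedule.items():
    print(f"{name}: {frames}")
```

With the default num_frames of 10, only the camera poses (frames 0, 3, 6, 9), the lights, and the motion blur capture (frames 0 and 5) run more than once. All other actions run only on frame 0, so longer runs are needed to see repeated color, dome background, or velocity randomizations.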
Motion Blur#
The following snippet captures frames using path tracing and motion blur. Its arguments select the duration of the movement to capture (duration), the number of sub samples to combine (num_samples), and the path tracing samples per pixel (spp).
Example of a captured frame using motion blur and path tracing:
Motion Blur
# Capture motion blur by combining the number of pathtraced subframes samples simulated for the given duration
def capture_with_motion_blur_and_pathtracing(duration=0.05, num_samples=8, spp=64):
    # For small step sizes the physics FPS needs to be temporarily increased to provide movements every sub sample
    orig_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get()
    target_physics_fps = 1 / duration * num_samples
    if target_physics_fps > orig_physics_fps:
        print(f"[SDG] Changing physics FPS from {orig_physics_fps} to {target_physics_fps}")
        physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps)
    # Enable motion blur (if not enabled)
    is_motion_blur_enabled = carb.settings.get_settings().get("/omni/replicator/captureMotionBlur")
    if not is_motion_blur_enabled:
        carb.settings.get_settings().set("/omni/replicator/captureMotionBlur", True)
    # Number of sub samples to render for motion blur in PathTracing mode
    carb.settings.get_settings().set("/omni/replicator/pathTracedMotionBlurSubSamples", num_samples)
    # Set the render mode to PathTracing
    prev_render_mode = carb.settings.get_settings().get("/rtx/rendermode")
    carb.settings.get_settings().set("/rtx/rendermode", "PathTracing")
    carb.settings.get_settings().set("/rtx/pathtracing/spp", spp)
    carb.settings.get_settings().set("/rtx/pathtracing/totalSpp", spp)
    carb.settings.get_settings().set("/rtx/pathtracing/optixDenoiser/enabled", 0)
    # Make sure the timeline is playing
    if not timeline.is_playing():
        timeline.play()
    # Capture the frame by advancing the simulation for the given duration and combining the sub samples
    rep.orchestrator.step(delta_time=duration, pause_timeline=False)
    # Restore the original physics FPS
    if target_physics_fps > orig_physics_fps:
        print(f"[SDG] Restoring physics FPS from {target_physics_fps} to {orig_physics_fps}")
        physx_scene.GetTimeStepsPerSecondAttr().Set(orig_physics_fps)
    # Restore the previous render and motion blur settings
    carb.settings.get_settings().set("/omni/replicator/captureMotionBlur", is_motion_blur_enabled)
    print(f"[SDG] Restoring render mode from 'PathTracing' to '{prev_render_mode}'")
    carb.settings.get_settings().set("/rtx/rendermode", prev_render_mode)
[..]
# Run the simulation and capture data triggering randomizations and actions at custom frame intervals
for i in range(num_frames):
    [..]
    # Capture the current frame
    print(f"[SDG] Capturing frame {i}/{num_frames}, at simulation time: {timeline.get_current_time():.2f}")
    if i % 5 == 0:
        capture_with_motion_blur_and_pathtracing(duration=0.025, num_samples=8, spp=128)
    else:
        rep.orchestrator.step(delta_time=0.0, rt_subframes=rt_subframes, pause_timeline=False)
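As a worked example of the physics rate adjustment, the sketch below evaluates the target physics FPS for the arguments used in the loop (duration=0.025, num_samples=8). The original physics rate of 60 steps per second is an assumption made for illustration; the script reads the actual value from the physics scene. The helper name target_physics_fps mirrors the variable in the snippet above.

```python
def target_physics_fps(duration, num_samples):
    # One physics step per motion blur sub sample within the capture duration
    return 1 / duration * num_samples


duration = 0.025
num_samples = 8
orig_physics_fps = 60  # assumed value, read from the physics scene in the script

fps = target_physics_fps(duration, num_samples)
sub_sample_dt = duration / num_samples
needs_increase = fps > orig_physics_fps

print(f"target physics FPS: {fps:.1f}")
print(f"time per sub sample: {sub_sample_dt:.6f} s")
print(f"physics FPS needs to be raised: {needs_increase}")
```

For these values the target rate is about 320 steps per second, which corresponds to roughly 3.1 ms of simulated time per sub sample, so under the assumed original rate the physics FPS would be raised for the duration of the capture and restored afterwards.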
Performance Optimization#
To optimize the performance of the SDG pipeline, especially if many frames are simulated between captures, the render products (rendering and data processing) can be disabled by default and enabled only at capture time. This is achieved by setting the disable_render_products_between_captures parameter to True in the configuration. Setting the include_viewport argument to True in the set_render_products_updates function also toggles the viewport (UI) rendering, which removes any live feedback in the viewport during the simulation. This can be especially useful if the pipeline is running on a headless server.
Toggle Render Products
# Enable or disable the render products and viewport rendering
def set_render_products_updates(render_products, enabled, include_viewport=False):
    for rp in render_products:
        rp.hydra_texture.set_updates_enabled(enabled)
    if include_viewport:
        get_active_viewport().updates_enabled = enabled
[..]
# Run the simulation and capture data triggering randomizations and actions at custom frame intervals
for i in range(num_frames):
    [..]
    # Enable render products only at capture time
    if disable_render_products_between_captures:
        object_based_sdg_utils.set_render_products_updates(render_products, True, include_viewport=False)
    # Capture the current frame
    print(f"[SDG] Capturing frame {i}/{num_frames}, at simulation time: {timeline.get_current_time():.2f}")
    if i % 5 == 0:
        capture_with_motion_blur_and_pathtracing(duration=0.025, num_samples=8, spp=128)
    else:
        rep.orchestrator.step(delta_time=0.0, rt_subframes=rt_subframes, pause_timeline=False)
    # Disable render products between captures
    if disable_render_products_between_captures:
        object_based_sdg_utils.set_render_products_updates(render_products, False, include_viewport=False)
    # Run the simulation for a given duration between frame captures
    if sim_duration_between_captures > 0:
        run_simulation_loop(duration=sim_duration_between_captures)
    else:
        simulation_app.update()
# Wait for the data to be written to disk
rep.orchestrator.wait_until_complete()
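The enable, capture, disable sequence above could also be wrapped in a context manager so that the render products are switched off again even if the capture raises an exception. This is a design sketch only and not how the script is written: FakeRenderProduct is a stand-in class that merely records a flag, and set_updates and render_products_enabled are hypothetical helpers. In the real pipeline the toggle would call set_render_products_updates on actual render products.

```python
from contextlib import contextmanager


class FakeRenderProduct:
    """Stand-in for a render product; only records whether updates are enabled."""

    def __init__(self):
        self.updates_enabled = False


def set_updates(render_products, enabled):
    # Placeholder for set_render_products_updates(render_products, enabled)
    for rp in render_products:
        rp.updates_enabled = enabled


@contextmanager
def render_products_enabled(render_products):
    # Enable updates for the duration of the with-block, then always disable them
    set_updates(render_products, True)
    try:
        yield
    finally:
        set_updates(render_products, False)


render_products = [FakeRenderProduct(), FakeRenderProduct()]
with render_products_enabled(render_products):
    # The capture call would go here
    states_during_capture = [rp.updates_enabled for rp in render_products]
states_after_capture = [rp.updates_enabled for rp in render_products]
print(states_during_capture, states_after_capture)
```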
Writer#
By default, the script uses the PoseWriter writer to write the data to disk. The writer parameters are as follows:
output_dir (str): The output directory to write the data to.
format (str): The format to use for the output files (e.g. CenterPose, DOPE). If None, a default format is used that writes all the available data.
use_subfolders (bool): If True, the data will be written to subfolders based on the camera name.
write_debug_images (bool): If True, debug images will also be written (e.g. bounding box overlays).
skip_empty_frames (bool): If True, empty frames will be skipped when writing the data.
The PoseWriter implementation can be found in the pose_writer.py file in the omni.replicator.isaac extension. Examples of using various output formats can be found in the /config/object_based_sdg_dope_config.yaml and /config/object_based_sdg_centerpose_config.yaml configuration files, where the format parameter is set to dope and centerpose respectively.
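For illustration, the writer part of the config dict could look as follows when switching to the DOPE output format. This is a sketch based on the parameter list above: the output directory name is hypothetical, and the exact spelling of the format value should be checked against the referenced YAML configuration files.

```python
# Writer settings for DOPE-formatted output (sketch, see the YAML config files for reference values)
dope_writer_config = {
    "writer_type": "PoseWriter",
    "writer_kwargs": {
        "output_dir": "_out_obj_based_sdg_dope",  # hypothetical directory name
        "format": "dope",  # use "centerpose" for the CenterPose format
        "use_subfolders": False,
        "write_debug_images": True,
        "skip_empty_frames": False,
    },
}

print(dope_writer_config["writer_kwargs"]["format"])
```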
To use a custom writer, set the writer_type and writer_kwargs parameters in the config files or in the script so that the custom writer implementation is loaded.
"writer_type": "MyCustomWriter",
"writer_kwargs": {
    "arg1": "val1",
    "arg2": "val2",
    "argn": "valn",
}
SyntheticaDETR#
SyntheticaDETR is a 2D object detection network designed to detect indoor objects in RGB images. It is built on top of RT-DETR, a state-of-the-art 2D object detection network on the COCO dataset, and is trained on data collected entirely in simulation using Isaac Sim Replicator. At the time of writing, SyntheticaDETR is the top-performing object detection network on the BOP leaderboard for the YCB-V dataset.
Leaderboard link: https://bop.felk.cvut.cz/leaderboards/detection-bop22/ycb-v/
Data Generation#
We generate data using Isaac Sim and Replicator with procedurally generated scenes. Objects are dropped from the ceiling and the simulation is run with physics enabled, which avoids interpenetrations and lets the objects settle into stable configurations on the floor. RGB renderings are captured during this process along with the ground truth segmentation, depth, and bounding boxes of the objects visible in the view frustum. The image and ground truth pairs are used to train networks with supervised learning.
Data Generation with Real World Asset Capture#
While the above data generation process is suited for objects whose 3D assets are already available in digital form (e.g. USD or OBJ format), there are scenarios where such assets are not available a priori.
For these cases, we use the AR Code app for iPad/iPhone to capture the assets. The app uses LiDAR and multiple images captured from diverse viewpoints to produce the 3D asset model directly in USD format, suited for rendering with Isaac Sim and Replicator.
Below we show the asset models captured via the app and visualized from different viewpoints.
These assets were used in the Synthetica rendering framework to obtain rendered images.
The results of the detector trained on this synthetic data and tested directly on the real world images are shown below:
The numbers next to the labels on the bounding boxes are the detector's confidence values for the predicted object identity.
SyntheticaDETR Model and Isaac ROS RT-DETR#
The SyntheticaDETR model is available in the NGC Catalog at the following link: SyntheticaDETR in NGC
Furthermore, to run the model in ROS, refer to this thorough tutorial: Isaac ROS RT-DETR Tutorial