diff --git a/source/isaaclab/config/extension.toml b/source/isaaclab/config/extension.toml index 21ba73bcd95..d5cc08d815e 100644 --- a/source/isaaclab/config/extension.toml +++ b/source/isaaclab/config/extension.toml @@ -1,7 +1,7 @@ [package] # Note: Semantic Versioning is used: https://semver.org/ -version = "0.45.0" +version = "0.45.1" # Description diff --git a/source/isaaclab/docs/CHANGELOG.rst b/source/isaaclab/docs/CHANGELOG.rst index dbd4a090bec..0acebaf7131 100644 --- a/source/isaaclab/docs/CHANGELOG.rst +++ b/source/isaaclab/docs/CHANGELOG.rst @@ -1,6 +1,20 @@ Changelog --------- +0.45.1 (2025-08-16) +~~~~~~~~~~~~~~~~~~~ + +Added +^^^^^ + +* Added validations for scale-based randomization ranges across mass, actuator, joint, and tendon parameters. + +Changed +^^^^^^^ + +* Refactored randomization functions into classes with initialization-time checks to avoid runtime overhead. + + 0.45.0 (2025-08-07) ~~~~~~~~~~~~~~~~~~~ diff --git a/source/isaaclab/isaaclab/envs/mdp/events.py b/source/isaaclab/isaaclab/envs/mdp/events.py index de40a71f902..2dd34d4a36a 100644 --- a/source/isaaclab/isaaclab/envs/mdp/events.py +++ b/source/isaaclab/isaaclab/envs/mdp/events.py @@ -278,15 +278,7 @@ def __call__( self.asset.root_physx_view.set_material_properties(materials, env_ids) -def randomize_rigid_body_mass( - env: ManagerBasedEnv, - env_ids: torch.Tensor | None, - asset_cfg: SceneEntityCfg, - mass_distribution_params: tuple[float, float], - operation: Literal["add", "scale", "abs"], - distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform", - recompute_inertia: bool = True, -): +class randomize_rigid_body_mass(ManagerTermBase): """Randomize the mass of the bodies by adding, scaling, or setting random values. This function allows randomizing the mass of the bodies of the asset. The function samples random values from the @@ -301,56 +293,94 @@ def randomize_rigid_body_mass( This function uses CPU tensors to assign the body masses. It is recommended to use this function only during the initialization of the environment. """ - # extract the used quantities (to enable type-hinting) - asset: RigidObject | Articulation = env.scene[asset_cfg.name] - # resolve environment ids - if env_ids is None: - env_ids = torch.arange(env.scene.num_envs, device="cpu") - else: - env_ids = env_ids.cpu() + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + """Initialize the term. - # resolve body indices - if asset_cfg.body_ids == slice(None): - body_ids = torch.arange(asset.num_bodies, dtype=torch.int, device="cpu") - else: - body_ids = torch.tensor(asset_cfg.body_ids, dtype=torch.int, device="cpu") + Args: + cfg: The configuration of the event term. + env: The environment instance. - # get the current masses of the bodies (num_assets, num_bodies) - masses = asset.root_physx_view.get_masses() + Raises: + TypeError: If `params` is not a tuple of two numbers. + ValueError: If the operation is not supported. + ValueError: If the lower bound is negative or zero when not allowed. + ValueError: If the upper bound is less than the lower bound. + """ + super().__init__(cfg, env) - # apply randomization on default values - # this is to make sure when calling the function multiple times, the randomization is applied on the - # default values and not the previously randomized values - masses[env_ids[:, None], body_ids] = asset.data.default_mass[env_ids[:, None], body_ids].clone() + # extract the used quantities (to enable type-hinting) + self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"] + self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name] + # check for valid operation + if cfg.params["operation"] == "scale": + if "mass_distribution_params" in cfg.params: + _validate_scale_range( + cfg.params["mass_distribution_params"], "mass_distribution_params", allow_zero=False + ) + elif cfg.params["operation"] not in ("abs", "add"): + raise ValueError( + "Randomization term 'randomize_rigid_body_mass' does not support operation:" + f" '{cfg.params['operation']}'." + ) - # sample from the given range - # note: we modify the masses in-place for all environments - # however, the setter takes care that only the masses of the specified environments are modified - masses = _randomize_prop_by_op( - masses, mass_distribution_params, env_ids, body_ids, operation=operation, distribution=distribution - ) + def __call__( + self, + env: ManagerBasedEnv, + env_ids: torch.Tensor | None, + asset_cfg: SceneEntityCfg, + mass_distribution_params: tuple[float, float], + operation: Literal["add", "scale", "abs"], + distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform", + recompute_inertia: bool = True, + ): + # resolve environment ids + if env_ids is None: + env_ids = torch.arange(env.scene.num_envs, device="cpu") + else: + env_ids = env_ids.cpu() - # set the mass into the physics simulation - asset.root_physx_view.set_masses(masses, env_ids) - - # recompute inertia tensors if needed - if recompute_inertia: - # compute the ratios of the new masses to the initial masses - ratios = masses[env_ids[:, None], body_ids] / asset.data.default_mass[env_ids[:, None], body_ids] - # scale the inertia tensors by the the ratios - # since mass randomization is done on default values, we can use the default inertia tensors - inertias = asset.root_physx_view.get_inertias() - if isinstance(asset, Articulation): - # inertia has shape: (num_envs, num_bodies, 9) for articulation - inertias[env_ids[:, None], body_ids] = ( - asset.data.default_inertia[env_ids[:, None], body_ids] * ratios[..., None] - ) + # resolve body indices + if self.asset_cfg.body_ids == slice(None): + body_ids = torch.arange(self.asset.num_bodies, dtype=torch.int, device="cpu") else: - # inertia has shape: (num_envs, 9) for rigid object - inertias[env_ids] = asset.data.default_inertia[env_ids] * ratios - # set the inertia tensors into the physics simulation - asset.root_physx_view.set_inertias(inertias, env_ids) + body_ids = torch.tensor(self.asset_cfg.body_ids, dtype=torch.int, device="cpu") + + # get the current masses of the bodies (num_assets, num_bodies) + masses = self.asset.root_physx_view.get_masses() + + # apply randomization on default values + # this is to make sure when calling the function multiple times, the randomization is applied on the + # default values and not the previously randomized values + masses[env_ids[:, None], body_ids] = self.asset.data.default_mass[env_ids[:, None], body_ids].clone() + + # sample from the given range + # note: we modify the masses in-place for all environments + # however, the setter takes care that only the masses of the specified environments are modified + masses = _randomize_prop_by_op( + masses, mass_distribution_params, env_ids, body_ids, operation=operation, distribution=distribution + ) + + # set the mass into the physics simulation + self.asset.root_physx_view.set_masses(masses, env_ids) + + # recompute inertia tensors if needed + if recompute_inertia: + # compute the ratios of the new masses to the initial masses + ratios = masses[env_ids[:, None], body_ids] / self.asset.data.default_mass[env_ids[:, None], body_ids] + # scale the inertia tensors by the the ratios + # since mass randomization is done on default values, we can use the default inertia tensors + inertias = self.asset.root_physx_view.get_inertias() + if isinstance(self.asset, Articulation): + # inertia has shape: (num_envs, num_bodies, 9) for articulation + inertias[env_ids[:, None], body_ids] = ( + self.asset.data.default_inertia[env_ids[:, None], body_ids] * ratios[..., None] + ) + else: + # inertia has shape: (num_envs, 9) for rigid object + inertias[env_ids] = self.asset.data.default_inertia[env_ids] * ratios + # set the inertia tensors into the physics simulation + self.asset.root_physx_view.set_inertias(inertias, env_ids) def randomize_rigid_body_com( @@ -494,15 +524,7 @@ def randomize_physics_scene_gravity( physics_sim_view.set_gravity(carb.Float3(*gravity)) -def randomize_actuator_gains( - env: ManagerBasedEnv, - env_ids: torch.Tensor | None, - asset_cfg: SceneEntityCfg, - stiffness_distribution_params: tuple[float, float] | None = None, - damping_distribution_params: tuple[float, float] | None = None, - operation: Literal["add", "scale", "abs"] = "abs", - distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform", -): +class randomize_actuator_gains(ManagerTermBase): """Randomize the actuator gains in an articulation by adding, scaling, or setting random values. This function allows randomizing the actuator stiffness and damping gains. @@ -515,69 +537,103 @@ def randomize_actuator_gains( For implicit actuators, this function uses CPU tensors to assign the actuator gains into the simulation. In such cases, it is recommended to use this function only during the initialization of the environment. """ - # Extract the used quantities (to enable type-hinting) - asset: Articulation = env.scene[asset_cfg.name] - # Resolve environment ids - if env_ids is None: - env_ids = torch.arange(env.scene.num_envs, device=asset.device) + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + """Initialize the term. - def randomize(data: torch.Tensor, params: tuple[float, float]) -> torch.Tensor: - return _randomize_prop_by_op( - data, params, dim_0_ids=None, dim_1_ids=actuator_indices, operation=operation, distribution=distribution - ) + Args: + cfg: The configuration of the event term. + env: The environment instance. + + Raises: + TypeError: If `params` is not a tuple of two numbers. + ValueError: If the operation is not supported. + ValueError: If the lower bound is negative or zero when not allowed. + ValueError: If the upper bound is less than the lower bound. + """ + super().__init__(cfg, env) + + # extract the used quantities (to enable type-hinting) + self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"] + self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name] + # check for valid operation + if cfg.params["operation"] == "scale": + if "stiffness_distribution_params" in cfg.params: + _validate_scale_range( + cfg.params["stiffness_distribution_params"], "stiffness_distribution_params", allow_zero=False + ) + if "damping_distribution_params" in cfg.params: + _validate_scale_range(cfg.params["damping_distribution_params"], "damping_distribution_params") + elif cfg.params["operation"] not in ("abs", "add"): + raise ValueError( + "Randomization term 'randomize_actuator_gains' does not support operation:" + f" '{cfg.params['operation']}'." + ) + + def __call__( + self, + env: ManagerBasedEnv, + env_ids: torch.Tensor | None, + asset_cfg: SceneEntityCfg, + stiffness_distribution_params: tuple[float, float] | None = None, + damping_distribution_params: tuple[float, float] | None = None, + operation: Literal["add", "scale", "abs"] = "abs", + distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform", + ): + # Resolve environment ids + if env_ids is None: + env_ids = torch.arange(env.scene.num_envs, device=self.asset.device) + + def randomize(data: torch.Tensor, params: tuple[float, float]) -> torch.Tensor: + return _randomize_prop_by_op( + data, params, dim_0_ids=None, dim_1_ids=actuator_indices, operation=operation, distribution=distribution + ) - # Loop through actuators and randomize gains - for actuator in asset.actuators.values(): - if isinstance(asset_cfg.joint_ids, slice): - # we take all the joints of the actuator - actuator_indices = slice(None) - if isinstance(actuator.joint_indices, slice): - global_indices = slice(None) + # Loop through actuators and randomize gains + for actuator in self.asset.actuators.values(): + if isinstance(self.asset_cfg.joint_ids, slice): + # we take all the joints of the actuator + actuator_indices = slice(None) + if isinstance(actuator.joint_indices, slice): + global_indices = slice(None) + else: + global_indices = torch.tensor(actuator.joint_indices, device=self.asset.device) + elif isinstance(actuator.joint_indices, slice): + # we take the joints defined in the asset config + global_indices = actuator_indices = torch.tensor(self.asset_cfg.joint_ids, device=self.asset.device) else: - global_indices = torch.tensor(actuator.joint_indices, device=asset.device) - elif isinstance(actuator.joint_indices, slice): - # we take the joints defined in the asset config - global_indices = actuator_indices = torch.tensor(asset_cfg.joint_ids, device=asset.device) - else: - # we take the intersection of the actuator joints and the asset config joints - actuator_joint_indices = torch.tensor(actuator.joint_indices, device=asset.device) - asset_joint_ids = torch.tensor(asset_cfg.joint_ids, device=asset.device) - # the indices of the joints in the actuator that have to be randomized - actuator_indices = torch.nonzero(torch.isin(actuator_joint_indices, asset_joint_ids)).view(-1) - if len(actuator_indices) == 0: - continue - # maps actuator indices that have to be randomized to global joint indices - global_indices = actuator_joint_indices[actuator_indices] - # Randomize stiffness - if stiffness_distribution_params is not None: - stiffness = actuator.stiffness[env_ids].clone() - stiffness[:, actuator_indices] = asset.data.default_joint_stiffness[env_ids][:, global_indices].clone() - randomize(stiffness, stiffness_distribution_params) - actuator.stiffness[env_ids] = stiffness - if isinstance(actuator, ImplicitActuator): - asset.write_joint_stiffness_to_sim(stiffness, joint_ids=actuator.joint_indices, env_ids=env_ids) - # Randomize damping - if damping_distribution_params is not None: - damping = actuator.damping[env_ids].clone() - damping[:, actuator_indices] = asset.data.default_joint_damping[env_ids][:, global_indices].clone() - randomize(damping, damping_distribution_params) - actuator.damping[env_ids] = damping - if isinstance(actuator, ImplicitActuator): - asset.write_joint_damping_to_sim(damping, joint_ids=actuator.joint_indices, env_ids=env_ids) + # we take the intersection of the actuator joints and the asset config joints + actuator_joint_indices = torch.tensor(actuator.joint_indices, device=self.asset.device) + asset_joint_ids = torch.tensor(self.asset_cfg.joint_ids, device=self.asset.device) + # the indices of the joints in the actuator that have to be randomized + actuator_indices = torch.nonzero(torch.isin(actuator_joint_indices, asset_joint_ids)).view(-1) + if len(actuator_indices) == 0: + continue + # maps actuator indices that have to be randomized to global joint indices + global_indices = actuator_joint_indices[actuator_indices] + # Randomize stiffness + if stiffness_distribution_params is not None: + stiffness = actuator.stiffness[env_ids].clone() + stiffness[:, actuator_indices] = self.asset.data.default_joint_stiffness[env_ids][ + :, global_indices + ].clone() + randomize(stiffness, stiffness_distribution_params) + actuator.stiffness[env_ids] = stiffness + if isinstance(actuator, ImplicitActuator): + self.asset.write_joint_stiffness_to_sim( + stiffness, joint_ids=actuator.joint_indices, env_ids=env_ids + ) + # Randomize damping + if damping_distribution_params is not None: + damping = actuator.damping[env_ids].clone() + damping[:, actuator_indices] = self.asset.data.default_joint_damping[env_ids][:, global_indices].clone() + randomize(damping, damping_distribution_params) + actuator.damping[env_ids] = damping + if isinstance(actuator, ImplicitActuator): + self.asset.write_joint_damping_to_sim(damping, joint_ids=actuator.joint_indices, env_ids=env_ids) -def randomize_joint_parameters( - env: ManagerBasedEnv, - env_ids: torch.Tensor | None, - asset_cfg: SceneEntityCfg, - friction_distribution_params: tuple[float, float] | None = None, - armature_distribution_params: tuple[float, float] | None = None, - lower_limit_distribution_params: tuple[float, float] | None = None, - upper_limit_distribution_params: tuple[float, float] | None = None, - operation: Literal["add", "scale", "abs"] = "abs", - distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform", -): +class randomize_joint_parameters(ManagerTermBase): """Randomize the simulated joint parameters of an articulation by adding, scaling, or setting random values. This function allows randomizing the joint parameters of the asset. These correspond to the physics engine @@ -592,97 +648,126 @@ def randomize_joint_parameters( This function uses CPU tensors to assign the joint properties. It is recommended to use this function only during the initialization of the environment. """ - # extract the used quantities (to enable type-hinting) - asset: Articulation = env.scene[asset_cfg.name] - # resolve environment ids - if env_ids is None: - env_ids = torch.arange(env.scene.num_envs, device=asset.device) + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + """Initialize the term. - # resolve joint indices - if asset_cfg.joint_ids == slice(None): - joint_ids = slice(None) # for optimization purposes - else: - joint_ids = torch.tensor(asset_cfg.joint_ids, dtype=torch.int, device=asset.device) - - # sample joint properties from the given ranges and set into the physics simulation - # joint friction coefficient - if friction_distribution_params is not None: - friction_coeff = _randomize_prop_by_op( - asset.data.default_joint_friction_coeff.clone(), - friction_distribution_params, - env_ids, - joint_ids, - operation=operation, - distribution=distribution, - ) - asset.write_joint_friction_coefficient_to_sim( - friction_coeff[env_ids[:, None], joint_ids], joint_ids=joint_ids, env_ids=env_ids - ) + Args: + cfg: The configuration of the event term. + env: The environment instance. - # joint armature - if armature_distribution_params is not None: - armature = _randomize_prop_by_op( - asset.data.default_joint_armature.clone(), - armature_distribution_params, - env_ids, - joint_ids, - operation=operation, - distribution=distribution, - ) - asset.write_joint_armature_to_sim(armature[env_ids[:, None], joint_ids], joint_ids=joint_ids, env_ids=env_ids) - - # joint position limits - if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None: - joint_pos_limits = asset.data.default_joint_pos_limits.clone() - # -- randomize the lower limits - if lower_limit_distribution_params is not None: - joint_pos_limits[..., 0] = _randomize_prop_by_op( - joint_pos_limits[..., 0], - lower_limit_distribution_params, + Raises: + TypeError: If `params` is not a tuple of two numbers. + ValueError: If the operation is not supported. + ValueError: If the lower bound is negative or zero when not allowed. + ValueError: If the upper bound is less than the lower bound. + """ + super().__init__(cfg, env) + + # extract the used quantities (to enable type-hinting) + self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"] + self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name] + # check for valid operation + if cfg.params["operation"] == "scale": + if "friction_distribution_params" in cfg.params: + _validate_scale_range(cfg.params["friction_distribution_params"], "friction_distribution_params") + if "armature_distribution_params" in cfg.params: + _validate_scale_range(cfg.params["armature_distribution_params"], "armature_distribution_params") + elif cfg.params["operation"] not in ("abs", "add"): + raise ValueError( + "Randomization term 'randomize_fixed_tendon_parameters' does not support operation:" + f" '{cfg.params['operation']}'." + ) + + def __call__( + self, + env: ManagerBasedEnv, + env_ids: torch.Tensor | None, + asset_cfg: SceneEntityCfg, + friction_distribution_params: tuple[float, float] | None = None, + armature_distribution_params: tuple[float, float] | None = None, + lower_limit_distribution_params: tuple[float, float] | None = None, + upper_limit_distribution_params: tuple[float, float] | None = None, + operation: Literal["add", "scale", "abs"] = "abs", + distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform", + ): + # resolve environment ids + if env_ids is None: + env_ids = torch.arange(env.scene.num_envs, device=self.asset.device) + + # resolve joint indices + if self.asset_cfg.joint_ids == slice(None): + joint_ids = slice(None) # for optimization purposes + else: + joint_ids = torch.tensor(self.asset_cfg.joint_ids, dtype=torch.int, device=self.asset.device) + + # sample joint properties from the given ranges and set into the physics simulation + # joint friction coefficient + if friction_distribution_params is not None: + friction_coeff = _randomize_prop_by_op( + self.asset.data.default_joint_friction_coeff.clone(), + friction_distribution_params, env_ids, joint_ids, operation=operation, distribution=distribution, ) - # -- randomize the upper limits - if upper_limit_distribution_params is not None: - joint_pos_limits[..., 1] = _randomize_prop_by_op( - joint_pos_limits[..., 1], - upper_limit_distribution_params, + self.asset.write_joint_friction_coefficient_to_sim( + friction_coeff[env_ids[:, None], joint_ids], joint_ids=joint_ids, env_ids=env_ids + ) + + # joint armature + if armature_distribution_params is not None: + armature = _randomize_prop_by_op( + self.asset.data.default_joint_armature.clone(), + armature_distribution_params, env_ids, joint_ids, operation=operation, distribution=distribution, ) + self.asset.write_joint_armature_to_sim( + armature[env_ids[:, None], joint_ids], joint_ids=joint_ids, env_ids=env_ids + ) - # extract the position limits for the concerned joints - joint_pos_limits = joint_pos_limits[env_ids[:, None], joint_ids] - if (joint_pos_limits[..., 0] > joint_pos_limits[..., 1]).any(): - raise ValueError( - "Randomization term 'randomize_joint_parameters' is setting lower joint limits that are greater than" - " upper joint limits. Please check the distribution parameters for the joint position limits." + # joint position limits + if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None: + joint_pos_limits = self.asset.data.default_joint_pos_limits.clone() + # -- randomize the lower limits + if lower_limit_distribution_params is not None: + joint_pos_limits[..., 0] = _randomize_prop_by_op( + joint_pos_limits[..., 0], + lower_limit_distribution_params, + env_ids, + joint_ids, + operation=operation, + distribution=distribution, + ) + # -- randomize the upper limits + if upper_limit_distribution_params is not None: + joint_pos_limits[..., 1] = _randomize_prop_by_op( + joint_pos_limits[..., 1], + upper_limit_distribution_params, + env_ids, + joint_ids, + operation=operation, + distribution=distribution, + ) + + # extract the position limits for the concerned joints + joint_pos_limits = joint_pos_limits[env_ids[:, None], joint_ids] + if (joint_pos_limits[..., 0] > joint_pos_limits[..., 1]).any(): + raise ValueError( + "Randomization term 'randomize_joint_parameters' is setting lower joint limits that are greater" + " than upper joint limits. Please check the distribution parameters for the joint position limits." + ) + # set the position limits into the physics simulation + self.asset.write_joint_position_limit_to_sim( + joint_pos_limits, joint_ids=joint_ids, env_ids=env_ids, warn_limit_violation=False ) - # set the position limits into the physics simulation - asset.write_joint_position_limit_to_sim( - joint_pos_limits, joint_ids=joint_ids, env_ids=env_ids, warn_limit_violation=False - ) -def randomize_fixed_tendon_parameters( - env: ManagerBasedEnv, - env_ids: torch.Tensor | None, - asset_cfg: SceneEntityCfg, - stiffness_distribution_params: tuple[float, float] | None = None, - damping_distribution_params: tuple[float, float] | None = None, - limit_stiffness_distribution_params: tuple[float, float] | None = None, - lower_limit_distribution_params: tuple[float, float] | None = None, - upper_limit_distribution_params: tuple[float, float] | None = None, - rest_length_distribution_params: tuple[float, float] | None = None, - offset_distribution_params: tuple[float, float] | None = None, - operation: Literal["add", "scale", "abs"] = "abs", - distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform", -): +class randomize_fixed_tendon_parameters(ManagerTermBase): """Randomize the simulated fixed tendon parameters of an articulation by adding, scaling, or setting random values. This function allows randomizing the fixed tendon parameters of the asset. @@ -693,115 +778,166 @@ def randomize_fixed_tendon_parameters( particular property, the function does not modify the property. """ - # extract the used quantities (to enable type-hinting) - asset: Articulation = env.scene[asset_cfg.name] - # resolve environment ids - if env_ids is None: - env_ids = torch.arange(env.scene.num_envs, device=asset.device) + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + """Initialize the term. - # resolve joint indices - if asset_cfg.fixed_tendon_ids == slice(None): - tendon_ids = slice(None) # for optimization purposes - else: - tendon_ids = torch.tensor(asset_cfg.fixed_tendon_ids, dtype=torch.int, device=asset.device) - - # sample tendon properties from the given ranges and set into the physics simulation - # stiffness - if stiffness_distribution_params is not None: - stiffness = _randomize_prop_by_op( - asset.data.default_fixed_tendon_stiffness.clone(), - stiffness_distribution_params, - env_ids, - tendon_ids, - operation=operation, - distribution=distribution, - ) - asset.set_fixed_tendon_stiffness(stiffness[env_ids[:, None], tendon_ids], tendon_ids, env_ids) - - # damping - if damping_distribution_params is not None: - damping = _randomize_prop_by_op( - asset.data.default_fixed_tendon_damping.clone(), - damping_distribution_params, - env_ids, - tendon_ids, - operation=operation, - distribution=distribution, - ) - asset.set_fixed_tendon_damping(damping[env_ids[:, None], tendon_ids], tendon_ids, env_ids) - - # limit stiffness - if limit_stiffness_distribution_params is not None: - limit_stiffness = _randomize_prop_by_op( - asset.data.default_fixed_tendon_limit_stiffness.clone(), - limit_stiffness_distribution_params, - env_ids, - tendon_ids, - operation=operation, - distribution=distribution, - ) - asset.set_fixed_tendon_limit_stiffness(limit_stiffness[env_ids[:, None], tendon_ids], tendon_ids, env_ids) - - # position limits - if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None: - limit = asset.data.default_fixed_tendon_pos_limits.clone() - # -- lower limit - if lower_limit_distribution_params is not None: - limit[..., 0] = _randomize_prop_by_op( - limit[..., 0], - lower_limit_distribution_params, + Args: + cfg: The configuration of the event term. + env: The environment instance. + + Raises: + TypeError: If `params` is not a tuple of two numbers. + ValueError: If the operation is not supported. + ValueError: If the lower bound is negative or zero when not allowed. + ValueError: If the upper bound is less than the lower bound. + """ + super().__init__(cfg, env) + + # extract the used quantities (to enable type-hinting) + self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"] + self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name] + # check for valid operation + if cfg.params["operation"] == "scale": + if "stiffness_distribution_params" in cfg.params: + _validate_scale_range( + cfg.params["stiffness_distribution_params"], "stiffness_distribution_params", allow_zero=False + ) + if "damping_distribution_params" in cfg.params: + _validate_scale_range(cfg.params["damping_distribution_params"], "damping_distribution_params") + if "limit_stiffness_distribution_params" in cfg.params: + _validate_scale_range( + cfg.params["limit_stiffness_distribution_params"], "limit_stiffness_distribution_params" + ) + elif cfg.params["operation"] not in ("abs", "add"): + raise ValueError( + "Randomization term 'randomize_fixed_tendon_parameters' does not support operation:" + f" '{cfg.params['operation']}'." + ) + + def __call__( + self, + env: ManagerBasedEnv, + env_ids: torch.Tensor | None, + asset_cfg: SceneEntityCfg, + stiffness_distribution_params: tuple[float, float] | None = None, + damping_distribution_params: tuple[float, float] | None = None, + limit_stiffness_distribution_params: tuple[float, float] | None = None, + lower_limit_distribution_params: tuple[float, float] | None = None, + upper_limit_distribution_params: tuple[float, float] | None = None, + rest_length_distribution_params: tuple[float, float] | None = None, + offset_distribution_params: tuple[float, float] | None = None, + operation: Literal["add", "scale", "abs"] = "abs", + distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform", + ): + # resolve environment ids + if env_ids is None: + env_ids = torch.arange(env.scene.num_envs, device=self.asset.device) + + # resolve joint indices + if self.asset_cfg.fixed_tendon_ids == slice(None): + tendon_ids = slice(None) # for optimization purposes + else: + tendon_ids = torch.tensor(self.asset_cfg.fixed_tendon_ids, dtype=torch.int, device=self.asset.device) + + # sample tendon properties from the given ranges and set into the physics simulation + # stiffness + if stiffness_distribution_params is not None: + stiffness = _randomize_prop_by_op( + self.asset.data.default_fixed_tendon_stiffness.clone(), + stiffness_distribution_params, env_ids, tendon_ids, operation=operation, distribution=distribution, ) - # -- upper limit - if upper_limit_distribution_params is not None: - limit[..., 1] = _randomize_prop_by_op( - limit[..., 1], - upper_limit_distribution_params, + self.asset.set_fixed_tendon_stiffness(stiffness[env_ids[:, None], tendon_ids], tendon_ids, env_ids) + + # damping + if damping_distribution_params is not None: + damping = _randomize_prop_by_op( + self.asset.data.default_fixed_tendon_damping.clone(), + damping_distribution_params, env_ids, tendon_ids, operation=operation, distribution=distribution, ) + self.asset.set_fixed_tendon_damping(damping[env_ids[:, None], tendon_ids], tendon_ids, env_ids) - # check if the limits are valid - tendon_limits = limit[env_ids[:, None], tendon_ids] - if (tendon_limits[..., 0] > tendon_limits[..., 1]).any(): - raise ValueError( - "Randomization term 'randomize_fixed_tendon_parameters' is setting lower tendon limits that are greater" - " than upper tendon limits." + # limit stiffness + if limit_stiffness_distribution_params is not None: + limit_stiffness = _randomize_prop_by_op( + self.asset.data.default_fixed_tendon_limit_stiffness.clone(), + limit_stiffness_distribution_params, + env_ids, + tendon_ids, + operation=operation, + distribution=distribution, + ) + self.asset.set_fixed_tendon_limit_stiffness( + limit_stiffness[env_ids[:, None], tendon_ids], tendon_ids, env_ids ) - asset.set_fixed_tendon_position_limit(tendon_limits, tendon_ids, env_ids) - - # rest length - if rest_length_distribution_params is not None: - rest_length = _randomize_prop_by_op( - asset.data.default_fixed_tendon_rest_length.clone(), - rest_length_distribution_params, - env_ids, - tendon_ids, - operation=operation, - distribution=distribution, - ) - asset.set_fixed_tendon_rest_length(rest_length[env_ids[:, None], tendon_ids], tendon_ids, env_ids) - - # offset - if offset_distribution_params is not None: - offset = _randomize_prop_by_op( - asset.data.default_fixed_tendon_offset.clone(), - offset_distribution_params, - env_ids, - tendon_ids, - operation=operation, - distribution=distribution, - ) - asset.set_fixed_tendon_offset(offset[env_ids[:, None], tendon_ids], tendon_ids, env_ids) - # write the fixed tendon properties into the simulation - asset.write_fixed_tendon_properties_to_sim(tendon_ids, env_ids) + # position limits + if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None: + limit = self.asset.data.default_fixed_tendon_pos_limits.clone() + # -- lower limit + if lower_limit_distribution_params is not None: + limit[..., 0] = _randomize_prop_by_op( + limit[..., 0], + lower_limit_distribution_params, + env_ids, + tendon_ids, + operation=operation, + distribution=distribution, + ) + # -- upper limit + if upper_limit_distribution_params is not None: + limit[..., 1] = _randomize_prop_by_op( + limit[..., 1], + upper_limit_distribution_params, + env_ids, + tendon_ids, + operation=operation, + distribution=distribution, + ) + + # check if the limits are valid + tendon_limits = limit[env_ids[:, None], tendon_ids] + if (tendon_limits[..., 0] > tendon_limits[..., 1]).any(): + raise ValueError( + "Randomization term 'randomize_fixed_tendon_parameters' is setting lower tendon limits that are" + " greater than upper tendon limits." + ) + self.asset.set_fixed_tendon_position_limit(tendon_limits, tendon_ids, env_ids) + + # rest length + if rest_length_distribution_params is not None: + rest_length = _randomize_prop_by_op( + self.asset.data.default_fixed_tendon_rest_length.clone(), + rest_length_distribution_params, + env_ids, + tendon_ids, + operation=operation, + distribution=distribution, + ) + self.asset.set_fixed_tendon_rest_length(rest_length[env_ids[:, None], tendon_ids], tendon_ids, env_ids) + + # offset + if offset_distribution_params is not None: + offset = _randomize_prop_by_op( + self.asset.data.default_fixed_tendon_offset.clone(), + offset_distribution_params, + env_ids, + tendon_ids, + operation=operation, + distribution=distribution, + ) + self.asset.set_fixed_tendon_offset(offset[env_ids[:, None], tendon_ids], tendon_ids, env_ids) + + # write the fixed tendon properties into the simulation + self.asset.write_fixed_tendon_properties_to_sim(tendon_ids, env_ids) def apply_external_force_torque( @@ -1568,3 +1704,47 @@ def _randomize_prop_by_op( f"Unknown operation: '{operation}' for property randomization. Please use 'add', 'scale', or 'abs'." ) return data + + +def _validate_scale_range( + params: tuple[float, float] | None, + name: str, + *, + allow_negative: bool = False, + allow_zero: bool = True, +) -> None: + """ + Validates a (low, high) tuple used in scale-based randomization. + + This function ensures the tuple follows expected rules when applying a 'scale' + operation. It performs type and value checks, optionally allowing negative or + zero lower bounds. + + Args: + params (tuple[float, float] | None): The (low, high) range to validate. If None, + validation is skipped. + name (str): The name of the parameter being validated, used for error messages. + allow_negative (bool, optional): If True, allows the lower bound to be negative. + Defaults to False. + allow_zero (bool, optional): If True, allows the lower bound to be zero. + Defaults to True. + + Raises: + TypeError: If `params` is not a tuple of two numbers. + ValueError: If the lower bound is negative or zero when not allowed. + ValueError: If the upper bound is less than the lower bound. + + Example: + _validate_scale_range((0.5, 1.5), "mass_scale") + """ + if params is None: # caller didn’t request randomisation for this field + return + low, high = params + if not isinstance(low, (int, float)) or not isinstance(high, (int, float)): + raise TypeError(f"{name}: expected (low, high) to be a tuple of numbers, got {params}.") + if not allow_negative and not allow_zero and low <= 0: + raise ValueError(f"{name}: lower bound must be > 0 when using the 'scale' operation (got {low}).") + if not allow_negative and allow_zero and low < 0: + raise ValueError(f"{name}: lower bound must be ≥ 0 when using the 'scale' operation (got {low}).") + if high < low: + raise ValueError(f"{name}: upper bound ({high}) must be ≥ lower bound ({low}).")