Skip to content

Moves delay actuator functionality to IdealPDActuator #3023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions source/isaaclab/isaaclab/actuators/actuator_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ class IdealPDActuatorCfg(ActuatorBaseCfg):

class_type: type = actuator_pd.IdealPDActuator

min_delay: int = 0
"""Minimum number of physics time-steps with which the actuator command may be delayed. Defaults to 0."""

max_delay: int = 0
"""Maximum number of physics time-steps with which the actuator command may be delayed. Defaults to 0."""

motor_strength: tuple[float, float] = (1.0, 1.0)
"""The strength of the motor which is characterized by the motor torque. Defaults to (1.0, 1.0).

The final applied torque will be scaled with a value between the range of the tuple."""


@configclass
class DCMotorCfg(IdealPDActuatorCfg):
Expand Down Expand Up @@ -250,11 +261,12 @@ class DelayedPDActuatorCfg(IdealPDActuatorCfg):

class_type: type = actuator_pd.DelayedPDActuator

min_delay: int = 0
"""Minimum number of physics time-steps with which the actuator command may be delayed. Defaults to 0."""

max_delay: int = 0
"""Maximum number of physics time-steps with which the actuator command may be delayed. Defaults to 0."""
def __post_init__(self):
print(
DeprecationWarning(
"DelayPDActuatorCfg has been deprecated. All functionality has been added to IdealPDActuatorCfg"
)
)


@configclass
Expand Down
111 changes: 69 additions & 42 deletions source/isaaclab/isaaclab/actuators/actuator_pd.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ class IdealPDActuator(ActuatorBase):
The parameters :math:`\gamma` is the gear ratio of the gear box connecting the motor and the actuated joint ends,
and :math:`\tau_{motor, max}` is the maximum motor effort possible. These parameters are read from
the configuration instance passed to the class.

An optional functionality of delay is implemented, using a circular buffer that stores the actuator commands for
a certain number of physics steps. The most recent actuation value is pushed to the buffer at every physics step,
but the final actuation value applied to the simulation is delayed by a certain number of physics steps.

The amount of time delay is configurable and can be set to a random value between the minimum and maximum time
delay bounds at every reset. The minimum and maximum time delay values are set in the configuration instance passed
to the class.

Note that, before clipping, this Actuator scales the computed effort depending on the motor_strength.
"""

cfg: IdealPDActuatorCfg
Expand All @@ -178,17 +188,69 @@ class IdealPDActuator(ActuatorBase):
Operations.
"""

def __init__(self, cfg: IdealPDActuatorCfg, *args, **kwargs):
super().__init__(cfg, *args, **kwargs)
if cfg.max_delay < cfg.min_delay:
raise ValueError("IdealPDActuatorCfg: max_delay must be greater than or equal to min_delay")
# instantiate the delay buffers
if cfg.max_delay > 0:
self.positions_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
self.velocities_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
self.efforts_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
# all of the envs
self._ALL_INDICES = torch.arange(self._num_envs, dtype=torch.long, device=self._device)
# configs for the motor strength
if self.cfg.motor_strength is not None:
self._motor_strength_ranges = self.cfg.motor_strength
else:
self._motor_strength_ranges = (1.0, 1.0)

self._motor_strength = torch.empty((len(self.computed_effort), 1), device=self._device)
self._current_motor_strength = self._motor_strength.uniform_(*self._motor_strength_ranges)

def reset(self, env_ids: Sequence[int]):
pass
# number of environments (since env_ids can be a slice)
if env_ids is None or env_ids == slice(None):
num_envs = self._num_envs
else:
num_envs = len(env_ids)
# set a new random delay for environments in env_ids
if self.cfg.max_delay > 0:
time_lags = torch.randint(
low=self.cfg.min_delay,
high=self.cfg.max_delay + 1,
size=(num_envs,),
dtype=torch.int,
device=self._device,
)
# set delays
self.positions_delay_buffer.set_time_lag(time_lags, env_ids)
self.velocities_delay_buffer.set_time_lag(time_lags, env_ids)
self.efforts_delay_buffer.set_time_lag(time_lags, env_ids)
# reset buffers
self.positions_delay_buffer.reset(env_ids)
self.velocities_delay_buffer.reset(env_ids)
self.efforts_delay_buffer.reset(env_ids)

# resample a motor strength within the motor strength ranges
self._current_motor_strength[env_ids] = self._motor_strength.uniform_(*self._motor_strength_ranges)[env_ids]

def compute(
self, control_action: ArticulationActions, joint_pos: torch.Tensor, joint_vel: torch.Tensor
) -> ArticulationActions:
# apply delay based on the delay the model for all the set points
if self.cfg.max_delay > 0:
control_action.joint_positions = self.positions_delay_buffer.compute(control_action.joint_positions)
control_action.joint_velocities = self.velocities_delay_buffer.compute(control_action.joint_velocities)
control_action.joint_efforts = self.efforts_delay_buffer.compute(control_action.joint_efforts)

# compute errors
error_pos = control_action.joint_positions - joint_pos
error_vel = control_action.joint_velocities - joint_vel
# calculate the desired joint torques
self.computed_effort = self.stiffness * error_pos + self.damping * error_vel + control_action.joint_efforts
# apply motor_strength
self.computed_effort *= self._current_motor_strength
# clip the torques based on the motor limits
self.applied_effort = self._clip_effort(self.computed_effort)
# set the computed actions back into the control action
Expand Down Expand Up @@ -316,52 +378,17 @@ class DelayedPDActuator(IdealPDActuator):

def __init__(self, cfg: DelayedPDActuatorCfg, *args, **kwargs):
super().__init__(cfg, *args, **kwargs)
# instantiate the delay buffers
self.positions_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
self.velocities_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
self.efforts_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
# all of the envs
self._ALL_INDICES = torch.arange(self._num_envs, dtype=torch.long, device=self._device)

def reset(self, env_ids: Sequence[int]):
super().reset(env_ids)
# number of environments (since env_ids can be a slice)
if env_ids is None or env_ids == slice(None):
num_envs = self._num_envs
else:
num_envs = len(env_ids)
# set a new random delay for environments in env_ids
time_lags = torch.randint(
low=self.cfg.min_delay,
high=self.cfg.max_delay + 1,
size=(num_envs,),
dtype=torch.int,
device=self._device,
print(
DeprecationWarning(
"DelayPDActuator has been deprecated. All functionality has been added to IdealPDActuator"
)
)
# set delays
self.positions_delay_buffer.set_time_lag(time_lags, env_ids)
self.velocities_delay_buffer.set_time_lag(time_lags, env_ids)
self.efforts_delay_buffer.set_time_lag(time_lags, env_ids)
# reset buffers
self.positions_delay_buffer.reset(env_ids)
self.velocities_delay_buffer.reset(env_ids)
self.efforts_delay_buffer.reset(env_ids)

def compute(
self, control_action: ArticulationActions, joint_pos: torch.Tensor, joint_vel: torch.Tensor
) -> ArticulationActions:
# apply delay based on the delay the model for all the setpoints
control_action.joint_positions = self.positions_delay_buffer.compute(control_action.joint_positions)
control_action.joint_velocities = self.velocities_delay_buffer.compute(control_action.joint_velocities)
control_action.joint_efforts = self.efforts_delay_buffer.compute(control_action.joint_efforts)
# compte actuator model
return super().compute(control_action, joint_pos, joint_vel)


class RemotizedPDActuator(DelayedPDActuator):
class RemotizedPDActuator(IdealPDActuator):
"""Ideal PD actuator with angle-dependent torque limits.

This class extends the :class:`DelayedPDActuator` class by adding angle-dependent torque limits to the actuator.
This class extends the :class:`IdealPDActuator` class by adding angle-dependent torque limits to the actuator.
The torque limits are applied by querying a lookup table describing the relationship between the joint angle
and the maximum output torque. The lookup table is provided in the configuration instance passed to the class.

Expand Down
172 changes: 171 additions & 1 deletion source/isaaclab/test/actuators/test_ideal_pd_actuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

import pytest

from isaaclab.actuators import IdealPDActuatorCfg
from isaaclab.actuators import IdealPDActuator, IdealPDActuatorCfg
from isaaclab.utils.buffers import DelayBuffer
from isaaclab.utils.types import ArticulationActions


Expand Down Expand Up @@ -270,5 +271,174 @@ def test_ideal_pd_compute(num_envs, num_joints, device, effort_lim):
)


@pytest.mark.parametrize("num_envs", [1, 2])
@pytest.mark.parametrize("num_joints", [1, 2])
@pytest.mark.parametrize("device", ["cuda:0", "cpu"])
def test_ideal_pd_actuator_init_delay(num_envs, num_joints, device):
"""Test initialization of ideal pd actuator with delay."""
joint_names = [f"joint_{d}" for d in range(num_joints)]
joint_ids = [d for d in range(num_joints)]
stiffness = 200
damping = 10
effort_limit = 60.0
velocity_limit = 50
motor_strength = (1.0, 1.0)
delay = 5

actuator_cfg = IdealPDActuatorCfg(
joint_names_expr=joint_names,
stiffness=stiffness,
damping=damping,
effort_limit=effort_limit,
velocity_limit=velocity_limit,
motor_strength=motor_strength,
min_delay=delay,
max_delay=delay,
)

# assume Articulation class:
# - finds joints (names and ids) associate with the provided joint_names_expr

actuator = actuator_cfg.class_type(
actuator_cfg,
joint_names=joint_names,
joint_ids=joint_ids,
num_envs=num_envs,
device=device,
)
assert isinstance(actuator, IdealPDActuator)
# check device and shape
torch.testing.assert_close(actuator.computed_effort, torch.zeros(num_envs, num_joints, device=device))
torch.testing.assert_close(actuator.applied_effort, torch.zeros(num_envs, num_joints, device=device))
torch.testing.assert_close(
actuator.effort_limit,
effort_limit * torch.ones(num_envs, num_joints, device=device),
)
torch.testing.assert_close(
actuator.velocity_limit, velocity_limit * torch.ones(num_envs, num_joints, device=device)
)

# check delay buffers
assert isinstance(actuator.positions_delay_buffer, DelayBuffer)
assert isinstance(actuator.velocities_delay_buffer, DelayBuffer)
assert isinstance(actuator.efforts_delay_buffer, DelayBuffer)

assert actuator.positions_delay_buffer.history_length == delay
assert actuator.velocities_delay_buffer.history_length == delay
assert actuator.efforts_delay_buffer.history_length == delay

# check motor strength
torch.testing.assert_close(actuator._current_motor_strength, torch.ones((num_envs, 1), device=device))


@pytest.mark.parametrize("num_envs", [1, 2])
@pytest.mark.parametrize("num_joints", [1, 2])
@pytest.mark.parametrize("device", ["cuda:0", "cpu"])
@pytest.mark.parametrize("effort_lim", [None, 80])
@pytest.mark.parametrize("motor_strength_scalar", (1.0, 0.0))
def test_delay_pd_actuator_compute(num_envs, num_joints, device, effort_lim, motor_strength_scalar):
"""Test the computation of the delay pd actuator."""
joint_names = [f"joint_{d}" for d in range(num_joints)]
joint_ids = [d for d in range(num_joints)]
stiffness = 20
damping = 1
effort_limit = effort_lim
velocity_limit = 50
motor_strength = (motor_strength_scalar, motor_strength_scalar)
delay = 3
# configure actuator
actuator_cfg = IdealPDActuatorCfg(
joint_names_expr=joint_names,
stiffness=stiffness,
damping=damping,
effort_limit=effort_limit,
velocity_limit=velocity_limit,
motor_strength=motor_strength,
min_delay=delay,
max_delay=delay,
)
# instantiate actuator
actuator = actuator_cfg.class_type(
actuator_cfg,
joint_names=joint_names,
joint_ids=joint_ids,
num_envs=num_envs,
device=device,
stiffness=actuator_cfg.stiffness,
damping=actuator_cfg.damping,
)
# requires a restart to create the delay
actuator.reset(range(num_envs))

desired_pos = [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0]
desired_vel = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
feedforward_effort = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

measured_joint_pos = 1.0
measured_joint_vel = 0.1

# check delay buffer filling properly
for i in range(delay + 4):

# check motor strength
torch.testing.assert_close(
motor_strength[0] * torch.ones(num_envs, 1, device=device),
actuator._current_motor_strength,
)

desired_control_action = ArticulationActions()
desired_control_action.joint_positions = desired_pos[i] * torch.ones(num_envs, num_joints, device=device)
desired_control_action.joint_velocities = desired_vel[i] * torch.ones(num_envs, num_joints, device=device)
desired_control_action.joint_efforts = feedforward_effort[i] * torch.ones(num_envs, num_joints, device=device)

computed_control_action = actuator.compute(
desired_control_action,
measured_joint_pos * torch.ones(num_envs, num_joints, device=device),
measured_joint_vel * torch.ones(num_envs, num_joints, device=device),
)

if i <= delay:
expect = motor_strength[0] * (
stiffness * (desired_pos[0] - measured_joint_pos)
+ damping * (desired_vel[0] - measured_joint_vel)
+ feedforward_effort[0]
)
else:
expect = motor_strength[0] * (
stiffness * (desired_pos[i - delay] - measured_joint_pos)
+ damping * (desired_vel[i - delay] - measured_joint_vel)
+ feedforward_effort[i - delay]
)

if effort_lim is not None:
expect_apply = min(expect, effort_lim)
else:
expect_apply = expect

torch.testing.assert_close(
expect * torch.ones(num_envs, num_joints, device=device),
actuator.computed_effort,
)
torch.testing.assert_close(
expect_apply * torch.ones(num_envs, num_joints, device=device),
actuator.applied_effort,
)
torch.testing.assert_close(
actuator.applied_effort,
computed_control_action.joint_efforts,
)
# check reset single env
actuator.reset([0])
assert actuator.positions_delay_buffer._circular_buffer._num_pushes[0] == 0
assert actuator.velocities_delay_buffer._circular_buffer._num_pushes[0] == 0
assert actuator.efforts_delay_buffer._circular_buffer._num_pushes[0] == 0
if num_envs > 1:
assert actuator.positions_delay_buffer._circular_buffer._num_pushes[1] == i + 1
assert actuator.velocities_delay_buffer._circular_buffer._num_pushes[1] == i + 1
assert actuator.efforts_delay_buffer._circular_buffer._num_pushes[1] == i + 1
# check actuator reset all
actuator.reset(range(num_envs))


if __name__ == "__main__":
pytest.main([__file__, "-v", "--maxfail=1"])
Loading