Skip to content

Commit f7056c0

Browse files
alsoramergify[bot]
andauthored
fix events-executor warm-up bug and add unit-tests (#2591)
* add unit-test to verify that spin-all doesn't need warm-up Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * improve comment and add callback group test Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * move executor tests to a new file Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * do not require warm up iteration with events executor spin_some Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * add spin_some tests and cleanup Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * add missing include directives Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> --------- Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent 2f71d6e commit f7056c0

File tree

3 files changed

+273
-1
lines changed

3 files changed

+273
-1
lines changed

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ EventsExecutor::setup_notify_waitable()
7979
// ---> we need to wake up the executor so that it can terminate
8080
// - a node or callback group guard condition is triggered:
8181
// ---> the entities collection is changed, we need to update callbacks
82-
entities_need_rebuild_ = false;
8382
this->handle_updated_entities(false);
8483
});
8584

@@ -168,6 +167,14 @@ EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhau
168167
return false;
169168
};
170169

170+
// If this spin is not exhaustive (e.g. spin_some), we need to explicitly check
171+
// if entities need to be rebuilt here rather than letting the notify waitable event do it.
172+
// A non-exhaustive spin would not check for work a second time, thus delaying the execution
173+
// of some entities to the next invocation of spin.
174+
if (!exhaustive) {
175+
this->handle_updated_entities(false);
176+
}
177+
171178
// Get the number of events and timers ready at start
172179
const size_t ready_events_at_start = events_queue_->size();
173180
size_t executed_events = 0;
@@ -314,6 +321,17 @@ void
314321
EventsExecutor::handle_updated_entities(bool notify)
315322
{
316323
(void)notify;
324+
325+
// Do not rebuild if we don't need to.
326+
// A rebuild event could be generated, but then
327+
// this function could end up being called from somewhere else
328+
// before that event gets processed, for example if
329+
// a node or callback group is manually added to the executor.
330+
const bool notify_waitable_triggered = entities_need_rebuild_.exchange(false);
331+
if (!notify_waitable_triggered && !this->collector_.has_pending()) {
332+
return;
333+
}
334+
317335
// Build the new collection
318336
this->collector_.update_collections();
319337
auto callback_groups = this->collector_.get_all_callback_groups();

rclcpp/test/rclcpp/CMakeLists.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,16 @@ if(TARGET test_executors_busy_waiting)
507507
target_link_libraries(test_executors_busy_waiting ${PROJECT_NAME})
508508
endif()
509509

510+
ament_add_gtest(
511+
test_executors_warmup
512+
executors/test_executors_warmup.cpp
513+
executors/test_waitable.cpp
514+
APPEND_LIBRARY_DIRS "${append_library_dirs}"
515+
TIMEOUT 180)
516+
if(TARGET test_executors_warmup)
517+
target_link_libraries(test_executors_warmup ${PROJECT_NAME} ${test_msgs_TARGETS})
518+
endif()
519+
510520
ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp
511521
APPEND_LIBRARY_DIRS "${append_library_dirs}")
512522
ament_add_test_label(test_static_single_threaded_executor mimick)
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
// Copyright 2024 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* This test checks all implementations of rclcpp::executor to check they pass they basic API
17+
* tests. Anything specific to any executor in particular should go in a separate test file.
18+
*/
19+
20+
#include <gtest/gtest.h>
21+
22+
#include <chrono>
23+
#include <cstddef>
24+
#include <memory>
25+
#include <string>
26+
#include <type_traits>
27+
#include <utility>
28+
29+
#include "rclcpp/rclcpp.hpp"
30+
31+
#include "test_msgs/msg/empty.hpp"
32+
33+
#include "./executor_types.hpp"
34+
35+
using namespace std::chrono_literals;
36+
37+
template<typename T>
38+
class TestExecutorsWarmup : public ::testing::Test
39+
{
40+
public:
41+
void SetUp()
42+
{
43+
rclcpp::init(0, nullptr);
44+
}
45+
46+
void TearDown()
47+
{
48+
rclcpp::shutdown();
49+
}
50+
};
51+
52+
TYPED_TEST_SUITE(TestExecutorsWarmup, ExecutorTypes, ExecutorTypeNames);
53+
54+
// This test verifies that spin_all is correctly collecting work multiple times
55+
// even when one of the items of work is a notifier waitable event and thus results in
56+
// rebuilding the entities collection.
57+
// When spin_all goes back to collect more work, it should see the ready items from
58+
// the new added entities
59+
TYPED_TEST(TestExecutorsWarmup, spin_all_doesnt_require_warmup)
60+
{
61+
using ExecutorType = TypeParam;
62+
ExecutorType executor;
63+
64+
// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
65+
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
66+
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);
67+
68+
// Add node to the executor before creating the entities
69+
executor.add_node(node);
70+
71+
// Create entities, this will produce a notifier waitable event, telling the executor to refresh
72+
// the entities collection
73+
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
74+
size_t callback_count = 0;
75+
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
76+
auto subscription =
77+
node->create_subscription<test_msgs::msg::Empty>(
78+
"test_topic", rclcpp::QoS(10), std::move(callback));
79+
80+
ASSERT_EQ(callback_count, 0u);
81+
82+
// Publish a message so that the new entities (i.e. the subscriber) already have work to do
83+
publisher->publish(test_msgs::msg::Empty());
84+
85+
// We need to select a duration that is greater than
86+
// the time taken to refresh the entities collection and rebuild the waitset.
87+
// spin-all is expected to process the notifier waitable event, rebuild the collection,
88+
// and then collect more work, finding the subscription message event.
89+
// This duration has been selected empirically.
90+
executor.spin_all(std::chrono::milliseconds(500));
91+
92+
// Verify that the callback is called as part of the spin above
93+
EXPECT_EQ(callback_count, 1u);
94+
}
95+
96+
// Same test as `spin_all_doesnt_require_warmup`, but uses a callback group
97+
// This test reproduces the bug reported by https://github.com/ros2/rclcpp/issues/2589
98+
TYPED_TEST(TestExecutorsWarmup, spin_all_doesnt_require_warmup_with_cbgroup)
99+
{
100+
using ExecutorType = TypeParam;
101+
102+
// TODO(alsora): Enable when https://github.com/ros2/rclcpp/pull/2595 gets merged
103+
if (
104+
std::is_same<ExecutorType, rclcpp::executors::SingleThreadedExecutor>() ||
105+
std::is_same<ExecutorType, rclcpp::executors::MultiThreadedExecutor>())
106+
{
107+
GTEST_SKIP();
108+
}
109+
110+
ExecutorType executor;
111+
112+
// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
113+
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
114+
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);
115+
116+
auto callback_group = node->create_callback_group(
117+
rclcpp::CallbackGroupType::MutuallyExclusive,
118+
false);
119+
120+
// Add callback group to the executor before creating the entities
121+
executor.add_callback_group(callback_group, node->get_node_base_interface());
122+
123+
// Create entities, this will produce a notifier waitable event, telling the executor to refresh
124+
// the entities collection
125+
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
126+
size_t callback_count = 0;
127+
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
128+
rclcpp::SubscriptionOptions sub_options;
129+
sub_options.callback_group = callback_group;
130+
auto subscription =
131+
node->create_subscription<test_msgs::msg::Empty>(
132+
"test_topic", rclcpp::QoS(10), std::move(callback), sub_options);
133+
134+
ASSERT_EQ(callback_count, 0u);
135+
136+
// Publish a message so that the new entities (i.e. the subscriber) already have work to do
137+
publisher->publish(test_msgs::msg::Empty());
138+
139+
// We need to select a duration that is greater than
140+
// the time taken to refresh the entities collection and rebuild the waitset.
141+
// spin-all is expected to process the notifier waitable event, rebuild the collection,
142+
// and then collect more work, finding the subscription message event.
143+
// This duration has been selected empirically.
144+
executor.spin_all(std::chrono::milliseconds(500));
145+
146+
// Verify that the callback is called as part of the spin above
147+
EXPECT_EQ(callback_count, 1u);
148+
}
149+
150+
TYPED_TEST(TestExecutorsWarmup, spin_some_doesnt_require_warmup)
151+
{
152+
using ExecutorType = TypeParam;
153+
154+
// TODO(alsora): currently only the events-executor passes this test.
155+
// Enable single-threaded and multi-threaded executors
156+
// when https://github.com/ros2/rclcpp/pull/2595 gets merged
157+
if (
158+
!std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>())
159+
{
160+
GTEST_SKIP();
161+
}
162+
163+
ExecutorType executor;
164+
165+
// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
166+
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
167+
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);
168+
169+
// Add node to the executor before creating the entities
170+
executor.add_node(node);
171+
172+
// Create entities, this will produce a notifier waitable event, telling the executor to refresh
173+
// the entities collection
174+
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
175+
size_t callback_count = 0;
176+
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
177+
auto subscription =
178+
node->create_subscription<test_msgs::msg::Empty>(
179+
"test_topic", rclcpp::QoS(10), std::move(callback));
180+
181+
ASSERT_EQ(callback_count, 0u);
182+
183+
// Publish a message so that the new entities (i.e. the subscriber) already have work to do
184+
publisher->publish(test_msgs::msg::Empty());
185+
186+
// NOTE: intra-process communication is enabled, so the subscription will immediately see
187+
// the new message, no risk of race conditions where spin_some gets called before the
188+
// message has been delivered.
189+
executor.spin_some();
190+
191+
// Verify that the callback is called as part of the spin above
192+
EXPECT_EQ(callback_count, 1u);
193+
}
194+
195+
TYPED_TEST(TestExecutorsWarmup, spin_some_doesnt_require_warmup_with_cbgroup)
196+
{
197+
using ExecutorType = TypeParam;
198+
199+
// TODO(alsora): currently only the events-executor passes this test.
200+
// Enable single-threaded and multi-threaded executors
201+
// when https://github.com/ros2/rclcpp/pull/2595 gets merged
202+
if (
203+
!std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>())
204+
{
205+
GTEST_SKIP();
206+
}
207+
208+
ExecutorType executor;
209+
210+
// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
211+
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
212+
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);
213+
214+
auto callback_group = node->create_callback_group(
215+
rclcpp::CallbackGroupType::MutuallyExclusive,
216+
false);
217+
218+
// Add callback group to the executor before creating the entities
219+
executor.add_callback_group(callback_group, node->get_node_base_interface());
220+
221+
// Create entities, this will produce a notifier waitable event, telling the executor to refresh
222+
// the entities collection
223+
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
224+
size_t callback_count = 0;
225+
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
226+
rclcpp::SubscriptionOptions sub_options;
227+
sub_options.callback_group = callback_group;
228+
auto subscription =
229+
node->create_subscription<test_msgs::msg::Empty>(
230+
"test_topic", rclcpp::QoS(10), std::move(callback), sub_options);
231+
232+
ASSERT_EQ(callback_count, 0u);
233+
234+
// Publish a message so that the new entities (i.e. the subscriber) already have work to do
235+
publisher->publish(test_msgs::msg::Empty());
236+
237+
// NOTE: intra-process communication is enabled, so the subscription will immediately see
238+
// the new message, no risk of race conditions where spin_some gets called before the
239+
// message has been delivered.
240+
executor.spin_some();
241+
242+
// Verify that the callback is called as part of the spin above
243+
EXPECT_EQ(callback_count, 1u);
244+
}

0 commit comments

Comments
 (0)