Skip to content

Commit 54b8f9c

Browse files
authored
Have the EventsExecutor use more common code (#2570)
* move notify waitable setup to its own function * move mutex lock to retrieve_entity utility * use entities_need_rebuild_ atomic bool in events-executors * remove duplicated set_on_ready_callback for notify_waitable * use mutex from base class rather than a new recursive mutex * use current_collection_ member in events-executor * delay adding notify waitable to collection * postpone clearing the current collection * commonize notify waitable and collection * commonize add/remove node/cbg methods * fix linter errors --------- Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
1 parent bdf1f8f commit 54b8f9c

File tree

9 files changed

+90
-244
lines changed

9 files changed

+90
-244
lines changed

rclcpp/include/rclcpp/executor.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -541,8 +541,9 @@ class Executor
541541
*
542542
* \param[in] notify if true will execute a trigger that will wake up a waiting executor
543543
*/
544-
void
545-
trigger_entity_recollect(bool notify);
544+
RCLCPP_PUBLIC
545+
virtual void
546+
handle_updated_entities(bool notify);
546547

547548
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
548549
std::atomic_bool spinning;

rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
122122
void
123123
clear_on_ready_callback() override;
124124

125+
/// Set a new callback to be called whenever this waitable is executed.
126+
/**
127+
* \param[in] on_execute_callback The new callback
128+
*/
129+
RCLCPP_PUBLIC
130+
void
131+
set_execute_callback(std::function<void(void)> on_execute_callback);
132+
125133
/// Remove a guard condition from being waited on.
126134
/**
127135
* \param[in] weak_guard_condition The guard condition to remove.
@@ -142,7 +150,10 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
142150
/// Callback to run when waitable executes
143151
std::function<void(void)> execute_callback_;
144152

153+
/// Mutex to procetect the guard conditions
145154
std::mutex guard_condition_mutex_;
155+
/// Mutex to protect the execute callback
156+
std::mutex execute_mutex_;
146157

147158
std::function<void(size_t)> on_ready_callback_;
148159

rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp

Lines changed: 13 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ namespace executors
5959
*/
6060
class EventsExecutor : public rclcpp::Executor
6161
{
62-
friend class EventsExecutorEntitiesCollector;
63-
6462
public:
6563
RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutor)
6664

@@ -72,7 +70,7 @@ class EventsExecutor : public rclcpp::Executor
7270
* \param[in] options Options used to configure the executor.
7371
*/
7472
RCLCPP_PUBLIC
75-
explicit EventsExecutor(
73+
EventsExecutor(
7674
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue = std::make_unique<
7775
rclcpp::experimental::executors::SimpleEventsQueue>(),
7876
bool execute_timers_separate_thread = false,
@@ -128,87 +126,6 @@ class EventsExecutor : public rclcpp::Executor
128126
void
129127
spin_all(std::chrono::nanoseconds max_duration) override;
130128

131-
/// Add a node to the executor.
132-
/**
133-
* \sa rclcpp::Executor::add_node
134-
*/
135-
RCLCPP_PUBLIC
136-
void
137-
add_node(
138-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
139-
bool notify = true) override;
140-
141-
/// Convenience function which takes Node and forwards NodeBaseInterface.
142-
/**
143-
* \sa rclcpp::EventsExecutor::add_node
144-
*/
145-
RCLCPP_PUBLIC
146-
void
147-
add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
148-
149-
/// Remove a node from the executor.
150-
/**
151-
* \sa rclcpp::Executor::remove_node
152-
*/
153-
RCLCPP_PUBLIC
154-
void
155-
remove_node(
156-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
157-
bool notify = true) override;
158-
159-
/// Convenience function which takes Node and forwards NodeBaseInterface.
160-
/**
161-
* \sa rclcpp::Executor::remove_node
162-
*/
163-
RCLCPP_PUBLIC
164-
void
165-
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
166-
167-
/// Add a callback group to an executor.
168-
/**
169-
* \sa rclcpp::Executor::add_callback_group
170-
*/
171-
RCLCPP_PUBLIC
172-
void
173-
add_callback_group(
174-
rclcpp::CallbackGroup::SharedPtr group_ptr,
175-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
176-
bool notify = true) override;
177-
178-
/// Remove callback group from the executor
179-
/**
180-
* \sa rclcpp::Executor::remove_callback_group
181-
*/
182-
RCLCPP_PUBLIC
183-
void
184-
remove_callback_group(
185-
rclcpp::CallbackGroup::SharedPtr group_ptr,
186-
bool notify = true) override;
187-
188-
/// Get callback groups that belong to executor.
189-
/**
190-
* \sa rclcpp::Executor::get_all_callback_groups()
191-
*/
192-
RCLCPP_PUBLIC
193-
std::vector<rclcpp::CallbackGroup::WeakPtr>
194-
get_all_callback_groups() override;
195-
196-
/// Get callback groups that belong to executor.
197-
/**
198-
* \sa rclcpp::Executor::get_manually_added_callback_groups()
199-
*/
200-
RCLCPP_PUBLIC
201-
std::vector<rclcpp::CallbackGroup::WeakPtr>
202-
get_manually_added_callback_groups() override;
203-
204-
/// Get callback groups that belong to executor.
205-
/**
206-
* \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
207-
*/
208-
RCLCPP_PUBLIC
209-
std::vector<rclcpp::CallbackGroup::WeakPtr>
210-
get_automatically_added_callback_groups_from_nodes() override;
211-
212129
protected:
213130
/// Internal implementation of spin_once
214131
RCLCPP_PUBLIC
@@ -220,16 +137,21 @@ class EventsExecutor : public rclcpp::Executor
220137
void
221138
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);
222139

140+
/// Collect entities from callback groups and refresh the current collection with them
141+
RCLCPP_PUBLIC
142+
void
143+
handle_updated_entities(bool notify) override;
144+
223145
private:
224146
RCLCPP_DISABLE_COPY(EventsExecutor)
225147

226148
/// Execute a provided executor event if its associated entities are available
227149
void
228150
execute_event(const ExecutorEvent & event);
229151

230-
/// Collect entities from callback groups and refresh the current collection with them
152+
/// Rebuilds the executor's notify waitable, as we can't use the one built in the base class
231153
void
232-
refresh_current_collection_from_callback_groups();
154+
setup_notify_waitable();
233155

234156
/// Refresh the current collection using the provided new_collection
235157
void
@@ -253,6 +175,11 @@ class EventsExecutor : public rclcpp::Executor
253175
typename CollectionType::EntitySharedPtr
254176
retrieve_entity(typename CollectionType::Key entity_id, CollectionType & collection)
255177
{
178+
// Note: we lock the mutex because we assume that you are trying to get an element from the
179+
// current collection... If there will be a use-case to retrieve elements also from other
180+
// collections, we can move the mutex back to the calling codes.
181+
std::lock_guard<std::mutex> guard(mutex_);
182+
256183
// Check if the entity_id is in the collection
257184
auto it = collection.find(entity_id);
258185
if (it == collection.end()) {
@@ -273,16 +200,6 @@ class EventsExecutor : public rclcpp::Executor
273200
/// Queue where entities can push events
274201
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
275202

276-
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
277-
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
278-
279-
/// Mutex to protect the current_entities_collection_
280-
std::recursive_mutex collection_mutex_;
281-
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
282-
283-
/// Flag used to reduce the number of unnecessary waitable events
284-
std::atomic<bool> notify_waitable_event_pushed_ {false};
285-
286203
/// Timers manager used to track and/or execute associated timers
287204
std::shared_ptr<rclcpp::experimental::TimersManager> timers_manager_;
288205
};

rclcpp/src/rclcpp/executor.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ Executor::~Executor()
129129
}
130130

131131
void
132-
Executor::trigger_entity_recollect(bool notify)
132+
Executor::handle_updated_entities(bool notify)
133133
{
134134
this->entities_need_rebuild_.store(true);
135135

@@ -174,11 +174,11 @@ Executor::add_callback_group(
174174
this->collector_.add_callback_group(group_ptr);
175175

176176
try {
177-
this->trigger_entity_recollect(notify);
177+
this->handle_updated_entities(notify);
178178
} catch (const rclcpp::exceptions::RCLError & ex) {
179179
throw std::runtime_error(
180180
std::string(
181-
"Failed to trigger guard condition on callback group add: ") + ex.what());
181+
"Failed to handle entities update on callback group add: ") + ex.what());
182182
}
183183
}
184184

@@ -188,11 +188,11 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
188188
this->collector_.add_node(node_ptr);
189189

190190
try {
191-
this->trigger_entity_recollect(notify);
191+
this->handle_updated_entities(notify);
192192
} catch (const rclcpp::exceptions::RCLError & ex) {
193193
throw std::runtime_error(
194194
std::string(
195-
"Failed to trigger guard condition on node add: ") + ex.what());
195+
"Failed to handle entities update on node add: ") + ex.what());
196196
}
197197
}
198198

@@ -204,11 +204,11 @@ Executor::remove_callback_group(
204204
this->collector_.remove_callback_group(group_ptr);
205205

206206
try {
207-
this->trigger_entity_recollect(notify);
207+
this->handle_updated_entities(notify);
208208
} catch (const rclcpp::exceptions::RCLError & ex) {
209209
throw std::runtime_error(
210210
std::string(
211-
"Failed to trigger guard condition on callback group remove: ") + ex.what());
211+
"Failed to handle entities update on callback group remove: ") + ex.what());
212212
}
213213
}
214214

@@ -224,11 +224,11 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
224224
this->collector_.remove_node(node_ptr);
225225

226226
try {
227-
this->trigger_entity_recollect(notify);
227+
this->handle_updated_entities(notify);
228228
} catch (const rclcpp::exceptions::RCLError & ex) {
229229
throw std::runtime_error(
230230
std::string(
231-
"Failed to trigger guard condition on node remove: ") + ex.what());
231+
"Failed to handle entities update on node remove: ") + ex.what());
232232
}
233233
}
234234

rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#include <iostream>
16-
1715
#include "rclcpp/exceptions.hpp"
1816
#include "rclcpp/executors/executor_notify_waitable.hpp"
1917

@@ -91,9 +89,9 @@ ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
9189
}
9290

9391
void
94-
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & data)
92+
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & /*data*/)
9593
{
96-
(void) data;
94+
std::lock_guard<std::mutex> lock(execute_mutex_);
9795
this->execute_callback_();
9896
}
9997

@@ -149,6 +147,14 @@ ExecutorNotifyWaitable::clear_on_ready_callback()
149147
}
150148
}
151149

150+
RCLCPP_PUBLIC
151+
void
152+
ExecutorNotifyWaitable::set_execute_callback(std::function<void(void)> on_execute_callback)
153+
{
154+
std::lock_guard<std::mutex> lock(execute_mutex_);
155+
execute_callback_ = on_execute_callback;
156+
}
157+
152158
void
153159
ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
154160
{

0 commit comments

Comments
 (0)