-
Notifications
You must be signed in to change notification settings - Fork 280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race conditions in MessageFilter #539
Open
rhaschke
wants to merge
9
commits into
ros:noetic-devel
Choose a base branch
from
rhaschke:fix-race-condition-noetic
base: noetic-devel
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
07b5a27
MessageFilter: unregister TransformableCallback on destruction
rhaschke 58a597d
Add stress test for MessageFilter
rhaschke 529ce02
Fix race condition
rhaschke 91d7601
Remove debug output
rhaschke 23e6295
Revert #144: Solve a bug that causes a deadlock in MessageFilter
rhaschke 436caee
Simplify #101
rhaschke 9fdf547
Alternative implementation to resolve deadlock #91/#144
rhaschke 4a05272
Resolve additional deadlock between MessageFilter and BufferCore
rhaschke e469abf
Reduce probability of race condition with CBQueueCallback
rhaschke File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -215,14 +215,15 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
~MessageFilter() | ||
{ | ||
message_connection_.disconnect(); | ||
|
||
MessageFilter::clear(); | ||
bc_.removeTransformableCallback(callback_handle_); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
||
TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu", | ||
(long long unsigned int)successful_transform_count_, | ||
(long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, | ||
(long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_); | ||
|
||
boost::unique_lock<boost::shared_mutex> lock(cbqueue_mutex_); // ensure that no more callback queue calls are active | ||
} | ||
|
||
/** | ||
|
@@ -277,13 +278,11 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
*/ | ||
void clear() | ||
{ | ||
boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_); | ||
|
||
TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared"); | ||
|
||
bc_.removeTransformableCallback(callback_handle_); | ||
callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4, boost::placeholders::_5)); | ||
|
||
// acquire after remove/addTransformableCallback to avoid deadlock! | ||
boost::unique_lock<boost::shared_mutex> unique_lock(messages_mutex_); | ||
messages_.clear(); | ||
message_count_ = 0; | ||
|
||
|
@@ -292,6 +291,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
callback_queue_->removeByID((uint64_t)this); | ||
|
||
warned_about_empty_frame_id_ = false; | ||
TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared"); | ||
} | ||
|
||
void add(const MEvent& evt) | ||
|
@@ -363,6 +363,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
} | ||
} | ||
|
||
L_MessageInfo msgs_to_drop; | ||
|
||
// We can transform already | ||
if (info.success_count == expected_success_count_) | ||
|
@@ -371,26 +372,13 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
} | ||
else | ||
{ | ||
boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_); | ||
boost::unique_lock<boost::shared_mutex> unique_lock(messages_mutex_); | ||
// If this message is about to push us past our queue size, erase the oldest message | ||
if (queue_size_ != 0 && message_count_ + 1 > queue_size_) | ||
{ | ||
++dropped_message_count_; | ||
const MessageInfo& front = messages_.front(); | ||
TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, | ||
(mt::FrameId<M>::value(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec()); | ||
|
||
V_TransformableRequestHandle::const_iterator it = front.handles.begin(); | ||
V_TransformableRequestHandle::const_iterator end = front.handles.end(); | ||
|
||
for (; it != end; ++it) | ||
{ | ||
bc_.cancelTransformableRequest(*it); | ||
} | ||
|
||
messageDropped(front.event, filter_failure_reasons::Unknown); | ||
messages_.pop_front(); | ||
--message_count_; | ||
// move front element from messages_ to msgs_to_drop for later dropping | ||
msgs_to_drop.splice(msgs_to_drop.begin(), messages_, messages_.begin()); | ||
--message_count_; | ||
} | ||
|
||
// Add the message to our list | ||
|
@@ -399,6 +387,19 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
++message_count_; | ||
} | ||
|
||
// Delay dropping of messages until we released messages_mutex_ to avoid deadlocks (#91, #101, #144) | ||
for (const MessageInfo &msg : msgs_to_drop) | ||
{ | ||
++dropped_message_count_; | ||
TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, | ||
(mt::FrameId<M>::value(*msg.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*msg.event.getMessage()).toSec()); | ||
|
||
for (const auto req : msg.handles) | ||
bc_.cancelTransformableRequest(req); | ||
|
||
messageDropped(msg.event, filter_failure_reasons::Unknown); | ||
} | ||
|
||
TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_); | ||
|
||
++incoming_message_count_; | ||
|
@@ -461,7 +462,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
{ | ||
namespace mt = ros::message_traits; | ||
|
||
boost::upgrade_lock< boost::shared_mutex > lock(messages_mutex_); | ||
boost::upgrade_lock<boost::shared_mutex> read_lock(messages_mutex_); | ||
|
||
// find the message this request is associated with | ||
typename L_MessageInfo::iterator msg_it = messages_.begin(); | ||
|
@@ -524,8 +525,6 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
can_transform = false; | ||
} | ||
|
||
// We will be mutating messages now, require unique lock | ||
boost::upgrade_to_unique_lock< boost::shared_mutex > uniqueLock(lock); | ||
if (can_transform) | ||
{ | ||
TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1); | ||
|
@@ -543,6 +542,8 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
messageDropped(info.event, filter_failure_reasons::Unknown); | ||
} | ||
|
||
// We will be mutating messages now, require unique lock | ||
boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock); | ||
messages_.erase(msg_it); | ||
--message_count_; | ||
} | ||
|
@@ -595,6 +596,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
|
||
virtual CallResult call() | ||
{ | ||
boost::shared_lock<boost::shared_mutex> lock(filter_->cbqueue_mutex_); | ||
if (success_) | ||
{ | ||
filter_->signalMessage(event_); | ||
|
@@ -668,7 +670,8 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi | |
V_string target_frames_; ///< The frames we need to be able to transform to before a message is ready | ||
std::string target_frames_string_; | ||
boost::mutex target_frames_mutex_; ///< A mutex to protect access to the target_frames_ list and target_frames_string. | ||
uint32_t queue_size_; ///< The maximum number of messages we queue up | ||
boost::shared_mutex cbqueue_mutex_; ///< A mutex protecting calls from callback queues | ||
uint32_t queue_size_; ///< The maximum number of messages we queue up | ||
tf2::TransformableCallbackHandle callback_handle_; | ||
|
||
typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The objects referenced here might get destroyed before they are used below, resulting in a segfault.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to look a little deeper that this isn't going to restore the potential deadlock of #91
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Fortunately, unit tests nicely cover this deadlock. While 23e6295 restores the deadlock (as expected), resulting in a
RLTestTimeoutException
, 9fdf547 fixes it again using the method described in #538.