From 07b5a275dfd2e3c7cc63dab235fc70aa82d4d562 Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Thu, 30 Jun 2022 18:03:30 +0200 Subject: [PATCH 1/9] MessageFilter: unregister TransformableCallback on destruction --- tf2_ros/include/tf2_ros/message_filter.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tf2_ros/include/tf2_ros/message_filter.h b/tf2_ros/include/tf2_ros/message_filter.h index e8a47f532..e8f818d90 100644 --- a/tf2_ros/include/tf2_ros/message_filter.h +++ b/tf2_ros/include/tf2_ros/message_filter.h @@ -215,8 +215,8 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi ~MessageFilter() { message_connection_.disconnect(); - MessageFilter::clear(); + bc_.removeTransformableCallback(callback_handle_); TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu", (long long unsigned int)successful_transform_count_, From 58a597d640df3503eb7191808f850cfeb861c338 Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Thu, 30 Jun 2022 23:34:21 +0200 Subject: [PATCH 2/9] Add stress test for MessageFilter ... revealing that propagating messages via a ROS callback queue is inherently unsafe. If the filter is going to be destroyed, a CBQueueCallback::call() might still be in action, locking the underlying Signal1's mutex_. This will cause as assertion failure during destruction of the mutex: boost::mutex::~mutex(): Assertion `!posix::pthread_mutex_destroy(&m)' failed. --- tf2_ros/test/message_filter_test.cpp | 123 +++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/tf2_ros/test/message_filter_test.cpp b/tf2_ros/test/message_filter_test.cpp index b613d24fe..879cc181e 100644 --- a/tf2_ros/test/message_filter_test.cpp +++ b/tf2_ros/test/message_filter_test.cpp @@ -118,6 +118,129 @@ TEST(tf2_ros_message_filter, multiple_frames_and_time_tolerance) ASSERT_TRUE(filter_callback_fired); } +template +class MessageGenerator : public message_filters::SimpleFilter +{ +public: + template + void connectInput(F &) + { + } + + void add(const ros::MessageEvent &) + { + } + + void generate(const std::string &frame_id, const ros::Time &time) + { + auto msg = boost::make_shared(); + msg->header.frame_id = frame_id; + msg->header.stamp = time; + this->signalMessage(msg); + } +}; + +class MessageFilterFixture : public ::testing::TestWithParam +{ + using M = geometry_msgs::PointStamped; + +protected: + tf2_ros::Buffer buffer; + MessageGenerator source; + std::list> filters; + bool run = true; + + struct Sink + { + std::string name_; + int delay_; + + Sink(const std::string &name, int delay = 0) : name_(name), delay_(delay) {} + void operator()(const boost::shared_ptr &msg) + { + std::cerr << name_ << " "; + std::this_thread::sleep_for(std::chrono::milliseconds(delay_)); + } + }; + +public: + void msg_gen() + { + ros::WallRate rate(100); // publish messages @ 100Hz + const std::string frame_id("target"); + while (ros::ok() && run) + { + std::cerr << "\ngenerated: "; + source.generate(frame_id, ros::Time::now()); + rate.sleep(); + } + }; + + void frame_gen() + { + ros::WallRate rate(50); // publish frame info @ 50 Hz (slower than msgs) + while (ros::ok() && run) + { + geometry_msgs::TransformStamped transform; + transform.header.stamp = ros::Time::now(); + transform.header.frame_id = "base"; + transform.child_frame_id = "target"; + transform.transform.translation.x = 0.0; + transform.transform.translation.y = 0.0; + transform.transform.translation.z = 0.0; + transform.transform.rotation.x = 0.0; + transform.transform.rotation.y = 0.0; + transform.transform.rotation.z = 0.0; + transform.transform.rotation.w = 1.0; + buffer.setTransform(transform, "frame_generator", false); + rate.sleep(); + } + }; + + void add_filter(int i, ros::CallbackQueueInterface *queue) + { + std::string name(queue ? "Q" : "S"); + name += std::to_string(i); + + filters.emplace_back(buffer, "base", i + 1, queue); + auto &f = filters.back(); + f.setName(name); + f.connectInput(source); + f.registerCallback(Sink(name, 1)); + }; +}; + +TEST_P(MessageFilterFixture, StressTest) +{ + ros::NodeHandle nh; + ros::AsyncSpinner spinner(1); + spinner.start(); + + std::thread msg_gen(&MessageFilterFixture::msg_gen, this); + std::thread frame_gen(&MessageFilterFixture::frame_gen, this); + + bool use_cbqueue = GetParam(); + ros::CallbackQueueInterface *queue = use_cbqueue ? nh.getCallbackQueue() : nullptr; + // use fewer filters for signal-only transmission as we can remove only a single filter per iteration + int num_filters = use_cbqueue ? 50 : 10; + for (int i = 0; i < num_filters; ++i) + add_filter(i, queue); + + // slowly remove filters + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + while (!filters.empty()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(7)); + std::cerr << "\033[31m" << filters.front().getName() << "\033[0m "; + filters.pop_front(); + } + + run = false; + msg_gen.join(); + frame_gen.join(); +} +INSTANTIATE_TEST_CASE_P(MessageFilterTests, MessageFilterFixture, ::testing::Values(false, true)); + int main(int argc, char **argv){ testing::InitGoogleTest(&argc, argv); ros::init(argc, argv, "tf2_ros_message_filter"); From 529ce02c01901f334e98d02de13efcb10564347a Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Thu, 30 Jun 2022 23:56:24 +0200 Subject: [PATCH 3/9] Fix race condition When MessageFilter::signalMessage() is called asynchronouyls via a ROS callback queue, this call might still be in progress while another (holding Signal1::mutex_), while another thread might attempt to remove the MessageFilter. This results in a failed assertion as the MessageFilter destructor attempts to destroy a locked mutex. Here, by introducing another mutex in MessageFilter, we ensure that CBQueueCallback::call() is finished before destroying the MessageFilter. --- tf2_ros/include/tf2_ros/message_filter.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tf2_ros/include/tf2_ros/message_filter.h b/tf2_ros/include/tf2_ros/message_filter.h index e8f818d90..f5b161036 100644 --- a/tf2_ros/include/tf2_ros/message_filter.h +++ b/tf2_ros/include/tf2_ros/message_filter.h @@ -217,6 +217,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi message_connection_.disconnect(); MessageFilter::clear(); bc_.removeTransformableCallback(callback_handle_); + boost::mutex::scoped_lock lock(cbqueue_mutex_); // ensure that no callback queue call is active TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu", (long long unsigned int)successful_transform_count_, @@ -595,6 +596,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi virtual CallResult call() { + boost::mutex::scoped_lock lock(filter_->cbqueue_mutex_); if (success_) { filter_->signalMessage(event_); @@ -668,7 +670,8 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi V_string target_frames_; ///< The frames we need to be able to transform to before a message is ready std::string target_frames_string_; boost::mutex target_frames_mutex_; ///< A mutex to protect access to the target_frames_ list and target_frames_string. - uint32_t queue_size_; ///< The maximum number of messages we queue up + boost::mutex cbqueue_mutex_; ///< A mutex protecting calls from callback queues + uint32_t queue_size_; ///< The maximum number of messages we queue up tf2::TransformableCallbackHandle callback_handle_; typedef std::vector V_TransformableRequestHandle; From 91d7601e6f9beccf2af4217d584c272e46b9fb0a Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Thu, 30 Jun 2022 23:58:32 +0200 Subject: [PATCH 4/9] Remove debug output --- tf2_ros/test/message_filter_test.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/tf2_ros/test/message_filter_test.cpp b/tf2_ros/test/message_filter_test.cpp index 879cc181e..4f4e6cbf4 100644 --- a/tf2_ros/test/message_filter_test.cpp +++ b/tf2_ros/test/message_filter_test.cpp @@ -158,7 +158,6 @@ class MessageFilterFixture : public ::testing::TestWithParam Sink(const std::string &name, int delay = 0) : name_(name), delay_(delay) {} void operator()(const boost::shared_ptr &msg) { - std::cerr << name_ << " "; std::this_thread::sleep_for(std::chrono::milliseconds(delay_)); } }; @@ -170,7 +169,6 @@ class MessageFilterFixture : public ::testing::TestWithParam const std::string frame_id("target"); while (ros::ok() && run) { - std::cerr << "\ngenerated: "; source.generate(frame_id, ros::Time::now()); rate.sleep(); } @@ -231,7 +229,6 @@ TEST_P(MessageFilterFixture, StressTest) while (!filters.empty()) { std::this_thread::sleep_for(std::chrono::milliseconds(7)); - std::cerr << "\033[31m" << filters.front().getName() << "\033[0m "; filters.pop_front(); } From 23e6295ebb14279892fd55b7141d6f50b7ed0f25 Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Fri, 1 Jul 2022 13:41:51 +0200 Subject: [PATCH 5/9] Revert #144: Solve a bug that causes a deadlock in MessageFilter PR #144 attempted to resolve a deadlock by postponing calls to callbacks. However, this has the drawback of using references to TransformableRequests - outside the protection by transformable_requests_mutex_. Thus, these requests might get deleted by another thread, resulting in segfaults: terminate called after throwing an instance of 'boost::wrapexcept' what(): boost: mutex lock failed in pthread_mutex_lock: Invalid argument This reverts commit bfb803831b8a1f25a0ae2d6f18332209ed5b024a. --- tf2/src/buffer_core.cpp | 21 +++------------------ tf2_ros/include/tf2_ros/message_filter.h | 13 +++++++++++-- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/tf2/src/buffer_core.cpp b/tf2/src/buffer_core.cpp index e16a5174a..10fe977ba 100644 --- a/tf2/src/buffer_core.cpp +++ b/tf2/src/buffer_core.cpp @@ -37,7 +37,6 @@ #include #include #include "tf2/LinearMath/Transform.h" -#include namespace tf2 { @@ -1411,11 +1410,6 @@ void BufferCore::testTransformableRequests() { boost::mutex::scoped_lock lock(transformable_requests_mutex_); V_TransformableRequest::iterator it = transformable_requests_.begin(); - - typedef boost::tuple TransformableTuple; - std::vector transformables; - for (; it != transformable_requests_.end();) { TransformableRequest& req = *it; @@ -1455,12 +1449,8 @@ void BufferCore::testTransformableRequests() M_TransformableCallback::iterator it = transformable_callbacks_.find(req.cb_handle); if (it != transformable_callbacks_.end()) { - transformables.push_back(boost::make_tuple(boost::ref(it->second), - req.request_handle, - lookupFrameString(req.target_id), - lookupFrameString(req.source_id), - boost::ref(req.time), - boost::ref(result))); + const TransformableCallback& cb = it->second; + cb(req.request_handle, lookupFrameString(req.target_id), lookupFrameString(req.source_id), req.time, result); } } @@ -1477,14 +1467,9 @@ void BufferCore::testTransformableRequests() } } - // unlock before allowing possible user callbacks to avoid potential deadlock (#91) + // unlock before allowing possible user callbacks to avoid potential detadlock (#91) lock.unlock(); - BOOST_FOREACH (TransformableTuple tt, transformables) - { - tt.get<0>()(tt.get<1>(), tt.get<2>(), tt.get<3>(), tt.get<4>(), tt.get<5>()); - } - // Backwards compatability callback for tf _transforms_changed_(); } diff --git a/tf2_ros/include/tf2_ros/message_filter.h b/tf2_ros/include/tf2_ros/message_filter.h index f5b161036..834895352 100644 --- a/tf2_ros/include/tf2_ros/message_filter.h +++ b/tf2_ros/include/tf2_ros/message_filter.h @@ -372,10 +372,12 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi } else { - boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_); // If this message is about to push us past our queue size, erase the oldest message if (queue_size_ != 0 && message_count_ + 1 > queue_size_) { + // While we're using the reference keep a shared lock on the messages. + boost::shared_lock< boost::shared_mutex > shared_lock(messages_mutex_); + ++dropped_message_count_; const MessageInfo& front = messages_.front(); TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, @@ -383,19 +385,26 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi V_TransformableRequestHandle::const_iterator it = front.handles.begin(); V_TransformableRequestHandle::const_iterator end = front.handles.end(); - for (; it != end; ++it) { bc_.cancelTransformableRequest(*it); } messageDropped(front.event, filter_failure_reasons::Unknown); + // Unlock the shared lock and get a unique lock. Upgradeable lock is used in transformable. + // There can only be one upgrade lock. It's important the cancelTransformableRequest not deadlock with transformable. + // They both require the transformable_requests_mutex_ in BufferCore. + shared_lock.unlock(); + // There is a very slight race condition if an older message arrives in this gap. + boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_); messages_.pop_front(); --message_count_; } // Add the message to our list info.event = evt; + // Lock access to the messages_ before modifying them. + boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_); messages_.push_back(info); ++message_count_; } From 436caeee0e3592255c1944513cd2db97c033b08b Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Fri, 1 Jul 2022 14:25:57 +0200 Subject: [PATCH 6/9] Simplify #101 --- tf2/src/buffer_core.cpp | 2 +- tf2_ros/include/tf2_ros/message_filter.h | 21 ++++++--------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/tf2/src/buffer_core.cpp b/tf2/src/buffer_core.cpp index 10fe977ba..24a2708b8 100644 --- a/tf2/src/buffer_core.cpp +++ b/tf2/src/buffer_core.cpp @@ -1467,7 +1467,7 @@ void BufferCore::testTransformableRequests() } } - // unlock before allowing possible user callbacks to avoid potential detadlock (#91) + // unlock before allowing possible user callbacks to avoid potential deadlock (#91) lock.unlock(); // Backwards compatability callback for tf diff --git a/tf2_ros/include/tf2_ros/message_filter.h b/tf2_ros/include/tf2_ros/message_filter.h index 834895352..9905a0699 100644 --- a/tf2_ros/include/tf2_ros/message_filter.h +++ b/tf2_ros/include/tf2_ros/message_filter.h @@ -372,12 +372,10 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi } else { + boost::unique_lock unique_lock(messages_mutex_); // If this message is about to push us past our queue size, erase the oldest message if (queue_size_ != 0 && message_count_ + 1 > queue_size_) { - // While we're using the reference keep a shared lock on the messages. - boost::shared_lock< boost::shared_mutex > shared_lock(messages_mutex_); - ++dropped_message_count_; const MessageInfo& front = messages_.front(); TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, @@ -391,20 +389,13 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi } messageDropped(front.event, filter_failure_reasons::Unknown); - // Unlock the shared lock and get a unique lock. Upgradeable lock is used in transformable. - // There can only be one upgrade lock. It's important the cancelTransformableRequest not deadlock with transformable. - // They both require the transformable_requests_mutex_ in BufferCore. - shared_lock.unlock(); - // There is a very slight race condition if an older message arrives in this gap. - boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_); + messages_.pop_front(); - --message_count_; + --message_count_; } // Add the message to our list info.event = evt; - // Lock access to the messages_ before modifying them. - boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_); messages_.push_back(info); ++message_count_; } @@ -471,7 +462,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi { namespace mt = ros::message_traits; - boost::upgrade_lock< boost::shared_mutex > lock(messages_mutex_); + boost::upgrade_lock read_lock(messages_mutex_); // find the message this request is associated with typename L_MessageInfo::iterator msg_it = messages_.begin(); @@ -534,8 +525,6 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi can_transform = false; } - // We will be mutating messages now, require unique lock - boost::upgrade_to_unique_lock< boost::shared_mutex > uniqueLock(lock); if (can_transform) { TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1); @@ -553,6 +542,8 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi messageDropped(info.event, filter_failure_reasons::Unknown); } + // We will be mutating messages now, require unique lock + boost::upgrade_to_unique_lock write_lock(read_lock); messages_.erase(msg_it); --message_count_; } From 9fdf5474907bf1b9d0983ebdeb8b25c93a487960 Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Fri, 1 Jul 2022 14:44:15 +0200 Subject: [PATCH 7/9] Alternative implementation to resolve deadlock #91/#144 As pointed out in #144 the deadlock arises due to an inversion of locking order: - MessageFilter::add(), when removing the oldest message, locks: - MessageFilter::messages_mutex_ - BufferCore::transformable_requests_mutex_ (via cancelTransformableRequest()) - BufferCore::testTransformableRequests() locks: - transformable_requests_mutex_ - MessageFilter::message_mutex_ (via MessageFilter::transformable() callback) PR #144 attempted to resolve the issue by postponing callback calls. However, this has the drawback of using references to TransformableRequests - outside the protection by transformable_requests_mutex_. These requests might got deleted by another thread meanwhile! Here the deadlock is resolved in MessageFilter::add, cancelling the requests outside the messages_mutex_ protection. --- tf2_ros/include/tf2_ros/message_filter.h | 31 ++++++++++++------------ 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/tf2_ros/include/tf2_ros/message_filter.h b/tf2_ros/include/tf2_ros/message_filter.h index 9905a0699..6e3b31989 100644 --- a/tf2_ros/include/tf2_ros/message_filter.h +++ b/tf2_ros/include/tf2_ros/message_filter.h @@ -364,6 +364,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi } } + L_MessageInfo msgs_to_drop; // We can transform already if (info.success_count == expected_success_count_) @@ -376,21 +377,8 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi // If this message is about to push us past our queue size, erase the oldest message if (queue_size_ != 0 && message_count_ + 1 > queue_size_) { - ++dropped_message_count_; - const MessageInfo& front = messages_.front(); - TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, - (mt::FrameId::value(*front.event.getMessage())).c_str(), mt::TimeStamp::value(*front.event.getMessage()).toSec()); - - V_TransformableRequestHandle::const_iterator it = front.handles.begin(); - V_TransformableRequestHandle::const_iterator end = front.handles.end(); - for (; it != end; ++it) - { - bc_.cancelTransformableRequest(*it); - } - - messageDropped(front.event, filter_failure_reasons::Unknown); - - messages_.pop_front(); + // move front element from messages_ to msgs_to_drop for later dropping + msgs_to_drop.splice(msgs_to_drop.begin(), messages_, messages_.begin()); --message_count_; } @@ -400,6 +388,19 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi ++message_count_; } + // Delay dropping of messages until we released messages_mutex_ to avoid deadlocks (#91, #101, #144) + for (const MessageInfo &msg : msgs_to_drop) + { + ++dropped_message_count_; + TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, + (mt::FrameId::value(*msg.event.getMessage())).c_str(), mt::TimeStamp::value(*msg.event.getMessage()).toSec()); + + for (const auto req : msg.handles) + bc_.cancelTransformableRequest(req); + + messageDropped(msg.event, filter_failure_reasons::Unknown); + } + TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_); ++incoming_message_count_; From 4a052726ae4e5a80e203062fa466c54c1ed663ed Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Fri, 1 Jul 2022 17:11:47 +0200 Subject: [PATCH 8/9] Resolve additional deadlock between MessageFilter and BufferCore --- tf2_ros/include/tf2_ros/message_filter.h | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tf2_ros/include/tf2_ros/message_filter.h b/tf2_ros/include/tf2_ros/message_filter.h index 6e3b31989..c4fcfba0a 100644 --- a/tf2_ros/include/tf2_ros/message_filter.h +++ b/tf2_ros/include/tf2_ros/message_filter.h @@ -278,13 +278,11 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi */ void clear() { - boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_); - - TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared"); - bc_.removeTransformableCallback(callback_handle_); callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4, boost::placeholders::_5)); + // acquire after remove/addTransformableCallback to avoid deadlock! + boost::unique_lock unique_lock(messages_mutex_); messages_.clear(); message_count_ = 0; @@ -293,6 +291,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi callback_queue_->removeByID((uint64_t)this); warned_about_empty_frame_id_ = false; + TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared"); } void add(const MEvent& evt) From e469abf54e9c3ca9062779f15a090fb1a58d67c0 Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Fri, 1 Jul 2022 21:10:15 +0200 Subject: [PATCH 9/9] Reduce probability of race condition with CBQueueCallback While MessageFilter::clear() removes pending messages from the callback queue, there might be still callbacks active when attempting to remove the MessageFilter. Depending on what goes first, either the already destroyed mutex is tried to lock or a locked mutex is tried to destroy, both resulting in an assertion failure. This race condition cannot completely avoided. Using a shared mutex for all callers and moving the unique lock to the end of the destructor hopefully reduces the probability of such a race condition. --- tf2_ros/include/tf2_ros/message_filter.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tf2_ros/include/tf2_ros/message_filter.h b/tf2_ros/include/tf2_ros/message_filter.h index c4fcfba0a..98c350dca 100644 --- a/tf2_ros/include/tf2_ros/message_filter.h +++ b/tf2_ros/include/tf2_ros/message_filter.h @@ -217,13 +217,13 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi message_connection_.disconnect(); MessageFilter::clear(); bc_.removeTransformableCallback(callback_handle_); - boost::mutex::scoped_lock lock(cbqueue_mutex_); // ensure that no callback queue call is active TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu", (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_); + boost::unique_lock lock(cbqueue_mutex_); // ensure that no more callback queue calls are active } /** @@ -596,7 +596,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi virtual CallResult call() { - boost::mutex::scoped_lock lock(filter_->cbqueue_mutex_); + boost::shared_lock lock(filter_->cbqueue_mutex_); if (success_) { filter_->signalMessage(event_); @@ -670,7 +670,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi V_string target_frames_; ///< The frames we need to be able to transform to before a message is ready std::string target_frames_string_; boost::mutex target_frames_mutex_; ///< A mutex to protect access to the target_frames_ list and target_frames_string. - boost::mutex cbqueue_mutex_; ///< A mutex protecting calls from callback queues + boost::shared_mutex cbqueue_mutex_; ///< A mutex protecting calls from callback queues uint32_t queue_size_; ///< The maximum number of messages we queue up tf2::TransformableCallbackHandle callback_handle_;