From c3c3bc4016270dd75b4c6a1e6831408cd4a5d055 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 22 Jun 2016 20:41:43 +0000 Subject: QPID-7306: Memory management error in Link/Bridge qpid::broker Link and Bridge use Connection::requestIOProcessing() to register callbacks in the connection thread. They were binding a plain "this" pointer to the callback, but the classes are managed by boost::shared_ptr so if all the shared_ptr were released, the callback could happen on a dangling pointer. This fix uses boost::weak_ptr in the callbacks, so if all shared_ptr instances are released, we don't use the dead pointer. Link::destroy cannot be skipped, so use a shared_ptr for that. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1749780 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Bridge.cpp | 15 +++++++++++---- qpid/cpp/src/qpid/broker/Link.cpp | 14 +++++++++----- qpid/cpp/src/qpid/broker/Link.h | 5 ++++- qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h | 20 +++++++++++++++++++- 4 files changed, 43 insertions(+), 11 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 06d3a0dd52..d6cd3e20e7 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -381,8 +381,11 @@ void Bridge::propagateBinding(const string& key, const string& tagList, else bindArgs.setString(qpidFedOrigin, origin); - conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, - queueName, args.i_src, key, bindArgs)); + conn->requestIOProcessing( + weakCallback( + boost::bind(&Bridge::ioThreadPropagateBinding, _1, + queueName, args.i_src, key, bindArgs), + this)); } } @@ -393,9 +396,13 @@ void Bridge::sendReorigin() bindArgs.setString(qpidFedOp, fedOpReorigin); bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag()); - conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, - queueName, args.i_src, args.i_key, bindArgs)); + conn->requestIOProcessing( + weakCallback( + boost::bind(&Bridge::ioThreadPropagateBinding, _1, + queueName, args.i_src, args.i_key, bindArgs), + this)); } + bool Bridge::resetProxy() { SessionHandler& sessionHandler = conn->getChannel(channel); diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 9b85917251..037d1e16c6 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -250,7 +250,8 @@ void Link::established(qpid::broker::amqp_0_10::Connection* c) currentInterval = 1; visitCount = 0; connection = c; - c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + c->requestIOProcessing ( + weakCallback(boost::bind(&Link::ioThreadProcessing, _1), this)); } } if (isClosing) @@ -416,7 +417,8 @@ void Link::add(Bridge::shared_ptr bridge) Mutex::ScopedLock mutex(lock); created.push_back (bridge); if (connection) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + connection->requestIOProcessing ( + weakCallback(boost::bind(&Link::ioThreadProcessing, _1), this)); } @@ -443,7 +445,8 @@ void Link::cancel(Bridge::shared_ptr bridge) needIOProcessing = !cancellations.empty(); } if (needIOProcessing && connection) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + connection->requestIOProcessing ( + weakCallback(boost::bind(&Link::ioThreadProcessing, _1), this)); } void Link::ioThreadProcessing() @@ -507,7 +510,8 @@ void Link::maintenanceVisit () case STATE_OPERATIONAL: if ((!active.empty() || !created.empty() || !cancellations.empty()) && connection && connection->isOpen()) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + connection->requestIOProcessing ( + weakCallback(boost::bind(&Link::ioThreadProcessing, _1), this)); break; default: // no-op for all other states @@ -691,7 +695,7 @@ void Link::close() { setStateLH(STATE_CLOSING); if (connection) { //connection can only be closed on the connections own IO processing thread - connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + connection->requestIOProcessing(boost::bind(&Link::destroy, shared_from_this())); } else if (old_state == STATE_CONNECTING) { // cannot destroy Link now since a connection request is outstanding. // destroy the link after we get a response (see Link::established, diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index ceee0186f3..c60632025c 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -23,6 +23,7 @@ */ #include +#include #include "qpid/Url.h" #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/PersistableConfig.h" @@ -50,7 +51,9 @@ namespace amqp_0_10 { class Connection; } -class Link : public PersistableConfig, public management::Manageable { +class Link : public PersistableConfig, public management::Manageable, + public boost::enable_shared_from_this +{ private: mutable sys::Mutex lock; const std::string name; diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h index 1bf6b2cee3..8fff5206ea 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h @@ -226,7 +226,25 @@ friend class OutboundFrameTracker; qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; } }; +} + +// See weakCallback below. +template void callIfValid(boost::function1 f, boost::weak_ptr wp) { + boost::shared_ptr sp = wp.lock(); + if (sp) f(sp.get()); +} + +// Memory safety helper for requestIOProcessing with boost::shared_ptr. +// +// Makes a function that calls f(p) only if p is still valid. The returned +// function is bound to a weak_ptr, and only calls f(p) if the weak pointer is +// still valid. Note this does not prevent the object being deleted before the +// IO callback, instead it skips the callback if the object is already deleted. +template +boost::function0 weakCallback(boost::function1 f, T* p) { + return boost::bind(&callIfValid, f, p->shared_from_this()); +} -}}} +}} #endif /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/ -- cgit v1.2.1