From d2f7c788bd3cb2910b8ead1da6643b30367d0569 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 12 Oct 2012 18:38:53 +0000 Subject: QPID-4369: HA backup brokers core dump in benchmark test. Was seeing core dumps with QueueReplicator::queue == 0. Caused by race conditions when calling QueueReplicator::deactivate. Renamed deactivate to destroy and call it only when the broker::Queue is destroyed. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1397676 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/ha/BrokerReplicator.cpp | 11 +---------- cpp/src/qpid/ha/QueueReplicator.cpp | 33 +++++++++++++++++++++++---------- cpp/src/qpid/ha/QueueReplicator.h | 2 +- cpp/src/qpid/ha/StatusCheck.cpp | 2 +- cpp/src/tests/qpid-cluster-benchmark | 2 -- 5 files changed, 26 insertions(+), 24 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index c9b9664821..48d5b71134 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -277,14 +277,7 @@ void collectQueueReplicators( } } // namespace -void BrokerReplicator::shutdown() { - QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down."); - set > collect; - broker.getExchanges().eachExchange( - boost::bind(&collectQueueReplicators, _1, boost::ref(collect))); - for_each(collect.begin(), collect.end(), - boost::bind(&QueueReplicator::deactivate, _1)); -} +void BrokerReplicator::shutdown() {} // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { @@ -672,8 +665,6 @@ boost::shared_ptr BrokerReplicator::startQueueReplicator( } void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { - boost::shared_ptr qr(findQueueReplicator(name)); - if (qr) qr->deactivate(); Queue::shared_ptr queue = broker.getQueues().find(name); if (queue) { // Purge before deleting to ensure that we don't reroute any diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 5b9993bd90..6d30a5c10c 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -47,6 +47,7 @@ namespace ha { using namespace broker; using namespace framing; using namespace std; +using sys::Mutex; const std::string QPID_HA_EVENT_PREFIX("qpid.ha-"); const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); @@ -94,7 +95,8 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver { void requeued(const Message&) {} void consumerAdded( const Consumer& ) {} void consumerRemoved( const Consumer& ) {} - void destroy() { queueReplicator->deactivate(); } + // Queue observer is destroyed when the queue is. + void destroy() { queueReplicator->destroy(); } private: boost::shared_ptr queueReplicator; }; @@ -115,7 +117,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb, // This must be separate from the constructor so we can call shared_from_this. void QueueReplicator::activate() { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed std::pair result = queue->getBroker()->getLinks().declare( bridgeName, @@ -141,12 +144,14 @@ void QueueReplicator::activate() { queue->addObserver(observer); } -QueueReplicator::~QueueReplicator() { deactivate(); } +QueueReplicator::~QueueReplicator() {} -void QueueReplicator::deactivate() { - QPID_LOG(debug, logPrefix << "Deactivated"); - sys::Mutex::ScopedLock l(lock); - if (bridge) bridge->close(); +void QueueReplicator::destroy() { + // Called from Queue::destroyed() + Mutex::ScopedLock l(lock); + if (!bridge) return; + QPID_LOG(debug, logPrefix << "Destroyed."); + bridge->close(); // Need to drop shared pointers to avoid pointer cycles keeping this in memory. queue.reset(); link.reset(); @@ -156,7 +161,8 @@ void QueueReplicator::deactivate() { // Called in a broker connection thread when the bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable settings; @@ -197,7 +203,13 @@ template T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) { +void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) { + boost::shared_ptr q; + { + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed + q = queue; + } // Thread safe: only calls thread safe Queue functions. queue->dequeueMessageAt(n); } @@ -218,7 +230,8 @@ void QueueReplicator::route(Deliverable& msg) { try { const std::string& key = msg.getMessage().getRoutingKey(); - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed if (!isEventKey(key)) { msg.deliverTo(queue); // We are on a backup so the queue is not modified except via this. diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index b302162286..5fdc022cb1 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -70,7 +70,6 @@ class QueueReplicator : public broker::Exchange, ~QueueReplicator(); void activate(); // Call after ctor - void deactivate(); // Call before dtor std::string getType() const; bool bind(boost::shared_ptr