From 40e74eaa3f8a345e7bc888e36de79717b7c761d0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 19 Dec 2014 03:18:57 +0000 Subject: QPID-6278: HA broker abort in TXN soak test The crash appears to be a race condition in async completion exposed by the HA TX code, as follows: 1. Message received and placed on tx-replication queue, completion delayed till backups ack. Completion count goes up for each backup then down as each backup acks. 2. Prepare received, message placed on primary's local persistent queue. Completion count goes up one then down one for local store completion (null store in this case). The race is something like this: - last backup ack arrives (on backup IO thread) and drops completion count to 0. - prepare arrives (on client thread), null store bumps count to 1 and immediately drops it to 0. - both threads try to invoke the completion callback, one deletes it while the other is still invoking. The old completion logic assumed that only one thread can see the atomic counter go to 0. It does not handle the count going to 0 in one thread and concurrently being increased and decreased back to 0 in another. This case is introduced by HA transactions because the same message is put onto a tx-replication queue and then put again onto another persistent local queue, so there are two cycles of completion. The new logic fixes this so that only one call to the completion callback is possible in all cases. Also fixed missing lock in ha/Primary.cpp. 
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646618 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/AsyncCompletion.h | 5 +++-- qpid/cpp/src/qpid/ha/Primary.cpp | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index 1ab69f32d3..cb5d58977b 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -111,13 +111,14 @@ class AsyncCompletion : public virtual RefCounted qpid::sys::Mutex::ScopedLock l(callbackLock); if (active) { if (callback.get()) { + boost::intrusive_ptr save = callback; + callback = boost::intrusive_ptr(); // Nobody else can run callback. inCallback = true; { qpid::sys::Mutex::ScopedUnlock ul(callbackLock); - callback->completed(sync); + save->completed(sync); } inCallback = false; - callback = boost::intrusive_ptr(); callbackLock.notifyAll(); } active = false; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 0e87346ac1..870e4723b2 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -482,6 +482,7 @@ shared_ptr Primary::makeTxObserver( { shared_ptr observer = PrimaryTxObserver::create(*this, haBroker, txBuffer); + sys::Mutex::ScopedLock l(lock); txMap[observer->getTxQueue()->getName()] = observer; return observer; } -- cgit v1.2.1