From 5083cef28cd7d1f594a7632ffec109567f5a3b2b Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Tue, 19 Jun 2012 12:57:15 +0000 Subject: QPID-3858: WIP: Solved race conditions affecting transactional publishing. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1351689 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/asyncStore/OperationQueue.cpp | 22 ++++++++++++++-------- cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 2 ++ cpp/src/qpid/asyncStore/TxnHandleImpl.h | 6 +++--- 3 files changed, 19 insertions(+), 11 deletions(-) (limited to 'cpp/src/qpid/asyncStore') diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 4eabf82004..0b7f58bd6c 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -50,17 +50,23 @@ OperationQueue::submit(boost::shared_ptr op) OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { - for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { + try { + for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { //std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; - boost::shared_ptr bc = (*i)->getBrokerContext(); - if (bc) { - qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); - if (arq) { - qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); - boost::shared_ptr arh(new qpid::broker::AsyncResultHandle(arhi)); - arq->submit(arh); + boost::shared_ptr bc = (*i)->getBrokerContext(); + if (bc) { + qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); + if (arq) { + qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); + boost::shared_ptr arh(new qpid::broker::AsyncResultHandle(arhi)); + arq->submit(arh); + } } } + } catch (const std::exception& e) { + std::cerr << "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what() << std::endl; + } catch (...) { + std::cerr << "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op" << std::endl; } return e.end(); } diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index 945b50861d..af3a8f01cf 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -89,12 +89,14 @@ TxnHandleImpl::is2pc() const void TxnHandleImpl::incrOpCnt() { + qpid::sys::ScopedLock l(m_asyncOpCntMutex); ++m_asyncOpCnt; } void TxnHandleImpl::decrOpCnt() { + qpid::sys::ScopedLock l(m_asyncOpCntMutex); if (m_asyncOpCnt == 0UL) { throw qpid::Exception("Transaction async operation count underflow"); } diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index e357791508..9452044d66 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -24,9 +24,8 @@ #ifndef qpid_asyncStore_TxnHandleImpl_h_ #define qpid_asyncStore_TxnHandleImpl_h_ -#include "AtomicCounter.h" - #include "qpid/RefCounted.h" +#include "qpid/sys/Mutex.h" #include // uint32_t #include @@ -59,7 +58,8 @@ public: private: std::string m_xid; bool m_tpcFlag; - AsyncOpCounter m_asyncOpCnt; + uint32_t m_asyncOpCnt; + qpid::sys::Mutex m_asyncOpCntMutex; qpid::broker::TxnBuffer* const m_txnBuffer; void createLocalXid(); -- cgit v1.2.1