diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-06-19 12:57:15 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-19 12:57:15 +0000 |
| commit | 5083cef28cd7d1f594a7632ffec109567f5a3b2b (patch) | |
| tree | 3b1e3d822339fffed4bf1e9f010e2a2b5a20f3b6 /cpp/src/qpid/asyncStore | |
| parent | 58337ca40df3a57a16cdee9b7f6b4fe0361b0018 (diff) | |
| download | qpid-python-5083cef28cd7d1f594a7632ffec109567f5a3b2b.tar.gz | |
QPID-3858: WIP: Solved race conditions affecting transactional publishing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1351689 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/asyncStore')
| -rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.h | 6 |
3 files changed, 19 insertions, 11 deletions
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 4eabf82004..0b7f58bd6c 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -50,17 +50,23 @@ OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op) OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { - for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { + try { + for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { //std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; - boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); - if (bc) { - qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); - if (arq) { - qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); - boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); - arq->submit(arh); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); + if (bc) { + qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); + if (arq) { + qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); + boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); + arq->submit(arh); + } } } + } catch (const std::exception& e) { + std::cerr << "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what() << std::endl; + } catch (...) { + std::cerr << "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op" << std::endl; } return e.end(); } diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index 945b50861d..af3a8f01cf 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -89,12 +89,14 @@ TxnHandleImpl::is2pc() const void TxnHandleImpl::incrOpCnt() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex); ++m_asyncOpCnt; } void TxnHandleImpl::decrOpCnt() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex); if (m_asyncOpCnt == 0UL) { throw qpid::Exception("Transaction async operation count underflow"); } diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index e357791508..9452044d66 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -24,9 +24,8 @@ #ifndef qpid_asyncStore_TxnHandleImpl_h_ #define qpid_asyncStore_TxnHandleImpl_h_ -#include "AtomicCounter.h" - #include "qpid/RefCounted.h" +#include "qpid/sys/Mutex.h" #include <stdint.h> // uint32_t #include <string> @@ -59,7 +58,8 @@ public: private: std::string m_xid; bool m_tpcFlag; - AsyncOpCounter m_asyncOpCnt; + uint32_t m_asyncOpCnt; + qpid::sys::Mutex m_asyncOpCntMutex; qpid::broker::TxnBuffer* const m_txnBuffer; void createLocalXid(); |
