summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-19 12:57:15 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-19 12:57:15 +0000
commit5083cef28cd7d1f594a7632ffec109567f5a3b2b (patch)
tree3b1e3d822339fffed4bf1e9f010e2a2b5a20f3b6 /cpp/src/qpid/asyncStore
parent58337ca40df3a57a16cdee9b7f6b4fe0361b0018 (diff)
downloadqpid-python-5083cef28cd7d1f594a7632ffec109567f5a3b2b.tar.gz
QPID-3858: WIP: Solved race conditions affecting transactional publishing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1351689 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp22
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.h6
3 files changed, 19 insertions, 11 deletions
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 4eabf82004..0b7f58bd6c 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -50,17 +50,23 @@ OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op)
OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
- for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
+ try {
+ for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
- if (bc) {
- qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
- if (arq) {
- qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
- boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
- arq->submit(arh);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
+ if (bc) {
+ qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
+ if (arq) {
+ qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
+ boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
+ arq->submit(arh);
+ }
}
}
+ } catch (const std::exception& e) {
+ std::cerr << "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op" << std::endl;
}
return e.end();
}
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
index 945b50861d..af3a8f01cf 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -89,12 +89,14 @@ TxnHandleImpl::is2pc() const
void
TxnHandleImpl::incrOpCnt()
{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex);
++m_asyncOpCnt;
}
void
TxnHandleImpl::decrOpCnt()
{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex);
if (m_asyncOpCnt == 0UL) {
throw qpid::Exception("Transaction async operation count underflow");
}
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
index e357791508..9452044d66 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
@@ -24,9 +24,8 @@
#ifndef qpid_asyncStore_TxnHandleImpl_h_
#define qpid_asyncStore_TxnHandleImpl_h_
-#include "AtomicCounter.h"
-
#include "qpid/RefCounted.h"
+#include "qpid/sys/Mutex.h"
#include <stdint.h> // uint32_t
#include <string>
@@ -59,7 +58,8 @@ public:
private:
std::string m_xid;
bool m_tpcFlag;
- AsyncOpCounter m_asyncOpCnt;
+ uint32_t m_asyncOpCnt;
+ qpid::sys::Mutex m_asyncOpCntMutex;
qpid::broker::TxnBuffer* const m_txnBuffer;
void createLocalXid();