diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-07-31 13:35:53 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-07-31 13:35:53 +0000 |
| commit | 63c6598f401ac6406e5a31c602c7892b798536fc (patch) | |
| tree | 73b3c1a519ada213c9e117244aab99d2e64d4f2a /cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp | |
| parent | b435b07eb8fa9db484f85b39daaf43642dd623ca (diff) | |
| download | qpid-python-63c6598f401ac6406e5a31c602c7892b798536fc.tar.gz | |
QPID-3858: WIP: Durable transactions fixed
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1367535 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
| -rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp | 37 |
1 files changed, 12 insertions, 25 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 4a2bc2bf0c..6aa477c470 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -47,22 +47,18 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, m_queue(queue) {} -MessageConsumer::~MessageConsumer() -{} +MessageConsumer::~MessageConsumer() {} void -MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) -{ +MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) { m_unacked.push_back(dr); } void -MessageConsumer::commitComplete() -{} +MessageConsumer::commitComplete() {} void* -MessageConsumer::runConsumers() -{ +MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; uint16_t opsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; @@ -78,17 +74,13 @@ MessageConsumer::runConsumers() ++numMsgs; if (useTxns) { // --- Transactional dequeue --- + boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); + m_unacked.clear(); + tb->enlist(ta); if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { - if (m_perfTestParams.m_durable) { - boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); - m_unacked.clear(); - tb->enlist(ta); - tb->commitLocal(m_store); - if (numMsgs < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); - } - } else { - tb->commit(); + tb->commitLocal(m_store); + if (numMsgs < m_perfTestParams.m_numMsgs) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); } opsInTxnCnt = 0U; } @@ -105,11 +97,7 @@ MessageConsumer::runConsumers() } if (opsInTxnCnt) { - if (m_perfTestParams.m_durable) { - tb->commitLocal(m_store); - } else { - tb->commit(); - } + tb->commitLocal(m_store); } return reinterpret_cast<void*>(0); @@ -117,8 +105,7 @@ MessageConsumer::runConsumers() //static void* -MessageConsumer::startConsumers(void* ptr) -{ +MessageConsumer::startConsumers(void* ptr) { return reinterpret_cast<MessageConsumer*>(ptr)->runConsumers(); } |
