summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-31 13:35:53 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-31 13:35:53 +0000
commit63c6598f401ac6406e5a31c602c7892b798536fc (patch)
tree73b3c1a519ada213c9e117244aab99d2e64d4f2a /cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
parentb435b07eb8fa9db484f85b39daaf43642dd623ca (diff)
downloadqpid-python-63c6598f401ac6406e5a31c602c7892b798536fc.tar.gz
QPID-3858: WIP: Durable transactions fixed
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1367535 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp37
1 files changed, 12 insertions, 25 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 4a2bc2bf0c..6aa477c470 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -47,22 +47,18 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
m_queue(queue)
{}
-MessageConsumer::~MessageConsumer()
-{}
+MessageConsumer::~MessageConsumer() {}
void
-MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
-{
+MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) {
m_unacked.push_back(dr);
}
void
-MessageConsumer::commitComplete()
-{}
+MessageConsumer::commitComplete() {}
void*
-MessageConsumer::runConsumers()
-{
+MessageConsumer::runConsumers() {
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
uint16_t opsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
@@ -78,17 +74,13 @@ MessageConsumer::runConsumers()
++numMsgs;
if (useTxns) {
// --- Transactional dequeue ---
+ boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ m_unacked.clear();
+ tb->enlist(ta);
if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
- if (m_perfTestParams.m_durable) {
- boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
- m_unacked.clear();
- tb->enlist(ta);
- tb->commitLocal(m_store);
- if (numMsgs < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
- }
- } else {
- tb->commit();
+ tb->commitLocal(m_store);
+ if (numMsgs < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
opsInTxnCnt = 0U;
}
@@ -105,11 +97,7 @@ MessageConsumer::runConsumers()
}
if (opsInTxnCnt) {
- if (m_perfTestParams.m_durable) {
- tb->commitLocal(m_store);
- } else {
- tb->commit();
- }
+ tb->commitLocal(m_store);
}
return reinterpret_cast<void*>(0);
@@ -117,8 +105,7 @@ MessageConsumer::runConsumers()
//static
void*
-MessageConsumer::startConsumers(void* ptr)
-{
+MessageConsumer::startConsumers(void* ptr) {
return reinterpret_cast<MessageConsumer*>(ptr)->runConsumers();
}