diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
| -rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 6477696bd6..cc24500800 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -43,7 +43,8 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, m_perfTestParams(perfTestParams), m_store(store), m_resultQueue(arq), - m_queue(queue) + m_queue(queue), + m_stopFlag(false) {} MessageConsumer::~MessageConsumer() {} @@ -56,6 +57,11 @@ MessageConsumer::record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr void MessageConsumer::commitComplete() {} +void +MessageConsumer::stop() { + m_stopFlag = true; +} + void* MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; @@ -68,7 +74,7 @@ MessageConsumer::runConsumers() { uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs / m_perfTestParams.m_numDeqThreadsPerQueue; uint32_t numMsgs = 0UL; - while (numMsgs < msgsPerConsumer) { + while (numMsgs < msgsPerConsumer && !m_stopFlag) { if (m_queue->dispatch(*this)) { ++numMsgs; if (useTxns) { @@ -78,7 +84,7 @@ MessageConsumer::runConsumers() { tb->enlist(ta); if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { tb->commitLocal(m_store); - if (numMsgs < m_perfTestParams.m_numMsgs) { + if (numMsgs < msgsPerConsumer) { tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } opsInTxnCnt = 0U; @@ -95,7 +101,7 @@ MessageConsumer::runConsumers() { } } - if (opsInTxnCnt) { + if (opsInTxnCnt && !m_stopFlag) { tb->commitLocal(m_store); } |
