summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp14
1 files changed, 10 insertions, 4 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 6477696bd6..cc24500800 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -43,7 +43,8 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
m_perfTestParams(perfTestParams),
m_store(store),
m_resultQueue(arq),
- m_queue(queue)
+ m_queue(queue),
+ m_stopFlag(false)
{}
MessageConsumer::~MessageConsumer() {}
@@ -56,6 +57,11 @@ MessageConsumer::record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr
void
MessageConsumer::commitComplete() {}
+void
+MessageConsumer::stop() {
+ m_stopFlag = true;
+}
+
void*
MessageConsumer::runConsumers() {
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
@@ -68,7 +74,7 @@ MessageConsumer::runConsumers() {
uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs /
m_perfTestParams.m_numDeqThreadsPerQueue;
uint32_t numMsgs = 0UL;
- while (numMsgs < msgsPerConsumer) {
+ while (numMsgs < msgsPerConsumer && !m_stopFlag) {
if (m_queue->dispatch(*this)) {
++numMsgs;
if (useTxns) {
@@ -78,7 +84,7 @@ MessageConsumer::runConsumers() {
tb->enlist(ta);
if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
tb->commitLocal(m_store);
- if (numMsgs < m_perfTestParams.m_numMsgs) {
+ if (numMsgs < msgsPerConsumer) {
tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
opsInTxnCnt = 0U;
@@ -95,7 +101,7 @@ MessageConsumer::runConsumers() {
}
}
- if (opsInTxnCnt) {
+ if (opsInTxnCnt && !m_stopFlag) {
tb->commitLocal(m_store);
}