summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp25
1 files changed, 12 insertions, 13 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 6aa477c470..6477696bd6 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -23,13 +23,12 @@
#include "MessageConsumer.h"
-#include "DeliveryRecord.h"
-#include "SimpleQueue.h"
#include "TestOptions.h"
-#include "TxnAccept.h"
-#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/TxnBuffer.h"
+#include "qpid/broker/SimpleDeliveryRecord.h"
+#include "qpid/broker/SimpleQueue.h"
+#include "qpid/broker/SimpleTxnAccept.h"
+#include "qpid/broker/SimpleTxnBuffer.h"
#include <stdint.h> // uint32_t
@@ -38,9 +37,9 @@ namespace storePerftools {
namespace asyncPerf {
MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue) :
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue) :
m_perfTestParams(perfTestParams),
m_store(store),
m_resultQueue(arq),
@@ -50,7 +49,7 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
MessageConsumer::~MessageConsumer() {}
void
-MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) {
+MessageConsumer::record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr) {
m_unacked.push_back(dr);
}
@@ -61,9 +60,9 @@ void*
MessageConsumer::runConsumers() {
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
uint16_t opsInTxnCnt = 0U;
- qpid::broker::TxnBuffer* tb = 0;
+ qpid::broker::SimpleTxnBuffer* tb = 0;
if (useTxns) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs /
@@ -74,19 +73,19 @@ MessageConsumer::runConsumers() {
++numMsgs;
if (useTxns) {
// --- Transactional dequeue ---
- boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ boost::shared_ptr<qpid::broker::SimpleTxnAccept> ta(new qpid::broker::SimpleTxnAccept(m_unacked));
m_unacked.clear();
tb->enlist(ta);
if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
tb->commitLocal(m_store);
if (numMsgs < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
opsInTxnCnt = 0U;
}
} else {
// --- Non-transactional dequeue ---
- for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
+ for (std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
(*i)->accept();
}
m_unacked.clear();