diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
| -rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 6aa477c470..6477696bd6 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,13 +23,12 @@ #include "MessageConsumer.h" -#include "DeliveryRecord.h" -#include "SimpleQueue.h" #include "TestOptions.h" -#include "TxnAccept.h" -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/TxnBuffer.h" +#include "qpid/broker/SimpleDeliveryRecord.h" +#include "qpid/broker/SimpleQueue.h" +#include "qpid/broker/SimpleTxnAccept.h" +#include "qpid/broker/SimpleTxnBuffer.h" #include <stdint.h> // uint32_t @@ -38,9 +37,9 @@ namespace storePerftools { namespace asyncPerf { MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue) : + boost::shared_ptr<qpid::broker::SimpleQueue> queue) : m_perfTestParams(perfTestParams), m_store(store), m_resultQueue(arq), @@ -50,7 +49,7 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, MessageConsumer::~MessageConsumer() {} void -MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) { +MessageConsumer::record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr) { m_unacked.push_back(dr); } @@ -61,9 +60,9 @@ void* MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; uint16_t opsInTxnCnt = 0U; - qpid::broker::TxnBuffer* tb = 0; + qpid::broker::SimpleTxnBuffer* tb = 0; if (useTxns) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs / @@ -74,19 +73,19 @@ MessageConsumer::runConsumers() { ++numMsgs; if (useTxns) { // --- Transactional dequeue --- - boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); + boost::shared_ptr<qpid::broker::SimpleTxnAccept> ta(new qpid::broker::SimpleTxnAccept(m_unacked)); m_unacked.clear(); tb->enlist(ta); if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { tb->commitLocal(m_store); if (numMsgs < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } opsInTxnCnt = 0U; } } else { // --- Non-transactional dequeue --- - for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { + for (std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { (*i)->accept(); } m_unacked.clear(); |
