/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /** * \file PerfTest.cpp */ #include "tests/storePerftools/asyncPerf/MessageConsumer.h" #include "tests/storePerftools/asyncPerf/MessageProducer.h" #include "tests/storePerftools/asyncPerf/PerfTest.h" #include "tests/storePerftools/common/ScopedTimer.h" #include "tests/storePerftools/common/Thread.h" #include "tests/storePerftools/version.h" #include "qpid/Modules.h" // Use with loading store as module #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/asyncStore/AsyncStoreOptions.h" #include "qpid/broker/SimpleQueue.h" #include namespace tests { namespace storePerftools { namespace asyncPerf { PerfTest::PerfTest(const TestOptions& to, const qpid::asyncStore::AsyncStoreOptions& aso) : m_testOpts(to), m_storeOpts(aso), m_testResult(to), m_msgData(new char[to.m_msgSize]), m_poller(new qpid::sys::Poller), m_pollingThread(m_poller.get()), m_resultQueue(m_poller), m_store(0) { std::memset(m_msgData, 0, (size_t)to.m_msgSize); } PerfTest::~PerfTest() { m_poller->shutdown(); m_pollingThread.join(); m_queueList.clear(); m_queueList.clear(); m_producers.clear(); delete[] m_msgData; } void PerfTest::run() { if (m_testOpts.m_durable) { prepareStore(); } prepareQueues(); // TODO: replace with qpid::sys::Thread std::deque > threads; { // --- Start of timed section --- tests::storePerftools::common::ScopedTimer st(m_testResult); for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads boost::shared_ptr mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_resultQueue, m_queueList[q])); m_producers.push_back(mp); boost::shared_ptr tp(new tests::storePerftools::common::Thread(mp->startProducers, reinterpret_cast(mp.get()))); threads.push_back(tp); } for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads boost::shared_ptr mc(new MessageConsumer(m_testOpts, m_store, m_resultQueue, m_queueList[q])); m_consumers.push_back(mc); boost::shared_ptr tp(new tests::storePerftools::common::Thread(mc->startConsumers, reinterpret_cast(mc.get()))); threads.push_back(tp); } } while (threads.size()) { threads.front()->join(); threads.pop_front(); } } // --- End of timed section --- destroyQueues(); destroyStore(); } void PerfTest::toStream(std::ostream& os) const { m_testOpts.printVals(os); os << std::endl; m_storeOpts.printVals(os); os << std::endl; os << m_testResult << std::endl; } // private void PerfTest::prepareStore() { qpid::asyncStore::AsyncStoreImpl* s = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); s->initialize(); m_store = s; } // private void PerfTest::destroyStore() { if (m_store) { delete m_store; } } // private void PerfTest::prepareQueues() { for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; boost::shared_ptr mpq(new qpid::broker::SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } } // private void PerfTest::destroyQueues() { while (m_queueList.size() > 0) { m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion); m_queueList.pop_front(); } } int runPerfTest(int argc, char** argv) { // Load async store module qpid::tryShlib ("asyncStore.so"); qpid::CommonOptions co; qpid::asyncStore::AsyncStoreOptions aso; TestOptions to; qpid::Options opts; opts.add(co).add(aso).add(to); try { opts.parse(argc, argv); aso.validate(); to.validate(); // Handle options that just print information then exit. if (co.version) { std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl; return 0; } if (co.help) { std::cout << tests::storePerftools::name() << ": asyncPerf" << std::endl; std::cout << "Performance test for the async store through the qpid async store interface." << std::endl; std::cout << "Usage: asyncPerf [options]" << std::endl; std::cout << opts << std::endl; return 0; } // Create and start test tests::storePerftools::asyncPerf::PerfTest apt(to, aso); apt.run(); // Print test result std::cout << apt << std::endl; //::sleep(1); } catch (const std::exception& e) { std::cerr << e.what() << std::endl; return 1; } return 0; } }}} // namespace tests::storePerftools::asyncPerf // ----------------------------------------------------------------- int main(int argc, char** argv) { return tests::storePerftools::asyncPerf::runPerfTest(argc, argv); }