From b6851f7bafd90d24bb518b63e7fc9f91e1cd84eb Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Mon, 14 May 2012 13:06:21 +0000 Subject: QPID-3858: Fix directory names to match namespaces in test dir; Changed MockPersistableQueue to use intrusive pointers. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1338185 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/CMakeLists.txt | 56 ++-- .../asyncPerf/MockPersistableMessage.cpp | 181 ---------- .../asyncPerf/MockPersistableMessage.h | 104 ------ .../asyncPerf/MockPersistableQueue.cpp | 362 -------------------- .../asyncPerf/MockPersistableQueue.h | 136 -------- .../asyncPerf/MockTransactionContext.cpp | 222 ------------ .../asyncPerf/MockTransactionContext.h | 99 ------ .../tests/storePerfTools/asyncPerf/PerfTest.cpp | 177 ---------- cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h | 82 ----- .../storePerfTools/asyncPerf/QueuedMessage.cpp | 77 ----- .../tests/storePerfTools/asyncPerf/QueuedMessage.h | 61 ---- .../tests/storePerfTools/asyncPerf/TestOptions.cpp | 79 ----- .../tests/storePerfTools/asyncPerf/TestOptions.h | 60 ---- .../tests/storePerfTools/asyncPerf/TestResult.cpp | 63 ---- .../tests/storePerfTools/asyncPerf/TestResult.h | 93 ----- cpp/src/tests/storePerfTools/common/Parameters.cpp | 33 -- cpp/src/tests/storePerfTools/common/Parameters.h | 43 --- .../tests/storePerfTools/common/PerftoolError.cpp | 202 ----------- .../tests/storePerfTools/common/PerftoolError.h | 127 ------- .../tests/storePerfTools/common/ScopedTimable.cpp | 43 --- .../tests/storePerfTools/common/ScopedTimable.h | 56 ---- .../tests/storePerfTools/common/ScopedTimer.cpp | 58 ---- cpp/src/tests/storePerfTools/common/ScopedTimer.h | 91 ----- cpp/src/tests/storePerfTools/common/Streamable.cpp | 54 --- cpp/src/tests/storePerfTools/common/Streamable.h | 79 ----- .../tests/storePerfTools/common/TestOptions.cpp | 103 ------ cpp/src/tests/storePerfTools/common/TestOptions.h | 66 ---- .../tests/storePerfTools/common/TestParameters.cpp | 135 -------- .../tests/storePerfTools/common/TestParameters.h | 115 ------- cpp/src/tests/storePerfTools/common/TestResult.cpp | 38 --- cpp/src/tests/storePerfTools/common/TestResult.h | 44 --- cpp/src/tests/storePerfTools/common/Thread.cpp | 67 ---- cpp/src/tests/storePerfTools/common/Thread.h | 84 ----- cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp | 218 ------------ cpp/src/tests/storePerfTools/jrnlPerf/Journal.h | 169 ---------- .../storePerfTools/jrnlPerf/JournalParameters.cpp | 166 --------- .../storePerfTools/jrnlPerf/JournalParameters.h | 131 -------- cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp | 241 ------------- cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h | 135 -------- .../tests/storePerfTools/jrnlPerf/TestResult.cpp | 65 ---- cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h | 92 ----- cpp/src/tests/storePerfTools/version.h | 49 --- .../asyncPerf/MockPersistableMessage.cpp | 181 ++++++++++ .../asyncPerf/MockPersistableMessage.h | 104 ++++++ .../asyncPerf/MockPersistableQueue.cpp | 373 +++++++++++++++++++++ .../asyncPerf/MockPersistableQueue.h | 140 ++++++++ .../asyncPerf/MockTransactionContext.cpp | 222 ++++++++++++ .../asyncPerf/MockTransactionContext.h | 99 ++++++ .../tests/storePerftools/asyncPerf/PerfTest.cpp | 177 ++++++++++ cpp/src/tests/storePerftools/asyncPerf/PerfTest.h | 79 +++++ .../storePerftools/asyncPerf/QueuedMessage.cpp | 77 +++++ .../tests/storePerftools/asyncPerf/QueuedMessage.h | 61 ++++ .../tests/storePerftools/asyncPerf/TestOptions.cpp | 79 +++++ .../tests/storePerftools/asyncPerf/TestOptions.h | 60 ++++ .../tests/storePerftools/asyncPerf/TestResult.cpp | 63 ++++ .../tests/storePerftools/asyncPerf/TestResult.h | 93 +++++ cpp/src/tests/storePerftools/common/Parameters.cpp | 33 ++ cpp/src/tests/storePerftools/common/Parameters.h | 43 +++ .../tests/storePerftools/common/PerftoolError.cpp | 202 +++++++++++ .../tests/storePerftools/common/PerftoolError.h | 127 +++++++ .../tests/storePerftools/common/ScopedTimable.cpp | 43 +++ .../tests/storePerftools/common/ScopedTimable.h | 56 ++++ .../tests/storePerftools/common/ScopedTimer.cpp | 58 ++++ cpp/src/tests/storePerftools/common/ScopedTimer.h | 91 +++++ cpp/src/tests/storePerftools/common/Streamable.cpp | 54 +++ cpp/src/tests/storePerftools/common/Streamable.h | 79 +++++ .../tests/storePerftools/common/TestOptions.cpp | 103 ++++++ cpp/src/tests/storePerftools/common/TestOptions.h | 66 ++++ .../tests/storePerftools/common/TestParameters.cpp | 135 ++++++++ .../tests/storePerftools/common/TestParameters.h | 115 +++++++ cpp/src/tests/storePerftools/common/TestResult.cpp | 38 +++ cpp/src/tests/storePerftools/common/TestResult.h | 44 +++ cpp/src/tests/storePerftools/common/Thread.cpp | 67 ++++ cpp/src/tests/storePerftools/common/Thread.h | 84 +++++ cpp/src/tests/storePerftools/jrnlPerf/Journal.cpp | 218 ++++++++++++ cpp/src/tests/storePerftools/jrnlPerf/Journal.h | 169 ++++++++++ .../storePerftools/jrnlPerf/JournalParameters.cpp | 166 +++++++++ .../storePerftools/jrnlPerf/JournalParameters.h | 131 ++++++++ cpp/src/tests/storePerftools/jrnlPerf/PerfTest.cpp | 241 +++++++++++++ cpp/src/tests/storePerftools/jrnlPerf/PerfTest.h | 135 ++++++++ .../tests/storePerftools/jrnlPerf/TestResult.cpp | 65 ++++ cpp/src/tests/storePerftools/jrnlPerf/TestResult.h | 92 +++++ cpp/src/tests/storePerftools/version.h | 49 +++ 83 files changed, 4540 insertions(+), 4528 deletions(-) delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp delete mode 100644 cpp/src/tests/storePerfTools/asyncPerf/TestResult.h delete mode 100644 cpp/src/tests/storePerfTools/common/Parameters.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/Parameters.h delete mode 100644 cpp/src/tests/storePerfTools/common/PerftoolError.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/PerftoolError.h delete mode 100644 cpp/src/tests/storePerfTools/common/ScopedTimable.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/ScopedTimable.h delete mode 100644 cpp/src/tests/storePerfTools/common/ScopedTimer.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/ScopedTimer.h delete mode 100644 cpp/src/tests/storePerfTools/common/Streamable.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/Streamable.h delete mode 100644 cpp/src/tests/storePerfTools/common/TestOptions.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/TestOptions.h delete mode 100644 cpp/src/tests/storePerfTools/common/TestParameters.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/TestParameters.h delete mode 100644 cpp/src/tests/storePerfTools/common/TestResult.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/TestResult.h delete mode 100644 cpp/src/tests/storePerfTools/common/Thread.cpp delete mode 100644 cpp/src/tests/storePerfTools/common/Thread.h delete mode 100644 cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp delete mode 100644 cpp/src/tests/storePerfTools/jrnlPerf/Journal.h delete mode 100644 cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp delete mode 100644 cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h delete mode 100644 cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp delete mode 100644 cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h delete mode 100644 cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp delete mode 100644 cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h delete mode 100644 cpp/src/tests/storePerfTools/version.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/PerfTest.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/TestOptions.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/TestResult.h create mode 100644 cpp/src/tests/storePerftools/common/Parameters.cpp create mode 100644 cpp/src/tests/storePerftools/common/Parameters.h create mode 100644 cpp/src/tests/storePerftools/common/PerftoolError.cpp create mode 100644 cpp/src/tests/storePerftools/common/PerftoolError.h create mode 100644 cpp/src/tests/storePerftools/common/ScopedTimable.cpp create mode 100644 cpp/src/tests/storePerftools/common/ScopedTimable.h create mode 100644 cpp/src/tests/storePerftools/common/ScopedTimer.cpp create mode 100644 cpp/src/tests/storePerftools/common/ScopedTimer.h create mode 100644 cpp/src/tests/storePerftools/common/Streamable.cpp create mode 100644 cpp/src/tests/storePerftools/common/Streamable.h create mode 100644 cpp/src/tests/storePerftools/common/TestOptions.cpp create mode 100644 cpp/src/tests/storePerftools/common/TestOptions.h create mode 100644 cpp/src/tests/storePerftools/common/TestParameters.cpp create mode 100644 cpp/src/tests/storePerftools/common/TestParameters.h create mode 100644 cpp/src/tests/storePerftools/common/TestResult.cpp create mode 100644 cpp/src/tests/storePerftools/common/TestResult.h create mode 100644 cpp/src/tests/storePerftools/common/Thread.cpp create mode 100644 cpp/src/tests/storePerftools/common/Thread.h create mode 100644 cpp/src/tests/storePerftools/jrnlPerf/Journal.cpp create mode 100644 cpp/src/tests/storePerftools/jrnlPerf/Journal.h create mode 100644 cpp/src/tests/storePerftools/jrnlPerf/JournalParameters.cpp create mode 100644 cpp/src/tests/storePerftools/jrnlPerf/JournalParameters.h create mode 100644 cpp/src/tests/storePerftools/jrnlPerf/PerfTest.cpp create mode 100644 cpp/src/tests/storePerftools/jrnlPerf/PerfTest.h create mode 100644 cpp/src/tests/storePerftools/jrnlPerf/TestResult.cpp create mode 100644 cpp/src/tests/storePerftools/jrnlPerf/TestResult.h create mode 100644 cpp/src/tests/storePerftools/version.h (limited to 'cpp/src') diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 54fc4fdf2d..ba4f5d321e 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -350,19 +350,19 @@ add_library (dlclose_noop MODULE dlclose_noop.c) # New journal perf test (jrnl2Perf) set (jrnl2Perf_SOURCES - storePerfTools/jrnlPerf/Journal.cpp - storePerfTools/jrnlPerf/JournalParameters.cpp - storePerfTools/jrnlPerf/PerfTest.cpp - storePerfTools/jrnlPerf/TestResult.cpp + storePerftools/jrnlPerf/Journal.cpp + storePerftools/jrnlPerf/JournalParameters.cpp + storePerftools/jrnlPerf/PerfTest.cpp + storePerftools/jrnlPerf/TestResult.cpp - storePerfTools/common/Parameters.cpp - storePerfTools/common/PerftoolError.cpp - storePerfTools/common/ScopedTimable.cpp - storePerfTools/common/ScopedTimer.cpp - storePerfTools/common/Streamable.cpp - storePerfTools/common/TestParameters.cpp - storePerfTools/common/TestResult.cpp - storePerfTools/common/Thread.cpp + storePerftools/common/Parameters.cpp + storePerftools/common/PerftoolError.cpp + storePerftools/common/ScopedTimable.cpp + storePerftools/common/ScopedTimer.cpp + storePerftools/common/Streamable.cpp + storePerftools/common/TestParameters.cpp + storePerftools/common/TestResult.cpp + storePerftools/common/Thread.cpp ) if (UNIX) @@ -379,23 +379,23 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES - storePerfTools/asyncPerf/MockPersistableMessage.cpp - storePerfTools/asyncPerf/MockPersistableQueue.cpp - storePerfTools/asyncPerf/MockTransactionContext.cpp - storePerfTools/asyncPerf/PerfTest.cpp - storePerfTools/asyncPerf/QueuedMessage.cpp - storePerfTools/asyncPerf/TestOptions.cpp - storePerfTools/asyncPerf/TestResult.cpp + storePerftools/asyncPerf/MockPersistableMessage.cpp + storePerftools/asyncPerf/MockPersistableQueue.cpp + storePerftools/asyncPerf/MockTransactionContext.cpp + storePerftools/asyncPerf/PerfTest.cpp + storePerftools/asyncPerf/QueuedMessage.cpp + storePerftools/asyncPerf/TestOptions.cpp + storePerftools/asyncPerf/TestResult.cpp - storePerfTools/common/Parameters.cpp - storePerfTools/common/PerftoolError.cpp - storePerfTools/common/ScopedTimable.cpp - storePerfTools/common/ScopedTimer.cpp - storePerfTools/common/Streamable.cpp - storePerfTools/common/TestOptions.cpp - storePerfTools/common/TestParameters.cpp - storePerfTools/common/TestResult.cpp - storePerfTools/common/Thread.cpp + storePerftools/common/Parameters.cpp + storePerftools/common/PerftoolError.cpp + storePerftools/common/ScopedTimable.cpp + storePerftools/common/ScopedTimer.cpp + storePerftools/common/Streamable.cpp + storePerftools/common/TestOptions.cpp + storePerftools/common/TestParameters.cpp + storePerftools/common/TestResult.cpp + storePerftools/common/Thread.cpp ) if (UNIX) diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp deleted file mode 100644 index 5cc829f4d2..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockPersistableMessage.cpp - */ - -#include "MockPersistableMessage.h" -#include "MockPersistableQueue.h" // debug statements in enqueueComplete() and dequeueComplete() - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -// --- Inner class Queue::MessageContext --- - -MockPersistableMessage::MessageContext::MessageContext(MockPersistableMessagePtr msg, - const qpid::asyncStore::AsyncOperation::opCode op, - MockPersistableQueue* q) : - m_msg(msg), - m_op(op), - m_q(q) -{} - -MockPersistableMessage::MessageContext::~MessageContext() -{} - -const char* -MockPersistableMessage::MessageContext::getOp() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - -void -MockPersistableMessage::MessageContext::destroy() -{ - delete this; -} - -// --- Class MockPersistableMessage --- - - -MockPersistableMessage::MockPersistableMessage(const char* msgData, - const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store, - const bool persistent) : - m_persistenceId(0ULL), - m_msg(msgData, static_cast(msgSize)), - m_persistent(persistent), - m_msgHandle(store->createMessageHandle(this)) -{} - -MockPersistableMessage::~MockPersistableMessage() -{} - -// static -void -MockPersistableMessage::handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerContext* bc) -{ - if (bc) { - MessageContext* mc = dynamic_cast(bc); - if (mc->m_msg) { - if (res->errNo) { - // TODO: Handle async failure here - std::cerr << "Message pid=0x" << std::hex << mc->m_msg->m_persistenceId << std::dec << ": Operation " - << mc->getOp() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl; - } else { - // Handle async success here - switch(mc->m_op) { - case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: - mc->m_msg->dequeueComplete(mc); - break; - case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: - mc->m_msg->enqueueComplete(mc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::MockPersistableMessage::handleAsyncResult(): Unknown async queue operation: " << mc->m_op; - throw qpid::Exception(oss.str()); - }; - } - } - } - if (bc) delete bc; - if (res) delete res; -} - -qpid::broker::MessageHandle& -MockPersistableMessage::getHandle() -{ - return m_msgHandle; -} - -void -MockPersistableMessage::setPersistenceId(uint64_t id) const -{ - m_persistenceId = id; -} - -uint64_t -MockPersistableMessage::getPersistenceId() const -{ - return m_persistenceId; -} - -void -MockPersistableMessage::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putRawData(m_msg); -} - -uint32_t -MockPersistableMessage::encodedSize() const -{ - return static_cast(m_msg.size()); -} - -void -MockPersistableMessage::allDequeuesComplete() -{} - -uint32_t -MockPersistableMessage::encodedHeaderSize() const -{ - return 0; -} - -bool -MockPersistableMessage::isPersistent() const -{ - return m_persistent; -} - -uint64_t -MockPersistableMessage::getSize() -{ - return m_msg.size(); -} - -void -MockPersistableMessage::write(char* target) -{ - ::memcpy(target, m_msg.data(), m_msg.size()); -} - -// protected -void -MockPersistableMessage::enqueueComplete(const MessageContext* mc) -{ -//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush; - assert(mc->m_msg.get() == this); -} - -// protected -void -MockPersistableMessage::dequeueComplete(const MessageContext* mc) -{ -//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush; - assert(mc->m_msg.get() == this); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h deleted file mode 100644 index fa9bc8e937..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockPersistableMessage.h - */ - -#ifndef tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ -#define tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ - -#include "qpid/asyncStore/AsyncOperation.h" -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/BrokerContext.h" -#include "qpid/broker/MessageHandle.h" -#include "qpid/broker/PersistableMessage.h" - -#include // uint32_t - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MockPersistableMessage; -class MockPersistableQueue; - -typedef boost::shared_ptr MockPersistableMessagePtr; - -class MockPersistableMessage: public qpid::broker::PersistableMessage, qpid::broker::DataSource -{ -public: - class MessageContext : public qpid::broker::BrokerContext - { - public: - MessageContext(MockPersistableMessagePtr msg, - const qpid::asyncStore::AsyncOperation::opCode op, - MockPersistableQueue* q); - virtual ~MessageContext(); - const char* getOp() const; - void destroy(); - MockPersistableMessagePtr m_msg; - const qpid::asyncStore::AsyncOperation::opCode m_op; - MockPersistableQueue* m_q; - }; - - MockPersistableMessage(const char* msgData, - const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store, - const bool persistent = true); - virtual ~MockPersistableMessage(); - static void handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerContext* bc); - qpid::broker::MessageHandle& getHandle(); - - // Interface Persistable - virtual void setPersistenceId(uint64_t id) const; - virtual uint64_t getPersistenceId() const; - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - - // Interface PersistableMessage - virtual void allDequeuesComplete(); - virtual uint32_t encodedHeaderSize() const; - virtual bool isPersistent() const; - - // Interface DataStore - virtual uint64_t getSize(); - virtual void write(char* target); - -protected: - mutable uint64_t m_persistenceId; - const std::string m_msg; - const bool m_persistent; - qpid::broker::MessageHandle m_msgHandle; - - // --- Ascnc op completions (called through handleAsyncResult) --- - void enqueueComplete(const MessageContext* mc); - void dequeueComplete(const MessageContext* mc); - -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp deleted file mode 100644 index 69af020a26..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockPersistableQueue.cpp - */ - -#include "MockPersistableQueue.h" - -#include "MockPersistableMessage.h" -#include "MockTransactionContext.h" -#include "QueuedMessage.h" -#include "TestOptions.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/EnqueueHandle.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -// --- Inner class MockPersistableQueue::QueueContext --- - -MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueuePtr q, - const qpid::asyncStore::AsyncOperation::opCode op) : - m_q(q), - m_op(op) -{} - -MockPersistableQueue::QueueContext::~QueueContext() -{} - -const char* -MockPersistableQueue::QueueContext::getOp() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - -void -MockPersistableQueue::QueueContext::destroy() -{ - delete this; -} - -// --- Class MockPersistableQueue --- - -MockPersistableQueue::MockPersistableQueue(const std::string& name, - const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, - const TestOptions& to, - const char* msgData) : - qpid::broker::PersistableQueue(), - m_name(name), - m_store(store), - m_persistenceId(0ULL), - m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. - m_perfTestOpts(to), - m_msgData(msgData) -{ - const qpid::types::Variant::Map qo; - m_queueHandle = m_store->createQueueHandle(m_name, qo); -} - -MockPersistableQueue::~MockPersistableQueue() -{ -// m_store->flush(*this); - // TODO: Make destroying the store a test parameter -// m_store->destroy(*this); -// m_store = 0; -} - -// static -void -MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerContext* bc) -{ - if (bc && res) { - QueueContext* qc = dynamic_cast(bc); - if (qc->m_q) { - if (res->errNo) { - // TODO: Handle async failure here - std::cerr << "Queue name=\"" << qc->m_q->m_name << "\": Operation " << qc->getOp() << ": failure " - << res->errNo << " (" << res->errMsg << ")" << std::endl; - } else { - // Handle async success here - switch(qc->m_op) { - case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: - qc->m_q->createComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: - qc->m_q->flushComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: - qc->m_q->destroyComplete(qc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->m_op; - throw qpid::Exception(oss.str()); - }; - } - } - } - if (bc) delete bc; - if (res) delete res; -} - -qpid::broker::QueueHandle& -MockPersistableQueue::getHandle() -{ - return m_queueHandle; -} - -// static -void -MockPersistableQueue::asyncStoreCreate(MockPersistableQueuePtr& qp) -{ - qp->m_store->submitCreate(qp->m_queueHandle, - dynamic_cast(qp.get()), - &handleAsyncResult, - new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); -} - -// static -void -MockPersistableQueue::asyncStoreDestroy(MockPersistableQueuePtr& qp) -{ - qp->m_store->submitDestroy(qp->m_queueHandle, - &handleAsyncResult, - new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); -} - -void* -MockPersistableQueue::runEnqueues() -{ - uint32_t numMsgs = 0; - uint16_t txnCnt = 0; - const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0; - MockTransactionContextPtr txn; - while (numMsgs < m_perfTestOpts.m_numMsgs) { - if (useTxn && txnCnt == 0) { - txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() - } - MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); - msg->setPersistenceId(m_store->getNextRid()); - qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); - MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, - this); - if (useTxn) { - m_store->submitEnqueue(enqHandle, - txn->getHandle(), - &MockPersistableMessage::handleAsyncResult, - dynamic_cast(msgCtxt)); - } else { - m_store->submitEnqueue(enqHandle, - &MockPersistableMessage::handleAsyncResult, - dynamic_cast(msgCtxt)); - } - QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); - push(qm); - if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { - txn->commit(); - txnCnt = 0; - } - ++numMsgs; - } - if (txnCnt > 0) { - txn->commit(); - txnCnt = 0; - } - return 0; -} - -void* -MockPersistableQueue::runDequeues() -{ - uint32_t numMsgs = 0; - const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue; - uint16_t txnCnt = 0; - const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0; - MockTransactionContextPtr txn; - QueuedMessagePtr qm; - while (numMsgs < numMsgsToDequeue) { - if (useTxn && txnCnt == 0) { - txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() - } - pop(qm); - if (qm.get()) { - qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle(); - qpid::broker::BrokerContext* bc = new MockPersistableMessage::MessageContext(qm->getMessage(), - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, - this); - if (useTxn) { - m_store->submitDequeue(enqHandle, - txn->getHandle(), - &MockPersistableMessage::handleAsyncResult, - bc); - } else { - m_store->submitDequeue(enqHandle, - &MockPersistableMessage::handleAsyncResult, - bc); - } - ++numMsgs; - qm.reset(static_cast(0)); - if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) { - txn->commit(); - txnCnt = 0; - } - } - } - if (txnCnt > 0) { - txn->commit(); - txnCnt = 0; - } - return 0; -} - -//static -void* -MockPersistableQueue::startEnqueues(void* ptr) -{ - return reinterpret_cast(ptr)->runEnqueues(); -} - -//static -void* -MockPersistableQueue::startDequeues(void* ptr) -{ - return reinterpret_cast(ptr)->runDequeues(); -} - -void -MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putShortString(m_name); -} - -uint32_t -MockPersistableQueue::encodedSize() const -{ - return m_name.size() + 1; -} - -uint64_t -MockPersistableQueue::getPersistenceId() const -{ - return m_persistenceId; -} - -void -MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const -{ - m_persistenceId = persistenceId; -} - -void -MockPersistableQueue::flush() -{ - //if(m_store) m_store->flush(*this); -} - -const std::string& -MockPersistableQueue::getName() const -{ - return m_name; -} - -void -MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) -{ - if (externalQueueStore != inst && externalQueueStore) - delete externalQueueStore; - externalQueueStore = inst; -} - -uint64_t -MockPersistableQueue::getSize() -{ - return m_persistableData.size(); -} - -void -MockPersistableQueue::write(char* target) -{ - ::memcpy(target, m_persistableData.data(), m_persistableData.size()); -} - -// protected -void -MockPersistableQueue::createComplete(const QueueContext* qc) -{ -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush; - assert(qc->m_q.get() == this); -} - -// protected -void -MockPersistableQueue::flushComplete(const QueueContext* qc) -{ -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush; - assert(qc->m_q.get() == this); -} - -// protected -void -MockPersistableQueue::destroyComplete(const QueueContext* qc) -{ -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush; - assert(qc->m_q.get() == this); -} - -// protected -void -MockPersistableQueue::push(QueuedMessagePtr& qm) -{ - qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); - m_enqueuedMsgs.push_back(qm); - m_dequeueCondition.notify(); -} - -// protected -void -MockPersistableQueue::pop(QueuedMessagePtr& qm) -{ - qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); - while (m_enqueuedMsgs.empty()) { - m_dequeueCondition.wait(m_enqueuedMsgsMutex); - } - qm = m_enqueuedMsgs.front(); - if (qm->isTransactional()) { - // The next msg is still in an open transaction, skip and find next non-open-txn msg - MsgEnqListItr i = m_enqueuedMsgs.begin(); - while (++i != m_enqueuedMsgs.end()) { - if (!(*i)->isTransactional()) { - qm = *i; - m_enqueuedMsgs.erase(i); - } - } - } else { - // The next msg is not in an open txn - m_enqueuedMsgs.pop_front(); - } -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h deleted file mode 100644 index c3b461477c..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockPersistableQueue.h - */ - -#ifndef tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ -#define tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ - -#include "qpid/asyncStore/AsyncOperation.h" -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/BrokerContext.h" -#include "qpid/broker/PersistableQueue.h" -#include "qpid/broker/QueueHandle.h" -#include "qpid/sys/Condition.h" -#include "qpid/sys/Mutex.h" - -#include -#include - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} -namespace framing { -class FieldTable; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MockPersistableQueue; -class QueuedMessage; -class TestOptions; - -typedef boost::shared_ptr MockPersistableQueuePtr; -typedef boost::shared_ptr QueuedMessagePtr; - -class MockPersistableQueue : public qpid::broker::PersistableQueue, public qpid::broker::DataSource -{ -public: - class QueueContext : public qpid::broker::BrokerContext - { - public: - QueueContext(MockPersistableQueuePtr q, - const qpid::asyncStore::AsyncOperation::opCode op); - virtual ~QueueContext(); - const char* getOp() const; - void destroy(); - MockPersistableQueuePtr m_q; - const qpid::asyncStore::AsyncOperation::opCode m_op; - }; - - MockPersistableQueue(const std::string& name, - const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store, - const TestOptions& perfTestParams, - const char* msgData); - virtual ~MockPersistableQueue(); - - // --- Async functionality --- - static void handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerContext* bc); - qpid::broker::QueueHandle& getHandle(); - static void asyncStoreCreate(MockPersistableQueuePtr& qp); - static void asyncStoreDestroy(MockPersistableQueuePtr& qp); - - // --- Performance test thread entry points --- - void* runEnqueues(); - void* runDequeues(); - static void* startEnqueues(void* ptr); - static void* startDequeues(void* ptr); - - // --- Interface qpid::broker::Persistable --- - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - virtual uint64_t getPersistenceId() const; - virtual void setPersistenceId(uint64_t persistenceId) const; - - // --- Interface qpid::broker::PersistableQueue --- - virtual void flush(); - virtual const std::string& getName() const; - virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); - - // --- Interface DataStore --- - virtual uint64_t getSize(); - virtual void write(char* target); - -protected: - const std::string m_name; - qpid::asyncStore::AsyncStoreImpl* m_store; - mutable uint64_t m_persistenceId; - std::string m_persistableData; - qpid::broker::QueueHandle m_queueHandle; - - // Test params - const TestOptions& m_perfTestOpts; - const char* m_msgData; - - typedef std::deque MsgEnqList; - typedef MsgEnqList::iterator MsgEnqListItr; - MsgEnqList m_enqueuedMsgs; - qpid::sys::Mutex m_enqueuedMsgsMutex; - qpid::sys::Condition m_dequeueCondition; - - // --- Ascnc op completions (called through handleAsyncResult) --- - void createComplete(const QueueContext* qc); - void flushComplete(const QueueContext* qc); - void destroyComplete(const QueueContext* qc); - - // --- Queue functionality --- - void push(QueuedMessagePtr& msg); - void pop(QueuedMessagePtr& msg); -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp deleted file mode 100644 index 10be34c6f5..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockTransactionContext.cpp - */ - -#include "MockTransactionContext.h" - -#include "QueuedMessage.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -// --- Inner class MockTransactionContext::QueueContext --- - -MockTransactionContext::TransactionContext::TransactionContext(MockTransactionContext* tc, - const qpid::asyncStore::AsyncOperation::opCode op) : - m_tc(tc), - m_op(op) -{} - -MockTransactionContext::TransactionContext::~TransactionContext() -{} - -const char* -MockTransactionContext::TransactionContext::getOp() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - -void -MockTransactionContext::TransactionContext::destroy() -{ - delete this; -} - -// --- Class MockTransactionContext --- - - -MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, - const std::string& xid) : - m_store(store), - m_txnHandle(store->createTxnHandle(xid)), - m_prepared(false), - m_enqueuedMsgs() -{ -//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; -} - -MockTransactionContext::~MockTransactionContext() -{} - -// static -void -MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerContext* bc) -{ - if (bc && res) { - TransactionContext* tc = dynamic_cast(bc); - if (tc->m_tc) { - if (res->errNo) { - // TODO: Handle async failure here - std::cerr << "Transaction xid=\"" << tc->m_tc->getXid() << "\": Operation " << tc->getOp() << ": failure " - << res->errNo << " (" << res->errMsg << ")" << std::endl; - } else { - // Handle async success here - switch(tc->m_op) { - case qpid::asyncStore::AsyncOperation::TXN_PREPARE: - tc->m_tc->prepareComplete(tc); - break; - case qpid::asyncStore::AsyncOperation::TXN_COMMIT: - tc->m_tc->commitComplete(tc); - break; - case qpid::asyncStore::AsyncOperation::TXN_ABORT: - tc->m_tc->abortComplete(tc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tc->m_op; - throw qpid::Exception(oss.str()); - }; - } - } - } - if (bc) delete bc; - if (res) delete res; -} - -qpid::broker::TxnHandle& -MockTransactionContext::getHandle() -{ - return m_txnHandle; -} - -bool -MockTransactionContext::is2pc() const -{ - return m_txnHandle.is2pc(); -} - -const std::string& -MockTransactionContext::getXid() const -{ - return m_txnHandle.getXid(); -} - -void -MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm) -{ - qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); - m_enqueuedMsgs.push_back(qm); -} - -void -MockTransactionContext::prepare() -{ - if (m_txnHandle.is2pc()) { - localPrepare(); - m_prepared = true; - } - std::ostringstream oss; - oss << "MockTransactionContext::prepare(): xid=\"" << getXid() - << "\": Transaction Error: called prepare() on local transaction"; - throw qpid::Exception(oss.str()); -} - -void -MockTransactionContext::abort() -{ - // TODO: Check the following XA transaction semantics: - // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. - if (!m_prepared) { - localPrepare(); - } - m_store->submitAbort(m_txnHandle, - &handleAsyncResult, - dynamic_cast(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); -//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - -void -MockTransactionContext::commit() -{ - if (is2pc()) { - if (!m_prepared) { - std::ostringstream oss; - oss << "MockTransactionContext::abort(): xid=\"" << getXid() - << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; - throw qpid::Exception(oss.str()); - } - } else { - localPrepare(); - } - m_store->submitCommit(m_txnHandle, - &handleAsyncResult, - dynamic_cast(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); -//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - - -// protected -void -MockTransactionContext::localPrepare() -{ - m_store->submitPrepare(m_txnHandle, - &handleAsyncResult, - dynamic_cast(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); -//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - -// protected -void -MockTransactionContext::prepareComplete(const TransactionContext* tc) -{ - qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); - while (!m_enqueuedMsgs.empty()) { - m_enqueuedMsgs.front()->clearTransaction(); - m_enqueuedMsgs.pop_front(); - } -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; - assert(tc->m_tc == this); -} - - -// protected -void -MockTransactionContext::abortComplete(const TransactionContext* tc) -{ -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; - assert(tc->m_tc == this); -} - - -// protected -void -MockTransactionContext::commitComplete(const TransactionContext* tc) -{ -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; - assert(tc->m_tc == this); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h deleted file mode 100644 index 3c467a7b5d..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockTransactionContext.h - */ - -#ifndef tests_storePerfTools_asyncPerf_MockTransactionContext_h_ -#define tests_storePerfTools_asyncPerf_MockTransactionContext_h_ - -#include "qpid/asyncStore/AsyncOperation.h" - -#include "qpid/broker/BrokerContext.h" -#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext -#include "qpid/broker/TxnHandle.h" -#include "qpid/sys/Mutex.h" - -#include -#include - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class QueuedMessage; - -class MockTransactionContext : public qpid::broker::TransactionContext -{ -public: - // NOTE: TransactionContext - Bad naming? This context is the async return handling context for class - // MockTransactionContext async ops. Other classes using this pattern simply use XXXContext for this class - // (e.g. QueueContext in MockPersistableQueue), but in this case it may be confusing. - class TransactionContext : public qpid::broker::BrokerContext - { - public: - TransactionContext(MockTransactionContext* tc, - const qpid::asyncStore::AsyncOperation::opCode op); - virtual ~TransactionContext(); - const char* getOp() const; - void destroy(); - MockTransactionContext* m_tc; - const qpid::asyncStore::AsyncOperation::opCode m_op; - }; - - MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, - const std::string& xid = std::string()); - virtual ~MockTransactionContext(); - static void handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerContext* bc); - - qpid::broker::TxnHandle& getHandle(); - bool is2pc() const; - const std::string& getXid() const; - void addEnqueuedMsg(QueuedMessage* qm); - - void prepare(); - void abort(); - void commit(); - -protected: - qpid::asyncStore::AsyncStoreImpl* m_store; - qpid::broker::TxnHandle m_txnHandle; - bool m_prepared; - std::deque m_enqueuedMsgs; - qpid::sys::Mutex m_enqueuedMsgsMutex; - - void localPrepare(); - - // --- Ascnc op completions (called through handleAsyncResult) --- - void prepareComplete(const TransactionContext* tc); - void abortComplete(const TransactionContext* tc); - void commitComplete(const TransactionContext* tc); - -}; - -}}} // namespace tests:storePerftools::asyncPerf - -#endif // tests_storePerfTools_asyncPerf_MockTransactionContext_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp deleted file mode 100644 index 983f088616..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file PerfTest.cpp - */ - -#include "PerfTest.h" - -#include "MockPersistableQueue.h" - -#include "tests/storePerfTools/version.h" -#include "tests/storePerfTools/common/ScopedTimer.h" -#include "tests/storePerfTools/common/Thread.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -PerfTest::PerfTest(const TestOptions& to, - const qpid::asyncStore::AsyncStoreOptions& aso) : - m_testOpts(to), - m_storeOpts(aso), - m_testResult(to), - m_msgData(new char[to.m_msgSize]), - m_poller(new qpid::sys::Poller), - m_pollingThread(m_poller.get()), - m_store(0) -{ - std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); -} - -PerfTest::~PerfTest() -{ - m_poller->shutdown(); - m_pollingThread.join(); - - m_queueList.clear(); - - if (m_store) delete m_store; - delete[] m_msgData; -} - -void -PerfTest::prepareStore() -{ - m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); - m_store->initialize(); -} - -void -PerfTest::prepareQueues() -{ - for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { - std::ostringstream qname; - qname << "queue_" << std::setw(4) << std::setfill('0') << i; - MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_testOpts, m_msgData)); - mpq->asyncStoreCreate(mpq); - m_queueList.push_back(mpq); - } -} - -void -PerfTest::destroyQueues() -{ - for (std::deque::iterator i=m_queueList.begin(); i!=m_queueList.end(); ++i) { - (*i)->asyncStoreDestroy(*i); - } -} - -void -PerfTest::run() -{ - typedef boost::shared_ptr ThreadPtr; // TODO - replace with qpid threads - - prepareStore(); - prepareQueues(); - - std::deque threads; - { // --- Start of timed section --- - tests::storePerftools::common::ScopedTimer st(m_testResult); - - for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { - for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads - ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startEnqueues, - reinterpret_cast(m_queueList[q].get()))); - threads.push_back(tp); - } - for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads - ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startDequeues, - reinterpret_cast(m_queueList[q].get()))); - threads.push_back(tp); - } - } - while (threads.size()) { - threads.front()->join(); - threads.pop_front(); - } - } // --- End of timed section --- - // TODO: Add test param to allow queues to be destroyed or left when test ends - destroyQueues(); -} - -void -PerfTest::toStream(std::ostream& os) const -{ - m_testOpts.printVals(os); - os << std::endl; - m_storeOpts.printVals(os); - os << std::endl; - os << m_testResult << std::endl; -} - -}}} // namespace tests::storePerftools::asyncPerf - -// ----------------------------------------------------------------- - -int -main(int argc, char** argv) -{ - qpid::CommonOptions co; - qpid::asyncStore::AsyncStoreOptions aso; - tests::storePerftools::asyncPerf::TestOptions to; - qpid::Options opts; - opts.add(co).add(aso).add(to); - try { - opts.parse(argc, argv); - aso.validate(); - to.validate(); - } - catch (std::exception& e) { - std::cerr << e.what() << std::endl; - return 1; - } - - // Handle options that just print information then exit. - if (co.version) { - std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl; - return 0; - } - if (co.help) { - std::cout << tests::storePerftools::name() << ": asyncPerf" << std::endl; - std::cout << "Performance test for the async store through the qpid async store interface." << std::endl; - std::cout << "Usage: asyncPerf [options]" << std::endl; - std::cout << opts << std::endl; - return 0; - } - - // Create and start test - tests::storePerftools::asyncPerf::PerfTest apt(to, aso); - apt.run(); - - // Print test result - std::cout << apt << std::endl; - ::sleep(1); - return 0; -} diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h deleted file mode 100644 index 544cd6b3ae..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file PerfTest.h - */ - -#ifndef tests_storePerfTools_asyncPerf_PerfTest_h_ -#define tests_storePerfTools_asyncPerf_PerfTest_h_ - -#include "TestResult.h" - -#include "tests/storePerfTools/common/Streamable.h" - -#include "qpid/framing/FieldTable.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/Thread.h" - -#include -#include - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -class AsyncStoreOptions; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MockPersistableQueue; -class TestOptions; - -//typedef boost::shared_ptr AsyncStoreImplPtr; -typedef boost::shared_ptr MockPersistableQueuePtr; - -class PerfTest : public tests::storePerftools::common::Streamable -{ -public: - PerfTest(const TestOptions& to, - const qpid::asyncStore::AsyncStoreOptions& aso); - virtual ~PerfTest(); - void run(); - void toStream(std::ostream& os = std::cout) const; - -protected: - const TestOptions& m_testOpts; - const qpid::asyncStore::AsyncStoreOptions& m_storeOpts; - TestResult m_testResult; - qpid::framing::FieldTable m_queueArgs; - const char* m_msgData; - boost::shared_ptr m_poller; - qpid::sys::Thread m_pollingThread; - qpid::asyncStore::AsyncStoreImpl* m_store; - std::deque m_queueList; - - void prepareStore(); - void prepareQueues(); - void destroyQueues(); - -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerfTools_asyncPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp deleted file mode 100644 index 315e202d8b..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file QueuedMessage.cpp - */ - -#include "QueuedMessage.h" - -#include "MockTransactionContext.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -QueuedMessage::QueuedMessage(MockPersistableMessagePtr msg, - qpid::broker::EnqueueHandle& enqHandle, - MockTransactionContextPtr txn) : - m_msg(msg), - m_enqHandle(enqHandle), - m_txn(txn) -{ - if (txn) { - txn->addEnqueuedMsg(this); - } -} - -QueuedMessage::~QueuedMessage() -{} - -MockPersistableMessagePtr -QueuedMessage::getMessage() const -{ - return m_msg; -} - -qpid::broker::EnqueueHandle -QueuedMessage::getEnqueueHandle() const -{ - return m_enqHandle; -} - -MockTransactionContextPtr -QueuedMessage::getTransactionContext() const -{ - return m_txn; -} - -bool -QueuedMessage::isTransactional() const -{ - return m_txn.get() != 0; -} - -void -QueuedMessage::clearTransaction() -{ - m_txn.reset(static_cast(0)); -} - -}}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h deleted file mode 100644 index 4b3dab67f9..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file QueuedMessage.h - */ - -#ifndef tests_storePerfTools_asyncPerf_QueuedMessage_h_ -#define tests_storePerfTools_asyncPerf_QueuedMessage_h_ - -#include "qpid/broker/EnqueueHandle.h" -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MockPersistableMessage; -class MockTransactionContext; - -typedef boost::shared_ptr MockPersistableMessagePtr; -typedef boost::shared_ptr MockTransactionContextPtr; - -class QueuedMessage -{ -public: - QueuedMessage(MockPersistableMessagePtr msg, - qpid::broker::EnqueueHandle& enqHandle, - MockTransactionContextPtr txn); - virtual ~QueuedMessage(); - MockPersistableMessagePtr getMessage() const; - qpid::broker::EnqueueHandle getEnqueueHandle() const; - MockTransactionContextPtr getTransactionContext() const; - bool isTransactional() const; - void clearTransaction(); - -protected: - MockPersistableMessagePtr m_msg; - qpid::broker::EnqueueHandle m_enqHandle; - MockTransactionContextPtr m_txn; -}; - -}}} // namespace tests::storePerfTools - -#endif // tests_storePerfTools_asyncPerf_QueuedMessage_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp deleted file mode 100644 index 27784ef661..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestOptions.cpp - */ - -#include "TestOptions.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -// static declarations -uint16_t TestOptions::s_defaultEnqTxnBlkSize = 0; -uint16_t TestOptions::s_defaultDeqTxnBlkSize = 0; - -TestOptions::TestOptions(const std::string& name) : - tests::storePerftools::common::TestOptions(name), - m_enqTxnBlockSize(s_defaultEnqTxnBlkSize), - m_deqTxnBlockSize(s_defaultDeqTxnBlkSize) -{ - doAddOptions(); -} - -TestOptions::TestOptions(const uint32_t numMsgs, - const uint32_t msgSize, - const uint16_t numQueues, - const uint16_t numEnqThreadsPerQueue, - const uint16_t numDeqThreadsPerQueue, - const uint16_t enqTxnBlockSize, - const uint16_t deqTxnBlockSize, - const std::string& name) : - tests::storePerftools::common::TestOptions(numMsgs, msgSize, numQueues, numEnqThreadsPerQueue, numDeqThreadsPerQueue, name), - m_enqTxnBlockSize(enqTxnBlockSize), - m_deqTxnBlockSize(deqTxnBlockSize) -{ - doAddOptions(); -} - -TestOptions::~TestOptions() -{} - -void -TestOptions::printVals(std::ostream& os) const -{ - tests::storePerftools::common::TestOptions::printVals(os); - os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl; - os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl; -} - -void -TestOptions::doAddOptions() -{ - addOptions() - ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"), - "Num enqueus per transaction (0 = no transactions)") - ("deq-txn-size,d", qpid::optValue(m_deqTxnBlockSize, "N"), - "Num dequeues per transaction (0 = no transactions)") - ; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h deleted file mode 100644 index b0e1e4ce74..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestOptions.h - */ - -#ifndef tests_storePerfTools_asyncPerf_TestOptions_h_ -#define tests_storePerfTools_asyncPerf_TestOptions_h_ - -#include "tests/storePerfTools/common/TestOptions.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class TestOptions : public tests::storePerftools::common::TestOptions -{ -public: - TestOptions(const std::string& name="Test Options"); - TestOptions(const uint32_t numMsgs, - const uint32_t msgSize, - const uint16_t numQueues, - const uint16_t numEnqThreadsPerQueue, - const uint16_t numDeqThreadsPerQueue, - const uint16_t enqTxnBlockSize, - const uint16_t deqTxnBlockSize, - const std::string& name="Test Options"); - virtual ~TestOptions(); - void printVals(std::ostream& os) const; - - uint16_t m_enqTxnBlockSize; ///< Transaction block size for enqueues - uint16_t m_deqTxnBlockSize; ///< Transaction block size for dequeues - -protected: - static uint16_t s_defaultEnqTxnBlkSize; ///< Default transaction block size for enqueues - static uint16_t s_defaultDeqTxnBlkSize; ///< Default transaction block size for dequeues - - void doAddOptions(); -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerfTools_asyncPerf_TestOptions_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp deleted file mode 100644 index cf6f293494..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestResult.cpp - */ - -#include "TestResult.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -TestResult::TestResult(const TestOptions& to) : - tests::storePerftools::common::TestResult(), - m_testOpts(to) -{} - -TestResult::~TestResult() -{} - -void -TestResult::toStream(std::ostream& os) const -{ - double msgsRate; - os << "TEST RESULTS:" << std::endl; - os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl; - os << " Msg size: " << m_testOpts.m_msgSize << std::endl; - os << " No. queues: " << m_testOpts.m_numQueues << std::endl; - os << " No. enq threads/queue: " << m_testOpts.m_numEnqThreadsPerQueue << std::endl; - os << " No. deq threads/queue: " << m_testOpts.m_numDeqThreadsPerQueue << std::endl; - os << " Time taken: " << m_elapsed << " sec" << std::endl; - uint32_t msgsPerQueue = m_testOpts.m_numMsgs * m_testOpts.m_numEnqThreadsPerQueue; - if (m_testOpts.m_numQueues > 1) { - msgsRate = double(msgsPerQueue) / m_elapsed; - os << " No. msgs per queue: " << msgsPerQueue << std::endl; - os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; - os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl; - } - uint32_t totalMsgs = msgsPerQueue * m_testOpts.m_numQueues; - msgsRate = double(totalMsgs) / m_elapsed; - os << " Total no. msgs: " << totalMsgs << std::endl; - os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; - os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h deleted file mode 100644 index 1310689ff8..0000000000 --- a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestResult.h - */ - -#ifndef tests_storePerfTools_asyncPerf_TestResult_h_ -#define tests_storePerfTools_asyncPerf_TestResult_h_ - -#include "TestOptions.h" - -#include "tests/storePerfTools/common/TestResult.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class TestOptions; - -/** - * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal. - * - * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the - * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the - * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput. - * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor. - * - * Results are available through the use of toStream(), toString() or the << operators. - * - * Output is in the following format: - *
- * TEST RESULTS:
- *     Msgs per thread: 10000
- *            Msg size: 2048
- *          No. queues: 2
- *   No. threads/queue: 2
- *          Time taken: 1.6626 sec
- *      Total no. msgs: 40000
- *      Msg throughput: 24.0587 kMsgs/sec
- *                      49.2723 MB/sec
- * 
- */ -class TestResult : public tests::storePerftools::common::TestResult -{ -public: - /** - * \brief Constructor - * - * Constructor. Will start the time interval measurement. - * - * \param tp Test parameter details used to calculate the performance results. - */ - TestResult(const TestOptions& to); - - /** - * \brief Virtual destructor - */ - virtual ~TestResult(); - - /** - * \brief Stream the performance test results to an output stream - * - * Convenience feature which streams a multi-line performance result an output stream. - * - * \param os Output stream to which the results are to be streamed - */ - void toStream(std::ostream& os = std::cout) const; - -protected: - TestOptions m_testOpts; ///< Test parameters used for performance calculations - -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerfTools_asyncPerf_TestResult_h_ diff --git a/cpp/src/tests/storePerfTools/common/Parameters.cpp b/cpp/src/tests/storePerfTools/common/Parameters.cpp deleted file mode 100644 index 8e4bafaf86..0000000000 --- a/cpp/src/tests/storePerfTools/common/Parameters.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Parameters.cpp - */ - -#include "Parameters.h" - -namespace tests { -namespace storePerftools { -namespace common { - -Parameters::~Parameters() -{} - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/Parameters.h b/cpp/src/tests/storePerfTools/common/Parameters.h deleted file mode 100644 index 03dd2163f8..0000000000 --- a/cpp/src/tests/storePerfTools/common/Parameters.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Parameters.h - */ -#ifndef tests_storePerfTools_common_Parameters_h_ -#define tests_storePerfTools_common_Parameters_h_ - -#include "Streamable.h" - -namespace tests { -namespace storePerftools { -namespace common { - -class Parameters: public Streamable -{ -public: - virtual ~Parameters(); - virtual bool parseArg(const int arg, - const char* optarg) = 0; - -}; - -}}} // namespace tests::storePerfTools::common - -#endif // tests_storePerfTools_common_Parameters_h_ diff --git a/cpp/src/tests/storePerfTools/common/PerftoolError.cpp b/cpp/src/tests/storePerfTools/common/PerftoolError.cpp deleted file mode 100644 index 5bb61b6519..0000000000 --- a/cpp/src/tests/storePerfTools/common/PerftoolError.cpp +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file PerftoolError.cpp - */ - -#include "PerftoolError.h" - -#include // std::setfill(), std::setw() - -namespace tests { -namespace storePerftools { -namespace common { - -// private -PerftoolError::PerftoolError() : - std::runtime_error(std::string()) -{} - -PerftoolError::PerftoolError(const uint32_t errCode) throw () : - std::runtime_error(std::string()), - m_errCode(errCode) -{ - formatWhatStr(); -} - -PerftoolError::PerftoolError(const std::string& errMsg) throw () : - std::runtime_error(std::string()), - m_errCode(0), - m_errMsg(errMsg) -{ - formatWhatStr(); -} - -PerftoolError::PerftoolError(const uint32_t errCode, - const std::string& errMsg) throw () : - std::runtime_error(std::string()), - m_errCode(errCode), - m_errMsg(errMsg) -{ - formatWhatStr(); -} - -PerftoolError::PerftoolError(const uint32_t errCode, - const std::string& throwingClass, - const std::string& throwingFunction) throw () : - std::runtime_error(std::string()), - m_errCode(errCode), - m_throwingClass(throwingClass), - m_throwingFunction(throwingFunction) -{ - formatWhatStr(); -} - -PerftoolError::PerftoolError(const std::string& errMsg, - const std::string& throwingClass, - const std::string& throwingFunction) throw () : - std::runtime_error(std::string()), - m_errCode(0), - m_errMsg(errMsg), - m_throwingClass(throwingClass), - m_throwingFunction(throwingFunction) -{ - formatWhatStr(); -} - -PerftoolError::PerftoolError(const uint32_t errCode, - const std::string& errMsg, - const std::string& throwingClass, - const std::string& throwingFunction) throw () : - std::runtime_error(std::string()), - m_errCode(errCode), - m_errMsg(errMsg), - m_throwingClass(throwingClass), - m_throwingFunction(throwingFunction) -{} - -PerftoolError::~PerftoolError() throw() -{} - -const char* -PerftoolError::what() const throw () -{ - return m_what.c_str(); -} - -uint32_t -PerftoolError::getErrorCode() const throw () -{ - return m_errCode; -} - -const std::string -PerftoolError::getAdditionalInfo() const throw () -{ - return m_errMsg; -} - -const std::string -PerftoolError::getThrowingClass() const throw () -{ - return m_throwingClass; -} - -const std::string -PerftoolError::getThrowingFunction() const throw () -{ - return m_throwingFunction; -} - -void -PerftoolError::toStream(std::ostream& os) const -{ - os << what(); -} - -// protected -void -PerftoolError::formatWhatStr() throw () -{ - try { - const bool ai = !m_errMsg.empty(); - const bool tc = !m_throwingClass.empty(); - const bool tf = !m_throwingFunction.empty(); - std::ostringstream oss; - oss << className() << " 0x" << std::hex << std::setfill('0') << std::setw(4) << m_errCode << " "; - if (tc) { - oss << m_throwingClass; - if (tf) { - oss << "::"; - } else { - oss << " "; - } - } - if (tf) { - oss << m_throwingFunction << "() "; - } - if (tc || tf) { - oss << "threw " << s_errorMessage(m_errCode); - } - if (ai) { - oss << " (" << m_errMsg << ")"; - } - m_what.assign(oss.str()); - } catch (...) {} -} - -// protected -const char* -PerftoolError::className() -{ - return s_className; -} - -//static -const char* PerftoolError::s_className = "PerftoolError"; - -// --- Static definitions --- -PerftoolError::errorMap_t PerftoolError::s_errorMap; -PerftoolError::errorMapCitr_t PerftoolError::s_errorMapIterator; -bool PerftoolError::s_initializedFlag = PerftoolError::s_initialize(); - -// --- Generic and system errors --- -const uint32_t PerftoolError::PERR_PTHREAD = 0x0001; - -// static -const char* -PerftoolError::s_errorMessage(const uint32_t err_no) throw () -{ - s_errorMapIterator = s_errorMap.find(err_no); - if (s_errorMapIterator == s_errorMap.end()) - return ""; - return s_errorMapIterator->second; -} - -// protected static -bool -PerftoolError::s_initialize() -{ - s_errorMap[PERR_PTHREAD] = "ERR_PTHREAD: pthread operation failure"; - - return true; -} - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/PerftoolError.h b/cpp/src/tests/storePerfTools/common/PerftoolError.h deleted file mode 100644 index 8f994d7d28..0000000000 --- a/cpp/src/tests/storePerfTools/common/PerftoolError.h +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file PerftoolError.h - */ - -#ifndef tests_storePerfTools_common_PerftoolError_h_ -#define tests_storePerfTools_common_PerftoolError_h_ - -#include "Streamable.h" - -#include -#include // std::runtime_error -#include // uint32_t - -// Macro definitions - -#include // std::strerror() -#include // std::ostringstream - -/** - * \brief Macro to retrieve and format the C errno value as a string. - * - * \param errno Value of errno to be formatted. - */ -#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << std::strerror(errno) << ")" - -/** - * \brief Macro to check for a clean pthread creation and throwing a JournalException with code JERR_PTHREAD if - * thread creation failed. - * - * \param err Value or errno. - * \param pfn Name of system call that failed. - * \param cls Name of class in which function failed. - * \param fn Name of class function that failed. - */ -#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \ - std::ostringstream oss; \ - oss << pfn << " failed: " << FORMAT_SYSERR(err); \ - throw tests::storePerftools::common::PerftoolError(tests::storePerftools::common::PerftoolError::PERR_PTHREAD, oss.str(), cls, fn); \ - } - -namespace tests { -namespace storePerftools { -namespace common { - -class PerftoolError: public std::runtime_error, public Streamable -{ -public: - // --- Constructors & destructors --- - PerftoolError(const uint32_t errCode) throw (); - PerftoolError(const std::string& errMsg) throw (); - PerftoolError(const uint32_t errCode, - const std::string& errMsg) throw (); - PerftoolError(const uint32_t errCode, - const std::string& throwingClass, - const std::string& throwingFunction) throw (); - PerftoolError(const std::string& errMsg, - const std::string& throwingClass, - const std::string& throwingFunction) throw (); - PerftoolError(const uint32_t errCode, - const std::string& errMsg, - const std::string& throwingClass, - const std::string& throwingFunction) throw (); - virtual ~PerftoolError() throw(); - - const char* what() const throw (); // overrides std::runtime_error::what() - uint32_t getErrorCode() const throw (); - const std::string getAdditionalInfo() const throw (); - const std::string getThrowingClass() const throw (); - const std::string getThrowingFunction() const throw (); - - // --- Implementation of class Streamable --- - virtual void toStream(std::ostream& os = std::cout) const; - - // --- Generic and system errors --- - static const uint32_t PERR_PTHREAD; ///< pthread operation failure - - - static const char* s_errorMessage(const uint32_t err_no) throw (); - - -protected: - uint32_t m_errCode; ///< Error or failure code, taken from JournalErrors. - std::string m_errMsg; ///< Additional information pertaining to the error or failure. - std::string m_throwingClass; ///< Name of the class throwing the error. - std::string m_throwingFunction; ///< Name of the function throwing the error. - std::string m_what; ///< Standard error of failure message, taken from JournalErrors. - - void formatWhatStr() throw (); - virtual const char* className(); - - typedef std::map errorMap_t; ///< Type for map of error messages - typedef errorMap_t::const_iterator errorMapCitr_t; ///< Const iterator for map of error messages - - static errorMap_t s_errorMap; ///< Map of error messages - static errorMapCitr_t s_errorMapIterator; ///< Const iterator - -private: - static const char* s_className; ///< Name of this class, used in formatting error messages. - static bool s_initializedFlag; ///< Dummy flag, used to initialize map. - - PerftoolError(); - static bool s_initialize(); ///< Static fn for initializing static data - -}; - -}}} // namespace tests::stprePerftools::common - -#endif // tests_storePerfTools_common_PerftoolError_h_ diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp b/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp deleted file mode 100644 index c2023b7854..0000000000 --- a/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file ScopedTimable.cpp - */ - -#include "ScopedTimable.h" - -namespace tests { -namespace storePerftools { -namespace common { - -ScopedTimable::ScopedTimable() : - m_elapsed(0.0) -{} - -ScopedTimable::~ScopedTimable() -{} - -double& -ScopedTimable::getElapsedRef() -{ - return m_elapsed; -} - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimable.h b/cpp/src/tests/storePerfTools/common/ScopedTimable.h deleted file mode 100644 index 3c21a4aafa..0000000000 --- a/cpp/src/tests/storePerfTools/common/ScopedTimable.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file ScopedTimable.h - */ - -#ifndef tests_storePerfTools_common_ScopedTimable_h_ -#define tests_storePerfTools_common_ScopedTimable_h_ - -namespace tests { -namespace storePerftools { -namespace common { - -/** - * \brief Scoped timer class that starts timing on construction and finishes on destruction. - * - * This class is designed to be the parent class for a performance result class which depends on the elapsed - * time of some process or event. By passing this (or its subclasses) to ScopedTimer (which only exists within - * the scope of the event), the _elapsed member of this class will be written with the elapsed time when the - * ScopedTimer object goes out of scope or is destroyed. - * - * Subclasses may be aware of the parameters being timed, and may thus print and/or display performance and/or - * rate information for these parameters. - */ -class ScopedTimable -{ -public: - ScopedTimable(); - virtual ~ScopedTimable(); - double& getElapsedRef(); - -protected: - double m_elapsed; ///< Elapsed time, will be written on destruction of ScopedTimer instances - -}; - -}}} // namespace tests::storePerftools::common - -#endif // tests_storePerfTools_common_ScopedTimable_h_ diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp b/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp deleted file mode 100644 index 8312174cad..0000000000 --- a/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file ScopedTimer.cpp - */ - -#include "ScopedTimer.h" - -#include "ScopedTimable.h" - -namespace tests { -namespace storePerftools { -namespace common { - -ScopedTimer::ScopedTimer(double& elapsed) : - m_elapsed(elapsed) -{ - ::clock_gettime(CLOCK_REALTIME, &m_startTime); -} - -ScopedTimer::ScopedTimer(ScopedTimable& st) : - m_elapsed(st.getElapsedRef()) -{ - ::clock_gettime(CLOCK_REALTIME, &m_startTime); -} - -ScopedTimer::~ScopedTimer() -{ - ::timespec stopTime; - ::clock_gettime(CLOCK_REALTIME, &stopTime); - m_elapsed = _s_getDoubleTime(stopTime) - _s_getDoubleTime(m_startTime); -} - -// static -double ScopedTimer::_s_getDoubleTime(const ::timespec& ts) -{ - return ts.tv_sec + (double(ts.tv_nsec) / 1e9); -} - - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimer.h b/cpp/src/tests/storePerfTools/common/ScopedTimer.h deleted file mode 100644 index c23d056f10..0000000000 --- a/cpp/src/tests/storePerfTools/common/ScopedTimer.h +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file ScopedTimer.h - */ - -#ifndef tests_storePerfTools_common_ScopedTimer_h_ -#define tests_storePerfTools_common_ScopedTimer_h_ - -#include - -namespace tests { -namespace storePerftools { -namespace common { - -class ScopedTimable; - -/** - * \brief Scoped timer class that starts timing on construction and finishes on destruction. - * - * The scoped timer will take the current time on construction and again on destruction. The destructor - * will calculate the elapsed time from the difference between these two times and write the result - * as a double to the double ref supplied to the constructor. A second constructor will accept a class (or - * subclass) of ScopedTimable, which contains a double to which the result may be written and accessed at a - * later time. - */ -class ScopedTimer -{ -public: - /** - * \brief Constructor - * - * Constructor which accepts a ref to a double. Will start the time interval measurement. - * - * \param elapsed A ref to a double which will contain the elapsed time in seconds after this class instance - * is destroyed. - */ - ScopedTimer(double& elapsed); - - /** - * \brief Constructor - * - * Constructor which accepts a ref to a ScopedTimable. Will start the time interval measurement. - * - * \param st A ref to a ScopedTimable into which the result of the ScopedTimer can be written. - */ - ScopedTimer(ScopedTimable& st); - - /** - * \brief Destructor - * - * Destructor. Will stop the time interval measurement and write the calculated elapsed time into _elapsed. - */ - virtual ~ScopedTimer(); - -protected: - double& m_elapsed; ///< Ref to elapsed time, will be written on destruction of ScopedTimer instances - ::timespec m_startTime; ///< Start time, set on construction - - /** - * \brief Convert ::timespec to seconds - * - * Static function to convert a ::timespec struct into a double representation in seconds. - * - * \param ts std::timespec struct containing the time to be converted. - * \return A double which represents the time in parameter ts in seconds. - */ - static double _s_getDoubleTime(const ::timespec& ts); - -}; - -}}} // namespace tests::storePerftools::common - -#endif // tests_storePerfTools_common_ScopedTimer_h_ diff --git a/cpp/src/tests/storePerfTools/common/Streamable.cpp b/cpp/src/tests/storePerfTools/common/Streamable.cpp deleted file mode 100644 index 8c58f1c03e..0000000000 --- a/cpp/src/tests/storePerfTools/common/Streamable.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Streamable.cpp - */ - -#include "Streamable.h" - -#include - -namespace tests { -namespace storePerftools { -namespace common { - -std::string -Streamable::toString() const -{ - std::ostringstream oss; - toStream(oss); - return oss.str(); -} - -std::ostream& -operator<<(std::ostream& os, const Streamable& s) -{ - s.toStream(os); - return os; -} - -std::ostream& -operator<<(std::ostream& os, const Streamable* sPtr) -{ - sPtr->toStream(os); - return os; -} - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/Streamable.h b/cpp/src/tests/storePerfTools/common/Streamable.h deleted file mode 100644 index 504a3d97dd..0000000000 --- a/cpp/src/tests/storePerfTools/common/Streamable.h +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Streamable.h - */ - -#ifndef tests_storePerfTools_common_Streamable_h_ -#define tests_storePerfTools_common_Streamable_h_ - -#include - -namespace tests { -namespace storePerftools { -namespace common { - -/** - * \brief Abstract class which provides the mechanisms to stream - * - * An abstract class which provides stream functions. The toStream() function must be implemented by subclasses, - * and is used by the remaining functions. For convenience, toString() returns a std::string object. - */ -class Streamable -{ -public: - /** - * \brief Virtual destructor - */ - virtual ~Streamable() {} - - /*** - * \brief Stream some representation of the object to an output stream - * - * \param os Output stream to which the class data is to be streamed - */ - virtual void toStream(std::ostream& os = std::cout) const = 0; - - /** - * \brief Creates a string representation of the test parameters - * - * Convenience feature which creates and returns a std::string object containing the content of toStream(). - * - * \return Content of toStream() - */ - std::string toString() const; - - /** - * \brief Stream the object to an output stream - */ - friend std::ostream& operator<<(std::ostream& os, - const Streamable& s); - - /** - * \brief Stream the object to an output stream through an object pointer - */ - friend std::ostream& operator<<(std::ostream& os, - const Streamable* sPtr); - -}; - -}}} // namespace tests::storePerftools::common - -#endif // tests_storePerfTools_common_Streamable_h_ diff --git a/cpp/src/tests/storePerfTools/common/TestOptions.cpp b/cpp/src/tests/storePerfTools/common/TestOptions.cpp deleted file mode 100644 index 39e3434a6c..0000000000 --- a/cpp/src/tests/storePerfTools/common/TestOptions.cpp +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestOptions.cpp - */ - -#include "TestOptions.h" - -namespace tests { -namespace storePerftools { -namespace common { - -// static declarations -uint32_t TestOptions::s_defaultNumMsgs = 1024; -uint32_t TestOptions::s_defaultMsgSize = 1024; -uint16_t TestOptions::s_defaultNumQueues = 1; -uint16_t TestOptions::s_defaultEnqThreadsPerQueue = 1; -uint16_t TestOptions::s_defaultDeqThreadsPerQueue = 1; - -TestOptions::TestOptions(const std::string& name) : - qpid::Options(name), - m_numMsgs(s_defaultNumMsgs), - m_msgSize(s_defaultMsgSize), - m_numQueues(s_defaultNumQueues), - m_numEnqThreadsPerQueue(s_defaultEnqThreadsPerQueue), - m_numDeqThreadsPerQueue(s_defaultDeqThreadsPerQueue) -{ - doAddOptions(); -} - -TestOptions::TestOptions(const uint32_t numMsgs, - const uint32_t msgSize, - const uint16_t numQueues, - const uint16_t numEnqThreadsPerQueue, - const uint16_t numDeqThreadsPerQueue, - const std::string& name) : - qpid::Options(name), - m_numMsgs(numMsgs), - m_msgSize(msgSize), - m_numQueues(numQueues), - m_numEnqThreadsPerQueue(numEnqThreadsPerQueue), - m_numDeqThreadsPerQueue(numDeqThreadsPerQueue) -{ - doAddOptions(); -} - -TestOptions::~TestOptions() -{} - -void -TestOptions::printVals(std::ostream& os) const -{ - os << "TEST OPTIONS:" << std::endl; - os << " Number of queues [-q, --num-queues]: " << m_numQueues << std::endl; - os << " Number of producers per queue [-p, --num-producers]: " << m_numEnqThreadsPerQueue << std::endl; - os << " Number of consumers per queue [-c, --num-consumers]: " << m_numDeqThreadsPerQueue << std::endl; - os << " Number of messages to send per producer [-m, --num-msgs]: " << m_numMsgs << std::endl; - os << " Size of each message (bytes) [-s, --msg-size]: " << m_msgSize << std::endl; -} - -void -TestOptions::validate() -{ - if (((m_numEnqThreadsPerQueue * m_numMsgs) % m_numDeqThreadsPerQueue) != 0) { - throw qpid::Exception("Parameter Error: (num-producers * num-msgs) must be a multiple of num-consumers."); - } -} - -void -TestOptions::doAddOptions() -{ - addOptions() - ("num-queues,q", qpid::optValue(m_numQueues, "N"), - "Number of queues") - ("num-producers,p", qpid::optValue(m_numEnqThreadsPerQueue, "N"), - "Number of producers per queue") - ("num-consumers,c", qpid::optValue(m_numDeqThreadsPerQueue, "N"), - "Number of consumers per queue") - ("num-msgs,m", qpid::optValue(m_numMsgs, "N"), - "Number of messages to send per producer") - ("msg-size,s", qpid::optValue(m_msgSize, "N"), - "Size of each message (bytes)") - ; -} - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/TestOptions.h b/cpp/src/tests/storePerfTools/common/TestOptions.h deleted file mode 100644 index 6a4479fa56..0000000000 --- a/cpp/src/tests/storePerfTools/common/TestOptions.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestOptions.h - */ - -#ifndef tests_storePerfTools_common_TestOptions_h_ -#define tests_storePerfTools_common_TestOptions_h_ - -#include "qpid/Options.h" - -namespace tests { -namespace storePerftools { -namespace common { - -class TestOptions : public qpid::Options -{ -public: - TestOptions(const std::string& name="Test Options"); - TestOptions(const uint32_t numMsgs, - const uint32_t msgSize, - const uint16_t numQueues, - const uint16_t numEnqThreadsPerQueue, - const uint16_t numDeqThreadsPerQueue, - const std::string& name="Test Options"); - virtual ~TestOptions(); - void printVals(std::ostream& os) const; - void validate(); - - uint32_t m_numMsgs; ///< Number of messages to be sent - uint32_t m_msgSize; ///< Message size in bytes - uint16_t m_numQueues; ///< Number of queues to test simultaneously - uint16_t m_numEnqThreadsPerQueue; ///< Number of enqueue threads per queue - uint16_t m_numDeqThreadsPerQueue; ///< Number of dequeue threads per queue - -protected: - static uint32_t s_defaultNumMsgs; ///< Default number of messages to be sent - static uint32_t s_defaultMsgSize; ///< Default message size in bytes - static uint16_t s_defaultNumQueues; ///< Default number of queues to test simultaneously - static uint16_t s_defaultEnqThreadsPerQueue; ///< Default number of enqueue threads per queue - static uint16_t s_defaultDeqThreadsPerQueue; ///< Default number of dequeue threads per queue - - void doAddOptions(); - -}; - -}}} // namespace tests::storePerftools::common - -#endif // tests_storePerfTools_common_TestOptions_h_ diff --git a/cpp/src/tests/storePerfTools/common/TestParameters.cpp b/cpp/src/tests/storePerfTools/common/TestParameters.cpp deleted file mode 100644 index f36a2d3bda..0000000000 --- a/cpp/src/tests/storePerfTools/common/TestParameters.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestParameters.cpp - */ - -#include "TestParameters.h" - -#include // std::atoi, std::atol - -namespace tests { -namespace storePerftools { -namespace common { - -// static declarations -uint32_t TestParameters::s_defaultNumMsgs = 1024; -uint32_t TestParameters::s_defaultMsgSize = 1024; -uint16_t TestParameters::s_defaultNumQueues = 1; -uint16_t TestParameters::s_defaultEnqThreadsPerQueue = 1; -uint16_t TestParameters::s_defaultDeqThreadsPerQueue = 1; - -TestParameters::TestParameters(): - Parameters(), - m_numMsgs(s_defaultNumMsgs), - m_msgSize(s_defaultMsgSize), - m_numQueues(s_defaultNumQueues), - m_numEnqThreadsPerQueue(s_defaultEnqThreadsPerQueue), - m_numDeqThreadsPerQueue(s_defaultDeqThreadsPerQueue)//, -{} - -TestParameters::TestParameters(const uint32_t numMsgs, - const uint32_t msgSize, - const uint16_t numQueues, - const uint16_t numEnqThreadsPerQueue, - const uint16_t numDeqThreadsPerQueue) : - Parameters(), - m_numMsgs(numMsgs), - m_msgSize(msgSize), - m_numQueues(numQueues), - m_numEnqThreadsPerQueue(numEnqThreadsPerQueue), - m_numDeqThreadsPerQueue(numDeqThreadsPerQueue) -{} - -TestParameters::TestParameters(const TestParameters& tp): - Parameters(), - m_numMsgs(tp.m_numMsgs), - m_msgSize(tp.m_msgSize), - m_numQueues(tp.m_numQueues), - m_numEnqThreadsPerQueue(tp.m_numEnqThreadsPerQueue), - m_numDeqThreadsPerQueue(tp.m_numDeqThreadsPerQueue) -{} - -TestParameters::~TestParameters() -{} - -void -TestParameters::toStream(std::ostream& os) const -{ - os << "Test Parameters:" << std::endl; - os << " num_msgs = " << m_numMsgs << std::endl; - os << " msg_size = " << m_msgSize << std::endl; - os << " num_queues = " << m_numQueues << std::endl; - os << " num_enq_threads_per_queue = " << m_numEnqThreadsPerQueue << std::endl; - os << " num_deq_threads_per_queue = " << m_numDeqThreadsPerQueue << std::endl; -} - -bool -TestParameters::parseArg(const int arg, - const char* optarg) -{ - switch(arg) { - case 'm': - m_numMsgs = uint32_t(std::atol(optarg)); - break; - case 'S': - m_msgSize = uint32_t(std::atol(optarg)); - break; - case 'q': - m_numQueues = uint16_t(std::atoi(optarg)); - break; - case 'e': - m_numEnqThreadsPerQueue = uint16_t(std::atoi(optarg)); - break; - case 'd': - m_numDeqThreadsPerQueue = uint16_t(std::atoi(optarg)); - break; - default: - return false; - } - return true; -} - -// static -void -TestParameters::printArgs(std::ostream& os) -{ - os << "Test parameters:" << std::endl; - os << " -m --num_msgs: Number of messages to send per enqueue thread [" - << TestParameters::s_defaultNumMsgs << "]" << std::endl; - os << " -S --msg_size: Size of each message to be sent [" - << TestParameters::s_defaultMsgSize << "]" << std::endl; - os << " -q --num_queues: Number of simultaneous queues [" - << TestParameters::s_defaultNumQueues << "]" << std::endl; - os << " -e --num_enq_threads_per_queue: Number of enqueue threads per queue [" - << TestParameters::s_defaultEnqThreadsPerQueue << "]" << std::endl; - os << " -d --num_deq_threads_per_queue: Number of dequeue threads per queue [" - << TestParameters::s_defaultDeqThreadsPerQueue << "]" << std::endl; - os << std::endl; -} - -// static -std::string -TestParameters::shortArgs() -{ - return "m:S:q:e:d:"; -} - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/TestParameters.h b/cpp/src/tests/storePerfTools/common/TestParameters.h deleted file mode 100644 index ea73589609..0000000000 --- a/cpp/src/tests/storePerfTools/common/TestParameters.h +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestParameters.h - */ - -#ifndef tests_storePerfTools_common_TestParameters_h_ -#define tests_storePerfTools_common_TestParameters_h_ - -#include "Parameters.h" - -#include // uint16_t, uint32_t - -namespace tests { -namespace storePerftools { -namespace common { - -class TestOptions; - -/** - * \brief Struct for aggregating the test parameters - * - * This struct is used to aggregate and keep together all the test parameters. These affect the test itself, the - * journal geometry is aggregated in class JrnlParameters. - */ -class TestParameters : public Parameters -{ -public: - static uint32_t s_defaultNumMsgs; ///< Default number of messages to be sent - static uint32_t s_defaultMsgSize; ///< Default message size in bytes - static uint16_t s_defaultNumQueues; ///< Default number of queues to test simultaneously - static uint16_t s_defaultEnqThreadsPerQueue; ///< Default number of enqueue threads per queue - static uint16_t s_defaultDeqThreadsPerQueue; ///< Default number of dequeue threads per queue - - uint32_t m_numMsgs; ///< Number of messages to be sent - uint32_t m_msgSize; ///< Message size in bytes - uint16_t m_numQueues; ///< Number of queues to test simultaneously - uint16_t m_numEnqThreadsPerQueue; ///< Number of enqueue threads per queue - uint16_t m_numDeqThreadsPerQueue; ///< Number of dequeue threads per queue - - /** - * \brief Defaault constructor - * - * Default constructor. Uses the default values for all parameters. - */ - TestParameters(); - - /** - * \brief Constructor - * - * Convenience constructor. - * - * \param numMsgs Number of messages to be sent - * \param msgSize Message size in bytes - * \param numQueues Number of queues to test simultaneously - * \param numEnqThreadsPerQueue Number of enqueue threads per queue - * \param numDeqThreadsPerQueue Number of dequeue threads per queue - */ - TestParameters(const uint32_t numMsgs, - const uint32_t msgSize, - const uint16_t numQueues, - const uint16_t numEnqThreadsPerQueue, - const uint16_t numDeqThreadsPerQueue); - - /** - * \brief Copy constructor - * - * \param tp Reference to JrnlPerfTestParameters instance to be copied - */ - TestParameters(const TestParameters& tp); - - /** - * \brief Virtual destructor - */ - virtual ~TestParameters(); - - virtual bool parseArg(const int arg, - const char* optarg); - - static void printArgs(std::ostream& os); - - static std::string shortArgs(); - - /*** - * \brief Stream the test parameters to an output stream - * - * Convenience feature which streams a multi-line representation of all the test parameters, one per line to an - * output stream. - * - * \param os Output stream to which the class data is to be streamed - */ - void toStream(std::ostream& os = std::cout) const; - -}; - -}}} // namespace tests::storePerftools::common - -#endif // tests_storePerfTools_common_TestParameters_h_ diff --git a/cpp/src/tests/storePerfTools/common/TestResult.cpp b/cpp/src/tests/storePerfTools/common/TestResult.cpp deleted file mode 100644 index c3e9d27dfc..0000000000 --- a/cpp/src/tests/storePerfTools/common/TestResult.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestResult.cpp - */ - -#include "TestResult.h" - -namespace tests { -namespace storePerftools { -namespace common { - -TestResult::TestResult() : - ScopedTimable(), - Streamable() -{} - -TestResult::~TestResult() -{} - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/TestResult.h b/cpp/src/tests/storePerfTools/common/TestResult.h deleted file mode 100644 index e2217286b4..0000000000 --- a/cpp/src/tests/storePerfTools/common/TestResult.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestResult.h - */ - -#ifndef tests_storePerfTools_common_TestResult_h_ -#define tests_storePerfTools_common_TestResult_h_ - -#include "ScopedTimable.h" -#include "Streamable.h" - -namespace tests { -namespace storePerftools { -namespace common { - -class TestResult : public ScopedTimable, public Streamable -{ -public: - TestResult(); - virtual ~TestResult(); - void toStream(std::ostream& os = std::cout) const = 0; -}; - -}}} // namespace tests:storePerftools::common - -#endif // tests_storePerfTools_common_TestResult_h_ diff --git a/cpp/src/tests/storePerfTools/common/Thread.cpp b/cpp/src/tests/storePerfTools/common/Thread.cpp deleted file mode 100644 index 188e102e8f..0000000000 --- a/cpp/src/tests/storePerfTools/common/Thread.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Thread.cpp - */ - -#include "Thread.h" - -#include "PerftoolError.h" - -namespace tests { -namespace storePerftools { -namespace common { - -Thread::Thread(startFn_t sf, - void* p) : - m_running(true) -{ - PTHREAD_CHK(::pthread_create(&m_thread, NULL, sf, p), "::pthread_create", "Thread", "Thread"); -} - -Thread::Thread(Thread::startFn_t sf, - void* p, - const std::string& id) : - m_id(id), - m_running(true) -{ - PTHREAD_CHK(::pthread_create(&m_thread, NULL, sf, p), "::pthread_create", "Thread", "Thread"); -} - -Thread::~Thread() -{ - if (m_running) { - PTHREAD_CHK(::pthread_detach(m_thread), "pthread_detach", "~Thread", "Thread"); - } -} - -const std::string& -Thread::getId() const -{ - return m_id; -} - -void Thread::join() -{ - PTHREAD_CHK(::pthread_join(m_thread, NULL), "pthread_join", "join", "Thread"); - m_running = false; -} - -}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/Thread.h b/cpp/src/tests/storePerfTools/common/Thread.h deleted file mode 100644 index 505d038162..0000000000 --- a/cpp/src/tests/storePerfTools/common/Thread.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Thread.h - */ - -#ifndef tests_storePerfTools_common_Thread_h_ -#define tests_storePerfTools_common_Thread_h_ - -#include -#include - -namespace tests { -namespace storePerftools { -namespace common { - -/** - * \brief Ultra-simple pthread class. - */ -class Thread { -public: - typedef void*(*startFn_t)(void*); ///< Thread entry point function pointer type - - /** - * \brief Constructor - * \param sf Pointer to thread entry function - * \param p Void pointer to parameter of start function - */ - Thread(startFn_t sf, - void* p); - - /** - * \brief Constructor - * \param sf Pointer to thread entry function - * \param p Void pointer to parameter of start function - * \param id Name of this thread instance - */ - Thread(startFn_t sf, - void* p, - const std::string& id); - - /** - * \brief Destructor - */ - virtual ~Thread(); - - /** - * \brief Get the name of this thread. - * \return Name as supplied to the constructor. - */ - const std::string& getId() const; - - /** - * \brief Wait for this thread instance to finish running startFn(). - */ - void join(); - -private: - ::pthread_t m_thread; ///< Internal posix thread - std::string m_id; ///< Identifier for this thread instance - bool m_running; ///< \b true is the thread is active and running, \b false when not yet started or joined. - -}; - -}}} // namespace tests::storePerftools::common - -#endif // tests_storePerfTools_common_Thread_h_ diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp deleted file mode 100644 index 6efdc06fc8..0000000000 --- a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Journal.cpp - */ - -#include "Journal.h" - -#ifdef JOURNAL2 -# include "qpid/asyncStore/jrnl2/DataToken.h" - -# define X_JRNL_FN_DEQUEUE(dtok) dequeue(dtok, 0, 0) -# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue(dtok, msgData, msgSize, 0, 0, false) -# define X_JRNL_FN_FLUSH(jrnlPtr) { jrnlPtr->flush(); jrnlPtr->sync(); } -# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok -# define X_JRNL_FN_GETEVENTS(timeout) processCompletedAioWriteEvents(timeout) -# define X_JRNL_FN_GETIOSTR(iores) qpid::asyncStore::jrnl2::g_ioResAsString(iores) -# define X_JRNL_IO_OP_RES qpid::asyncStore::jrnl2::jrnlOpRes -# define X_JRNL_IO_OP_RES_BUSY qpid::asyncStore::jrnl2::RHM_IORES_BUSY -# define X_JRNL_IO_OP_RES_ENQCAPTHRESH qpid::asyncStore::jrnl2::RHM_IORES_ENQCAPTHRESH -# define X_JRNL_IO_OP_RES_SUCCESS 0 -# define X_SCOPED_LOCK qpid::asyncStore::jrnl2::ScopedLock -#else -# include "jrnl/jcntl.hpp" -# include "jrnl/data_tok.hpp" - -# define X_JRNL_FN_DEQUEUE(dtok) dequeue_data_record(dtok) -# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue_data_record(msgData, msgSize, msgSize, dtok); -# define X_JRNL_FN_FLUSH(jrnlPtr) jrnlPtr->flush(true) -# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok->status_str() -# define X_JRNL_FN_GETEVENTS(timeout) get_wr_events(timeout) -# define X_JRNL_FN_GETIOSTR(iores) mrg::journal::iores_str(iores) -# define X_JRNL_IO_OP_RES mrg::journal::iores -# define X_JRNL_IO_OP_RES_BUSY mrg::journal::RHM_IORES_BUSY -# define X_JRNL_IO_OP_RES_ENQCAPTHRESH mrg::journal::RHM_IORES_ENQCAPTHRESH -# define X_JRNL_IO_OP_RES_SUCCESS mrg::journal::RHM_IORES_SUCCESS -# define X_SCOPED_LOCK mrg::journal::slock -#endif - -#include - -namespace tests { -namespace storePerftools { -namespace jrnlPerf { - -Journal::Journal(const uint32_t numMsgs, - const uint32_t msgSize, - const char* msgData, - X_ASYNC_JOURNAL* const jrnlPtr) : - m_numMsgs(numMsgs), - m_msgSize(msgSize), - m_msgData(msgData), - m_jrnlPtr(jrnlPtr) -{} - -Journal::~Journal() -{ - delete m_jrnlPtr; -} - - -// *** MUST BE THREAD-SAFE **** -// This method will be called by multiple threads simultaneously -// Enqueue thread entry point -void* -Journal::runEnqueues() -{ - bool misfireFlag = false; - uint32_t i = 0; - while (i < m_numMsgs) { - X_DATA_TOKEN* mtokPtr = new X_DATA_TOKEN(); - X_JRNL_IO_OP_RES jrnlIoRes = m_jrnlPtr->X_JRNL_FN_ENQUEUE(mtokPtr, m_msgData, m_msgSize); - switch (jrnlIoRes) { - case X_JRNL_IO_OP_RES_SUCCESS: - i++; - misfireFlag = false; - break; - case X_JRNL_IO_OP_RES_BUSY: - if (!misfireFlag) { - std::cout << "-" << std::flush; - } - delete mtokPtr; - misfireFlag = true; - break; - case X_JRNL_IO_OP_RES_ENQCAPTHRESH: - if (!misfireFlag) { - std::cout << "*" << std::flush; - } - delete mtokPtr; - misfireFlag = true; - ::usleep(10); - break; - default: - delete mtokPtr; - std::cerr << "enqueue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << std::endl; - } - } - /// \todo handle these results - X_JRNL_FN_FLUSH(m_jrnlPtr); - return NULL; -} - - -// *** MUST BE THREAD-SAFE **** -// This method will be called by multiple threads simultaneously -// Dequeue thread entry point -void* -Journal::runDequeues() -{ - uint32_t i = 0; - X_JRNL_IO_OP_RES jrnlIoRes; - while (i < m_numMsgs) { - X_DATA_TOKEN* mtokPtr = 0; - while (!mtokPtr) { - bool procAioEventsFlag; - { // --- START OF CRITICAL SECTION --- - X_SCOPED_LOCK l(m_unprocCallbacksMutex); - procAioEventsFlag = m_unprocCallbacks.empty(); - if (!procAioEventsFlag) { - mtokPtr = m_unprocCallbacks.back(); - m_unprocCallbacks.pop_back(); - } - } // --- END OF CRITICAL SECTION --- - if (procAioEventsFlag) { - m_jrnlPtr->X_JRNL_FN_GETEVENTS(0); - ::usleep(1); - } - } - bool done = false; - while (!done) { - jrnlIoRes = m_jrnlPtr->X_JRNL_FN_DEQUEUE(mtokPtr); - switch (jrnlIoRes) { - case X_JRNL_IO_OP_RES_SUCCESS: - i ++; - done = true; - break; - case X_JRNL_IO_OP_RES_BUSY: - //::usleep(10); - break; - default: - std::cerr << "dequeue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << ": " - << X_JRNL_FN_GETDTOKSTATUS(mtokPtr) << std::endl; - delete mtokPtr; - done = true; - } - } - m_jrnlPtr->X_JRNL_FN_GETEVENTS(0); - } - /// \todo handle these results - X_JRNL_FN_FLUSH(m_jrnlPtr); - return NULL; -} - -//static -void* -Journal::startEnqueues(void* ptr) -{ - return reinterpret_cast(ptr)->runEnqueues(); -} - -//static -void* -Journal:: startDequeues(void* ptr) -{ - return reinterpret_cast(ptr)->runDequeues(); -} - -// *** MUST BE THREAD-SAFE **** -// This method will be called by multiple threads simultaneously -void -Journal::X_AIO_WR_CALLBACK(std::vector& msgTokenList) -{ - X_DATA_TOKEN* mtokPtr; - while (msgTokenList.size()) { - mtokPtr = msgTokenList.back(); - msgTokenList.pop_back(); -#ifdef JOURNAL2 - switch (mtokPtr->getDataOpState().get()) { - case qpid::asyncStore::jrnl2::OP_ENQUEUE: -#else - switch (mtokPtr->wstate()) { - case X_DATA_TOKEN::ENQ: -#endif - { // --- START OF CRITICAL SECTION --- - X_SCOPED_LOCK l(m_unprocCallbacksMutex); - m_unprocCallbacks.push_back(mtokPtr); - } // --- END OF CRITICAL SECTION --- - break; - default: - delete mtokPtr; - } - } -} - -// *** MUST BE THREAD-SAFE **** -// This method will be called by multiple threads simultaneously -void -Journal::X_AIO_RD_CALLBACK(std::vector& /*buffPageCtrlBlkIndexList*/) -{} - -}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h deleted file mode 100644 index 19d0980b53..0000000000 --- a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Journal.h - */ - -#ifndef tests_storePerfTools_jrnlPerf_Journal_h_ -#define tests_storePerfTools_jrnlPerf_Journal_h_ - -#ifdef JOURNAL2 -# include "qpid/asyncStore/jrnl2/AioCallback.h" -# include "qpid/asyncStore/jrnl2/AsyncJournal.h" -# include "qpid/asyncStore/jrnl2/ScopedLock.h" -#else -# include "jrnl/aio_callback.hpp" -# include "jrnl/smutex.hpp" -#endif - -#include // uint16_t, uint32_t - -#ifdef JOURNAL2 -# define X_AIO_CALLBACK qpid::asyncStore::jrnl2::AioCallback -# define X_AIO_RD_CALLBACK readAioCompleteCallback -# define X_AIO_WR_CALLBACK writeAioCompleteCallback -# define X_ASYNC_JOURNAL qpid::asyncStore::jrnl2::AsyncJournal -# define X_DATA_TOKEN qpid::asyncStore::jrnl2::DataToken -# define X_SCOPED_MUTEX qpid::asyncStore::jrnl2::ScopedMutex -#else -# define X_AIO_CALLBACK mrg::journal::aio_callback -# define X_AIO_RD_CALLBACK rd_aio_cb -# define X_AIO_WR_CALLBACK wr_aio_cb -# define X_ASYNC_JOURNAL mrg::journal::jcntl -# define X_DATA_TOKEN mrg::journal::data_tok -# define X_SCOPED_MUTEX mrg::journal::smutex -#endif - -#ifndef JOURNAL2 -namespace mrg { -namespace journal { -class jcntl; -}} // namespace mrg::journal -namespace qpid { -namespace asyncStore { -namespace jrnl2 { -class AsyncJournal; -}}} // namespace qpid::asyncStore::jrnl2 -#endif - -namespace tests { -namespace storePerftools { -namespace jrnlPerf { - -/** - * \brief Test journal instance. Each queue to be tested results in one instance of this class. - * - * Journal test harness which contains the journal to be tested. Each queue to be tested in the test parameters - * results in one instance of this class being instantiated, and consequently one set of journals on disk. The - * journal instance is provided as a pointer to the constructor. - */ -class Journal : public X_AIO_CALLBACK -{ -public: - /** - * \brief Constructor - * - * \param numMsgs Number of messages per thread to be enqueued then dequeued (ie both ways through broker) - * \param msgSize Size of each message being enqueued - * \param msgData Pointer to message content (all messages have identical content) - * \param jrnlPtr Pinter to journal instance which is to be tested - */ - Journal(const uint32_t numMsgs, - const uint32_t msgSize, - const char* msgData, - X_ASYNC_JOURNAL* const jrnlPtr); - - /** - * \brief virtual destructor - */ - virtual ~Journal(); - - /** - * \brief Worker thread enqueue task - * - * This function is the worker thread enqueue task entry point. It enqueues _numMsgs onto the journal instance. - * A data tokens is created for each record, this is the start of the data token life cycle. All possible - * returns from the journal are handled appropriately. Since the enqueue threads also perform - * callbacks on completed AIO operations, the data tokens from completed enqueues are placed onto the - * unprocessed callback list (_unprocCallbackList) for dequeueing by the dequeue worker thread(s). - * - * This function must be thread safe. - */ - void* runEnqueues(); - - /** - * \brief Worker thread dequeue task - * - * This function is the worker thread dequeue task entry point. It dequeues messages which are on the - * unprocessed callback list (_unprocCallbackList). - * - * This function must be thread safe. - */ - void* runDequeues(); - - /** - * \brief Helper function to launch the run() function when starting a thread. - */ - static void* startEnqueues(void* ptr); - - /** - * \brief Helper function to launch the run() function when starting a thread. - */ - static void* startDequeues(void* ptr); - - /** - * \brief Write callback function. When AIO operations return, this function is called. - * - * When AIO operations return, this function will sort the enqueue ops from the rest and place the data tokens - * of these records onto the unprocessed callback list (_unprocCallbackList) for dequeueing by another thread. - * - * Returning dequeue ops have their data tokens destroyed, as this is the end of the life cycle of the data - * tokens. - * - * Required by all subclasses of mrg::journal::aio_callback. - * - * \param dataTokenList A vector of data tokens for those messages which have completed their AIO write - * operations - */ - void X_AIO_WR_CALLBACK(std::vector& dataTokenList); - - /** - * \brief Read callback function. When read AIO operations return, this function is called. - * - * Not used in this test, but required by all subclasses of mrg::journal::aio_callback. - * - * \param buffPageCtrlBlkIndexList A vector of indices to the buffer page control blocks for completed reads - */ - void X_AIO_RD_CALLBACK(std::vector& buffPageCtrlBlkIndexList); - -protected: - const uint32_t m_numMsgs; ///< Number of messages to be processed by this journal instance - const uint32_t m_msgSize; ///< Size of each message (in bytes) - const char* m_msgData; ///< Pointer to message content to be used for each message. - X_ASYNC_JOURNAL* const m_jrnlPtr; ///< Journal instance pointer - std::vector m_unprocCallbacks; ///< List of unprocessed callbacks to be dequeued - X_SCOPED_MUTEX m_unprocCallbacksMutex; ///< Mutex which protects the unprocessed callback queue - - -}; - -}}} // namespace tests::storePerftools::jrnlPerf - -#endif // tests_storePerfTools_jrnlPerf_Journal_h_ diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp deleted file mode 100644 index 2b07619041..0000000000 --- a/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file JournalParameters.cpp - */ - -#include "JournalParameters.h" - -#include // std::atof, std::atoi, std::atol - -namespace tests { -namespace storePerftools { -namespace jrnlPerf { - -#ifndef JOURNAL2 -// static declarations - for jrnl2, these are inherited -std::string JournalParameters::s_defaultJrnlDir = "/tmp/store"; -std::string JournalParameters::s_defaultJrnlBaseFileName = "JournalData"; -uint16_t JournalParameters::s_defaultNumJrnlFiles = 8; -uint32_t JournalParameters::s_defaultJrnlFileSize_sblks = 3072; -uint16_t JournalParameters::s_defaultWriteBuffNumPgs = 32; -uint32_t JournalParameters::s_defaultWriteBuffPgSize_sblks = 128; -#endif - -JournalParameters::JournalParameters() : -#ifdef JOURNAL2 - qpid::asyncStore::jrnl2::JournalParameters() -#else - Parameters(), - m_jrnlDir(s_defaultJrnlDir), - m_jrnlBaseFileName(s_defaultJrnlBaseFileName), - m_numJrnlFiles(s_defaultNumJrnlFiles), - m_jrnlFileSize_sblks(s_defaultJrnlFileSize_sblks), - m_writeBuffNumPgs(s_defaultWriteBuffNumPgs), - m_writeBuffPgSize_sblks(s_defaultWriteBuffPgSize_sblks) -#endif -{} - -JournalParameters::JournalParameters(const std::string& jrnlDir, - const std::string& jrnlBaseFileName, - const uint16_t numJrnlFiles, - const uint32_t jrnlFileSize_sblks, - const uint16_t writeBuffNumPgs, - const uint32_t writeBuffPgSize_sblks) : -#ifdef JOURNAL2 - qpid::asyncStore::jrnl2::JournalParameters(jrnlDir, jrnlBaseFileName, numJrnlFiles, jrnlFileSize_sblks, writeBuffNumPgs, - writeBuffPgSize_sblks) -#else - Parameters(), - m_jrnlDir(jrnlDir), - m_jrnlBaseFileName(jrnlBaseFileName), - m_numJrnlFiles(numJrnlFiles), - m_jrnlFileSize_sblks(jrnlFileSize_sblks), - m_writeBuffNumPgs(writeBuffNumPgs), - m_writeBuffPgSize_sblks(writeBuffPgSize_sblks) -#endif -{} - -#ifdef JOURNAL2 -JournalParameters::JournalParameters(const qpid::asyncStore::jrnl2::JournalParameters& jp) : - qpid::asyncStore::jrnl2::JournalParameters(jp) -{} -#endif - -JournalParameters::JournalParameters(const JournalParameters& jp) : -#ifdef JOURNAL2 - qpid::asyncStore::jrnl2::JournalParameters(jp) -#else - Parameters(), - m_jrnlDir(jp.m_jrnlDir), - m_jrnlBaseFileName(jp.m_jrnlBaseFileName), - m_numJrnlFiles(jp.m_numJrnlFiles), - m_jrnlFileSize_sblks(jp.m_jrnlFileSize_sblks), - m_writeBuffNumPgs(jp.m_writeBuffNumPgs), - m_writeBuffPgSize_sblks(jp.m_writeBuffPgSize_sblks) -#endif -{} - -JournalParameters::~JournalParameters() -{} - -void -JournalParameters::toStream(std::ostream& os) const -{ - os << "Journal Parameters:" << std::endl; - os << " jrnlDir = \"" << m_jrnlDir << "\"" << std::endl; - os << " jrnlBaseFileName = \"" << m_jrnlBaseFileName << "\"" << std::endl; - os << " numJrnlFiles = " << m_numJrnlFiles << std::endl; - os << " jrnlFileSize_sblks = " << m_jrnlFileSize_sblks << std::endl; - os << " writeBuffNumPgs = " << m_writeBuffNumPgs << std::endl; - os << " writeBuffPgSize_sblks = " << m_writeBuffPgSize_sblks << std::endl; -} - -bool -JournalParameters::parseArg(const int arg, - const char* optarg) -{ - switch(arg) { - case 'j': - m_jrnlDir.assign(optarg); - break; - case 'b': - m_jrnlBaseFileName.assign(optarg); - break; - case 'f': - m_numJrnlFiles = uint16_t(std::atoi(optarg)); - break; - case 's': - m_jrnlFileSize_sblks = uint32_t(std::atol(optarg)); - break; - case 'p': - m_writeBuffNumPgs = uint16_t(std::atoi(optarg)); - break; - case 'c': - m_writeBuffPgSize_sblks = uint32_t(std::atol(optarg)); - break; - default: - return false; - } - return true; -} - -// static -void -JournalParameters::printArgs(std::ostream& os) -{ - os << "Journal parameters:" << std::endl; - os << " -j --jrnl_dir: Store directory [\"" - << JournalParameters::s_defaultJrnlDir << "\"]" << std::endl; - os << " -b --jrnl_base_filename: Base name for journal files [\"" - << JournalParameters::s_defaultJrnlBaseFileName << "\"]" << std::endl; - os << " -f --num_jfiles: Number of journal files [" - << JournalParameters::s_defaultNumJrnlFiles << "]" << std::endl; - os << " -s --jfsize_sblks: Size of each journal file in sblks (512 byte blocks) [" - << JournalParameters::s_defaultJrnlFileSize_sblks << "]" << std::endl; - os << " -p --wcache_num_pages: Number of write buffer pages [" - << JournalParameters::s_defaultWriteBuffNumPgs << "]" << std::endl; - os << " -c --wcache_pgsize_sblks: Size of each write buffer page in sblks (512 byte blocks) [" - << JournalParameters::s_defaultWriteBuffPgSize_sblks << "]" << std::endl; -} - -// static -std::string -JournalParameters::shortArgs() -{ - return "j:b:f:s:p:c:"; -} - -}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h deleted file mode 100644 index ab7f864a91..0000000000 --- a/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file JournalParameters.h - */ - -#ifndef tests_storePerfTools_jrnlPerf_JournalParameters_h_ -#define tests_storePerfTools_jrnlPerf_JournalParameters_h_ - -#include "tests/storePerfTools/common/Parameters.h" - -#ifdef JOURNAL2 -# include "qpid/asyncStore/jrnl2/JournalParameters.h" -#endif - -#include // uint16_6, uint32_t - -namespace tests { -namespace storePerftools { -namespace jrnlPerf { - -/** - * \brief Stuct for aggregating the common journal parameters - * - * This struct is used to aggregate and keep together all the common journal parameters. These affect the journal - * geometry and buffers. The test parameters are aggregated in class JrnlPerfTestParameters. - */ -class JournalParameters : -#ifdef JOURNAL2 - public qpid::asyncStore::jrnl2::JournalParameters, -#endif - public tests::storePerftools::common::Parameters -{ -public: -#ifndef JOURNAL2 - // static default store params - static std::string s_defaultJrnlDir; ///< Default journal directory - static std::string s_defaultJrnlBaseFileName; ///< Default journal base file name - static uint16_t s_defaultNumJrnlFiles; ///< Default number of journal data files - static uint32_t s_defaultJrnlFileSize_sblks; ///< Default journal data file size in softblocks - static uint16_t s_defaultWriteBuffNumPgs; ///< Default number of write buffer pages - static uint32_t s_defaultWriteBuffPgSize_sblks; ///< Default size of each write buffer page in softblocks - - std::string m_jrnlDir; ///< Journal directory - std::string m_jrnlBaseFileName; ///< Journal base file name - uint16_t m_numJrnlFiles; ///< Number of journal data files - uint32_t m_jrnlFileSize_sblks; ///< Journal data file size in softblocks - uint16_t m_writeBuffNumPgs; ///< Number of write buffer pages - uint32_t m_writeBuffPgSize_sblks; ///< Size of each write buffer page in softblocks -#endif - - /** - * \brief Default constructor - * - * Default constructor. Uses the default values for all parameters. - */ - JournalParameters(); - - /** - * \brief Constructor - * - * Convenience constructor. - * - * \param jrnlDir Journal directory - * \param jrnlBaseFileName Journal base file name - * \param numJrnlFiles Number of journal data files - * \param jrnlFileSize_sblks Journal data file size in softblocks - * \param writeBuffNumPgs Number of write buffer pages - * \param writeBuffPgSize_sblks Size of each write buffer page in softblocks - */ - JournalParameters(const std::string& jrnlDir, - const std::string& jrnlBaseFileName, - const uint16_t numJrnlFiles, - const uint32_t jrnlFileSize_sblks, - const uint16_t writeBuffNumPgs, - const uint32_t writeBuffPgSize_sblks); - - /** - * \brief Copy constructor - * - * \param jp Reference to JrnlParameters instance to be copied - */ -#ifdef JOURNAL2 - JournalParameters(const qpid::asyncStore::jrnl2::JournalParameters& jp); -#endif - JournalParameters(const JournalParameters& jp); - - /** - * \brief Virtual destructor - */ - virtual ~JournalParameters(); - - virtual bool parseArg(const int arg, - const char* optarg); - - static void printArgs(std::ostream& os); - - static std::string shortArgs(); - - /*** - * \brief Stream the journal parameters to an output stream - * - * Convenience feature which streams a multi-line representation of all the journal parameters, one per line to - * an output stream. - * - * \param os Output stream to which the class data is to be streamed - */ - void toStream(std::ostream& os = std::cout) const; - -}; - -}}} // namespace tests::storePerftools::jrnlPerf - -#endif // tests_storePerfTools_jrnlPerf_JournalParameters_h_ diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp deleted file mode 100644 index 17a9c9b6cb..0000000000 --- a/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file PerfTest.cpp - */ - -#include "PerfTest.h" - -#include "Journal.h" -#include "JournalParameters.h" - -#include "tests/storePerfTools/version.h" -#include "tests/storePerfTools/common/ScopedTimer.h" -#include "tests/storePerfTools/common/TestParameters.h" -#include "tests/storePerfTools/common/Thread.h" - -#ifdef JOURNAL2 -# include "qpid/asyncStore/jrnl2/AsyncJournal.h" -# include "qpid/asyncStore/jrnl2/JournalDirectory.h" -#else -# include "jrnl/jcntl.hpp" -# include "jrnl/jdir.hpp" -#endif - -#include -#include // getopt_long(), required_argument, no_argument -#include // std::setw() std::setfill() -#include // std::ostringstream -#include // uint16_t, uint32_t - -#ifdef ECLIPSE_CDT_ANNOYANCE // This prevents problems with Eclipse CODAN, which can't see this in getopt.h - struct option; - extern int getopt_long (int, char *const *, const char *, const struct option *, int *) __THROW; -# define no_argument 0 -# define required_argument 1 -# define optional_argument 2 -#endif - -namespace tests { -namespace storePerftools { -namespace jrnlPerf { - -PerfTest::PerfTest(const tests::storePerftools::common::TestParameters& tp, - const JournalParameters& jp) : - Streamable(), - m_testParams(tp), - m_jrnlParams(jp), - m_testResult(tp), - m_msgData(new char[tp.m_msgSize]) -{} - -PerfTest::~PerfTest() -{ - delete[] m_msgData; -} - -void -PerfTest::prepareJournals(std::vector& jrnlList) -{ -#ifdef JOURNAL2 - if (qpid::asyncStore::jrnl2::JournalDirectory::s_exists(m_jrnlParams.m_jrnlDir)) { - qpid::asyncStore::jrnl2::JournalDirectory::s_destroy(m_jrnlParams.m_jrnlDir); - } - qpid::asyncStore::jrnl2::JournalDirectory::s_create(m_jrnlParams.m_jrnlDir); - qpid::asyncStore::jrnl2::AsyncJournal* jp; -#else - if (mrg::journal::jdir::exists(m_jrnlParams.m_jrnlDir)) { - mrg::journal::jdir::delete_dir(m_jrnlParams.m_jrnlDir); - } - mrg::journal::jdir::create_dir(m_jrnlParams.m_jrnlDir); - mrg::journal::jcntl* jp; -#endif - Journal* ptp; - for (uint16_t j = 0; j < m_testParams.m_numQueues; j++) { - std::ostringstream jname; - jname << "jrnl_" << std::setw(4) << std::setfill('0') << j; - std::ostringstream jdir; - jdir << m_jrnlParams.m_jrnlDir << "/" << jname.str(); -#ifdef JOURNAL2 - jp = new qpid::asyncStore::jrnl2::AsyncJournal(jname.str(), jdir.str(), m_jrnlParams.m_jrnlBaseFileName); -#else - jp = new mrg::journal::jcntl(jname.str(), jdir.str(), m_jrnlParams.m_jrnlBaseFileName); -#endif - ptp = new Journal(m_testParams.m_numMsgs, m_testParams.m_msgSize, m_msgData, jp); -#ifdef JOURNAL2 - jp->initialize(&m_jrnlParams, ptp); -#else - jp->initialize(m_jrnlParams.m_numJrnlFiles, false, m_jrnlParams.m_numJrnlFiles, - m_jrnlParams.m_jrnlFileSize_sblks, m_jrnlParams.m_writeBuffNumPgs, - m_jrnlParams.m_writeBuffPgSize_sblks, ptp); -#endif - - jrnlList.push_back(ptp); - } -} - -void -PerfTest::destroyJournals(std::vector& jrnlList) -{ - while (jrnlList.size()) { - delete jrnlList.back(); - jrnlList.pop_back(); - } -} - -void -PerfTest::run() -{ - std::vector jrnlList; - prepareJournals(jrnlList); - - std::deque threads; - tests::storePerftools::common::Thread* tp; - { // --- Start of timed section --- - tests::storePerftools::common::ScopedTimer st(m_testResult); - - for (uint16_t q = 0; q < m_testParams.m_numQueues; q++) { - for (uint16_t t = 0; t < m_testParams.m_numEnqThreadsPerQueue; t++) { - tp = new tests::storePerftools::common::Thread(jrnlList[q]->startEnqueues, reinterpret_cast(jrnlList[q])); - threads.push_back(tp); - } - for (uint16_t dt = 0; dt < m_testParams.m_numDeqThreadsPerQueue; ++dt) { - tp = new tests::storePerftools::common::Thread(jrnlList[q]->startDequeues, reinterpret_cast(jrnlList[q])); - threads.push_back(tp); - } - } - - while (threads.size()) { - threads.front()->join(); - delete threads.front(); - threads.pop_front(); - } - } // --- End of timed section --- - destroyJournals(jrnlList); -} - -void -PerfTest::toStream(std::ostream& os) const -{ - os << m_testParams << std::endl; - os << m_jrnlParams << std::endl; - os << m_testResult << std::endl; -} - -void -printArgs(std::ostream& os) -{ - os << " -h --help: This help message" << std::endl; - os << std::endl; - - tests::storePerftools::common::TestParameters::printArgs(os); - os << std::endl; - - JournalParameters::printArgs(os); - os << std::endl; -} - -bool -readArgs(int argc, - char** argv, - tests::storePerftools::common::TestParameters& tp, - JournalParameters& jp) -{ - /// \todo TODO: At some point, find an easy way to aggregate these from JrnlPerfTestParameters and JrnlParameters themselves. - static struct option long_options[] = { - {"help", no_argument, 0, 'h'}, - {"version", no_argument, 0, 'v'}, - - // Test params - {"num_msgs", required_argument, 0, 'm'}, - {"msg_size", required_argument, 0, 'S'}, - {"num_queues", required_argument, 0, 'q'}, - {"num_enq_threads_per_queue", required_argument, 0, 'e'}, - {"num_deq_threads_per_queue", required_argument, 0, 'd'}, - - // Journal params - {"jrnl_dir", required_argument, 0, 'j'}, - {"jrnl_base_filename", required_argument, 0, 'b'}, - {"num_jfiles", required_argument, 0, 'f'}, - {"jfsize_sblks", required_argument, 0, 's'}, - {"wcache_num_pages", required_argument, 0, 'p'}, - {"wcache_pgsize_sblks", required_argument, 0, 'c'}, - - {0, 0, 0, 0} - }; - - bool err = false; - bool ver = false; - int c = 0; - while (true) { - int option_index = 0; - std::ostringstream oss; - oss << "hv" << tests::storePerftools::common::TestParameters::shortArgs() << JournalParameters::shortArgs(); - c = getopt_long(argc, argv, oss.str().c_str(), long_options, &option_index); - if (c == -1) break; - if (c == 'v') { - std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl; - ver = true; - break; - } - err = !(tp.parseArg(c, optarg) || jp.parseArg(c, optarg)); - } - if (err) { - std::cout << std::endl; - printArgs(); - } - return err || ver; -} - -}}} // namespace tests::storePerftools::jrnlPerf - -// ----------------------------------------------------------------- - -int -main(int argc, char** argv) -{ - tests::storePerftools::common::TestParameters tp; - tests::storePerftools::jrnlPerf::JournalParameters jp; - if (tests::storePerftools::jrnlPerf::readArgs(argc, argv, tp, jp)) return 1; - tests::storePerftools::jrnlPerf::PerfTest jpt(tp, jp); - jpt.run(); - std::cout << jpt << std::endl; - return 0; -} diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h deleted file mode 100644 index b105bc0488..0000000000 --- a/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file PerfTest.h - */ - -#ifndef tests_storePerfTools_jrnlPerf_PerfTest_h_ -#define tests_storePerfTools_jrnlPerf_PerfTest_h_ - -#include "TestResult.h" -#include "tests/storePerfTools/common/Streamable.h" - -#include - -namespace tests { -namespace storePerftools { -namespace common { -class TestParameters; -} -namespace jrnlPerf { - -class Journal; -class JournalParameters; - -/** - * \brief Main test class; Create an instance and execute run() - * - * Main test class which aggregates the components of a test. - */ -class PerfTest : public tests::storePerftools::common::Streamable -{ -public: - /** - * \brief Constructor - * - * \param tp Test parameters for the test - * \param jp Journal parameters for all queues (journals) in the test - */ - PerfTest(const tests::storePerftools::common::TestParameters& tp, - const JournalParameters& jp); - - /** - * \brief Virtual destructor - */ - virtual ~PerfTest(); - - /** - * \brief Runs the test and prints out the results. - * - * Runs the test as set by the test parameters and journal parameters. - */ - void run(); - - /** - * \brief Stream the test setup and results to an output stream - * - * Convenience feature which streams the test setup and results to an output stream. - * - * \param os Output stream to which the test setup and results are to be streamed. - */ - void toStream(std::ostream& os = std::cout) const; - -protected: - const tests::storePerftools::common::TestParameters& m_testParams; ///< Ref to a struct containing test params - const JournalParameters& m_jrnlParams; ///< Ref to a struct containing the journal parameters - TestResult m_testResult; ///< Journal performance object - const char* m_msgData; ///< Pointer to msg data, which is the same for all messages - - /** - * \brief Creates journals and JrnlInstance classes for all journals (queues) to be tested - * - * Creates a new journal instance and JrnlInstance instance for each queue. The journals are initialized - * which creates a new set of journal files on the local storage media (which is determined by path in - * JrnlParameters._jrnlDir). This activity is not timed, and is not a part of the performance test per se. - * - * \param jrnlList List which will be filled with pointers to the newly prepared journals - */ - void prepareJournals(std::vector& jrnlList); - - /** - * \brief Destroy the journal instances in list jrnlList - * - * \param jrnlList List of pointers to journals to be destroyed - */ - void destroyJournals(std::vector& jrnlList); - -}; - -/** - * \brief Print out the program arguments - * - * Print out the arguments to the performance program if requested by help or a parameter error. - * - * \param os Stream to which the arguments should be streamed. - */ -void printArgs(std::ostream& os = std::cout); - -/** - * \brief Process the command-line arguments - * - * Process the command-line arguments and populate the JrnlPerfTestParameters and JrnlParameters structs. Only the - * arguments supplied are on the command-line are changed in these structs, the others remain unchanged. It is - * important therefore to make sure that defaults are pre-loaded (the default behavior of the default constructors - * for these structs). - * - * \param argc Number of command-line arguments. Process directly from main(). - * \param argv Pointer to array of command-line argument pointers. Process directly from main(). - * \param tp Reference to test parameter object. Only params on the command-line are changed. - * \param jp Reference to journal parameter object. Only params on the command-line are changed. - */ -bool readArgs(int argc, - char** argv, - tests::storePerftools::common::TestParameters& tp, - JournalParameters& jp); - -}}} // namespace tests::storePerftools::jrnlPerf - -#endif // tests_storePerfTools_jrnlPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp deleted file mode 100644 index 9fe214726d..0000000000 --- a/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestResult.cpp - */ - -#include "TestResult.h" - -#include // uint32_t - -namespace tests { -namespace storePerftools { -namespace jrnlPerf { - -TestResult::TestResult(const tests::storePerftools::common::TestParameters& tp) : - tests::storePerftools::common::TestResult(), - m_testParams(tp) -{} - -TestResult::~TestResult() -{} - -void -TestResult::toStream(std::ostream& os) const -{ - double msgsRate; - os << "TEST RESULTS:" << std::endl; - os << " Msgs per thread: " << m_testParams.m_numMsgs << std::endl; - os << " Msg size: " << m_testParams.m_msgSize << std::endl; - os << " No. queues: " << m_testParams.m_numQueues << std::endl; - os << " No. enq threads/queue: " << m_testParams.m_numEnqThreadsPerQueue << std::endl; - os << " No. deq threads/queue: " << m_testParams.m_numDeqThreadsPerQueue << std::endl; - os << " Time taken: " << m_elapsed << " sec" << std::endl; - uint32_t msgsPerQueue = m_testParams.m_numMsgs * m_testParams.m_numEnqThreadsPerQueue; - if (m_testParams.m_numQueues > 1) { - msgsRate = double(msgsPerQueue) / m_elapsed; - os << " No. msgs per queue: " << msgsPerQueue << std::endl; - os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; - os << " " << (msgsRate * m_testParams.m_msgSize / 1e6) << " MB/sec" << std::endl; - } - uint32_t totalMsgs = msgsPerQueue * m_testParams.m_numQueues; - msgsRate = double(totalMsgs) / m_elapsed; - os << " Total no. msgs: " << totalMsgs << std::endl; - os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; - os << " " << (msgsRate * m_testParams.m_msgSize / 1e6) << " MB/sec" << std::endl; -} - -}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h deleted file mode 100644 index dae09a6032..0000000000 --- a/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TestResult.h - */ - -#ifndef tests_storePerfTools_jrnlPerf_TestResult_h_ -#define tests_storePerfTools_jrnlPerf_TestResult_h_ - -#include "tests/storePerfTools/common/TestParameters.h" -#include "tests/storePerfTools/common/TestResult.h" - -namespace tests { -namespace storePerftools { -namespace jrnlPerf { - -class TestOptions; - -/** - * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal. - * - * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the - * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the - * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput. - * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor. - * - * Results are available through the use of toStream(), toString() or the << operators. - * - * Output is in the following format: - *
- * TEST RESULTS:
- *     Msgs per thread: 10000
- *            Msg size: 2048
- *          No. queues: 2
- *   No. threads/queue: 2
- *          Time taken: 1.6626 sec
- *      Total no. msgs: 40000
- *      Msg throughput: 24.0587 kMsgs/sec
- *                      49.2723 MB/sec
- * 
- */ -class TestResult : public tests::storePerftools::common::TestResult -{ -public: - /** - * \brief Constructor - * - * Constructor. Will start the time interval measurement. - * - * \param tp Test parameter details used to calculate the performance results. - */ - TestResult(const tests::storePerftools::common::TestParameters& tp); - - /** - * \brief Virtual destructor - */ - virtual ~TestResult(); - - /** - * \brief Stream the performance test results to an output stream - * - * Convenience feature which streams a multi-line performance result an output stream. - * - * \param os Output stream to which the results are to be streamed - */ - void toStream(std::ostream& os = std::cout) const; - -protected: - tests::storePerftools::common::TestParameters m_testParams; ///< Test parameters used for performance calculations - -}; - -}}} // namespace tests::storePerftools::jrnlPerf - -#endif // tests_storePerfTools_jrnlPerf_TestResult_h_ diff --git a/cpp/src/tests/storePerfTools/version.h b/cpp/src/tests/storePerfTools/version.h deleted file mode 100644 index 311b145330..0000000000 --- a/cpp/src/tests/storePerfTools/version.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file version.h - */ - -#ifndef tests_storePerftools_version_h_ -#define tests_storePerftools_version_h_ - -#include -#include - -namespace tests { -namespace storePerftools { - -static const int versionMajor = 0; -static const int versionMinor = 0; -static const int versionRevision = 1; - -std::string name() { - return "Qpid async store perftools"; -} - -std::string version() { - std::ostringstream oss; - oss << versionMajor << "." << versionMinor << "." << versionRevision; - return oss.str(); -} - -}} // namespace tests::perftools - -#endif // tests_storePerftools_version_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp new file mode 100644 index 0000000000..5cc829f4d2 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableMessage.cpp + */ + +#include "MockPersistableMessage.h" +#include "MockPersistableQueue.h" // debug statements in enqueueComplete() and dequeueComplete() + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class Queue::MessageContext --- + +MockPersistableMessage::MessageContext::MessageContext(MockPersistableMessagePtr msg, + const qpid::asyncStore::AsyncOperation::opCode op, + MockPersistableQueue* q) : + m_msg(msg), + m_op(op), + m_q(q) +{} + +MockPersistableMessage::MessageContext::~MessageContext() +{} + +const char* +MockPersistableMessage::MessageContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockPersistableMessage::MessageContext::destroy() +{ + delete this; +} + +// --- Class MockPersistableMessage --- + + +MockPersistableMessage::MockPersistableMessage(const char* msgData, + const uint32_t msgSize, + qpid::asyncStore::AsyncStoreImpl* store, + const bool persistent) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast(msgSize)), + m_persistent(persistent), + m_msgHandle(store->createMessageHandle(this)) +{} + +MockPersistableMessage::~MockPersistableMessage() +{} + +// static +void +MockPersistableMessage::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc) { + MessageContext* mc = dynamic_cast(bc); + if (mc->m_msg) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Message pid=0x" << std::hex << mc->m_msg->m_persistenceId << std::dec << ": Operation " + << mc->getOp() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(mc->m_op) { + case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: + mc->m_msg->dequeueComplete(mc); + break; + case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: + mc->m_msg->enqueueComplete(mc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockPersistableMessage::handleAsyncResult(): Unknown async queue operation: " << mc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::MessageHandle& +MockPersistableMessage::getHandle() +{ + return m_msgHandle; +} + +void +MockPersistableMessage::setPersistenceId(uint64_t id) const +{ + m_persistenceId = id; +} + +uint64_t +MockPersistableMessage::getPersistenceId() const +{ + return m_persistenceId; +} + +void +MockPersistableMessage::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putRawData(m_msg); +} + +uint32_t +MockPersistableMessage::encodedSize() const +{ + return static_cast(m_msg.size()); +} + +void +MockPersistableMessage::allDequeuesComplete() +{} + +uint32_t +MockPersistableMessage::encodedHeaderSize() const +{ + return 0; +} + +bool +MockPersistableMessage::isPersistent() const +{ + return m_persistent; +} + +uint64_t +MockPersistableMessage::getSize() +{ + return m_msg.size(); +} + +void +MockPersistableMessage::write(char* target) +{ + ::memcpy(target, m_msg.data(), m_msg.size()); +} + +// protected +void +MockPersistableMessage::enqueueComplete(const MessageContext* mc) +{ +//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush; + assert(mc->m_msg.get() == this); +} + +// protected +void +MockPersistableMessage::dequeueComplete(const MessageContext* mc) +{ +//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush; + assert(mc->m_msg.get() == this); +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h new file mode 100644 index 0000000000..7039c4bd08 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableMessage.h + */ + +#ifndef tests_storePerftools_asyncPerf_MockPersistableMessage_h_ +#define tests_storePerftools_asyncPerf_MockPersistableMessage_h_ + +#include "qpid/asyncStore/AsyncOperation.h" +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/MessageHandle.h" +#include "qpid/broker/PersistableMessage.h" + +#include // uint32_t + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableMessage; +class MockPersistableQueue; + +typedef boost::shared_ptr MockPersistableMessagePtr; + +class MockPersistableMessage: public qpid::broker::PersistableMessage, qpid::broker::DataSource +{ +public: + class MessageContext : public qpid::broker::BrokerContext + { + public: + MessageContext(MockPersistableMessagePtr msg, + const qpid::asyncStore::AsyncOperation::opCode op, + MockPersistableQueue* q); + virtual ~MessageContext(); + const char* getOp() const; + void destroy(); + MockPersistableMessagePtr m_msg; + const qpid::asyncStore::AsyncOperation::opCode m_op; + MockPersistableQueue* m_q; + }; + + MockPersistableMessage(const char* msgData, + const uint32_t msgSize, + qpid::asyncStore::AsyncStoreImpl* store, + const bool persistent = true); + virtual ~MockPersistableMessage(); + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + qpid::broker::MessageHandle& getHandle(); + + // Interface Persistable + virtual void setPersistenceId(uint64_t id) const; + virtual uint64_t getPersistenceId() const; + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + + // Interface PersistableMessage + virtual void allDequeuesComplete(); + virtual uint32_t encodedHeaderSize() const; + virtual bool isPersistent() const; + + // Interface DataStore + virtual uint64_t getSize(); + virtual void write(char* target); + +protected: + mutable uint64_t m_persistenceId; + const std::string m_msg; + const bool m_persistent; + qpid::broker::MessageHandle m_msgHandle; + + // --- Ascnc op completions (called through handleAsyncResult) --- + void enqueueComplete(const MessageContext* mc); + void dequeueComplete(const MessageContext* mc); + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfools_asyncPerf_MockPersistableMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp new file mode 100644 index 0000000000..c1d637f621 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableQueue.cpp + */ + +#include "MockPersistableQueue.h" + +#include "MockPersistableMessage.h" +#include "MockTransactionContext.h" +#include "QueuedMessage.h" +#include "TestOptions.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/EnqueueHandle.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class MockPersistableQueue::QueueContext --- + +MockPersistableQueue::QueueContext::QueueContext(intrusive_ptr q, + const qpid::asyncStore::AsyncOperation::opCode op) : + qpid::broker::BrokerContext(), + m_q(q), + m_op(op) +{ + assert(m_q.get() != 0); +} + +MockPersistableQueue::QueueContext::~QueueContext() +{} + +qpid::asyncStore::AsyncOperation::opCode +MockPersistableQueue::QueueContext::getOpCode() const +{ + return m_op; +} + +const char* +MockPersistableQueue::QueueContext::getOpStr() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +MockPersistableQueue::intrusive_ptr +MockPersistableQueue::QueueContext::getQueue() const +{ + return m_q; +} + +void +MockPersistableQueue::QueueContext::destroy() +{ + delete this; +} + +// --- Class MockPersistableQueue --- + +MockPersistableQueue::MockPersistableQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + qpid::asyncStore::AsyncStoreImpl* store, + const TestOptions& to, + const char* msgData) : + qpid::broker::PersistableQueue(), + m_name(name), + m_store(store), + m_persistenceId(0ULL), + m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. + m_perfTestOpts(to), + m_msgData(msgData) +{ + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); +} + +MockPersistableQueue::~MockPersistableQueue() +{ +// m_store->flush(*this); + // TODO: Make destroying the store a test parameter +// m_store->destroy(*this); +// m_store = 0; +} + +// static +void +MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc && res) { + QueueContext* qc = dynamic_cast(bc); + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(qc->getOpCode()) { + case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: + qc->getQueue()->createComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: + qc->getQueue()->flushComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: + qc->getQueue()->destroyComplete(qc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); + throw qpid::Exception(oss.str()); + }; + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::QueueHandle& +MockPersistableQueue::getHandle() +{ + return m_queueHandle; +} + +void +MockPersistableQueue::asyncStoreCreate() +{ + m_store->submitCreate(m_queueHandle, + this, + &handleAsyncResult, + new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); +} + +void +MockPersistableQueue::asyncStoreDestroy() +{ + m_store->submitDestroy(m_queueHandle, + &handleAsyncResult, + new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); +} + +void* +MockPersistableQueue::runEnqueues() +{ + uint32_t numMsgs = 0; + uint16_t txnCnt = 0; + const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0; + MockTransactionContextPtr txn; + while (numMsgs < m_perfTestOpts.m_numMsgs) { + if (useTxn && txnCnt == 0) { + txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() + } + MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); + msg->setPersistenceId(m_store->getNextRid()); + qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); + MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, + qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, + this); + if (useTxn) { + m_store->submitEnqueue(enqHandle, + txn->getHandle(), + &MockPersistableMessage::handleAsyncResult, + dynamic_cast(msgCtxt)); + } else { + m_store->submitEnqueue(enqHandle, + &MockPersistableMessage::handleAsyncResult, + dynamic_cast(msgCtxt)); + } + QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); + push(qm); + if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { + txn->commit(); + txnCnt = 0; + } + ++numMsgs; + } + if (txnCnt > 0) { + txn->commit(); + txnCnt = 0; + } + return 0; +} + +void* +MockPersistableQueue::runDequeues() +{ + uint32_t numMsgs = 0; + const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue; + uint16_t txnCnt = 0; + const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0; + MockTransactionContextPtr txn; + QueuedMessagePtr qm; + while (numMsgs < numMsgsToDequeue) { + if (useTxn && txnCnt == 0) { + txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() + } + pop(qm); + if (qm.get()) { + qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle(); + qpid::broker::BrokerContext* bc = new MockPersistableMessage::MessageContext(qm->getMessage(), + qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, + this); + if (useTxn) { + m_store->submitDequeue(enqHandle, + txn->getHandle(), + &MockPersistableMessage::handleAsyncResult, + bc); + } else { + m_store->submitDequeue(enqHandle, + &MockPersistableMessage::handleAsyncResult, + bc); + } + ++numMsgs; + qm.reset(static_cast(0)); + if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) { + txn->commit(); + txnCnt = 0; + } + } + } + if (txnCnt > 0) { + txn->commit(); + txnCnt = 0; + } + return 0; +} + +//static +void* +MockPersistableQueue::startEnqueues(void* ptr) +{ + return reinterpret_cast(ptr)->runEnqueues(); +} + +//static +void* +MockPersistableQueue::startDequeues(void* ptr) +{ + return reinterpret_cast(ptr)->runDequeues(); +} + +void +MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putShortString(m_name); +} + +uint32_t +MockPersistableQueue::encodedSize() const +{ + return m_name.size() + 1; +} + +uint64_t +MockPersistableQueue::getPersistenceId() const +{ + return m_persistenceId; +} + +void +MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const +{ + m_persistenceId = persistenceId; +} + +void +MockPersistableQueue::flush() +{ + //if(m_store) m_store->flush(*this); +} + +const std::string& +MockPersistableQueue::getName() const +{ + return m_name; +} + +void +MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) +{ + if (externalQueueStore != inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; +} + +uint64_t +MockPersistableQueue::getSize() +{ + return m_persistableData.size(); +} + +void +MockPersistableQueue::write(char* target) +{ + ::memcpy(target, m_persistableData.data(), m_persistableData.size()); +} + +// protected +void +MockPersistableQueue::createComplete(const QueueContext* qc) +{ +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); +} + +// protected +void +MockPersistableQueue::flushComplete(const QueueContext* qc) +{ +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); +} + +// protected +void +MockPersistableQueue::destroyComplete(const QueueContext* qc) +{ +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); +} + +// protected +void +MockPersistableQueue::push(QueuedMessagePtr& qm) +{ + qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); + m_enqueuedMsgs.push_back(qm); + m_dequeueCondition.notify(); +} + +// protected +void +MockPersistableQueue::pop(QueuedMessagePtr& qm) +{ + qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); + while (m_enqueuedMsgs.empty()) { + m_dequeueCondition.wait(m_enqueuedMsgsMutex); + } + qm = m_enqueuedMsgs.front(); + if (qm->isTransactional()) { + // The next msg is still in an open transaction, skip and find next non-open-txn msg + MsgEnqListItr i = m_enqueuedMsgs.begin(); + while (++i != m_enqueuedMsgs.end()) { + if (!(*i)->isTransactional()) { + qm = *i; + m_enqueuedMsgs.erase(i); + } + } + } else { + // The next msg is not in an open txn + m_enqueuedMsgs.pop_front(); + } +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h new file mode 100644 index 0000000000..2e06b0256e --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableQueue.h + */ + +#ifndef tests_storePerftools_asyncPerf_MockPersistableQueue_h_ +#define tests_storePerftools_asyncPerf_MockPersistableQueue_h_ + +#include "qpid/asyncStore/AsyncOperation.h" +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Mutex.h" + +#include +#include +#include + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +} +namespace framing { +class FieldTable; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; +class TestOptions; + +typedef boost::shared_ptr QueuedMessagePtr; + +class MockPersistableQueue : public qpid::broker::PersistableQueue, public qpid::broker::DataSource +{ +public: + typedef boost::intrusive_ptr intrusive_ptr; + + class QueueContext : public qpid::broker::BrokerContext + { + public: + QueueContext(intrusive_ptr q, + const qpid::asyncStore::AsyncOperation::opCode op); + virtual ~QueueContext(); + qpid::asyncStore::AsyncOperation::opCode getOpCode() const; + const char* getOpStr() const; + intrusive_ptr getQueue() const; + void destroy(); + protected: + intrusive_ptr m_q; + const qpid::asyncStore::AsyncOperation::opCode m_op; + }; + + MockPersistableQueue(const std::string& name, + const qpid::framing::FieldTable& args, + qpid::asyncStore::AsyncStoreImpl* store, + const TestOptions& perfTestParams, + const char* msgData); + virtual ~MockPersistableQueue(); + + // --- Async functionality --- + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + qpid::broker::QueueHandle& getHandle(); + void asyncStoreCreate(); + void asyncStoreDestroy(); + + // --- Performance test thread entry points --- + void* runEnqueues(); + void* runDequeues(); + static void* startEnqueues(void* ptr); + static void* startDequeues(void* ptr); + + // --- Interface qpid::broker::Persistable --- + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + virtual uint64_t getPersistenceId() const; + virtual void setPersistenceId(uint64_t persistenceId) const; + + // --- Interface qpid::broker::PersistableQueue --- + virtual void flush(); + virtual const std::string& getName() const; + virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); + + // --- Interface DataStore --- + virtual uint64_t getSize(); + virtual void write(char* target); + +protected: + const std::string m_name; + qpid::asyncStore::AsyncStoreImpl* m_store; + mutable uint64_t m_persistenceId; + std::string m_persistableData; + qpid::broker::QueueHandle m_queueHandle; + + // Test params + const TestOptions& m_perfTestOpts; + const char* m_msgData; + + typedef std::deque MsgEnqList; + typedef MsgEnqList::iterator MsgEnqListItr; + MsgEnqList m_enqueuedMsgs; + qpid::sys::Mutex m_enqueuedMsgsMutex; + qpid::sys::Condition m_dequeueCondition; + + // --- Ascnc op completions (called through handleAsyncResult) --- + void createComplete(const QueueContext* qc); + void flushComplete(const QueueContext* qc); + void destroyComplete(const QueueContext* qc); + + // --- Queue functionality --- + void push(QueuedMessagePtr& msg); + void pop(QueuedMessagePtr& msg); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_MockPersistableQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp new file mode 100644 index 0000000000..10be34c6f5 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockTransactionContext.cpp + */ + +#include "MockTransactionContext.h" + +#include "QueuedMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class MockTransactionContext::QueueContext --- + +MockTransactionContext::TransactionContext::TransactionContext(MockTransactionContext* tc, + const qpid::asyncStore::AsyncOperation::opCode op) : + m_tc(tc), + m_op(op) +{} + +MockTransactionContext::TransactionContext::~TransactionContext() +{} + +const char* +MockTransactionContext::TransactionContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockTransactionContext::TransactionContext::destroy() +{ + delete this; +} + +// --- Class MockTransactionContext --- + + +MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, + const std::string& xid) : + m_store(store), + m_txnHandle(store->createTxnHandle(xid)), + m_prepared(false), + m_enqueuedMsgs() +{ +//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; +} + +MockTransactionContext::~MockTransactionContext() +{} + +// static +void +MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc && res) { + TransactionContext* tc = dynamic_cast(bc); + if (tc->m_tc) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Transaction xid=\"" << tc->m_tc->getXid() << "\": Operation " << tc->getOp() << ": failure " + << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(tc->m_op) { + case qpid::asyncStore::AsyncOperation::TXN_PREPARE: + tc->m_tc->prepareComplete(tc); + break; + case qpid::asyncStore::AsyncOperation::TXN_COMMIT: + tc->m_tc->commitComplete(tc); + break; + case qpid::asyncStore::AsyncOperation::TXN_ABORT: + tc->m_tc->abortComplete(tc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::TxnHandle& +MockTransactionContext::getHandle() +{ + return m_txnHandle; +} + +bool +MockTransactionContext::is2pc() const +{ + return m_txnHandle.is2pc(); +} + +const std::string& +MockTransactionContext::getXid() const +{ + return m_txnHandle.getXid(); +} + +void +MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm) +{ + qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); + m_enqueuedMsgs.push_back(qm); +} + +void +MockTransactionContext::prepare() +{ + if (m_txnHandle.is2pc()) { + localPrepare(); + m_prepared = true; + } + std::ostringstream oss; + oss << "MockTransactionContext::prepare(): xid=\"" << getXid() + << "\": Transaction Error: called prepare() on local transaction"; + throw qpid::Exception(oss.str()); +} + +void +MockTransactionContext::abort() +{ + // TODO: Check the following XA transaction semantics: + // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. + if (!m_prepared) { + localPrepare(); + } + m_store->submitAbort(m_txnHandle, + &handleAsyncResult, + dynamic_cast(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); +//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +void +MockTransactionContext::commit() +{ + if (is2pc()) { + if (!m_prepared) { + std::ostringstream oss; + oss << "MockTransactionContext::abort(): xid=\"" << getXid() + << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; + throw qpid::Exception(oss.str()); + } + } else { + localPrepare(); + } + m_store->submitCommit(m_txnHandle, + &handleAsyncResult, + dynamic_cast(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); +//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + + +// protected +void +MockTransactionContext::localPrepare() +{ + m_store->submitPrepare(m_txnHandle, + &handleAsyncResult, + dynamic_cast(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); +//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +// protected +void +MockTransactionContext::prepareComplete(const TransactionContext* tc) +{ + qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); + while (!m_enqueuedMsgs.empty()) { + m_enqueuedMsgs.front()->clearTransaction(); + m_enqueuedMsgs.pop_front(); + } +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); +} + + +// protected +void +MockTransactionContext::abortComplete(const TransactionContext* tc) +{ +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); +} + + +// protected +void +MockTransactionContext::commitComplete(const TransactionContext* tc) +{ +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h new file mode 100644 index 0000000000..883da198bb --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockTransactionContext.h + */ + +#ifndef tests_storePerftools_asyncPerf_MockTransactionContext_h_ +#define tests_storePerftools_asyncPerf_MockTransactionContext_h_ + +#include "qpid/asyncStore/AsyncOperation.h" + +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext +#include "qpid/broker/TxnHandle.h" +#include "qpid/sys/Mutex.h" + +#include +#include + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; + +class MockTransactionContext : public qpid::broker::TransactionContext +{ +public: + // NOTE: TransactionContext - Bad naming? This context is the async return handling context for class + // MockTransactionContext async ops. Other classes using this pattern simply use XXXContext for this class + // (e.g. QueueContext in MockPersistableQueue), but in this case it may be confusing. + class TransactionContext : public qpid::broker::BrokerContext + { + public: + TransactionContext(MockTransactionContext* tc, + const qpid::asyncStore::AsyncOperation::opCode op); + virtual ~TransactionContext(); + const char* getOp() const; + void destroy(); + MockTransactionContext* m_tc; + const qpid::asyncStore::AsyncOperation::opCode m_op; + }; + + MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, + const std::string& xid = std::string()); + virtual ~MockTransactionContext(); + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + + qpid::broker::TxnHandle& getHandle(); + bool is2pc() const; + const std::string& getXid() const; + void addEnqueuedMsg(QueuedMessage* qm); + + void prepare(); + void abort(); + void commit(); + +protected: + qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::TxnHandle m_txnHandle; + bool m_prepared; + std::deque m_enqueuedMsgs; + qpid::sys::Mutex m_enqueuedMsgsMutex; + + void localPrepare(); + + // --- Ascnc op completions (called through handleAsyncResult) --- + void prepareComplete(const TransactionContext* tc); + void abortComplete(const TransactionContext* tc); + void commitComplete(const TransactionContext* tc); + +}; + +}}} // namespace tests:storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_MockTransactionContext_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp new file mode 100644 index 0000000000..7387c348fd --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.cpp + */ + +#include "PerfTest.h" + +#include "MockPersistableQueue.h" + +#include "tests/storePerftools/version.h" +#include "tests/storePerftools/common/ScopedTimer.h" +#include "tests/storePerftools/common/Thread.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +#include + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +PerfTest::PerfTest(const TestOptions& to, + const qpid::asyncStore::AsyncStoreOptions& aso) : + m_testOpts(to), + m_storeOpts(aso), + m_testResult(to), + m_msgData(new char[to.m_msgSize]), + m_poller(new qpid::sys::Poller), + m_pollingThread(m_poller.get()), + m_store(0) +{ + std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); +} + +PerfTest::~PerfTest() +{ + m_poller->shutdown(); + m_pollingThread.join(); + + m_queueList.clear(); + + if (m_store) delete m_store; + delete[] m_msgData; +} + +void +PerfTest::prepareStore() +{ + m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); + m_store->initialize(); +} + +void +PerfTest::prepareQueues() +{ + for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { + std::ostringstream qname; + qname << "queue_" << std::setw(4) << std::setfill('0') << i; + MockPersistableQueue::intrusive_ptr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_testOpts, m_msgData)); + mpq->asyncStoreCreate(); + m_queueList.push_back(mpq); + } +} + +void +PerfTest::destroyQueues() +{ + for (std::deque::iterator i=m_queueList.begin(); i!=m_queueList.end(); ++i) { + (*i)->asyncStoreDestroy(); + } +} + +void +PerfTest::run() +{ + typedef boost::shared_ptr ThreadPtr; // TODO - replace with qpid threads + + prepareStore(); + prepareQueues(); + + std::deque threads; + { // --- Start of timed section --- + tests::storePerftools::common::ScopedTimer st(m_testResult); + + for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { + for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads + ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startEnqueues, + reinterpret_cast(m_queueList[q].get()))); + threads.push_back(tp); + } + for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads + ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startDequeues, + reinterpret_cast(m_queueList[q].get()))); + threads.push_back(tp); + } + } + while (threads.size()) { + threads.front()->join(); + threads.pop_front(); + } + } // --- End of timed section --- + // TODO: Add test param to allow queues to be destroyed or left when test ends + destroyQueues(); +} + +void +PerfTest::toStream(std::ostream& os) const +{ + m_testOpts.printVals(os); + os << std::endl; + m_storeOpts.printVals(os); + os << std::endl; + os << m_testResult << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf + +// ----------------------------------------------------------------- + +int +main(int argc, char** argv) +{ + qpid::CommonOptions co; + qpid::asyncStore::AsyncStoreOptions aso; + tests::storePerftools::asyncPerf::TestOptions to; + qpid::Options opts; + opts.add(co).add(aso).add(to); + try { + opts.parse(argc, argv); + aso.validate(); + to.validate(); + } + catch (std::exception& e) { + std::cerr << e.what() << std::endl; + return 1; + } + + // Handle options that just print information then exit. + if (co.version) { + std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl; + return 0; + } + if (co.help) { + std::cout << tests::storePerftools::name() << ": asyncPerf" << std::endl; + std::cout << "Performance test for the async store through the qpid async store interface." << std::endl; + std::cout << "Usage: asyncPerf [options]" << std::endl; + std::cout << opts << std::endl; + return 0; + } + + // Create and start test + tests::storePerftools::asyncPerf::PerfTest apt(to, aso); + apt.run(); + + // Print test result + std::cout << apt << std::endl; + ::sleep(1); + return 0; +} diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h new file mode 100644 index 0000000000..2b1e65f871 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.h + */ + +#ifndef tests_storePerftools_asyncPerf_PerfTest_h_ +#define tests_storePerftools_asyncPerf_PerfTest_h_ + +#include "MockPersistableQueue.h" +#include "TestResult.h" + +#include "tests/storePerftools/common/Streamable.h" + +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Thread.h" + +#include +#include + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +class AsyncStoreOptions; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TestOptions; + +class PerfTest : public tests::storePerftools::common::Streamable +{ +public: + PerfTest(const TestOptions& to, + const qpid::asyncStore::AsyncStoreOptions& aso); + virtual ~PerfTest(); + void run(); + void toStream(std::ostream& os = std::cout) const; + +protected: + const TestOptions& m_testOpts; + const qpid::asyncStore::AsyncStoreOptions& m_storeOpts; + TestResult m_testResult; + qpid::framing::FieldTable m_queueArgs; + const char* m_msgData; + boost::shared_ptr m_poller; + qpid::sys::Thread m_pollingThread; + qpid::asyncStore::AsyncStoreImpl* m_store; + std::deque m_queueList; + + void prepareStore(); + void prepareQueues(); + void destroyQueues(); + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp new file mode 100644 index 0000000000..315e202d8b --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file QueuedMessage.cpp + */ + +#include "QueuedMessage.h" + +#include "MockTransactionContext.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +QueuedMessage::QueuedMessage(MockPersistableMessagePtr msg, + qpid::broker::EnqueueHandle& enqHandle, + MockTransactionContextPtr txn) : + m_msg(msg), + m_enqHandle(enqHandle), + m_txn(txn) +{ + if (txn) { + txn->addEnqueuedMsg(this); + } +} + +QueuedMessage::~QueuedMessage() +{} + +MockPersistableMessagePtr +QueuedMessage::getMessage() const +{ + return m_msg; +} + +qpid::broker::EnqueueHandle +QueuedMessage::getEnqueueHandle() const +{ + return m_enqHandle; +} + +MockTransactionContextPtr +QueuedMessage::getTransactionContext() const +{ + return m_txn; +} + +bool +QueuedMessage::isTransactional() const +{ + return m_txn.get() != 0; +} + +void +QueuedMessage::clearTransaction() +{ + m_txn.reset(static_cast(0)); +} + +}}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h new file mode 100644 index 0000000000..be9694f6e5 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file QueuedMessage.h + */ + +#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_ +#define tests_storePerftools_asyncPerf_QueuedMessage_h_ + +#include "qpid/broker/EnqueueHandle.h" +#include + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableMessage; +class MockTransactionContext; + +typedef boost::shared_ptr MockPersistableMessagePtr; +typedef boost::shared_ptr MockTransactionContextPtr; + +class QueuedMessage +{ +public: + QueuedMessage(MockPersistableMessagePtr msg, + qpid::broker::EnqueueHandle& enqHandle, + MockTransactionContextPtr txn); + virtual ~QueuedMessage(); + MockPersistableMessagePtr getMessage() const; + qpid::broker::EnqueueHandle getEnqueueHandle() const; + MockTransactionContextPtr getTransactionContext() const; + bool isTransactional() const; + void clearTransaction(); + +protected: + MockPersistableMessagePtr m_msg; + qpid::broker::EnqueueHandle m_enqHandle; + MockTransactionContextPtr m_txn; +}; + +}}} // namespace tests::storePerfTools + +#endif // tests_storePerftools_asyncPerf_QueuedMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp new file mode 100644 index 0000000000..27784ef661 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.cpp + */ + +#include "TestOptions.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// static declarations +uint16_t TestOptions::s_defaultEnqTxnBlkSize = 0; +uint16_t TestOptions::s_defaultDeqTxnBlkSize = 0; + +TestOptions::TestOptions(const std::string& name) : + tests::storePerftools::common::TestOptions(name), + m_enqTxnBlockSize(s_defaultEnqTxnBlkSize), + m_deqTxnBlockSize(s_defaultDeqTxnBlkSize) +{ + doAddOptions(); +} + +TestOptions::TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const uint16_t enqTxnBlockSize, + const uint16_t deqTxnBlockSize, + const std::string& name) : + tests::storePerftools::common::TestOptions(numMsgs, msgSize, numQueues, numEnqThreadsPerQueue, numDeqThreadsPerQueue, name), + m_enqTxnBlockSize(enqTxnBlockSize), + m_deqTxnBlockSize(deqTxnBlockSize) +{ + doAddOptions(); +} + +TestOptions::~TestOptions() +{} + +void +TestOptions::printVals(std::ostream& os) const +{ + tests::storePerftools::common::TestOptions::printVals(os); + os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl; + os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl; +} + +void +TestOptions::doAddOptions() +{ + addOptions() + ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"), + "Num enqueus per transaction (0 = no transactions)") + ("deq-txn-size,d", qpid::optValue(m_deqTxnBlockSize, "N"), + "Num dequeues per transaction (0 = no transactions)") + ; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h new file mode 100644 index 0000000000..76b18717fa --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.h + */ + +#ifndef tests_storePerftools_asyncPerf_TestOptions_h_ +#define tests_storePerftools_asyncPerf_TestOptions_h_ + +#include "tests/storePerftools/common/TestOptions.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TestOptions : public tests::storePerftools::common::TestOptions +{ +public: + TestOptions(const std::string& name="Test Options"); + TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const uint16_t enqTxnBlockSize, + const uint16_t deqTxnBlockSize, + const std::string& name="Test Options"); + virtual ~TestOptions(); + void printVals(std::ostream& os) const; + + uint16_t m_enqTxnBlockSize; ///< Transaction block size for enqueues + uint16_t m_deqTxnBlockSize; ///< Transaction block size for dequeues + +protected: + static uint16_t s_defaultEnqTxnBlkSize; ///< Default transaction block size for enqueues + static uint16_t s_defaultDeqTxnBlkSize; ///< Default transaction block size for dequeues + + void doAddOptions(); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_TestOptions_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp new file mode 100644 index 0000000000..cf6f293494 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.cpp + */ + +#include "TestResult.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +TestResult::TestResult(const TestOptions& to) : + tests::storePerftools::common::TestResult(), + m_testOpts(to) +{} + +TestResult::~TestResult() +{} + +void +TestResult::toStream(std::ostream& os) const +{ + double msgsRate; + os << "TEST RESULTS:" << std::endl; + os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl; + os << " Msg size: " << m_testOpts.m_msgSize << std::endl; + os << " No. queues: " << m_testOpts.m_numQueues << std::endl; + os << " No. enq threads/queue: " << m_testOpts.m_numEnqThreadsPerQueue << std::endl; + os << " No. deq threads/queue: " << m_testOpts.m_numDeqThreadsPerQueue << std::endl; + os << " Time taken: " << m_elapsed << " sec" << std::endl; + uint32_t msgsPerQueue = m_testOpts.m_numMsgs * m_testOpts.m_numEnqThreadsPerQueue; + if (m_testOpts.m_numQueues > 1) { + msgsRate = double(msgsPerQueue) / m_elapsed; + os << " No. msgs per queue: " << msgsPerQueue << std::endl; + os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl; + } + uint32_t totalMsgs = msgsPerQueue * m_testOpts.m_numQueues; + msgsRate = double(totalMsgs) / m_elapsed; + os << " Total no. msgs: " << totalMsgs << std::endl; + os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.h b/cpp/src/tests/storePerftools/asyncPerf/TestResult.h new file mode 100644 index 0000000000..1b831c3e17 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.h + */ + +#ifndef tests_storePerftools_asyncPerf_TestResult_h_ +#define tests_storePerftools_asyncPerf_TestResult_h_ + +#include "TestOptions.h" + +#include "tests/storePerftools/common/TestResult.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TestOptions; + +/** + * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal. + * + * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the + * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the + * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput. + * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor. + * + * Results are available through the use of toStream(), toString() or the << operators. + * + * Output is in the following format: + *
+ * TEST RESULTS:
+ *     Msgs per thread: 10000
+ *            Msg size: 2048
+ *          No. queues: 2
+ *   No. threads/queue: 2
+ *          Time taken: 1.6626 sec
+ *      Total no. msgs: 40000
+ *      Msg throughput: 24.0587 kMsgs/sec
+ *                      49.2723 MB/sec
+ * 
+ */ +class TestResult : public tests::storePerftools::common::TestResult +{ +public: + /** + * \brief Constructor + * + * Constructor. Will start the time interval measurement. + * + * \param tp Test parameter details used to calculate the performance results. + */ + TestResult(const TestOptions& to); + + /** + * \brief Virtual destructor + */ + virtual ~TestResult(); + + /** + * \brief Stream the performance test results to an output stream + * + * Convenience feature which streams a multi-line performance result an output stream. + * + * \param os Output stream to which the results are to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +protected: + TestOptions m_testOpts; ///< Test parameters used for performance calculations + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_TestResult_h_ diff --git a/cpp/src/tests/storePerftools/common/Parameters.cpp b/cpp/src/tests/storePerftools/common/Parameters.cpp new file mode 100644 index 0000000000..8e4bafaf86 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/Parameters.cpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Parameters.cpp + */ + +#include "Parameters.h" + +namespace tests { +namespace storePerftools { +namespace common { + +Parameters::~Parameters() +{} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/Parameters.h b/cpp/src/tests/storePerftools/common/Parameters.h new file mode 100644 index 0000000000..941cce2dc6 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/Parameters.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Parameters.h + */ +#ifndef tests_storePerftools_common_Parameters_h_ +#define tests_storePerftools_common_Parameters_h_ + +#include "Streamable.h" + +namespace tests { +namespace storePerftools { +namespace common { + +class Parameters: public Streamable +{ +public: + virtual ~Parameters(); + virtual bool parseArg(const int arg, + const char* optarg) = 0; + +}; + +}}} // namespace tests::storePerfTools::common + +#endif // tests_storePerftools_common_Parameters_h_ diff --git a/cpp/src/tests/storePerftools/common/PerftoolError.cpp b/cpp/src/tests/storePerftools/common/PerftoolError.cpp new file mode 100644 index 0000000000..5bb61b6519 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/PerftoolError.cpp @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerftoolError.cpp + */ + +#include "PerftoolError.h" + +#include // std::setfill(), std::setw() + +namespace tests { +namespace storePerftools { +namespace common { + +// private +PerftoolError::PerftoolError() : + std::runtime_error(std::string()) +{} + +PerftoolError::PerftoolError(const uint32_t errCode) throw () : + std::runtime_error(std::string()), + m_errCode(errCode) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const std::string& errMsg) throw () : + std::runtime_error(std::string()), + m_errCode(0), + m_errMsg(errMsg) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const uint32_t errCode, + const std::string& errMsg) throw () : + std::runtime_error(std::string()), + m_errCode(errCode), + m_errMsg(errMsg) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const uint32_t errCode, + const std::string& throwingClass, + const std::string& throwingFunction) throw () : + std::runtime_error(std::string()), + m_errCode(errCode), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const std::string& errMsg, + const std::string& throwingClass, + const std::string& throwingFunction) throw () : + std::runtime_error(std::string()), + m_errCode(0), + m_errMsg(errMsg), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const uint32_t errCode, + const std::string& errMsg, + const std::string& throwingClass, + const std::string& throwingFunction) throw () : + std::runtime_error(std::string()), + m_errCode(errCode), + m_errMsg(errMsg), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{} + +PerftoolError::~PerftoolError() throw() +{} + +const char* +PerftoolError::what() const throw () +{ + return m_what.c_str(); +} + +uint32_t +PerftoolError::getErrorCode() const throw () +{ + return m_errCode; +} + +const std::string +PerftoolError::getAdditionalInfo() const throw () +{ + return m_errMsg; +} + +const std::string +PerftoolError::getThrowingClass() const throw () +{ + return m_throwingClass; +} + +const std::string +PerftoolError::getThrowingFunction() const throw () +{ + return m_throwingFunction; +} + +void +PerftoolError::toStream(std::ostream& os) const +{ + os << what(); +} + +// protected +void +PerftoolError::formatWhatStr() throw () +{ + try { + const bool ai = !m_errMsg.empty(); + const bool tc = !m_throwingClass.empty(); + const bool tf = !m_throwingFunction.empty(); + std::ostringstream oss; + oss << className() << " 0x" << std::hex << std::setfill('0') << std::setw(4) << m_errCode << " "; + if (tc) { + oss << m_throwingClass; + if (tf) { + oss << "::"; + } else { + oss << " "; + } + } + if (tf) { + oss << m_throwingFunction << "() "; + } + if (tc || tf) { + oss << "threw " << s_errorMessage(m_errCode); + } + if (ai) { + oss << " (" << m_errMsg << ")"; + } + m_what.assign(oss.str()); + } catch (...) {} +} + +// protected +const char* +PerftoolError::className() +{ + return s_className; +} + +//static +const char* PerftoolError::s_className = "PerftoolError"; + +// --- Static definitions --- +PerftoolError::errorMap_t PerftoolError::s_errorMap; +PerftoolError::errorMapCitr_t PerftoolError::s_errorMapIterator; +bool PerftoolError::s_initializedFlag = PerftoolError::s_initialize(); + +// --- Generic and system errors --- +const uint32_t PerftoolError::PERR_PTHREAD = 0x0001; + +// static +const char* +PerftoolError::s_errorMessage(const uint32_t err_no) throw () +{ + s_errorMapIterator = s_errorMap.find(err_no); + if (s_errorMapIterator == s_errorMap.end()) + return ""; + return s_errorMapIterator->second; +} + +// protected static +bool +PerftoolError::s_initialize() +{ + s_errorMap[PERR_PTHREAD] = "ERR_PTHREAD: pthread operation failure"; + + return true; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/PerftoolError.h b/cpp/src/tests/storePerftools/common/PerftoolError.h new file mode 100644 index 0000000000..d4740f8d1d --- /dev/null +++ b/cpp/src/tests/storePerftools/common/PerftoolError.h @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerftoolError.h + */ + +#ifndef tests_storePerftools_common_PerftoolError_h_ +#define tests_storePerftools_common_PerftoolError_h_ + +#include "Streamable.h" + +#include +#include // std::runtime_error +#include // uint32_t + +// Macro definitions + +#include // std::strerror() +#include // std::ostringstream + +/** + * \brief Macro to retrieve and format the C errno value as a string. + * + * \param errno Value of errno to be formatted. + */ +#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << std::strerror(errno) << ")" + +/** + * \brief Macro to check for a clean pthread creation and throwing a JournalException with code JERR_PTHREAD if + * thread creation failed. + * + * \param err Value or errno. + * \param pfn Name of system call that failed. + * \param cls Name of class in which function failed. + * \param fn Name of class function that failed. + */ +#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \ + std::ostringstream oss; \ + oss << pfn << " failed: " << FORMAT_SYSERR(err); \ + throw tests::storePerftools::common::PerftoolError(tests::storePerftools::common::PerftoolError::PERR_PTHREAD, oss.str(), cls, fn); \ + } + +namespace tests { +namespace storePerftools { +namespace common { + +class PerftoolError: public std::runtime_error, public Streamable +{ +public: + // --- Constructors & destructors --- + PerftoolError(const uint32_t errCode) throw (); + PerftoolError(const std::string& errMsg) throw (); + PerftoolError(const uint32_t errCode, + const std::string& errMsg) throw (); + PerftoolError(const uint32_t errCode, + const std::string& throwingClass, + const std::string& throwingFunction) throw (); + PerftoolError(const std::string& errMsg, + const std::string& throwingClass, + const std::string& throwingFunction) throw (); + PerftoolError(const uint32_t errCode, + const std::string& errMsg, + const std::string& throwingClass, + const std::string& throwingFunction) throw (); + virtual ~PerftoolError() throw(); + + const char* what() const throw (); // overrides std::runtime_error::what() + uint32_t getErrorCode() const throw (); + const std::string getAdditionalInfo() const throw (); + const std::string getThrowingClass() const throw (); + const std::string getThrowingFunction() const throw (); + + // --- Implementation of class Streamable --- + virtual void toStream(std::ostream& os = std::cout) const; + + // --- Generic and system errors --- + static const uint32_t PERR_PTHREAD; ///< pthread operation failure + + + static const char* s_errorMessage(const uint32_t err_no) throw (); + + +protected: + uint32_t m_errCode; ///< Error or failure code, taken from JournalErrors. + std::string m_errMsg; ///< Additional information pertaining to the error or failure. + std::string m_throwingClass; ///< Name of the class throwing the error. + std::string m_throwingFunction; ///< Name of the function throwing the error. + std::string m_what; ///< Standard error of failure message, taken from JournalErrors. + + void formatWhatStr() throw (); + virtual const char* className(); + + typedef std::map errorMap_t; ///< Type for map of error messages + typedef errorMap_t::const_iterator errorMapCitr_t; ///< Const iterator for map of error messages + + static errorMap_t s_errorMap; ///< Map of error messages + static errorMapCitr_t s_errorMapIterator; ///< Const iterator + +private: + static const char* s_className; ///< Name of this class, used in formatting error messages. + static bool s_initializedFlag; ///< Dummy flag, used to initialize map. + + PerftoolError(); + static bool s_initialize(); ///< Static fn for initializing static data + +}; + +}}} // namespace tests::stprePerftools::common + +#endif // tests_storePerftools_common_PerftoolError_h_ diff --git a/cpp/src/tests/storePerftools/common/ScopedTimable.cpp b/cpp/src/tests/storePerftools/common/ScopedTimable.cpp new file mode 100644 index 0000000000..c2023b7854 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/ScopedTimable.cpp @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedTimable.cpp + */ + +#include "ScopedTimable.h" + +namespace tests { +namespace storePerftools { +namespace common { + +ScopedTimable::ScopedTimable() : + m_elapsed(0.0) +{} + +ScopedTimable::~ScopedTimable() +{} + +double& +ScopedTimable::getElapsedRef() +{ + return m_elapsed; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/ScopedTimable.h b/cpp/src/tests/storePerftools/common/ScopedTimable.h new file mode 100644 index 0000000000..73d12b77f6 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/ScopedTimable.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedTimable.h + */ + +#ifndef tests_storePerftools_common_ScopedTimable_h_ +#define tests_storePerftools_common_ScopedTimable_h_ + +namespace tests { +namespace storePerftools { +namespace common { + +/** + * \brief Scoped timer class that starts timing on construction and finishes on destruction. + * + * This class is designed to be the parent class for a performance result class which depends on the elapsed + * time of some process or event. By passing this (or its subclasses) to ScopedTimer (which only exists within + * the scope of the event), the _elapsed member of this class will be written with the elapsed time when the + * ScopedTimer object goes out of scope or is destroyed. + * + * Subclasses may be aware of the parameters being timed, and may thus print and/or display performance and/or + * rate information for these parameters. + */ +class ScopedTimable +{ +public: + ScopedTimable(); + virtual ~ScopedTimable(); + double& getElapsedRef(); + +protected: + double m_elapsed; ///< Elapsed time, will be written on destruction of ScopedTimer instances + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerftools_common_ScopedTimable_h_ diff --git a/cpp/src/tests/storePerftools/common/ScopedTimer.cpp b/cpp/src/tests/storePerftools/common/ScopedTimer.cpp new file mode 100644 index 0000000000..8312174cad --- /dev/null +++ b/cpp/src/tests/storePerftools/common/ScopedTimer.cpp @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedTimer.cpp + */ + +#include "ScopedTimer.h" + +#include "ScopedTimable.h" + +namespace tests { +namespace storePerftools { +namespace common { + +ScopedTimer::ScopedTimer(double& elapsed) : + m_elapsed(elapsed) +{ + ::clock_gettime(CLOCK_REALTIME, &m_startTime); +} + +ScopedTimer::ScopedTimer(ScopedTimable& st) : + m_elapsed(st.getElapsedRef()) +{ + ::clock_gettime(CLOCK_REALTIME, &m_startTime); +} + +ScopedTimer::~ScopedTimer() +{ + ::timespec stopTime; + ::clock_gettime(CLOCK_REALTIME, &stopTime); + m_elapsed = _s_getDoubleTime(stopTime) - _s_getDoubleTime(m_startTime); +} + +// static +double ScopedTimer::_s_getDoubleTime(const ::timespec& ts) +{ + return ts.tv_sec + (double(ts.tv_nsec) / 1e9); +} + + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/ScopedTimer.h b/cpp/src/tests/storePerftools/common/ScopedTimer.h new file mode 100644 index 0000000000..dd056cf726 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/ScopedTimer.h @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedTimer.h + */ + +#ifndef tests_storePerftools_common_ScopedTimer_h_ +#define tests_storePerftools_common_ScopedTimer_h_ + +#include + +namespace tests { +namespace storePerftools { +namespace common { + +class ScopedTimable; + +/** + * \brief Scoped timer class that starts timing on construction and finishes on destruction. + * + * The scoped timer will take the current time on construction and again on destruction. The destructor + * will calculate the elapsed time from the difference between these two times and write the result + * as a double to the double ref supplied to the constructor. A second constructor will accept a class (or + * subclass) of ScopedTimable, which contains a double to which the result may be written and accessed at a + * later time. + */ +class ScopedTimer +{ +public: + /** + * \brief Constructor + * + * Constructor which accepts a ref to a double. Will start the time interval measurement. + * + * \param elapsed A ref to a double which will contain the elapsed time in seconds after this class instance + * is destroyed. + */ + ScopedTimer(double& elapsed); + + /** + * \brief Constructor + * + * Constructor which accepts a ref to a ScopedTimable. Will start the time interval measurement. + * + * \param st A ref to a ScopedTimable into which the result of the ScopedTimer can be written. + */ + ScopedTimer(ScopedTimable& st); + + /** + * \brief Destructor + * + * Destructor. Will stop the time interval measurement and write the calculated elapsed time into _elapsed. + */ + virtual ~ScopedTimer(); + +protected: + double& m_elapsed; ///< Ref to elapsed time, will be written on destruction of ScopedTimer instances + ::timespec m_startTime; ///< Start time, set on construction + + /** + * \brief Convert ::timespec to seconds + * + * Static function to convert a ::timespec struct into a double representation in seconds. + * + * \param ts std::timespec struct containing the time to be converted. + * \return A double which represents the time in parameter ts in seconds. + */ + static double _s_getDoubleTime(const ::timespec& ts); + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerftools_common_ScopedTimer_h_ diff --git a/cpp/src/tests/storePerftools/common/Streamable.cpp b/cpp/src/tests/storePerftools/common/Streamable.cpp new file mode 100644 index 0000000000..8c58f1c03e --- /dev/null +++ b/cpp/src/tests/storePerftools/common/Streamable.cpp @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Streamable.cpp + */ + +#include "Streamable.h" + +#include + +namespace tests { +namespace storePerftools { +namespace common { + +std::string +Streamable::toString() const +{ + std::ostringstream oss; + toStream(oss); + return oss.str(); +} + +std::ostream& +operator<<(std::ostream& os, const Streamable& s) +{ + s.toStream(os); + return os; +} + +std::ostream& +operator<<(std::ostream& os, const Streamable* sPtr) +{ + sPtr->toStream(os); + return os; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/Streamable.h b/cpp/src/tests/storePerftools/common/Streamable.h new file mode 100644 index 0000000000..5fb6d5862b --- /dev/null +++ b/cpp/src/tests/storePerftools/common/Streamable.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Streamable.h + */ + +#ifndef tests_storePerftools_common_Streamable_h_ +#define tests_storePerftools_common_Streamable_h_ + +#include + +namespace tests { +namespace storePerftools { +namespace common { + +/** + * \brief Abstract class which provides the mechanisms to stream + * + * An abstract class which provides stream functions. The toStream() function must be implemented by subclasses, + * and is used by the remaining functions. For convenience, toString() returns a std::string object. + */ +class Streamable +{ +public: + /** + * \brief Virtual destructor + */ + virtual ~Streamable() {} + + /*** + * \brief Stream some representation of the object to an output stream + * + * \param os Output stream to which the class data is to be streamed + */ + virtual void toStream(std::ostream& os = std::cout) const = 0; + + /** + * \brief Creates a string representation of the test parameters + * + * Convenience feature which creates and returns a std::string object containing the content of toStream(). + * + * \return Content of toStream() + */ + std::string toString() const; + + /** + * \brief Stream the object to an output stream + */ + friend std::ostream& operator<<(std::ostream& os, + const Streamable& s); + + /** + * \brief Stream the object to an output stream through an object pointer + */ + friend std::ostream& operator<<(std::ostream& os, + const Streamable* sPtr); + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerftools_common_Streamable_h_ diff --git a/cpp/src/tests/storePerftools/common/TestOptions.cpp b/cpp/src/tests/storePerftools/common/TestOptions.cpp new file mode 100644 index 0000000000..39e3434a6c --- /dev/null +++ b/cpp/src/tests/storePerftools/common/TestOptions.cpp @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.cpp + */ + +#include "TestOptions.h" + +namespace tests { +namespace storePerftools { +namespace common { + +// static declarations +uint32_t TestOptions::s_defaultNumMsgs = 1024; +uint32_t TestOptions::s_defaultMsgSize = 1024; +uint16_t TestOptions::s_defaultNumQueues = 1; +uint16_t TestOptions::s_defaultEnqThreadsPerQueue = 1; +uint16_t TestOptions::s_defaultDeqThreadsPerQueue = 1; + +TestOptions::TestOptions(const std::string& name) : + qpid::Options(name), + m_numMsgs(s_defaultNumMsgs), + m_msgSize(s_defaultMsgSize), + m_numQueues(s_defaultNumQueues), + m_numEnqThreadsPerQueue(s_defaultEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(s_defaultDeqThreadsPerQueue) +{ + doAddOptions(); +} + +TestOptions::TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const std::string& name) : + qpid::Options(name), + m_numMsgs(numMsgs), + m_msgSize(msgSize), + m_numQueues(numQueues), + m_numEnqThreadsPerQueue(numEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(numDeqThreadsPerQueue) +{ + doAddOptions(); +} + +TestOptions::~TestOptions() +{} + +void +TestOptions::printVals(std::ostream& os) const +{ + os << "TEST OPTIONS:" << std::endl; + os << " Number of queues [-q, --num-queues]: " << m_numQueues << std::endl; + os << " Number of producers per queue [-p, --num-producers]: " << m_numEnqThreadsPerQueue << std::endl; + os << " Number of consumers per queue [-c, --num-consumers]: " << m_numDeqThreadsPerQueue << std::endl; + os << " Number of messages to send per producer [-m, --num-msgs]: " << m_numMsgs << std::endl; + os << " Size of each message (bytes) [-s, --msg-size]: " << m_msgSize << std::endl; +} + +void +TestOptions::validate() +{ + if (((m_numEnqThreadsPerQueue * m_numMsgs) % m_numDeqThreadsPerQueue) != 0) { + throw qpid::Exception("Parameter Error: (num-producers * num-msgs) must be a multiple of num-consumers."); + } +} + +void +TestOptions::doAddOptions() +{ + addOptions() + ("num-queues,q", qpid::optValue(m_numQueues, "N"), + "Number of queues") + ("num-producers,p", qpid::optValue(m_numEnqThreadsPerQueue, "N"), + "Number of producers per queue") + ("num-consumers,c", qpid::optValue(m_numDeqThreadsPerQueue, "N"), + "Number of consumers per queue") + ("num-msgs,m", qpid::optValue(m_numMsgs, "N"), + "Number of messages to send per producer") + ("msg-size,s", qpid::optValue(m_msgSize, "N"), + "Size of each message (bytes)") + ; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/TestOptions.h b/cpp/src/tests/storePerftools/common/TestOptions.h new file mode 100644 index 0000000000..217e0ca2f9 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/TestOptions.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.h + */ + +#ifndef tests_storePerftools_common_TestOptions_h_ +#define tests_storePerftools_common_TestOptions_h_ + +#include "qpid/Options.h" + +namespace tests { +namespace storePerftools { +namespace common { + +class TestOptions : public qpid::Options +{ +public: + TestOptions(const std::string& name="Test Options"); + TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const std::string& name="Test Options"); + virtual ~TestOptions(); + void printVals(std::ostream& os) const; + void validate(); + + uint32_t m_numMsgs; ///< Number of messages to be sent + uint32_t m_msgSize; ///< Message size in bytes + uint16_t m_numQueues; ///< Number of queues to test simultaneously + uint16_t m_numEnqThreadsPerQueue; ///< Number of enqueue threads per queue + uint16_t m_numDeqThreadsPerQueue; ///< Number of dequeue threads per queue + +protected: + static uint32_t s_defaultNumMsgs; ///< Default number of messages to be sent + static uint32_t s_defaultMsgSize; ///< Default message size in bytes + static uint16_t s_defaultNumQueues; ///< Default number of queues to test simultaneously + static uint16_t s_defaultEnqThreadsPerQueue; ///< Default number of enqueue threads per queue + static uint16_t s_defaultDeqThreadsPerQueue; ///< Default number of dequeue threads per queue + + void doAddOptions(); + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerftools_common_TestOptions_h_ diff --git a/cpp/src/tests/storePerftools/common/TestParameters.cpp b/cpp/src/tests/storePerftools/common/TestParameters.cpp new file mode 100644 index 0000000000..f36a2d3bda --- /dev/null +++ b/cpp/src/tests/storePerftools/common/TestParameters.cpp @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestParameters.cpp + */ + +#include "TestParameters.h" + +#include // std::atoi, std::atol + +namespace tests { +namespace storePerftools { +namespace common { + +// static declarations +uint32_t TestParameters::s_defaultNumMsgs = 1024; +uint32_t TestParameters::s_defaultMsgSize = 1024; +uint16_t TestParameters::s_defaultNumQueues = 1; +uint16_t TestParameters::s_defaultEnqThreadsPerQueue = 1; +uint16_t TestParameters::s_defaultDeqThreadsPerQueue = 1; + +TestParameters::TestParameters(): + Parameters(), + m_numMsgs(s_defaultNumMsgs), + m_msgSize(s_defaultMsgSize), + m_numQueues(s_defaultNumQueues), + m_numEnqThreadsPerQueue(s_defaultEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(s_defaultDeqThreadsPerQueue)//, +{} + +TestParameters::TestParameters(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue) : + Parameters(), + m_numMsgs(numMsgs), + m_msgSize(msgSize), + m_numQueues(numQueues), + m_numEnqThreadsPerQueue(numEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(numDeqThreadsPerQueue) +{} + +TestParameters::TestParameters(const TestParameters& tp): + Parameters(), + m_numMsgs(tp.m_numMsgs), + m_msgSize(tp.m_msgSize), + m_numQueues(tp.m_numQueues), + m_numEnqThreadsPerQueue(tp.m_numEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(tp.m_numDeqThreadsPerQueue) +{} + +TestParameters::~TestParameters() +{} + +void +TestParameters::toStream(std::ostream& os) const +{ + os << "Test Parameters:" << std::endl; + os << " num_msgs = " << m_numMsgs << std::endl; + os << " msg_size = " << m_msgSize << std::endl; + os << " num_queues = " << m_numQueues << std::endl; + os << " num_enq_threads_per_queue = " << m_numEnqThreadsPerQueue << std::endl; + os << " num_deq_threads_per_queue = " << m_numDeqThreadsPerQueue << std::endl; +} + +bool +TestParameters::parseArg(const int arg, + const char* optarg) +{ + switch(arg) { + case 'm': + m_numMsgs = uint32_t(std::atol(optarg)); + break; + case 'S': + m_msgSize = uint32_t(std::atol(optarg)); + break; + case 'q': + m_numQueues = uint16_t(std::atoi(optarg)); + break; + case 'e': + m_numEnqThreadsPerQueue = uint16_t(std::atoi(optarg)); + break; + case 'd': + m_numDeqThreadsPerQueue = uint16_t(std::atoi(optarg)); + break; + default: + return false; + } + return true; +} + +// static +void +TestParameters::printArgs(std::ostream& os) +{ + os << "Test parameters:" << std::endl; + os << " -m --num_msgs: Number of messages to send per enqueue thread [" + << TestParameters::s_defaultNumMsgs << "]" << std::endl; + os << " -S --msg_size: Size of each message to be sent [" + << TestParameters::s_defaultMsgSize << "]" << std::endl; + os << " -q --num_queues: Number of simultaneous queues [" + << TestParameters::s_defaultNumQueues << "]" << std::endl; + os << " -e --num_enq_threads_per_queue: Number of enqueue threads per queue [" + << TestParameters::s_defaultEnqThreadsPerQueue << "]" << std::endl; + os << " -d --num_deq_threads_per_queue: Number of dequeue threads per queue [" + << TestParameters::s_defaultDeqThreadsPerQueue << "]" << std::endl; + os << std::endl; +} + +// static +std::string +TestParameters::shortArgs() +{ + return "m:S:q:e:d:"; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/TestParameters.h b/cpp/src/tests/storePerftools/common/TestParameters.h new file mode 100644 index 0000000000..c9ebc8cefa --- /dev/null +++ b/cpp/src/tests/storePerftools/common/TestParameters.h @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestParameters.h + */ + +#ifndef tests_storePerftools_common_TestParameters_h_ +#define tests_storePerftools_common_TestParameters_h_ + +#include "Parameters.h" + +#include // uint16_t, uint32_t + +namespace tests { +namespace storePerftools { +namespace common { + +class TestOptions; + +/** + * \brief Struct for aggregating the test parameters + * + * This struct is used to aggregate and keep together all the test parameters. These affect the test itself, the + * journal geometry is aggregated in class JrnlParameters. + */ +class TestParameters : public Parameters +{ +public: + static uint32_t s_defaultNumMsgs; ///< Default number of messages to be sent + static uint32_t s_defaultMsgSize; ///< Default message size in bytes + static uint16_t s_defaultNumQueues; ///< Default number of queues to test simultaneously + static uint16_t s_defaultEnqThreadsPerQueue; ///< Default number of enqueue threads per queue + static uint16_t s_defaultDeqThreadsPerQueue; ///< Default number of dequeue threads per queue + + uint32_t m_numMsgs; ///< Number of messages to be sent + uint32_t m_msgSize; ///< Message size in bytes + uint16_t m_numQueues; ///< Number of queues to test simultaneously + uint16_t m_numEnqThreadsPerQueue; ///< Number of enqueue threads per queue + uint16_t m_numDeqThreadsPerQueue; ///< Number of dequeue threads per queue + + /** + * \brief Defaault constructor + * + * Default constructor. Uses the default values for all parameters. + */ + TestParameters(); + + /** + * \brief Constructor + * + * Convenience constructor. + * + * \param numMsgs Number of messages to be sent + * \param msgSize Message size in bytes + * \param numQueues Number of queues to test simultaneously + * \param numEnqThreadsPerQueue Number of enqueue threads per queue + * \param numDeqThreadsPerQueue Number of dequeue threads per queue + */ + TestParameters(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue); + + /** + * \brief Copy constructor + * + * \param tp Reference to JrnlPerfTestParameters instance to be copied + */ + TestParameters(const TestParameters& tp); + + /** + * \brief Virtual destructor + */ + virtual ~TestParameters(); + + virtual bool parseArg(const int arg, + const char* optarg); + + static void printArgs(std::ostream& os); + + static std::string shortArgs(); + + /*** + * \brief Stream the test parameters to an output stream + * + * Convenience feature which streams a multi-line representation of all the test parameters, one per line to an + * output stream. + * + * \param os Output stream to which the class data is to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerftools_common_TestParameters_h_ diff --git a/cpp/src/tests/storePerftools/common/TestResult.cpp b/cpp/src/tests/storePerftools/common/TestResult.cpp new file mode 100644 index 0000000000..c3e9d27dfc --- /dev/null +++ b/cpp/src/tests/storePerftools/common/TestResult.cpp @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.cpp + */ + +#include "TestResult.h" + +namespace tests { +namespace storePerftools { +namespace common { + +TestResult::TestResult() : + ScopedTimable(), + Streamable() +{} + +TestResult::~TestResult() +{} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/TestResult.h b/cpp/src/tests/storePerftools/common/TestResult.h new file mode 100644 index 0000000000..e878164837 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/TestResult.h @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.h + */ + +#ifndef tests_storePerftools_common_TestResult_h_ +#define tests_storePerftools_common_TestResult_h_ + +#include "ScopedTimable.h" +#include "Streamable.h" + +namespace tests { +namespace storePerftools { +namespace common { + +class TestResult : public ScopedTimable, public Streamable +{ +public: + TestResult(); + virtual ~TestResult(); + void toStream(std::ostream& os = std::cout) const = 0; +}; + +}}} // namespace tests:storePerftools::common + +#endif // tests_storePerftools_common_TestResult_h_ diff --git a/cpp/src/tests/storePerftools/common/Thread.cpp b/cpp/src/tests/storePerftools/common/Thread.cpp new file mode 100644 index 0000000000..188e102e8f --- /dev/null +++ b/cpp/src/tests/storePerftools/common/Thread.cpp @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Thread.cpp + */ + +#include "Thread.h" + +#include "PerftoolError.h" + +namespace tests { +namespace storePerftools { +namespace common { + +Thread::Thread(startFn_t sf, + void* p) : + m_running(true) +{ + PTHREAD_CHK(::pthread_create(&m_thread, NULL, sf, p), "::pthread_create", "Thread", "Thread"); +} + +Thread::Thread(Thread::startFn_t sf, + void* p, + const std::string& id) : + m_id(id), + m_running(true) +{ + PTHREAD_CHK(::pthread_create(&m_thread, NULL, sf, p), "::pthread_create", "Thread", "Thread"); +} + +Thread::~Thread() +{ + if (m_running) { + PTHREAD_CHK(::pthread_detach(m_thread), "pthread_detach", "~Thread", "Thread"); + } +} + +const std::string& +Thread::getId() const +{ + return m_id; +} + +void Thread::join() +{ + PTHREAD_CHK(::pthread_join(m_thread, NULL), "pthread_join", "join", "Thread"); + m_running = false; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerftools/common/Thread.h b/cpp/src/tests/storePerftools/common/Thread.h new file mode 100644 index 0000000000..bab484dd66 --- /dev/null +++ b/cpp/src/tests/storePerftools/common/Thread.h @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Thread.h + */ + +#ifndef tests_storePerftools_common_Thread_h_ +#define tests_storePerftools_common_Thread_h_ + +#include +#include + +namespace tests { +namespace storePerftools { +namespace common { + +/** + * \brief Ultra-simple pthread class. + */ +class Thread { +public: + typedef void*(*startFn_t)(void*); ///< Thread entry point function pointer type + + /** + * \brief Constructor + * \param sf Pointer to thread entry function + * \param p Void pointer to parameter of start function + */ + Thread(startFn_t sf, + void* p); + + /** + * \brief Constructor + * \param sf Pointer to thread entry function + * \param p Void pointer to parameter of start function + * \param id Name of this thread instance + */ + Thread(startFn_t sf, + void* p, + const std::string& id); + + /** + * \brief Destructor + */ + virtual ~Thread(); + + /** + * \brief Get the name of this thread. + * \return Name as supplied to the constructor. + */ + const std::string& getId() const; + + /** + * \brief Wait for this thread instance to finish running startFn(). + */ + void join(); + +private: + ::pthread_t m_thread; ///< Internal posix thread + std::string m_id; ///< Identifier for this thread instance + bool m_running; ///< \b true is the thread is active and running, \b false when not yet started or joined. + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerftools_common_Thread_h_ diff --git a/cpp/src/tests/storePerftools/jrnlPerf/Journal.cpp b/cpp/src/tests/storePerftools/jrnlPerf/Journal.cpp new file mode 100644 index 0000000000..6efdc06fc8 --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnlPerf/Journal.cpp @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Journal.cpp + */ + +#include "Journal.h" + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/DataToken.h" + +# define X_JRNL_FN_DEQUEUE(dtok) dequeue(dtok, 0, 0) +# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue(dtok, msgData, msgSize, 0, 0, false) +# define X_JRNL_FN_FLUSH(jrnlPtr) { jrnlPtr->flush(); jrnlPtr->sync(); } +# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok +# define X_JRNL_FN_GETEVENTS(timeout) processCompletedAioWriteEvents(timeout) +# define X_JRNL_FN_GETIOSTR(iores) qpid::asyncStore::jrnl2::g_ioResAsString(iores) +# define X_JRNL_IO_OP_RES qpid::asyncStore::jrnl2::jrnlOpRes +# define X_JRNL_IO_OP_RES_BUSY qpid::asyncStore::jrnl2::RHM_IORES_BUSY +# define X_JRNL_IO_OP_RES_ENQCAPTHRESH qpid::asyncStore::jrnl2::RHM_IORES_ENQCAPTHRESH +# define X_JRNL_IO_OP_RES_SUCCESS 0 +# define X_SCOPED_LOCK qpid::asyncStore::jrnl2::ScopedLock +#else +# include "jrnl/jcntl.hpp" +# include "jrnl/data_tok.hpp" + +# define X_JRNL_FN_DEQUEUE(dtok) dequeue_data_record(dtok) +# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue_data_record(msgData, msgSize, msgSize, dtok); +# define X_JRNL_FN_FLUSH(jrnlPtr) jrnlPtr->flush(true) +# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok->status_str() +# define X_JRNL_FN_GETEVENTS(timeout) get_wr_events(timeout) +# define X_JRNL_FN_GETIOSTR(iores) mrg::journal::iores_str(iores) +# define X_JRNL_IO_OP_RES mrg::journal::iores +# define X_JRNL_IO_OP_RES_BUSY mrg::journal::RHM_IORES_BUSY +# define X_JRNL_IO_OP_RES_ENQCAPTHRESH mrg::journal::RHM_IORES_ENQCAPTHRESH +# define X_JRNL_IO_OP_RES_SUCCESS mrg::journal::RHM_IORES_SUCCESS +# define X_SCOPED_LOCK mrg::journal::slock +#endif + +#include + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +Journal::Journal(const uint32_t numMsgs, + const uint32_t msgSize, + const char* msgData, + X_ASYNC_JOURNAL* const jrnlPtr) : + m_numMsgs(numMsgs), + m_msgSize(msgSize), + m_msgData(msgData), + m_jrnlPtr(jrnlPtr) +{} + +Journal::~Journal() +{ + delete m_jrnlPtr; +} + + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +// Enqueue thread entry point +void* +Journal::runEnqueues() +{ + bool misfireFlag = false; + uint32_t i = 0; + while (i < m_numMsgs) { + X_DATA_TOKEN* mtokPtr = new X_DATA_TOKEN(); + X_JRNL_IO_OP_RES jrnlIoRes = m_jrnlPtr->X_JRNL_FN_ENQUEUE(mtokPtr, m_msgData, m_msgSize); + switch (jrnlIoRes) { + case X_JRNL_IO_OP_RES_SUCCESS: + i++; + misfireFlag = false; + break; + case X_JRNL_IO_OP_RES_BUSY: + if (!misfireFlag) { + std::cout << "-" << std::flush; + } + delete mtokPtr; + misfireFlag = true; + break; + case X_JRNL_IO_OP_RES_ENQCAPTHRESH: + if (!misfireFlag) { + std::cout << "*" << std::flush; + } + delete mtokPtr; + misfireFlag = true; + ::usleep(10); + break; + default: + delete mtokPtr; + std::cerr << "enqueue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << std::endl; + } + } + /// \todo handle these results + X_JRNL_FN_FLUSH(m_jrnlPtr); + return NULL; +} + + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +// Dequeue thread entry point +void* +Journal::runDequeues() +{ + uint32_t i = 0; + X_JRNL_IO_OP_RES jrnlIoRes; + while (i < m_numMsgs) { + X_DATA_TOKEN* mtokPtr = 0; + while (!mtokPtr) { + bool procAioEventsFlag; + { // --- START OF CRITICAL SECTION --- + X_SCOPED_LOCK l(m_unprocCallbacksMutex); + procAioEventsFlag = m_unprocCallbacks.empty(); + if (!procAioEventsFlag) { + mtokPtr = m_unprocCallbacks.back(); + m_unprocCallbacks.pop_back(); + } + } // --- END OF CRITICAL SECTION --- + if (procAioEventsFlag) { + m_jrnlPtr->X_JRNL_FN_GETEVENTS(0); + ::usleep(1); + } + } + bool done = false; + while (!done) { + jrnlIoRes = m_jrnlPtr->X_JRNL_FN_DEQUEUE(mtokPtr); + switch (jrnlIoRes) { + case X_JRNL_IO_OP_RES_SUCCESS: + i ++; + done = true; + break; + case X_JRNL_IO_OP_RES_BUSY: + //::usleep(10); + break; + default: + std::cerr << "dequeue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << ": " + << X_JRNL_FN_GETDTOKSTATUS(mtokPtr) << std::endl; + delete mtokPtr; + done = true; + } + } + m_jrnlPtr->X_JRNL_FN_GETEVENTS(0); + } + /// \todo handle these results + X_JRNL_FN_FLUSH(m_jrnlPtr); + return NULL; +} + +//static +void* +Journal::startEnqueues(void* ptr) +{ + return reinterpret_cast(ptr)->runEnqueues(); +} + +//static +void* +Journal:: startDequeues(void* ptr) +{ + return reinterpret_cast(ptr)->runDequeues(); +} + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +void +Journal::X_AIO_WR_CALLBACK(std::vector& msgTokenList) +{ + X_DATA_TOKEN* mtokPtr; + while (msgTokenList.size()) { + mtokPtr = msgTokenList.back(); + msgTokenList.pop_back(); +#ifdef JOURNAL2 + switch (mtokPtr->getDataOpState().get()) { + case qpid::asyncStore::jrnl2::OP_ENQUEUE: +#else + switch (mtokPtr->wstate()) { + case X_DATA_TOKEN::ENQ: +#endif + { // --- START OF CRITICAL SECTION --- + X_SCOPED_LOCK l(m_unprocCallbacksMutex); + m_unprocCallbacks.push_back(mtokPtr); + } // --- END OF CRITICAL SECTION --- + break; + default: + delete mtokPtr; + } + } +} + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +void +Journal::X_AIO_RD_CALLBACK(std::vector& /*buffPageCtrlBlkIndexList*/) +{} + +}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerftools/jrnlPerf/Journal.h b/cpp/src/tests/storePerftools/jrnlPerf/Journal.h new file mode 100644 index 0000000000..acf1e43486 --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnlPerf/Journal.h @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Journal.h + */ + +#ifndef tests_storePerftools_jrnlPerf_Journal_h_ +#define tests_storePerftools_jrnlPerf_Journal_h_ + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/AioCallback.h" +# include "qpid/asyncStore/jrnl2/AsyncJournal.h" +# include "qpid/asyncStore/jrnl2/ScopedLock.h" +#else +# include "jrnl/aio_callback.hpp" +# include "jrnl/smutex.hpp" +#endif + +#include // uint16_t, uint32_t + +#ifdef JOURNAL2 +# define X_AIO_CALLBACK qpid::asyncStore::jrnl2::AioCallback +# define X_AIO_RD_CALLBACK readAioCompleteCallback +# define X_AIO_WR_CALLBACK writeAioCompleteCallback +# define X_ASYNC_JOURNAL qpid::asyncStore::jrnl2::AsyncJournal +# define X_DATA_TOKEN qpid::asyncStore::jrnl2::DataToken +# define X_SCOPED_MUTEX qpid::asyncStore::jrnl2::ScopedMutex +#else +# define X_AIO_CALLBACK mrg::journal::aio_callback +# define X_AIO_RD_CALLBACK rd_aio_cb +# define X_AIO_WR_CALLBACK wr_aio_cb +# define X_ASYNC_JOURNAL mrg::journal::jcntl +# define X_DATA_TOKEN mrg::journal::data_tok +# define X_SCOPED_MUTEX mrg::journal::smutex +#endif + +#ifndef JOURNAL2 +namespace mrg { +namespace journal { +class jcntl; +}} // namespace mrg::journal +namespace qpid { +namespace asyncStore { +namespace jrnl2 { +class AsyncJournal; +}}} // namespace qpid::asyncStore::jrnl2 +#endif + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +/** + * \brief Test journal instance. Each queue to be tested results in one instance of this class. + * + * Journal test harness which contains the journal to be tested. Each queue to be tested in the test parameters + * results in one instance of this class being instantiated, and consequently one set of journals on disk. The + * journal instance is provided as a pointer to the constructor. + */ +class Journal : public X_AIO_CALLBACK +{ +public: + /** + * \brief Constructor + * + * \param numMsgs Number of messages per thread to be enqueued then dequeued (ie both ways through broker) + * \param msgSize Size of each message being enqueued + * \param msgData Pointer to message content (all messages have identical content) + * \param jrnlPtr Pinter to journal instance which is to be tested + */ + Journal(const uint32_t numMsgs, + const uint32_t msgSize, + const char* msgData, + X_ASYNC_JOURNAL* const jrnlPtr); + + /** + * \brief virtual destructor + */ + virtual ~Journal(); + + /** + * \brief Worker thread enqueue task + * + * This function is the worker thread enqueue task entry point. It enqueues _numMsgs onto the journal instance. + * A data tokens is created for each record, this is the start of the data token life cycle. All possible + * returns from the journal are handled appropriately. Since the enqueue threads also perform + * callbacks on completed AIO operations, the data tokens from completed enqueues are placed onto the + * unprocessed callback list (_unprocCallbackList) for dequeueing by the dequeue worker thread(s). + * + * This function must be thread safe. + */ + void* runEnqueues(); + + /** + * \brief Worker thread dequeue task + * + * This function is the worker thread dequeue task entry point. It dequeues messages which are on the + * unprocessed callback list (_unprocCallbackList). + * + * This function must be thread safe. + */ + void* runDequeues(); + + /** + * \brief Helper function to launch the run() function when starting a thread. + */ + static void* startEnqueues(void* ptr); + + /** + * \brief Helper function to launch the run() function when starting a thread. + */ + static void* startDequeues(void* ptr); + + /** + * \brief Write callback function. When AIO operations return, this function is called. + * + * When AIO operations return, this function will sort the enqueue ops from the rest and place the data tokens + * of these records onto the unprocessed callback list (_unprocCallbackList) for dequeueing by another thread. + * + * Returning dequeue ops have their data tokens destroyed, as this is the end of the life cycle of the data + * tokens. + * + * Required by all subclasses of mrg::journal::aio_callback. + * + * \param dataTokenList A vector of data tokens for those messages which have completed their AIO write + * operations + */ + void X_AIO_WR_CALLBACK(std::vector& dataTokenList); + + /** + * \brief Read callback function. When read AIO operations return, this function is called. + * + * Not used in this test, but required by all subclasses of mrg::journal::aio_callback. + * + * \param buffPageCtrlBlkIndexList A vector of indices to the buffer page control blocks for completed reads + */ + void X_AIO_RD_CALLBACK(std::vector& buffPageCtrlBlkIndexList); + +protected: + const uint32_t m_numMsgs; ///< Number of messages to be processed by this journal instance + const uint32_t m_msgSize; ///< Size of each message (in bytes) + const char* m_msgData; ///< Pointer to message content to be used for each message. + X_ASYNC_JOURNAL* const m_jrnlPtr; ///< Journal instance pointer + std::vector m_unprocCallbacks; ///< List of unprocessed callbacks to be dequeued + X_SCOPED_MUTEX m_unprocCallbacksMutex; ///< Mutex which protects the unprocessed callback queue + + +}; + +}}} // namespace tests::storePerftools::jrnlPerf + +#endif // tests_storePerftools_jrnlPerf_Journal_h_ diff --git a/cpp/src/tests/storePerftools/jrnlPerf/JournalParameters.cpp b/cpp/src/tests/storePerftools/jrnlPerf/JournalParameters.cpp new file mode 100644 index 0000000000..2b07619041 --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnlPerf/JournalParameters.cpp @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalParameters.cpp + */ + +#include "JournalParameters.h" + +#include // std::atof, std::atoi, std::atol + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +#ifndef JOURNAL2 +// static declarations - for jrnl2, these are inherited +std::string JournalParameters::s_defaultJrnlDir = "/tmp/store"; +std::string JournalParameters::s_defaultJrnlBaseFileName = "JournalData"; +uint16_t JournalParameters::s_defaultNumJrnlFiles = 8; +uint32_t JournalParameters::s_defaultJrnlFileSize_sblks = 3072; +uint16_t JournalParameters::s_defaultWriteBuffNumPgs = 32; +uint32_t JournalParameters::s_defaultWriteBuffPgSize_sblks = 128; +#endif + +JournalParameters::JournalParameters() : +#ifdef JOURNAL2 + qpid::asyncStore::jrnl2::JournalParameters() +#else + Parameters(), + m_jrnlDir(s_defaultJrnlDir), + m_jrnlBaseFileName(s_defaultJrnlBaseFileName), + m_numJrnlFiles(s_defaultNumJrnlFiles), + m_jrnlFileSize_sblks(s_defaultJrnlFileSize_sblks), + m_writeBuffNumPgs(s_defaultWriteBuffNumPgs), + m_writeBuffPgSize_sblks(s_defaultWriteBuffPgSize_sblks) +#endif +{} + +JournalParameters::JournalParameters(const std::string& jrnlDir, + const std::string& jrnlBaseFileName, + const uint16_t numJrnlFiles, + const uint32_t jrnlFileSize_sblks, + const uint16_t writeBuffNumPgs, + const uint32_t writeBuffPgSize_sblks) : +#ifdef JOURNAL2 + qpid::asyncStore::jrnl2::JournalParameters(jrnlDir, jrnlBaseFileName, numJrnlFiles, jrnlFileSize_sblks, writeBuffNumPgs, + writeBuffPgSize_sblks) +#else + Parameters(), + m_jrnlDir(jrnlDir), + m_jrnlBaseFileName(jrnlBaseFileName), + m_numJrnlFiles(numJrnlFiles), + m_jrnlFileSize_sblks(jrnlFileSize_sblks), + m_writeBuffNumPgs(writeBuffNumPgs), + m_writeBuffPgSize_sblks(writeBuffPgSize_sblks) +#endif +{} + +#ifdef JOURNAL2 +JournalParameters::JournalParameters(const qpid::asyncStore::jrnl2::JournalParameters& jp) : + qpid::asyncStore::jrnl2::JournalParameters(jp) +{} +#endif + +JournalParameters::JournalParameters(const JournalParameters& jp) : +#ifdef JOURNAL2 + qpid::asyncStore::jrnl2::JournalParameters(jp) +#else + Parameters(), + m_jrnlDir(jp.m_jrnlDir), + m_jrnlBaseFileName(jp.m_jrnlBaseFileName), + m_numJrnlFiles(jp.m_numJrnlFiles), + m_jrnlFileSize_sblks(jp.m_jrnlFileSize_sblks), + m_writeBuffNumPgs(jp.m_writeBuffNumPgs), + m_writeBuffPgSize_sblks(jp.m_writeBuffPgSize_sblks) +#endif +{} + +JournalParameters::~JournalParameters() +{} + +void +JournalParameters::toStream(std::ostream& os) const +{ + os << "Journal Parameters:" << std::endl; + os << " jrnlDir = \"" << m_jrnlDir << "\"" << std::endl; + os << " jrnlBaseFileName = \"" << m_jrnlBaseFileName << "\"" << std::endl; + os << " numJrnlFiles = " << m_numJrnlFiles << std::endl; + os << " jrnlFileSize_sblks = " << m_jrnlFileSize_sblks << std::endl; + os << " writeBuffNumPgs = " << m_writeBuffNumPgs << std::endl; + os << " writeBuffPgSize_sblks = " << m_writeBuffPgSize_sblks << std::endl; +} + +bool +JournalParameters::parseArg(const int arg, + const char* optarg) +{ + switch(arg) { + case 'j': + m_jrnlDir.assign(optarg); + break; + case 'b': + m_jrnlBaseFileName.assign(optarg); + break; + case 'f': + m_numJrnlFiles = uint16_t(std::atoi(optarg)); + break; + case 's': + m_jrnlFileSize_sblks = uint32_t(std::atol(optarg)); + break; + case 'p': + m_writeBuffNumPgs = uint16_t(std::atoi(optarg)); + break; + case 'c': + m_writeBuffPgSize_sblks = uint32_t(std::atol(optarg)); + break; + default: + return false; + } + return true; +} + +// static +void +JournalParameters::printArgs(std::ostream& os) +{ + os << "Journal parameters:" << std::endl; + os << " -j --jrnl_dir: Store directory [\"" + << JournalParameters::s_defaultJrnlDir << "\"]" << std::endl; + os << " -b --jrnl_base_filename: Base name for journal files [\"" + << JournalParameters::s_defaultJrnlBaseFileName << "\"]" << std::endl; + os << " -f --num_jfiles: Number of journal files [" + << JournalParameters::s_defaultNumJrnlFiles << "]" << std::endl; + os << " -s --jfsize_sblks: Size of each journal file in sblks (512 byte blocks) [" + << JournalParameters::s_defaultJrnlFileSize_sblks << "]" << std::endl; + os << " -p --wcache_num_pages: Number of write buffer pages [" + << JournalParameters::s_defaultWriteBuffNumPgs << "]" << std::endl; + os << " -c --wcache_pgsize_sblks: Size of each write buffer page in sblks (512 byte blocks) [" + << JournalParameters::s_defaultWriteBuffPgSize_sblks << "]" << std::endl; +} + +// static +std::string +JournalParameters::shortArgs() +{ + return "j:b:f:s:p:c:"; +} + +}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerftools/jrnlPerf/JournalParameters.h b/cpp/src/tests/storePerftools/jrnlPerf/JournalParameters.h new file mode 100644 index 0000000000..5367be9a4d --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnlPerf/JournalParameters.h @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalParameters.h + */ + +#ifndef tests_storePerftools_jrnlPerf_JournalParameters_h_ +#define tests_storePerftools_jrnlPerf_JournalParameters_h_ + +#include "tests/storePerftools/common/Parameters.h" + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/JournalParameters.h" +#endif + +#include // uint16_6, uint32_t + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +/** + * \brief Stuct for aggregating the common journal parameters + * + * This struct is used to aggregate and keep together all the common journal parameters. These affect the journal + * geometry and buffers. The test parameters are aggregated in class JrnlPerfTestParameters. + */ +class JournalParameters : +#ifdef JOURNAL2 + public qpid::asyncStore::jrnl2::JournalParameters, +#endif + public tests::storePerftools::common::Parameters +{ +public: +#ifndef JOURNAL2 + // static default store params + static std::string s_defaultJrnlDir; ///< Default journal directory + static std::string s_defaultJrnlBaseFileName; ///< Default journal base file name + static uint16_t s_defaultNumJrnlFiles; ///< Default number of journal data files + static uint32_t s_defaultJrnlFileSize_sblks; ///< Default journal data file size in softblocks + static uint16_t s_defaultWriteBuffNumPgs; ///< Default number of write buffer pages + static uint32_t s_defaultWriteBuffPgSize_sblks; ///< Default size of each write buffer page in softblocks + + std::string m_jrnlDir; ///< Journal directory + std::string m_jrnlBaseFileName; ///< Journal base file name + uint16_t m_numJrnlFiles; ///< Number of journal data files + uint32_t m_jrnlFileSize_sblks; ///< Journal data file size in softblocks + uint16_t m_writeBuffNumPgs; ///< Number of write buffer pages + uint32_t m_writeBuffPgSize_sblks; ///< Size of each write buffer page in softblocks +#endif + + /** + * \brief Default constructor + * + * Default constructor. Uses the default values for all parameters. + */ + JournalParameters(); + + /** + * \brief Constructor + * + * Convenience constructor. + * + * \param jrnlDir Journal directory + * \param jrnlBaseFileName Journal base file name + * \param numJrnlFiles Number of journal data files + * \param jrnlFileSize_sblks Journal data file size in softblocks + * \param writeBuffNumPgs Number of write buffer pages + * \param writeBuffPgSize_sblks Size of each write buffer page in softblocks + */ + JournalParameters(const std::string& jrnlDir, + const std::string& jrnlBaseFileName, + const uint16_t numJrnlFiles, + const uint32_t jrnlFileSize_sblks, + const uint16_t writeBuffNumPgs, + const uint32_t writeBuffPgSize_sblks); + + /** + * \brief Copy constructor + * + * \param jp Reference to JrnlParameters instance to be copied + */ +#ifdef JOURNAL2 + JournalParameters(const qpid::asyncStore::jrnl2::JournalParameters& jp); +#endif + JournalParameters(const JournalParameters& jp); + + /** + * \brief Virtual destructor + */ + virtual ~JournalParameters(); + + virtual bool parseArg(const int arg, + const char* optarg); + + static void printArgs(std::ostream& os); + + static std::string shortArgs(); + + /*** + * \brief Stream the journal parameters to an output stream + * + * Convenience feature which streams a multi-line representation of all the journal parameters, one per line to + * an output stream. + * + * \param os Output stream to which the class data is to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +}; + +}}} // namespace tests::storePerftools::jrnlPerf + +#endif // tests_storePerftools_jrnlPerf_JournalParameters_h_ diff --git a/cpp/src/tests/storePerftools/jrnlPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/jrnlPerf/PerfTest.cpp new file mode 100644 index 0000000000..9e322686ca --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnlPerf/PerfTest.cpp @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.cpp + */ + +#include "PerfTest.h" + +#include "Journal.h" +#include "JournalParameters.h" + +#include "tests/storePerftools/version.h" +#include "tests/storePerftools/common/ScopedTimer.h" +#include "tests/storePerftools/common/TestParameters.h" +#include "tests/storePerftools/common/Thread.h" + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/AsyncJournal.h" +# include "qpid/asyncStore/jrnl2/JournalDirectory.h" +#else +# include "jrnl/jcntl.hpp" +# include "jrnl/jdir.hpp" +#endif + +#include +#include // getopt_long(), required_argument, no_argument +#include // std::setw() std::setfill() +#include // std::ostringstream +#include // uint16_t, uint32_t + +#ifdef ECLIPSE_CDT_ANNOYANCE // This prevents problems with Eclipse CODAN, which can't see this in getopt.h + struct option; + extern int getopt_long (int, char *const *, const char *, const struct option *, int *) __THROW; +# define no_argument 0 +# define required_argument 1 +# define optional_argument 2 +#endif + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +PerfTest::PerfTest(const tests::storePerftools::common::TestParameters& tp, + const JournalParameters& jp) : + Streamable(), + m_testParams(tp), + m_jrnlParams(jp), + m_testResult(tp), + m_msgData(new char[tp.m_msgSize]) +{} + +PerfTest::~PerfTest() +{ + delete[] m_msgData; +} + +void +PerfTest::prepareJournals(std::vector& jrnlList) +{ +#ifdef JOURNAL2 + if (qpid::asyncStore::jrnl2::JournalDirectory::s_exists(m_jrnlParams.m_jrnlDir)) { + qpid::asyncStore::jrnl2::JournalDirectory::s_destroy(m_jrnlParams.m_jrnlDir); + } + qpid::asyncStore::jrnl2::JournalDirectory::s_create(m_jrnlParams.m_jrnlDir); + qpid::asyncStore::jrnl2::AsyncJournal* jp; +#else + if (mrg::journal::jdir::exists(m_jrnlParams.m_jrnlDir)) { + mrg::journal::jdir::delete_dir(m_jrnlParams.m_jrnlDir); + } + mrg::journal::jdir::create_dir(m_jrnlParams.m_jrnlDir); + mrg::journal::jcntl* jp; +#endif + Journal* ptp; + for (uint16_t j = 0; j < m_testParams.m_numQueues; j++) { + std::ostringstream jname; + jname << "jrnl_" << std::setw(4) << std::setfill('0') << j; + std::ostringstream jdir; + jdir << m_jrnlParams.m_jrnlDir << "/" << jname.str(); +#ifdef JOURNAL2 + jp = new qpid::asyncStore::jrnl2::AsyncJournal(jname.str(), jdir.str(), m_jrnlParams.m_jrnlBaseFileName); +#else + jp = new mrg::journal::jcntl(jname.str(), jdir.str(), m_jrnlParams.m_jrnlBaseFileName); +#endif + ptp = new Journal(m_testParams.m_numMsgs, m_testParams.m_msgSize, m_msgData, jp); +#ifdef JOURNAL2 + jp->initialize(&m_jrnlParams, ptp); +#else + jp->initialize(m_jrnlParams.m_numJrnlFiles, false, m_jrnlParams.m_numJrnlFiles, + m_jrnlParams.m_jrnlFileSize_sblks, m_jrnlParams.m_writeBuffNumPgs, + m_jrnlParams.m_writeBuffPgSize_sblks, ptp); +#endif + + jrnlList.push_back(ptp); + } +} + +void +PerfTest::destroyJournals(std::vector& jrnlList) +{ + while (jrnlList.size()) { + delete jrnlList.back(); + jrnlList.pop_back(); + } +} + +void +PerfTest::run() +{ + std::vector jrnlList; + prepareJournals(jrnlList); + + std::deque threads; + tests::storePerftools::common::Thread* tp; + { // --- Start of timed section --- + tests::storePerftools::common::ScopedTimer st(m_testResult); + + for (uint16_t q = 0; q < m_testParams.m_numQueues; q++) { + for (uint16_t t = 0; t < m_testParams.m_numEnqThreadsPerQueue; t++) { + tp = new tests::storePerftools::common::Thread(jrnlList[q]->startEnqueues, reinterpret_cast(jrnlList[q])); + threads.push_back(tp); + } + for (uint16_t dt = 0; dt < m_testParams.m_numDeqThreadsPerQueue; ++dt) { + tp = new tests::storePerftools::common::Thread(jrnlList[q]->startDequeues, reinterpret_cast(jrnlList[q])); + threads.push_back(tp); + } + } + + while (threads.size()) { + threads.front()->join(); + delete threads.front(); + threads.pop_front(); + } + } // --- End of timed section --- + destroyJournals(jrnlList); +} + +void +PerfTest::toStream(std::ostream& os) const +{ + os << m_testParams << std::endl; + os << m_jrnlParams << std::endl; + os << m_testResult << std::endl; +} + +void +printArgs(std::ostream& os) +{ + os << " -h --help: This help message" << std::endl; + os << std::endl; + + tests::storePerftools::common::TestParameters::printArgs(os); + os << std::endl; + + JournalParameters::printArgs(os); + os << std::endl; +} + +bool +readArgs(int argc, + char** argv, + tests::storePerftools::common::TestParameters& tp, + JournalParameters& jp) +{ + /// \todo TODO: At some point, find an easy way to aggregate these from JrnlPerfTestParameters and JrnlParameters themselves. + static struct option long_options[] = { + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'v'}, + + // Test params + {"num_msgs", required_argument, 0, 'm'}, + {"msg_size", required_argument, 0, 'S'}, + {"num_queues", required_argument, 0, 'q'}, + {"num_enq_threads_per_queue", required_argument, 0, 'e'}, + {"num_deq_threads_per_queue", required_argument, 0, 'd'}, + + // Journal params + {"jrnl_dir", required_argument, 0, 'j'}, + {"jrnl_base_filename", required_argument, 0, 'b'}, + {"num_jfiles", required_argument, 0, 'f'}, + {"jfsize_sblks", required_argument, 0, 's'}, + {"wcache_num_pages", required_argument, 0, 'p'}, + {"wcache_pgsize_sblks", required_argument, 0, 'c'}, + + {0, 0, 0, 0} + }; + + bool err = false; + bool ver = false; + int c = 0; + while (true) { + int option_index = 0; + std::ostringstream oss; + oss << "hv" << tests::storePerftools::common::TestParameters::shortArgs() << JournalParameters::shortArgs(); + c = getopt_long(argc, argv, oss.str().c_str(), long_options, &option_index); + if (c == -1) break; + if (c == 'v') { + std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl; + ver = true; + break; + } + err = !(tp.parseArg(c, optarg) || jp.parseArg(c, optarg)); + } + if (err) { + std::cout << std::endl; + printArgs(); + } + return err || ver; +} + +}}} // namespace tests::storePerftools::jrnlPerf + +// ----------------------------------------------------------------- + +int +main(int argc, char** argv) +{ + tests::storePerftools::common::TestParameters tp; + tests::storePerftools::jrnlPerf::JournalParameters jp; + if (tests::storePerftools::jrnlPerf::readArgs(argc, argv, tp, jp)) return 1; + tests::storePerftools::jrnlPerf::PerfTest jpt(tp, jp); + jpt.run(); + std::cout << jpt << std::endl; + return 0; +} diff --git a/cpp/src/tests/storePerftools/jrnlPerf/PerfTest.h b/cpp/src/tests/storePerftools/jrnlPerf/PerfTest.h new file mode 100644 index 0000000000..d713cb837f --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnlPerf/PerfTest.h @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.h + */ + +#ifndef tests_storePerftools_jrnlPerf_PerfTest_h_ +#define tests_storePerftools_jrnlPerf_PerfTest_h_ + +#include "TestResult.h" +#include "tests/storePerftools/common/Streamable.h" + +#include + +namespace tests { +namespace storePerftools { +namespace common { +class TestParameters; +} +namespace jrnlPerf { + +class Journal; +class JournalParameters; + +/** + * \brief Main test class; Create an instance and execute run() + * + * Main test class which aggregates the components of a test. + */ +class PerfTest : public tests::storePerftools::common::Streamable +{ +public: + /** + * \brief Constructor + * + * \param tp Test parameters for the test + * \param jp Journal parameters for all queues (journals) in the test + */ + PerfTest(const tests::storePerftools::common::TestParameters& tp, + const JournalParameters& jp); + + /** + * \brief Virtual destructor + */ + virtual ~PerfTest(); + + /** + * \brief Runs the test and prints out the results. + * + * Runs the test as set by the test parameters and journal parameters. + */ + void run(); + + /** + * \brief Stream the test setup and results to an output stream + * + * Convenience feature which streams the test setup and results to an output stream. + * + * \param os Output stream to which the test setup and results are to be streamed. + */ + void toStream(std::ostream& os = std::cout) const; + +protected: + const tests::storePerftools::common::TestParameters& m_testParams; ///< Ref to a struct containing test params + const JournalParameters& m_jrnlParams; ///< Ref to a struct containing the journal parameters + TestResult m_testResult; ///< Journal performance object + const char* m_msgData; ///< Pointer to msg data, which is the same for all messages + + /** + * \brief Creates journals and JrnlInstance classes for all journals (queues) to be tested + * + * Creates a new journal instance and JrnlInstance instance for each queue. The journals are initialized + * which creates a new set of journal files on the local storage media (which is determined by path in + * JrnlParameters._jrnlDir). This activity is not timed, and is not a part of the performance test per se. + * + * \param jrnlList List which will be filled with pointers to the newly prepared journals + */ + void prepareJournals(std::vector& jrnlList); + + /** + * \brief Destroy the journal instances in list jrnlList + * + * \param jrnlList List of pointers to journals to be destroyed + */ + void destroyJournals(std::vector& jrnlList); + +}; + +/** + * \brief Print out the program arguments + * + * Print out the arguments to the performance program if requested by help or a parameter error. + * + * \param os Stream to which the arguments should be streamed. + */ +void printArgs(std::ostream& os = std::cout); + +/** + * \brief Process the command-line arguments + * + * Process the command-line arguments and populate the JrnlPerfTestParameters and JrnlParameters structs. Only the + * arguments supplied are on the command-line are changed in these structs, the others remain unchanged. It is + * important therefore to make sure that defaults are pre-loaded (the default behavior of the default constructors + * for these structs). + * + * \param argc Number of command-line arguments. Process directly from main(). + * \param argv Pointer to array of command-line argument pointers. Process directly from main(). + * \param tp Reference to test parameter object. Only params on the command-line are changed. + * \param jp Reference to journal parameter object. Only params on the command-line are changed. + */ +bool readArgs(int argc, + char** argv, + tests::storePerftools::common::TestParameters& tp, + JournalParameters& jp); + +}}} // namespace tests::storePerftools::jrnlPerf + +#endif // tests_storePerftools_jrnlPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerftools/jrnlPerf/TestResult.cpp b/cpp/src/tests/storePerftools/jrnlPerf/TestResult.cpp new file mode 100644 index 0000000000..9fe214726d --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnlPerf/TestResult.cpp @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.cpp + */ + +#include "TestResult.h" + +#include // uint32_t + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +TestResult::TestResult(const tests::storePerftools::common::TestParameters& tp) : + tests::storePerftools::common::TestResult(), + m_testParams(tp) +{} + +TestResult::~TestResult() +{} + +void +TestResult::toStream(std::ostream& os) const +{ + double msgsRate; + os << "TEST RESULTS:" << std::endl; + os << " Msgs per thread: " << m_testParams.m_numMsgs << std::endl; + os << " Msg size: " << m_testParams.m_msgSize << std::endl; + os << " No. queues: " << m_testParams.m_numQueues << std::endl; + os << " No. enq threads/queue: " << m_testParams.m_numEnqThreadsPerQueue << std::endl; + os << " No. deq threads/queue: " << m_testParams.m_numDeqThreadsPerQueue << std::endl; + os << " Time taken: " << m_elapsed << " sec" << std::endl; + uint32_t msgsPerQueue = m_testParams.m_numMsgs * m_testParams.m_numEnqThreadsPerQueue; + if (m_testParams.m_numQueues > 1) { + msgsRate = double(msgsPerQueue) / m_elapsed; + os << " No. msgs per queue: " << msgsPerQueue << std::endl; + os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testParams.m_msgSize / 1e6) << " MB/sec" << std::endl; + } + uint32_t totalMsgs = msgsPerQueue * m_testParams.m_numQueues; + msgsRate = double(totalMsgs) / m_elapsed; + os << " Total no. msgs: " << totalMsgs << std::endl; + os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testParams.m_msgSize / 1e6) << " MB/sec" << std::endl; +} + +}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerftools/jrnlPerf/TestResult.h b/cpp/src/tests/storePerftools/jrnlPerf/TestResult.h new file mode 100644 index 0000000000..620a7dcedd --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnlPerf/TestResult.h @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.h + */ + +#ifndef tests_storePerftools_jrnlPerf_TestResult_h_ +#define tests_storePerftools_jrnlPerf_TestResult_h_ + +#include "tests/storePerftools/common/TestParameters.h" +#include "tests/storePerftools/common/TestResult.h" + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +class TestOptions; + +/** + * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal. + * + * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the + * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the + * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput. + * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor. + * + * Results are available through the use of toStream(), toString() or the << operators. + * + * Output is in the following format: + *
+ * TEST RESULTS:
+ *     Msgs per thread: 10000
+ *            Msg size: 2048
+ *          No. queues: 2
+ *   No. threads/queue: 2
+ *          Time taken: 1.6626 sec
+ *      Total no. msgs: 40000
+ *      Msg throughput: 24.0587 kMsgs/sec
+ *                      49.2723 MB/sec
+ * 
+ */ +class TestResult : public tests::storePerftools::common::TestResult +{ +public: + /** + * \brief Constructor + * + * Constructor. Will start the time interval measurement. + * + * \param tp Test parameter details used to calculate the performance results. + */ + TestResult(const tests::storePerftools::common::TestParameters& tp); + + /** + * \brief Virtual destructor + */ + virtual ~TestResult(); + + /** + * \brief Stream the performance test results to an output stream + * + * Convenience feature which streams a multi-line performance result an output stream. + * + * \param os Output stream to which the results are to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +protected: + tests::storePerftools::common::TestParameters m_testParams; ///< Test parameters used for performance calculations + +}; + +}}} // namespace tests::storePerftools::jrnlPerf + +#endif // tests_storePerftools_jrnlPerf_TestResult_h_ diff --git a/cpp/src/tests/storePerftools/version.h b/cpp/src/tests/storePerftools/version.h new file mode 100644 index 0000000000..311b145330 --- /dev/null +++ b/cpp/src/tests/storePerftools/version.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file version.h + */ + +#ifndef tests_storePerftools_version_h_ +#define tests_storePerftools_version_h_ + +#include +#include + +namespace tests { +namespace storePerftools { + +static const int versionMajor = 0; +static const int versionMinor = 0; +static const int versionRevision = 1; + +std::string name() { + return "Qpid async store perftools"; +} + +std::string version() { + std::ostringstream oss; + oss << versionMajor << "." << versionMinor << "." << versionRevision; + return oss.str(); +} + +}} // namespace tests::perftools + +#endif // tests_storePerftools_version_h_ -- cgit v1.2.1