diff options
Diffstat (limited to 'cpp/src/tests/legacystore')
22 files changed, 3282 insertions, 0 deletions
diff --git a/cpp/src/tests/legacystore/.valgrind.supp b/cpp/src/tests/legacystore/.valgrind.supp new file mode 100644 index 0000000000..5c1c5377bf --- /dev/null +++ b/cpp/src/tests/legacystore/.valgrind.supp @@ -0,0 +1,35 @@ +{ + <insert_a_suppression_name_here> + Memcheck:Leak + fun:_Znwm + fun:_ZNSs4_Rep9_S_createEmmRKSaIcE + fun:_ZNSs12_S_constructIPKcEEPcT_S3_RKSaIcESt20forward_iterator_tag + fun:_ZNSsC1EPKcRKSaIcE +} + +{ + <insert_a_suppression_name_here> + Memcheck:Leak + fun:_Znwm + fun:_ZNSs4_Rep9_S_createEmmRKSaIcE + fun:_ZNSs4_Rep8_M_cloneERKSaIcEm + fun:_ZNSs7reserveEm +} + +{ + <insert_a_suppression_name_here> + Memcheck:Leak + fun:_Znwm + fun:_ZNSs4_Rep9_S_createEmmRKSaIcE + fun:_ZNSs9_M_mutateEmmm + fun:_ZNSs15_M_replace_safeEmmPKcm +} + +{ + <insert_a_suppression_name_here> + Memcheck:Leak + fun:_Znwm + fun:_ZNSs4_Rep9_S_createEmmRKSaIcE + fun:_ZNSsC1IPcEET_S1_RKSaIcE +} + diff --git a/cpp/src/tests/legacystore/.valgrindrc b/cpp/src/tests/legacystore/.valgrindrc new file mode 100644 index 0000000000..4aba7661de --- /dev/null +++ b/cpp/src/tests/legacystore/.valgrindrc @@ -0,0 +1,7 @@ +--gen-suppressions=all +--leak-check=full +--demangle=yes +--suppressions=.valgrind.supp +--num-callers=25 +--trace-children=yes + diff --git a/cpp/src/tests/legacystore/CMakeLists.txt b/cpp/src/tests/legacystore/CMakeLists.txt new file mode 100644 index 0000000000..6cfaa7ec17 --- /dev/null +++ b/cpp/src/tests/legacystore/CMakeLists.txt @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if(BUILD_LEGACYSTORE) + +message(STATUS "Building legacystore tests") + +# Enable dashboard reporting. +include (CTest) + +# Make sure that everything get built before the tests +# Need to create a var with all the necessary top level targets + +# If we're linking Boost for DLLs, turn that on for the unit test too. +if (QPID_LINK_BOOST_DYNAMIC) + add_definitions(-DBOOST_TEST_DYN_LINK) +endif (QPID_LINK_BOOST_DYNAMIC) + +include_directories( ${CMAKE_CURRENT_SOURCE_DIR} ) + +include (FindPythonInterp) + +# # Inherit environment from parent script +# set (abs_srcdir ${CMAKE_CURRENT_SOURCE_DIR}) +# set (abs_builddir ${CMAKE_CURRENT_BINARY_DIR}) +# set (abs_top_srcdir ${CMAKE_SOURCE_DIR}) +# set (abs_top_builddir ${CMAKE_BINARY_DIR}) +# set (builddir_lib_suffix "") + +# If valgrind is selected in the configuration step, set up the path to it +# for CTest. +if (ENABLE_VALGRIND) + set (MEMORYCHECK_COMMAND ${VALGRIND}) + set (MEMORYCHECK_COMMAND_OPTIONS "--gen-suppressions=all +--leak-check=full +--demangle=yes +--suppressions=${CMAKE_CURRENT_SOURCE_DIR}/.valgrind.supp +--num-callers=25 +--log-file=ctest_valgrind.vglog") +endif (ENABLE_VALGRIND) + +# Like this to work with cmake 2.4 on Unix +set (qpid_test_boost_libs + ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY} ${Boost_SYSTEM_LIBRARY}) + +# +# Unit test program +# +# Unit tests are built as a single program to reduce valgrind overhead +# when running the tests. If you want to build a subset of the tests run +# ccmake and set unit_tests_to_build to the set you want to build. +# HACK ALERT - Unit tests are built individually to resolve a conflict +# with running multiple brokers that connect to 0.0.0.0:5672 and that +# womp on each other's store directory. + +# +# define_selftest +# macro to accept the name of a single source file and to create a +# unit test executable that runs the source. +# +MACRO (define_selftest theSourceFile) +add_executable (legacystore_${theSourceFile} + unit_test + ${theSourceFile} + ${platform_test_additions}) +target_link_libraries (legacystore_${theSourceFile} + ${qpid_test_boost_libs} + qpidmessaging qpidbroker qmfconsole legacystore) +get_property(ls_include TARGET legacystore_${theSourceFile} PROPERTY INCLUDE_DIRECTORIES) +list(APPEND ls_include ${abs_top_srcdir}/src/qpid/legacystore) +list(APPEND ls_include ${abs_top_srcdir}/src/tests) +set_target_properties (legacystore_${theSourceFile} PROPERTIES + INCLUDE_DIRECTORIES "${ls_include}" + COMPILE_DEFINITIONS _IN_QPID_BROKER) +remember_location(legacystore_${theSourceFile}) +set(test_wrap ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_test${test_script_suffix}) + +add_test (legacystore_${theSourceFile} ${test_wrap} ${legacystore_${theSourceFile}_LOCATION}) +ENDMACRO (define_selftest) + +# add_definitions(-H) + +define_selftest (SimpleTest) +define_selftest (OrderingTest) +define_selftest (TransactionalTest) +define_selftest (TwoPhaseCommitTest) + +# +# Other test programs +# + +# This should ideally be done as part of the test run, but I don't know a way +# to get these arguments and the working directory set like Makefile.am does, +# and have that run during the test pass. +if (PYTHON_EXECUTABLE) + set (python_bld ${CMAKE_CURRENT_BINARY_DIR}/python) + execute_process(COMMAND ${PYTHON_EXECUTABLE} setup.py install --prefix=${pythoon_bld} --install-lib=${python_bld} --install-scripts=${python_bld}/commands + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/../python) +endif (PYTHON_EXECUTABLE) + +endif (BUILD_LEGACYSTORE) diff --git a/cpp/src/tests/legacystore/MessageUtils.h b/cpp/src/tests/legacystore/MessageUtils.h new file mode 100644 index 0000000000..6552357c72 --- /dev/null +++ b/cpp/src/tests/legacystore/MessageUtils.h @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/broker/Message.h> +#include <qpid/broker/Queue.h> +#include <qpid/broker/amqp_0_10/MessageTransfer.h> +#include <qpid/framing/AMQFrame.h> +#include <qpid/framing/all_method_bodies.h> +#include <qpid/framing/Uuid.h> + +using namespace qpid::broker; +using namespace qpid::framing; + +struct MessageUtils +{ + static Message createMessage(const std::string& exchange, const std::string& routingKey, + const Uuid& messageId=Uuid(), const bool durable = false, + const uint64_t contentSize = 0, const std::string& correlationId = std::string()) + { + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(new qpid::broker::amqp_0_10::MessageTransfer()); + + AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); + AMQFrame header((AMQHeaderBody())); + + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + props->setMessageId(messageId); + props->setCorrelationId(correlationId); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + if (durable) + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(PERSISTENT); + return Message(msg, msg); + } + + static void addContent(Message msg, const std::string& data) + { + AMQFrame content((AMQContentBody(data))); + qpid::broker::amqp_0_10::MessageTransfer::get(msg).getFrames().append(content); + } + + struct MessageRetriever : public Consumer + { + MessageRetriever(Queue& q) : Consumer("test", CONSUMER), queue(q) {}; + + bool deliver(const QueueCursor& c, const Message& m) + { + message = m; + cursor = c; + return true; + }; + void notify() {} + void cancel() {} + void acknowledged(const DeliveryRecord&) {} + OwnershipToken* getSession() { return 0; } + + const Queue& queue; + Message message; + QueueCursor cursor; + }; + + static Message get(Queue& queue, QueueCursor* cursor = 0) + { + boost::shared_ptr<MessageRetriever> consumer(new MessageRetriever(queue)); + if (!queue.dispatch(consumer))throw qpid::Exception("No message found!"); + if (cursor) *cursor = consumer->cursor; + return consumer->message; + } + + static Uuid getMessageId(const Message& message) + { + return qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<MessageProperties>()->getMessageId(); + } + + static std::string getCorrelationId(const Message& message) + { + return qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<MessageProperties>()->getCorrelationId(); + } + + static void deliver(Message& msg, FrameHandler& h, uint16_t framesize) + { + qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, 0, qpid::types::Variant::Map()); + qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendContent(h, framesize); + } + +}; diff --git a/cpp/src/tests/legacystore/OrderingTest.cpp b/cpp/src/tests/legacystore/OrderingTest.cpp new file mode 100644 index 0000000000..92a09f0c60 --- /dev/null +++ b/cpp/src/tests/legacystore/OrderingTest.cpp @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" + +#include "qpid/legacystore/MessageStoreImpl.h" +#include <iostream> +#include "MessageUtils.h" +#include <qpid/broker/Queue.h> +#include <qpid/broker/RecoveryManagerImpl.h> +#include <qpid/framing/AMQHeaderBody.h> +#include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +using namespace qpid; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace mrg::msgstore; + +qpid::broker::Broker::Options opts; +qpid::broker::Broker br(opts); + +QPID_AUTO_TEST_SUITE(OrderingTest) + +#define SET_LOG_LEVEL(level) \ + qpid::log::Options opts(""); \ + opts.selectors.clear(); \ + opts.selectors.push_back(level); \ + qpid::log::Logger::instance().configure(opts); + +const std::string test_filename("OrderingTest"); +const char* tdp = getenv("TMP_DATA_DIR"); +const std::string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/OrderingTest"); + +// === Helper fns === + +const std::string name("OrderingQueue"); +std::auto_ptr<MessageStoreImpl> store; +QueueRegistry queues; +Queue::shared_ptr queue; +std::queue<Uuid> ids; + +class TestConsumer : public Consumer +{ + public: + + TestConsumer(Queue::shared_ptr q, std::queue<Uuid>& i) : Consumer("test", CONSUMER), queue(q), ids(i) {}; + + bool deliver(const QueueCursor& cursor, const Message& message) + { + queue->dequeue(0, cursor); + BOOST_CHECK_EQUAL(ids.front(), MessageUtils::getMessageId(message)); + ids.pop(); + return true; + }; + void notify() {} + void cancel() {} + void acknowledged(const DeliveryRecord&) {} + OwnershipToken* getSession() { return 0; } + private: + Queue::shared_ptr queue; + std::queue<Uuid>& ids; +}; +boost::shared_ptr<TestConsumer> consumer; + +void setup() +{ + store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(&br)); + store->init(test_dir, 4, 1, true); // truncate store + + queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0)); + queue->create(); + consumer = boost::shared_ptr<TestConsumer>(new TestConsumer(queue, ids)); +} + +void push() +{ + Uuid messageId(true); + ids.push(messageId); + + Message msg = MessageUtils::createMessage("exchange", "routing_key", messageId, true, 0); + + queue->deliver(msg); +} + +bool pop() +{ + return queue->dispatch(consumer); +} + +void restart() +{ + queue.reset(); + store.reset(); + + store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(&br)); + store->init(test_dir, 4, 1); + ExchangeRegistry exchanges; + LinkRegistry links; + sys::Timer t; + DtxManager mgr(t); + mgr.setStore (store.get()); + RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, br.getProtocolRegistry()); + store->recover(recoveryMgr); + + queue = queues.find(name); + consumer = boost::shared_ptr<TestConsumer>(new TestConsumer(queue, ids)); +} + +void check() +{ + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL((u_int32_t) ids.size(), queue->getMessageCount()); + while (pop()) ;//keeping popping 'till all messages are dequeued + BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount()); + BOOST_CHECK_EQUAL((size_t) 0, ids.size()); +} + + +// === Test suite === + +QPID_AUTO_TEST_CASE(Basic) +{ + SET_LOG_LEVEL("error+"); // This only needs to be set once. + + std::cout << test_filename << ".Basic: " << std::flush; + setup(); + //push on 10 messages + for (int i = 0; i < 10; i++) push(); + restart(); + check(); + std::cout << "ok" << std::endl; +} + +QPID_AUTO_TEST_CASE(Cycle) +{ + std::cout << test_filename << ".Cycle: " << std::flush; + setup(); + //push on 10 messages: + for (int i = 0; i < 10; i++) push(); + //pop 5: + for (int i = 0; i < 5; i++) pop(); + //push on another 5: + for (int i = 0; i < 5; i++) push(); + restart(); + check(); + std::cout << "ok" << std::endl; +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/legacystore/SimpleTest.cpp b/cpp/src/tests/legacystore/SimpleTest.cpp new file mode 100644 index 0000000000..a49333d876 --- /dev/null +++ b/cpp/src/tests/legacystore/SimpleTest.cpp @@ -0,0 +1,497 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" + +#include "qpid/legacystore/MessageStoreImpl.h" +#include <iostream> +#include "tests/legacystore/MessageUtils.h" +#include "qpid/legacystore/StoreException.h" +#include "qpid/broker/DirectExchange.h" +#include <qpid/broker/Queue.h> +#include <qpid/broker/QueueSettings.h> +#include <qpid/broker/RecoveryManagerImpl.h> +#include <qpid/framing/AMQHeaderBody.h> +#include <qpid/framing/FieldTable.h> +#include <qpid/framing/FieldValue.h> +#include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +qpid::broker::Broker::Options opts; +qpid::broker::Broker br(opts); + +#define SET_LOG_LEVEL(level) \ + qpid::log::Options opts(""); \ + opts.selectors.clear(); \ + opts.selectors.push_back(level); \ + qpid::log::Logger::instance().configure(opts); + + +using boost::intrusive_ptr; +using boost::static_pointer_cast; +using namespace qpid; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace mrg::msgstore; +using namespace std; + +QPID_AUTO_TEST_SUITE(SimpleTest) + +const string test_filename("SimpleTest"); +const char* tdp = getenv("TMP_DATA_DIR"); +const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/SimpleTest"); + +// === Helper fns === + +struct DummyHandler : OutputHandler +{ + std::vector<AMQFrame> frames; + + virtual void send(AMQFrame& frame){ + frames.push_back(frame); + } +}; + +void recover(MessageStoreImpl& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links) +{ + sys::Timer t; + DtxManager mgr(t); + mgr.setStore (&store); + RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry()); + store.recover(recovery); +} + +void recover(MessageStoreImpl& store, ExchangeRegistry& exchanges) +{ + QueueRegistry queues; + LinkRegistry links; + recover(store, queues, exchanges, links); +} + +void recover(MessageStoreImpl& store, QueueRegistry& queues) +{ + ExchangeRegistry exchanges; + LinkRegistry links; + recover(store, queues, exchanges, links); +} + +void bindAndUnbind(const string& exchangeName, const string& queueName, + const string& key, const FieldTable& args) +{ + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args)); + Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0)); + store.create(*exchange, qpid::framing::FieldTable()); + store.create(*queue, qpid::framing::FieldTable()); + BOOST_REQUIRE(exchange->bind(queue, key, &args)); + store.bind(*exchange, *queue, key, args); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry exchanges; + QueueRegistry queues; + LinkRegistry links; + + recover(store, queues, exchanges, links); + + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + Queue::shared_ptr queue = queues.find(queueName); + // check exchange args are still set + for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) { + BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData()); + } + //check it is bound by unbinding + BOOST_REQUIRE(exchange->unbind(queue, key, &args)); + store.unbind(*exchange, *queue, key, args); + } + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry exchanges; + QueueRegistry queues; + LinkRegistry links; + + recover(store, queues, exchanges, links); + + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + Queue::shared_ptr queue = queues.find(queueName); + // check exchange args are still set + for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) { + BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData()); + } + //make sure it is no longer bound + BOOST_REQUIRE(!exchange->unbind(queue, key, &args)); + } +} + + +// === Test suite === + +QPID_AUTO_TEST_CASE(CreateDelete) +{ + SET_LOG_LEVEL("error+"); // This only needs to be set once. + + cout << test_filename << ".CreateDelete: " << flush; + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + string name("CreateDeleteQueue"); + Queue queue(name, 0, &store, 0); + store.create(queue, qpid::framing::FieldTable()); +// TODO - check dir exists + BOOST_REQUIRE(queue.getPersistenceId()); + store.destroy(queue); +// TODO - check dir is deleted + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(EmptyRecover) +{ + cout << test_filename << ".EmptyRecover: " << flush; + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + //nothing to assert, just testing it doesn't blow up + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(QueueCreate) +{ + cout << test_filename << ".QueueCreate: " << flush; + + uint64_t id(0); + string name("MyDurableQueue"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue queue(name, 0, &store, 0); + store.create(queue, qpid::framing::FieldTable()); + BOOST_REQUIRE(queue.getPersistenceId()); + id = queue.getPersistenceId(); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + Queue::shared_ptr queue = registry.find(name); + BOOST_REQUIRE(queue.get()); + BOOST_CHECK_EQUAL(id, queue->getPersistenceId()); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(QueueCreateWithSettings) +{ + cout << test_filename << ".QueueCreateWithSettings: " << flush; + + FieldTable arguments; + arguments.setInt("qpid.max_count", 202); + arguments.setInt("qpid.max_size", 1003); + QueueSettings settings; + settings.populate(arguments, settings.storeSettings); + string name("MyDurableQueue"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue queue(name, settings, &store, 0); + queue.create(); + BOOST_REQUIRE(queue.getPersistenceId()); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + Queue::shared_ptr queue = registry.find(name); + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202); + BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003); + BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), queue->getSettings().maxDepth.getCount()); + BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), queue->getSettings().maxDepth.getSize()); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(QueueDestroy) +{ + cout << test_filename << ".QueueDestroy: " << flush; + + string name("MyDurableQueue"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue queue(name, 0, &store, 0); + store.create(queue, qpid::framing::FieldTable()); + store.destroy(queue); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + BOOST_REQUIRE(!registry.find(name)); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(Enqueue) +{ + cout << test_filename << ".Enqueue: " << flush; + + //TODO: this is largely copy & paste'd from MessageTest in + //qpid tree. ideally need some helper routines for reducing + //this to a simpler less duplicated form + + string name("MyDurableQueue"); + string exchange("MyExchange"); + string routingKey("MyRoutingKey"); + Uuid messageId(true); + string data1("abcdefg"); + string data2("hijklmn"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue::shared_ptr queue(new Queue(name, 0, &store, 0)); + queue->create(); + + Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 14); + MessageUtils::addContent(msg, data1); + MessageUtils::addContent(msg, data2); + + msg.addAnnotation("abc", "xyz"); + + queue->deliver(msg); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + Queue::shared_ptr queue = registry.find(name); + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount()); + Message msg = MessageUtils::get(*queue); + + BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey()); + BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg)); + BOOST_CHECK_EQUAL(std::string("xyz"), msg.getAnnotation("abc")); + BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize()); + + DummyHandler handler; + MessageUtils::deliver(msg, handler, 100); + BOOST_CHECK_EQUAL((size_t) 2, handler.frames.size()); + AMQContentBody* contentBody(dynamic_cast<AMQContentBody*>(handler.frames[1].getBody())); + BOOST_REQUIRE(contentBody); + BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size()); + BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData()); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(Dequeue) +{ + cout << test_filename << ".Dequeue: " << flush; + + //TODO: reduce the duplication in these tests + string name("MyDurableQueue"); + { + string exchange("MyExchange"); + string routingKey("MyRoutingKey"); + Uuid messageId(true); + string data("abcdefg"); + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue::shared_ptr queue(new Queue(name, 0, &store, 0)); + queue->create(); + + Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 7); + MessageUtils::addContent(msg, data); + + queue->deliver(msg); + + QueueCursor cursor; + MessageUtils::get(*queue, &cursor); + queue->dequeue(0, cursor); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + Queue::shared_ptr queue = registry.find(name); + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount()); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy) +{ + cout << test_filename << ".ExchangeCreateAndDestroy: " << flush; + + uint64_t id(0); + string name("MyDurableExchange"); + string type("direct"); + FieldTable args; + args.setString("a", "A"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + ExchangeRegistry registry; + Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first; + store.create(*exchange, qpid::framing::FieldTable()); + id = exchange->getPersistenceId(); + BOOST_REQUIRE(id); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry registry; + + recover(store, registry); + + Exchange::shared_ptr exchange = registry.get(name); + BOOST_CHECK_EQUAL(id, exchange->getPersistenceId()); + BOOST_CHECK_EQUAL(type, exchange->getType()); + BOOST_REQUIRE(exchange->isDurable()); + BOOST_CHECK_EQUAL(*args.get("a"), *exchange->getArgs().get("a")); + store.destroy(*exchange); + } + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry registry; + + recover(store, registry); + + try { + Exchange::shared_ptr exchange = registry.get(name); + BOOST_FAIL("Expected exchange not to be found"); + } catch (const SessionException& e) { + BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code); + } + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind) +{ + cout << test_filename << ".ExchangeBindAndUnbind: " << flush; + + bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable()); + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs) +{ + cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush; + + FieldTable args; + args.setString("a", "A"); + args.setString("b", "B"); + bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args); + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind) +{ + cout << test_filename << ".ExchangeImplicitUnbind: " << flush; + + string exchangeName("MyDurableExchange"); + string queueName1("MyDurableQueue1"); + string queueName2("MyDurableQueue2"); + string key("my-routing-key"); + FieldTable args; + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args)); + Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0)); + Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0)); + store.create(*exchange, qpid::framing::FieldTable()); + store.create(*queue1, qpid::framing::FieldTable()); + store.create(*queue2, qpid::framing::FieldTable()); + store.bind(*exchange, *queue1, key, args); + store.bind(*exchange, *queue2, key, args); + //delete queue1: + store.destroy(*queue1); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry exchanges; + QueueRegistry queues; + LinkRegistry links; + + //ensure recovery works ok: + recover(store, queues, exchanges, links); + + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + BOOST_REQUIRE(!queues.find(queueName1).get()); + BOOST_REQUIRE(queues.find(queueName2).get()); + + //delete exchange: + store.destroy(*exchange); + } + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry exchanges; + QueueRegistry queues; + LinkRegistry links; + + //ensure recovery works ok: + recover(store, queues, exchanges, links); + + try { + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + BOOST_FAIL("Expected exchange not to be found"); + } catch (const SessionException& e) { + BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code); + } + Queue::shared_ptr queue = queues.find(queueName2); + store.destroy(*queue); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/legacystore/TestFramework.cpp b/cpp/src/tests/legacystore/TestFramework.cpp new file mode 100644 index 0000000000..2f7faf7682 --- /dev/null +++ b/cpp/src/tests/legacystore/TestFramework.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Defines broker to be used by tests + +#include "unit_test.h" +#include "TestFramework.h" +#include "qpid/broker/Broker.h" + +#include <iostream> + +//BOOST_GLOBAL_FIXTURE( testBroker ) diff --git a/cpp/src/tests/legacystore/TestFramework.h b/cpp/src/tests/legacystore/TestFramework.h new file mode 100644 index 0000000000..f3066db602 --- /dev/null +++ b/cpp/src/tests/legacystore/TestFramework.h @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Defines broker to be used by tests + +#include "unit_test.h" + +#include <qpid/broker/Broker.h> + +namespace { + // test broker + qpid::broker::Broker::Options opts; + qpid::broker::Broker br(opts); +/* + struct testBroker { + testBroker() {} + ~testBroker() {} + };*/ +} diff --git a/cpp/src/tests/legacystore/TransactionalTest.cpp b/cpp/src/tests/legacystore/TransactionalTest.cpp new file mode 100644 index 0000000000..2d3f6f922c --- /dev/null +++ b/cpp/src/tests/legacystore/TransactionalTest.cpp @@ -0,0 +1,351 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" + +#include "qpid/legacystore/MessageStoreImpl.h" +#include <iostream> +#include "MessageUtils.h" +#include "qpid/legacystore/StoreException.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/RecoveryManagerImpl.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/log/Statement.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +using namespace mrg::msgstore; +using namespace qpid; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace std; + +namespace { +qpid::broker::Broker::Options opts; +qpid::broker::Broker br(opts); +} + +QPID_AUTO_TEST_SUITE(TransactionalTest) + +#define SET_LOG_LEVEL(level) \ + qpid::log::Options opts(""); \ + opts.selectors.clear(); \ + opts.selectors.push_back(level); \ + qpid::log::Logger::instance().configure(opts); + +const string test_filename("TransactionalTest"); +const char* tdp = getenv("TMP_DATA_DIR"); +const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest"); + +// Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning +class TestTxnCtxt : public TxnCtxt +{ + public: + TestTxnCtxt(IdSequence* _loggedtx) : TxnCtxt(_loggedtx) {} + void setCompleteFailure(const unsigned num_queues_rem) { + // Remove queue members from back of impactedQueues until queues_rem reamin. + // to end to simulate multi-queue txn complete failure. + while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin()); + } + void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; } +}; + +// Test store which has special begin() which returns a TestTPCTxnCtxt, and a method to check for +// remaining open transactions. +// begin(), commit(), and abort() all hide functions in MessageStoreImpl. To avoid the compiler +// warnings/errors these are renamed with a 'TMS' prefix. +class TestMessageStore: public MessageStoreImpl +{ + public: + TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) : MessageStoreImpl(br, envpath) {} + std::auto_ptr<qpid::broker::TransactionContext> TMSbegin() { + checkInit(); + // pass sequence number for c/a + return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence)); + } + void TMScommit(TransactionContext& ctxt, const bool complete_prepared_list) { + checkInit(); + TxnCtxt* txn(check(&ctxt)); + if (!txn->isTPC()) { + localPrepare(dynamic_cast<TxnCtxt*>(txn)); + if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr(); + } + completed(*dynamic_cast<TxnCtxt*>(txn), true); + } + void TMSabort(TransactionContext& ctxt, const bool complete_prepared_list) + { + checkInit(); + TxnCtxt* txn(check(&ctxt)); + if (!txn->isTPC()) { + localPrepare(dynamic_cast<TxnCtxt*>(txn)); + if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr(); + } + completed(*dynamic_cast<TxnCtxt*>(txn), false); + } +}; + +// === Helper fns === + +const string nameA("queueA"); +const string nameB("queueB"); +//const Uuid messageId(true); +std::auto_ptr<MessageStoreImpl> store; +std::auto_ptr<QueueRegistry> queues; +Queue::shared_ptr queueA; +Queue::shared_ptr queueB; + +template <class T> +void setup() +{ + store = std::auto_ptr<T>(new T(&br)); + store->init(test_dir, 4, 1, true); // truncate store + + //create two queues: + queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0)); + queueA->create(); + queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0)); + queueB->create(); +} + +template <class T> +void restart() +{ + queueA.reset(); + queueB.reset(); + queues.reset(); + store.reset(); + + store = std::auto_ptr<T>(new T(&br)); + store->init(test_dir, 4, 1); + queues = std::auto_ptr<QueueRegistry>(new QueueRegistry); + ExchangeRegistry exchanges; + LinkRegistry links; + sys::Timer t; + DtxManager mgr(t); + mgr.setStore (store.get()); + RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, br.getProtocolRegistry()); + store->recover(recovery); + + queueA = queues->find(nameA); + queueB = queues->find(nameB); +} + +Message createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key") +{ + return MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id); +} + +void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>") +{ + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL(size, queue->getMessageCount()); + if (size > 0) { + Message msg = MessageUtils::get(*queue); + BOOST_REQUIRE(msg); + BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg)); + } +} + +void swap(bool commit) +{ + setup<MessageStoreImpl>(); + + //create message and enqueue it onto first queue: + Message msgA = createMessage("Message", "exchange", "routing_key"); + queueA->deliver(msgA); + + QueueCursor cursorB; + Message msgB = MessageUtils::get(*queueA, &cursorB); + BOOST_REQUIRE(msgB); + //move the message from one queue to the other as a transaction + std::auto_ptr<TransactionContext> txn = store->begin(); + TxBuffer tx; + queueB->deliver(msgB, &tx);//note: need to enqueue it first to avoid message being deleted + + queueA->dequeue(txn.get(), cursorB); + tx.prepare(txn.get()); + if (commit) { + store->commit(*txn); + } else { + store->abort(*txn); + } + + restart<MessageStoreImpl>(); + + // Check outcome + BOOST_REQUIRE(queueA); + BOOST_REQUIRE(queueB); + + Queue::shared_ptr x;//the queue from which the message was swapped + Queue::shared_ptr y;//the queue on which the message is expected to be + + if (commit) { + x = queueA; + y = queueB; + } else { + x = queueB; + y = queueA; + } + + checkMsg(x, 0); + checkMsg(y, 1, "Message"); + checkMsg(y, 0); +} + +void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit) +{ + setup<TestMessageStore>(); + TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get()); + std::auto_ptr<TransactionContext> txn(tmsp->TMSbegin()); + TxBuffer tx; + + //create two messages and enqueue them onto both queues: + Message msgA = createMessage("MessageA", "exchange", "routing_key"); + queueA->deliver(msgA, &tx); + queueB->deliver(msgA, &tx); + Message msgB = createMessage("MessageB", "exchange", "routing_key"); + queueA->deliver(msgB, &tx); + queueB->deliver(msgB, &tx); + + tx.prepare(txn.get()); + static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem); + if (commit) + tmsp->TMScommit(*txn, complete_prepared_list); + else + tmsp->TMSabort(*txn, complete_prepared_list); + restart<TestMessageStore>(); + + // Check outcome + if (commit) + { + checkMsg(queueA, 2, "MessageA"); + checkMsg(queueB, 2, "MessageA"); + checkMsg(queueA, 1, "MessageB"); + checkMsg(queueB, 1, "MessageB"); + } + checkMsg(queueA, 0); + checkMsg(queueB, 0); +} + +// === Test suite === + +QPID_AUTO_TEST_CASE(Commit) +{ + SET_LOG_LEVEL("error+"); // This only needs to be set once. + + cout << test_filename << ".Commit: " << flush; + swap(true); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(Abort) +{ + cout << test_filename << ".Abort: " << flush; + swap(false); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueCommit) +{ + cout << test_filename << ".MultiQueueCommit: " << flush; + testMultiQueueTxn(2, true, true); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueAbort) +{ + cout << test_filename << ".MultiQueueAbort: " << flush; + testMultiQueueTxn(2, true, false); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover) +{ + cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush; + testMultiQueueTxn(0, false, true); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover) +{ + cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush; + testMultiQueueTxn(0, false, false); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover) +{ + cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush; + testMultiQueueTxn(1, false, true); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover) +{ + cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush; + testMultiQueueTxn(1, false, false); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover) +{ + cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush; + testMultiQueueTxn(2, false, true); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover) +{ + cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush; + testMultiQueueTxn(2, false, false); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(LockedRecordTest) +{ + cout << test_filename << ".LockedRecordTest: " << flush; + + setup<MessageStoreImpl>(); + queueA->deliver(createMessage("Message", "exchange", "routingKey")); + std::auto_ptr<TransactionContext> txn = store->begin(); + + QueueCursor cursor; + Message msg = MessageUtils::get(*queueA, &cursor); + queueA->dequeue(txn.get(), cursor); + + try { + store->dequeue(0, msg.getPersistentContext(), *queueA); + BOOST_ERROR("Did not throw JERR_MAP_LOCKED exception as expected."); + } + catch (const mrg::msgstore::StoreException& e) { + if (std::strstr(e.what(), "JERR_MAP_LOCKED") == 0) + BOOST_ERROR("Unexpected StoreException: " << e.what()); + } + catch (const std::exception& e) { + BOOST_ERROR("Unexpected exception: " << e.what()); + } + store->commit(*txn); + checkMsg(queueA, 0); + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp b/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp new file mode 100644 index 0000000000..92e49df9e3 --- /dev/null +++ b/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp @@ -0,0 +1,675 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" + +#include "qpid/legacystore/MessageStoreImpl.h" +#include <iostream> +#include "MessageUtils.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/RecoveryManagerImpl.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/log/Statement.h" +#include "qpid/legacystore/TxnCtxt.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +using namespace mrg::msgstore; +using namespace qpid; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace std; + + +qpid::broker::Broker::Options opts; +qpid::broker::Broker br(opts); + + +QPID_AUTO_TEST_SUITE(TwoPhaseCommitTest) + +#define SET_LOG_LEVEL(level) \ + qpid::log::Options opts(""); \ + opts.selectors.clear(); \ + opts.selectors.push_back(level); \ + qpid::log::Logger::instance().configure(opts); + + +const string test_filename("TwoPhaseCommitTest"); +const char* tdp = getenv("TMP_DATA_DIR"); +string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TwoPhaseCommitTest"); + +// === Helper fns === + +class TwoPhaseCommitTest +{ + + class Strategy + { + public: + virtual void init() = 0; + virtual void run(TPCTransactionContext* txn) = 0; + virtual void check(bool committed) = 0; + virtual ~Strategy(){} + }; + + class Swap : public Strategy + { + TwoPhaseCommitTest* const test; + const string messageId; + Message msg; + public: + Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {} + void init(){ msg = test->deliver(messageId, test->queueA); } + void run(TPCTransactionContext* txn) { test->swap(txn, test->queueA, test->queueB); } + void check(bool committed) { test->swapCheck(committed, messageId, test->queueA, test->queueB); } + }; + + class Enqueue : public Strategy + { + TwoPhaseCommitTest* const test; + Message msg1; + Message msg2; + Message msg3; + public: + Enqueue(TwoPhaseCommitTest* const test_): test(test_) {} + void init() {} + void run(TPCTransactionContext* txn) { + msg1 = test->enqueue(txn, "Enqueue1", test->queueA); + msg2 = test->enqueue(txn, "Enqueue2", test->queueA); + msg3 = test->enqueue(txn, "Enqueue3", test->queueA); + } + void check(bool committed) { + if (committed) { + test->checkMsg(test->queueA, 3, "Enqueue1"); + test->checkMsg(test->queueA, 2, "Enqueue2"); + test->checkMsg(test->queueA, 1, "Enqueue3"); + } + test->checkMsg(test->queueA, 0); + } + }; + + class Dequeue : public Strategy + { + TwoPhaseCommitTest* const test; + Message msg1; + Message msg2; + Message msg3; + public: + Dequeue(TwoPhaseCommitTest* const test_): test(test_) {} + void init() { + msg1 = test->deliver("Dequeue1", test->queueA); + msg2 = test->deliver("Dequeue2", test->queueA); + msg3 = test->deliver("Dequeue3", test->queueA); + } + void run(TPCTransactionContext* txn) { + test->dequeue(txn, test->queueA); + test->dequeue(txn, test->queueA); + test->dequeue(txn, test->queueA); + } + void check(bool committed) { + if (!committed) { + test->checkMsg(test->queueA, 3, "Dequeue1"); + test->checkMsg(test->queueA, 2, "Dequeue2"); + test->checkMsg(test->queueA, 1, "Dequeue3"); + } + test->checkMsg(test->queueA, 0); + } + }; + + class MultiQueueTxn : public Strategy + { + TwoPhaseCommitTest* const test; + Message msg1; + Message msg2; + std::set<Queue::shared_ptr> queueset; + public: + MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {} + virtual void init() {} + virtual void run(TPCTransactionContext* txn) { + queueset.insert(test->queueA); + queueset.insert(test->queueB); + msg1 = test->enqueue(txn, "Message1", queueset); + msg2 = test->enqueue(txn, "Message2", queueset); + queueset.clear(); + } + virtual void check(bool committed) { + TestMessageStore* sptr = static_cast<TestMessageStore*>(test->store.get()); + if (committed) + { + test->checkMsg(test->queueA, 2, "Message1"); + test->checkMsg(test->queueB, 2, "Message1"); + test->checkMsg(test->queueA, 1, "Message2"); + test->checkMsg(test->queueB, 1, "Message2"); + } + test->checkMsg(test->queueA, 0); + test->checkMsg(test->queueB, 0); + // Check there are no remaining open txns in store + BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueA))); + BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueB))); + BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingPreparedListTxns()); + } + }; + + // Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning + class TestTPCTxnCtxt : public TPCTxnCtxt + { + public: + TestTPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TPCTxnCtxt(_xid, _loggedtx) {} + void setCompleteFailure(const unsigned num_queues_rem, const bool complete_prepared_list) { + // Remove queue members from back of impactedQueues until queues_rem reamin. + // to end to simulate multi-queue txn complete failure. + while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin()); + // If prepared list is not to be committed, set pointer to 0 + if (!complete_prepared_list) preparedXidStorePtr = 0; + } + }; + + // Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for + // reamining open transactions + class TestMessageStore: public MessageStoreImpl + { + public: + TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) : MessageStoreImpl(br, envpath) {} + std::auto_ptr<qpid::broker::TPCTransactionContext> TMSbegin(const std::string& xid) { + checkInit(); + IdSequence* jtx = &messageIdSequence; + // pass sequence number for c/a + return auto_ptr<TPCTransactionContext>(new TestTPCTxnCtxt(xid, jtx)); + } + u_int32_t getRemainingTxns(const PersistableQueue& queue) { + return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt(); + } + u_int32_t getRemainingPreparedListTxns() { + return tplStorePtr->get_open_txn_cnt(); + } + }; + + const string nameA; + const string nameB; + std::auto_ptr<MessageStoreImpl> store; + std::auto_ptr<DtxManager> dtxmgr; + std::auto_ptr<QueueRegistry> queues; + std::auto_ptr<LinkRegistry> links; + Queue::shared_ptr queueA; + Queue::shared_ptr queueB; + Message msg1; + Message msg2; + Message msg4; + std::auto_ptr<TxBuffer> tx; + + void recoverPrepared(bool commit) + { + setup<MessageStoreImpl>(); + + Swap swap(this, "RecoverPrepared"); + swap.init(); + std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid")); + swap.run(txn.get()); + if (tx.get()) { + tx->prepare(txn.get()); + tx.reset(); + } + + store->prepare(*txn); + restart<MessageStoreImpl>(); + + //check that the message is not available from either queue + BOOST_CHECK_EQUAL((u_int32_t) 0, queueA->getMessageCount()); + BOOST_CHECK_EQUAL((u_int32_t) 0, queueB->getMessageCount()); + + //commit/abort the txn - through the dtx manager, not directly on the store + if (commit) { + dtxmgr->commit("my-xid", false); + } else { + dtxmgr->rollback("my-xid"); + } + + swap.check(commit); + restart<MessageStoreImpl>(); + swap.check(commit); + } + + void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit) + { + setup<TestMessageStore>(); + MultiQueueTxn mqtTest(this); + mqtTest.init(); + std::auto_ptr<TPCTransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin("my-xid")); + mqtTest.run(txn.get()); + if (tx.get()) { + tx->prepare(txn.get()); + tx.reset(); + } + store->prepare(*txn); + + // As the commits and aborts should happen through DtxManager, and it is too complex to + // pass all these test params through, we bypass DtxManager and use the store directly. + // This will prevent the queues from seeing committed txns, however. To test the success + // or failure of + static_cast<TestTPCTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem, complete_prepared_list); + if (commit) + store->commit(*txn); + else + store->abort(*txn); + restart<TestMessageStore>(); + mqtTest.check(commit); + } + + void commit(Strategy& strategy) + { + setup<MessageStoreImpl>(); + strategy.init(); + + std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid")); + strategy.run(txn.get()); + if (tx.get()) { + tx->prepare(txn.get()); + tx.reset(); + } + store->prepare(*txn); + store->commit(*txn); + restart<MessageStoreImpl>(); + strategy.check(true); + } + + void abort(Strategy& strategy, bool prepare) + { + setup<MessageStoreImpl>(); + strategy.init(); + + std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid")); + strategy.run(txn.get()); + if (tx.get()) { + tx->prepare(txn.get()); + tx.reset(); + } + if (prepare) store->prepare(*txn); + store->abort(*txn); + restart<MessageStoreImpl>(); + strategy.check(false); + } + + void swap(TPCTransactionContext* txn, Queue::shared_ptr& from, Queue::shared_ptr& to) + { + QueueCursor c; + Message msg1 = MessageUtils::get(*from, &c);//just dequeues in memory + //move the message from one queue to the other as part of a + //distributed transaction + if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer); + to->deliver(msg1, tx.get());//note: need to enqueue it first to avoid message being deleted + from->dequeue(txn, c); + } + + void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue) + { + QueueCursor c; + Message msg2 = MessageUtils::get(*queue, &c);//just dequeues in memory + queue->dequeue(txn, c); + } + + Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid, Queue::shared_ptr& queue) + { + Message msg = createMessage(msgid); + if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer); + queue->deliver(msg, tx.get()); + return msg; + } + + Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid, std::set<Queue::shared_ptr>& queueset) + { + if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer); + Message msg = createMessage(msgid); + for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i != queueset.end(); i++) { + (*i)->deliver(msg, tx.get()); + } + return msg; + } + + Message deliver(const string& msgid, Queue::shared_ptr& queue) + { + Message m = createMessage(msgid); + queue->deliver(m); + return m; + } + + template <class T> + void setup() + { + store = std::auto_ptr<T>(new T(&br)); + store->init(test_dir, 4, 1, true); // truncate store + + //create two queues: + queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0)); + queueA->create(); + queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0)); + queueB->create(); + } + + Message createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key") + { + Message msg = MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id); + return msg; + } + + template <class T> + void restart() + { + queueA.reset(); + queueB.reset(); + store.reset(); + queues.reset(); + links.reset(); + + store = std::auto_ptr<T>(new T(&br)); + store->init(test_dir, 4, 1); + sys::Timer t; + ExchangeRegistry exchanges; + queues = std::auto_ptr<QueueRegistry>(new QueueRegistry); + links = std::auto_ptr<LinkRegistry>(new LinkRegistry); + dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(t)); + dtxmgr->setStore (store.get()); + RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, br.getProtocolRegistry()); + store->recover(recovery); + + queueA = queues->find(nameA); + queueB = queues->find(nameB); + } + + void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>") + { + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL(size, queue->getMessageCount()); + if (size > 0) { + Message msg = MessageUtils::get(*queue); + BOOST_REQUIRE(msg); + BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg)); + } + } + + void swapCheck(bool swapped, const string& msgid, Queue::shared_ptr& from, Queue::shared_ptr& to) + { + BOOST_REQUIRE(from); + BOOST_REQUIRE(to); + + Queue::shared_ptr x; //the queue from which the message was swapped + Queue::shared_ptr y; //the queue on which the message is expected to be + + if (swapped) { + x = from; + y = to; + } else { + x = to; + y = from; + } + + checkMsg(x, 0); + checkMsg(y, 1, msgid); + checkMsg(y, 0); + } + +public: + TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {} + + void testCommitEnqueue() + { + Enqueue enqueue(this); + commit(enqueue); + } + + void testCommitDequeue() + { + Dequeue dequeue(this); + commit(dequeue); + } + + void testCommitSwap() + { + Swap swap(this, "SwapMessageId"); + commit(swap); + } + + void testPrepareAndAbortEnqueue() + { + Enqueue enqueue(this); + abort(enqueue, true); + } + + void testPrepareAndAbortDequeue() + { + Dequeue dequeue(this); + abort(dequeue, true); + } + + void testPrepareAndAbortSwap() + { + Swap swap(this, "SwapMessageId"); + abort(swap, true); + } + + void testAbortNoPrepareEnqueue() + { + Enqueue enqueue(this); + abort(enqueue, false); + } + + void testAbortNoPrepareDequeue() + { + Dequeue dequeue(this); + abort(dequeue, false); + } + + void testAbortNoPrepareSwap() + { + Swap swap(this, "SwapMessageId"); + abort(swap, false); + } + + void testRecoverPreparedThenCommitted() + { + recoverPrepared(true); + } + + void testRecoverPreparedThenAborted() + { + recoverPrepared(false); + } + + void testMultiQueueCommit() + { + testMultiQueueTxn(2, true, true); + } + + void testMultiQueueAbort() + { + testMultiQueueTxn(2, true, false); + } + + void testMultiQueueNoQueueCommitRecover() + { + testMultiQueueTxn(0, false, true); + } + + void testMultiQueueNoQueueAbortRecover() + { + testMultiQueueTxn(0, false, false); + } + + void testMultiQueueSomeQueueCommitRecover() + { + testMultiQueueTxn(1, false, true); + } + + void testMultiQueueSomeQueueAbortRecover() + { + testMultiQueueTxn(1, false, false); + } + + void testMultiQueueAllQueueCommitRecover() + { + testMultiQueueTxn(2, false, true); + } + + void testMultiQueueAllQueueAbortRecover() + { + testMultiQueueTxn(2, false, false); + } +}; + +TwoPhaseCommitTest tpct; + +// === Test suite === + +QPID_AUTO_TEST_CASE(CommitEnqueue) +{ + SET_LOG_LEVEL("error+"); // This only needs to be set once. + + cout << test_filename << ".CommitEnqueue: " << flush; + tpct.testCommitEnqueue(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(CommitDequeue) +{ + cout << test_filename << ".CommitDequeue: " << flush; + tpct.testCommitDequeue(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(CommitSwap) +{ + cout << test_filename << ".CommitSwap: " << flush; + tpct.testCommitSwap(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue) +{ + cout << test_filename << ".PrepareAndAbortEnqueue: " << flush; + tpct.testPrepareAndAbortEnqueue(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue) +{ + cout << test_filename << ".PrepareAndAbortDequeue: " << flush; + tpct.testPrepareAndAbortDequeue(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(PrepareAndAbortSwap) +{ + cout << test_filename << ".PrepareAndAbortSwap: " << flush; + tpct.testPrepareAndAbortSwap(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue) +{ + cout << test_filename << ".AbortNoPrepareEnqueue: " << flush; + tpct.testAbortNoPrepareEnqueue(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue) +{ + cout << test_filename << ".AbortNoPrepareDequeue: " << flush; + tpct.testAbortNoPrepareDequeue(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(AbortNoPrepareSwap) +{ + cout << test_filename << ".AbortNoPrepareSwap: " << flush; + tpct.testAbortNoPrepareSwap(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted) +{ + cout << test_filename << ".RecoverPreparedThenCommitted: " << flush; + tpct.testRecoverPreparedThenCommitted(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(RecoverPreparedThenAborted) +{ + cout << test_filename << ".RecoverPreparedThenAborted: " << flush; + tpct.testRecoverPreparedThenAborted(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueCommit) +{ + cout << test_filename << ".MultiQueueCommit: " << flush; + tpct.testMultiQueueCommit(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueAbort) +{ + cout << test_filename << ".MultiQueueAbort: " << flush; + tpct.testMultiQueueAbort(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover) +{ + cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush; + tpct.testMultiQueueNoQueueCommitRecover(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover) +{ + cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush; + tpct.testMultiQueueNoQueueAbortRecover(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover) +{ + cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush; + tpct.testMultiQueueSomeQueueCommitRecover(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover) +{ + cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush; + tpct.testMultiQueueSomeQueueAbortRecover(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover) +{ + cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush; + tpct.testMultiQueueAllQueueCommitRecover(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover) +{ + cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush; + tpct.testMultiQueueAllQueueAbortRecover(); + cout << "ok" << endl; +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/legacystore/clean.sh b/cpp/src/tests/legacystore/clean.sh new file mode 100644 index 0000000000..efb19586fa --- /dev/null +++ b/cpp/src/tests/legacystore/clean.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This script cleans up any previous database and journal files, and should +# be run prior to the store system tests, as these are prone to crashing or +# hanging under some circumstances if the database is old or inconsistent. + +if [ -d ${TMP_DATA_DIR} ]; then + rm -rf ${TMP_DATA_DIR} +fi +if [ -d ${TMP_PYTHON_TEST_DIR} ]; then + rm -rf ${TMP_PYTHON_TEST_DIR} +fi +rm -f ${abs_srcdir}/*.vglog* diff --git a/cpp/src/tests/legacystore/persistence.py b/cpp/src/tests/legacystore/persistence.py new file mode 100644 index 0000000000..c4ab712f14 --- /dev/null +++ b/cpp/src/tests/legacystore/persistence.py @@ -0,0 +1,574 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys, re, traceback, socket +from getopt import getopt, GetoptError + +from qpid.connection import Connection +from qpid.util import connect +from qpid.datatypes import Message, RangedSet +from qpid.queue import Empty +from qpid.session import SessionException +from qpid.testlib import TestBase010 +from time import sleep + +class PersistenceTest(TestBase010): + + XA_RBROLLBACK = 1 + XA_RBTIMEOUT = 2 + XA_OK = 0 + + def createMessage(self, **kwargs): + session = self.session + dp = {} + dp['delivery_mode'] = 2 + mp = {} + for k, v in kwargs.iteritems(): + if k in ['routing_key', 'delivery_mode']: dp[k] = v + if k in ['message_id', 'correlation_id', 'application_headers']: mp[k] = v + args = [] + args.append(session.delivery_properties(**dp)) + if len(mp): + args.append(session.message_properties(**mp)) + if kwargs.has_key('body'): args.append(kwargs['body']) + return Message(*args) + + def phase1(self): + session = self.session + + session.queue_declare(queue="queue-a", durable=True) + session.queue_declare(queue="queue-b", durable=True) + session.exchange_bind(queue="queue-a", exchange="amq.direct", binding_key="a") + session.exchange_bind(queue="queue-b", exchange="amq.direct", binding_key="b") + + session.message_transfer(destination="amq.direct", + message=self.createMessage(routing_key="a", correlation_id="Msg0001", body="A_Message1")) + session.message_transfer(destination="amq.direct", + message=self.createMessage(routing_key="b", correlation_id="Msg0002", body="B_Message1")) + +# session.queue_declare(queue="lvq-test", durable=True, arguments={"qpid.last_value_queue":True}) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B1")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A1")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A2")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B2")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B3")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C1")) + + + + def phase2(self): + session = self.session + + #check queues exists + session.queue_declare(queue="queue-a", durable=True, passive=True) + session.queue_declare(queue="queue-b", durable=True, passive=True) + + #check they are still bound to amq.direct correctly + responses = [] + responses.append(session.exchange_bound(queue="queue-a", exchange="amq.direct", binding_key="a")) + responses.append(session.exchange_bound(queue="queue-b", exchange="amq.direct", binding_key="b")) + for r in responses: + self.assert_(not r.exchange_not_found) + self.assert_(not r.queue_not_found) + self.assert_(not r.key_not_matched) + + + #check expected messages are there + self.assertMessageOnQueue("queue-a", "Msg0001", "A_Message1") + self.assertMessageOnQueue("queue-b", "Msg0002", "B_Message1") + + self.assertEmptyQueue("queue-a") + self.assertEmptyQueue("queue-b") + + session.queue_declare(queue="queue-c", durable=True) + + #send a message to a topic such that it reaches all queues + session.exchange_bind(queue="queue-a", exchange="amq.topic", binding_key="abc") + session.exchange_bind(queue="queue-b", exchange="amq.topic", binding_key="abc") + session.exchange_bind(queue="queue-c", exchange="amq.topic", binding_key="abc") + + session.message_transfer(destination="amq.topic", + message=self.createMessage(routing_key="abc", correlation_id="Msg0003", body="AB_Message2")) + +# #check LVQ exists and has exepected messages: +# session.queue_declare(queue="lvq-test", durable=True, passive=True) +# session.message_subscribe(destination="lvq", queue="lvq-test") +# lvq = session.incoming("lvq") +# lvq.start() +# accepted = RangedSet() +# for m in ["A2", "B3", "C1"]: +# msg = lvq.get(timeout=1) +# self.assertEquals(m, msg.body) +# accepted.add(msg.id) +# try: +# extra = lvq.get(timeout=1) +# self.fail("lvq-test not empty, contains: " + extra.body) +# except Empty: None +# #publish some more messages while subscriber is active (no replacement): +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C2")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C3")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A3")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A4")) +# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C4")) +# #check that accepting replaced messages is safe +# session.message_accept(accepted) + + + def phase3(self): + session = self.session + +# #lvq recovery validation +# session.queue_declare(queue="lvq-test", durable=True, passive=True) +# session.message_subscribe(destination="lvq", queue="lvq-test") +# lvq = session.incoming("lvq") +# lvq.start() +# accepted = RangedSet() +# lvq.start() +# for m in ["C4", "A4"]: +# msg = lvq.get(timeout=1) +# self.assertEquals(m, msg.body) +# accepted.add(msg.id) +# session.message_accept(accepted) +# try: +# extra = lvq.get(timeout=1) +# self.fail("lvq-test not empty, contains: " + extra.body) +# except Empty: None +# session.message_cancel(destination="lvq") +# session.queue_delete(queue="lvq-test") + + + #check queues exists + session.queue_declare(queue="queue-a", durable=True, passive=True) + session.queue_declare(queue="queue-b", durable=True, passive=True) + session.queue_declare(queue="queue-c", durable=True, passive=True) + + session.tx_select() + #check expected messages are there + self.assertMessageOnQueue("queue-a", "Msg0003", "AB_Message2") + self.assertMessageOnQueue("queue-b", "Msg0003", "AB_Message2") + self.assertMessageOnQueue("queue-c", "Msg0003", "AB_Message2") + + self.assertEmptyQueue("queue-a") + self.assertEmptyQueue("queue-b") + self.assertEmptyQueue("queue-c") + + #note: default bindings must be restored for this to work + session.message_transfer(message=self.createMessage( + routing_key="queue-a", correlation_id="Msg0004", body="A_Message3")) + session.message_transfer(message=self.createMessage( + routing_key="queue-a", correlation_id="Msg0005", body="A_Message4")) + session.message_transfer(message=self.createMessage( + routing_key="queue-a", correlation_id="Msg0006", body="A_Message5")) + + session.tx_commit() + + + #delete a queue + session.queue_delete(queue="queue-c") + + session.message_subscribe(destination="ctag", queue="queue-a", accept_mode=0) + session.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF) + included = session.incoming("ctag") + msg1 = included.get(timeout=1) + self.assertExpectedContent(msg1, "Msg0004", "A_Message3") + msg2 = included.get(timeout=1) + self.assertExpectedContent(msg2, "Msg0005", "A_Message4") + msg3 = included.get(timeout=1) + self.assertExpectedContent(msg3, "Msg0006", "A_Message5") + self.ack(msg1, msg2, msg3) + + session.message_transfer(destination="amq.direct", message=self.createMessage( + routing_key="queue-b", correlation_id="Msg0007", body="B_Message3")) + + session.tx_rollback() + + + def phase4(self): + session = self.session + + #check queues exists + session.queue_declare(queue="queue-a", durable=True, passive=True) + session.queue_declare(queue="queue-b", durable=True, passive=True) + + self.assertMessageOnQueue("queue-a", "Msg0004", "A_Message3") + self.assertMessageOnQueue("queue-a", "Msg0005", "A_Message4") + self.assertMessageOnQueue("queue-a", "Msg0006", "A_Message5") + + self.assertEmptyQueue("queue-a") + self.assertEmptyQueue("queue-b") + + #check this queue doesn't exist + try: + session.queue_declare(queue="queue-c", durable=True, passive=True) + raise Exception("Expected queue-c to have been deleted") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def phase5(self): + + session = self.session + queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"] + + for q in queues: + session.queue_declare(queue=q, durable=True) + session.queue_purge(queue=q) + + session.message_transfer(message=self.createMessage( + routing_key="queue-a1", correlation_id="MsgA", body="MessageA")) + session.message_transfer(message=self.createMessage( + routing_key="queue-b1", correlation_id="MsgB", body="MessageB")) + session.message_transfer(message=self.createMessage( + routing_key="queue-c1", correlation_id="MsgC", body="MessageC")) + session.message_transfer(message=self.createMessage( + routing_key="queue-d1", correlation_id="MsgD", body="MessageD")) + + session.dtx_select() + txa = self.xid('a') + txb = self.xid('b') + txc = self.xid('c') + txd = self.xid('d') + + self.txswap("queue-a1", "queue-a2", txa) + self.txswap("queue-b1", "queue-b2", txb) + self.txswap("queue-c1", "queue-c2", txc) + self.txswap("queue-d1", "queue-d2", txd) + + #no queue should have any messages accessible + for q in queues: + self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) + + self.assertEqual(self.XA_OK, session.dtx_commit(xid=txa, one_phase=True).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txb).status) + self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txc).status) + self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txd).status) + + #further checks + not_empty = ["queue-a2", "queue-b1"] + for q in queues: + if q in not_empty: + self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) + else: + self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) + + + def phase6(self): + session = self.session + + #check prepared transaction are reported correctly by recover + txc = self.xid('c') + txd = self.xid('d') + + xids = session.dtx_recover().in_doubt + ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these + + if txc.global_id not in ids: + self.fail("Recovered xids not as expected. missing: %s" % (txc)) + if txd.global_id not in ids: + self.fail("Recovered xids not as expected. missing: %s" % (txd)) + self.assertEqual(2, len(xids)) + + + queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"] + not_empty = ["queue-a2", "queue-b1"] + + #re-check + not_empty = ["queue-a2", "queue-b1"] + for q in queues: + if q in not_empty: + self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) + else: + self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) + + #complete the prepared transactions + self.assertEqual(self.XA_OK, session.dtx_commit(xid=txc).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txd).status) + not_empty.append("queue-c2") + not_empty.append("queue-d1") + + for q in queues: + if q in not_empty: + self.assertEqual(1, session.queue_query(queue=q).message_count) + else: + self.assertEqual(0, session.queue_query(queue=q).message_count) + + def phase7(self): + session = self.session + session.synchronous = False + + # check xids from phase 6 are gone + txc = self.xid('c') + txd = self.xid('d') + + xids = session.dtx_recover().in_doubt + ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these + + if txc.global_id in ids: + self.fail("Xid still present : %s" % (txc)) + if txd.global_id in ids: + self.fail("Xid still present : %s" % (txc)) + self.assertEqual(0, len(xids)) + + #test deletion of queue after publish + #create queue + session.queue_declare(queue = "q", auto_delete=True, durable=True) + + #send message + for i in range(1, 10): + session.message_transfer(message=self.createMessage(routing_key = "q", body = "my-message")) + + session.synchronous = True + #explicitly delete queue + session.queue_delete(queue = "q") + + #test acking of message from auto-deleted queue + #create queue + session.queue_declare(queue = "q", auto_delete=True, durable=True) + + #send message + session.message_transfer(message=self.createMessage(routing_key = "q", body = "my-message")) + + #create consumer + session.message_subscribe(queue = "q", destination = "a", accept_mode=0, acquire_mode=0) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = 0, value = 10, destination = "a") + queue = session.incoming("a") + + #consume the message, cancel subscription (triggering auto-delete), then ack it + msg = queue.get(timeout = 5) + session.message_cancel(destination = "a") + self.ack(msg) + + #test implicit deletion of bindings when queue is deleted + session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True) + session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz") + session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message")) + session.queue_delete(queue = "durable-subscriber-queue") + + #test unbind: + #create a series of bindings to a queue + session.queue_declare(queue = "binding-test-queue", durable=True) + session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="abc") + session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr") + session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="xyz") + session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="a", arguments={"x-match":"all", "p":"a"}) + session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="b", arguments={"x-match":"all", "p":"b"}) + session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="c", arguments={"x-match":"all", "p":"c"}) + #then restart broker... + + + def phase8(self): + session = self.session + + #continue testing unbind: + #send messages to the queue via each of the bindings + for k in ["abc", "pqr", "xyz"]: + data = "first %s" % (k) + session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data)) + for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]: + data = "first %s" % (a["p"]) + session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data)) + #unbind some bindings (using final 0-10 semantics) + session.exchange_unbind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr") + session.exchange_unbind(exchange="amq.match", queue="binding-test-queue", binding_key="b") + #send messages again + for k in ["abc", "pqr", "xyz"]: + data = "second %s" % (k) + session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data)) + for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]: + data = "second %s" % (a["p"]) + session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data)) + + #check that only the correct messages are received + expected = [] + for k in ["abc", "pqr", "xyz"]: + expected.append("first %s" % (k)) + for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]: + expected.append("first %s" % (a["p"])) + for k in ["abc", "xyz"]: + expected.append("second %s" % (k)) + for a in [{"p":"a"}, {"p":"c"}]: + expected.append("second %s" % (a["p"])) + + session.message_subscribe(queue = "binding-test-queue", destination = "binding-test") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "binding-test") + session.message_flow(unit = 0, value = 10, destination = "binding-test") + queue = session.incoming("binding-test") + + while len(expected): + msg = queue.get(timeout=1) + if msg.body not in expected: + self.fail("Missing message: %s" % msg.body) + expected.remove(msg.body) + try: + msg = queue.get(timeout=1) + self.fail("Got extra message: %s" % msg.body) + except Empty: pass + + + + session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True) + session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz") + session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message")) + session.queue_delete(queue = "durable-subscriber-queue") + + + def xid(self, txid, branchqual = ''): + return self.session.xid(format=0, global_id=txid, branch_id=branchqual) + + def txswap(self, src, dest, tx): + self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) + self.session.message_subscribe(destination="temp-swap", queue=src, accept_mode=0) + self.session.message_flow(destination="temp-swap", unit=0, value=1) + self.session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF) + msg = self.session.incoming("temp-swap").get(timeout=1) + self.session.message_cancel(destination="temp-swap") + self.session.message_transfer(message=self.createMessage(routing_key=dest, correlation_id=self.getProperty(msg, 'correlation_id'), + body=msg.body)) + self.ack(msg) + self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status) + + def assertEmptyQueue(self, name): + self.assertEqual(0, self.session.queue_query(queue=name).message_count) + + def assertConnectionException(self, expectedCode, message): + self.assertEqual("connection", message.method.klass.name) + self.assertEqual("close", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + + def assertExpectedMethod(self, reply, klass, method): + self.assertEqual(klass, reply.method.klass.name) + self.assertEqual(method, reply.method.name) + + def assertExpectedContent(self, msg, id, body): + self.assertEqual(id, self.getProperty(msg, 'correlation_id')) + self.assertEqual(body, msg.body) + return msg + + def getProperty(self, msg, name): + for h in msg.headers: + if hasattr(h, name): return getattr(h, name) + return None + + def ack(self, *msgs): + session = self.session + set = RangedSet() + for m in msgs: + set.add(m.id) + #TODO: tidy up completion + session.receiver._completed.add(m.id) + session.message_accept(set) + session.channel.session_completed(session.receiver._completed) + + def assertExpectedGetResult(self, id, body): + return self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id, body) + + def assertEqual(self, expected, actual, msg=''): + if expected != actual: raise Exception("%s expected: %s actual: %s" % (msg, expected, actual)) + + def assertMessageOnQueue(self, queue, id, body): + self.session.message_subscribe(destination="incoming-gets", queue=queue, accept_mode=0) + self.session.message_flow(destination="incoming-gets", unit=0, value=1) + self.session.message_flow(destination="incoming-gets", unit=1, value=0xFFFFFFFF) + msg = self.session.incoming("incoming-gets").get(timeout=1) + self.assertExpectedContent(msg, id, body) + self.ack(msg) + self.session.message_cancel(destination="incoming-gets") + + + def __init__(self): + TestBase010.__init__(self, "run") + self.setBroker("localhost") + self.errata = [] + + def connect(self): + """ Connects to the broker """ + self.conn = Connection(connect(self.host, self.port)) + self.conn.start(timeout=10) + self.session = self.conn.session("test-session", timeout=10) + + def run(self, args=sys.argv[1:]): + try: + opts, extra = getopt(args, "r:s:e:b:p:h", ["retry=", "spec=", "errata=", "broker=", "phase=", "help"]) + except GetoptError, e: + self._die(str(e)) + phase = 0 + retry = 0; + for opt, value in opts: + if opt in ("-h", "--help"): self._die() + if opt in ("-s", "--spec"): self.spec = value + if opt in ("-e", "--errata"): self.errata.append(value) + if opt in ("-b", "--broker"): self.setBroker(value) + if opt in ("-p", "--phase"): phase = int(value) + if opt in ("-r", "--retry"): retry = int(value) + + if not phase: self._die("please specify the phase to run") + phase = "phase%d" % phase + self.connect() + + try: + getattr(self, phase)() + print phase, "succeeded" + res = True; + except Exception, e: + print phase, "failed: ", e + traceback.print_exc() + res = False + + + if not self.session.error(): self.session.close(timeout=10) + self.conn.close(timeout=10) + + # Crude fix to wait for thread in client to exit after return from session_close() + # Reduces occurrences of "Unhandled exception in thread" messages after each test + import time + time.sleep(1) + + return res + + + def setBroker(self, broker): + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(broker) + if not match: self._die("'%s' is not a valid broker" % (broker)) + self.user, self.password, self.host, self.port = match.groups() + self.port = int(default(self.port, 5672)) + self.user = default(self.user, "guest") + self.password = default(self.password, "guest") + + def _die(self, message = None): + if message: print message + print """ +Options: + -h/--help : this message + -s/--spec <spec.xml> : file containing amqp XML spec + -p/--phase : test phase to run + -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to + """ + sys.exit(1) + +def default(value, default): + if (value == None): return default + else: return value + +if __name__ == "__main__": + test = PersistenceTest() + if not test.run(): sys.exit(1) diff --git a/cpp/src/tests/legacystore/run_long_python_tests b/cpp/src/tests/legacystore/run_long_python_tests new file mode 100644 index 0000000000..e43b2236ec --- /dev/null +++ b/cpp/src/tests/legacystore/run_long_python_tests @@ -0,0 +1,21 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +./run_python_tests LONG_TEST diff --git a/cpp/src/tests/legacystore/run_python_tests b/cpp/src/tests/legacystore/run_python_tests new file mode 100644 index 0000000000..d9dec16963 --- /dev/null +++ b/cpp/src/tests/legacystore/run_python_tests @@ -0,0 +1,64 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if test -z ${QPID_DIR} ; then + cat <<EOF + + =========== WARNING: PYTHON TESTS DISABLED ============== + + QPID_DIR not set. + + =========================================================== + +EOF + exit +fi + +. `dirname $0`/tests_env.sh + +MODULENAME=python_tests + +echo "Running Python tests in module ${MODULENAME}..." + +case x$1 in + xSHORT_TEST) + DEFAULT_PYTHON_TESTS="*.client_persistence.ExchangeQueueTests.* *.flow_to_disk.SimpleMaxSizeCountTest.test_browse_recover *.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2" ;; + xLONG_TEST) + DEFAULT_PYTHON_TESTS= ;; + x) + DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1 *.resize.SimpleTest.* *.federation.*" ;; + *) + DEFAULT_PYTHON_TESTS=$1 +esac + +PYTHON_TESTS=${PYTHON_TESTS:-${DEFAULT_PYTHON_TESTS}} + +OUTDIR=${MODULENAME}.tmp +rm -rf $OUTDIR + +# To debug a test, add the following options to the end of the following line: +# -v DEBUG -c qpid.messaging.io.ops [*.testName] +${PYTHON_DIR}/qpid-python-test -m ${MODULENAME} -I ${FAILING_PYTHON_TESTS} ${PYTHON_TESTS} -DOUTDIR=$OUTDIR #-v DEBUG +RETCODE=$? + +if test x${RETCODE} != x0; then + exit 1; +fi +exit 0 diff --git a/cpp/src/tests/legacystore/run_short_python_tests b/cpp/src/tests/legacystore/run_short_python_tests new file mode 100644 index 0000000000..523924fdba --- /dev/null +++ b/cpp/src/tests/legacystore/run_short_python_tests @@ -0,0 +1,21 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +./run_python_tests SHORT_TEST diff --git a/cpp/src/tests/legacystore/run_test b/cpp/src/tests/legacystore/run_test new file mode 100644 index 0000000000..1d5c2ae407 --- /dev/null +++ b/cpp/src/tests/legacystore/run_test @@ -0,0 +1,69 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Set up environment and run a test executable or script. +# +# Output nothing if test passes, show the output if it fails and +# leave output in <test>.log for examination. +# +# If qpidd.port exists run test with QPID_PORT=`cat qpidd.port` +# +# If $VALGRIND if is set run under valgrind. If there are valgrind +# erros show valgrind output, also leave it in <test>.valgrind for +# examination. +# + +source `dirname $0`/vg_check + +# Export variables from makefile. +export VALGRIND srcdir + +# Export QPID_PORT if qpidd.port exists. +test -f qpidd.port && export QPID_PORT=`cat qpidd.port` + +# Avoid silly libtool error messages if these are not defined +test -z "$LC_ALL" && export LC_ALL= +test -z "$LC_CTYPE" && export LC_CTYPE= +test -z "$LC_COLLATE" && export LC_COLLATE= +test -z "$LC_MESSAGES" && export LC_MESSAGES= + +VG_LOG="$1.vglog" +rm -f $VG_LOG* + +if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then + # This is a libtool "executable". Valgrind it if VALGRIND specified. + test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file=$VG_LOG --" + # Hide output unless there's an error. + libtool --mode=execute $VALGRIND "$@" 2>&1 || ERROR=$? + test -n "$VALGRIND" && vg_check $VG_LOG* +else + # This is a non-libtool shell script, just execute it. + export VALGRIND srcdir + exec "$@" +fi + +if test -z "$ERROR"; then + # Clean up logs if there was no error. + rm -f $VG_LOG* + exit 0 +else + exit $ERROR +fi diff --git a/cpp/src/tests/legacystore/start_broker b/cpp/src/tests/legacystore/start_broker new file mode 100644 index 0000000000..30e4659030 --- /dev/null +++ b/cpp/src/tests/legacystore/start_broker @@ -0,0 +1,25 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +QPIDD=$QPID_BLD/src/qpidd +rm -f qpidd.vglog* qpidd.log +test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file=qpidd.vglog --suppressions=$QPID_DIR/cpp/src/tests/.valgrind.supp --" +exec libtool --mode=execute $VALGRIND $QPIDD --daemon --port=0 --log-enable error+ --log-to-file qpidd.log "$@" > qpidd.port diff --git a/cpp/src/tests/legacystore/stop_broker b/cpp/src/tests/legacystore/stop_broker new file mode 100644 index 0000000000..dcefff376f --- /dev/null +++ b/cpp/src/tests/legacystore/stop_broker @@ -0,0 +1,46 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Stop the broker, check for errors. +# +if test -f qpidd.port; then + export QPID_PORT=`cat qpidd.port` + QPIDD=$QPID_BLD/src/qpidd + rm -f qpidd.port + + $QPIDD --quit || ERROR=$? + + # Check qpidd.log. + grep -a 'warning\|error\|critical' qpidd.log && { + echo "WARNING: Suspicious broker log entries in qpidd.log, above." + } + + # Check valgrind log. + if test -n "$VALGRIND"; then + source `dirname $0`/vg_check $VG_LOG* + vg_check qpidd.vglog* + fi + + exit $ERROR +else + echo "No qpidd.port file found - cannot stop broker." + exit 1; +fi diff --git a/cpp/src/tests/legacystore/system_test.sh b/cpp/src/tests/legacystore/system_test.sh new file mode 100644 index 0000000000..4cccc5ac8d --- /dev/null +++ b/cpp/src/tests/legacystore/system_test.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +error() { echo $*; exit 1; } + +# Make sure $QPID_DIR contains what we need. +if ! test -d "$QPID_DIR" ; then + echo "WARNING: QPID_DIR is not set skipping system tests." + exit +fi +STORE_LIB=../lib/.libs/msgstore.so + +xml_spec=$QPID_DIR/specs/amqp.0-10-qpid-errata.xml +test -f $xml_spec || error "$xml_spec not found: invalid \$QPID_DIR ?" +export PYTHONPATH=$QPID_DIR/python:$QPID_DIR/extras/qmf/src/py:$QPID_DIR/tools/src/py + +echo "Using directory $TMP_DATA_DIR" + +fail=0 + +# Run the tests with a given set of flags +BROKER_OPTS="--no-module-dir --load-module=$STORE_LIB --data-dir=$TMP_DATA_DIR --auth=no --wcache-page-size 16" +run_tests() { + for p in `seq 1 8`; do + $abs_srcdir/start_broker "$@" ${BROKER_OPTS} || { echo "FAIL broker start"; return 1; } + python "$abs_srcdir/persistence.py" -s "$xml_spec" -b localhost:`cat qpidd.port` -p $p -r 3 || fail=1; + $abs_srcdir/stop_broker + done +} + +run_tests || fail=1 + +exit $fail diff --git a/cpp/src/tests/legacystore/tests_env.sh b/cpp/src/tests/legacystore/tests_env.sh new file mode 100644 index 0000000000..30d255b87c --- /dev/null +++ b/cpp/src/tests/legacystore/tests_env.sh @@ -0,0 +1,260 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# --- Function definitions --- + + +func_check_required_env () +#------------------------- +# Check that EITHER: +# QPID_DIR is set (for running against svn QPID) +# OR +# QPID_PREFIX is set (for running against installed QPID +# Will exit with error code 1 if neither of these is defined. +# Params: None +# Returns: 0 if env vars ok, 1 otherwise +{ + if test -z "${QPID_DIR}" -a -z "${QPID_PREFIX}"; then + # Try to find qpidd in the normal installed location + if test -x /usr/sbin/qpidd; then + QPID_PREFIX=/usr + else + echo "ERROR: Could not find installed Qpid" + echo "Either of the following must be set in the environment for this script to run:" + echo " QPID_DIR for running against a Qpid svn build" + echo " QPID_PREFIX for running against an installed Qpid" + return 1 + fi + fi + return 0 +} + + +func_check_qpid_python () +#------------------------ +# Check that Qpid python environment is ok +# Params: None +# Returns: 0 if Python environment is ok; 1 otherwise +{ + if ! python -c "import qpid" ; then + cat <<EOF + + =========== WARNING: PYTHON TESTS DISABLED ============== + + Unable to load python qpid module - skipping python tests. + + PYTHONPATH=${PYTHONPATH} + + =========================================================== + +EOF + return 1 + fi + return 0 +} + + +func_set_env () +#-------------- +# Set up the environment based on value of ${QPID_DIR}: if ${QPID_DIR} exists, assume a svn checkout, +# otherwise set up for an installed or prefix test. +# Params: None +# Returns: Nothing +{ + if test "${QPID_DIR}" -a -d "${QPID_DIR}" ; then + # QPID_DIR is defined for source tree builds by the --with-qpid-checkout configure option. + # QPID_BLD is defined as the build directory, either $QPID_DIR/cpp or separately specified with + # the --with-qpid-build option for VPATH builds. + + # Check QPID_BLD is also set + if test -z ${QPID_BLD}; then + QPID_BLD="${QPID_DIR}/cpp" + fi + + # Paths and dirs + #if test -z ${abs_srcdir}; then + # abs_srcdir=`pwd` + #fi + source $QPID_BLD/src/tests/test_env.sh + # Override these two settings from test_env.sh: + export RECEIVER_EXEC=$QPID_TEST_EXEC_DIR/qpid-receive + export SENDER_EXEC=$QPID_TEST_EXEC_DIR/qpid-send + + echo "abs_srcdir=$abs_srcdir" + export STORE_LIB="`pwd`/../lib/.libs/msgstore.so" + export STORE_ENABLE=1 + export CLUSTER_LIB="${QPID_BLD}/src/.libs/cluster.so" + + PYTHON_DIR="${QPID_DIR}/python" + export PYTHONPATH="${PYTHONPATH}":"${PYTHON_DIR}":"${QPID_DIR}/extras/qmf/src/py":"${QPID_DIR}/tools/src/py":"${QPID_DIR}/cpp/src/tests":"${abs_srcdir}" + + # Libraries + + # Executables + export QPIDD_EXEC="${QPID_BLD}/src/qpidd" + + # Test data + + else + # Set up the environment based on value of ${QPID_PREFIX} for testing against an installed qpid + # Alternatively, make sure ${QPID_BIN_DIR}, ${QPID_SBIN_DIR}, ${QPID_LIB_DIR} and ${QPID_LIBEXEC_DIR} are set for + # the installed location. + if test "${QPID_PREFIX}" -a -d "${QPID_PREFIX}" ; then + QPID_BIN_DIR=${QPID_PREFIX}/bin + QPID_SBIN_DIR=${QPID_PREFIX}/sbin + QPID_LIB_DIR=${QPID_PREFIX}/lib + QPID_LIBEXEC_DIR=${QPID_PREFIX}/libexec + fi + + # These four env vars must be set prior to calling this script + func_checkpaths QPID_BIN_DIR QPID_SBIN_DIR QPID_LIB_DIR QPID_LIBEXEC_DIR + + # Paths and dirs + export PYTHON_DIR="${QPID_BIN_DIR}" + export PYTHONPATH="${PYTHONPATH}":"${QPID_LIB_DIR}/python":"${QPID_LIBEXEC_DIR}/qpid/tests":"${QPID_LIB_DIR}/python2.4" + + + # Libraries + + # Executables + export QPIDD_EXEC="${QPID_SBIN_DIR}/qpidd" + + # Test Data + + fi +} + + +func_mk_data_dir () +#------------------ +# Create a data dir at ${TMP_DATA_DIR} if not present, clear it otherwise. +# Set TMP_DATA_DIR if it is not set. +# Params: None +# Returns: Nothing +{ + if test -z "${TMP_DATA_DIR}"; then + TMP_DATA_DIR=/tmp/python_tests + echo "TMP_DATA_DIR not set; using ${TMP_DATA_DIR}" + fi + + # Delete old test dirs if they exist + if test -d "${TMP_DATA_DIR}" ; then + rm -rf "${TMP_DATA_DIR}/*" + fi + mkdir -p "${TMP_DATA_DIR}" + export TMP_DATA_DIR +} + + +func_checkvar () +#--------------- +# Check that an environment var is set (ie non-zero length) +# Params: $1 - env var to be checked +# Returns: 0 = env var is set (ie non-zero length) +# 1 = env var is not set +{ + local loc_VAR=$1 + if test -z ${!loc_VAR}; then + echo "WARNING: environment variable ${loc_VAR} not set." + return 1 + fi + return 0 +} + + +func_checkpaths () +#----------------- +# Check a list of paths (each can contain ':'-separated sub-list) is set and valid (ie each path exists as a dir) +# Params: $@ - List of path env vars to be checked +# Returns: Nothing +{ + local loc_PATHS=$@ + for path in ${loc_PATHS}; do + func_checkvar ${path} + if test $? == 0; then + local temp_IFS=${IFS} + IFS=":" + local pl=${!path} + for p in ${pl[@]}; do + if test ! -d ${p}; then + echo "WARNING: Directory ${p} in var ${path} not found." + fi + done + IFS=${temp_IFS} + fi + done +} + + +func_checklibs () +#---------------- +# Check that a list of libs is set and valid (ie each lib exists as an executable file) +# Params: $@ - List of lib values to be checked +# Returns: Nothing +{ + local loc_LIBS=$@ + for lib in ${loc_LIBS[@]}; do + func_checkvar ${lib} + if test $? == 0; then + if test ! -x ${!lib}; then + echo "WARNING: Library ${lib}=${!lib} not found." + fi + fi + done +} + + +func_checkexecs () +#----------------- +# Check that a list of executable is set and valid (ie each exec exists as an executable file) +# Params: $@ - List of exec values to be checked +# Returns: Nothing +{ + local loc_EXECS=$@ + for exec in ${loc_EXECS[@]}; do + func_checkvar ${exec} + if test $? == 0; then + if test ! -x ${!exec}; then + echo "WARNING: Executable ${exec}=${!exec} not found or is not executable." + fi + fi + done +} + + +#--- Start of script --- + +func_check_required_env || exit 1 # Cannot run, exit with error + +srcdir=`dirname $0` +if test -z ${abs_srcdir}; then + abs_srcdir=${srcdir} +fi + +func_set_env +func_check_qpid_python || exit 0 # A warning, not a failure. +func_mk_data_dir + +# Check expected environment vars are set +func_checkpaths PYTHON_DIR PYTHONPATH TMP_DATA_DIR +func_checklibs STORE_LIB CLUSTER_LIB +func_checkexecs QPIDD_EXEC QPID_CONFIG_EXEC QPID_ROUTE_EXEC SENDER_EXEC RECEIVER_EXEC + +FAILING_PYTHON_TESTS="${abs_srcdir}/failing_python_tests.txt" + diff --git a/cpp/src/tests/legacystore/unit_test.cpp b/cpp/src/tests/legacystore/unit_test.cpp new file mode 100644 index 0000000000..add80a6f91 --- /dev/null +++ b/cpp/src/tests/legacystore/unit_test.cpp @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +// Defines test_main function to link with actual unit test code. +#define BOOST_AUTO_TEST_MAIN // Boost 1.33 +#define BOOST_TEST_MAIN + +#include "unit_test.h" + diff --git a/cpp/src/tests/legacystore/unit_test.h b/cpp/src/tests/legacystore/unit_test.h new file mode 100644 index 0000000000..16b6ae2ffb --- /dev/null +++ b/cpp/src/tests/legacystore/unit_test.h @@ -0,0 +1,69 @@ +#ifndef QPIPD_TEST_UNIT_TEST_H_ +#define QPIPD_TEST_UNIT_TEST_H_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +// Workaround so we can build against boost 1.32, 1.33 and boost 1.34. +// Remove when we no longer need to support 1.32 or 1.33. + +#include <boost/version.hpp> + +#if (BOOST_VERSION < 103400) // v.1.33 and earlier +# include <boost/test/auto_unit_test.hpp> +#else // v.1.34 and later +# include <boost/test/unit_test.hpp> +#endif + +// Keep the test function for compilation but do not not register it. +// TODO aconway 2008-04-23: better workaround for expected failures. +// The following causes the test testUpdateTxState not to run at all. +# define QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(test_name,n) \ + namespace { struct test_name { void test_method(); }; } \ + void test_name::test_method() +// The following runs the test testUpdateTxState, but it fails. +/*#define QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(test_name,n) \ + namespace { struct test_name { void test_method(); }; } \ + BOOST_AUTO_TEST_CASE(name)*/ + +#if (BOOST_VERSION < 103300) // v.1.32 and earlier + +# define QPID_AUTO_TEST_SUITE(name) +# define QPID_AUTO_TEST_CASE(name) BOOST_AUTO_UNIT_TEST(name) +# define QPID_AUTO_TEST_SUITE_END() + +#elif (BOOST_VERSION < 103400) // v.1.33 + +// Note the trailing ';' +# define QPID_AUTO_TEST_SUITE(name) BOOST_AUTO_TEST_SUITE(name); +# define QPID_AUTO_TEST_CASE(name) BOOST_AUTO_TEST_CASE(name) +# define QPID_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END(); + +#else // v.1.34 and later + +# define QPID_AUTO_TEST_SUITE(name) BOOST_AUTO_TEST_SUITE(name) +# define QPID_AUTO_TEST_CASE(name) BOOST_AUTO_TEST_CASE(name) +# define QPID_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END() + +#endif + +#endif /*!QPIPD_TEST_UNIT_TEST_H_*/ |