summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/legacystore
diff options
context:
space:
mode:
authorCharles E. Rolke <chug@apache.org>2013-01-07 21:24:48 +0000
committerCharles E. Rolke <chug@apache.org>2013-01-07 21:24:48 +0000
commitb1ea5d9b887f7360cdc18ed8f87ccbf4e1299f29 (patch)
treea6fb2f3a38d0af42520b73b01ada8c78653f05b1 /qpid/cpp/src/tests/legacystore
parent985928c7cb19799e3e1da7e0dcd18823a18b698c (diff)
downloadqpid-python-b1ea5d9b887f7360cdc18ed8f87ccbf4e1299f29.tar.gz
QPID-1726 ASF licensed Qpid Store
Add unit tests. There are several issues with these tests: 1. Originally the four .cpp unit test sources were compiled into a single unit_test executable. In the current framework those four sources create conflicting brokers that overwrite each other's store and fail to open port 5672. In this checkin there are four unit test executables. Running each serially gets them all to pass. A new strategy is needed to start brokers that don't conflict. 2. The legacystore.so is not integrated with the rest of the tests. Some tests may run with the externally compiled msgstore.so and some use the built in test_store. Plugging legacystore.so into these other tests is TBD. 3. cpp/src/tests/legacystore defines more tests beyond simple unit tests. None of the issues related to wider system tests are addressed yet. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1430018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/legacystore')
-rw-r--r--qpid/cpp/src/tests/legacystore/.valgrind.supp35
-rw-r--r--qpid/cpp/src/tests/legacystore/.valgrindrc7
-rw-r--r--qpid/cpp/src/tests/legacystore/CMakeLists.txt111
-rw-r--r--qpid/cpp/src/tests/legacystore/MessageUtils.h105
-rw-r--r--qpid/cpp/src/tests/legacystore/OrderingTest.cpp168
-rw-r--r--qpid/cpp/src/tests/legacystore/SimpleTest.cpp497
-rw-r--r--qpid/cpp/src/tests/legacystore/TestFramework.cpp30
-rw-r--r--qpid/cpp/src/tests/legacystore/TestFramework.h37
-rw-r--r--qpid/cpp/src/tests/legacystore/TransactionalTest.cpp351
-rw-r--r--qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp675
-rw-r--r--qpid/cpp/src/tests/legacystore/clean.sh32
-rw-r--r--qpid/cpp/src/tests/legacystore/persistence.py574
-rw-r--r--qpid/cpp/src/tests/legacystore/run_long_python_tests21
-rw-r--r--qpid/cpp/src/tests/legacystore/run_python_tests64
-rw-r--r--qpid/cpp/src/tests/legacystore/run_short_python_tests21
-rw-r--r--qpid/cpp/src/tests/legacystore/run_test69
-rw-r--r--qpid/cpp/src/tests/legacystore/start_broker25
-rw-r--r--qpid/cpp/src/tests/legacystore/stop_broker46
-rw-r--r--qpid/cpp/src/tests/legacystore/system_test.sh51
-rw-r--r--qpid/cpp/src/tests/legacystore/tests_env.sh260
-rw-r--r--qpid/cpp/src/tests/legacystore/unit_test.cpp28
-rw-r--r--qpid/cpp/src/tests/legacystore/unit_test.h69
22 files changed, 3276 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/legacystore/.valgrind.supp b/qpid/cpp/src/tests/legacystore/.valgrind.supp
new file mode 100644
index 0000000000..5c1c5377bf
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/.valgrind.supp
@@ -0,0 +1,35 @@
+{
+ <insert_a_suppression_name_here>
+ Memcheck:Leak
+ fun:_Znwm
+ fun:_ZNSs4_Rep9_S_createEmmRKSaIcE
+ fun:_ZNSs12_S_constructIPKcEEPcT_S3_RKSaIcESt20forward_iterator_tag
+ fun:_ZNSsC1EPKcRKSaIcE
+}
+
+{
+ <insert_a_suppression_name_here>
+ Memcheck:Leak
+ fun:_Znwm
+ fun:_ZNSs4_Rep9_S_createEmmRKSaIcE
+ fun:_ZNSs4_Rep8_M_cloneERKSaIcEm
+ fun:_ZNSs7reserveEm
+}
+
+{
+ <insert_a_suppression_name_here>
+ Memcheck:Leak
+ fun:_Znwm
+ fun:_ZNSs4_Rep9_S_createEmmRKSaIcE
+ fun:_ZNSs9_M_mutateEmmm
+ fun:_ZNSs15_M_replace_safeEmmPKcm
+}
+
+{
+ <insert_a_suppression_name_here>
+ Memcheck:Leak
+ fun:_Znwm
+ fun:_ZNSs4_Rep9_S_createEmmRKSaIcE
+ fun:_ZNSsC1IPcEET_S1_RKSaIcE
+}
+
diff --git a/qpid/cpp/src/tests/legacystore/.valgrindrc b/qpid/cpp/src/tests/legacystore/.valgrindrc
new file mode 100644
index 0000000000..4aba7661de
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/.valgrindrc
@@ -0,0 +1,7 @@
+--gen-suppressions=all
+--leak-check=full
+--demangle=yes
+--suppressions=.valgrind.supp
+--num-callers=25
+--trace-children=yes
+
diff --git a/qpid/cpp/src/tests/legacystore/CMakeLists.txt b/qpid/cpp/src/tests/legacystore/CMakeLists.txt
new file mode 100644
index 0000000000..f4bed53ad0
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/CMakeLists.txt
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Enable dashboard reporting.
+include (CTest)
+
+# Make sure that everything get built before the tests
+# Need to create a var with all the necessary top level targets
+
+# If we're linking Boost for DLLs, turn that on for the unit test too.
+if (QPID_LINK_BOOST_DYNAMIC)
+ add_definitions(-DBOOST_TEST_DYN_LINK)
+endif (QPID_LINK_BOOST_DYNAMIC)
+
+include_directories( ${CMAKE_CURRENT_SOURCE_DIR} )
+
+include (FindPythonInterp)
+
+# # Inherit environment from parent script
+# set (abs_srcdir ${CMAKE_CURRENT_SOURCE_DIR})
+# set (abs_builddir ${CMAKE_CURRENT_BINARY_DIR})
+# set (abs_top_srcdir ${CMAKE_SOURCE_DIR})
+# set (abs_top_builddir ${CMAKE_BINARY_DIR})
+# set (builddir_lib_suffix "")
+
+# If valgrind is selected in the configuration step, set up the path to it
+# for CTest.
+if (ENABLE_VALGRIND)
+ set (MEMORYCHECK_COMMAND ${VALGRIND})
+ set (MEMORYCHECK_COMMAND_OPTIONS "--gen-suppressions=all
+--leak-check=full
+--demangle=yes
+--suppressions=${CMAKE_CURRENT_SOURCE_DIR}/.valgrind.supp
+--num-callers=25
+--log-file=ctest_valgrind.vglog")
+endif (ENABLE_VALGRIND)
+
+# Like this to work with cmake 2.4 on Unix
+set (qpid_test_boost_libs
+ ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY} ${Boost_SYSTEM_LIBRARY})
+
+#
+# Unit test program
+#
+# Unit tests are built as a single program to reduce valgrind overhead
+# when running the tests. If you want to build a subset of the tests run
+# ccmake and set unit_tests_to_build to the set you want to build.
+# HACK ALERT - Unit tests are built individually to resolve a conflict
+# with running multiple brokers that connect to 0.0.0.0:5672 and that
+# womp on each other's store directory.
+
+#
+# define_selftest
+# macro to accept the name of a single source file and to create a
+# unit test executable that runs the source.
+#
+MACRO (define_selftest theSourceFile)
+add_executable (legacystore_${theSourceFile}
+ unit_test
+ ${theSourceFile}
+ ${platform_test_additions})
+target_link_libraries (legacystore_${theSourceFile}
+ ${qpid_test_boost_libs}
+ qpidmessaging qpidbroker qmfconsole legacystore)
+get_property(ls_include TARGET legacystore_${theSourceFile} PROPERTY INCLUDE_DIRECTORIES)
+list(APPEND ls_include ${abs_top_srcdir}/src/qpid/legacystore)
+list(APPEND ls_include ${abs_top_srcdir}/src/tests)
+set_target_properties (legacystore_${theSourceFile} PROPERTIES
+ INCLUDE_DIRECTORIES "${ls_include}"
+ COMPILE_DEFINITIONS _IN_QPID_BROKER)
+remember_location(legacystore_${theSourceFile})
+set(test_wrap ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_test${test_script_suffix})
+
+add_test (legacystore_${theSourceFile} ${test_wrap} ${legacystore_${theSourceFile}_LOCATION})
+ENDMACRO (define_selftest)
+
+# add_definitions(-H)
+
+define_selftest (SimpleTest)
+define_selftest (OrderingTest)
+define_selftest (TransactionalTest)
+define_selftest (TwoPhaseCommitTest)
+
+#
+# Other test programs
+#
+
+# This should ideally be done as part of the test run, but I don't know a way
+# to get these arguments and the working directory set like Makefile.am does,
+# and have that run during the test pass.
+if (PYTHON_EXECUTABLE)
+ set (python_bld ${CMAKE_CURRENT_BINARY_DIR}/python)
+ execute_process(COMMAND ${PYTHON_EXECUTABLE} setup.py install --prefix=${pythoon_bld} --install-lib=${python_bld} --install-scripts=${python_bld}/commands
+ WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/../python)
+endif (PYTHON_EXECUTABLE)
diff --git a/qpid/cpp/src/tests/legacystore/MessageUtils.h b/qpid/cpp/src/tests/legacystore/MessageUtils.h
new file mode 100644
index 0000000000..6552357c72
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/MessageUtils.h
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/Message.h>
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/amqp_0_10/MessageTransfer.h>
+#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/all_method_bodies.h>
+#include <qpid/framing/Uuid.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct MessageUtils
+{
+ static Message createMessage(const std::string& exchange, const std::string& routingKey,
+ const Uuid& messageId=Uuid(), const bool durable = false,
+ const uint64_t contentSize = 0, const std::string& correlationId = std::string())
+ {
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(new qpid::broker::amqp_0_10::MessageTransfer());
+
+ AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ props->setMessageId(messageId);
+ props->setCorrelationId(correlationId);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ if (durable)
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(PERSISTENT);
+ return Message(msg, msg);
+ }
+
+ static void addContent(Message msg, const std::string& data)
+ {
+ AMQFrame content((AMQContentBody(data)));
+ qpid::broker::amqp_0_10::MessageTransfer::get(msg).getFrames().append(content);
+ }
+
+ struct MessageRetriever : public Consumer
+ {
+ MessageRetriever(Queue& q) : Consumer("test", CONSUMER), queue(q) {};
+
+ bool deliver(const QueueCursor& c, const Message& m)
+ {
+ message = m;
+ cursor = c;
+ return true;
+ };
+ void notify() {}
+ void cancel() {}
+ void acknowledged(const DeliveryRecord&) {}
+ OwnershipToken* getSession() { return 0; }
+
+ const Queue& queue;
+ Message message;
+ QueueCursor cursor;
+ };
+
+ static Message get(Queue& queue, QueueCursor* cursor = 0)
+ {
+ boost::shared_ptr<MessageRetriever> consumer(new MessageRetriever(queue));
+ if (!queue.dispatch(consumer))throw qpid::Exception("No message found!");
+ if (cursor) *cursor = consumer->cursor;
+ return consumer->message;
+ }
+
+ static Uuid getMessageId(const Message& message)
+ {
+ return qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<MessageProperties>()->getMessageId();
+ }
+
+ static std::string getCorrelationId(const Message& message)
+ {
+ return qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<MessageProperties>()->getCorrelationId();
+ }
+
+ static void deliver(Message& msg, FrameHandler& h, uint16_t framesize)
+ {
+ qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, 0, qpid::types::Variant::Map());
+ qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendContent(h, framesize);
+ }
+
+};
diff --git a/qpid/cpp/src/tests/legacystore/OrderingTest.cpp b/qpid/cpp/src/tests/legacystore/OrderingTest.cpp
new file mode 100644
index 0000000000..92a09f0c60
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/OrderingTest.cpp
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "MessageUtils.h"
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/RecoveryManagerImpl.h>
+#include <qpid/framing/AMQHeaderBody.h>
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace mrg::msgstore;
+
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+
+QPID_AUTO_TEST_SUITE(OrderingTest)
+
+#define SET_LOG_LEVEL(level) \
+ qpid::log::Options opts(""); \
+ opts.selectors.clear(); \
+ opts.selectors.push_back(level); \
+ qpid::log::Logger::instance().configure(opts);
+
+const std::string test_filename("OrderingTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+const std::string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/OrderingTest");
+
+// === Helper fns ===
+
+const std::string name("OrderingQueue");
+std::auto_ptr<MessageStoreImpl> store;
+QueueRegistry queues;
+Queue::shared_ptr queue;
+std::queue<Uuid> ids;
+
+class TestConsumer : public Consumer
+{
+ public:
+
+ TestConsumer(Queue::shared_ptr q, std::queue<Uuid>& i) : Consumer("test", CONSUMER), queue(q), ids(i) {};
+
+ bool deliver(const QueueCursor& cursor, const Message& message)
+ {
+ queue->dequeue(0, cursor);
+ BOOST_CHECK_EQUAL(ids.front(), MessageUtils::getMessageId(message));
+ ids.pop();
+ return true;
+ };
+ void notify() {}
+ void cancel() {}
+ void acknowledged(const DeliveryRecord&) {}
+ OwnershipToken* getSession() { return 0; }
+ private:
+ Queue::shared_ptr queue;
+ std::queue<Uuid>& ids;
+};
+boost::shared_ptr<TestConsumer> consumer;
+
+void setup()
+{
+ store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(&br));
+ store->init(test_dir, 4, 1, true); // truncate store
+
+ queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
+ queue->create();
+ consumer = boost::shared_ptr<TestConsumer>(new TestConsumer(queue, ids));
+}
+
+void push()
+{
+ Uuid messageId(true);
+ ids.push(messageId);
+
+ Message msg = MessageUtils::createMessage("exchange", "routing_key", messageId, true, 0);
+
+ queue->deliver(msg);
+}
+
+bool pop()
+{
+ return queue->dispatch(consumer);
+}
+
+void restart()
+{
+ queue.reset();
+ store.reset();
+
+ store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(&br));
+ store->init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ LinkRegistry links;
+ sys::Timer t;
+ DtxManager mgr(t);
+ mgr.setStore (store.get());
+ RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, br.getProtocolRegistry());
+ store->recover(recoveryMgr);
+
+ queue = queues.find(name);
+ consumer = boost::shared_ptr<TestConsumer>(new TestConsumer(queue, ids));
+}
+
+void check()
+{
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL((u_int32_t) ids.size(), queue->getMessageCount());
+ while (pop()) ;//keeping popping 'till all messages are dequeued
+ BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
+ BOOST_CHECK_EQUAL((size_t) 0, ids.size());
+}
+
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(Basic)
+{
+ SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+ std::cout << test_filename << ".Basic: " << std::flush;
+ setup();
+ //push on 10 messages
+ for (int i = 0; i < 10; i++) push();
+ restart();
+ check();
+ std::cout << "ok" << std::endl;
+}
+
+QPID_AUTO_TEST_CASE(Cycle)
+{
+ std::cout << test_filename << ".Cycle: " << std::flush;
+ setup();
+ //push on 10 messages:
+ for (int i = 0; i < 10; i++) push();
+ //pop 5:
+ for (int i = 0; i < 5; i++) pop();
+ //push on another 5:
+ for (int i = 0; i < 5; i++) push();
+ restart();
+ check();
+ std::cout << "ok" << std::endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/legacystore/SimpleTest.cpp b/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
new file mode 100644
index 0000000000..a49333d876
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
@@ -0,0 +1,497 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "tests/legacystore/MessageUtils.h"
+#include "qpid/legacystore/StoreException.h"
+#include "qpid/broker/DirectExchange.h"
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueSettings.h>
+#include <qpid/broker/RecoveryManagerImpl.h>
+#include <qpid/framing/AMQHeaderBody.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+
+#define SET_LOG_LEVEL(level) \
+ qpid::log::Options opts(""); \
+ opts.selectors.clear(); \
+ opts.selectors.push_back(level); \
+ qpid::log::Logger::instance().configure(opts);
+
+
+using boost::intrusive_ptr;
+using boost::static_pointer_cast;
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace mrg::msgstore;
+using namespace std;
+
+QPID_AUTO_TEST_SUITE(SimpleTest)
+
+const string test_filename("SimpleTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/SimpleTest");
+
+// === Helper fns ===
+
+struct DummyHandler : OutputHandler
+{
+ std::vector<AMQFrame> frames;
+
+ virtual void send(AMQFrame& frame){
+ frames.push_back(frame);
+ }
+};
+
+void recover(MessageStoreImpl& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links)
+{
+ sys::Timer t;
+ DtxManager mgr(t);
+ mgr.setStore (&store);
+ RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry());
+ store.recover(recovery);
+}
+
+void recover(MessageStoreImpl& store, ExchangeRegistry& exchanges)
+{
+ QueueRegistry queues;
+ LinkRegistry links;
+ recover(store, queues, exchanges, links);
+}
+
+void recover(MessageStoreImpl& store, QueueRegistry& queues)
+{
+ ExchangeRegistry exchanges;
+ LinkRegistry links;
+ recover(store, queues, exchanges, links);
+}
+
+void bindAndUnbind(const string& exchangeName, const string& queueName,
+ const string& key, const FieldTable& args)
+{
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue, qpid::framing::FieldTable());
+ BOOST_REQUIRE(exchange->bind(queue, key, &args));
+ store.bind(*exchange, *queue, key, args);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ // check exchange args are still set
+ for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
+ BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
+ }
+ //check it is bound by unbinding
+ BOOST_REQUIRE(exchange->unbind(queue, key, &args));
+ store.unbind(*exchange, *queue, key, args);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ // check exchange args are still set
+ for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
+ BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
+ }
+ //make sure it is no longer bound
+ BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
+ }
+}
+
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(CreateDelete)
+{
+ SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+ cout << test_filename << ".CreateDelete: " << flush;
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ string name("CreateDeleteQueue");
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+// TODO - check dir exists
+ BOOST_REQUIRE(queue.getPersistenceId());
+ store.destroy(queue);
+// TODO - check dir is deleted
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(EmptyRecover)
+{
+ cout << test_filename << ".EmptyRecover: " << flush;
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ //nothing to assert, just testing it doesn't blow up
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueCreate)
+{
+ cout << test_filename << ".QueueCreate: " << flush;
+
+ uint64_t id(0);
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+ BOOST_REQUIRE(queue.getPersistenceId());
+ id = queue.getPersistenceId();
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue.get());
+ BOOST_CHECK_EQUAL(id, queue->getPersistenceId());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
+{
+ cout << test_filename << ".QueueCreateWithSettings: " << flush;
+
+ FieldTable arguments;
+ arguments.setInt("qpid.max_count", 202);
+ arguments.setInt("qpid.max_size", 1003);
+ QueueSettings settings;
+ settings.populate(arguments, settings.storeSettings);
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, settings, &store, 0);
+ queue.create();
+ BOOST_REQUIRE(queue.getPersistenceId());
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), queue->getSettings().maxDepth.getCount());
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), queue->getSettings().maxDepth.getSize());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueDestroy)
+{
+ cout << test_filename << ".QueueDestroy: " << flush;
+
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+ store.destroy(queue);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ BOOST_REQUIRE(!registry.find(name));
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Enqueue)
+{
+ cout << test_filename << ".Enqueue: " << flush;
+
+ //TODO: this is largely copy & paste'd from MessageTest in
+ //qpid tree. ideally need some helper routines for reducing
+ //this to a simpler less duplicated form
+
+ string name("MyDurableQueue");
+ string exchange("MyExchange");
+ string routingKey("MyRoutingKey");
+ Uuid messageId(true);
+ string data1("abcdefg");
+ string data2("hijklmn");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
+ queue->create();
+
+ Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 14);
+ MessageUtils::addContent(msg, data1);
+ MessageUtils::addContent(msg, data2);
+
+ msg.addAnnotation("abc", "xyz");
+
+ queue->deliver(msg);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ Message msg = MessageUtils::get(*queue);
+
+ BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+ BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg));
+ BOOST_CHECK_EQUAL(std::string("xyz"), msg.getAnnotation("abc"));
+ BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize());
+
+ DummyHandler handler;
+ MessageUtils::deliver(msg, handler, 100);
+ BOOST_CHECK_EQUAL((size_t) 2, handler.frames.size());
+ AMQContentBody* contentBody(dynamic_cast<AMQContentBody*>(handler.frames[1].getBody()));
+ BOOST_REQUIRE(contentBody);
+ BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size());
+ BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Dequeue)
+{
+ cout << test_filename << ".Dequeue: " << flush;
+
+ //TODO: reduce the duplication in these tests
+ string name("MyDurableQueue");
+ {
+ string exchange("MyExchange");
+ string routingKey("MyRoutingKey");
+ Uuid messageId(true);
+ string data("abcdefg");
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
+ queue->create();
+
+ Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 7);
+ MessageUtils::addContent(msg, data);
+
+ queue->deliver(msg);
+
+ QueueCursor cursor;
+ MessageUtils::get(*queue, &cursor);
+ queue->dequeue(0, cursor);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
+{
+ cout << test_filename << ".ExchangeCreateAndDestroy: " << flush;
+
+ uint64_t id(0);
+ string name("MyDurableExchange");
+ string type("direct");
+ FieldTable args;
+ args.setString("a", "A");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ ExchangeRegistry registry;
+ Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
+ store.create(*exchange, qpid::framing::FieldTable());
+ id = exchange->getPersistenceId();
+ BOOST_REQUIRE(id);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry registry;
+
+ recover(store, registry);
+
+ Exchange::shared_ptr exchange = registry.get(name);
+ BOOST_CHECK_EQUAL(id, exchange->getPersistenceId());
+ BOOST_CHECK_EQUAL(type, exchange->getType());
+ BOOST_REQUIRE(exchange->isDurable());
+ BOOST_CHECK_EQUAL(*args.get("a"), *exchange->getArgs().get("a"));
+ store.destroy(*exchange);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry registry;
+
+ recover(store, registry);
+
+ try {
+ Exchange::shared_ptr exchange = registry.get(name);
+ BOOST_FAIL("Expected exchange not to be found");
+ } catch (const SessionException& e) {
+ BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
+ }
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind)
+{
+ cout << test_filename << ".ExchangeBindAndUnbind: " << flush;
+
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable());
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs)
+{
+ cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush;
+
+ FieldTable args;
+ args.setString("a", "A");
+ args.setString("b", "B");
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args);
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
+{
+ cout << test_filename << ".ExchangeImplicitUnbind: " << flush;
+
+ string exchangeName("MyDurableExchange");
+ string queueName1("MyDurableQueue1");
+ string queueName2("MyDurableQueue2");
+ string key("my-routing-key");
+ FieldTable args;
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
+ Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue1, qpid::framing::FieldTable());
+ store.create(*queue2, qpid::framing::FieldTable());
+ store.bind(*exchange, *queue1, key, args);
+ store.bind(*exchange, *queue2, key, args);
+ //delete queue1:
+ store.destroy(*queue1);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ //ensure recovery works ok:
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ BOOST_REQUIRE(!queues.find(queueName1).get());
+ BOOST_REQUIRE(queues.find(queueName2).get());
+
+ //delete exchange:
+ store.destroy(*exchange);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ //ensure recovery works ok:
+ recover(store, queues, exchanges, links);
+
+ try {
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ BOOST_FAIL("Expected exchange not to be found");
+ } catch (const SessionException& e) {
+ BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
+ }
+ Queue::shared_ptr queue = queues.find(queueName2);
+ store.destroy(*queue);
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/legacystore/TestFramework.cpp b/qpid/cpp/src/tests/legacystore/TestFramework.cpp
new file mode 100644
index 0000000000..2f7faf7682
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/TestFramework.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Defines broker to be used by tests
+
+#include "unit_test.h"
+#include "TestFramework.h"
+#include "qpid/broker/Broker.h"
+
+#include <iostream>
+
+//BOOST_GLOBAL_FIXTURE( testBroker )
diff --git a/qpid/cpp/src/tests/legacystore/TestFramework.h b/qpid/cpp/src/tests/legacystore/TestFramework.h
new file mode 100644
index 0000000000..f3066db602
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/TestFramework.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Defines broker to be used by tests
+
+#include "unit_test.h"
+
+#include <qpid/broker/Broker.h>
+
+namespace {
+ // test broker
+ qpid::broker::Broker::Options opts;
+ qpid::broker::Broker br(opts);
+/*
+ struct testBroker {
+ testBroker() {}
+ ~testBroker() {}
+ };*/
+}
diff --git a/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp b/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp
new file mode 100644
index 0000000000..2d3f6f922c
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp
@@ -0,0 +1,351 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "MessageUtils.h"
+#include "qpid/legacystore/StoreException.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+using namespace mrg::msgstore;
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace std;
+
+namespace {
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+}
+
+QPID_AUTO_TEST_SUITE(TransactionalTest)
+
+#define SET_LOG_LEVEL(level) \
+ qpid::log::Options opts(""); \
+ opts.selectors.clear(); \
+ opts.selectors.push_back(level); \
+ qpid::log::Logger::instance().configure(opts);
+
+const string test_filename("TransactionalTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest");
+
+// Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
+class TestTxnCtxt : public TxnCtxt
+{
+ public:
+ TestTxnCtxt(IdSequence* _loggedtx) : TxnCtxt(_loggedtx) {}
+ void setCompleteFailure(const unsigned num_queues_rem) {
+ // Remove queue members from back of impactedQueues until queues_rem reamin.
+ // to end to simulate multi-queue txn complete failure.
+ while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+ }
+ void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; }
+};
+
+// Test store which has special begin() which returns a TestTPCTxnCtxt, and a method to check for
+// remaining open transactions.
+// begin(), commit(), and abort() all hide functions in MessageStoreImpl. To avoid the compiler
+// warnings/errors these are renamed with a 'TMS' prefix.
+class TestMessageStore: public MessageStoreImpl
+{
+ public:
+ TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) : MessageStoreImpl(br, envpath) {}
+ std::auto_ptr<qpid::broker::TransactionContext> TMSbegin() {
+ checkInit();
+ // pass sequence number for c/a
+ return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
+ }
+ void TMScommit(TransactionContext& ctxt, const bool complete_prepared_list) {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
+ }
+ void TMSabort(TransactionContext& ctxt, const bool complete_prepared_list)
+ {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
+ }
+};
+
+// === Helper fns ===
+
+const string nameA("queueA");
+const string nameB("queueB");
+//const Uuid messageId(true);
+std::auto_ptr<MessageStoreImpl> store;
+std::auto_ptr<QueueRegistry> queues;
+Queue::shared_ptr queueA;
+Queue::shared_ptr queueB;
+
+template <class T>
+void setup()
+{
+ store = std::auto_ptr<T>(new T(&br));
+ store->init(test_dir, 4, 1, true); // truncate store
+
+ //create two queues:
+ queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
+ queueA->create();
+ queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
+ queueB->create();
+}
+
+template <class T>
+void restart()
+{
+ queueA.reset();
+ queueB.reset();
+ queues.reset();
+ store.reset();
+
+ store = std::auto_ptr<T>(new T(&br));
+ store->init(test_dir, 4, 1);
+ queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
+ ExchangeRegistry exchanges;
+ LinkRegistry links;
+ sys::Timer t;
+ DtxManager mgr(t);
+ mgr.setStore (store.get());
+ RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, br.getProtocolRegistry());
+ store->recover(recovery);
+
+ queueA = queues->find(nameA);
+ queueB = queues->find(nameB);
+}
+
+Message createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+{
+ return MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id);
+}
+
+void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
+{
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+ if (size > 0) {
+ Message msg = MessageUtils::get(*queue);
+ BOOST_REQUIRE(msg);
+ BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg));
+ }
+}
+
+void swap(bool commit)
+{
+ setup<MessageStoreImpl>();
+
+ //create message and enqueue it onto first queue:
+ Message msgA = createMessage("Message", "exchange", "routing_key");
+ queueA->deliver(msgA);
+
+ QueueCursor cursorB;
+ Message msgB = MessageUtils::get(*queueA, &cursorB);
+ BOOST_REQUIRE(msgB);
+ //move the message from one queue to the other as a transaction
+ std::auto_ptr<TransactionContext> txn = store->begin();
+ TxBuffer tx;
+ queueB->deliver(msgB, &tx);//note: need to enqueue it first to avoid message being deleted
+
+ queueA->dequeue(txn.get(), cursorB);
+ tx.prepare(txn.get());
+ if (commit) {
+ store->commit(*txn);
+ } else {
+ store->abort(*txn);
+ }
+
+ restart<MessageStoreImpl>();
+
+ // Check outcome
+ BOOST_REQUIRE(queueA);
+ BOOST_REQUIRE(queueB);
+
+ Queue::shared_ptr x;//the queue from which the message was swapped
+ Queue::shared_ptr y;//the queue on which the message is expected to be
+
+ if (commit) {
+ x = queueA;
+ y = queueB;
+ } else {
+ x = queueB;
+ y = queueA;
+ }
+
+ checkMsg(x, 0);
+ checkMsg(y, 1, "Message");
+ checkMsg(y, 0);
+}
+
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
+{
+ setup<TestMessageStore>();
+ TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
+ std::auto_ptr<TransactionContext> txn(tmsp->TMSbegin());
+ TxBuffer tx;
+
+ //create two messages and enqueue them onto both queues:
+ Message msgA = createMessage("MessageA", "exchange", "routing_key");
+ queueA->deliver(msgA, &tx);
+ queueB->deliver(msgA, &tx);
+ Message msgB = createMessage("MessageB", "exchange", "routing_key");
+ queueA->deliver(msgB, &tx);
+ queueB->deliver(msgB, &tx);
+
+ tx.prepare(txn.get());
+ static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
+ if (commit)
+ tmsp->TMScommit(*txn, complete_prepared_list);
+ else
+ tmsp->TMSabort(*txn, complete_prepared_list);
+ restart<TestMessageStore>();
+
+ // Check outcome
+ if (commit)
+ {
+ checkMsg(queueA, 2, "MessageA");
+ checkMsg(queueB, 2, "MessageA");
+ checkMsg(queueA, 1, "MessageB");
+ checkMsg(queueB, 1, "MessageB");
+ }
+ checkMsg(queueA, 0);
+ checkMsg(queueB, 0);
+}
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(Commit)
+{
+ SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+ cout << test_filename << ".Commit: " << flush;
+ swap(true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Abort)
+{
+ cout << test_filename << ".Abort: " << flush;
+ swap(false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ testMultiQueueTxn(2, true, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ testMultiQueueTxn(2, true, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+ testMultiQueueTxn(0, false, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+ testMultiQueueTxn(0, false, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+ testMultiQueueTxn(1, false, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+ testMultiQueueTxn(1, false, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+ testMultiQueueTxn(2, false, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+ testMultiQueueTxn(2, false, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(LockedRecordTest)
+{
+ cout << test_filename << ".LockedRecordTest: " << flush;
+
+ setup<MessageStoreImpl>();
+ queueA->deliver(createMessage("Message", "exchange", "routingKey"));
+ std::auto_ptr<TransactionContext> txn = store->begin();
+
+ QueueCursor cursor;
+ Message msg = MessageUtils::get(*queueA, &cursor);
+ queueA->dequeue(txn.get(), cursor);
+
+ try {
+ store->dequeue(0, msg.getPersistentContext(), *queueA);
+ BOOST_ERROR("Did not throw JERR_MAP_LOCKED exception as expected.");
+ }
+ catch (const mrg::msgstore::StoreException& e) {
+ if (std::strstr(e.what(), "JERR_MAP_LOCKED") == 0)
+ BOOST_ERROR("Unexpected StoreException: " << e.what());
+ }
+ catch (const std::exception& e) {
+ BOOST_ERROR("Unexpected exception: " << e.what());
+ }
+ store->commit(*txn);
+ checkMsg(queueA, 0);
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp b/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp
new file mode 100644
index 0000000000..92e49df9e3
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp
@@ -0,0 +1,675 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "MessageUtils.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/legacystore/TxnCtxt.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+using namespace mrg::msgstore;
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace std;
+
+
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+
+
+QPID_AUTO_TEST_SUITE(TwoPhaseCommitTest)
+
+#define SET_LOG_LEVEL(level) \
+ qpid::log::Options opts(""); \
+ opts.selectors.clear(); \
+ opts.selectors.push_back(level); \
+ qpid::log::Logger::instance().configure(opts);
+
+
+const string test_filename("TwoPhaseCommitTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TwoPhaseCommitTest");
+
+// === Helper fns ===
+
+class TwoPhaseCommitTest
+{
+
+ class Strategy
+ {
+ public:
+ virtual void init() = 0;
+ virtual void run(TPCTransactionContext* txn) = 0;
+ virtual void check(bool committed) = 0;
+ virtual ~Strategy(){}
+ };
+
+ class Swap : public Strategy
+ {
+ TwoPhaseCommitTest* const test;
+ const string messageId;
+ Message msg;
+ public:
+ Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
+ void init(){ msg = test->deliver(messageId, test->queueA); }
+ void run(TPCTransactionContext* txn) { test->swap(txn, test->queueA, test->queueB); }
+ void check(bool committed) { test->swapCheck(committed, messageId, test->queueA, test->queueB); }
+ };
+
+ class Enqueue : public Strategy
+ {
+ TwoPhaseCommitTest* const test;
+ Message msg1;
+ Message msg2;
+ Message msg3;
+ public:
+ Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
+ void init() {}
+ void run(TPCTransactionContext* txn) {
+ msg1 = test->enqueue(txn, "Enqueue1", test->queueA);
+ msg2 = test->enqueue(txn, "Enqueue2", test->queueA);
+ msg3 = test->enqueue(txn, "Enqueue3", test->queueA);
+ }
+ void check(bool committed) {
+ if (committed) {
+ test->checkMsg(test->queueA, 3, "Enqueue1");
+ test->checkMsg(test->queueA, 2, "Enqueue2");
+ test->checkMsg(test->queueA, 1, "Enqueue3");
+ }
+ test->checkMsg(test->queueA, 0);
+ }
+ };
+
+ class Dequeue : public Strategy
+ {
+ TwoPhaseCommitTest* const test;
+ Message msg1;
+ Message msg2;
+ Message msg3;
+ public:
+ Dequeue(TwoPhaseCommitTest* const test_): test(test_) {}
+ void init() {
+ msg1 = test->deliver("Dequeue1", test->queueA);
+ msg2 = test->deliver("Dequeue2", test->queueA);
+ msg3 = test->deliver("Dequeue3", test->queueA);
+ }
+ void run(TPCTransactionContext* txn) {
+ test->dequeue(txn, test->queueA);
+ test->dequeue(txn, test->queueA);
+ test->dequeue(txn, test->queueA);
+ }
+ void check(bool committed) {
+ if (!committed) {
+ test->checkMsg(test->queueA, 3, "Dequeue1");
+ test->checkMsg(test->queueA, 2, "Dequeue2");
+ test->checkMsg(test->queueA, 1, "Dequeue3");
+ }
+ test->checkMsg(test->queueA, 0);
+ }
+ };
+
+ class MultiQueueTxn : public Strategy
+ {
+ TwoPhaseCommitTest* const test;
+ Message msg1;
+ Message msg2;
+ std::set<Queue::shared_ptr> queueset;
+ public:
+ MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {}
+ virtual void init() {}
+ virtual void run(TPCTransactionContext* txn) {
+ queueset.insert(test->queueA);
+ queueset.insert(test->queueB);
+ msg1 = test->enqueue(txn, "Message1", queueset);
+ msg2 = test->enqueue(txn, "Message2", queueset);
+ queueset.clear();
+ }
+ virtual void check(bool committed) {
+ TestMessageStore* sptr = static_cast<TestMessageStore*>(test->store.get());
+ if (committed)
+ {
+ test->checkMsg(test->queueA, 2, "Message1");
+ test->checkMsg(test->queueB, 2, "Message1");
+ test->checkMsg(test->queueA, 1, "Message2");
+ test->checkMsg(test->queueB, 1, "Message2");
+ }
+ test->checkMsg(test->queueA, 0);
+ test->checkMsg(test->queueB, 0);
+ // Check there are no remaining open txns in store
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueA)));
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueB)));
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingPreparedListTxns());
+ }
+ };
+
+ // Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
+ class TestTPCTxnCtxt : public TPCTxnCtxt
+ {
+ public:
+ TestTPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TPCTxnCtxt(_xid, _loggedtx) {}
+ void setCompleteFailure(const unsigned num_queues_rem, const bool complete_prepared_list) {
+ // Remove queue members from back of impactedQueues until queues_rem reamin.
+ // to end to simulate multi-queue txn complete failure.
+ while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+ // If prepared list is not to be committed, set pointer to 0
+ if (!complete_prepared_list) preparedXidStorePtr = 0;
+ }
+ };
+
+ // Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
+ // reamining open transactions
+ class TestMessageStore: public MessageStoreImpl
+ {
+ public:
+ TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) : MessageStoreImpl(br, envpath) {}
+ std::auto_ptr<qpid::broker::TPCTransactionContext> TMSbegin(const std::string& xid) {
+ checkInit();
+ IdSequence* jtx = &messageIdSequence;
+ // pass sequence number for c/a
+ return auto_ptr<TPCTransactionContext>(new TestTPCTxnCtxt(xid, jtx));
+ }
+ u_int32_t getRemainingTxns(const PersistableQueue& queue) {
+ return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
+ }
+ u_int32_t getRemainingPreparedListTxns() {
+ return tplStorePtr->get_open_txn_cnt();
+ }
+ };
+
+ const string nameA;
+ const string nameB;
+ std::auto_ptr<MessageStoreImpl> store;
+ std::auto_ptr<DtxManager> dtxmgr;
+ std::auto_ptr<QueueRegistry> queues;
+ std::auto_ptr<LinkRegistry> links;
+ Queue::shared_ptr queueA;
+ Queue::shared_ptr queueB;
+ Message msg1;
+ Message msg2;
+ Message msg4;
+ std::auto_ptr<TxBuffer> tx;
+
+ void recoverPrepared(bool commit)
+ {
+ setup<MessageStoreImpl>();
+
+ Swap swap(this, "RecoverPrepared");
+ swap.init();
+ std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
+ swap.run(txn.get());
+ if (tx.get()) {
+ tx->prepare(txn.get());
+ tx.reset();
+ }
+
+ store->prepare(*txn);
+ restart<MessageStoreImpl>();
+
+ //check that the message is not available from either queue
+ BOOST_CHECK_EQUAL((u_int32_t) 0, queueA->getMessageCount());
+ BOOST_CHECK_EQUAL((u_int32_t) 0, queueB->getMessageCount());
+
+ //commit/abort the txn - through the dtx manager, not directly on the store
+ if (commit) {
+ dtxmgr->commit("my-xid", false);
+ } else {
+ dtxmgr->rollback("my-xid");
+ }
+
+ swap.check(commit);
+ restart<MessageStoreImpl>();
+ swap.check(commit);
+ }
+
+ void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
+ {
+ setup<TestMessageStore>();
+ MultiQueueTxn mqtTest(this);
+ mqtTest.init();
+ std::auto_ptr<TPCTransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin("my-xid"));
+ mqtTest.run(txn.get());
+ if (tx.get()) {
+ tx->prepare(txn.get());
+ tx.reset();
+ }
+ store->prepare(*txn);
+
+ // As the commits and aborts should happen through DtxManager, and it is too complex to
+ // pass all these test params through, we bypass DtxManager and use the store directly.
+ // This will prevent the queues from seeing committed txns, however. To test the success
+ // or failure of
+ static_cast<TestTPCTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem, complete_prepared_list);
+ if (commit)
+ store->commit(*txn);
+ else
+ store->abort(*txn);
+ restart<TestMessageStore>();
+ mqtTest.check(commit);
+ }
+
+ void commit(Strategy& strategy)
+ {
+ setup<MessageStoreImpl>();
+ strategy.init();
+
+ std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
+ strategy.run(txn.get());
+ if (tx.get()) {
+ tx->prepare(txn.get());
+ tx.reset();
+ }
+ store->prepare(*txn);
+ store->commit(*txn);
+ restart<MessageStoreImpl>();
+ strategy.check(true);
+ }
+
+ void abort(Strategy& strategy, bool prepare)
+ {
+ setup<MessageStoreImpl>();
+ strategy.init();
+
+ std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
+ strategy.run(txn.get());
+ if (tx.get()) {
+ tx->prepare(txn.get());
+ tx.reset();
+ }
+ if (prepare) store->prepare(*txn);
+ store->abort(*txn);
+ restart<MessageStoreImpl>();
+ strategy.check(false);
+ }
+
+ void swap(TPCTransactionContext* txn, Queue::shared_ptr& from, Queue::shared_ptr& to)
+ {
+ QueueCursor c;
+ Message msg1 = MessageUtils::get(*from, &c);//just dequeues in memory
+ //move the message from one queue to the other as part of a
+ //distributed transaction
+ if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+ to->deliver(msg1, tx.get());//note: need to enqueue it first to avoid message being deleted
+ from->dequeue(txn, c);
+ }
+
+ void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue)
+ {
+ QueueCursor c;
+ Message msg2 = MessageUtils::get(*queue, &c);//just dequeues in memory
+ queue->dequeue(txn, c);
+ }
+
+ Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid, Queue::shared_ptr& queue)
+ {
+ Message msg = createMessage(msgid);
+ if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+ queue->deliver(msg, tx.get());
+ return msg;
+ }
+
+ Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid, std::set<Queue::shared_ptr>& queueset)
+ {
+ if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+ Message msg = createMessage(msgid);
+ for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i != queueset.end(); i++) {
+ (*i)->deliver(msg, tx.get());
+ }
+ return msg;
+ }
+
+ Message deliver(const string& msgid, Queue::shared_ptr& queue)
+ {
+ Message m = createMessage(msgid);
+ queue->deliver(m);
+ return m;
+ }
+
+ template <class T>
+ void setup()
+ {
+ store = std::auto_ptr<T>(new T(&br));
+ store->init(test_dir, 4, 1, true); // truncate store
+
+ //create two queues:
+ queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
+ queueA->create();
+ queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
+ queueB->create();
+ }
+
+ Message createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+ {
+ Message msg = MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id);
+ return msg;
+ }
+
+ template <class T>
+ void restart()
+ {
+ queueA.reset();
+ queueB.reset();
+ store.reset();
+ queues.reset();
+ links.reset();
+
+ store = std::auto_ptr<T>(new T(&br));
+ store->init(test_dir, 4, 1);
+ sys::Timer t;
+ ExchangeRegistry exchanges;
+ queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
+ links = std::auto_ptr<LinkRegistry>(new LinkRegistry);
+ dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(t));
+ dtxmgr->setStore (store.get());
+ RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, br.getProtocolRegistry());
+ store->recover(recovery);
+
+ queueA = queues->find(nameA);
+ queueB = queues->find(nameB);
+ }
+
+ void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
+ {
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+ if (size > 0) {
+ Message msg = MessageUtils::get(*queue);
+ BOOST_REQUIRE(msg);
+ BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg));
+ }
+ }
+
+ void swapCheck(bool swapped, const string& msgid, Queue::shared_ptr& from, Queue::shared_ptr& to)
+ {
+ BOOST_REQUIRE(from);
+ BOOST_REQUIRE(to);
+
+ Queue::shared_ptr x; //the queue from which the message was swapped
+ Queue::shared_ptr y; //the queue on which the message is expected to be
+
+ if (swapped) {
+ x = from;
+ y = to;
+ } else {
+ x = to;
+ y = from;
+ }
+
+ checkMsg(x, 0);
+ checkMsg(y, 1, msgid);
+ checkMsg(y, 0);
+ }
+
+public:
+ TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
+
+ void testCommitEnqueue()
+ {
+ Enqueue enqueue(this);
+ commit(enqueue);
+ }
+
+ void testCommitDequeue()
+ {
+ Dequeue dequeue(this);
+ commit(dequeue);
+ }
+
+ void testCommitSwap()
+ {
+ Swap swap(this, "SwapMessageId");
+ commit(swap);
+ }
+
+ void testPrepareAndAbortEnqueue()
+ {
+ Enqueue enqueue(this);
+ abort(enqueue, true);
+ }
+
+ void testPrepareAndAbortDequeue()
+ {
+ Dequeue dequeue(this);
+ abort(dequeue, true);
+ }
+
+ void testPrepareAndAbortSwap()
+ {
+ Swap swap(this, "SwapMessageId");
+ abort(swap, true);
+ }
+
+ void testAbortNoPrepareEnqueue()
+ {
+ Enqueue enqueue(this);
+ abort(enqueue, false);
+ }
+
+ void testAbortNoPrepareDequeue()
+ {
+ Dequeue dequeue(this);
+ abort(dequeue, false);
+ }
+
+ void testAbortNoPrepareSwap()
+ {
+ Swap swap(this, "SwapMessageId");
+ abort(swap, false);
+ }
+
+ void testRecoverPreparedThenCommitted()
+ {
+ recoverPrepared(true);
+ }
+
+ void testRecoverPreparedThenAborted()
+ {
+ recoverPrepared(false);
+ }
+
+ void testMultiQueueCommit()
+ {
+ testMultiQueueTxn(2, true, true);
+ }
+
+ void testMultiQueueAbort()
+ {
+ testMultiQueueTxn(2, true, false);
+ }
+
+ void testMultiQueueNoQueueCommitRecover()
+ {
+ testMultiQueueTxn(0, false, true);
+ }
+
+ void testMultiQueueNoQueueAbortRecover()
+ {
+ testMultiQueueTxn(0, false, false);
+ }
+
+ void testMultiQueueSomeQueueCommitRecover()
+ {
+ testMultiQueueTxn(1, false, true);
+ }
+
+ void testMultiQueueSomeQueueAbortRecover()
+ {
+ testMultiQueueTxn(1, false, false);
+ }
+
+ void testMultiQueueAllQueueCommitRecover()
+ {
+ testMultiQueueTxn(2, false, true);
+ }
+
+ void testMultiQueueAllQueueAbortRecover()
+ {
+ testMultiQueueTxn(2, false, false);
+ }
+};
+
+TwoPhaseCommitTest tpct;
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(CommitEnqueue)
+{
+ SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+ cout << test_filename << ".CommitEnqueue: " << flush;
+ tpct.testCommitEnqueue();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(CommitDequeue)
+{
+ cout << test_filename << ".CommitDequeue: " << flush;
+ tpct.testCommitDequeue();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(CommitSwap)
+{
+ cout << test_filename << ".CommitSwap: " << flush;
+ tpct.testCommitSwap();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
+{
+ cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
+ tpct.testPrepareAndAbortEnqueue();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
+{
+ cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
+ tpct.testPrepareAndAbortDequeue();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
+{
+ cout << test_filename << ".PrepareAndAbortSwap: " << flush;
+ tpct.testPrepareAndAbortSwap();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
+{
+ cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
+ tpct.testAbortNoPrepareEnqueue();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
+{
+ cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
+ tpct.testAbortNoPrepareDequeue();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
+{
+ cout << test_filename << ".AbortNoPrepareSwap: " << flush;
+ tpct.testAbortNoPrepareSwap();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
+{
+ cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
+ tpct.testRecoverPreparedThenCommitted();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(RecoverPreparedThenAborted)
+{
+ cout << test_filename << ".RecoverPreparedThenAborted: " << flush;
+ tpct.testRecoverPreparedThenAborted();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ tpct.testMultiQueueCommit();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ tpct.testMultiQueueAbort();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+ tpct.testMultiQueueNoQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+ tpct.testMultiQueueNoQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+ tpct.testMultiQueueSomeQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+ tpct.testMultiQueueSomeQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+ tpct.testMultiQueueAllQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+ tpct.testMultiQueueAllQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/legacystore/clean.sh b/qpid/cpp/src/tests/legacystore/clean.sh
new file mode 100644
index 0000000000..efb19586fa
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/clean.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This script cleans up any previous database and journal files, and should
+# be run prior to the store system tests, as these are prone to crashing or
+# hanging under some circumstances if the database is old or inconsistent.
+
+if [ -d ${TMP_DATA_DIR} ]; then
+ rm -rf ${TMP_DATA_DIR}
+fi
+if [ -d ${TMP_PYTHON_TEST_DIR} ]; then
+ rm -rf ${TMP_PYTHON_TEST_DIR}
+fi
+rm -f ${abs_srcdir}/*.vglog*
diff --git a/qpid/cpp/src/tests/legacystore/persistence.py b/qpid/cpp/src/tests/legacystore/persistence.py
new file mode 100644
index 0000000000..c4ab712f14
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/persistence.py
@@ -0,0 +1,574 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys, re, traceback, socket
+from getopt import getopt, GetoptError
+
+from qpid.connection import Connection
+from qpid.util import connect
+from qpid.datatypes import Message, RangedSet
+from qpid.queue import Empty
+from qpid.session import SessionException
+from qpid.testlib import TestBase010
+from time import sleep
+
+class PersistenceTest(TestBase010):
+
+ XA_RBROLLBACK = 1
+ XA_RBTIMEOUT = 2
+ XA_OK = 0
+
+ def createMessage(self, **kwargs):
+ session = self.session
+ dp = {}
+ dp['delivery_mode'] = 2
+ mp = {}
+ for k, v in kwargs.iteritems():
+ if k in ['routing_key', 'delivery_mode']: dp[k] = v
+ if k in ['message_id', 'correlation_id', 'application_headers']: mp[k] = v
+ args = []
+ args.append(session.delivery_properties(**dp))
+ if len(mp):
+ args.append(session.message_properties(**mp))
+ if kwargs.has_key('body'): args.append(kwargs['body'])
+ return Message(*args)
+
+ def phase1(self):
+ session = self.session
+
+ session.queue_declare(queue="queue-a", durable=True)
+ session.queue_declare(queue="queue-b", durable=True)
+ session.exchange_bind(queue="queue-a", exchange="amq.direct", binding_key="a")
+ session.exchange_bind(queue="queue-b", exchange="amq.direct", binding_key="b")
+
+ session.message_transfer(destination="amq.direct",
+ message=self.createMessage(routing_key="a", correlation_id="Msg0001", body="A_Message1"))
+ session.message_transfer(destination="amq.direct",
+ message=self.createMessage(routing_key="b", correlation_id="Msg0002", body="B_Message1"))
+
+# session.queue_declare(queue="lvq-test", durable=True, arguments={"qpid.last_value_queue":True})
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B1"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A1"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A2"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B2"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B3"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C1"))
+
+
+
+ def phase2(self):
+ session = self.session
+
+ #check queues exists
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
+
+ #check they are still bound to amq.direct correctly
+ responses = []
+ responses.append(session.exchange_bound(queue="queue-a", exchange="amq.direct", binding_key="a"))
+ responses.append(session.exchange_bound(queue="queue-b", exchange="amq.direct", binding_key="b"))
+ for r in responses:
+ self.assert_(not r.exchange_not_found)
+ self.assert_(not r.queue_not_found)
+ self.assert_(not r.key_not_matched)
+
+
+ #check expected messages are there
+ self.assertMessageOnQueue("queue-a", "Msg0001", "A_Message1")
+ self.assertMessageOnQueue("queue-b", "Msg0002", "B_Message1")
+
+ self.assertEmptyQueue("queue-a")
+ self.assertEmptyQueue("queue-b")
+
+ session.queue_declare(queue="queue-c", durable=True)
+
+ #send a message to a topic such that it reaches all queues
+ session.exchange_bind(queue="queue-a", exchange="amq.topic", binding_key="abc")
+ session.exchange_bind(queue="queue-b", exchange="amq.topic", binding_key="abc")
+ session.exchange_bind(queue="queue-c", exchange="amq.topic", binding_key="abc")
+
+ session.message_transfer(destination="amq.topic",
+ message=self.createMessage(routing_key="abc", correlation_id="Msg0003", body="AB_Message2"))
+
+# #check LVQ exists and has exepected messages:
+# session.queue_declare(queue="lvq-test", durable=True, passive=True)
+# session.message_subscribe(destination="lvq", queue="lvq-test")
+# lvq = session.incoming("lvq")
+# lvq.start()
+# accepted = RangedSet()
+# for m in ["A2", "B3", "C1"]:
+# msg = lvq.get(timeout=1)
+# self.assertEquals(m, msg.body)
+# accepted.add(msg.id)
+# try:
+# extra = lvq.get(timeout=1)
+# self.fail("lvq-test not empty, contains: " + extra.body)
+# except Empty: None
+# #publish some more messages while subscriber is active (no replacement):
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C2"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C3"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A3"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A4"))
+# session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C4"))
+# #check that accepting replaced messages is safe
+# session.message_accept(accepted)
+
+
+ def phase3(self):
+ session = self.session
+
+# #lvq recovery validation
+# session.queue_declare(queue="lvq-test", durable=True, passive=True)
+# session.message_subscribe(destination="lvq", queue="lvq-test")
+# lvq = session.incoming("lvq")
+# lvq.start()
+# accepted = RangedSet()
+# lvq.start()
+# for m in ["C4", "A4"]:
+# msg = lvq.get(timeout=1)
+# self.assertEquals(m, msg.body)
+# accepted.add(msg.id)
+# session.message_accept(accepted)
+# try:
+# extra = lvq.get(timeout=1)
+# self.fail("lvq-test not empty, contains: " + extra.body)
+# except Empty: None
+# session.message_cancel(destination="lvq")
+# session.queue_delete(queue="lvq-test")
+
+
+ #check queues exists
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
+ session.queue_declare(queue="queue-c", durable=True, passive=True)
+
+ session.tx_select()
+ #check expected messages are there
+ self.assertMessageOnQueue("queue-a", "Msg0003", "AB_Message2")
+ self.assertMessageOnQueue("queue-b", "Msg0003", "AB_Message2")
+ self.assertMessageOnQueue("queue-c", "Msg0003", "AB_Message2")
+
+ self.assertEmptyQueue("queue-a")
+ self.assertEmptyQueue("queue-b")
+ self.assertEmptyQueue("queue-c")
+
+ #note: default bindings must be restored for this to work
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0004", body="A_Message3"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0005", body="A_Message4"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0006", body="A_Message5"))
+
+ session.tx_commit()
+
+
+ #delete a queue
+ session.queue_delete(queue="queue-c")
+
+ session.message_subscribe(destination="ctag", queue="queue-a", accept_mode=0)
+ session.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF)
+ included = session.incoming("ctag")
+ msg1 = included.get(timeout=1)
+ self.assertExpectedContent(msg1, "Msg0004", "A_Message3")
+ msg2 = included.get(timeout=1)
+ self.assertExpectedContent(msg2, "Msg0005", "A_Message4")
+ msg3 = included.get(timeout=1)
+ self.assertExpectedContent(msg3, "Msg0006", "A_Message5")
+ self.ack(msg1, msg2, msg3)
+
+ session.message_transfer(destination="amq.direct", message=self.createMessage(
+ routing_key="queue-b", correlation_id="Msg0007", body="B_Message3"))
+
+ session.tx_rollback()
+
+
+ def phase4(self):
+ session = self.session
+
+ #check queues exists
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
+
+ self.assertMessageOnQueue("queue-a", "Msg0004", "A_Message3")
+ self.assertMessageOnQueue("queue-a", "Msg0005", "A_Message4")
+ self.assertMessageOnQueue("queue-a", "Msg0006", "A_Message5")
+
+ self.assertEmptyQueue("queue-a")
+ self.assertEmptyQueue("queue-b")
+
+ #check this queue doesn't exist
+ try:
+ session.queue_declare(queue="queue-c", durable=True, passive=True)
+ raise Exception("Expected queue-c to have been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def phase5(self):
+
+ session = self.session
+ queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"]
+
+ for q in queues:
+ session.queue_declare(queue=q, durable=True)
+ session.queue_purge(queue=q)
+
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a1", correlation_id="MsgA", body="MessageA"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-b1", correlation_id="MsgB", body="MessageB"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-c1", correlation_id="MsgC", body="MessageC"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-d1", correlation_id="MsgD", body="MessageD"))
+
+ session.dtx_select()
+ txa = self.xid('a')
+ txb = self.xid('b')
+ txc = self.xid('c')
+ txd = self.xid('d')
+
+ self.txswap("queue-a1", "queue-a2", txa)
+ self.txswap("queue-b1", "queue-b2", txb)
+ self.txswap("queue-c1", "queue-c2", txc)
+ self.txswap("queue-d1", "queue-d2", txd)
+
+ #no queue should have any messages accessible
+ for q in queues:
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=txa, one_phase=True).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txb).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txc).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txd).status)
+
+ #further checks
+ not_empty = ["queue-a2", "queue-b1"]
+ for q in queues:
+ if q in not_empty:
+ self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+ else:
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+
+
+ def phase6(self):
+ session = self.session
+
+ #check prepared transaction are reported correctly by recover
+ txc = self.xid('c')
+ txd = self.xid('d')
+
+ xids = session.dtx_recover().in_doubt
+ ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
+
+ if txc.global_id not in ids:
+ self.fail("Recovered xids not as expected. missing: %s" % (txc))
+ if txd.global_id not in ids:
+ self.fail("Recovered xids not as expected. missing: %s" % (txd))
+ self.assertEqual(2, len(xids))
+
+
+ queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"]
+ not_empty = ["queue-a2", "queue-b1"]
+
+ #re-check
+ not_empty = ["queue-a2", "queue-b1"]
+ for q in queues:
+ if q in not_empty:
+ self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+ else:
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+
+ #complete the prepared transactions
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=txc).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txd).status)
+ not_empty.append("queue-c2")
+ not_empty.append("queue-d1")
+
+ for q in queues:
+ if q in not_empty:
+ self.assertEqual(1, session.queue_query(queue=q).message_count)
+ else:
+ self.assertEqual(0, session.queue_query(queue=q).message_count)
+
+ def phase7(self):
+ session = self.session
+ session.synchronous = False
+
+ # check xids from phase 6 are gone
+ txc = self.xid('c')
+ txd = self.xid('d')
+
+ xids = session.dtx_recover().in_doubt
+ ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
+
+ if txc.global_id in ids:
+ self.fail("Xid still present : %s" % (txc))
+ if txd.global_id in ids:
+ self.fail("Xid still present : %s" % (txc))
+ self.assertEqual(0, len(xids))
+
+ #test deletion of queue after publish
+ #create queue
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ for i in range(1, 10):
+ session.message_transfer(message=self.createMessage(routing_key = "q", body = "my-message"))
+
+ session.synchronous = True
+ #explicitly delete queue
+ session.queue_delete(queue = "q")
+
+ #test acking of message from auto-deleted queue
+ #create queue
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ session.message_transfer(message=self.createMessage(routing_key = "q", body = "my-message"))
+
+ #create consumer
+ session.message_subscribe(queue = "q", destination = "a", accept_mode=0, acquire_mode=0)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 0, value = 10, destination = "a")
+ queue = session.incoming("a")
+
+ #consume the message, cancel subscription (triggering auto-delete), then ack it
+ msg = queue.get(timeout = 5)
+ session.message_cancel(destination = "a")
+ self.ack(msg)
+
+ #test implicit deletion of bindings when queue is deleted
+ session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
+ session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")
+
+ #test unbind:
+ #create a series of bindings to a queue
+ session.queue_declare(queue = "binding-test-queue", durable=True)
+ session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="abc")
+ session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr")
+ session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="xyz")
+ session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="a", arguments={"x-match":"all", "p":"a"})
+ session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="b", arguments={"x-match":"all", "p":"b"})
+ session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="c", arguments={"x-match":"all", "p":"c"})
+ #then restart broker...
+
+
+ def phase8(self):
+ session = self.session
+
+ #continue testing unbind:
+ #send messages to the queue via each of the bindings
+ for k in ["abc", "pqr", "xyz"]:
+ data = "first %s" % (k)
+ session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data))
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ data = "first %s" % (a["p"])
+ session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data))
+ #unbind some bindings (using final 0-10 semantics)
+ session.exchange_unbind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr")
+ session.exchange_unbind(exchange="amq.match", queue="binding-test-queue", binding_key="b")
+ #send messages again
+ for k in ["abc", "pqr", "xyz"]:
+ data = "second %s" % (k)
+ session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data))
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ data = "second %s" % (a["p"])
+ session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data))
+
+ #check that only the correct messages are received
+ expected = []
+ for k in ["abc", "pqr", "xyz"]:
+ expected.append("first %s" % (k))
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ expected.append("first %s" % (a["p"]))
+ for k in ["abc", "xyz"]:
+ expected.append("second %s" % (k))
+ for a in [{"p":"a"}, {"p":"c"}]:
+ expected.append("second %s" % (a["p"]))
+
+ session.message_subscribe(queue = "binding-test-queue", destination = "binding-test")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "binding-test")
+ session.message_flow(unit = 0, value = 10, destination = "binding-test")
+ queue = session.incoming("binding-test")
+
+ while len(expected):
+ msg = queue.get(timeout=1)
+ if msg.body not in expected:
+ self.fail("Missing message: %s" % msg.body)
+ expected.remove(msg.body)
+ try:
+ msg = queue.get(timeout=1)
+ self.fail("Got extra message: %s" % msg.body)
+ except Empty: pass
+
+
+
+ session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
+ session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")
+
+
+ def xid(self, txid, branchqual = ''):
+ return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
+
+ def txswap(self, src, dest, tx):
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
+ self.session.message_subscribe(destination="temp-swap", queue=src, accept_mode=0)
+ self.session.message_flow(destination="temp-swap", unit=0, value=1)
+ self.session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
+ msg = self.session.incoming("temp-swap").get(timeout=1)
+ self.session.message_cancel(destination="temp-swap")
+ self.session.message_transfer(message=self.createMessage(routing_key=dest, correlation_id=self.getProperty(msg, 'correlation_id'),
+ body=msg.body))
+ self.ack(msg)
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
+
+ def assertEmptyQueue(self, name):
+ self.assertEqual(0, self.session.queue_query(queue=name).message_count)
+
+ def assertConnectionException(self, expectedCode, message):
+ self.assertEqual("connection", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
+
+ def assertExpectedMethod(self, reply, klass, method):
+ self.assertEqual(klass, reply.method.klass.name)
+ self.assertEqual(method, reply.method.name)
+
+ def assertExpectedContent(self, msg, id, body):
+ self.assertEqual(id, self.getProperty(msg, 'correlation_id'))
+ self.assertEqual(body, msg.body)
+ return msg
+
+ def getProperty(self, msg, name):
+ for h in msg.headers:
+ if hasattr(h, name): return getattr(h, name)
+ return None
+
+ def ack(self, *msgs):
+ session = self.session
+ set = RangedSet()
+ for m in msgs:
+ set.add(m.id)
+ #TODO: tidy up completion
+ session.receiver._completed.add(m.id)
+ session.message_accept(set)
+ session.channel.session_completed(session.receiver._completed)
+
+ def assertExpectedGetResult(self, id, body):
+ return self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id, body)
+
+ def assertEqual(self, expected, actual, msg=''):
+ if expected != actual: raise Exception("%s expected: %s actual: %s" % (msg, expected, actual))
+
+ def assertMessageOnQueue(self, queue, id, body):
+ self.session.message_subscribe(destination="incoming-gets", queue=queue, accept_mode=0)
+ self.session.message_flow(destination="incoming-gets", unit=0, value=1)
+ self.session.message_flow(destination="incoming-gets", unit=1, value=0xFFFFFFFF)
+ msg = self.session.incoming("incoming-gets").get(timeout=1)
+ self.assertExpectedContent(msg, id, body)
+ self.ack(msg)
+ self.session.message_cancel(destination="incoming-gets")
+
+
+ def __init__(self):
+ TestBase010.__init__(self, "run")
+ self.setBroker("localhost")
+ self.errata = []
+
+ def connect(self):
+ """ Connects to the broker """
+ self.conn = Connection(connect(self.host, self.port))
+ self.conn.start(timeout=10)
+ self.session = self.conn.session("test-session", timeout=10)
+
+ def run(self, args=sys.argv[1:]):
+ try:
+ opts, extra = getopt(args, "r:s:e:b:p:h", ["retry=", "spec=", "errata=", "broker=", "phase=", "help"])
+ except GetoptError, e:
+ self._die(str(e))
+ phase = 0
+ retry = 0;
+ for opt, value in opts:
+ if opt in ("-h", "--help"): self._die()
+ if opt in ("-s", "--spec"): self.spec = value
+ if opt in ("-e", "--errata"): self.errata.append(value)
+ if opt in ("-b", "--broker"): self.setBroker(value)
+ if opt in ("-p", "--phase"): phase = int(value)
+ if opt in ("-r", "--retry"): retry = int(value)
+
+ if not phase: self._die("please specify the phase to run")
+ phase = "phase%d" % phase
+ self.connect()
+
+ try:
+ getattr(self, phase)()
+ print phase, "succeeded"
+ res = True;
+ except Exception, e:
+ print phase, "failed: ", e
+ traceback.print_exc()
+ res = False
+
+
+ if not self.session.error(): self.session.close(timeout=10)
+ self.conn.close(timeout=10)
+
+ # Crude fix to wait for thread in client to exit after return from session_close()
+ # Reduces occurrences of "Unhandled exception in thread" messages after each test
+ import time
+ time.sleep(1)
+
+ return res
+
+
+ def setBroker(self, broker):
+ rex = re.compile(r"""
+ # [ <user> [ / <password> ] @] <host> [ :<port> ]
+ ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
+ match = rex.match(broker)
+ if not match: self._die("'%s' is not a valid broker" % (broker))
+ self.user, self.password, self.host, self.port = match.groups()
+ self.port = int(default(self.port, 5672))
+ self.user = default(self.user, "guest")
+ self.password = default(self.password, "guest")
+
+ def _die(self, message = None):
+ if message: print message
+ print """
+Options:
+ -h/--help : this message
+ -s/--spec <spec.xml> : file containing amqp XML spec
+ -p/--phase : test phase to run
+ -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
+ """
+ sys.exit(1)
+
+def default(value, default):
+ if (value == None): return default
+ else: return value
+
+if __name__ == "__main__":
+ test = PersistenceTest()
+ if not test.run(): sys.exit(1)
diff --git a/qpid/cpp/src/tests/legacystore/run_long_python_tests b/qpid/cpp/src/tests/legacystore/run_long_python_tests
new file mode 100644
index 0000000000..e43b2236ec
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/run_long_python_tests
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+./run_python_tests LONG_TEST
diff --git a/qpid/cpp/src/tests/legacystore/run_python_tests b/qpid/cpp/src/tests/legacystore/run_python_tests
new file mode 100644
index 0000000000..d9dec16963
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/run_python_tests
@@ -0,0 +1,64 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if test -z ${QPID_DIR} ; then
+ cat <<EOF
+
+ =========== WARNING: PYTHON TESTS DISABLED ==============
+
+ QPID_DIR not set.
+
+ ===========================================================
+
+EOF
+ exit
+fi
+
+. `dirname $0`/tests_env.sh
+
+MODULENAME=python_tests
+
+echo "Running Python tests in module ${MODULENAME}..."
+
+case x$1 in
+ xSHORT_TEST)
+ DEFAULT_PYTHON_TESTS="*.client_persistence.ExchangeQueueTests.* *.flow_to_disk.SimpleMaxSizeCountTest.test_browse_recover *.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2" ;;
+ xLONG_TEST)
+ DEFAULT_PYTHON_TESTS= ;;
+ x)
+ DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1 *.resize.SimpleTest.* *.federation.*" ;;
+ *)
+ DEFAULT_PYTHON_TESTS=$1
+esac
+
+PYTHON_TESTS=${PYTHON_TESTS:-${DEFAULT_PYTHON_TESTS}}
+
+OUTDIR=${MODULENAME}.tmp
+rm -rf $OUTDIR
+
+# To debug a test, add the following options to the end of the following line:
+# -v DEBUG -c qpid.messaging.io.ops [*.testName]
+${PYTHON_DIR}/qpid-python-test -m ${MODULENAME} -I ${FAILING_PYTHON_TESTS} ${PYTHON_TESTS} -DOUTDIR=$OUTDIR #-v DEBUG
+RETCODE=$?
+
+if test x${RETCODE} != x0; then
+ exit 1;
+fi
+exit 0
diff --git a/qpid/cpp/src/tests/legacystore/run_short_python_tests b/qpid/cpp/src/tests/legacystore/run_short_python_tests
new file mode 100644
index 0000000000..523924fdba
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/run_short_python_tests
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+./run_python_tests SHORT_TEST
diff --git a/qpid/cpp/src/tests/legacystore/run_test b/qpid/cpp/src/tests/legacystore/run_test
new file mode 100644
index 0000000000..1d5c2ae407
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/run_test
@@ -0,0 +1,69 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set up environment and run a test executable or script.
+#
+# Output nothing if test passes, show the output if it fails and
+# leave output in <test>.log for examination.
+#
+# If qpidd.port exists run test with QPID_PORT=`cat qpidd.port`
+#
+# If $VALGRIND if is set run under valgrind. If there are valgrind
+# erros show valgrind output, also leave it in <test>.valgrind for
+# examination.
+#
+
+source `dirname $0`/vg_check
+
+# Export variables from makefile.
+export VALGRIND srcdir
+
+# Export QPID_PORT if qpidd.port exists.
+test -f qpidd.port && export QPID_PORT=`cat qpidd.port`
+
+# Avoid silly libtool error messages if these are not defined
+test -z "$LC_ALL" && export LC_ALL=
+test -z "$LC_CTYPE" && export LC_CTYPE=
+test -z "$LC_COLLATE" && export LC_COLLATE=
+test -z "$LC_MESSAGES" && export LC_MESSAGES=
+
+VG_LOG="$1.vglog"
+rm -f $VG_LOG*
+
+if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then
+ # This is a libtool "executable". Valgrind it if VALGRIND specified.
+ test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file=$VG_LOG --"
+ # Hide output unless there's an error.
+ libtool --mode=execute $VALGRIND "$@" 2>&1 || ERROR=$?
+ test -n "$VALGRIND" && vg_check $VG_LOG*
+else
+ # This is a non-libtool shell script, just execute it.
+ export VALGRIND srcdir
+ exec "$@"
+fi
+
+if test -z "$ERROR"; then
+ # Clean up logs if there was no error.
+ rm -f $VG_LOG*
+ exit 0
+else
+ exit $ERROR
+fi
diff --git a/qpid/cpp/src/tests/legacystore/start_broker b/qpid/cpp/src/tests/legacystore/start_broker
new file mode 100644
index 0000000000..30e4659030
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/start_broker
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+QPIDD=$QPID_BLD/src/qpidd
+rm -f qpidd.vglog* qpidd.log
+test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file=qpidd.vglog --suppressions=$QPID_DIR/cpp/src/tests/.valgrind.supp --"
+exec libtool --mode=execute $VALGRIND $QPIDD --daemon --port=0 --log-enable error+ --log-to-file qpidd.log "$@" > qpidd.port
diff --git a/qpid/cpp/src/tests/legacystore/stop_broker b/qpid/cpp/src/tests/legacystore/stop_broker
new file mode 100644
index 0000000000..dcefff376f
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/stop_broker
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Stop the broker, check for errors.
+#
+if test -f qpidd.port; then
+ export QPID_PORT=`cat qpidd.port`
+ QPIDD=$QPID_BLD/src/qpidd
+ rm -f qpidd.port
+
+ $QPIDD --quit || ERROR=$?
+
+ # Check qpidd.log.
+ grep -a 'warning\|error\|critical' qpidd.log && {
+ echo "WARNING: Suspicious broker log entries in qpidd.log, above."
+ }
+
+ # Check valgrind log.
+ if test -n "$VALGRIND"; then
+ source `dirname $0`/vg_check $VG_LOG*
+ vg_check qpidd.vglog*
+ fi
+
+ exit $ERROR
+else
+ echo "No qpidd.port file found - cannot stop broker."
+ exit 1;
+fi
diff --git a/qpid/cpp/src/tests/legacystore/system_test.sh b/qpid/cpp/src/tests/legacystore/system_test.sh
new file mode 100644
index 0000000000..4cccc5ac8d
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/system_test.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+error() { echo $*; exit 1; }
+
+# Make sure $QPID_DIR contains what we need.
+if ! test -d "$QPID_DIR" ; then
+ echo "WARNING: QPID_DIR is not set skipping system tests."
+ exit
+fi
+STORE_LIB=../lib/.libs/msgstore.so
+
+xml_spec=$QPID_DIR/specs/amqp.0-10-qpid-errata.xml
+test -f $xml_spec || error "$xml_spec not found: invalid \$QPID_DIR ?"
+export PYTHONPATH=$QPID_DIR/python:$QPID_DIR/extras/qmf/src/py:$QPID_DIR/tools/src/py
+
+echo "Using directory $TMP_DATA_DIR"
+
+fail=0
+
+# Run the tests with a given set of flags
+BROKER_OPTS="--no-module-dir --load-module=$STORE_LIB --data-dir=$TMP_DATA_DIR --auth=no --wcache-page-size 16"
+run_tests() {
+ for p in `seq 1 8`; do
+ $abs_srcdir/start_broker "$@" ${BROKER_OPTS} || { echo "FAIL broker start"; return 1; }
+ python "$abs_srcdir/persistence.py" -s "$xml_spec" -b localhost:`cat qpidd.port` -p $p -r 3 || fail=1;
+ $abs_srcdir/stop_broker
+ done
+}
+
+run_tests || fail=1
+
+exit $fail
diff --git a/qpid/cpp/src/tests/legacystore/tests_env.sh b/qpid/cpp/src/tests/legacystore/tests_env.sh
new file mode 100644
index 0000000000..30d255b87c
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/tests_env.sh
@@ -0,0 +1,260 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# --- Function definitions ---
+
+
+func_check_required_env ()
+#-------------------------
+# Check that EITHER:
+# QPID_DIR is set (for running against svn QPID)
+# OR
+# QPID_PREFIX is set (for running against installed QPID
+# Will exit with error code 1 if neither of these is defined.
+# Params: None
+# Returns: 0 if env vars ok, 1 otherwise
+{
+ if test -z "${QPID_DIR}" -a -z "${QPID_PREFIX}"; then
+ # Try to find qpidd in the normal installed location
+ if test -x /usr/sbin/qpidd; then
+ QPID_PREFIX=/usr
+ else
+ echo "ERROR: Could not find installed Qpid"
+ echo "Either of the following must be set in the environment for this script to run:"
+ echo " QPID_DIR for running against a Qpid svn build"
+ echo " QPID_PREFIX for running against an installed Qpid"
+ return 1
+ fi
+ fi
+ return 0
+}
+
+
+func_check_qpid_python ()
+#------------------------
+# Check that Qpid python environment is ok
+# Params: None
+# Returns: 0 if Python environment is ok; 1 otherwise
+{
+ if ! python -c "import qpid" ; then
+ cat <<EOF
+
+ =========== WARNING: PYTHON TESTS DISABLED ==============
+
+ Unable to load python qpid module - skipping python tests.
+
+ PYTHONPATH=${PYTHONPATH}
+
+ ===========================================================
+
+EOF
+ return 1
+ fi
+ return 0
+}
+
+
+func_set_env ()
+#--------------
+# Set up the environment based on value of ${QPID_DIR}: if ${QPID_DIR} exists, assume a svn checkout,
+# otherwise set up for an installed or prefix test.
+# Params: None
+# Returns: Nothing
+{
+ if test "${QPID_DIR}" -a -d "${QPID_DIR}" ; then
+ # QPID_DIR is defined for source tree builds by the --with-qpid-checkout configure option.
+ # QPID_BLD is defined as the build directory, either $QPID_DIR/cpp or separately specified with
+ # the --with-qpid-build option for VPATH builds.
+
+ # Check QPID_BLD is also set
+ if test -z ${QPID_BLD}; then
+ QPID_BLD="${QPID_DIR}/cpp"
+ fi
+
+ # Paths and dirs
+ #if test -z ${abs_srcdir}; then
+ # abs_srcdir=`pwd`
+ #fi
+ source $QPID_BLD/src/tests/test_env.sh
+ # Override these two settings from test_env.sh:
+ export RECEIVER_EXEC=$QPID_TEST_EXEC_DIR/qpid-receive
+ export SENDER_EXEC=$QPID_TEST_EXEC_DIR/qpid-send
+
+ echo "abs_srcdir=$abs_srcdir"
+ export STORE_LIB="`pwd`/../lib/.libs/msgstore.so"
+ export STORE_ENABLE=1
+ export CLUSTER_LIB="${QPID_BLD}/src/.libs/cluster.so"
+
+ PYTHON_DIR="${QPID_DIR}/python"
+ export PYTHONPATH="${PYTHONPATH}":"${PYTHON_DIR}":"${QPID_DIR}/extras/qmf/src/py":"${QPID_DIR}/tools/src/py":"${QPID_DIR}/cpp/src/tests":"${abs_srcdir}"
+
+ # Libraries
+
+ # Executables
+ export QPIDD_EXEC="${QPID_BLD}/src/qpidd"
+
+ # Test data
+
+ else
+ # Set up the environment based on value of ${QPID_PREFIX} for testing against an installed qpid
+ # Alternatively, make sure ${QPID_BIN_DIR}, ${QPID_SBIN_DIR}, ${QPID_LIB_DIR} and ${QPID_LIBEXEC_DIR} are set for
+ # the installed location.
+ if test "${QPID_PREFIX}" -a -d "${QPID_PREFIX}" ; then
+ QPID_BIN_DIR=${QPID_PREFIX}/bin
+ QPID_SBIN_DIR=${QPID_PREFIX}/sbin
+ QPID_LIB_DIR=${QPID_PREFIX}/lib
+ QPID_LIBEXEC_DIR=${QPID_PREFIX}/libexec
+ fi
+
+ # These four env vars must be set prior to calling this script
+ func_checkpaths QPID_BIN_DIR QPID_SBIN_DIR QPID_LIB_DIR QPID_LIBEXEC_DIR
+
+ # Paths and dirs
+ export PYTHON_DIR="${QPID_BIN_DIR}"
+ export PYTHONPATH="${PYTHONPATH}":"${QPID_LIB_DIR}/python":"${QPID_LIBEXEC_DIR}/qpid/tests":"${QPID_LIB_DIR}/python2.4"
+
+
+ # Libraries
+
+ # Executables
+ export QPIDD_EXEC="${QPID_SBIN_DIR}/qpidd"
+
+ # Test Data
+
+ fi
+}
+
+
+func_mk_data_dir ()
+#------------------
+# Create a data dir at ${TMP_DATA_DIR} if not present, clear it otherwise.
+# Set TMP_DATA_DIR if it is not set.
+# Params: None
+# Returns: Nothing
+{
+ if test -z "${TMP_DATA_DIR}"; then
+ TMP_DATA_DIR=/tmp/python_tests
+ echo "TMP_DATA_DIR not set; using ${TMP_DATA_DIR}"
+ fi
+
+ # Delete old test dirs if they exist
+ if test -d "${TMP_DATA_DIR}" ; then
+ rm -rf "${TMP_DATA_DIR}/*"
+ fi
+ mkdir -p "${TMP_DATA_DIR}"
+ export TMP_DATA_DIR
+}
+
+
+func_checkvar ()
+#---------------
+# Check that an environment var is set (ie non-zero length)
+# Params: $1 - env var to be checked
+# Returns: 0 = env var is set (ie non-zero length)
+# 1 = env var is not set
+{
+ local loc_VAR=$1
+ if test -z ${!loc_VAR}; then
+ echo "WARNING: environment variable ${loc_VAR} not set."
+ return 1
+ fi
+ return 0
+}
+
+
+func_checkpaths ()
+#-----------------
+# Check a list of paths (each can contain ':'-separated sub-list) is set and valid (ie each path exists as a dir)
+# Params: $@ - List of path env vars to be checked
+# Returns: Nothing
+{
+ local loc_PATHS=$@
+ for path in ${loc_PATHS}; do
+ func_checkvar ${path}
+ if test $? == 0; then
+ local temp_IFS=${IFS}
+ IFS=":"
+ local pl=${!path}
+ for p in ${pl[@]}; do
+ if test ! -d ${p}; then
+ echo "WARNING: Directory ${p} in var ${path} not found."
+ fi
+ done
+ IFS=${temp_IFS}
+ fi
+ done
+}
+
+
+func_checklibs ()
+#----------------
+# Check that a list of libs is set and valid (ie each lib exists as an executable file)
+# Params: $@ - List of lib values to be checked
+# Returns: Nothing
+{
+ local loc_LIBS=$@
+ for lib in ${loc_LIBS[@]}; do
+ func_checkvar ${lib}
+ if test $? == 0; then
+ if test ! -x ${!lib}; then
+ echo "WARNING: Library ${lib}=${!lib} not found."
+ fi
+ fi
+ done
+}
+
+
+func_checkexecs ()
+#-----------------
+# Check that a list of executable is set and valid (ie each exec exists as an executable file)
+# Params: $@ - List of exec values to be checked
+# Returns: Nothing
+{
+ local loc_EXECS=$@
+ for exec in ${loc_EXECS[@]}; do
+ func_checkvar ${exec}
+ if test $? == 0; then
+ if test ! -x ${!exec}; then
+ echo "WARNING: Executable ${exec}=${!exec} not found or is not executable."
+ fi
+ fi
+ done
+}
+
+
+#--- Start of script ---
+
+func_check_required_env || exit 1 # Cannot run, exit with error
+
+srcdir=`dirname $0`
+if test -z ${abs_srcdir}; then
+ abs_srcdir=${srcdir}
+fi
+
+func_set_env
+func_check_qpid_python || exit 0 # A warning, not a failure.
+func_mk_data_dir
+
+# Check expected environment vars are set
+func_checkpaths PYTHON_DIR PYTHONPATH TMP_DATA_DIR
+func_checklibs STORE_LIB CLUSTER_LIB
+func_checkexecs QPIDD_EXEC QPID_CONFIG_EXEC QPID_ROUTE_EXEC SENDER_EXEC RECEIVER_EXEC
+
+FAILING_PYTHON_TESTS="${abs_srcdir}/failing_python_tests.txt"
+
diff --git a/qpid/cpp/src/tests/legacystore/unit_test.cpp b/qpid/cpp/src/tests/legacystore/unit_test.cpp
new file mode 100644
index 0000000000..add80a6f91
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/unit_test.cpp
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+// Defines test_main function to link with actual unit test code.
+#define BOOST_AUTO_TEST_MAIN // Boost 1.33
+#define BOOST_TEST_MAIN
+
+#include "unit_test.h"
+
diff --git a/qpid/cpp/src/tests/legacystore/unit_test.h b/qpid/cpp/src/tests/legacystore/unit_test.h
new file mode 100644
index 0000000000..16b6ae2ffb
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/unit_test.h
@@ -0,0 +1,69 @@
+#ifndef QPIPD_TEST_UNIT_TEST_H_
+#define QPIPD_TEST_UNIT_TEST_H_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+// Workaround so we can build against boost 1.32, 1.33 and boost 1.34.
+// Remove when we no longer need to support 1.32 or 1.33.
+
+#include <boost/version.hpp>
+
+#if (BOOST_VERSION < 103400) // v.1.33 and earlier
+# include <boost/test/auto_unit_test.hpp>
+#else // v.1.34 and later
+# include <boost/test/unit_test.hpp>
+#endif
+
+// Keep the test function for compilation but do not not register it.
+// TODO aconway 2008-04-23: better workaround for expected failures.
+// The following causes the test testUpdateTxState not to run at all.
+# define QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(test_name,n) \
+ namespace { struct test_name { void test_method(); }; } \
+ void test_name::test_method()
+// The following runs the test testUpdateTxState, but it fails.
+/*#define QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(test_name,n) \
+ namespace { struct test_name { void test_method(); }; } \
+ BOOST_AUTO_TEST_CASE(name)*/
+
+#if (BOOST_VERSION < 103300) // v.1.32 and earlier
+
+# define QPID_AUTO_TEST_SUITE(name)
+# define QPID_AUTO_TEST_CASE(name) BOOST_AUTO_UNIT_TEST(name)
+# define QPID_AUTO_TEST_SUITE_END()
+
+#elif (BOOST_VERSION < 103400) // v.1.33
+
+// Note the trailing ';'
+# define QPID_AUTO_TEST_SUITE(name) BOOST_AUTO_TEST_SUITE(name);
+# define QPID_AUTO_TEST_CASE(name) BOOST_AUTO_TEST_CASE(name)
+# define QPID_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END();
+
+#else // v.1.34 and later
+
+# define QPID_AUTO_TEST_SUITE(name) BOOST_AUTO_TEST_SUITE(name)
+# define QPID_AUTO_TEST_CASE(name) BOOST_AUTO_TEST_CASE(name)
+# define QPID_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()
+
+#endif
+
+#endif /*!QPIPD_TEST_UNIT_TEST_H_*/