diff options
Diffstat (limited to 'qpid/cpp/src/tests')
43 files changed, 1086 insertions, 1302 deletions
diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h index bc23867ee1..b7b0f9d34b 100644 --- a/qpid/cpp/src/tests/BrokerFixture.h +++ b/qpid/cpp/src/tests/BrokerFixture.h @@ -42,11 +42,45 @@ namespace tests { struct BrokerFixture : private boost::noncopyable { typedef qpid::broker::Broker Broker; typedef boost::intrusive_ptr<Broker> BrokerPtr; + typedef std::vector<std::string> Args; BrokerPtr broker; + uint16_t port; qpid::sys::Thread brokerThread; - BrokerFixture(Broker::Options opts=Broker::Options(), bool enableMgmt=false) { + BrokerFixture(const Args& args=Args(), const Broker::Options& opts=Broker::Options(), + bool isExternalPort_=false, uint16_t externalPort_=0) + { + init(args, opts, isExternalPort_, externalPort_); + } + + BrokerFixture(const Broker::Options& opts, + bool isExternalPort_=false, uint16_t externalPort_=0) + { + init(Args(), opts, isExternalPort_, externalPort_); + } + + void shutdownBroker() { + if (broker) { + broker->shutdown(); + brokerThread.join(); + broker = BrokerPtr(); + } + } + + ~BrokerFixture() { shutdownBroker(); } + + /** Open a connection to the broker. */ + void open(qpid::client::Connection& c) { + c.open("localhost", getPort()); + } + + uint16_t getPort() { return port; } + + private: + void init(const Args& args, Broker::Options opts, + bool isExternalPort=false, uint16_t externalPort=0) + { // Keep the tests quiet unless logging env. vars have been set by user. if (!::getenv("QPID_LOG_ENABLE") && !::getenv("QPID_TRACE")) { qpid::log::Options logOpts; @@ -55,38 +89,28 @@ struct BrokerFixture : private boost::noncopyable { logOpts.selectors.push_back("error+"); qpid::log::Logger::instance().configure(logOpts); } + // Default options, may be over-ridden when we parse args. opts.port=0; opts.listenInterfaces.push_back("127.0.0.1"); - // Management doesn't play well with multiple in-process brokers. - opts.enableMgmt=enableMgmt; opts.workerThreads=1; opts.dataDir=""; opts.auth=false; + + // Argument parsing + std::vector<const char*> argv(args.size()); + std::transform(args.begin(), args.end(), argv.begin(), + boost::bind(&std::string::c_str, _1)); + Plugin::addOptions(opts); + opts.parse(argv.size(), &argv[0]); broker = Broker::create(opts); // TODO aconway 2007-12-05: At one point BrokerFixture // tests could hang in Connection ctor if the following // line is removed. This may not be an issue anymore. broker->accept(); - broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); + if (isExternalPort) port = externalPort; + else port = broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); brokerThread = qpid::sys::Thread(*broker); }; - - void shutdownBroker() { - if (broker) { - broker->shutdown(); - brokerThread.join(); - broker = BrokerPtr(); - } - } - - ~BrokerFixture() { shutdownBroker(); } - - /** Open a connection to the broker. */ - void open(qpid::client::Connection& c) { - c.open("localhost", broker->getPort(qpid::broker::Broker::TCP_TRANSPORT)); - } - - uint16_t getPort() { return broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); } }; /** Connection that opens in its constructor */ @@ -125,8 +149,8 @@ template <class ConnectionType, class SessionType=qpid::client::Session> struct SessionFixtureT : BrokerFixture, ClientT<ConnectionType,SessionType> { SessionFixtureT(Broker::Options opts=Broker::Options()) : - BrokerFixture(opts), - ClientT<ConnectionType,SessionType>(broker->getPort(qpid::broker::Broker::TCP_TRANSPORT)) + BrokerFixture(BrokerFixture::Args(), opts), + ClientT<ConnectionType,SessionType>(getPort()) {} }; diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp index 29c3faf809..bad7e768a6 100644 --- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp +++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp @@ -63,6 +63,7 @@ class AgentFixture qpid::broker::Broker::Options opts = qpid::broker::Broker::Options()) { opts.enableMgmt=true; + opts.qmf1Support=!qmfV2; opts.qmf2Support=qmfV2; opts.mgmtPubInterval=pubInterval; mFix = new MessagingFixture(opts, true); diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 4763bc0b80..281464e65a 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -108,7 +108,6 @@ set(all_unit_tests ClientMessage ClientMessageTest ClientSessionTest - ConsoleTest DeliveryRecordTest DtxWorkRecordTest exception_test @@ -147,6 +146,7 @@ set(all_unit_tests TimerTest TopicExchangeTest TxBufferTest + TransactionObserverTest Url Uuid Variant @@ -171,7 +171,7 @@ add_executable (unit_test unit_test ${actual_unit_tests} ${platform_test_additions}) target_link_libraries (unit_test ${qpid_test_boost_libs} - qpidmessaging qpidbroker qmfconsole) + qpidmessaging qpidtypes qpidbroker qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") set_target_properties (unit_test PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER) remember_location(unit_test) @@ -191,105 +191,104 @@ endif (BUILD_SASL) # Other test programs # add_executable (qpid-perftest qpid-perftest.cpp ${platform_test_additions}) -target_link_libraries (qpid-perftest qpidclient) +target_link_libraries (qpid-perftest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h remember_location(qpid-perftest) add_executable (qpid-txtest qpid-txtest.cpp ${platform_test_additions}) -target_link_libraries (qpid-txtest qpidclient) +target_link_libraries (qpid-txtest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h remember_location(qpid-txtest) add_executable (qpid-latency-test qpid-latency-test.cpp ${platform_test_additions}) -target_link_libraries (qpid-latency-test qpidclient) +target_link_libraries (qpid-latency-test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #qpid_latencytest_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h remember_location(qpid-latency-test) add_executable (echotest echotest.cpp ${platform_test_additions}) -target_link_libraries (echotest qpidclient) +target_link_libraries (echotest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #echotest_SOURCES=echotest.cpp TestOptions.h ConnectionOptions.h remember_location(echotest) add_executable (qpid-client-test qpid-client-test.cpp ${platform_test_additions}) -target_link_libraries (qpid-client-test qpidclient) +target_link_libraries (qpid-client-test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h remember_location(qpid-client-test) add_executable (qpid-topic-listener qpid-topic-listener.cpp ${platform_test_additions}) -target_link_libraries (qpid-topic-listener qpidclient) +target_link_libraries (qpid-topic-listener qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h remember_location(qpid-topic-listener) add_executable (qpid-topic-publisher qpid-topic-publisher.cpp ${platform_test_additions}) -target_link_libraries (qpid-topic-publisher qpidclient) +target_link_libraries (qpid-topic-publisher qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h remember_location(qpid-topic-publisher) add_executable (publish publish.cpp ${platform_test_additions}) -target_link_libraries (publish qpidclient) +target_link_libraries (publish qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h remember_location(publish) add_executable (consume consume.cpp ${platform_test_additions}) -target_link_libraries (consume qpidclient) +target_link_libraries (consume qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #consume_SOURCES=consume.cpp TestOptions.h ConnectionOptions.h remember_location(consume) add_executable (header_test header_test.cpp ${platform_test_additions}) -target_link_libraries (header_test qpidclient) +target_link_libraries (header_test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h remember_location(header_test) add_executable (declare_queues declare_queues.cpp ${platform_test_additions}) -target_link_libraries (declare_queues qpidclient) +target_link_libraries (declare_queues qpidclient qpidcommon) remember_location(declare_queues) add_executable (replaying_sender replaying_sender.cpp ${platform_test_additions}) -target_link_libraries (replaying_sender qpidclient) +target_link_libraries (replaying_sender qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(replaying_sender) add_executable (resuming_receiver resuming_receiver.cpp ${platform_test_additions}) -target_link_libraries (resuming_receiver qpidclient) +target_link_libraries (resuming_receiver qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(resuming_receiver) add_executable (txshift txshift.cpp ${platform_test_additions}) -target_link_libraries (txshift qpidclient) +target_link_libraries (txshift qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h remember_location(txshift) add_executable (txjob txjob.cpp ${platform_test_additions}) -target_link_libraries (txjob qpidclient) +target_link_libraries (txjob qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") #txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h remember_location(txjob) add_executable (receiver receiver.cpp ${platform_test_additions}) -target_link_libraries (receiver qpidclient) -#receiver_SOURCES=receiver.cpp TestOptions.h ConnectionOptions.h +target_link_libraries (receiver qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(receiver) +# This is bizarre - using both messaging and client libraries add_executable (sender sender.cpp Statistics.cpp ${platform_test_additions}) -target_link_libraries (sender qpidmessaging) -#sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h +target_link_libraries (sender qpidmessaging qpidtypes qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(sender) add_executable (qpid-receive qpid-receive.cpp Statistics.cpp ${platform_test_additions}) -target_link_libraries (qpid-receive qpidmessaging) +target_link_libraries (qpid-receive qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(qpid-receive) add_executable (qpid-send qpid-send.cpp Statistics.cpp ${platform_test_additions}) -target_link_libraries (qpid-send qpidmessaging) +target_link_libraries (qpid-send qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(qpid-send) add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions}) -target_link_libraries (qpid-ping qpidclient) +target_link_libraries (qpid-ping qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(qpid-ping) add_executable (datagen datagen.cpp ${platform_test_additions}) -target_link_libraries (datagen qpidclient) +target_link_libraries (datagen qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(datagen) add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions}) -target_link_libraries (msg_group_test qpidmessaging) +target_link_libraries (msg_group_test qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(msg_group_test) if (BUILD_SASL) @@ -357,6 +356,7 @@ if (PYTHON_EXECUTABLE) if (BUILD_AMQP) add_test (interlink_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py) endif (BUILD_AMQP) + add_test (swig_python_tests ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/swig_python_tests${test_script_suffix}) endif (PYTHON_EXECUTABLE) add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix}) add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix}) @@ -378,24 +378,7 @@ add_test (queue_redirect ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_queue_redirect add_library(test_store MODULE test_store.cpp) target_link_libraries (test_store qpidbroker qpidcommon) -set_target_properties (test_store PROPERTIES - COMPILE_DEFINITIONS _IN_QPID_BROKER - PREFIX "") +set_target_properties (test_store PROPERTIES PREFIX "" COMPILE_DEFINITIONS _IN_QPID_BROKER) add_library (dlclose_noop MODULE dlclose_noop.c) -#libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) - -#CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) -# -## Longer running stability tests, not run by default check: target. -## Not run under valgrind, too slow -#LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest -#EXTRA_DIST+=$(LONG_TESTS) run_perftest -#check-long: -# $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND= - -# -# legacystore -# -# add_subdirectory(legacystore) diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 2da96bf30c..2236882eed 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -37,6 +37,7 @@ #include <boost/lexical_cast.hpp> #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> +#include <boost/format.hpp> #include <vector> diff --git a/qpid/cpp/src/tests/ConsoleTest.cpp b/qpid/cpp/src/tests/ConsoleTest.cpp deleted file mode 100644 index 107472ed9e..0000000000 --- a/qpid/cpp/src/tests/ConsoleTest.cpp +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/Package.h" -#include "qpid/console/ClassKey.h" -#include "unit_test.h" - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(ConsoleTestSuite) - -using namespace qpid::framing; -using namespace qpid::console; - -QPID_AUTO_TEST_CASE(testClassKey) { - uint8_t hash[16] = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15}; - ClassKey k("com.redhat.test", "class", hash); - - BOOST_CHECK_EQUAL(k.getPackageName(), "com.redhat.test"); - BOOST_CHECK_EQUAL(k.getClassName(), "class"); - BOOST_CHECK_EQUAL(k.getHashString(), "00010203-04050607-08090a0b-0c0d0e0f"); - BOOST_CHECK_EQUAL(k.str(), "com.redhat.test:class(00010203-04050607-08090a0b-0c0d0e0f)"); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am deleted file mode 100644 index 808d9f9731..0000000000 --- a/qpid/cpp/src/tests/Makefile.am +++ /dev/null @@ -1,410 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK -D_IN_QPID_BROKER -INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src -PUBLIC_INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include # Use public API only -QMF_GEN=$(top_srcdir)/managementgen/qmf-gen - -abs_builddir=@abs_builddir@ -abs_srcdir=@abs_srcdir@ - -extra_libs = -lib_client = $(abs_builddir)/../libqpidclient.la -lib_messaging = $(abs_builddir)/../libqpidmessaging.la -lib_types = $(abs_builddir)/../libqpidtypes.la -lib_common = $(abs_builddir)/../libqpidcommon.la -lib_broker = $(abs_builddir)/../libqpidbroker.la -lib_console = $(abs_builddir)/../libqmfconsole.la -lib_qmf2 = $(abs_builddir)/../libqmf2.la -# lib_amqp_0_10 = $(abs_builddir)/../libqpidamqp_0_10.la - -# -# Initialize variables that are incremented with += -# -check_PROGRAMS= -check_LTLIBRARIES= -TESTS= -EXTRA_DIST= -CLEANFILES= -LONG_TESTS= -CLEAN_LOCAL= - -# -# Destination for intalled programs and tests defined here -# -qpidexecdir = $(libexecdir)/qpid -qpidexec_PROGRAMS = -qpidexec_SCRIPTS = -qpidexectestdir = $(qpidexecdir)/tests -qpidexectest_PROGRAMS = -qpidexectest_SCRIPTS = -tmoduledir = $(libdir)/qpid/tests -tmodule_LTLIBRARIES= - -# -# Unit test program -# -# Unit tests are built as a single program to reduce valgrind overhead -# when running the tests. If you want to build a subset of the tests do -# rm -f unit_test; make unit_test unit_test_OBJECTS="unit_test.o SelectedTest.o" -# - -TESTS+=unit_test -check_PROGRAMS+=unit_test -unit_test_LDADD=-lboost_unit_test_framework -lpthread \ - $(lib_messaging) $(lib_broker) $(lib_console) $(lib_qmf2) - -unit_test_SOURCES= unit_test.cpp unit_test.h \ - MessagingSessionTests.cpp \ - MessagingThreadTests.cpp \ - MessagingFixture.h \ - ClientSessionTest.cpp \ - BrokerFixture.h \ - exception_test.cpp \ - RefCounted.cpp \ - SessionState.cpp logging.cpp \ - AsyncCompletion.cpp \ - Url.cpp Uuid.cpp \ - Shlib.cpp FieldValue.cpp FieldTable.cpp Array.cpp \ - QueueOptionsTest.cpp \ - InlineAllocator.cpp \ - InlineVector.cpp \ - SequenceSet.cpp \ - StringUtils.cpp \ - RangeSet.cpp \ - AtomicValue.cpp \ - QueueTest.cpp \ - AccumulatedAckTest.cpp \ - DtxWorkRecordTest.cpp \ - DeliveryRecordTest.cpp \ - ExchangeTest.cpp \ - HeadersExchangeTest.cpp \ - MessageTest.cpp \ - QueueDepth.cpp \ - QueueRegistryTest.cpp \ - QueuePolicyTest.cpp \ - QueueFlowLimitTest.cpp \ - FramingTest.cpp \ - HeaderTest.cpp \ - SequenceNumberTest.cpp \ - TimerTest.cpp \ - TopicExchangeTest.cpp \ - TxBufferTest.cpp \ - ConnectionOptions.h \ - ManagementTest.cpp \ - MessageReplayTracker.cpp \ - ConsoleTest.cpp \ - ProxyTest.cpp \ - RetryList.cpp \ - FrameDecoder.cpp \ - ClientMessageTest.cpp \ - PollableCondition.cpp \ - Variant.cpp \ - Address.cpp \ - ClientMessage.cpp \ - Qmf2.cpp \ - BrokerOptions.cpp \ - Selector.cpp \ - SystemInfo.cpp - -if HAVE_XML -unit_test_SOURCES+= XmlClientSessionTest.cpp -endif - -TESTLIBFLAGS = -module -rpath $(abs_builddir) - -check_LTLIBRARIES += libshlibtest.la -libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS) -libshlibtest_la_SOURCES = shlibtest.cpp - -tmodule_LTLIBRARIES += test_store.la -test_store_la_SOURCES = test_store.cpp -test_store_la_LIBADD = $(lib_broker) -test_store_la_LDFLAGS = -module - -include sasl.mk -if SSL -include ssl.mk -endif - -# Test programs that are installed and therefore built as part of make, not make check - -qpidexectest_SCRIPTS += qpid-cpp-benchmark qpid-cluster-benchmark install_env.sh qpidt -EXTRA_DIST += qpid-cpp-benchmark qpid-cluster-benchmark install_env.sh qpidt - -qpidexectest_PROGRAMS += receiver -receiver_SOURCES = \ - receiver.cpp \ - TestOptions.h \ - ConnectionOptions.h -receiver_LDADD = $(lib_client) -lboost_program_options $(lib_common) - -qpidexectest_PROGRAMS += sender -sender_SOURCES = \ - sender.cpp \ - TestOptions.h \ - ConnectionOptions.h \ - Statistics.cpp -sender_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types) $(lib_client) - -qpidexectest_PROGRAMS += qpid-receive -qpid_receive_SOURCES = \ - qpid-receive.cpp \ - TestOptions.h \ - ConnectionOptions.h \ - Statistics.h \ - Statistics.cpp -qpid_receive_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types) - -qpidexectest_PROGRAMS += qpid-send -qpid_send_SOURCES = \ - qpid-send.cpp \ - TestOptions.h \ - ConnectionOptions.h \ - Statistics.h \ - Statistics.cpp -qpid_send_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types) - -qpidexectest_PROGRAMS+=qpid-perftest -qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h -qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES) -qpid_perftest_LDADD=$(lib_client) -lboost_program_options $(lib_common) - -qpidexectest_PROGRAMS+=qpid-txtest -qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES) -qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h -qpid_txtest_LDADD=$(lib_client) -lboost_program_options $(lib_common) - -qpidexectest_PROGRAMS+=qpid-latency-test -qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES) -qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h -qpid_latency_test_LDADD=$(lib_client) -lboost_program_options $(lib_common) - -qpidexectest_PROGRAMS+=qpid-client-test -qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES) -qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h -qpid_client_test_LDADD=$(lib_client) -lboost_program_options $(lib_common) - -qpidexectest_PROGRAMS+=qpid-topic-listener -qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES) -qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h -qpid_topic_listener_LDADD=$(lib_client) -lboost_program_options $(lib_common) - -qpidexectest_PROGRAMS+=qpid-topic-publisher -qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) -qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h -qpid_topic_publisher_LDADD=$(lib_client) -lboost_program_options $(lib_common) - -qpidexectest_PROGRAMS+=qpid-ping -qpid_ping_INCLUDES=$(PUBLIC_INCLUDES) -qpid_ping_SOURCES=qpid-ping.cpp test_tools.h TestOptions.h ConnectionOptions.h -qpid_ping_LDADD=$(lib_client) -lboost_program_options $(lib_common) - -# -# Other test programs -# - -check_PROGRAMS+=echotest -echotest_INCLUDES=$(PUBLIC_INCLUDES) -echotest_SOURCES=echotest.cpp TestOptions.h ConnectionOptions.h -echotest_LDADD=$(lib_client) - -check_PROGRAMS+=publish -publish_INCLUDES=$(PUBLIC_INCLUDES) -publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h -publish_LDADD=$(lib_client) - -check_PROGRAMS+=consume -consume_INCLUDES=$(PUBLIC_INCLUDES) -consume_SOURCES=consume.cpp TestOptions.h ConnectionOptions.h -consume_LDADD=$(lib_client) - -check_PROGRAMS+=header_test -header_test_INCLUDES=$(PUBLIC_INCLUDES) -header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h -header_test_LDADD=$(lib_client) - -check_PROGRAMS+=declare_queues -declare_queues_INCLUDES=$(PUBLIC_INCLUDES) -declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(lib_client) - -check_PROGRAMS+=replaying_sender -replaying_sender_INCLUDES=$(PUBLIC_INCLUDES) -replaying_sender_SOURCES=replaying_sender.cpp -replaying_sender_LDADD=$(lib_client) - -check_PROGRAMS+=resuming_receiver -resuming_receiver_INCLUDES=$(PUBLIC_INCLUDES) -resuming_receiver_SOURCES=resuming_receiver.cpp -resuming_receiver_LDADD=$(lib_client) - -check_PROGRAMS+=txshift -txshift_INCLUDES=$(PUBLIC_INCLUDES) -txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h -txshift_LDADD=$(lib_client) - -check_PROGRAMS+=txjob -txjob_INCLUDES=$(PUBLIC_INCLUDES) -txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h -txjob_LDADD=$(lib_client) - -check_PROGRAMS+=PollerTest -PollerTest_SOURCES=PollerTest.cpp -PollerTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS) - -check_PROGRAMS+=DispatcherTest -DispatcherTest_SOURCES=DispatcherTest.cpp -DispatcherTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS) - -check_PROGRAMS+=datagen -datagen_SOURCES=datagen.cpp -datagen_LDADD=$(lib_common) $(lib_client) - -check_PROGRAMS+=qpid-stream -qpid_stream_INCLUDES=$(PUBLIC_INCLUDES) -qpid_stream_SOURCES=qpid-stream.cpp -qpid_stream_LDADD=$(lib_messaging) - -check_PROGRAMS+=msg_group_test -msg_group_test_INCLUDES=$(PUBLIC_INCLUDES) -msg_group_test_SOURCES=msg_group_test.cpp -msg_group_test_LDADD=$(lib_messaging) - -TESTS_ENVIRONMENT = \ - VALGRIND=$(VALGRIND) \ - LIBTOOL="$(LIBTOOL)" \ - QPID_DATA_DIR= \ - $(srcdir)/run_test - -system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \ - run_msg_group_tests -TESTS += start_broker $(system_tests) python_tests stop_broker \ - run_ha_tests run_interlink_tests run_federation_tests run_federation_sys_tests \ - run_acl_tests run_cli_tests dynamic_log_level_test \ - dynamic_log_hires_timestamp run_queue_flow_limit_tests ipv6_test - -EXTRA_DIST += \ - run_test vg_check \ - run-unit-tests start_broker python_tests stop_broker \ - quick_topictest \ - quick_perftest \ - quick_txtest \ - topictest \ - run_header_test \ - header_test.py \ - ssl_test \ - ping_broker \ - config.null \ - run_federation_tests \ - run_federation_sys_tests \ - run_long_federation_sys_tests \ - run_cli_tests \ - run_acl_tests \ - .valgrind.supp \ - MessageUtils.h \ - TestMessageStore.h \ - TxMocks.h \ - run_perftest \ - ring_queue_test \ - run_ring_queue_test \ - run_paged_queue_tests \ - dynamic_log_level_test \ - dynamic_log_hires_timestamp \ - qpid-ctrl \ - CMakeLists.txt \ - windows/DisableWin32ErrorWindows.cpp \ - background.ps1 \ - find_prog.ps1 \ - python_tests.ps1 \ - quick_topictest.ps1 \ - run_federation_tests.ps1 \ - run_header_test.ps1 \ - run_test.ps1 \ - start_broker.ps1 \ - stop_broker.ps1 \ - topictest.ps1 \ - run_queue_flow_limit_tests \ - run_msg_group_tests \ - ipv6_test \ - run_ha_tests \ - ha_test.py \ - ha_tests.py \ - run_interlink_tests \ - interlink_tests.py \ - brokertest.py \ - test_env.ps1.in - -check_LTLIBRARIES += libdlclose_noop.la -libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) -libdlclose_noop_la_SOURCES = dlclose_noop.c - -CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers) - -# Longer running stability tests, not run by default check: target. -# Not run under valgrind, too slow - -LONG_TESTS+=start_broker \ - fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test run_paged_queue_tests\ - run_msg_group_tests_soak \ - stop_broker \ - run_long_federation_sys_tests - -EXTRA_DIST+= \ - fanout_perftest \ - shared_perftest \ - multiq_perftest \ - topic_perftest \ - sasl_test_setup.sh \ - run_msg_group_tests_soak \ - qpidd-empty.conf \ - qpidd-p0 - -check-long: - $(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND= - -# Things that should be built before the check target runs. -check-am: python_prep test_env.sh install_env.sh sasl_config - -PYTHON_SRC_DIR=$(abs_srcdir)/../../../python -PYTHON_BLD_DIR=$(abs_builddir)/python - -# Generate python client as part of the all-am target so it gets built before tests. -all-am: python_prep - -python_prep: - if test -d $(PYTHON_SRC_DIR); \ - then cd $(PYTHON_SRC_DIR) && python $(PYTHON_SRC_DIR)/setup.py install \ - --prefix=$(PYTHON_BLD_DIR) --install-lib=$(PYTHON_BLD_DIR) \ - --install-scripts=$(PYTHON_BLD_DIR)/commands; \ - else echo "WARNING: python client not built, missing $(PYTHON_SRC_DIR)"; fi - -sasl_config: sasl_test_setup.sh - sh $(srcdir)/sasl_test_setup.sh - touch sasl_config - -CLEAN_LOCAL += sasl_config - -clean-local: - rm -rf $(CLEAN_LOCAL) - -include testagent.mk -include brokermgmt.mk - diff --git a/qpid/cpp/src/tests/ManagementTest.cpp b/qpid/cpp/src/tests/ManagementTest.cpp index 8944c084c0..98ef591fae 100644 --- a/qpid/cpp/src/tests/ManagementTest.cpp +++ b/qpid/cpp/src/tests/ManagementTest.cpp @@ -21,7 +21,6 @@ #include "qpid/management/ManagementObject.h" #include "qpid/framing/Buffer.h" -#include "qpid/console/ObjectId.h" #include "unit_test.h" namespace qpid { @@ -93,32 +92,6 @@ QPID_AUTO_TEST_CASE(testObjectIdCreate) { BOOST_CHECK_EQUAL(oid.getV2Key(), "an-object-name"); } -QPID_AUTO_TEST_CASE(testConsoleObjectId) { - qpid::console::ObjectId oid1, oid2; - - oid1.setValue(1, 2); - oid2.setValue(3, 4); - - BOOST_CHECK(oid1 < oid2); - BOOST_CHECK(oid1 <= oid2); - BOOST_CHECK(oid2 > oid1); - BOOST_CHECK(oid2 >= oid1); - BOOST_CHECK(oid1 != oid2); - BOOST_CHECK(oid1 == oid1); - - oid1.setValue(3, 6); - oid2.setValue(3, 4); - - BOOST_CHECK(oid1 > oid2); - BOOST_CHECK(oid1 >= oid2); - BOOST_CHECK(oid2 < oid1); - BOOST_CHECK(oid2 <= oid1); - BOOST_CHECK(oid1 != oid2); - - oid2.setValue(3, 6); - BOOST_CHECK(oid1 == oid2); -} - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/MessageReplayTracker.cpp b/qpid/cpp/src/tests/MessageReplayTracker.cpp index e35f673683..c0778247f0 100644 --- a/qpid/cpp/src/tests/MessageReplayTracker.cpp +++ b/qpid/cpp/src/tests/MessageReplayTracker.cpp @@ -23,6 +23,8 @@ #include "qpid/client/MessageReplayTracker.h" #include "qpid/sys/Time.h" +#include <boost/format.hpp> + namespace qpid { namespace tests { diff --git a/qpid/cpp/src/tests/MessagingFixture.h b/qpid/cpp/src/tests/MessagingFixture.h index 2312a87e9d..1d70d4cfa6 100644 --- a/qpid/cpp/src/tests/MessagingFixture.h +++ b/qpid/cpp/src/tests/MessagingFixture.h @@ -35,6 +35,8 @@ #include "qpid/messaging/Message.h" #include "qpid/types/Variant.h" +#include <boost/format.hpp> + namespace qpid { namespace tests { @@ -115,6 +117,7 @@ struct MessagingFixture : public BrokerFixture (boost::format("amqp:tcp:localhost:%1%") % (port)).str()); connection.open(); return connection; + } /** Open a connection to the broker. */ @@ -231,9 +234,10 @@ inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = class MethodInvoker { public: - MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), - sender(session.createSender("qmf.default.direct/broker")), - receiver(session.createReceiver(replyTo)) {} + MethodInvoker(messaging::Session session) : + replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), + sender(session.createSender("qmf.default.direct/broker")), + receiver(session.createReceiver(replyTo)) {} void createExchange(const std::string& name, const std::string& type, bool durable=false) { @@ -292,11 +296,14 @@ class MethodInvoker methodRequest("delete", params); } - void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0) + void methodRequest( + const std::string& method, + const Variant::Map& inParams, Variant::Map* outParams = 0, + const std::string& objectName="org.apache.qpid.broker:broker:amqp-broker") { Variant::Map content; Variant::Map objectId; - objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker"; + objectId["_object_name"] = objectName;; content["_object_id"] = objectId; content["_method_name"] = method; content["_arguments"] = inParams; diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index b388f2c13a..5cc595c56f 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1289,6 +1289,95 @@ QPID_AUTO_TEST_CASE(testSimpleRequestResponse) BOOST_CHECK_EQUAL(m.getSubject(), original.getSubject()); } +QPID_AUTO_TEST_CASE(testSelfDestructQueue) +{ + MessagingFixture fix; + Session other = fix.connection.createSession(); + Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}"); + Receiver r2 = fix.session.createReceiver("amq.fanout"); + //send request + Sender s = fix.session.createSender("amq.fanout"); + for (uint i = 0; i < 20; ++i) { + s.send(Message((boost::format("MSG_%1%") % (i+1)).str())); + } + try { + ScopedSuppressLogging sl; + for (uint i = 0; i < 20; ++i) { + r1.fetch(Duration::SECOND); + } + BOOST_FAIL("Expected exception."); + } catch (const qpid::messaging::MessagingException&) { + } + + for (uint i = 0; i < 20; ++i) { + BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); + } +} + +QPID_AUTO_TEST_CASE(testReroutingRingQueue) +{ + MessagingFixture fix; + Receiver r1 = fix.session.createReceiver("my-queue; {create:always, node:{x-declare:{alternate-exchange:amq.fanout, auto-delete:True, arguments:{qpid.max_count:10,qpid.policy_type:ring}}}}"); + Receiver r2 = fix.session.createReceiver("amq.fanout"); + + Sender s = fix.session.createSender("my-queue"); + for (uint i = 0; i < 20; ++i) { + s.send(Message((boost::format("MSG_%1%") % (i+1)).str())); + } + for (uint i = 10; i < 20; ++i) { + BOOST_CHECK_EQUAL(r1.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); + } + for (uint i = 0; i < 10; ++i) { + BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); + } +} + +QPID_AUTO_TEST_CASE(testReleaseOnPriorityQueue) +{ + MessagingFixture fix; + std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10}}}}"); + std::string text("my message"); + Sender sender = fix.session.createSender(queue); + sender.send(Message(text)); + Receiver receiver = fix.session.createReceiver(queue); + Message msg; + for (uint i = 0; i < 10; ++i) { + if (receiver.fetch(msg, Duration::SECOND)) { + BOOST_CHECK_EQUAL(msg.getContent(), text); + fix.session.release(msg); + } else { + BOOST_FAIL("Released message not redelivered as expected."); + } + } + fix.session.acknowledge(); +} + +QPID_AUTO_TEST_CASE(testRollbackWithFullPrefetch) +{ + QueueFixture fix; + std::string first("first"); + std::string second("second"); + Sender sender = fix.session.createSender(fix.queue); + for (uint i = 0; i < 10; ++i) { + sender.send(Message((boost::format("MSG_%1%") % (i+1)).str())); + } + Session txsession = fix.connection.createTransactionalSession(); + Receiver receiver = txsession.createReceiver(fix.queue); + receiver.setCapacity(9); + Message msg; + for (uint i = 0; i < 10; ++i) { + if (receiver.fetch(msg, Duration::SECOND)) { + BOOST_CHECK_EQUAL(msg.getContent(), std::string("MSG_1")); + txsession.rollback(); + } else { + BOOST_FAIL("Released message not redelivered as expected."); + break; + } + } + txsession.acknowledge(); + txsession.commit(); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index 00e964602a..ce1b0addea 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -28,6 +28,8 @@ #include "qpid/framing/reply_exceptions.h" #include "BrokerFixture.h" +#include <boost/format.hpp> + using namespace qpid::broker; using namespace qpid::client; using namespace qpid::framing; diff --git a/qpid/cpp/src/tests/Shlib.cpp b/qpid/cpp/src/tests/Shlib.cpp index d8ad4c14d8..7f01323e3c 100644 --- a/qpid/cpp/src/tests/Shlib.cpp +++ b/qpid/cpp/src/tests/Shlib.cpp @@ -35,13 +35,7 @@ typedef void (*CallMe)(int*); QPID_AUTO_TEST_CASE(testShlib) { - // The CMake-based build passes in the shared lib suffix; if it's not - // there, this is a Linux/UNIX libtool-based build. -#if defined (QPID_SHLIB_PREFIX) && defined (QPID_SHLIB_SUFFIX) Shlib sh("./" QPID_SHLIB_PREFIX "shlibtest" QPID_SHLIB_POSTFIX QPID_SHLIB_SUFFIX); -#else - Shlib sh(".libs/libshlibtest.so"); -#endif // Double cast to avoid ISO warning. CallMe callMe=sh.getSymbol<CallMe>("callMe"); BOOST_REQUIRE(callMe != 0); @@ -59,11 +53,7 @@ QPID_AUTO_TEST_CASE(testShlib) { QPID_AUTO_TEST_CASE(testAutoShlib) { int unloaded = 0; { -#if defined (QPID_SHLIB_PREFIX) && defined (QPID_SHLIB_SUFFIX) AutoShlib sh("./" QPID_SHLIB_PREFIX "shlibtest" QPID_SHLIB_POSTFIX QPID_SHLIB_SUFFIX); -#else - AutoShlib sh(".libs/libshlibtest.so"); -#endif CallMe callMe=sh.getSymbol<CallMe>("callMe"); BOOST_REQUIRE(callMe != 0); callMe(&unloaded); diff --git a/qpid/cpp/src/tests/TransactionObserverTest.cpp b/qpid/cpp/src/tests/TransactionObserverTest.cpp new file mode 100644 index 0000000000..fd1c331ae7 --- /dev/null +++ b/qpid/cpp/src/tests/TransactionObserverTest.cpp @@ -0,0 +1,144 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" +#include "test_tools.h" +#include "MessagingFixture.h" +#include "qpid/broker/BrokerObserver.h" +#include "qpid/broker/TransactionObserver.h" +#include "qpid/broker/TxBuffer.h" +#include "qpid/broker/Queue.h" +#include "qpid/ha/types.h" + +#include <boost/bind.hpp> +#include <boost/function.hpp> +#include <boost/lexical_cast.hpp> +#include <iostream> +#include <vector> + +namespace qpid { +namespace tests { + +using framing::SequenceSet; +using messaging::Message; +using boost::shared_ptr; + +using namespace boost::assign; +using namespace boost; +using namespace broker; +using namespace std; +using namespace messaging; +using namespace types; + +QPID_AUTO_TEST_SUITE(TransactionalObserverTest) + +Message msg(string content) { return Message(content); } + +struct MockTransactionObserver : public TransactionObserver { + bool prep; + vector<string> events; + + MockTransactionObserver(bool prep_=true) : prep(prep_) {} + + void record(const string& e) { events.push_back(e); } + + void enqueue(const shared_ptr<Queue>& q, const broker::Message& m) { + record("enqueue "+q->getName()+" "+m.getContent()); + } + void dequeue(const Queue::shared_ptr& q, SequenceNumber p, SequenceNumber r) { + record("dequeue "+q->getName()+" "+ + lexical_cast<string>(p)+" "+lexical_cast<string>(r)); + } + bool prepare() { record("prepare"); return prep; } + void commit() { record("commit"); } + void rollback() {record("rollback"); } +}; + +struct MockBrokerObserver : public BrokerObserver { + bool prep; + shared_ptr<MockTransactionObserver> tx; + + MockBrokerObserver(bool prep_=true) : prep(prep_) {} + + void startTx(const shared_ptr<TxBuffer>& buffer) { + tx.reset(new MockTransactionObserver(prep)); + buffer->setObserver(tx); + } +}; + +Session simpleTxTransaction(MessagingFixture& fix) { + fix.session.createSender("q1;{create:always}").send(msg("foo")); // Not in TX + // Transaction with 1 enqueue and 1 dequeue. + Session txSession = fix.connection.createTransactionalSession(); + BOOST_CHECK_EQUAL("foo", txSession.createReceiver("q1").fetch().getContent()); + txSession.acknowledge(); + txSession.createSender("q2;{create:always}").send(msg("bar")); + return txSession; +} + +QPID_AUTO_TEST_CASE(tesTxtCommit) { + MessagingFixture fix; + shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver); + fix.broker->getBrokerObservers().add(brokerObserver); + Session txSession = simpleTxTransaction(fix); + txSession.commit(); + // Note on ordering: observers see enqueues as they happen, but dequeues just + // before prepare. + BOOST_CHECK_EQUAL( + list_of<string>("enqueue q2 bar")("dequeue q1 1 0")("prepare")("commit"), + brokerObserver->tx->events + ); +} + +QPID_AUTO_TEST_CASE(testTxFail) { + MessagingFixture fix; + shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver(false)); + fix.broker->getBrokerObservers().add(brokerObserver); + Session txSession = simpleTxTransaction(fix); + try { + txSession.commit(); + BOOST_FAIL("Expected exception"); + } catch(...) {} + + BOOST_CHECK_EQUAL( + list_of<string>("enqueue q2 bar")("dequeue q1 1 0")("prepare")("rollback"), + brokerObserver->tx->events + ); +} + +QPID_AUTO_TEST_CASE(testTxRollback) { + MessagingFixture fix; + shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver(false)); + fix.broker->getBrokerObservers().add(brokerObserver); + Session txSession = simpleTxTransaction(fix); + txSession.rollback(); + // Note: The dequeue does not appear here. This is because TxAccepts + // (i.e. dequeues) are not enlisted until SemanticState::commit and are + // never enlisted if the transaction is rolled back. + BOOST_CHECK_EQUAL( + list_of<string>("enqueue q2 bar")("rollback"), + brokerObserver->tx->events + ); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/TxMocks.h b/qpid/cpp/src/tests/TxMocks.h index bf21104f70..8b54e7484b 100644 --- a/qpid/cpp/src/tests/TxMocks.h +++ b/qpid/cpp/src/tests/TxMocks.h @@ -103,6 +103,9 @@ public: if(!debugName.empty()) std::cout << std::endl << "MockTxOp[" << debugName << "]::rollback()" << std::endl; actual.push_back(ROLLBACK); } + + void callObserver(const boost::shared_ptr<TransactionObserver>&) {} + MockTxOp& expectPrepare(){ expected.push_back(PREPARE); return *this; diff --git a/qpid/cpp/src/tests/brokermgmt.mk b/qpid/cpp/src/tests/brokermgmt.mk deleted file mode 100644 index cf9a47200c..0000000000 --- a/qpid/cpp/src/tests/brokermgmt.mk +++ /dev/null @@ -1,44 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Build a unit test for the broker's internal management agent. - -BROKERMGMT_GEN_SRC= \ - brokermgmt_gen/qmf/org/apache/qpid/broker/mgmt/test/Package.cpp \ - brokermgmt_gen/qmf/org/apache/qpid/broker/mgmt/test/Package.h \ - brokermgmt_gen/qmf/org/apache/qpid/broker/mgmt/test/TestObject.h \ - brokermgmt_gen/qmf/org/apache/qpid/broker/mgmt/test/TestObject.cpp - -$(BROKERMGMT_GEN_SRC): brokermgmt_gen.timestamp - -if GENERATE -BROKERMGMT_DEPS=../mgen.timestamp -endif # GENERATE -brokermgmt_gen.timestamp: BrokerMgmtAgent.xml ${BROKERMGMT_DEPS} - $(QMF_GEN) -b -o brokermgmt_gen/qmf $(srcdir)/BrokerMgmtAgent.xml - touch $@ - -BrokerMgmtAgent.$(OBJEXT): $(BROKERMGMT_GEN_SRC) - -CLEANFILES+=$(BROKERMGMT_GEN_SRC) brokermgmt_gen.timestamp - -unit_test_SOURCES+=BrokerMgmtAgent.cpp ${BROKERMGMT_GEN_SRC} -INCLUDES+= -Ibrokermgmt_gen - -EXTRA_DIST+=BrokerMgmtAgent.xml diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 15372b312d..2786454399 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -78,7 +78,7 @@ def error_line(filename, n=1): except: return "" return ":\n" + "".join(result) -def retry(function, timeout=10, delay=.01, max_delay=1): +def retry(function, timeout=10, delay=.001, max_delay=1): """Call function until it returns a true value or timeout expires. Double the delay for each retry up to max_delay. Returns what function returns if true, None if timeout expires.""" @@ -141,7 +141,7 @@ class Popen(subprocess.Popen): finally: f.close() log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd))) - def __str__(self): return "Popen<%s>"%(self.pname) + def __repr__(self): return "Popen<%s>"%(self.pname) def outfile(self, ext): return "%s.%s" % (self.pname, ext) @@ -242,16 +242,12 @@ class Broker(Popen): _broker_count = 0 _log_count = 0 - def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port()) - - def find_log(self): - self.log = "%03d:%s.log" % (Broker._log_count, self.name) - Broker._log_count += 1 + def __repr__(self): return "<Broker:%s:%d>"%(self.log, self.port()) def get_log(self): return os.path.abspath(self.log) - def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): + def __init__(self, test, args=[], test_store=False, name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): """Start a broker daemon. name determines the data-dir and log file names.""" @@ -273,11 +269,18 @@ class Broker(Popen): else: self.name = "broker%d" % Broker._broker_count Broker._broker_count += 1 - self.find_log() + + self.log = "%03d:%s.log" % (Broker._log_count, self.name) + self.store_log = "%03d:%s.store.log" % (Broker._log_count, self.name) + Broker._log_count += 1 + cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] cmd += ["--log-enable=%s"%(log_level or "info+") ] + if test_store: cmd += ["--load-module", BrokerTest.test_store_lib, + "--test-store-events", self.store_log] + self.datadir = self.name cmd += ["--data-dir", self.datadir] if show_cmd: print cmd @@ -285,7 +288,6 @@ class Broker(Popen): test.cleanup_stop(self) self._host = "127.0.0.1" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) - self._log_ready = False def startQmf(self, handler=None): self.qmf_session = qmf.console.Session(handler) @@ -363,29 +365,21 @@ class Broker(Popen): def host_port(self): return "%s:%s" % (self.host(), self.port()) - def log_contains(self, str, timeout=1): - """Wait for str to appear in the log file up to timeout. Return true if found""" - return retry(lambda: find_in_file(str, self.log), timeout) - - def log_ready(self): - """Return true if the log file exists and contains a broker ready message""" - if not self._log_ready: - self._log_ready = find_in_file("notice Broker running", self.log) - return self._log_ready - def ready(self, timeout=30, **kwargs): """Wait till broker is ready to serve clients""" - # First make sure the broker is listening by checking the log. - if not retry(self.log_ready, timeout=timeout): - raise Exception( - "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) - # Create a connection and a session. - try: - c = self.connect(**kwargs) - try: c.session() - finally: c.close() - except Exception,e: raise RethrownException( - "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) + deadline = time.time()+timeout + while True: + try: + c = self.connect(timeout=timeout, **kwargs) + try: + c.session() + return # All good + finally: c.close() + except Exception,e: # Retry up to timeout + if time.time() > deadline: + raise RethrownException( + "Broker %s not responding: (%s)%s"%( + self.name,e,error_line(self.log, 5))) def browse(session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" @@ -407,7 +401,7 @@ def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) assert expect_contents == actual_contents, msg -def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg="browse failed"): +def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.001, transform=lambda m:m.content, msg="browse failed"): """Wait up to timeout for contents of queue to match expect_contents""" test = lambda: browse(session, queue, 0, transform=transform) == expect_contents retry(test, timeout, delay) @@ -421,6 +415,10 @@ class BrokerTest(TestCase): Provides a well-known working directory for each test. """ + def __init__(self, *args, **kwargs): + self.longMessage = True # Enable long messages for assert*(..., msg=xxx) + TestCase.__init__(self, *args, **kwargs) + # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) ha_lib = os.getenv("HA_LIB") @@ -480,7 +478,7 @@ class BrokerTest(TestCase): def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) -def join(thread, timeout=10): +def join(thread, timeout=30): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) diff --git a/qpid/cpp/src/tests/cli_tests.py b/qpid/cpp/src/tests/cli_tests.py index dceafc5427..eee9bc648c 100755 --- a/qpid/cpp/src/tests/cli_tests.py +++ b/qpid/cpp/src/tests/cli_tests.py @@ -69,47 +69,39 @@ class CliTests(TestBase010): self.startBrokerAccess() queue1 = self.makeQueue("test_queue_params1", "--limit-policy none") queue2 = self.makeQueue("test_queue_params2", "--limit-policy reject") - queue3 = self.makeQueue("test_queue_params3", "--limit-policy flow-to-disk") - queue4 = self.makeQueue("test_queue_params4", "--limit-policy ring") - queue5 = self.makeQueue("test_queue_params5", "--limit-policy ring-strict") + queue3 = self.makeQueue("test_queue_params3", "--limit-policy ring") LIMIT = "qpid.policy_type" assert LIMIT not in queue1.arguments self.assertEqual(queue2.arguments[LIMIT], "reject") - self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk") - self.assertEqual(queue4.arguments[LIMIT], "ring") - self.assertEqual(queue5.arguments[LIMIT], "ring_strict") + self.assertEqual(queue3.arguments[LIMIT], "ring") - queue6 = self.makeQueue("test_queue_params6", "--lvq-key lkey") + queue4 = self.makeQueue("test_queue_params4", "--lvq-key lkey") LVQKEY = "qpid.last_value_queue_key" - assert LVQKEY not in queue5.arguments - assert LVQKEY in queue6.arguments - assert queue6.arguments[LVQKEY] == "lkey" + assert LVQKEY not in queue3.arguments + assert LVQKEY in queue4.arguments + assert queue4.arguments[LVQKEY] == "lkey" def test_queue_params_api(self): self.startBrokerAccess() queue1 = self.makeQueue("test_queue_params_api1", "--limit-policy none", True) queue2 = self.makeQueue("test_queue_params_api2", "--limit-policy reject", True) - queue3 = self.makeQueue("test_queue_params_api3", "--limit-policy flow-to-disk", True) - queue4 = self.makeQueue("test_queue_params_api4", "--limit-policy ring", True) - queue5 = self.makeQueue("test_queue_params_api5", "--limit-policy ring-strict", True) + queue3 = self.makeQueue("test_queue_params_api3", "--limit-policy ring", True) LIMIT = "qpid.policy_type" assert LIMIT not in queue1.arguments self.assertEqual(queue2.arguments[LIMIT], "reject") - self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk") - self.assertEqual(queue4.arguments[LIMIT], "ring") - self.assertEqual(queue5.arguments[LIMIT], "ring_strict") + self.assertEqual(queue3.arguments[LIMIT], "ring") - queue6 = self.makeQueue("test_queue_params_api6", "--lvq-key lkey") + queue4 = self.makeQueue("test_queue_params_api4", "--lvq-key lkey") LVQKEY = "qpid.last_value_queue_key" - assert LVQKEY not in queue5.arguments - assert LVQKEY in queue6.arguments - assert queue6.arguments[LVQKEY] == "lkey" + assert LVQKEY not in queue3.arguments + assert LVQKEY in queue4.arguments + assert queue4.arguments[LVQKEY] == "lkey" def test_qpid_config(self): diff --git a/qpid/cpp/src/tests/failing-amqp0-10-python-tests b/qpid/cpp/src/tests/failing-amqp0-10-python-tests new file mode 100644 index 0000000000..cb742a25e5 --- /dev/null +++ b/qpid/cpp/src/tests/failing-amqp0-10-python-tests @@ -0,0 +1,7 @@ +#The following four tests fail the because pure python client excludes +#the node type for queues from the reply-to address, weheras the swigged +#client does not (as that prevents it resolving the node on every send) +qpid.tests.messaging.message.MessageEchoTests.testReplyTo +qpid.tests.messaging.message.MessageEchoTests.testReplyToQueue +qpid.tests.messaging.message.MessageEchoTests.testReplyToQueueSubject +qpid.tests.messaging.message.MessageEchoTests.testProperties diff --git a/qpid/cpp/src/tests/failing-amqp1.0-python-tests b/qpid/cpp/src/tests/failing-amqp1.0-python-tests new file mode 100644 index 0000000000..2cae7b6306 --- /dev/null +++ b/qpid/cpp/src/tests/failing-amqp1.0-python-tests @@ -0,0 +1,2 @@ +qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange_2_consumers +qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index f2fc50054f..3b5874875a 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -48,6 +48,24 @@ class QmfAgent(object): address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) + def queues(self): + return [q.values['name'] for q in self._agent.getAllQueues()] + + def repsub_queue(self, sub): + """If QMF subscription sub is a replicating subscription return + the name of the replicated queue, else return None""" + session_name = self.getSession(sub.sessionRef).name + m = re.search("qpid.ha-q:(.*)\.", session_name) + return m and m.group(1) + + def repsub_queues(self): + """Return queue names for all replicating subscriptions""" + return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()]) + + def tx_queues(self): + """Return names of all tx-queues""" + return [q for q in self.queues() if q.startswith("qpid.ha-tx")] + def __getattr__(self, name): a = getattr(self._agent, name) return a @@ -101,6 +119,9 @@ class HaBroker(Broker): """Start a broker with HA enabled @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. """ + + heartbeat=2 + def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", client_credentials=None, **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" @@ -112,7 +133,7 @@ class HaBroker(Broker): "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont # stall on an address that doesn't currently have a broker running. - "--link-heartbeat-interval=1", + "--link-heartbeat-interval=%s"%(HaBroker.heartbeat), "--max-negotiate-time=1000", "--ha-cluster=%s"%ha_cluster] if ha_replicate is not None: @@ -139,15 +160,18 @@ acl allow all all self.client_credentials = client_credentials self.ha_port = ha_port - def __str__(self): return Broker.__str__(self) + def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port()) def qpid_ha(self, args): - cred = self.client_credentials - url = self.host_port() - if cred: - url =cred.add_user(url) - args = args + ["--sasl-mechanism", cred.mechanism] - self.qpid_ha_script.main_except(["", "-b", url]+args) + try: + cred = self.client_credentials + url = self.host_port() + if cred: + url =cred.add_user(url) + args = args + ["--sasl-mechanism", cred.mechanism] + self.qpid_ha_script.main_except(["", "-b", url]+args) + except Exception, e: + raise Exception("Error in qpid_ha -b %s %s: %s"%(url, args,e)) def promote(self): self.ready(); self.qpid_ha(["promote"]) def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url]) @@ -221,6 +245,12 @@ acl allow all all def wait_backup(self, address): self.wait_address(address) + def browse(self, queue, timeout=0, transform=lambda m: m.content): + c = self.connect_admin() + try: + return browse(c.session(), queue, timeout, transform) + finally: c.close() + def assert_browse(self, queue, expected, **kwargs): """Verify queue contents by browsing.""" bs = self.connect().session() @@ -247,8 +277,10 @@ acl allow all all try: return self.connect() except ConnectionError: return None - def ready(self): - return Broker.ready(self, client_properties={"qpid.ha-admin":1}) + def ready(self, *args, **kwargs): + if not 'client_properties' in kwargs: kwargs['client_properties'] = {} + kwargs['client_properties']['qpid.ha-admin'] = True + return Broker.ready(self, *args, **kwargs) def kill(self, final=True): if final: self.ha_port.stop() @@ -259,16 +291,19 @@ acl allow all all class HaCluster(object): _cluster_count = 0 - def __init__(self, test, n, promote=True, wait=True, args=[], **kwargs): + def __init__(self, test, n, promote=True, wait=True, args=[], s_args=[], **kwargs): """Start a cluster of n brokers. @test: The test being run @n: start n brokers @promote: promote self[0] to primary @wait: wait for primary active and backups ready. Ignored if promote=False + @args: args for all brokers in the cluster. + @s_args: args for specific brokers: s_args[i] for broker i. """ self.test = test - self.args = args + self.args = copy(args) + self.s_args = copy(s_args) self.kwargs = kwargs self._ports = [HaPort(test) for i in xrange(n)] self._set_url() @@ -288,10 +323,13 @@ class HaCluster(object): self.broker_id += 1 return name - def _ha_broker(self, ha_port, name): + def _ha_broker(self, i, name): + args = self.args + if i < len(self.s_args): args += self.s_args[i] + ha_port = self._ports[i] b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name, - args=self.args, **self.kwargs) - b.ready() + args=args, **self.kwargs) + b.ready(timeout=5) return b def start(self): @@ -302,7 +340,7 @@ class HaCluster(object): self._ports.append(HaPort(self.test)) self._set_url() self._update_urls() - b = self._ha_broker(self._ports[i], self.next_name()) + b = self._ha_broker(i, self.next_name()) self._brokers.append(b) return b @@ -328,7 +366,7 @@ class HaCluster(object): a separate log file: foo.n.log""" if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self)) b = self._brokers[i] - self._brokers[i] = self._ha_broker(self._ports[i], b.name) + self._brokers[i] = self._ha_broker(i, b.name) self._brokers[i].ready() def bounce(self, i, promote_next=True): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 293712fe80..6941a2b545 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError from qpid.datatypes import uuid4, UUID from brokertest import * from ha_test import * @@ -37,6 +37,7 @@ def grep(filename, regexp): class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" + def assert_log_no_errors(self, broker): log = broker.get_log() if grep(log, re.compile("] error|] critical")): @@ -219,7 +220,8 @@ class ReplicationTests(HaBrokerTest): backup.connect_admin().close() # Test discovery: should connect to primary after reject by backup - c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) + c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], + reconnect=True) s = c.session() sender = s.sender("q;{create:always}") backup.wait_backup("q") @@ -561,9 +563,9 @@ class ReplicationTests(HaBrokerTest): return acl=os.path.join(os.getcwd(), "policy.acl") aclf=file(acl,"w") - # Verify that replication works with auth=yes and HA user has at least the following - # privileges: + # Minimum set of privileges required for the HA user. aclf.write(""" +# HA user acl allow zag@QPID access queue acl allow zag@QPID create queue acl allow zag@QPID consume queue @@ -575,6 +577,9 @@ acl allow zag@QPID publish exchange acl allow zag@QPID delete exchange acl allow zag@QPID access method acl allow zag@QPID create link +# Normal user +acl allow zig@QPID all all + acl deny all all """) aclf.close() @@ -585,14 +590,16 @@ acl deny all all "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN" ], client_credentials=Credentials("zag", "zag", "PLAIN")) - s0 = cluster[0].connect(username="zag", password="zag").session(); - s0.receiver("q;{create:always}") - s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") - cluster[1].wait_backup("q") - cluster[1].wait_backup("ex") - s1 = cluster[1].connect_admin().session(); # Uses Credentials above. - s1.sender("ex").send("foo"); - self.assertEqual(s1.receiver("q").fetch().content, "foo") + c = cluster[0].connect(username="zig", password="zig") + s0 = c.session(); + s0.sender("q;{create:always}") + s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") + s0.sender("ex").send("foo"); + s1 = c.session(transactional=True) + s1.sender("ex").send("foo-tx"); + cluster[1].assert_browse_backup("q", ["foo"]) + s1.commit() + cluster[1].assert_browse_backup("q", ["foo", "foo-tx"]) def test_alternate_exchange(self): """Verify that alternate-exchange on exchanges and queues is propagated @@ -927,20 +934,22 @@ class LongTests(HaBrokerTest): if d: return float(d)*60 else: return 3 # Default is to be quick - # FIXME aconway 2013-06-27: skip this test pending a fix for - # https://issues.apache.org/jira/browse/QPID-4944 - def skip_test_failover_send_receive(self): + def test_failover_send_receive(self): """Test failover with continuous send-receive""" brokers = HaCluster(self, 3) # Start sender and receiver threads n = 10 - senders = [NumberedSender(brokers[0], url=brokers.url, - max_depth=1024, failover_updates=False, - queue="test%s"%(i)) for i in xrange(n)] - receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i], - failover_updates=False, - queue="test%s"%(i)) for i in xrange(n)] + senders = [ + NumberedSender( + brokers[0], url=brokers.url,max_depth=50, failover_updates=False, + queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)] + + receivers = [ + NumberedReceiver( + brokers[0], url=brokers.url, sender=senders[i],failover_updates=False, + queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)] + for r in receivers: r.start() for s in senders: s.start() @@ -991,7 +1000,7 @@ class LongTests(HaBrokerTest): finally: for s in senders: s.stop() for r in receivers: r.stop() - dead = filter(lambda i: not brokers[i].is_running(), xrange(3)) + dead = filter(lambda b: not b.is_running(), brokers) if dead: raise Exception("Brokers not running: %s"%dead) def test_qmf_order(self): @@ -1200,7 +1209,7 @@ class ConfigurationTests(HaBrokerTest): cluster[0].set_brokers_url(cluster.url+",xxx:1234") self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL -class StoreTests(BrokerTest): +class StoreTests(HaBrokerTest): """Test for HA with persistence.""" def check_skip(self): @@ -1248,7 +1257,7 @@ class StoreTests(BrokerTest): doing catch-up from the primary.""" if self.check_skip(): return cluster = HaCluster(self, 2) - sn = cluster[0].connect(heartbeat=1).session() + sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() s1 = sn.sender("q1;{create:always,node:{durable:true}}") for m in ["foo","bar"]: s1.send(Message(m, durable=True)) s2 = sn.sender("q2;{create:always,node:{durable:true}}") @@ -1259,7 +1268,7 @@ class StoreTests(BrokerTest): cluster[1].assert_browse_backup("q2", ["hello"]) # Make changes that the backup doesn't see cluster.kill(1, promote_next=False, final=False) - r1 = cluster[0].connect(heartbeat=1).session().receiver("q1") + r1 = cluster[0].connect(heartbeat=HaBroker.heartbeat).session().receiver("q1") for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m) r1.session.acknowledge() for m in ["x","y","z"]: s1.send(Message(m, durable=True)) @@ -1278,7 +1287,7 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q1", ["x","y","z"]) cluster[1].assert_browse_backup("q1", ["x","y","z"]) - sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over! + sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() sn.sender("ex/k1").send("boo") cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"]) cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"]) @@ -1287,6 +1296,185 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q2", ["hello", "end"]) cluster[1].assert_browse_backup("q2", ["hello", "end"]) +def open_read(name): + try: + f = open(name) + return f.read() + finally: f.close() + +class TransactionTests(HaBrokerTest): + + load_store=["--load-module", BrokerTest.test_store_lib] + + def tx_simple_setup(self, broker): + """Start a transaction, remove messages from queue a, add messages to queue b""" + c = broker.connect() + # Send messages to a, no transaction. + sa = c.session().sender("a;{create:always,node:{durable:true}}") + tx_msgs = ["x","y","z"] + for m in tx_msgs: sa.send(Message(content=m, durable=True)) + + # Receive messages from a, in transaction. + tx = c.session(transactional=True) + txr = tx.receiver("a") + tx_msgs2 = [txr.fetch(1).content for i in xrange(3)] + self.assertEqual(tx_msgs, tx_msgs2) + + # Send messages to b, transactional, mixed with non-transactional. + sb = c.session().sender("b;{create:always,node:{durable:true}}") + txs = tx.sender("b") + msgs = [str(i) for i in xrange(3)] + for tx_m,m in zip(tx_msgs2, msgs): + txs.send(tx_m); + sb.send(m) + return tx + + def tx_subscriptions(self, broker): + """Return list of queue names for tx subscriptions""" + return [q for q in broker.agent().repsub_queues() + if q.startswith("qpid.ha-tx")] + + def test_tx_simple_commit(self): + cluster = HaCluster(self, 2, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + + # NOTE: backup does not process transactional dequeues until prepare + cluster[1].assert_browse_backup("a", ["x","y","z"]) + cluster[1].assert_browse_backup("b", ['0', '1', '2']) + + tx.acknowledge() + tx.commit() + tx.sync() + + for b in cluster: self.assert_simple_commit_outcome(b, tx_queues) + + def assert_tx_cleanup(self, b, tx_queues): + """Verify that there are no transaction artifacts + (exchanges, queues, subscriptions) on b.""" + + self.assertEqual(0, len(b.agent().tx_queues()), msg=b) + self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b) + + # TX exchanges don't show up in management so test for existence by name. + s = b.connect_admin().session() + try: + for q in tx_queues: + try: + s.sender("%s;{node:{type:topic}}"%q) + self.fail("Found tx exchange %s on %s "%(q,b)) + except NotFound: pass + finally: s.connection.close() + + def assert_simple_commit_outcome(self, b, tx_queues): + b.assert_browse_backup("a", [], msg=b) + b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) + # Check for expected actions on the store + expect = """<enqueue a x> +<enqueue a y> +<enqueue a z> +<begin tx 1> +<dequeue a x tx=1> +<dequeue a y tx=1> +<dequeue a z tx=1> +<commit tx=1> +""" + self.assertEqual(expect, open_read(b.store_log), msg=b) + self.assert_tx_cleanup(b, tx_queues) + + def test_tx_simple_rollback(self): + cluster = HaCluster(self, 2, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + tx.acknowledge() + tx.rollback() + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + + def assert_simple_rollback_outcome(self, b, tx_queues): + b.assert_browse_backup("a", ["x","y","z"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + # Check for expected actions on the store + expect = """<enqueue a x> +<enqueue a y> +<enqueue a z> +""" + self.assertEqual(open_read(b.store_log), expect, msg=b) + self.assert_tx_cleanup(b, tx_queues) + + def test_tx_simple_failover(self): + cluster = HaCluster(self, 3, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + tx.acknowledge() + cluster.bounce(0) # Should cause roll-back + cluster[0].wait_status("ready") # Restarted. + cluster[1].wait_status("active") # Promoted. + cluster[2].wait_status("ready") # Failed over. + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + + def test_tx_no_backups(self): + """Test the special case of a TX where there are no backups""" + + # Test commit + cluster = HaCluster(self, 1, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() + tx.commit() + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + self.assert_simple_commit_outcome(cluster[0], tx_queues) + + # Test rollback + cluster = HaCluster(self, 1, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + tx.acknowledge() + tx.rollback() + tx.sync() + self.assert_simple_rollback_outcome(cluster[0], tx_queues) + + + def test_tx_backup_fail(self): + cluster = HaCluster( + self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]]) + c = cluster[0].connect() + tx = c.session(transactional=True) + s = tx.sender("q;{create:always,node:{durable:true}}") + for m in ["foo","bang","bar"]: s.send(Message(m, durable=True)) + self.assertRaises(ServerError, tx.commit) + for b in cluster: b.assert_browse_backup("q", []) + self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n") + self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n") + + def test_tx_join_leave(self): + """Test cluster members joining/leaving cluster. + Also check that tx-queues are cleaned up at end of transaction.""" + + cluster = HaCluster(self, 3) + + # Leaving + tx = cluster[0].connect().session(transactional=True) + s = tx.sender("q;{create:always}") + s.send("a", sync=True) + self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster]) + cluster[1].kill(final=False) + s.send("b") + self.assertRaises(ServerError, tx.commit) + self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]]) + + # Joining + tx = cluster[0].connect().session(transactional=True) + s = tx.sender("q;{create:always}") + s.send("foo") + cluster.restart(1) + tx.commit() + # The new member is not in the tx but receives the results normal replication. + for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b) + if __name__ == "__main__": outdir = "ha_tests.tmp" shutil.rmtree(outdir, True) diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py index 129283ac24..608d4ac890 100755 --- a/qpid/cpp/src/tests/interlink_tests.py +++ b/qpid/cpp/src/tests/interlink_tests.py @@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil import traceback from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from brokertest import * +from ha_test import HaPort from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent, BrokerObject @@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest): def setUp(self): BrokerTest.setUp(self) os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib) - self.broker = self.amqp_broker() + self.port_holder = HaPort(self) + self.broker = self.amqp_broker(port_holder=self.port_holder) self.default_config = Config(self.broker) self.agent = BrokerAgent(self.broker.connect()) @@ -126,6 +128,9 @@ class AmqpBrokerTest(BrokerTest): def test_translate2(self): self.send_and_receive(send_config=Config(self.broker, version="amqp0-10")) + def test_translate_with_large_routingkey(self): + self.send_and_receive(send_config=Config(self.broker, address="amq.topic/a.%s" % ("x" * 256), version="amqp1.0"), recv_config=Config(self.broker, address="amq.topic/a.*", version="amqp0-10"), wait_for_receiver=True) + def send_and_receive_empty(self, send_config=None, recv_config=None): sconfig = send_config or self.default_config rconfig = recv_config or self.default_config @@ -218,16 +223,22 @@ class AmqpBrokerTest(BrokerTest): assert len(domains) == 1 assert domains[0].name == "BrokerB" - def test_incoming_link(self): + def incoming_link(self, mechanism): brokerB = self.amqp_broker() agentB = BrokerAgent(brokerB.connect()) self.agent.create("queue", "q") agentB.create("queue", "q") - self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"}) + self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":mechanism}) self.agent.create("incoming", "Link1", {"domain":"BrokerB","source":"q","target":"q"}) #send to brokerB, receive from brokerA self.send_and_receive(send_config=Config(brokerB)) + def test_incoming_link_anonymous(self): + self.incoming_link("ANONYMOUS") + + def test_incoming_link_nosasl(self): + self.incoming_link("NONE") + def test_outgoing_link(self): brokerB = self.amqp_broker() agentB = BrokerAgent(brokerB.connect()) @@ -246,14 +257,73 @@ class AmqpBrokerTest(BrokerTest): #send to q on broker B through brokerA self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB)) + def test_reconnect(self): + receiver_cmd = ["qpid-receive", + "--broker", self.broker.host_port(), + "--address=amq.fanout", + "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}", + "--timeout=10", "--print-content=true", "--print-headers=false" + ] + receiver = self.popen(receiver_cmd, stdout=PIPE) + + sender_cmd = ["qpid-send", + "--broker", self.broker.host_port(), + "--address=amq.fanout", + "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}", + "--content-stdin", "--send-eos=1" + ] + sender = self.popen(sender_cmd, stdin=PIPE) + sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112 + + + batch1 = ["message-%s" % (i+1) for i in range(10000)] + for m in batch1: + sender.stdin.write(m + "\n") + sender.stdin.flush() + + self.broker.kill() + self.broker = self.amqp_broker(port_holder=self.port_holder) + + batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)] + for m in batch2: + sender.stdin.write(m + "\n") + sender.stdin.flush() + + sender.stdin.close() + + last = None + m = receiver.stdout.readline().rstrip() + while len(m): + last = m + m = receiver.stdout.readline().rstrip() + assert last == "message-20000", (last) + """ Create and return a broker with AMQP 1.0 support """ def amqp_broker(self): assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in" + self.port_holder = HaPort(self) #reserve port args = ["--load-module", BrokerTest.amqp_lib, - "--max-negotiate-time=600000", + "--socket-fd=%s" % self.port_holder.fileno, + "--listen-disable=tcp", "--log-enable=trace+:Protocol", "--log-enable=info+"] - return BrokerTest.broker(self, args) + return BrokerTest.broker(self, args, port=self.port_holder.port) + + def amqp_broker(self, port_holder=None): + assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in" + if port_holder: + args = ["--load-module", BrokerTest.amqp_lib, + "--socket-fd=%s" % port_holder.fileno, + "--listen-disable=tcp", + "--log-enable=trace+:Protocol", + "--log-enable=info+"] + return BrokerTest.broker(self, args, port=port_holder.port) + else: + args = ["--load-module", BrokerTest.amqp_lib, + "--log-enable=trace+:Protocol", + "--log-enable=info+"] + return BrokerTest.broker(self, args) + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) diff --git a/qpid/cpp/src/tests/qpid-cluster-lag.py b/qpid/cpp/src/tests/qpid-cluster-lag.py deleted file mode 100755 index 5b24353241..0000000000 --- a/qpid/cpp/src/tests/qpid-cluster-lag.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -"""%prog [options] broker... -Check for brokers that lag behind other brokers in a cluster.""" - -import os, os.path, sys, socket, time, re -from qpid.messaging import * -from optparse import OptionParser -from threading import Thread - -class Browser(Thread): - def __init__(self, broker, queue, timeout): - Thread.__init__(self) - self.broker = broker - self.queue = queue - self.timeout = timeout - self.error = None - self.time = None - - def run(self): - try: - self.connection = Connection(self.broker) - self.connection.open() - self.session = self.connection.session() - self.receiver = self.session.receiver("%s;{mode:browse}"%self.queue) - self.msg = self.receiver.fetch(timeout=self.timeout) - self.time = time.time() - if (self.msg.content != self.queue): - raise Exception("Wrong message content, expected '%s' found '%s'"% - (self.queue, self.msg.content)) - except Empty: - self.error = "No message on queue %s"%self.queue - except Exception, e: - self.error = "Error: %s"%e - -def main(argv): - op = OptionParser(usage=__doc__) - op.add_option("--timeout", type="float", default=None, metavar="TIMEOUT", - help="Give up after TIMEOUT milliseconds, default never timeout") - (opts, args) = op.parse_args(argv) - if (len(args) <= 1): op.error("No brokers were specified") - brokers = args[1:] - - # Put a message on a uniquely named queue. - queue = "%s:%s:%s"%(os.path.basename(args[0]), socket.gethostname(), os.getpid()) - connection = Connection(brokers[0]) - connection.open() - session = connection.session() - sender = session.sender( - "%s;{create:always,delete:always,node:{durable:False}}"%queue) - sender.send(Message(content=queue)) - start = time.time() - # Browse for the message on each broker - if opts.timeout: opts.timeout - threads = [Browser(b, queue, opts.timeout) for b in brokers] - for t in threads: t.start() - delays=[] - - for t in threads: - t.join() - if t.error: - delay=t.error - else: - delay = t.time-start - delays.append([delay, t.broker]) - print "%s: %s"%(t.broker,delay) - if delays: - delays.sort() - print "lag: %s (%s-%s)"%(delays[-1][0] - delays[0][0], delays[-1][1], delays[0][1]) - # Clean up - sender.close() - session.close() - connection.close() - -if __name__ == "__main__": sys.exit(main(sys.argv)) diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 510e9be42c..ab26d3d9b5 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -69,11 +69,12 @@ struct Options : public qpid::Options string readyAddress; uint receiveRate; std::string replyto; + bool noReplies; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), help(false), - url("amqp:tcp:127.0.0.1"), + url("127.0.0.1"), timeout(0), forever(false), messages(0), @@ -91,7 +92,8 @@ struct Options : public qpid::Options reportTotal(false), reportEvery(0), reportHeader(true), - receiveRate(0) + receiveRate(0), + noReplies(false) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -116,6 +118,7 @@ struct Options : public qpid::Options ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.") ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages") + ("ignore-reply-to", qpid::optValue(noReplies), "Do not send replies even if reply-to is set") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -222,6 +225,7 @@ int main(int argc, char ** argv) if (opts.printHeaders) { if (msg.getSubject().size()) std::cout << "Subject: " << msg.getSubject() << std::endl; if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl; + if (msg.getMessageId().size()) std::cout << "MessageId: " << msg.getMessageId() << std::endl; if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; @@ -231,8 +235,10 @@ int main(int argc, char ** argv) std::cout << "Properties: " << msg.getProperties() << std::endl; std::cout << std::endl; } - if (opts.printContent) - std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages + if (opts.printContent) { + if (!msg.getContentObject().isVoid()) std::cout << msg.getContentObject() << std::endl; + else std::cout << msg.getContent() << std::endl; + } if (opts.messages && count >= opts.messages) done = true; } } @@ -245,7 +251,7 @@ int main(int argc, char ** argv) } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } - if (msg.getReplyTo()) { // Echo message back to reply-to address. + if (msg.getReplyTo() && !opts.noReplies) { // Echo message back to reply-to address. Sender& s = replyTo[msg.getReplyTo().str()]; if (s.isNull()) { s = session.createSender(msg.getReplyTo()); @@ -260,8 +266,6 @@ int main(int argc, char ** argv) int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } - // Clear out message properties & content for next iteration. - msg = Message(); // TODO aconway 2010-12-01: should be done by fetch } if (opts.reportTotal) reporter.report(); if (opts.tx) { diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index c3bba31e3b..f912b84e8e 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -96,7 +96,7 @@ struct Options : public qpid::Options Options(const std::string& argv0=std::string()) : qpid::Options("Options"), help(false), - url("amqp:tcp:127.0.0.1"), + url("127.0.0.1"), messages(1), sendEos(0), durable(false), @@ -262,9 +262,8 @@ class MapContentGenerator : public ContentGenerator { public: MapContentGenerator(const Options& opt) : opts(opt) {} virtual bool setContent(Message& msg) { - Variant::Map map; - opts.setEntries(map); - encode(map, msg); + msg.getContentObject() = qpid::types::Variant::Map(); + opts.setEntries(msg.getContentObject().asMap()); return true; } private: @@ -371,6 +370,7 @@ int main(int argc, char ** argv) msg.setReplyTo(Address(opts.replyto)); } if (!opts.userid.empty()) msg.setUserId(opts.userid); + if (!opts.id.empty()) msg.setMessageId(opts.id); if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid); opts.setProperties(msg); uint sent = 0; diff --git a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py index 36fbe4b438..55497ccc03 100755 --- a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py +++ b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py @@ -39,20 +39,25 @@ class ConsoleTest(BrokerTest): def setUp(self): BrokerTest.setUp(self) - args = ["--mgmt-qmf1=no", - "--mgmt-pub-interval=%d" % self.PUB_INTERVAL] + + def _startBroker(self, QMFv1=False ): + self._broker_is_v1 = QMFv1 + if self._broker_is_v1: + args = ["--mgmt-qmf1=yes", "--mgmt-qmf2=no"] + else: + args = ["--mgmt-qmf1=no", "--mgmt-qmf2=yes"] + + args.append("--mgmt-pub-interval=%d" % self.PUB_INTERVAL) self.broker = BrokerTest.broker(self, args) - def _startQmfV2(self, broker, console=None): + + def _myStartQmf(self, broker, console=None): # I manually set up the QMF session here rather than call the startQmf # method from BrokerTest as I can guarantee the console library is used # (assuming BrokerTest's implementation of startQmf could change) self.qmf_session = qmf.console.Session(console) self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (broker.host(), broker.port())) - self.assertEqual(self.qmf_broker.getBrokerAgent().isV2, True, - "Expected broker agent to support QMF V2") - def _create_queue( self, q_name, args={} ): broker = self.qmf_session.getObjects(_class="broker")[0] @@ -60,11 +65,11 @@ class ConsoleTest(BrokerTest): self.assertEqual(result.status, 0, result) - def test_method_call(self): + def _test_method_call(self): """ Verify method calls work, and check the behavior of getObjects() call """ - self._startQmfV2( self.broker ) + self._myStartQmf( self.broker ) self._create_queue( "fleabag", {"auto-delete":True} ) qObj = None @@ -76,12 +81,16 @@ class ConsoleTest(BrokerTest): self.assertNotEqual(qObj, None, "Failed to get queue object") #print qObj - def test_unsolicited_updates(self): + def _test_unsolicited_updates(self): """ Verify that the Console callbacks work """ class Handler(qmf.console.Console): def __init__(self): + self.v1_oids = 0 + self.v1_events = 0 + self.v2_oids = 0 + self.v2_events = 0 self.broker_info = [] self.broker_conn = [] self.newpackage = [] @@ -109,28 +118,38 @@ class ConsoleTest(BrokerTest): def event(self, broker, event): #print "EVENT %s" % event self.events.append(event) - def objectProps(self, broker, record): - #print "ObjProps %s" % record - assert len(record.getProperties()), "objectProps() invoked with no properties?" + if event.isV2: + self.v2_events += 1 + else: + self.v1_events += 1 + + def heartbeat(self, agent, timestamp): + #print "Heartbeat %s" % agent + self.heartbeats.append( (agent, timestamp) ) + + # generic handler for objectProps and objectStats + def _handle_obj_update(self, record): oid = record.getObjectId() + if oid.isV2: + self.v2_oids += 1 + else: + self.v1_oids += 1 + if oid not in self.updates: self.updates[oid] = record else: self.updates[oid].mergeUpdate( record ) + + def objectProps(self, broker, record): + assert len(record.getProperties()), "objectProps() invoked with no properties?" + self._handle_obj_update(record) + def objectStats(self, broker, record): - #print "ObjStats %s" % record assert len(record.getStatistics()), "objectStats() invoked with no properties?" - oid = record.getObjectId() - if oid not in self.updates: - self.updates[oid] = record - else: - self.updates[oid].mergeUpdate( record ) - def heartbeat(self, agent, timestamp): - #print "Heartbeat %s" % agent - self.heartbeats.append( (agent, timestamp) ) + self._handle_obj_update(record) handler = Handler() - self._startQmfV2( self.broker, handler ) + self._myStartQmf( self.broker, handler ) # this should force objectProps, queueDeclare Event callbacks self._create_queue( "fleabag", {"auto-delete":True} ) # this should force objectStats callback @@ -163,7 +182,15 @@ class ConsoleTest(BrokerTest): break assert msgs == 3, "msgDepth statistics not accurate!" - def test_async_method(self): + # verify that the published objects were of the correct QMF version + if self._broker_is_v1: + assert handler.v1_oids and handler.v2_oids == 0, "QMFv2 updates received while in V1-only mode!" + assert handler.v1_events and handler.v2_events == 0, "QMFv2 events received while in V1-only mode!" + else: + assert handler.v2_oids and handler.v1_oids == 0, "QMFv1 updates received while in V2-only mode!" + assert handler.v2_events and handler.v1_events == 0, "QMFv1 events received while in V2-only mode!" + + def _test_async_method(self): class Handler (qmf.console.Console): def __init__(self): self.cv = Condition() @@ -207,12 +234,40 @@ class ConsoleTest(BrokerTest): return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious) handler = Handler() - self._startQmfV2(self.broker, handler) + self._myStartQmf(self.broker, handler) broker = self.qmf_session.getObjects(_class="broker")[0] handler.request(broker, 20) sleep(1) self.assertEqual(handler.check(), "pass") + def test_method_call(self): + self._startBroker() + self._test_method_call() + + def test_unsolicited_updates(self): + self._startBroker() + self._test_unsolicited_updates() + + def test_async_method(self): + self._startBroker() + self._test_async_method() + + # For now, include "QMFv1 only" tests. Once QMFv1 is deprecated, these can + # be removed + + def test_method_call_v1(self): + self._startBroker(QMFv1=True) + self._test_method_call() + + def test_unsolicited_updates_v1(self): + self._startBroker(QMFv1=True) + self._test_unsolicited_updates() + + def test_async_method_v1(self): + self._startBroker(QMFv1=True) + self._test_async_method() + + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) diff --git a/qpid/cpp/src/tests/qpidt b/qpid/cpp/src/tests/qpidt index 5bdfb6eefd..92df9efc8d 100755 --- a/qpid/cpp/src/tests/qpidt +++ b/qpid/cpp/src/tests/qpidt @@ -117,7 +117,7 @@ class Manager: if k == "name": name = v elif v: - if isinstance(v, dict) and v["_object_name"]: + if isinstance(v, dict) and "_object_name" in v: v = v["_object_name"] details += "%s=%s " %(k,v) print "%-25s %s" % (name, details) diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py index c7361a629e..83e31e979b 100644 --- a/qpid/cpp/src/tests/queue_flow_limit_tests.py +++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py @@ -27,6 +27,8 @@ from os import environ, popen class QueueFlowLimitTests(TestBase010): + _timeout = 100 + def __getattr__(self, name): if name == "assertGreater": return lambda a, b: self.failUnless(a > b) @@ -156,7 +158,7 @@ class QueueFlowLimitTests(TestBase010): totalMsgs = 1213 + 797 + 331 # wait until flow control is active - deadline = time() + 10 + deadline = time() + self._timeout while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \ time() < deadline: pass @@ -209,7 +211,7 @@ class QueueFlowLimitTests(TestBase010): totalMsgs = 1699 + 1129 + 881 # wait until flow control is active - deadline = time() + 10 + deadline = time() + self._timeout while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \ time() < deadline: pass @@ -255,7 +257,7 @@ class QueueFlowLimitTests(TestBase010): # fill up the queue, waiting until flow control is active sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content) - deadline = time() + 10 + deadline = time() + self._timeout while (not testq.mgmt.flowStopped) and time() < deadline: testq.mgmt.update() @@ -357,7 +359,7 @@ class QueueFlowLimitTests(TestBase010): sender = BlockedSender(self, "kill-q", count=100) # wait for flow control - deadline = time() + 10 + deadline = time() + self._timeout while (not q.flowStopped) and time() < deadline: q.update() diff --git a/qpid/cpp/src/tests/rsynchosts b/qpid/cpp/src/tests/rsynchosts index 4f19c3b22a..81c9699ac3 100755 --- a/qpid/cpp/src/tests/rsynchosts +++ b/qpid/cpp/src/tests/rsynchosts @@ -27,13 +27,21 @@ abspath() { } usage() { - echo "Usage: $(basename $0) file [file...] + echo "Usage: $(basename $0) [-l user] file [file...] Synchronize the contents of each file or directory to the same absolute path on each host in \$HOSTS. " exit 1 } +while getopts "l:" opt; do + case $opt in + l) RSYNC_USER="$OPTARG@" ;; + *) usage ;; + esac +done +shift `expr $OPTIND - 1` + test "$*" || usage for f in $*; do FILES="$FILES $(abspath $f)" || exit 1; done @@ -42,7 +50,7 @@ OK_FILE=`mktemp` # Will be deleted if anything goes wrong. trap "rm -f $OK_FILE" EXIT for h in $HOSTS; do - rsync -aRO --delete $FILES $h:/ || { echo "rsync to $h failed"; rm -f $OK_FILE; } & + rsync -vaRO --delete $FILES $RSYNC_USER$h:/ || { echo "rsync to $h failed"; rm -f $OK_FILE; } & done wait test -f $OK_FILE diff --git a/qpid/cpp/src/tests/run-unit-tests b/qpid/cpp/src/tests/run-unit-tests deleted file mode 100755 index 5337a3dc22..0000000000 --- a/qpid/cpp/src/tests/run-unit-tests +++ /dev/null @@ -1,48 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Library names (without path or .so) and CppUnit test paths can be -# specified on the command line or in env var UNIT_TESTS. For example: -# -# Selected test classes: -# ./run-unit-tests ValueTest ClientChannelTest -# -# Individual test method -# ./run-unit-tests ValueTest :ValueTest::testStringValueEquals -# -# Build and run selected tests: -# make check TESTS=run-unit-tests UNIT_TESTS=ClientChannelTest -# - -for u in $* $UNIT_TESTS ; do - case $u in - :*) TEST_ARGS="$TEST_ARGS $u" ;; # A test path. - *) TEST_ARGS="$TEST_ARGS .libs/$u.so" ;; # A test library. - esac -done -test -z "$TEST_ARGS" && TEST_ARGS=".libs/*Test.so" - -test -z "$srcdir" && srcdir=. - -# libdlclose_noop prevents unloading symbols needed for valgrind output. -export LD_PRELOAD=.libs/libdlclose_noop.so -source $srcdir/run_test DllPlugInTester -c -b $TEST_ARGS diff --git a/qpid/cpp/src/tests/run_cli_tests b/qpid/cpp/src/tests/run_cli_tests index a7f55d58b7..e9590080a1 100755 --- a/qpid/cpp/src/tests/run_cli_tests +++ b/qpid/cpp/src/tests/run_cli_tests @@ -44,8 +44,8 @@ start_brokers() { # look like they're xml related. # if we start supporting xml on windows, it will need something similar # here - if [ -f ../.libs/xml.so ] ; then - xargs="--load-module ../.libs/xml.so" + if [ -f ../xml.so ] ; then + xargs="--load-module ../xml.so" if [ ! -f test.xquery ] ; then create_test_xquery fi diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests index 09219141ef..bf33f96ebd 100755 --- a/qpid/cpp/src/tests/run_federation_tests +++ b/qpid/cpp/src/tests/run_federation_tests @@ -25,7 +25,7 @@ source ./test_env.sh #set -x trap stop_brokers INT TERM QUIT -if [ -f ../.libs/xml.so ] ; then +if [ -f ../xml.so ] ; then MODULES="--load-module xml" # Load the XML exchange and run XML exchange federation tests SKIPTESTS= else diff --git a/qpid/cpp/src/tests/run_msg_group_tests.ps1 b/qpid/cpp/src/tests/run_msg_group_tests.ps1 new file mode 100644 index 0000000000..e9cee0a5a0 --- /dev/null +++ b/qpid/cpp/src/tests/run_msg_group_tests.ps1 @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Simple test of encode/decode of a double in application headers +# TODO: this should be expanded to cover a wider set of types and go +# in both directions + +$srcdir = Split-Path $myInvocation.InvocationName +$PYTHON_DIR = "$srcdir\..\..\..\python" +if (!(Test-Path $PYTHON_DIR -pathType Container)) { + "Skipping msg_group test as python libs not found" + exit 0 +} + +. .\test_env.ps1 + +if (Test-Path qpidd.port) { + set-item -path env:QPID_PORT -value (get-content -path qpidd.port -totalcount 1) +} + +# Test runs from the tests directory but the test executables are in a +# subdirectory based on the build type. Look around for it before trying +# to start it. +. $srcdir\find_prog.ps1 .\msg_group_test.exe +if (!(Test-Path $prog)) { + "Cannot locate msg_group_test.exe" + exit 1 +} + +$QUEUE_NAME="group-queue" +$GROUP_KEY="My-Group-Id" +$BROKER_URL="localhost:$env:QPID_PORT" + +$tests=@("python $QPID_CONFIG_EXEC -b $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups", + "$prog -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3", + "$prog -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size", + "python $QPID_CONFIG_EXEC -b $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups", + "$prog -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size", + "$prog -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5", + "$prog -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size", + "python $QPID_CONFIG_EXEC -b $BROKER_URL del queue ${QUEUE_NAME}-two --force", + "$prog -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size", + "$prog -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53", + "$prog -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000 --group-size 1 --receivers 0 --senders 1", + "$prog -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000 --receivers 5 --senders 0", + "python $QPID_CONFIG_EXEC -b $BROKER_URL del queue $QUEUE_NAME --force") + +foreach ($cmd in $tests) +{ + Invoke-Expression "$cmd" | Write-Output + $ret = $LASTEXITCODE + if ($ret -ne 0) {Write-Host "FAILED message group test. Failed command: $cmd" + break} +} +exit $ret diff --git a/qpid/cpp/src/tests/sasl.mk b/qpid/cpp/src/tests/sasl.mk deleted file mode 100644 index 8c31192635..0000000000 --- a/qpid/cpp/src/tests/sasl.mk +++ /dev/null @@ -1,44 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Test that are only relevant if SASL is enabled. -if HAVE_SASL - -# Note: sasl_version is not a test -- it is a tool used by tests. -check_PROGRAMS+=sasl_version -sasl_version_SOURCES=sasl_version.cpp -sasl_version_LDADD=$(lib_client) - -TESTS += sasl_fed - sasl_fed_ex_dynamic - sasl_fed_ex_link - sasl_fed_ex_queue - sasl_fed_ex_route - sasl_no_dir - -EXTRA_DIST += sasl_fed \ - sasl_fed_ex \ - sasl_fed_ex_dynamic \ - sasl_fed_ex_link \ - sasl_fed_ex_queue \ - sasl_fed_ex_route \ - sasl_no_dir - - -endif # HAVE_SASL diff --git a/qpid/cpp/src/tests/ssl.mk b/qpid/cpp/src/tests/ssl.mk deleted file mode 100644 index 1544dc5e71..0000000000 --- a/qpid/cpp/src/tests/ssl.mk +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -TESTS+=ssl_test -EXTRA_DIST+=ssl_test -CLEAN_LOCAL += test_cert_dir cert.password diff --git a/qpid/cpp/src/tests/swig_python_tests b/qpid/cpp/src/tests/swig_python_tests new file mode 100755 index 0000000000..6f862ffa2d --- /dev/null +++ b/qpid/cpp/src/tests/swig_python_tests @@ -0,0 +1,62 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run the python tests. +source ./test_env.sh +trap stop_broker INT TERM QUIT + +if [[ -a $AMQP_LIB ]] ; then + echo "Found AMQP support: $AMQP_LIB" + MODULES="--load-module $AMQP_LIB" +fi + +fail() { + echo "FAIL swigged python tests: $1"; exit 1; +} +skip() { + echo "SKIPPED swigged python tests: $1"; exit 0; +} + +start_broker() { + QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker" +} + +stop_broker() { + $QPIDD_EXEC -q --port $QPID_PORT +} + +test -d $PYTHON_DIR || skip "no python dir" +test -f $PYTHONSWIGMODULE || skip "no swigged python client" + +start_broker +echo "Running swigged python tests using broker on port $QPID_PORT" + +export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG +$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1 +if [[ -a $AMQPC_LIB ]] ; then + export QPID_LOAD_MODULE=$AMQPC_LIB + $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 +fi +stop_broker +if [[ $FAILED -eq 1 ]]; then + fail "" +fi + diff --git a/qpid/cpp/src/tests/test_env.ps1.in b/qpid/cpp/src/tests/test_env.ps1.in index 5fa3a0ac31..12373b5b35 100644 --- a/qpid/cpp/src/tests/test_env.ps1.in +++ b/qpid/cpp/src/tests/test_env.ps1.in @@ -41,7 +41,6 @@ $PYTHON_COMMANDS="$QPID_TOOLS\src\py" $env:PYTHONPATH="$srcdir;$PYTHON_DIR;$PYTHON_COMMANDS;$QPID_TESTS_PY;$QPID_TOOLS_LIBS;$QMF_LIB;$env:PYTHONPATH" $QPID_CONFIG_EXEC="$PYTHON_COMMANDS\qpid-config" $QPID_ROUTE_EXEC="$PYTHON_COMMANDS\qpid-route" -$QPID_CLUSTER_EXEC="$PYTHON_COMMANDS\qpid-cluster" $QPID_HA_TOOL_EXEC="$PYTHON_COMMANDS\qpid-ha-tool" # Executables diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index bafdacf944..486034ca3b 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -25,6 +25,8 @@ builddir=`absdir @abs_builddir@` top_srcdir=`absdir @abs_top_srcdir@` top_builddir=`absdir @abs_top_builddir@` moduledir=$top_builddir/src@builddir_lib_suffix@ +pythonswigdir=$top_builddir/bindings/qpid/python/ +pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@ testmoduledir=$builddir@builddir_lib_suffix@ export QPID_INSTALL_PREFIX=@prefix@ @@ -44,7 +46,8 @@ export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$QMF_LIB:$ export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route export QPID_HA_EXEC=$PYTHON_COMMANDS/qpid-ha - +export PYTHONPATH_SWIG=$pythonswigdir:$pythonswiglibdir +export PYTHONSWIGMODULE=$pythonswigdir/qpid_messaging.py # Executables export QPIDD_EXEC=$top_builddir/src/qpidd @@ -78,5 +81,5 @@ if [ ! -e "$HOME" ]; then fi # Options for boost test framework -export BOOST_TEST_SHOW_PROGRESS=yes -export BOOST_TEST_CATCH_SYSTEM_ERRORS=no +test -z "$BOOST_TEST_SHOW_PROGRESS" && export BOOST_TEST_SHOW_PROGRESS=yes +test -z "$BOOST_TEST_CATCH_SYSTEM_ERRORS" && export BOOST_TEST_CATCH_SYSTEM_ERRORS=no diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index eac4deda2d..e299161c68 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -40,14 +40,19 @@ #include "qpid/sys/Thread.h" #include "qpid/Plugin.h" #include "qpid/Options.h" +#include "qpid/RefCounted.h" +#include "qpid/Msg.h" #include <boost/cast.hpp> #include <boost/lexical_cast.hpp> #include <memory> +#include <ostream> #include <fstream> +#include <sstream> -using namespace qpid; -using namespace broker; using namespace std; +using namespace boost; +using namespace qpid; +using namespace qpid::broker; using namespace qpid::sys; namespace qpid { @@ -57,11 +62,19 @@ struct TestStoreOptions : public Options { string name; string dump; + string events; + vector<string> throwMsg; // Throw exception if message content matches. TestStoreOptions() : Options("Test Store Options") { addOptions() - ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") - ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") + ("test-store-name", optValue(name, "NAME"), + "Name of test store instance.") + ("test-store-dump", optValue(dump, "FILE"), + "File to dump enqueued messages.") + ("test-store-events", optValue(events, "FILE"), + "File to log events, 1 line per event.") + ("test-store-throw", optValue(throwMsg, "CONTENT"), + "Throw exception if message content matches.") ; } }; @@ -82,24 +95,76 @@ class TestStore : public NullMessageStore { TestStore(const TestStoreOptions& opts, Broker& broker_) : options(opts), name(opts.name), broker(broker_) { - QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump); - if (!options.dump.empty()) + QPID_LOG(info, "TestStore name=" << name + << " dump=" << options.dump + << " events=" << options.events + << " throw messages =" << options.throwMsg.size()); + + if (!options.dump.empty()) dump.reset(new ofstream(options.dump.c_str())); + if (!options.events.empty()) + events.reset(new ofstream(options.events.c_str())); } ~TestStore() { for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); } - virtual bool isNull() const { return false; } - - void enqueue(TransactionContext* , + // Dummy transaction context. + struct TxContext : public TPCTransactionContext { + static int nextId; + string id; + TxContext() : id(lexical_cast<string>(nextId++)) {} + TxContext(string xid) : id(xid) {} + }; + + static string getId(const TransactionContext& tx) { + const TxContext* tc = dynamic_cast<const TxContext*>(&tx); + assert(tc); + return tc->id; + } + + + bool isNull() const { return false; } + + void log(const string& msg) { + QPID_LOG(info, "test_store: " << msg); + if (events.get()) *events << msg << endl << std::flush; + } + + auto_ptr<TransactionContext> begin() { + auto_ptr<TxContext> tx(new TxContext()); + log(Msg() << "<begin tx " << tx->id << ">"); + return auto_ptr<TransactionContext>(tx); + } + + auto_ptr<TPCTransactionContext> begin(const std::string& xid) { + auto_ptr<TxContext> tx(new TxContext(xid)); + log(Msg() << "<begin tx " << tx->id << ">"); + return auto_ptr<TPCTransactionContext>(tx); + } + + string getContent(const intrusive_ptr<PersistableMessage>& msg) { + intrusive_ptr<broker::Message::Encoding> enc( + dynamic_pointer_cast<broker::Message::Encoding>(msg)); + return enc->getContent(); + } + + void enqueue(TransactionContext* tx, const boost::intrusive_ptr<PersistableMessage>& pmsg, - const PersistableQueue& ) + const PersistableQueue& queue) { - qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); + QPID_LOG(debug, "TestStore enqueue " << queue.getName()); + qpid::broker::amqp_0_10::MessageTransfer* msg = + dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); assert(msg); + ostringstream o; + o << "<enqueue " << queue.getName() << " " << getContent(msg); + if (tx) o << " tx=" << getId(*tx); + o << ">"; + log(o.str()); + // Dump the message if there is a dump file. if (dump.get()) { msg->getFrames().getMethod()->print(*dump); @@ -113,7 +178,11 @@ class TestStore : public NullMessageStore { string data = msg->getFrames().getContent(); size_t i = string::npos; size_t j = string::npos; - if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 + const vector<string>& throwMsg(options.throwMsg); + if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) { + throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); + } + else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 && (i = data.find(name+"[")) != string::npos && (j = data.find("]", i)) != string::npos) { @@ -144,6 +213,31 @@ class TestStore : public NullMessageStore { msg->enqueueComplete(); } + void dequeue(TransactionContext* tx, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) + { + QPID_LOG(debug, "TestStore dequeue " << queue.getName()); + ostringstream o; + o<< "<dequeue " << queue.getName() << " " << getContent(msg); + if (tx) o << " tx=" << getId(*tx); + o << ">"; + log(o.str()); + } + + void prepare(TPCTransactionContext& txn) { + log(Msg() << "<prepare tx=" << getId(txn) << ">"); + } + + void commit(TransactionContext& txn) { + log(Msg() << "<commit tx=" << getId(txn) << ">"); + } + + void abort(TransactionContext& txn) { + log(Msg() << "<abort tx=" << getId(txn) << ">"); + } + + private: static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; TestStoreOptions options; @@ -151,8 +245,11 @@ class TestStore : public NullMessageStore { Broker& broker; vector<Thread> threads; std::auto_ptr<ofstream> dump; + std::auto_ptr<ofstream> events; }; +int TestStore::TxContext::nextId(1); + const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; const string TestStore::EXCEPTION = "exception"; const string TestStore::EXIT_PROCESS = "exit_process"; diff --git a/qpid/cpp/src/tests/test_tools.h b/qpid/cpp/src/tests/test_tools.h index de672f938a..7950a36913 100644 --- a/qpid/cpp/src/tests/test_tools.h +++ b/qpid/cpp/src/tests/test_tools.h @@ -23,7 +23,6 @@ #include <limits.h> // Include before boost/test headers. #include <boost/test/test_tools.hpp> #include <boost/assign/list_of.hpp> -#include <boost/assign/list_of.hpp> #include <vector> #include <set> #include <ostream> diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp deleted file mode 100644 index e6010a8e00..0000000000 --- a/qpid/cpp/src/tests/testagent.cpp +++ /dev/null @@ -1,208 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/management/Manageable.h> -#include <qpid/management/ManagementObject.h> -#include <qpid/agent/ManagementAgent.h> -#include <qpid/sys/Mutex.h> -#include <qpid/sys/Time.h> -#include "qmf/org/apache/qpid/agent/example/Parent.h" -#include "qmf/org/apache/qpid/agent/example/Child.h" -#include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h" -#include "qmf/org/apache/qpid/agent/example/EventChildCreated.h" -#include "qmf/org/apache/qpid/agent/example/Package.h" - -#include <signal.h> -#include <cstdlib> -#include <iostream> - -#include <sstream> - -namespace qpid { -namespace tests { - -static bool running = true; - -using std::string; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; -using qpid::sys::Mutex; -namespace _qmf = qmf::org::apache::qpid::agent::example; - -class ChildClass; - -//============================================================== -// CoreClass is the operational class that corresponds to the -// "Parent" class in the management schema. -//============================================================== -class CoreClass : public Manageable -{ - string name; - ManagementAgent* agent; - _qmf::Parent* mgmtObject; - std::vector<ChildClass*> children; - Mutex vectorLock; - -public: - - CoreClass(ManagementAgent* agent, string _name); - ~CoreClass() { mgmtObject->resourceDestroy(); } - - ManagementObject* GetManagementObject(void) const - { return mgmtObject; } - - void doLoop(); - status_t ManagementMethod (uint32_t methodId, Args& args, string& text); -}; - -class ChildClass : public Manageable -{ - string name; - _qmf::Child* mgmtObject; - -public: - - ChildClass(ManagementAgent* agent, CoreClass* parent, string name); - ~ChildClass() { mgmtObject->resourceDestroy(); } - - ManagementObject* GetManagementObject(void) const - { return mgmtObject; } - - void doWork() - { - mgmtObject->inc_count(2); - } -}; - -CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) -{ - static uint64_t persistId = 0x111222333444555LL; - mgmtObject = new _qmf::Parent(agent, this, name); - - agent->addObject(mgmtObject, persistId++); - mgmtObject->set_state("IDLE"); -} - -void CoreClass::doLoop() -{ - // Periodically bump a counter to provide a changing statistical value - while (running) { - qpid::sys::sleep(1); - mgmtObject->inc_count(); - mgmtObject->set_state("IN_LOOP"); - - { - Mutex::ScopedLock _lock(vectorLock); - - for (std::vector<ChildClass*>::iterator iter = children.begin(); - iter != children.end(); - iter++) { - (*iter)->doWork(); - } - } - } -} - -Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, string& /*text*/) -{ - Mutex::ScopedLock _lock(vectorLock); - - switch (methodId) { - case _qmf::Parent::METHOD_CREATE_CHILD: - _qmf::ArgsParentCreate_child& ioArgs = (_qmf::ArgsParentCreate_child&) args; - - ChildClass *child = new ChildClass(agent, this, ioArgs.i_name); - ioArgs.o_childRef = child->GetManagementObject()->getObjectId(); - - children.push_back(child); - - agent->raiseEvent(_qmf::EventChildCreated(ioArgs.i_name)); - - return STATUS_OK; - } - - return STATUS_NOT_IMPLEMENTED; -} - -ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) -{ - mgmtObject = new _qmf::Child(agent, this, parent, name); - - agent->addObject(mgmtObject); -} - - -//============================================================== -// Main program -//============================================================== - -ManagementAgent::Singleton* singleton; - -void shutdown(int) -{ - running = false; -} - -int main_int(int argc, char** argv) -{ - singleton = new ManagementAgent::Singleton(); - const char* host = argc>1 ? argv[1] : "127.0.0.1"; - int port = argc>2 ? atoi(argv[2]) : 5672; - - signal(SIGINT, shutdown); - - // Create the qmf management agent - ManagementAgent* agent = singleton->getInstance(); - - // Register the Qmf_example schema with the agent - _qmf::Package packageInit(agent); - - // Start the agent. It will attempt to make a connection to the - // management broker - agent->init(host, port, 5, false, ".magentdata"); - - // Allocate some core objects - CoreClass core1(agent, "Example Core Object #1"); - CoreClass core2(agent, "Example Core Object #2"); - CoreClass core3(agent, "Example Core Object #3"); - - core1.doLoop(); - - // done, cleanup and exit - delete singleton; - - return 0; -} - -}} // namespace qpid::tests - -int main(int argc, char** argv) -{ - try { - return qpid::tests::main_int(argc, argv); - } catch(std::exception& e) { - std::cerr << "Top Level Exception: " << e.what() << std::endl; - return 1; - } -} - diff --git a/qpid/cpp/src/tests/testagent.mk b/qpid/cpp/src/tests/testagent.mk deleted file mode 100644 index 0492f3e3bb..0000000000 --- a/qpid/cpp/src/tests/testagent.mk +++ /dev/null @@ -1,51 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Build a simple qmf agent for test purposes. - -TESTAGENT_GEN_SRC= \ - testagent_gen/qmf/org/apache/qpid/agent/example/Parent.h \ - testagent_gen/qmf/org/apache/qpid/agent/example/Child.h \ - testagent_gen/qmf/org/apache/qpid/agent/example/Parent.cpp \ - testagent_gen/qmf/org/apache/qpid/agent/example/Child.cpp \ - testagent_gen/qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h \ - testagent_gen/qmf/org/apache/qpid/agent/example/EventChildCreated.h \ - testagent_gen/qmf/org/apache/qpid/agent/example/EventChildDestroyed.h \ - testagent_gen/qmf/org/apache/qpid/agent/example/EventChildCreated.cpp \ - testagent_gen/qmf/org/apache/qpid/agent/example/EventChildDestroyed.cpp \ - testagent_gen/qmf/org/apache/qpid/agent/example/Package.h \ - testagent_gen/qmf/org/apache/qpid/agent/example/Package.cpp - -$(TESTAGENT_GEN_SRC): testagent_gen.timestamp -if GENERATE -TESTAGENT_DEPS=../mgen.timestamp -endif # GENERATE -testagent_gen.timestamp: testagent.xml ${TESTAGENT_DEPS} - $(QMF_GEN) -o testagent_gen/qmf $(srcdir)/testagent.xml - touch $@ - -CLEANFILES+=$(TESTAGENT_GEN_SRC) testagent_gen.timestamp - -testagent-testagent.$(OBJEXT): $(TESTAGENT_GEN_SRC) -qpidexectest_PROGRAMS+=testagent -testagent_CXXFLAGS=$(CXXFLAGS) -Itestagent_gen -testagent_SOURCES=testagent.cpp $(TESTAGENT_GEN_SRC) -testagent_LDADD=$(top_builddir)/src/libqmf.la $(top_builddir)/src/libqpidcommon.la $(top_builddir)/src/libqpidtypes.la $(top_builddir)/src/libqpidclient.la - -EXTRA_DIST+=testagent.xml diff --git a/qpid/cpp/src/tests/testagent.xml b/qpid/cpp/src/tests/testagent.xml deleted file mode 100644 index 0b1436f999..0000000000 --- a/qpid/cpp/src/tests/testagent.xml +++ /dev/null @@ -1,64 +0,0 @@ -<schema package="org.apache.qpid.agent.example"> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> - - <!-- - =============================================================== - Parent - =============================================================== - --> - <class name="Parent"> - - This class represents a parent object - - <property name="name" type="lstr" access="RC" index="y"/> - - <statistic name="state" type="sstr" desc="Operational state of the link"/> - <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> - - <method name="create_child" desc="Create child object"> - <arg name="name" dir="I" type="lstr"/> - <arg name="childRef" dir="O" type="objId"/> - </method> - </class> - - - <!-- - =============================================================== - Child - =============================================================== - --> - <class name="Child"> - <property name="ParentRef" type="objId" references="Parent" access="RC" index="y" parentRef="y"/> - <property name="name" type="lstr" access="RC" index="y"/> - - <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> - - <method name="delete"/> - </class> - - <eventArguments> - <arg name="childName" type="lstr"/> - </eventArguments> - - <event name="ChildCreated" args="childName"/> - <event name="ChildDestroyed" args="childName"/> -</schema> - |
