diff options
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/tests/ClusterFailover.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/tests/ClusterFixture.cpp | 13 | ||||
| -rw-r--r-- | cpp/src/tests/ClusterFixture.h | 8 | ||||
| -rw-r--r-- | cpp/src/tests/Makefile.am | 8 | ||||
| -rw-r--r-- | cpp/src/tests/PartialFailure.cpp | 30 | ||||
| -rw-r--r-- | cpp/src/tests/cluster_test.cpp | 214 | ||||
| -rw-r--r-- | cpp/src/tests/test_tools.h | 9 |
7 files changed, 175 insertions, 112 deletions
diff --git a/cpp/src/tests/ClusterFailover.cpp b/cpp/src/tests/ClusterFailover.cpp index db2392b296..7d49ed5cda 100644 --- a/cpp/src/tests/ClusterFailover.cpp +++ b/cpp/src/tests/ClusterFailover.cpp @@ -50,7 +50,10 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4; // Test re-connecting with same session name after a failure. QPID_AUTO_TEST_CASE(testReconnectSameSessionName) { - ClusterFixture cluster(2, -1); + ostringstream clusterLib; + clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str()); + ClusterFixture cluster(2, args, -1); Client c0(cluster[0], "foo"); cluster.kill(0, 9); Client c1(cluster[1], "foo"); // Using same name, should be cleaned up. diff --git a/cpp/src/tests/ClusterFixture.cpp b/cpp/src/tests/ClusterFixture.cpp index 70d60b10b4..e12106c464 100644 --- a/cpp/src/tests/ClusterFixture.cpp +++ b/cpp/src/tests/ClusterFixture.cpp @@ -61,25 +61,20 @@ using boost::assign::list_of; #include "ClusterFixture.h" -ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_, const string& clusterLib_) - : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_), clusterLib(clusterLib_) +ClusterFixture::ClusterFixture(size_t n, const Args& args_, int localIndex_) + : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_) { add(n); } -ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_, const string& clusterLib_) - : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_), clusterLib(clusterLib_) +ClusterFixture::ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs_, int localIndex_) + : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_) { add(n); } -const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS = - list_of<string>("--auth=no")("--no-data-dir"); - ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) { Args args = list_of<string>("qpidd ") - ("--no-module-dir") - ("--load-module")(clusterLib) ("--cluster-name")(name) ("--log-prefix")(prefix); args.insert(args.end(), userArgs.begin(), userArgs.end()); diff --git a/cpp/src/tests/ClusterFixture.h b/cpp/src/tests/ClusterFixture.h index 353ec0c88d..08b314499e 100644 --- a/cpp/src/tests/ClusterFixture.h +++ b/cpp/src/tests/ClusterFixture.h @@ -60,8 +60,6 @@ using qpid::broker::Broker; using boost::shared_ptr; using qpid::cluster::Cluster; -#define DEFAULT_CLUSTER_LIB "../.libs/cluster.so" - /** Cluster fixture is a vector of ports for the replicas. * * At most one replica (by default replica 0) is in the current @@ -70,15 +68,14 @@ using qpid::cluster::Cluster; class ClusterFixture : public vector<uint16_t> { public: typedef std::vector<std::string> Args; - static const Args DEFAULT_ARGS; /** @param localIndex can be -1 meaning don't automatically start a local broker. * A local broker can be started with addLocal(). */ - ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS, const string& clusterLib = DEFAULT_CLUSTER_LIB); + ClusterFixture(size_t n, const Args& args, int localIndex=0); /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */ - ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs, const string& clusterLib = DEFAULT_CLUSTER_LIB); + ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, int localIndex); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); // Add a broker. @@ -102,7 +99,6 @@ class ClusterFixture : public vector<uint16_t> { std::vector<shared_ptr<ForkedBroker> > forkedBrokers; Args userArgs; boost::function<void (Args&, size_t)> updateArgs; - string clusterLib; }; /** diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 161428fcad..98d101049b 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -211,7 +211,13 @@ check_PROGRAMS+=DispatcherTest DispatcherTest_SOURCES=DispatcherTest.cpp DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS) -TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test +TESTS_ENVIRONMENT = \ + VALGRIND=$(VALGRIND) \ + srcdir=$(srcdir) \ + QPID_DATA_DIR= \ + QPID_LIB_DIR=../.libs \ + BOOST_TEST_SHOW_PROGRESS=yes \ + $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test diff --git a/cpp/src/tests/PartialFailure.cpp b/cpp/src/tests/PartialFailure.cpp index 5137672e7d..91fa63e6e9 100644 --- a/cpp/src/tests/PartialFailure.cpp +++ b/cpp/src/tests/PartialFailure.cpp @@ -33,7 +33,7 @@ QPID_AUTO_TEST_SUITE(PartialFailureTestSuite) - using namespace std; +using namespace std; using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; @@ -49,11 +49,19 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4; static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); } void updateArgs(ClusterFixture::Args& args, size_t index) { - ostringstream os; - os << "--test-store-name=s" << index; - args.push_back(os.str()); - args.push_back("--load-module=.libs/test_store.so"); - args.push_back("--auth=no"); + ostringstream clusterLib, testStoreLib, storeName; + clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + testStoreLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/../tests/.libs/test_store.so"; + storeName << "s" << index; + args.push_back("--auth"); + args.push_back("no"); + args.push_back("--no-module-dir"); + args.push_back("--load-module"); + args.push_back(clusterLib.str()); + args.push_back("--load-module"); + args.push_back(testStoreLib.str()); + args.push_back("--test-store-name"); + args.push_back(storeName.str()); args.push_back("TMP_DATA_DIR"); // These tests generate errors deliberately, disable error logging unless a log env var is set. @@ -82,7 +90,7 @@ QPID_AUTO_TEST_CASE(testNormalErrors) { // Connection thread. ScopedSuppressLogging allQuiet; - ClusterFixture cluster(3, -1, updateArgs); + ClusterFixture cluster(3, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); @@ -113,7 +121,7 @@ QPID_AUTO_TEST_CASE(testNormalErrors) { QPID_AUTO_TEST_CASE(testErrorAfterJoin) { ScopedSuppressLogging allQuiet; - ClusterFixture cluster(1, -1, updateArgs); + ClusterFixture cluster(1, updateArgs, -1); Client c0(cluster[0]); c0.session.queueDeclare("q", durable=true); c0.session.messageTransfer(content=pMessage("a", "q")); @@ -138,7 +146,7 @@ QPID_AUTO_TEST_CASE(testErrorAfterJoin) { QPID_AUTO_TEST_CASE(testSinglePartialFailure) { ScopedSuppressLogging allQuiet; - ClusterFixture cluster(3, -1, updateArgs); + ClusterFixture cluster(3, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); @@ -166,7 +174,7 @@ QPID_AUTO_TEST_CASE(testSinglePartialFailure) { QPID_AUTO_TEST_CASE(testMultiPartialFailure) { ScopedSuppressLogging allQuiet; - ClusterFixture cluster(4, -1, updateArgs); + ClusterFixture cluster(4, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); @@ -195,7 +203,7 @@ QPID_AUTO_TEST_CASE(testMultiPartialFailure) { QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) { ScopedSuppressLogging allQuiet; - ClusterFixture cluster(2, -1, updateArgs); + ClusterFixture cluster(2, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d38d84025b..819bf4365e 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -69,6 +69,18 @@ using namespace boost::assign; using broker::Broker; using boost::shared_ptr; +bool durableFlag = std::getenv("DURABLE_ENABLE") != 0; + +void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { + ostringstream clusterLib; + clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); + if (durableFlag) + args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR"; + else + args += "--no-data-dir"; +} + // Timeout for tests that wait for messages const sys::Duration TIMEOUT=sys::TIME_SEC/4; @@ -166,29 +178,31 @@ QPID_AUTO_TEST_CASE(testAcl) { policyFile.close(); char cwd[1024]; BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); - ClusterFixture cluster(2,-1, list_of<string> - ("--no-data-dir") - ("--auth=no") - ("--acl-file="+string(cwd)+"/cluster_test.acl") - ("--cluster-mechanism=PLAIN") - ("--cluster-username=cluster") - ("--cluster-password=cluster") - ("--load-module=../.libs/acl.so")); + ostringstream aclLib; + aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so"; + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + args += "--acl-file", string(cwd) + "/cluster_test.acl", + "--cluster-mechanism", "PLAIN", + "--cluster-username", "cluster", + "--cluster-password", "cluster", + "--load-module", aclLib.str(); + ClusterFixture cluster(2, args, -1); Client c0(aclSettings(cluster[0], "c0"), "c0"); Client c1(aclSettings(cluster[1], "c1"), "c1"); Client foo(aclSettings(cluster[1], "foo"), "foo"); - foo.session.queueDeclare("foo"); + foo.session.queueDeclare("foo", arg::durable=durableFlag); BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); - BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException); BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); cluster.add(); Client c2(aclSettings(cluster[2], "c2"), "c2"); - BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException); BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); } @@ -198,15 +212,17 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { // Note: this doesn't actually test for cluster race conditions around TTL, // it just verifies that basic TTL functionality works. // - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); - c0.session.queueDeclare("p"); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); - c0.session.messageTransfer(arg::content=Message("b", "q")); - c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000)); - c0.session.messageTransfer(arg::content=Message("y", "p")); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag); cluster.add(); Client c2(cluster[1], "c2"); @@ -222,44 +238,48 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { QPID_AUTO_TEST_CASE(testSequenceOptions) { // Make sure the exchange qpid.msg_sequence property is properly replicated. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - FieldTable args; - args.setInt("qpid.msg_sequence", 1); - c0.session.queueDeclare(arg::queue="q"); - c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args); + FieldTable ftargs; + ftargs.setInt("qpid.msg_sequence", 1); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs); c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); - c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex"); - c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex"); + c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag); BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); cluster.add(); Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex"); + c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag); BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); } QPID_AUTO_TEST_CASE(testTxTransaction) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare(arg::queue="q"); - c0.session.messageTransfer(arg::content=Message("A", "q")); - c0.session.messageTransfer(arg::content=Message("B", "q")); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag); // Start a transaction that will commit. Session commitSession = c0.connection.newSession("commit"); SubscriptionManager commitSubs(commitSession); commitSession.txSelect(); - commitSession.messageTransfer(arg::content=Message("a", "q")); - commitSession.messageTransfer(arg::content=Message("b", "q")); + commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag); + commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); // Start a transaction that will roll back. Session rollbackSession = c0.connection.newSession("rollback"); SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); - rollbackSession.messageTransfer(arg::content=Message("1", "q")); + rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag); Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); @@ -270,9 +290,9 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { // More transactional work BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - rollbackSession.messageTransfer(arg::content=Message("2", "q")); - commitSession.messageTransfer(arg::content=Message("c", "q")); - rollbackSession.messageTransfer(arg::content=Message("3", "q")); + rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag); + commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag); + rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); @@ -292,15 +312,17 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { QPID_AUTO_TEST_CASE(testUnacked) { // Verify replication of unacknowledged messages. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); Message m; // Create unacked message: acquired but not accepted. SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); - c0.session.queueDeclare("q1"); - c0.session.messageTransfer(arg::content=Message("11","q1")); + c0.session.queueDeclare("q1", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag); LocalQueue q1; c0.subs.subscribe(q1, "q1", manualAccept); BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted @@ -308,9 +330,9 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create unacked message: not acquired, accepted or completeed. SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); - c0.session.queueDeclare("q2"); - c0.session.messageTransfer(arg::content=Message("21","q2")); - c0.session.messageTransfer(arg::content=Message("22","q2")); + c0.session.queueDeclare("q2", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag); LocalQueue q2; c0.subs.subscribe(q2, "q2", manualAcquire); m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue @@ -323,9 +345,9 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create empty credit record: acquire and accept but don't complete. SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); - c0.session.queueDeclare("q3"); - c0.session.messageTransfer(arg::content=Message("31", "q3")); - c0.session.messageTransfer(arg::content=Message("32", "q3")); + c0.session.queueDeclare("q3", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag); LocalQueue q3; c0.subs.subscribe(q3, "q3", manualComplete); Message m31=q3.get(TIMEOUT); @@ -360,14 +382,16 @@ QPID_AUTO_TEST_CASE(testUnacked) { QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { // Verify that we update transaction state correctly to new members. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); // Do work in a transaction. c0.session.txSelect(); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("1","q")); - c0.session.messageTransfer(arg::content=Message("2","q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag); Message m; BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "1"); @@ -384,7 +408,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { BOOST_CHECK_EQUAL(m.getData(), "2"); // Another transaction with both members active. - c0.session.messageTransfer(arg::content=Message("3","q")); + c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); @@ -394,9 +418,11 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // Verify that we update a partially recieved message to a new member. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); // Send first 2 frames of message. @@ -407,6 +433,10 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { sender.send(transfer, true, false, true, true); AMQHeaderBody header; header.get<DeliveryProperties>(true)->setRoutingKey("q"); + if (durableFlag) + header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT); + else + header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT); sender.send(header, false, false, true, true); // No reliable way to ensure the partial message has arrived @@ -427,7 +457,9 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { } QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); set<int> kb0 = knownBrokerPorts(c0.connection); BOOST_CHECK_EQUAL(kb0.size(), 1u); @@ -459,11 +491,13 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { } QPID_AUTO_TEST_CASE(testUpdateConsumers) { - ClusterFixture cluster(1, 1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("p"); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); LocalQueue lp; c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); @@ -476,10 +510,10 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { Client c2(cluster[2], "c2"); // Transfer messages - c0.session.messageTransfer(arg::content=Message("aaa", "q")); + c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("bbb", "p")); - c0.session.messageTransfer(arg::content=Message("ccc", "p")); + c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag); // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::unlimited()); @@ -504,20 +538,22 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { cluster.killWithSilencer(0,c0.connection,9); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); for (int i = 0; i < 10; ++i) { - c1.session.messageTransfer(arg::content=Message("xxx", "q")); + c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag); BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } } QPID_AUTO_TEST_CASE(testCatchupSharedState) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); // Create some shared state. - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo","q")); - c0.session.messageTransfer(arg::content=Message("bar","q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag); while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. @@ -526,12 +562,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { cluster.add(); // Do some work post-add - c0.session.queueDeclare("p"); - c0.session.messageTransfer(arg::content=Message("pfoo","p")); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag); // Do some work post-join BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); - c0.session.messageTransfer(arg::content=Message("pbar","p")); + c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag); // Verify new brokers have state. Message m; @@ -556,11 +592,13 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { } QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0]); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.exchangeDeclare("ex", arg::type="direct"); c0.session.close(); c0.connection.close(); @@ -575,11 +613,13 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); Client c0(cluster[0]); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo", "q")); - c0.session.messageTransfer(arg::content=Message("bar", "q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); c0.session.close(); Client c1(cluster[1]); Message msg; @@ -591,11 +631,13 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo", "q")); - c0.session.messageTransfer(arg::content=Message("bar", "q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); Message msg; @@ -615,18 +657,20 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { } QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0]); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. // First start a subscription. - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); // Now send messages Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("foo", "q")); - c1.session.messageTransfer(arg::content=Message("bar", "q")); + c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); // Check they arrived Message m; @@ -653,7 +697,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) void execute(AsyncSession& session, bool) { - session.messageTransfer(arg::content=Message(content, queue)); + session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag); } }; @@ -676,7 +720,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) void execute(AsyncSession& session, bool) { - session.queueDeclare(arg::queue=queue); + session.queueDeclare(arg::queue=queue, arg::durable=durableFlag); SubscriptionManager subs(session); subscription = subs.subscribe(*this, queue); session.sync(); @@ -707,7 +751,9 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) } }; - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); ConnectionSettings settings; settings.port = cluster[1]; settings.heartbeat = 1; diff --git a/cpp/src/tests/test_tools.h b/cpp/src/tests/test_tools.h index 37a6594f8a..54837d3e5b 100644 --- a/cpp/src/tests/test_tools.h +++ b/cpp/src/tests/test_tools.h @@ -89,6 +89,15 @@ struct ScopedSuppressLogging { qpid::log::Options opts; }; +inline std::string getLibPath(const char* envName, const char* defaultPath = 0) { + const char* p = std::getenv(envName); + if (p != 0) + return p; + if (defaultPath == 0) + BOOST_FAIL("Environment variable " << envName << " not set."); + return defaultPath; +} + #endif /*!TEST_TOOLS_H*/ |
