summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/tests/ClusterFailover.cpp5
-rw-r--r--cpp/src/tests/ClusterFixture.cpp13
-rw-r--r--cpp/src/tests/ClusterFixture.h8
-rw-r--r--cpp/src/tests/Makefile.am8
-rw-r--r--cpp/src/tests/PartialFailure.cpp30
-rw-r--r--cpp/src/tests/cluster_test.cpp214
-rw-r--r--cpp/src/tests/test_tools.h9
7 files changed, 175 insertions, 112 deletions
diff --git a/cpp/src/tests/ClusterFailover.cpp b/cpp/src/tests/ClusterFailover.cpp
index db2392b296..7d49ed5cda 100644
--- a/cpp/src/tests/ClusterFailover.cpp
+++ b/cpp/src/tests/ClusterFailover.cpp
@@ -50,7 +50,10 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4;
// Test re-connecting with same session name after a failure.
QPID_AUTO_TEST_CASE(testReconnectSameSessionName) {
- ClusterFixture cluster(2, -1);
+ ostringstream clusterLib;
+ clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+ ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str());
+ ClusterFixture cluster(2, args, -1);
Client c0(cluster[0], "foo");
cluster.kill(0, 9);
Client c1(cluster[1], "foo"); // Using same name, should be cleaned up.
diff --git a/cpp/src/tests/ClusterFixture.cpp b/cpp/src/tests/ClusterFixture.cpp
index 70d60b10b4..e12106c464 100644
--- a/cpp/src/tests/ClusterFixture.cpp
+++ b/cpp/src/tests/ClusterFixture.cpp
@@ -61,25 +61,20 @@ using boost::assign::list_of;
#include "ClusterFixture.h"
-ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_, const string& clusterLib_)
- : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_), clusterLib(clusterLib_)
+ClusterFixture::ClusterFixture(size_t n, const Args& args_, int localIndex_)
+ : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_)
{
add(n);
}
-ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_, const string& clusterLib_)
- : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_), clusterLib(clusterLib_)
+ClusterFixture::ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs_, int localIndex_)
+ : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_)
{
add(n);
}
-const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
- list_of<string>("--auth=no")("--no-data-dir");
-
ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) {
Args args = list_of<string>("qpidd ")
- ("--no-module-dir")
- ("--load-module")(clusterLib)
("--cluster-name")(name)
("--log-prefix")(prefix);
args.insert(args.end(), userArgs.begin(), userArgs.end());
diff --git a/cpp/src/tests/ClusterFixture.h b/cpp/src/tests/ClusterFixture.h
index 353ec0c88d..08b314499e 100644
--- a/cpp/src/tests/ClusterFixture.h
+++ b/cpp/src/tests/ClusterFixture.h
@@ -60,8 +60,6 @@ using qpid::broker::Broker;
using boost::shared_ptr;
using qpid::cluster::Cluster;
-#define DEFAULT_CLUSTER_LIB "../.libs/cluster.so"
-
/** Cluster fixture is a vector of ports for the replicas.
*
* At most one replica (by default replica 0) is in the current
@@ -70,15 +68,14 @@ using qpid::cluster::Cluster;
class ClusterFixture : public vector<uint16_t> {
public:
typedef std::vector<std::string> Args;
- static const Args DEFAULT_ARGS;
/** @param localIndex can be -1 meaning don't automatically start a local broker.
* A local broker can be started with addLocal().
*/
- ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS, const string& clusterLib = DEFAULT_CLUSTER_LIB);
+ ClusterFixture(size_t n, const Args& args, int localIndex=0);
/**@param updateArgs function is passed the index of the cluster member and can update the arguments. */
- ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs, const string& clusterLib = DEFAULT_CLUSTER_LIB);
+ ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, int localIndex);
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add(); // Add a broker.
@@ -102,7 +99,6 @@ class ClusterFixture : public vector<uint16_t> {
std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
Args userArgs;
boost::function<void (Args&, size_t)> updateArgs;
- string clusterLib;
};
/**
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 161428fcad..98d101049b 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -211,7 +211,13 @@ check_PROGRAMS+=DispatcherTest
DispatcherTest_SOURCES=DispatcherTest.cpp
DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS)
-TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
+TESTS_ENVIRONMENT = \
+ VALGRIND=$(VALGRIND) \
+ srcdir=$(srcdir) \
+ QPID_DATA_DIR= \
+ QPID_LIB_DIR=../.libs \
+ BOOST_TEST_SHOW_PROGRESS=yes \
+ $(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test
diff --git a/cpp/src/tests/PartialFailure.cpp b/cpp/src/tests/PartialFailure.cpp
index 5137672e7d..91fa63e6e9 100644
--- a/cpp/src/tests/PartialFailure.cpp
+++ b/cpp/src/tests/PartialFailure.cpp
@@ -33,7 +33,7 @@
QPID_AUTO_TEST_SUITE(PartialFailureTestSuite)
- using namespace std;
+using namespace std;
using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
@@ -49,11 +49,19 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4;
static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); }
void updateArgs(ClusterFixture::Args& args, size_t index) {
- ostringstream os;
- os << "--test-store-name=s" << index;
- args.push_back(os.str());
- args.push_back("--load-module=.libs/test_store.so");
- args.push_back("--auth=no");
+ ostringstream clusterLib, testStoreLib, storeName;
+ clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+ testStoreLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/../tests/.libs/test_store.so";
+ storeName << "s" << index;
+ args.push_back("--auth");
+ args.push_back("no");
+ args.push_back("--no-module-dir");
+ args.push_back("--load-module");
+ args.push_back(clusterLib.str());
+ args.push_back("--load-module");
+ args.push_back(testStoreLib.str());
+ args.push_back("--test-store-name");
+ args.push_back(storeName.str());
args.push_back("TMP_DATA_DIR");
// These tests generate errors deliberately, disable error logging unless a log env var is set.
@@ -82,7 +90,7 @@ QPID_AUTO_TEST_CASE(testNormalErrors) {
// Connection thread.
ScopedSuppressLogging allQuiet;
- ClusterFixture cluster(3, -1, updateArgs);
+ ClusterFixture cluster(3, updateArgs, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
Client c2(cluster[2], "c2");
@@ -113,7 +121,7 @@ QPID_AUTO_TEST_CASE(testNormalErrors) {
QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
ScopedSuppressLogging allQuiet;
- ClusterFixture cluster(1, -1, updateArgs);
+ ClusterFixture cluster(1, updateArgs, -1);
Client c0(cluster[0]);
c0.session.queueDeclare("q", durable=true);
c0.session.messageTransfer(content=pMessage("a", "q"));
@@ -138,7 +146,7 @@ QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
ScopedSuppressLogging allQuiet;
- ClusterFixture cluster(3, -1, updateArgs);
+ ClusterFixture cluster(3, updateArgs, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
Client c2(cluster[2], "c2");
@@ -166,7 +174,7 @@ QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
ScopedSuppressLogging allQuiet;
- ClusterFixture cluster(4, -1, updateArgs);
+ ClusterFixture cluster(4, updateArgs, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
Client c2(cluster[2], "c2");
@@ -195,7 +203,7 @@ QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
ScopedSuppressLogging allQuiet;
- ClusterFixture cluster(2, -1, updateArgs);
+ ClusterFixture cluster(2, updateArgs, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index d38d84025b..819bf4365e 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -69,6 +69,18 @@ using namespace boost::assign;
using broker::Broker;
using boost::shared_ptr;
+bool durableFlag = std::getenv("DURABLE_ENABLE") != 0;
+
+void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
+ ostringstream clusterLib;
+ clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+ args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
+ if (durableFlag)
+ args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR";
+ else
+ args += "--no-data-dir";
+}
+
// Timeout for tests that wait for messages
const sys::Duration TIMEOUT=sys::TIME_SEC/4;
@@ -166,29 +178,31 @@ QPID_AUTO_TEST_CASE(testAcl) {
policyFile.close();
char cwd[1024];
BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
- ClusterFixture cluster(2,-1, list_of<string>
- ("--no-data-dir")
- ("--auth=no")
- ("--acl-file="+string(cwd)+"/cluster_test.acl")
- ("--cluster-mechanism=PLAIN")
- ("--cluster-username=cluster")
- ("--cluster-password=cluster")
- ("--load-module=../.libs/acl.so"));
+ ostringstream aclLib;
+ aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so";
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ args += "--acl-file", string(cwd) + "/cluster_test.acl",
+ "--cluster-mechanism", "PLAIN",
+ "--cluster-username", "cluster",
+ "--cluster-password", "cluster",
+ "--load-module", aclLib.str();
+ ClusterFixture cluster(2, args, -1);
Client c0(aclSettings(cluster[0], "c0"), "c0");
Client c1(aclSettings(cluster[1], "c1"), "c1");
Client foo(aclSettings(cluster[1], "foo"), "foo");
- foo.session.queueDeclare("foo");
+ foo.session.queueDeclare("foo", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo");
- BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException);
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException);
BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty());
BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty());
cluster.add();
Client c2(aclSettings(cluster[2], "c2"), "c2");
- BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException);
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException);
BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty());
}
@@ -198,15 +212,17 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
// Note: this doesn't actually test for cluster race conditions around TTL,
// it just verifies that basic TTL functionality works.
//
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
- c0.session.queueDeclare("p");
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
- c0.session.messageTransfer(arg::content=Message("b", "q"));
- c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000));
- c0.session.messageTransfer(arg::content=Message("y", "p"));
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag);
cluster.add();
Client c2(cluster[1], "c2");
@@ -222,44 +238,48 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
QPID_AUTO_TEST_CASE(testSequenceOptions) {
// Make sure the exchange qpid.msg_sequence property is properly replicated.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- FieldTable args;
- args.setInt("qpid.msg_sequence", 1);
- c0.session.queueDeclare(arg::queue="q");
- c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
+ FieldTable ftargs;
+ ftargs.setInt("qpid.msg_sequence", 1);
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs);
c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
- c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
- c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
cluster.add();
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
+ c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
}
QPID_AUTO_TEST_CASE(testTxTransaction) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare(arg::queue="q");
- c0.session.messageTransfer(arg::content=Message("A", "q"));
- c0.session.messageTransfer(arg::content=Message("B", "q"));
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag);
// Start a transaction that will commit.
Session commitSession = c0.connection.newSession("commit");
SubscriptionManager commitSubs(commitSession);
commitSession.txSelect();
- commitSession.messageTransfer(arg::content=Message("a", "q"));
- commitSession.messageTransfer(arg::content=Message("b", "q"));
+ commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag);
+ commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
// Start a transaction that will roll back.
Session rollbackSession = c0.connection.newSession("rollback");
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
- rollbackSession.messageTransfer(arg::content=Message("1", "q"));
+ rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag);
Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
@@ -270,9 +290,9 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
// More transactional work
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
- rollbackSession.messageTransfer(arg::content=Message("2", "q"));
- commitSession.messageTransfer(arg::content=Message("c", "q"));
- rollbackSession.messageTransfer(arg::content=Message("3", "q"));
+ rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag);
+ commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag);
+ rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
@@ -292,15 +312,17 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
QPID_AUTO_TEST_CASE(testUnacked) {
// Verify replication of unacknowledged messages.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
Message m;
// Create unacked message: acquired but not accepted.
SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
- c0.session.queueDeclare("q1");
- c0.session.messageTransfer(arg::content=Message("11","q1"));
+ c0.session.queueDeclare("q1", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag);
LocalQueue q1;
c0.subs.subscribe(q1, "q1", manualAccept);
BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
@@ -308,9 +330,9 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create unacked message: not acquired, accepted or completeed.
SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
- c0.session.queueDeclare("q2");
- c0.session.messageTransfer(arg::content=Message("21","q2"));
- c0.session.messageTransfer(arg::content=Message("22","q2"));
+ c0.session.queueDeclare("q2", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag);
LocalQueue q2;
c0.subs.subscribe(q2, "q2", manualAcquire);
m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue
@@ -323,9 +345,9 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create empty credit record: acquire and accept but don't complete.
SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
- c0.session.queueDeclare("q3");
- c0.session.messageTransfer(arg::content=Message("31", "q3"));
- c0.session.messageTransfer(arg::content=Message("32", "q3"));
+ c0.session.queueDeclare("q3", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag);
LocalQueue q3;
c0.subs.subscribe(q3, "q3", manualComplete);
Message m31=q3.get(TIMEOUT);
@@ -360,14 +382,16 @@ QPID_AUTO_TEST_CASE(testUnacked) {
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
// Verify that we update transaction state correctly to new members.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
// Do work in a transaction.
c0.session.txSelect();
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("1","q"));
- c0.session.messageTransfer(arg::content=Message("2","q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag);
Message m;
BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "1");
@@ -384,7 +408,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
BOOST_CHECK_EQUAL(m.getData(), "2");
// Another transaction with both members active.
- c0.session.messageTransfer(arg::content=Message("3","q"));
+ c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
@@ -394,9 +418,11 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
// Verify that we update a partially recieved message to a new member.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
// Send first 2 frames of message.
@@ -407,6 +433,10 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
sender.send(transfer, true, false, true, true);
AMQHeaderBody header;
header.get<DeliveryProperties>(true)->setRoutingKey("q");
+ if (durableFlag)
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+ else
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT);
sender.send(header, false, false, true, true);
// No reliable way to ensure the partial message has arrived
@@ -427,7 +457,9 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
}
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
set<int> kb0 = knownBrokerPorts(c0.connection);
BOOST_CHECK_EQUAL(kb0.size(), 1u);
@@ -459,11 +491,13 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
}
QPID_AUTO_TEST_CASE(testUpdateConsumers) {
- ClusterFixture cluster(1, 1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("p");
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
LocalQueue lp;
c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
@@ -476,10 +510,10 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
Client c2(cluster[2], "c2");
// Transfer messages
- c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+ c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("bbb", "p"));
- c0.session.messageTransfer(arg::content=Message("ccc", "p"));
+ c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag);
// Activate the subscription, ensure message removed on all queues.
c0.subs.setFlowControl("q", FlowControl::unlimited());
@@ -504,20 +538,22 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
cluster.killWithSilencer(0,c0.connection,9);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
for (int i = 0; i < 10; ++i) {
- c1.session.messageTransfer(arg::content=Message("xxx", "q"));
+ c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag);
BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
}
}
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
// Create some shared state.
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo","q"));
- c0.session.messageTransfer(arg::content=Message("bar","q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag);
while (c0.session.queueQuery("q").getMessageCount() != 2)
sys::usleep(1000); // Wait for message to show up on broker 0.
@@ -526,12 +562,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
cluster.add();
// Do some work post-add
- c0.session.queueDeclare("p");
- c0.session.messageTransfer(arg::content=Message("pfoo","p"));
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag);
// Do some work post-join
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
- c0.session.messageTransfer(arg::content=Message("pbar","p"));
+ c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag);
// Verify new brokers have state.
Message m;
@@ -556,11 +592,13 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0]);
BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.session.exchangeDeclare("ex", arg::type="direct");
c0.session.close();
c0.connection.close();
@@ -575,11 +613,13 @@ QPID_AUTO_TEST_CASE(testWiringReplication) {
QPID_AUTO_TEST_CASE(testMessageEnqueue) {
// Enqueue on one broker, dequeue on another.
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
Client c0(cluster[0]);
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo", "q"));
- c0.session.messageTransfer(arg::content=Message("bar", "q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
c0.session.close();
Client c1(cluster[1]);
Message msg;
@@ -591,11 +631,13 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo", "q"));
- c0.session.messageTransfer(arg::content=Message("bar", "q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
Message msg;
@@ -615,18 +657,20 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
}
QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0]);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
// First start a subscription.
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
// Now send messages
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=Message("foo", "q"));
- c1.session.messageTransfer(arg::content=Message("bar", "q"));
+ c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
// Check they arrived
Message m;
@@ -653,7 +697,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
void execute(AsyncSession& session, bool)
{
- session.messageTransfer(arg::content=Message(content, queue));
+ session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag);
}
};
@@ -676,7 +720,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
void execute(AsyncSession& session, bool)
{
- session.queueDeclare(arg::queue=queue);
+ session.queueDeclare(arg::queue=queue, arg::durable=durableFlag);
SubscriptionManager subs(session);
subscription = subs.subscribe(*this, queue);
session.sync();
@@ -707,7 +751,9 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
}
};
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
ConnectionSettings settings;
settings.port = cluster[1];
settings.heartbeat = 1;
diff --git a/cpp/src/tests/test_tools.h b/cpp/src/tests/test_tools.h
index 37a6594f8a..54837d3e5b 100644
--- a/cpp/src/tests/test_tools.h
+++ b/cpp/src/tests/test_tools.h
@@ -89,6 +89,15 @@ struct ScopedSuppressLogging {
qpid::log::Options opts;
};
+inline std::string getLibPath(const char* envName, const char* defaultPath = 0) {
+ const char* p = std::getenv(envName);
+ if (p != 0)
+ return p;
+ if (defaultPath == 0)
+ BOOST_FAIL("Environment variable " << envName << " not set.");
+ return defaultPath;
+}
+
#endif /*!TEST_TOOLS_H*/