summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp214
1 files changed, 130 insertions, 84 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index d38d84025b..819bf4365e 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -69,6 +69,18 @@ using namespace boost::assign;
using broker::Broker;
using boost::shared_ptr;
+bool durableFlag = std::getenv("DURABLE_ENABLE") != 0;
+
+void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
+ ostringstream clusterLib;
+ clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+ args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
+ if (durableFlag)
+ args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR";
+ else
+ args += "--no-data-dir";
+}
+
// Timeout for tests that wait for messages
const sys::Duration TIMEOUT=sys::TIME_SEC/4;
@@ -166,29 +178,31 @@ QPID_AUTO_TEST_CASE(testAcl) {
policyFile.close();
char cwd[1024];
BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
- ClusterFixture cluster(2,-1, list_of<string>
- ("--no-data-dir")
- ("--auth=no")
- ("--acl-file="+string(cwd)+"/cluster_test.acl")
- ("--cluster-mechanism=PLAIN")
- ("--cluster-username=cluster")
- ("--cluster-password=cluster")
- ("--load-module=../.libs/acl.so"));
+ ostringstream aclLib;
+ aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so";
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ args += "--acl-file", string(cwd) + "/cluster_test.acl",
+ "--cluster-mechanism", "PLAIN",
+ "--cluster-username", "cluster",
+ "--cluster-password", "cluster",
+ "--load-module", aclLib.str();
+ ClusterFixture cluster(2, args, -1);
Client c0(aclSettings(cluster[0], "c0"), "c0");
Client c1(aclSettings(cluster[1], "c1"), "c1");
Client foo(aclSettings(cluster[1], "foo"), "foo");
- foo.session.queueDeclare("foo");
+ foo.session.queueDeclare("foo", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo");
- BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException);
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException);
BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty());
BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty());
cluster.add();
Client c2(aclSettings(cluster[2], "c2"), "c2");
- BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException);
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException);
BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty());
}
@@ -198,15 +212,17 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
// Note: this doesn't actually test for cluster race conditions around TTL,
// it just verifies that basic TTL functionality works.
//
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
- c0.session.queueDeclare("p");
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
- c0.session.messageTransfer(arg::content=Message("b", "q"));
- c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000));
- c0.session.messageTransfer(arg::content=Message("y", "p"));
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag);
cluster.add();
Client c2(cluster[1], "c2");
@@ -222,44 +238,48 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
QPID_AUTO_TEST_CASE(testSequenceOptions) {
// Make sure the exchange qpid.msg_sequence property is properly replicated.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- FieldTable args;
- args.setInt("qpid.msg_sequence", 1);
- c0.session.queueDeclare(arg::queue="q");
- c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
+ FieldTable ftargs;
+ ftargs.setInt("qpid.msg_sequence", 1);
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs);
c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
- c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
- c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
cluster.add();
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
+ c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
}
QPID_AUTO_TEST_CASE(testTxTransaction) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare(arg::queue="q");
- c0.session.messageTransfer(arg::content=Message("A", "q"));
- c0.session.messageTransfer(arg::content=Message("B", "q"));
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag);
// Start a transaction that will commit.
Session commitSession = c0.connection.newSession("commit");
SubscriptionManager commitSubs(commitSession);
commitSession.txSelect();
- commitSession.messageTransfer(arg::content=Message("a", "q"));
- commitSession.messageTransfer(arg::content=Message("b", "q"));
+ commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag);
+ commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
// Start a transaction that will roll back.
Session rollbackSession = c0.connection.newSession("rollback");
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
- rollbackSession.messageTransfer(arg::content=Message("1", "q"));
+ rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag);
Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
@@ -270,9 +290,9 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
// More transactional work
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
- rollbackSession.messageTransfer(arg::content=Message("2", "q"));
- commitSession.messageTransfer(arg::content=Message("c", "q"));
- rollbackSession.messageTransfer(arg::content=Message("3", "q"));
+ rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag);
+ commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag);
+ rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
@@ -292,15 +312,17 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
QPID_AUTO_TEST_CASE(testUnacked) {
// Verify replication of unacknowledged messages.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
Message m;
// Create unacked message: acquired but not accepted.
SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
- c0.session.queueDeclare("q1");
- c0.session.messageTransfer(arg::content=Message("11","q1"));
+ c0.session.queueDeclare("q1", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag);
LocalQueue q1;
c0.subs.subscribe(q1, "q1", manualAccept);
BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
@@ -308,9 +330,9 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create unacked message: not acquired, accepted or completeed.
SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
- c0.session.queueDeclare("q2");
- c0.session.messageTransfer(arg::content=Message("21","q2"));
- c0.session.messageTransfer(arg::content=Message("22","q2"));
+ c0.session.queueDeclare("q2", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag);
LocalQueue q2;
c0.subs.subscribe(q2, "q2", manualAcquire);
m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue
@@ -323,9 +345,9 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create empty credit record: acquire and accept but don't complete.
SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
- c0.session.queueDeclare("q3");
- c0.session.messageTransfer(arg::content=Message("31", "q3"));
- c0.session.messageTransfer(arg::content=Message("32", "q3"));
+ c0.session.queueDeclare("q3", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag);
LocalQueue q3;
c0.subs.subscribe(q3, "q3", manualComplete);
Message m31=q3.get(TIMEOUT);
@@ -360,14 +382,16 @@ QPID_AUTO_TEST_CASE(testUnacked) {
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
// Verify that we update transaction state correctly to new members.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
// Do work in a transaction.
c0.session.txSelect();
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("1","q"));
- c0.session.messageTransfer(arg::content=Message("2","q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag);
Message m;
BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "1");
@@ -384,7 +408,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
BOOST_CHECK_EQUAL(m.getData(), "2");
// Another transaction with both members active.
- c0.session.messageTransfer(arg::content=Message("3","q"));
+ c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
@@ -394,9 +418,11 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
// Verify that we update a partially recieved message to a new member.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
// Send first 2 frames of message.
@@ -407,6 +433,10 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
sender.send(transfer, true, false, true, true);
AMQHeaderBody header;
header.get<DeliveryProperties>(true)->setRoutingKey("q");
+ if (durableFlag)
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+ else
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT);
sender.send(header, false, false, true, true);
// No reliable way to ensure the partial message has arrived
@@ -427,7 +457,9 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
}
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
set<int> kb0 = knownBrokerPorts(c0.connection);
BOOST_CHECK_EQUAL(kb0.size(), 1u);
@@ -459,11 +491,13 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
}
QPID_AUTO_TEST_CASE(testUpdateConsumers) {
- ClusterFixture cluster(1, 1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("p");
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
LocalQueue lp;
c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
@@ -476,10 +510,10 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
Client c2(cluster[2], "c2");
// Transfer messages
- c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+ c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("bbb", "p"));
- c0.session.messageTransfer(arg::content=Message("ccc", "p"));
+ c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag);
// Activate the subscription, ensure message removed on all queues.
c0.subs.setFlowControl("q", FlowControl::unlimited());
@@ -504,20 +538,22 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
cluster.killWithSilencer(0,c0.connection,9);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
for (int i = 0; i < 10; ++i) {
- c1.session.messageTransfer(arg::content=Message("xxx", "q"));
+ c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag);
BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
}
}
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
// Create some shared state.
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo","q"));
- c0.session.messageTransfer(arg::content=Message("bar","q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag);
while (c0.session.queueQuery("q").getMessageCount() != 2)
sys::usleep(1000); // Wait for message to show up on broker 0.
@@ -526,12 +562,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
cluster.add();
// Do some work post-add
- c0.session.queueDeclare("p");
- c0.session.messageTransfer(arg::content=Message("pfoo","p"));
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag);
// Do some work post-join
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
- c0.session.messageTransfer(arg::content=Message("pbar","p"));
+ c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag);
// Verify new brokers have state.
Message m;
@@ -556,11 +592,13 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0]);
BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.session.exchangeDeclare("ex", arg::type="direct");
c0.session.close();
c0.connection.close();
@@ -575,11 +613,13 @@ QPID_AUTO_TEST_CASE(testWiringReplication) {
QPID_AUTO_TEST_CASE(testMessageEnqueue) {
// Enqueue on one broker, dequeue on another.
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
Client c0(cluster[0]);
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo", "q"));
- c0.session.messageTransfer(arg::content=Message("bar", "q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
c0.session.close();
Client c1(cluster[1]);
Message msg;
@@ -591,11 +631,13 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo", "q"));
- c0.session.messageTransfer(arg::content=Message("bar", "q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
Message msg;
@@ -615,18 +657,20 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
}
QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0]);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
// First start a subscription.
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
// Now send messages
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=Message("foo", "q"));
- c1.session.messageTransfer(arg::content=Message("bar", "q"));
+ c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
// Check they arrived
Message m;
@@ -653,7 +697,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
void execute(AsyncSession& session, bool)
{
- session.messageTransfer(arg::content=Message(content, queue));
+ session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag);
}
};
@@ -676,7 +720,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
void execute(AsyncSession& session, bool)
{
- session.queueDeclare(arg::queue=queue);
+ session.queueDeclare(arg::queue=queue, arg::durable=durableFlag);
SubscriptionManager subs(session);
subscription = subs.subscribe(*this, queue);
session.sync();
@@ -707,7 +751,9 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
}
};
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
ConnectionSettings settings;
settings.port = cluster[1];
settings.heartbeat = 1;