diff options
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
| -rw-r--r-- | cpp/src/tests/cluster_test.cpp | 214 |
1 files changed, 130 insertions, 84 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d38d84025b..819bf4365e 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -69,6 +69,18 @@ using namespace boost::assign; using broker::Broker; using boost::shared_ptr; +bool durableFlag = std::getenv("DURABLE_ENABLE") != 0; + +void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { + ostringstream clusterLib; + clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); + if (durableFlag) + args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR"; + else + args += "--no-data-dir"; +} + // Timeout for tests that wait for messages const sys::Duration TIMEOUT=sys::TIME_SEC/4; @@ -166,29 +178,31 @@ QPID_AUTO_TEST_CASE(testAcl) { policyFile.close(); char cwd[1024]; BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); - ClusterFixture cluster(2,-1, list_of<string> - ("--no-data-dir") - ("--auth=no") - ("--acl-file="+string(cwd)+"/cluster_test.acl") - ("--cluster-mechanism=PLAIN") - ("--cluster-username=cluster") - ("--cluster-password=cluster") - ("--load-module=../.libs/acl.so")); + ostringstream aclLib; + aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so"; + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + args += "--acl-file", string(cwd) + "/cluster_test.acl", + "--cluster-mechanism", "PLAIN", + "--cluster-username", "cluster", + "--cluster-password", "cluster", + "--load-module", aclLib.str(); + ClusterFixture cluster(2, args, -1); Client c0(aclSettings(cluster[0], "c0"), "c0"); Client c1(aclSettings(cluster[1], "c1"), "c1"); Client foo(aclSettings(cluster[1], "foo"), "foo"); - foo.session.queueDeclare("foo"); + foo.session.queueDeclare("foo", arg::durable=durableFlag); BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); - BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException); BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); cluster.add(); Client c2(aclSettings(cluster[2], "c2"), "c2"); - BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException); BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); } @@ -198,15 +212,17 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { // Note: this doesn't actually test for cluster race conditions around TTL, // it just verifies that basic TTL functionality works. // - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); - c0.session.queueDeclare("p"); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); - c0.session.messageTransfer(arg::content=Message("b", "q")); - c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000)); - c0.session.messageTransfer(arg::content=Message("y", "p")); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag); cluster.add(); Client c2(cluster[1], "c2"); @@ -222,44 +238,48 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { QPID_AUTO_TEST_CASE(testSequenceOptions) { // Make sure the exchange qpid.msg_sequence property is properly replicated. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - FieldTable args; - args.setInt("qpid.msg_sequence", 1); - c0.session.queueDeclare(arg::queue="q"); - c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args); + FieldTable ftargs; + ftargs.setInt("qpid.msg_sequence", 1); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs); c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); - c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex"); - c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex"); + c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag); BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); cluster.add(); Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex"); + c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag); BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); } QPID_AUTO_TEST_CASE(testTxTransaction) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare(arg::queue="q"); - c0.session.messageTransfer(arg::content=Message("A", "q")); - c0.session.messageTransfer(arg::content=Message("B", "q")); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag); // Start a transaction that will commit. Session commitSession = c0.connection.newSession("commit"); SubscriptionManager commitSubs(commitSession); commitSession.txSelect(); - commitSession.messageTransfer(arg::content=Message("a", "q")); - commitSession.messageTransfer(arg::content=Message("b", "q")); + commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag); + commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); // Start a transaction that will roll back. Session rollbackSession = c0.connection.newSession("rollback"); SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); - rollbackSession.messageTransfer(arg::content=Message("1", "q")); + rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag); Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); @@ -270,9 +290,9 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { // More transactional work BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - rollbackSession.messageTransfer(arg::content=Message("2", "q")); - commitSession.messageTransfer(arg::content=Message("c", "q")); - rollbackSession.messageTransfer(arg::content=Message("3", "q")); + rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag); + commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag); + rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); @@ -292,15 +312,17 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { QPID_AUTO_TEST_CASE(testUnacked) { // Verify replication of unacknowledged messages. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); Message m; // Create unacked message: acquired but not accepted. SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); - c0.session.queueDeclare("q1"); - c0.session.messageTransfer(arg::content=Message("11","q1")); + c0.session.queueDeclare("q1", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag); LocalQueue q1; c0.subs.subscribe(q1, "q1", manualAccept); BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted @@ -308,9 +330,9 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create unacked message: not acquired, accepted or completeed. SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); - c0.session.queueDeclare("q2"); - c0.session.messageTransfer(arg::content=Message("21","q2")); - c0.session.messageTransfer(arg::content=Message("22","q2")); + c0.session.queueDeclare("q2", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag); LocalQueue q2; c0.subs.subscribe(q2, "q2", manualAcquire); m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue @@ -323,9 +345,9 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create empty credit record: acquire and accept but don't complete. SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); - c0.session.queueDeclare("q3"); - c0.session.messageTransfer(arg::content=Message("31", "q3")); - c0.session.messageTransfer(arg::content=Message("32", "q3")); + c0.session.queueDeclare("q3", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag); LocalQueue q3; c0.subs.subscribe(q3, "q3", manualComplete); Message m31=q3.get(TIMEOUT); @@ -360,14 +382,16 @@ QPID_AUTO_TEST_CASE(testUnacked) { QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { // Verify that we update transaction state correctly to new members. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); // Do work in a transaction. c0.session.txSelect(); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("1","q")); - c0.session.messageTransfer(arg::content=Message("2","q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag); Message m; BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "1"); @@ -384,7 +408,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { BOOST_CHECK_EQUAL(m.getData(), "2"); // Another transaction with both members active. - c0.session.messageTransfer(arg::content=Message("3","q")); + c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); @@ -394,9 +418,11 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // Verify that we update a partially recieved message to a new member. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); // Send first 2 frames of message. @@ -407,6 +433,10 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { sender.send(transfer, true, false, true, true); AMQHeaderBody header; header.get<DeliveryProperties>(true)->setRoutingKey("q"); + if (durableFlag) + header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT); + else + header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT); sender.send(header, false, false, true, true); // No reliable way to ensure the partial message has arrived @@ -427,7 +457,9 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { } QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); set<int> kb0 = knownBrokerPorts(c0.connection); BOOST_CHECK_EQUAL(kb0.size(), 1u); @@ -459,11 +491,13 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { } QPID_AUTO_TEST_CASE(testUpdateConsumers) { - ClusterFixture cluster(1, 1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("p"); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); LocalQueue lp; c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); @@ -476,10 +510,10 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { Client c2(cluster[2], "c2"); // Transfer messages - c0.session.messageTransfer(arg::content=Message("aaa", "q")); + c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("bbb", "p")); - c0.session.messageTransfer(arg::content=Message("ccc", "p")); + c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag); // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::unlimited()); @@ -504,20 +538,22 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { cluster.killWithSilencer(0,c0.connection,9); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); for (int i = 0; i < 10; ++i) { - c1.session.messageTransfer(arg::content=Message("xxx", "q")); + c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag); BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } } QPID_AUTO_TEST_CASE(testCatchupSharedState) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); // Create some shared state. - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo","q")); - c0.session.messageTransfer(arg::content=Message("bar","q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag); while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. @@ -526,12 +562,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { cluster.add(); // Do some work post-add - c0.session.queueDeclare("p"); - c0.session.messageTransfer(arg::content=Message("pfoo","p")); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag); // Do some work post-join BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); - c0.session.messageTransfer(arg::content=Message("pbar","p")); + c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag); // Verify new brokers have state. Message m; @@ -556,11 +592,13 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { } QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0]); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.exchangeDeclare("ex", arg::type="direct"); c0.session.close(); c0.connection.close(); @@ -575,11 +613,13 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); Client c0(cluster[0]); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo", "q")); - c0.session.messageTransfer(arg::content=Message("bar", "q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); c0.session.close(); Client c1(cluster[1]); Message msg; @@ -591,11 +631,13 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo", "q")); - c0.session.messageTransfer(arg::content=Message("bar", "q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); Message msg; @@ -615,18 +657,20 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { } QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0]); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. // First start a subscription. - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); // Now send messages Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("foo", "q")); - c1.session.messageTransfer(arg::content=Message("bar", "q")); + c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); // Check they arrived Message m; @@ -653,7 +697,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) void execute(AsyncSession& session, bool) { - session.messageTransfer(arg::content=Message(content, queue)); + session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag); } }; @@ -676,7 +720,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) void execute(AsyncSession& session, bool) { - session.queueDeclare(arg::queue=queue); + session.queueDeclare(arg::queue=queue, arg::durable=durableFlag); SubscriptionManager subs(session); subscription = subs.subscribe(*this, queue); session.sync(); @@ -707,7 +751,9 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) } }; - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); ConnectionSettings settings; settings.port = cluster[1]; settings.heartbeat = 1; |
