summaryrefslogtreecommitdiff
path: root/RC9/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-01-13 18:11:43 +0000
committerRafael H. Schloming <rhs@apache.org>2009-01-13 18:11:43 +0000
commit7e34266b9a23f4536415bfbc3f161b84615b6550 (patch)
tree484008cf2d413f58b5e4ab80b373303c66200888 /RC9/qpid/cpp/src/tests/cluster_test.cpp
parent4612263ea692f00a4bd810438bdaf9bc88022091 (diff)
downloadqpid-python-M4.tar.gz
Tag M4 RC9M4
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@734202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC9/qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--RC9/qpid/cpp/src/tests/cluster_test.cpp648
1 files changed, 648 insertions, 0 deletions
diff --git a/RC9/qpid/cpp/src/tests/cluster_test.cpp b/RC9/qpid/cpp/src/tests/cluster_test.cpp
new file mode 100644
index 0000000000..f4a38ae861
--- /dev/null
+++ b/RC9/qpid/cpp/src/tests/cluster_test.cpp
@@ -0,0 +1,648 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ForkedBroker.h"
+#include "BrokerFixture.h"
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/FailoverListener.h"
+#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/DumpClient.h"
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/enum.h"
+#include "qpid/log/Logger.h"
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <string>
+#include <iostream>
+#include <iterator>
+#include <vector>
+#include <set>
+#include <algorithm>
+#include <iterator>
+
+namespace std { // ostream operators in std:: namespace
+template <class T>
+ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); }
+}
+
+
+QPID_AUTO_TEST_SUITE(cluster)
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using qpid::sys::TIME_SEC;
+using qpid::broker::Broker;
+using boost::shared_ptr;
+using qpid::cluster::Cluster;
+
+/** Parse broker & cluster options */
+Broker::Options parseOpts(size_t argc, const char* argv[]) {
+ Broker::Options opts;
+ Plugin::addOptions(opts); // Pick up cluster options.
+ opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
+ return opts;
+}
+
+/** Cluster fixture is a vector of ports for the replicas.
+ *
+ * At most one replica (by default replica 0) is in the current
+ * process, all others are forked as children.
+ */
+class ClusterFixture : public vector<uint16_t> {
+ string name;
+ std::auto_ptr<BrokerFixture> localBroker;
+ int localIndex;
+ std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
+
+ public:
+ /** @param localIndex can be -1 meaning don't automatically start a local broker.
+ * A local broker can be started with addLocal().
+ */
+ ClusterFixture(size_t n, int localIndex=0);
+ void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
+ void add(); // Add a broker.
+ void addLocal(); // Add a local broker.
+ void setup();
+
+ bool hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); }
+
+ /** Kill a forked broker with sig, or shutdown localBroker if n==0. */
+ void kill(size_t n, int sig=SIGINT) {
+ if (n == size_t(localIndex))
+ localBroker->broker->shutdown();
+ else
+ forkedBrokers[n]->kill(sig);
+ }
+
+ /** Kill a broker and suppress errors from connection. */
+ void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT) {
+ ScopedSuppressLogging sl;
+ kill(n,sig);
+ try { c.close(); } catch(...) {}
+ }
+};
+
+ClusterFixture::ClusterFixture(size_t n, int localIndex_) : name(Uuid(true).str()), localIndex(localIndex_) {
+ add(n);
+}
+
+void ClusterFixture::add() {
+ if (size() != size_t(localIndex)) { // fork a broker process.
+ std::ostringstream os; os << "fork" << size();
+ std::string prefix = os.str();
+ const char* argv[] = {
+ "qpidd " __FILE__ ,
+ "--no-module-dir",
+ "--load-module=../.libs/cluster.so",
+ "--cluster-name", name.c_str(),
+ "--auth=no", "--no-data-dir",
+ "--log-prefix", prefix.c_str(),
+ };
+ size_t argc = sizeof(argv)/sizeof(argv[0]);
+ forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(argc, argv)));
+ push_back(forkedBrokers.back()->getPort());
+ }
+ else { // Run in this process
+ addLocal();
+ }
+}
+
+void ClusterFixture::addLocal() {
+ assert(int(size()) == localIndex || localIndex == -1);
+ localIndex = size();
+ const char* argv[] = {
+ "qpidd " __FILE__ ,
+ "--load-module=../.libs/cluster.so",
+ "--cluster-name", name.c_str(),
+ "--auth=no", "--no-data-dir"
+ };
+ size_t argc = sizeof(argv)/sizeof(argv[0]);
+ ostringstream os; os << "local" << localIndex;
+ qpid::log::Logger::instance().setPrefix(os.str());
+ localBroker.reset(new BrokerFixture(parseOpts(argc, argv)));
+ push_back(localBroker->getPort());
+ forkedBrokers.push_back(shared_ptr<ForkedBroker>());
+}
+
+ostream& operator<<(ostream& o, const cpg_name* n) {
+ return o << qpid::cluster::Cpg::str(*n);
+}
+
+ostream& operator<<(ostream& o, const cpg_address& a) {
+ return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")";
+}
+
+template <class T>
+ostream& operator<<(ostream& o, const pair<T*, int>& array) {
+ o << "{ ";
+ ostream_iterator<cpg_address> i(o, " ");
+ copy(array.first, array.first+array.second, i);
+ o << "}";
+ return o;
+}
+
+template <class C> set<uint16_t> makeSet(const C& c) {
+ set<uint16_t> s;
+ std::copy(c.begin(), c.end(), std::inserter(s, s.begin()));
+ return s;
+}
+
+template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
+ vector<Url> urls = source.getKnownBrokers();
+ if (n >= 0 && unsigned(n) != urls.size()) {
+ BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
+ // Retry up to 10 secs in .1 second intervals.
+ for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
+ ::usleep(1000*100); // 0.1 secs
+ urls = source.getKnownBrokers();
+ }
+ }
+ BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
+ set<uint16_t> s;
+ for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ s.insert((*i)[0].get<TcpAddress>()->port);
+ return s;
+}
+
+class Sender {
+ public:
+ Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
+ void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) {
+ AMQFrame f(body);
+ f.setChannel(channel);
+ f.setFirstSegment(firstSeg);
+ f.setLastSegment(lastSeg);
+ f.setFirstFrame(firstFrame);
+ f.setLastFrame(lastFrame);
+ connection->handle(f);
+ }
+
+ private:
+ boost::shared_ptr<ConnectionImpl> connection;
+ uint16_t channel;
+};
+
+int64_t getMsgSequence(const Message& m) {
+ return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions) {
+ // Make sure the exchange qpid.msg_sequence property is properly replicated.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ FieldTable args;
+ args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"??
+ c0.session.queueDeclare(arg::queue="q");
+ c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
+ c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
+ c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
+ BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+ BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+
+ cluster.add();
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
+ BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
+}
+
+QPID_AUTO_TEST_CASE(testUnsupported) {
+ ScopedSuppressLogging sl;
+ ClusterFixture cluster(1);
+ Client c1(cluster[0], "c1");
+ BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException);
+ Client c2(cluster[0], "c2");
+ Message m;
+ m.getDeliveryProperties().setTtl(1);
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
+}
+
+QPID_AUTO_TEST_CASE(testTxTransaction) {
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare(arg::queue="q");
+ c0.session.messageTransfer(arg::content=Message("A", "q"));
+ c0.session.messageTransfer(arg::content=Message("B", "q"));
+
+ // Start a transaction that will commit.
+ Session commitSession = c0.connection.newSession("commit");
+ SubscriptionManager commitSubs(commitSession);
+ commitSession.txSelect();
+ commitSession.messageTransfer(arg::content=Message("a", "q"));
+ commitSession.messageTransfer(arg::content=Message("b", "q"));
+ BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A");
+
+ // Start a transaction that will roll back.
+ Session rollbackSession = c0.connection.newSession("rollback");
+ SubscriptionManager rollbackSubs(rollbackSession);
+ rollbackSession.txSelect();
+ rollbackSession.messageTransfer(arg::content=Message("1", "q"));
+ Message rollbackMessage = rollbackSubs.get("q", TIME_SEC);
+ BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
+
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+ // Add new member mid transaction.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ // More transactional work
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ rollbackSession.messageTransfer(arg::content=Message("2", "q"));
+ commitSession.messageTransfer(arg::content=Message("c", "q"));
+ rollbackSession.messageTransfer(arg::content=Message("3", "q"));
+
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Commit/roll back.
+ commitSession.txCommit();
+ rollbackSession.txRollback();
+ rollbackSession.messageRelease(rollbackMessage.getId());
+
+
+ // Verify queue status: just the comitted messages and dequeues should remain.
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
+}
+
+QPID_AUTO_TEST_CASE(testUnacked) {
+ // Verify replication of unacknowledged messages.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+
+ Message m;
+
+ // Create unacked message: acquired but not accepted.
+ SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
+ c0.session.queueDeclare("q1");
+ c0.session.messageTransfer(arg::content=Message("11","q1"));
+ LocalQueue q1;
+ c0.subs.subscribe(q1, "q1", manualAccept);
+ BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+
+ // Create unacked message: not acquired, accepted or completeed.
+ SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
+ c0.session.queueDeclare("q2");
+ c0.session.messageTransfer(arg::content=Message("21","q2"));
+ c0.session.messageTransfer(arg::content=Message("22","q2"));
+ LocalQueue q2;
+ c0.subs.subscribe(q2, "q2", manualAcquire);
+ m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(m.getData(), "21");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
+ c0.subs.getSubscription("q2").acquire(m); // Acquire manually
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
+ BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+
+ // Create empty credit record: acquire and accept but don't complete.
+ SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
+ c0.session.queueDeclare("q3");
+ c0.session.messageTransfer(arg::content=Message("31", "q3"));
+ c0.session.messageTransfer(arg::content=Message("32", "q3"));
+ LocalQueue q3;
+ c0.subs.subscribe(q3, "q3", manualComplete);
+ Message m31=q3.get(TIME_SEC);
+ BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
+
+ // Add new member while there are unacked messages.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ // Check queue counts
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u);
+
+ // Complete the empty credit message, should unblock the message behind it.
+ BOOST_CHECK_THROW(q3.get(0), Exception);
+ c0.session.markCompleted(SequenceSet(m31.getId()), true);
+ BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
+
+ // Close the original session - unacked messages should be requeued.
+ c0.session.close();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
+
+ BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22");
+}
+
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) {
+ // Verify that we dump transaction state correctly to new members.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+
+ // Do work in a transaction.
+ c0.session.txSelect();
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("1","q"));
+ c0.session.messageTransfer(arg::content=Message("2","q"));
+ Message m;
+ BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "1");
+
+ // New member, TX not comitted, c1 should see nothing.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+
+ // After commit c1 shoudl see results of tx.
+ c0.session.txCommit();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "2");
+
+ // Another transaction with both members active.
+ c0.session.messageTransfer(arg::content=Message("3","q"));
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+ c0.session.txCommit();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "3");
+}
+
+QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
+ // Verify that we dump a partially recieved message to a new member.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q");
+ Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
+
+ // Send first 2 frames of message.
+ MessageTransferBody transfer(
+ ProtocolVersion(), std::string(), // default exchange.
+ framing::message::ACCEPT_MODE_NONE,
+ framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
+ sender.send(transfer, true, false, true, true);
+ AMQHeaderBody header;
+ header.get<DeliveryProperties>(true)->setRoutingKey("q");
+ sender.send(header, false, false, true, true);
+
+ // No reliable way to ensure the partial message has arrived
+ // before we start the new broker, so we sleep.
+ ::usleep(2500);
+ cluster.add();
+
+ // Send final 2 frames of message.
+ sender.send(AMQContentBody("ab"), false, true, true, false);
+ sender.send(AMQContentBody("cd"), false, true, false, true);
+
+ // Verify message is enqued correctly on second member.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "abcd");
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
+}
+
+QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
+ BOOST_CHECK_EQUAL(kb0.size(), 1u);
+ BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
+
+ cluster.add();
+ Client c1(cluster[1], "c1");
+ set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
+ kb0 = knownBrokerPorts(c0.connection, 2);
+ BOOST_CHECK_EQUAL(kb1.size(), 2u);
+ BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
+ BOOST_CHECK_EQUAL(kb1,kb0);
+
+ cluster.add();
+ Client c2(cluster[2], "c2");
+ set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
+ kb1 = knownBrokerPorts(c1.connection, 3);
+ kb0 = knownBrokerPorts(c0.connection, 3);
+ BOOST_CHECK_EQUAL(kb2.size(), 3u);
+ BOOST_CHECK_EQUAL(kb2, makeSet(cluster));
+ BOOST_CHECK_EQUAL(kb2,kb0);
+ BOOST_CHECK_EQUAL(kb2,kb1);
+
+ cluster.killWithSilencer(1,c1.connection,9);
+ kb0 = knownBrokerPorts(c0.connection, 2);
+ kb2 = knownBrokerPorts(c2.connection, 2);
+ BOOST_CHECK_EQUAL(kb0.size(), 2u);
+ BOOST_CHECK_EQUAL(kb0, kb2);
+}
+
+QPID_AUTO_TEST_CASE(DumpConsumers) {
+ ClusterFixture cluster(1, 1);
+
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("p");
+ c0.session.queueDeclare("q");
+ c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
+ LocalQueue lp;
+ c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
+ c0.session.sync();
+
+ // Start new members
+ cluster.add(); // Local
+ Client c1(cluster[1], "c1");
+ cluster.add();
+ Client c2(cluster[2], "c2");
+
+ // Transfer messages
+ c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+
+ c0.session.messageTransfer(arg::content=Message("bbb", "p"));
+ c0.session.messageTransfer(arg::content=Message("ccc", "p"));
+
+ // Activate the subscription, ensure message removed on all queues.
+ c0.subs.setFlowControl("q", FlowControl::unlimited());
+ Message m;
+ BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Check second subscription's flow control: gets first message, not second.
+ BOOST_CHECK(lp.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
+
+ BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "ccc");
+
+ // Kill the subscribing member, ensure further messages are not removed.
+ cluster.killWithSilencer(0,c0.connection,9);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
+ for (int i = 0; i < 10; ++i) {
+ c1.session.messageTransfer(arg::content=Message("xxx", "q"));
+ BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
+ }
+}
+
+QPID_AUTO_TEST_CASE(testCatchupSharedState) {
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+
+ // Create some shared state.
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("foo","q"));
+ c0.session.messageTransfer(arg::content=Message("bar","q"));
+ while (c0.session.queueQuery("q").getMessageCount() != 2)
+ ::usleep(1000); // Wait for message to show up on broker 0.
+
+ // Add a new broker, it should catch up.
+ cluster.add();
+
+ // Do some work post-add
+ c0.session.queueDeclare("p");
+ c0.session.messageTransfer(arg::content=Message("pfoo","p"));
+
+ // Do some work post-join
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
+ c0.session.messageTransfer(arg::content=Message("pbar","p"));
+
+ // Verify new brokers have state.
+ Message m;
+
+ Client c1(cluster[1], "c1");
+
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Add another broker, don't wait for join - should be stalled till ready.
+ cluster.add();
+ Client c2(cluster[2], "c2");
+ BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "pfoo");
+ BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "pbar");
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testWiringReplication) {
+ ClusterFixture cluster(3);
+ Client c0(cluster[0]);
+ BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
+ BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
+ c0.session.queueDeclare("q");
+ c0.session.exchangeDeclare("ex", arg::type="direct");
+ c0.session.close();
+ c0.connection.close();
+ // Verify all brokers get wiring update.
+ for (size_t i = 0; i < cluster.size(); ++i) {
+ BOOST_MESSAGE("i == "<< i);
+ Client c(cluster[i]);
+ BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
+ BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testMessageEnqueue) {
+ // Enqueue on one broker, dequeue on another.
+ ClusterFixture cluster(2);
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("foo", "q"));
+ c0.session.messageTransfer(arg::content=Message("bar", "q"));
+ c0.session.close();
+ Client c1(cluster[1]);
+ Message msg;
+ BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(string("foo"), msg.getData());
+ BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(string("bar"), msg.getData());
+}
+
+QPID_AUTO_TEST_CASE(testMessageDequeue) {
+ // Enqueue on one broker, dequeue on two others.
+ ClusterFixture cluster(3);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("foo", "q"));
+ c0.session.messageTransfer(arg::content=Message("bar", "q"));
+
+ Message msg;
+
+ // Dequeue on 2 others, ensure correct order.
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(msg, "q"));
+ BOOST_CHECK_EQUAL("foo", msg.getData());
+
+ Client c2(cluster[2], "c2");
+ BOOST_CHECK(c1.subs.get(msg, "q"));
+ BOOST_CHECK_EQUAL("bar", msg.getData());
+
+ // Queue should be empty on all cluster members.
+ BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
+ ClusterFixture cluster(3);
+ Client c0(cluster[0]);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
+
+ // First start a subscription.
+ c0.session.queueDeclare("q");
+ c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
+
+ // Now send messages
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=Message("foo", "q"));
+ c1.session.messageTransfer(arg::content=Message("bar", "q"));
+
+ // Check they arrived
+ Message m;
+ BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+ BOOST_CHECK_EQUAL("foo", m.getData());
+ BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+ BOOST_CHECK_EQUAL("bar", m.getData());
+
+ // Queue should be empty on all cluster members.
+ Client c2(cluster[2]);
+ BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_SUITE_END()