summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-12 18:07:47 +0000
committerAlan Conway <aconway@apache.org>2008-09-12 18:07:47 +0000
commitebb59131b198b693c5774dd51656fec520ddd770 (patch)
tree688cbbfe3aae8874156cd2ac2bf9430ef3af19df /cpp/src/tests
parentad8e400b786c5868d4e0d2aa880625240e44e311 (diff)
downloadqpid-python-ebb59131b198b693c5774dd51656fec520ddd770.tar.gz
Added ClusterMap and test. Moved PollableCondition, PollableQueue to sys.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694758 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/ClusterMapTest.cpp191
-rw-r--r--cpp/src/tests/Makefile.am2
-rw-r--r--cpp/src/tests/cluster.mk3
-rw-r--r--cpp/src/tests/cluster_test.cpp144
-rwxr-xr-xcpp/src/tests/start_cluster2
-rwxr-xr-xcpp/src/tests/start_cluster_hosts2
6 files changed, 304 insertions, 40 deletions
diff --git a/cpp/src/tests/ClusterMapTest.cpp b/cpp/src/tests/ClusterMapTest.cpp
new file mode 100644
index 0000000000..344dd71eb5
--- /dev/null
+++ b/cpp/src/tests/ClusterMapTest.cpp
@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/cluster/ClusterMap.h"
+#include "qpid/framing/ClusterMapBody.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/Url.h"
+#include <boost/assign.hpp>
+
+QPID_AUTO_TEST_SUITE(CluterMapTest)
+
+using namespace std;
+using namespace qpid;
+using namespace cluster;
+using namespace framing;
+
+MemberId id(int i) { return MemberId(i,i); }
+
+Url url(const char* host) { return Url(TcpAddress(host)); }
+
+QPID_AUTO_TEST_CASE(testNotice) {
+ ClusterMap m;
+ BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); // id(0) member, no dump.
+ BOOST_CHECK(m.isMember(id(0)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK_EQUAL(m.memberCount(), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); // Newbie, needs dump
+ BOOST_CHECK(m.isMember(id(0)));
+ BOOST_CHECK(m.isDumpee(id(1)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
+ BOOST_CHECK_EQUAL(m.memberCount(), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); // id(1) is ready.
+ BOOST_CHECK(m.isMember(id(0)));
+ BOOST_CHECK(m.isMember(id(1)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); // id(2) needs dump
+ BOOST_CHECK(m.isDumpee(id(2)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ BOOST_CHECK_EQUAL(id(1), m.urlNotice(id(3), url("3-dump"))); // 0 busy, dump to id(1).
+ BOOST_CHECK(m.isDumpee(id(3)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 2);
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); // Equally busy, 0 is first on list.
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 2);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 3);
+
+ // My dumpees both complete
+ BOOST_CHECK(!m.urlNotice(id(2), url("2-ready")));
+ BOOST_CHECK(!m.urlNotice(id(4), url("4-ready")));
+ BOOST_CHECK(m.isMember(id(2)));
+ BOOST_CHECK(m.isMember(id(4)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ // Final dumpee completes.
+ BOOST_CHECK(!m.urlNotice(id(3), url("3-ready")));
+ BOOST_CHECK(m.isMember(id(3)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+}
+
+QPID_AUTO_TEST_CASE(testLeave) {
+ ClusterMap m;
+ BOOST_CHECK(!m.urlNotice(id(0), url("0-ready")));
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump")));
+ BOOST_CHECK(!m.urlNotice(id(1), url("1-ready")));
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump")));
+ BOOST_CHECK(!m.urlNotice(id(2), url("2-ready")));
+ BOOST_CHECK_EQUAL(m.memberCount(), 3);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ m.leave(id(1));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+ BOOST_CHECK(m.isMember(id(0)));
+ BOOST_CHECK(m.isMember(id(2)));
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump")));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK(m.isDumpee(id(4)));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ m.dumpFailed(id(4)); // Dumper detected a failure.
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK(!m.isDumpee(id(4)));
+ BOOST_CHECK(!m.isMember(id(4)));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ m.leave(id(4)); // Dumpee leaves, no-op since we already know it failed.
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(5), url("5-dump")));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK(m.isDumpee(id(5)));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ m.leave(id(5)); // Dumpee detects failure and leaves cluster.
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK(!m.isDumpee(id(5)));
+ BOOST_CHECK(!m.isMember(id(5)));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ m.dumpFailed(id(5)); // Dumper reports failure - no op, we already know.
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+}
+
+QPID_AUTO_TEST_CASE(testToControl) {
+ ClusterMap m;
+ m.urlNotice(id(0), url("0"));
+ m.urlNotice(id(1), url("1dump"));
+ m.urlNotice(id(1), url("1"));
+ m.urlNotice(id(2), url("2dump"));
+ m.urlNotice(id(3), url("3dump"));
+ m.urlNotice(id(4), url("4dump"));
+
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 3);
+
+ ClusterMapBody b = m.toControl();
+
+ BOOST_CHECK_EQUAL(b.getMembers().count(), 2);
+ BOOST_CHECK_EQUAL(b.getMembers().getString(id(0).str()), url("0").str());
+ BOOST_CHECK_EQUAL(b.getMembers().getString(id(1).str()), url("1").str());
+
+ BOOST_CHECK_EQUAL(b.getDumpees().count(), 3);
+ BOOST_CHECK_EQUAL(b.getDumpees().getString(id(2).str()), url("2dump").str());
+ BOOST_CHECK_EQUAL(b.getDumpees().getString(id(3).str()), url("3dump").str());
+ BOOST_CHECK_EQUAL(b.getDumpees().getString(id(4).str()), url("4dump").str());
+
+ BOOST_CHECK_EQUAL(b.getDumps().count(), 3);
+ BOOST_CHECK_EQUAL(b.getDumps().getString(id(2).str()), id(0).str());
+ BOOST_CHECK_EQUAL(b.getDumps().getString(id(3).str()), id(1).str());
+ BOOST_CHECK_EQUAL(b.getDumps().getString(id(4).str()), id(0).str());
+
+ std::string s(b.size(), '\0');
+ Buffer buf(&s[0], s.size());
+ b.encode(buf);
+
+ ClusterMap m2;
+ m2.fromControl(b);
+ ClusterMapBody b2 = m2.toControl();
+ std::string s2(b2.size(), '\0');
+ Buffer buf2(&s2[0], s2.size());
+ b2.encode(buf2);
+
+ // Verify a round-trip encoding produces identical results.
+ BOOST_CHECK_EQUAL(s,s2);
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 3b82aff8a8..8b0c12e47c 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -28,7 +28,7 @@ CLEANFILES=
TESTS+=unit_test
check_PROGRAMS+=unit_test
unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \
- $(lib_client) $(lib_broker) # $(lib_amqp_0_10)
+ $(lib_client) $(lib_broker)
unit_test_SOURCES= unit_test.cpp unit_test.h \
BrokerFixture.h SocketProxy.h \
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index 3454beb9bc..40d1c0df3e 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -18,4 +18,7 @@ check_PROGRAMS+=cluster_test
cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
+unit_test_SOURCES+=ClusterMapTest.cpp
+unit_test_LDADD+=$(lib_cluster)
+
endif
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 1f9aac8fc5..da77d7405e 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -59,12 +59,19 @@ using boost::ptr_vector;
using qpid::cluster::Cluster;
using qpid::cluster::getGlobalCluster;
+/** Parse broker & cluster options */
+Broker::Options parseOpts(size_t argc, const char* argv[]) {
+ Broker::Options opts;
+ Plugin::addOptions(opts); // Pick up cluster options.
+ opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
+ return opts;
+}
+
/** Cluster fixture is a vector of ports for the replicas.
* Replica 0 is in the current process, all others are forked as children.
*/
struct ClusterFixture : public vector<uint16_t> {
string name;
- Broker::Options opts;
std::auto_ptr<BrokerFixture> broker0;
boost::ptr_vector<ForkedBroker> forkedBrokers;
@@ -96,7 +103,7 @@ void ClusterFixture::add() {
const char* argv[] = {
"qpidd " __FILE__ ,
- "--load-module=../.libs/libqpidcluster.so",
+ "--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
"--auth=no", "--no-data-dir",
"--log-prefix", prefix.c_str(),
@@ -108,11 +115,8 @@ void ClusterFixture::add() {
push_back(forkedBrokers.back().getPort());
}
else { // First broker, run in this process.
- Broker::Options opts;
qpid::log::Logger::instance().setPrefix("main");
- Plugin::addOptions(opts); // Pick up cluster options.
- opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
- broker0.reset(new BrokerFixture(opts));
+ broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
push_back(broker0->getPort());
}
}
@@ -136,38 +140,16 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-#if 0 // FIXME aconway 2008-09-10: finish & enable
-QPID_AUTO_TEST_CASE(testDumpConsumers) {
- ClusterFixture cluster(1);
- Client a(cluster[0]);
- a.session.queueDeclare("q");
- a.subs.subscribe(a.lq, "q");
-
- cluster.add();
- Client b(cluster[1]);
- try {
- b.connection.newSession(a.session.getId().getName());
- BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
- } catch (const SessionBusyException&) {}
-
- // Transfer some messages to the subscription by client a.
- Message m;
- a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q"));
- BOOST_CHECK(a.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "aaa");
-
- b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q"));
- BOOST_CHECK(a.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "bbb");
-
- // Verify that the queue has been drained on both brokers.
- // This proves that the consumer was replicated when the second broker joined.
- BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
-}
-#endif
-
+// FIXME aconway 2008-09-11: This test has to be first otherwise
+// it picks up the cluster name from a previous test and runs the
+// brokers as cluster nodes. Something wrong with option parsing...
+//
QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
- BrokerFixture donor, receiver;
+ // In this test we don't want the cluster plugin to initialize, so set --cluster-name=""
+ const char* argv[] = { "--cluster-name", "" };
+ Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv);
+
+ BrokerFixture donor(opts), receiver(opts);
{
Client c(donor.getPort());
FieldTable args;
@@ -246,6 +228,94 @@ QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
}
}
+
+// FIXME aconway 2008-09-12: finish the new join protocol.
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) {
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ // Create some shared state.
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("foo","q"));
+ while (c0.session.queueQuery("q").getMessageCount() != 1)
+ ::usleep(1000); // Wait for message to show up on broker 0.
+
+ // Now join new broker, should catch up.
+ cluster.add();
+ c0.session.messageTransfer(arg::content=Message("bar","q"));
+ c0.session.queueDeclare("p");
+ c0.session.messageTransfer(arg::content=Message("poo","p"));
+
+ // Verify new broker has all state.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0);
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "poo");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0);
+}
+
+QPID_AUTO_TEST_CASE(testStall) {
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ // Declare on all to avoid race condition.
+ c0.session.queueDeclare("q");
+ c1.session.queueDeclare("q");
+
+ // Stall 0, verify it does not process deliverys while stalled.
+ getGlobalCluster().stall();
+ c1.session.messageTransfer(arg::content=Message("foo","q"));
+ while (c1.session.queueQuery("q").getMessageCount() != 1)
+ ::usleep(1000); // Wait for message to show up on broker 1.
+ sleep(2); // FIXME aconway 2008-09-11: remove.
+ // But it should not be on broker 0.
+ boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
+ BOOST_REQUIRE(q0);
+ BOOST_CHECK_EQUAL(q0->getMessageCount(), 0);
+ // Now unstall and we should get the message.
+ getGlobalCluster().unStall();
+ Message m;
+ BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+}
+
+#if 0 // FIXME aconway 2008-09-10: finish & enable
+QPID_AUTO_TEST_CASE(testDumpConsumers) {
+ ClusterFixture cluster(1);
+ Client a(cluster[0]);
+ a.session.queueDeclare("q");
+ a.subs.subscribe(a.lq, "q");
+
+ cluster.add();
+ Client b(cluster[1]);
+ try {
+ b.connection.newSession(a.session.getId().getName());
+ BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
+ } catch (const SessionBusyException&) {}
+
+ // Transfer some messages to the subscription by client a.
+ Message m;
+ a.session.messageTransfer(arg::content=Message("aaa", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+
+ b.session.messageTransfer(arg::content=Message("bbb", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+
+ // Verify that the queue has been drained on both brokers.
+ // This proves that the consumer was replicated when the second broker joined.
+ BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
+}
+
+
+#endif
+
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster
index 6d254190df..0c1722e566 100755
--- a/cpp/src/tests/start_cluster
+++ b/cpp/src/tests/start_cluster
@@ -12,7 +12,7 @@ test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
rm -f cluster*.log
SIZE=$1; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
-OPTS="-d --load-module ../.libs/libqpidcluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
+OPTS="-d --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port.
../qpidd -q
diff --git a/cpp/src/tests/start_cluster_hosts b/cpp/src/tests/start_cluster_hosts
index 683798453b..f1dce9f0de 100755
--- a/cpp/src/tests/start_cluster_hosts
+++ b/cpp/src/tests/start_cluster_hosts
@@ -15,7 +15,7 @@
#
QPIDD=${QPIDD:-$PWD/../qpidd}
-LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/libqpidcluster.so}
+LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/cluster.so}
NAME=$USER # User name is default cluster name.
RESTART=NO