diff options
| author | Alan Conway <aconway@apache.org> | 2008-09-12 18:07:47 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-09-12 18:07:47 +0000 |
| commit | ebb59131b198b693c5774dd51656fec520ddd770 (patch) | |
| tree | 688cbbfe3aae8874156cd2ac2bf9430ef3af19df /cpp/src/tests | |
| parent | ad8e400b786c5868d4e0d2aa880625240e44e311 (diff) | |
| download | qpid-python-ebb59131b198b693c5774dd51656fec520ddd770.tar.gz | |
Added ClusterMap and test. Moved PollableCondition, PollableQueue to sys.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694758 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
| -rw-r--r-- | cpp/src/tests/ClusterMapTest.cpp | 191 | ||||
| -rw-r--r-- | cpp/src/tests/Makefile.am | 2 | ||||
| -rw-r--r-- | cpp/src/tests/cluster.mk | 3 | ||||
| -rw-r--r-- | cpp/src/tests/cluster_test.cpp | 144 | ||||
| -rwxr-xr-x | cpp/src/tests/start_cluster | 2 | ||||
| -rwxr-xr-x | cpp/src/tests/start_cluster_hosts | 2 |
6 files changed, 304 insertions, 40 deletions
diff --git a/cpp/src/tests/ClusterMapTest.cpp b/cpp/src/tests/ClusterMapTest.cpp new file mode 100644 index 0000000000..344dd71eb5 --- /dev/null +++ b/cpp/src/tests/ClusterMapTest.cpp @@ -0,0 +1,191 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/cluster/ClusterMap.h" +#include "qpid/framing/ClusterMapBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/Url.h" +#include <boost/assign.hpp> + +QPID_AUTO_TEST_SUITE(CluterMapTest) + +using namespace std; +using namespace qpid; +using namespace cluster; +using namespace framing; + +MemberId id(int i) { return MemberId(i,i); } + +Url url(const char* host) { return Url(TcpAddress(host)); } + +QPID_AUTO_TEST_CASE(testNotice) { + ClusterMap m; + BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); // id(0) member, no dump. + BOOST_CHECK(m.isMember(id(0))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK_EQUAL(m.memberCount(), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); // Newbie, needs dump + BOOST_CHECK(m.isMember(id(0))); + BOOST_CHECK(m.isDumpee(id(1))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); + BOOST_CHECK_EQUAL(m.memberCount(), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); // id(1) is ready. + BOOST_CHECK(m.isMember(id(0))); + BOOST_CHECK(m.isMember(id(1))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); // id(2) needs dump + BOOST_CHECK(m.isDumpee(id(2))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + BOOST_CHECK_EQUAL(id(1), m.urlNotice(id(3), url("3-dump"))); // 0 busy, dump to id(1). + BOOST_CHECK(m.isDumpee(id(3))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 2); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); // Equally busy, 0 is first on list. + BOOST_CHECK_EQUAL(m.dumps(id(0)), 2); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 3); + + // My dumpees both complete + BOOST_CHECK(!m.urlNotice(id(2), url("2-ready"))); + BOOST_CHECK(!m.urlNotice(id(4), url("4-ready"))); + BOOST_CHECK(m.isMember(id(2))); + BOOST_CHECK(m.isMember(id(4))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + // Final dumpee completes. + BOOST_CHECK(!m.urlNotice(id(3), url("3-ready"))); + BOOST_CHECK(m.isMember(id(3))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + +} + +QPID_AUTO_TEST_CASE(testLeave) { + ClusterMap m; + BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); + BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); + BOOST_CHECK(!m.urlNotice(id(2), url("2-ready"))); + BOOST_CHECK_EQUAL(m.memberCount(), 3); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + m.leave(id(1)); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + BOOST_CHECK(m.isMember(id(0))); + BOOST_CHECK(m.isMember(id(2))); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK(m.isDumpee(id(4))); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + m.dumpFailed(id(4)); // Dumper detected a failure. + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK(!m.isDumpee(id(4))); + BOOST_CHECK(!m.isMember(id(4))); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + m.leave(id(4)); // Dumpee leaves, no-op since we already know it failed. + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(5), url("5-dump"))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK(m.isDumpee(id(5))); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + m.leave(id(5)); // Dumpee detects failure and leaves cluster. + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK(!m.isDumpee(id(5))); + BOOST_CHECK(!m.isMember(id(5))); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + m.dumpFailed(id(5)); // Dumper reports failure - no op, we already know. + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); +} + +QPID_AUTO_TEST_CASE(testToControl) { + ClusterMap m; + m.urlNotice(id(0), url("0")); + m.urlNotice(id(1), url("1dump")); + m.urlNotice(id(1), url("1")); + m.urlNotice(id(2), url("2dump")); + m.urlNotice(id(3), url("3dump")); + m.urlNotice(id(4), url("4dump")); + + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 3); + + ClusterMapBody b = m.toControl(); + + BOOST_CHECK_EQUAL(b.getMembers().count(), 2); + BOOST_CHECK_EQUAL(b.getMembers().getString(id(0).str()), url("0").str()); + BOOST_CHECK_EQUAL(b.getMembers().getString(id(1).str()), url("1").str()); + + BOOST_CHECK_EQUAL(b.getDumpees().count(), 3); + BOOST_CHECK_EQUAL(b.getDumpees().getString(id(2).str()), url("2dump").str()); + BOOST_CHECK_EQUAL(b.getDumpees().getString(id(3).str()), url("3dump").str()); + BOOST_CHECK_EQUAL(b.getDumpees().getString(id(4).str()), url("4dump").str()); + + BOOST_CHECK_EQUAL(b.getDumps().count(), 3); + BOOST_CHECK_EQUAL(b.getDumps().getString(id(2).str()), id(0).str()); + BOOST_CHECK_EQUAL(b.getDumps().getString(id(3).str()), id(1).str()); + BOOST_CHECK_EQUAL(b.getDumps().getString(id(4).str()), id(0).str()); + + std::string s(b.size(), '\0'); + Buffer buf(&s[0], s.size()); + b.encode(buf); + + ClusterMap m2; + m2.fromControl(b); + ClusterMapBody b2 = m2.toControl(); + std::string s2(b2.size(), '\0'); + Buffer buf2(&s2[0], s2.size()); + b2.encode(buf2); + + // Verify a round-trip encoding produces identical results. + BOOST_CHECK_EQUAL(s,s2); +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 3b82aff8a8..8b0c12e47c 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -28,7 +28,7 @@ CLEANFILES= TESTS+=unit_test check_PROGRAMS+=unit_test unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ - $(lib_client) $(lib_broker) # $(lib_amqp_0_10) + $(lib_client) $(lib_broker) unit_test_SOURCES= unit_test.cpp unit_test.h \ BrokerFixture.h SocketProxy.h \ diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 3454beb9bc..40d1c0df3e 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -18,4 +18,7 @@ check_PROGRAMS+=cluster_test cluster_test_SOURCES=unit_test.cpp cluster_test.cpp cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework +unit_test_SOURCES+=ClusterMapTest.cpp +unit_test_LDADD+=$(lib_cluster) + endif diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 1f9aac8fc5..da77d7405e 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -59,12 +59,19 @@ using boost::ptr_vector; using qpid::cluster::Cluster; using qpid::cluster::getGlobalCluster; +/** Parse broker & cluster options */ +Broker::Options parseOpts(size_t argc, const char* argv[]) { + Broker::Options opts; + Plugin::addOptions(opts); // Pick up cluster options. + opts.parse(argc, argv, "", true); // Allow-unknown for --load-module + return opts; +} + /** Cluster fixture is a vector of ports for the replicas. * Replica 0 is in the current process, all others are forked as children. */ struct ClusterFixture : public vector<uint16_t> { string name; - Broker::Options opts; std::auto_ptr<BrokerFixture> broker0; boost::ptr_vector<ForkedBroker> forkedBrokers; @@ -96,7 +103,7 @@ void ClusterFixture::add() { const char* argv[] = { "qpidd " __FILE__ , - "--load-module=../.libs/libqpidcluster.so", + "--load-module=../.libs/cluster.so", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir", "--log-prefix", prefix.c_str(), @@ -108,11 +115,8 @@ void ClusterFixture::add() { push_back(forkedBrokers.back().getPort()); } else { // First broker, run in this process. - Broker::Options opts; qpid::log::Logger::instance().setPrefix("main"); - Plugin::addOptions(opts); // Pick up cluster options. - opts.parse(argc, argv, "", true); // Allow-unknown for --load-module - broker0.reset(new BrokerFixture(opts)); + broker0.reset(new BrokerFixture(parseOpts(argc, argv))); push_back(broker0->getPort()); } } @@ -136,38 +140,16 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -#if 0 // FIXME aconway 2008-09-10: finish & enable -QPID_AUTO_TEST_CASE(testDumpConsumers) { - ClusterFixture cluster(1); - Client a(cluster[0]); - a.session.queueDeclare("q"); - a.subs.subscribe(a.lq, "q"); - - cluster.add(); - Client b(cluster[1]); - try { - b.connection.newSession(a.session.getId().getName()); - BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); - } catch (const SessionBusyException&) {} - - // Transfer some messages to the subscription by client a. - Message m; - a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q")); - BOOST_CHECK(a.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "aaa"); - - b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q")); - BOOST_CHECK(a.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bbb"); - - // Verify that the queue has been drained on both brokers. - // This proves that the consumer was replicated when the second broker joined. - BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); -} -#endif - +// FIXME aconway 2008-09-11: This test has to be first otherwise +// it picks up the cluster name from a previous test and runs the +// brokers as cluster nodes. Something wrong with option parsing... +// QPID_AUTO_TEST_CASE(testDumpClientSharedState) { - BrokerFixture donor, receiver; + // In this test we don't want the cluster plugin to initialize, so set --cluster-name="" + const char* argv[] = { "--cluster-name", "" }; + Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv); + + BrokerFixture donor(opts), receiver(opts); { Client c(donor.getPort()); FieldTable args; @@ -246,6 +228,94 @@ QPID_AUTO_TEST_CASE(testDumpClientSharedState) { } } + +// FIXME aconway 2008-09-12: finish the new join protocol. +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) { + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + // Create some shared state. + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("foo","q")); + while (c0.session.queueQuery("q").getMessageCount() != 1) + ::usleep(1000); // Wait for message to show up on broker 0. + + // Now join new broker, should catch up. + cluster.add(); + c0.session.messageTransfer(arg::content=Message("bar","q")); + c0.session.queueDeclare("p"); + c0.session.messageTransfer(arg::content=Message("poo","p")); + + // Verify new broker has all state. + Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0); + BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "poo"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0); +} + +QPID_AUTO_TEST_CASE(testStall) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + // Declare on all to avoid race condition. + c0.session.queueDeclare("q"); + c1.session.queueDeclare("q"); + + // Stall 0, verify it does not process deliverys while stalled. + getGlobalCluster().stall(); + c1.session.messageTransfer(arg::content=Message("foo","q")); + while (c1.session.queueQuery("q").getMessageCount() != 1) + ::usleep(1000); // Wait for message to show up on broker 1. + sleep(2); // FIXME aconway 2008-09-11: remove. + // But it should not be on broker 0. + boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); + BOOST_REQUIRE(q0); + BOOST_CHECK_EQUAL(q0->getMessageCount(), 0); + // Now unstall and we should get the message. + getGlobalCluster().unStall(); + Message m; + BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); +} + +#if 0 // FIXME aconway 2008-09-10: finish & enable +QPID_AUTO_TEST_CASE(testDumpConsumers) { + ClusterFixture cluster(1); + Client a(cluster[0]); + a.session.queueDeclare("q"); + a.subs.subscribe(a.lq, "q"); + + cluster.add(); + Client b(cluster[1]); + try { + b.connection.newSession(a.session.getId().getName()); + BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); + } catch (const SessionBusyException&) {} + + // Transfer some messages to the subscription by client a. + Message m; + a.session.messageTransfer(arg::content=Message("aaa", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); + + b.session.messageTransfer(arg::content=Message("bbb", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bbb"); + + // Verify that the queue has been drained on both brokers. + // This proves that the consumer was replicated when the second broker joined. + BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); +} + + +#endif + QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster index 6d254190df..0c1722e566 100755 --- a/cpp/src/tests/start_cluster +++ b/cpp/src/tests/start_cluster @@ -12,7 +12,7 @@ test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } rm -f cluster*.log SIZE=$1; shift CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. -OPTS="-d --load-module ../.libs/libqpidcluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*" +OPTS="-d --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*" if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port. ../qpidd -q diff --git a/cpp/src/tests/start_cluster_hosts b/cpp/src/tests/start_cluster_hosts index 683798453b..f1dce9f0de 100755 --- a/cpp/src/tests/start_cluster_hosts +++ b/cpp/src/tests/start_cluster_hosts @@ -15,7 +15,7 @@ # QPIDD=${QPIDD:-$PWD/../qpidd} -LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/libqpidcluster.so} +LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/cluster.so} NAME=$USER # User name is default cluster name. RESTART=NO |
