From 15e9b9a0d064e142e6e971475338c0fff72579ad Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 7 Oct 2008 17:27:06 +0000 Subject: broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files. cluster: added FailoverExchange - send cluster membership to clients. client: added FailoverListener - receive cluster updates from failover exchange. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@702552 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/cluster_test.cpp | 83 +++++++++++++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 4 deletions(-) (limited to 'qpid/cpp/src/tests/cluster_test.cpp') diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 7b67fed388..5cfcbc262d 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -23,6 +23,7 @@ #include "qpid/client/Connection.h" #include "qpid/client/Session.h" +#include "qpid/client/FailoverListener.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/Cpg.h" #include "qpid/cluster/DumpClient.h" @@ -38,7 +39,9 @@ #include #include #include +#include #include +#include namespace qpid { namespace cluster { @@ -46,6 +49,12 @@ Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp }} // namespace qpid::cluster +namespace std { // ostream operators in std:: namespace +template +ostream& operator<<(ostream& o, const std::set& s) { return seqPrint(o, s); } +} + + QPID_AUTO_TEST_SUITE(cluster) using namespace std; @@ -88,11 +97,8 @@ struct ClusterFixture : public vector { } void waitFor(size_t n) { - size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster().getUrls().size() != n) { + for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() != n; --retry) ::usleep(1000); - --retry; - } } }; @@ -164,6 +170,75 @@ ostream& operator<<(ostream& o, const pair& array) { return o; } +template set makeSet(const C& c) { + set s; + std::copy(c.begin(), c.end(), std::inserter(s, s.begin())); + return s; +} + +std::set portsFromFailoverArray(const framing::Array& urlArray) { + std::set ports; + for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) { + Url url((*i)->get()); + BOOST_REQUIRE(url.size() > 0); + BOOST_REQUIRE(url[0].get()); + ports.insert(url[0].get()->port); + } + return ports; +} + +std::set portsFromFailoverMessage(const Message& m) { + framing::Array urlArray; + m.getHeaders().getArray("amq.failover", urlArray); + return portsFromFailoverArray(urlArray); +} + +QPID_AUTO_TEST_CASE(FailoverExchange) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q"); + c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover"); + + Message m; + BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m)); + + cluster.add(); + BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m)); +} + +std::set portsFromFailoverListener(const FailoverListener& fl, size_t n) { + // Wait till there are n ports in the list. + vector kb = fl.getKnownBrokers(); + for (size_t retry=1000; kb.size() != n && retry != 0; --retry) { + ::usleep(1000); + kb = fl.getKnownBrokers(); + } + set s; + for (vector::const_iterator i = kb.begin(); i != kb.end(); ++i) { + BOOST_MESSAGE("Failover URL: " << *i); + BOOST_CHECK(i->size() >= 1); + BOOST_CHECK((*i)[0].get()); + s.insert((*i)[0].get()->port); + } + return s; +} + +QPID_AUTO_TEST_CASE(testFailoverListener) { + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + FailoverListener fl(c0.connection); + + set set0=makeSet(cluster); + + BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1)); + cluster.add(); + BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2)); + cluster.kill(1); + BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1)); +} + QPID_AUTO_TEST_CASE(DumpConsumers) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); -- cgit v1.2.1