diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-07 17:27:06 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-07 17:27:06 +0000 |
| commit | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (patch) | |
| tree | de5e5b5e431bf695b2c44e198ee93d179201a0e2 /cpp/src/tests/cluster_test.cpp | |
| parent | a653ebe5bdfad1d44a576d2ab23f7e6ea80ba96f (diff) | |
| download | qpid-python-41d33af55b9fbf4c664ccb56accb1a37bd1ef006.tar.gz | |
broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files.
cluster: added FailoverExchange - send cluster membership to clients.
client: added FailoverListener - receive cluster updates from failover exchange.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702552 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
| -rw-r--r-- | cpp/src/tests/cluster_test.cpp | 83 |
1 files changed, 79 insertions, 4 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 7b67fed388..5cfcbc262d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -23,6 +23,7 @@ #include "qpid/client/Connection.h" #include "qpid/client/Session.h" +#include "qpid/client/FailoverListener.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/Cpg.h" #include "qpid/cluster/DumpClient.h" @@ -38,7 +39,9 @@ #include <iostream> #include <iterator> #include <vector> +#include <set> #include <algorithm> +#include <iterator> namespace qpid { namespace cluster { @@ -46,6 +49,12 @@ Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp }} // namespace qpid::cluster +namespace std { // ostream operators in std:: namespace +template <class T> +ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); } +} + + QPID_AUTO_TEST_SUITE(cluster) using namespace std; @@ -88,11 +97,8 @@ struct ClusterFixture : public vector<uint16_t> { } void waitFor(size_t n) { - size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster().getUrls().size() != n) { + for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() != n; --retry) ::usleep(1000); - --retry; - } } }; @@ -164,6 +170,75 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } +template <class C> set<uint16_t> makeSet(const C& c) { + set<uint16_t> s; + std::copy(c.begin(), c.end(), std::inserter(s, s.begin())); + return s; +} + +std::set<uint16_t> portsFromFailoverArray(const framing::Array& urlArray) { + std::set<uint16_t> ports; + for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) { + Url url((*i)->get<std::string>()); + BOOST_REQUIRE(url.size() > 0); + BOOST_REQUIRE(url[0].get<TcpAddress>()); + ports.insert(url[0].get<TcpAddress>()->port); + } + return ports; +} + +std::set<uint16_t> portsFromFailoverMessage(const Message& m) { + framing::Array urlArray; + m.getHeaders().getArray("amq.failover", urlArray); + return portsFromFailoverArray(urlArray); +} + +QPID_AUTO_TEST_CASE(FailoverExchange) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q"); + c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover"); + + Message m; + BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m)); + + cluster.add(); + BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m)); +} + +std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t n) { + // Wait till there are n ports in the list. + vector<Url> kb = fl.getKnownBrokers(); + for (size_t retry=1000; kb.size() != n && retry != 0; --retry) { + ::usleep(1000); + kb = fl.getKnownBrokers(); + } + set<uint16_t> s; + for (vector<Url>::const_iterator i = kb.begin(); i != kb.end(); ++i) { + BOOST_MESSAGE("Failover URL: " << *i); + BOOST_CHECK(i->size() >= 1); + BOOST_CHECK((*i)[0].get<TcpAddress>()); + s.insert((*i)[0].get<TcpAddress>()->port); + } + return s; +} + +QPID_AUTO_TEST_CASE(testFailoverListener) { + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + FailoverListener fl(c0.connection); + + set<uint16_t> set0=makeSet(cluster); + + BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1)); + cluster.add(); + BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2)); + cluster.kill(1); + BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1)); +} + QPID_AUTO_TEST_CASE(DumpConsumers) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); |
