summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-09 19:36:51 +0000
committerAlan Conway <aconway@apache.org>2008-10-09 19:36:51 +0000
commit48ee75d4252bce8738395384b6f9daae4d251f34 (patch)
treecea3a1818f3109e94fd4bd794725749f765bb7eb /qpid/cpp/src/tests/cluster_test.cpp
parent4bc95324d6d2738542f9d8964b5d6364bce86840 (diff)
downloadqpid-python-48ee75d4252bce8738395384b6f9daae4d251f34.tar.gz
Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp68
1 files changed, 24 insertions, 44 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 5cfcbc262d..5b9657c2c7 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -22,6 +22,7 @@
#include "BrokerFixture.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/Session.h"
#include "qpid/client/FailoverListener.h"
#include "qpid/cluster/Cluster.h"
@@ -176,47 +177,14 @@ template <class C> set<uint16_t> makeSet(const C& c) {
return s;
}
-std::set<uint16_t> portsFromFailoverArray(const framing::Array& urlArray) {
- std::set<uint16_t> ports;
- for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) {
- Url url((*i)->get<std::string>());
- BOOST_REQUIRE(url.size() > 0);
- BOOST_REQUIRE(url[0].get<TcpAddress>());
- ports.insert(url[0].get<TcpAddress>()->port);
- }
- return ports;
-}
-
-std::set<uint16_t> portsFromFailoverMessage(const Message& m) {
- framing::Array urlArray;
- m.getHeaders().getArray("amq.failover", urlArray);
- return portsFromFailoverArray(urlArray);
-}
-
-QPID_AUTO_TEST_CASE(FailoverExchange) {
- ClusterFixture cluster(2);
- Client c0(cluster[0], "c0");
- c0.session.queueDeclare("q");
- c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover");
-
- Message m;
- BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m));
-
- cluster.add();
- BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m));
-}
-
-std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t n) {
- // Wait till there are n ports in the list.
- vector<Url> kb = fl.getKnownBrokers();
- for (size_t retry=1000; kb.size() != n && retry != 0; --retry) {
+template <class T> std::set<uint16_t> knownBrokerPorts(T& source, size_t n) {
+ vector<Url> urls = source.getKnownBrokers();
+ for (size_t retry=1000; urls.size() != n && retry != 0; --retry) {
::usleep(1000);
- kb = fl.getKnownBrokers();
+ urls = source.getKnownBrokers();
}
set<uint16_t> s;
- for (vector<Url>::const_iterator i = kb.begin(); i != kb.end(); ++i) {
+ for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
BOOST_MESSAGE("Failover URL: " << *i);
BOOST_CHECK(i->size() >= 1);
BOOST_CHECK((*i)[0].get<TcpAddress>());
@@ -226,17 +194,29 @@ std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t
}
QPID_AUTO_TEST_CASE(testFailoverListener) {
- ClusterFixture cluster(1);
+ ClusterFixture cluster(2);
Client c0(cluster[0], "c0");
- FailoverListener fl(c0.connection);
+ FailoverListener fl;
+ fl.start(ConnectionAccess::getImpl(c0.connection));
+ set<uint16_t> set0=makeSet(cluster);
+ BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
+ cluster.add();
+ BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(fl, 3));
+ cluster.kill(2);
+ BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
+}
+
+QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
set<uint16_t> set0=makeSet(cluster);
- BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+ BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
cluster.add();
- BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2));
- cluster.kill(1);
- BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+ BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(c0.connection, 3));
+ cluster.kill(2);
+ BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
}
QPID_AUTO_TEST_CASE(DumpConsumers) {