summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-05-22 23:52:17 +0000
committerAlan Conway <aconway@apache.org>2013-05-22 23:52:17 +0000
commit4e57cc3019fbb8cdf27f253d9c1fe1c4321f189b (patch)
tree21e369badfe1caad8b56e8c266ae4f7d28660e93 /qpid/cpp/src/tests
parent197e06a18ac0da4e34c29bf13f0290a1137f21df (diff)
downloadqpid-python-4e57cc3019fbb8cdf27f253d9c1fe1c4321f189b.tar.gz
QPID-4866: HA support for failover exchange
Add support for the "amq.failover" exchange with new HA, to support migration of clients that used this facility with the old cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1485511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py14
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py23
2 files changed, 26 insertions, 11 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 75393fbdf1..7b0d88a27c 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -151,9 +151,8 @@ acl allow all all
def promote(self): self.ready(); self.qpid_ha(["promote"])
def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
- def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+ def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]);
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
def agent(self):
if not self._agent:
cred = self.client_credentials
@@ -191,7 +190,7 @@ acl allow all all
agent = self.agent()
assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
- # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+ # TODO aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
assert subprocess.call(
[self.qpid_config_path, "--broker", self.host_port()]+args,
@@ -299,7 +298,7 @@ class HaCluster(object):
"""Start a new broker in the cluster"""
i = len(self)
assert i <= len(self._ports)
- if i == len(self._ports):
+ if i == len(self._ports): # Adding new broker after cluster init
self._ports.append(HaPort(self.test))
self._set_url()
self._update_urls()
@@ -311,10 +310,9 @@ class HaCluster(object):
self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports)
def _update_urls(self):
- if len(self) > 1: # No failover addresses on a 1 cluster.
- for b in self:
- b.set_brokers_url(self.url)
- b.set_public_url(self.url)
+ for b in self:
+ b.set_brokers_url(self.url)
+ b.set_public_url(self.url)
def connect(self, i):
"""Connect with reconnect_urls"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index b6642e4508..3836381ed2 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -549,7 +549,7 @@ class ReplicationTests(HaBrokerTest):
def test_auth(self):
"""Verify that authentication does not interfere with replication."""
- # FIXME aconway 2012-07-09: generate test sasl config portably for cmake
+ # TODO aconway 2012-07-09: generate test sasl config portably for cmake
sasl_config=os.path.join(self.rootdir, "sasl_config")
if not os.path.exists(sasl_config):
print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
@@ -1183,6 +1183,24 @@ class ConfigurationTests(HaBrokerTest):
b = start("none", "none")
check(b, "", "")
+ def test_failover_exchange(self):
+ """Verify that the failover exchange correctly reports cluster membership"""
+
+ def strip_url(url): return re.sub('amqp:|tcp:', '', url)
+
+ def assert_url(m, url):
+ urls = m.properties['amq.failover']
+ self.assertEqual(1, len(urls))
+ self.assertEqual(strip_url(urls[0]), url)
+
+ cluster = HaCluster(self, 1, args=["--ha-public-url=foo:1234"])
+ r = cluster[0].connect().session().receiver("amq.failover")
+ assert_url(r.fetch(1), "foo:1234")
+ cluster[0].set_public_url("bar:1234")
+ assert_url(r.fetch(1), "bar:1234")
+ cluster[0].set_brokers_url(cluster.url+",xxx:1234")
+ self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL
+
class StoreTests(BrokerTest):
"""Test for HA with persistence."""
@@ -1203,8 +1221,7 @@ class StoreTests(BrokerTest):
r = cluster[0].connect().session().receiver("qq")
self.assertEqual(r.fetch().content, "foo")
r.session.acknowledge()
- # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush
- # the dequeue operation on qq.
+ # Sending this message is a hack to flush the dequeue operation on qq.
s.send(Message("flush", durable=True))
def verify(broker, x_count):