diff options
| author | Alan Conway <aconway@apache.org> | 2013-05-22 23:52:17 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-05-22 23:52:17 +0000 |
| commit | 4e57cc3019fbb8cdf27f253d9c1fe1c4321f189b (patch) | |
| tree | 21e369badfe1caad8b56e8c266ae4f7d28660e93 /qpid/cpp/src/tests | |
| parent | 197e06a18ac0da4e34c29bf13f0290a1137f21df (diff) | |
| download | qpid-python-4e57cc3019fbb8cdf27f253d9c1fe1c4321f189b.tar.gz | |
QPID-4866: HA support for failover exchange
Add support for the "amq.failover" exchange with new HA, to support migration of
clients that used this facility with the old cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1485511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 14 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 23 |
2 files changed, 26 insertions, 11 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 75393fbdf1..7b0d88a27c 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -151,9 +151,8 @@ acl allow all all def promote(self): self.ready(); self.qpid_ha(["promote"]) def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url]) - def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) + def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]); def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) - def agent(self): if not self._agent: cred = self.client_credentials @@ -191,7 +190,7 @@ acl allow all all agent = self.agent() assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout) - # FIXME aconway 2012-05-01: do direct python call to qpid-config code. + # TODO aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): assert subprocess.call( [self.qpid_config_path, "--broker", self.host_port()]+args, @@ -299,7 +298,7 @@ class HaCluster(object): """Start a new broker in the cluster""" i = len(self) assert i <= len(self._ports) - if i == len(self._ports): + if i == len(self._ports): # Adding new broker after cluster init self._ports.append(HaPort(self.test)) self._set_url() self._update_urls() @@ -311,10 +310,9 @@ class HaCluster(object): self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports) def _update_urls(self): - if len(self) > 1: # No failover addresses on a 1 cluster. - for b in self: - b.set_brokers_url(self.url) - b.set_public_url(self.url) + for b in self: + b.set_brokers_url(self.url) + b.set_public_url(self.url) def connect(self, i): """Connect with reconnect_urls""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index b6642e4508..3836381ed2 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -549,7 +549,7 @@ class ReplicationTests(HaBrokerTest): def test_auth(self): """Verify that authentication does not interfere with replication.""" - # FIXME aconway 2012-07-09: generate test sasl config portably for cmake + # TODO aconway 2012-07-09: generate test sasl config portably for cmake sasl_config=os.path.join(self.rootdir, "sasl_config") if not os.path.exists(sasl_config): print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config @@ -1183,6 +1183,24 @@ class ConfigurationTests(HaBrokerTest): b = start("none", "none") check(b, "", "") + def test_failover_exchange(self): + """Verify that the failover exchange correctly reports cluster membership""" + + def strip_url(url): return re.sub('amqp:|tcp:', '', url) + + def assert_url(m, url): + urls = m.properties['amq.failover'] + self.assertEqual(1, len(urls)) + self.assertEqual(strip_url(urls[0]), url) + + cluster = HaCluster(self, 1, args=["--ha-public-url=foo:1234"]) + r = cluster[0].connect().session().receiver("amq.failover") + assert_url(r.fetch(1), "foo:1234") + cluster[0].set_public_url("bar:1234") + assert_url(r.fetch(1), "bar:1234") + cluster[0].set_brokers_url(cluster.url+",xxx:1234") + self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL + class StoreTests(BrokerTest): """Test for HA with persistence.""" @@ -1203,8 +1221,7 @@ class StoreTests(BrokerTest): r = cluster[0].connect().session().receiver("qq") self.assertEqual(r.fetch().content, "foo") r.session.acknowledge() - # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush - # the dequeue operation on qq. + # Sending this message is a hack to flush the dequeue operation on qq. s.send(Message("flush", durable=True)) def verify(broker, x_count): |
