author     Kim van der Riet <kpvdr@apache.org>    2013-02-28 16:14:30 +0000
committer  Kim van der Riet <kpvdr@apache.org>    2013-02-28 16:14:30 +0000
commit     9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919
tree       2a890e1df09e5b896a9b4168a7b22648f559a1f2  /cpp/src/tests/ha_tests.py
parent     172d9b2a16cfb817bbe632d050acba7e31401cd2
download   qpid-python-asyncstore.tar.gz
Update from trunk r1375509 through r1450773 (branch: asyncstore)
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
 -rwxr-xr-x  cpp/src/tests/ha_tests.py  1097
 1 file changed, 666 insertions, 431 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 310ef844bd..77399bf2e2 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -18,235 +18,32 @@
 # under the License.
 #
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
 import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from qpid.datatypes import uuid4
 from brokertest import *
+from ha_test import *
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG, INFO
-from qpidtoollibs import BrokerAgent
+from qpidtoollibs import BrokerAgent, EventHelper
 from uuid import UUID

 log = getLogger(__name__)

-class QmfAgent(object):
-    """Access to a QMF broker agent."""
-    def __init__(self, address, **kwargs):
-        self._connection = Connection.establish(
-            address, client_properties={"qpid.ha-admin":1}, **kwargs)
-        self._agent = BrokerAgent(self._connection)
-        assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+def grep(filename, regexp):
+    for line in open(filename).readlines():
+        if (regexp.search(line)): return True
+    return False

-    def __getattr__(self, name):
-        a = getattr(self._agent, name)
-        return a
+class HaBrokerTest(BrokerTest):
+    """Base class for HA broker tests"""
+    def assert_log_no_errors(self, broker):
+        log = broker.get_log()
+        if grep(log, re.compile("] error|] critical")):
+            self.fail("Errors in log file %s"%(log))

-class Credentials(object):
-    """SASL credentials: username, password, and mechanism"""
-    def __init__(self, username, password, mechanism):
-        (self.username, self.password, self.mechanism) = (username, password, mechanism)
-
-    def __str__(self): return "Credentials%s"%(self.tuple(),)
-
-    def tuple(self): return (self.username, self.password, self.mechanism)
-
-    def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
-
-class HaBroker(Broker):
-    """Start a broker with HA enabled
-    @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
-    """
-    def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
-                 client_credentials=None, **kwargs):
-        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
-        args = copy(args)
-        args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=debug+:ha::",
-                 # FIXME aconway 2012-02-13: workaround slow link failover.
-                 "--link-maintenace-interval=0.1",
-                 "--ha-cluster=%s"%ha_cluster]
-        if ha_replicate is not None:
-            args += [ "--ha-replicate=%s"%ha_replicate ]
-        if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
-        Broker.__init__(self, test, args, **kwargs)
-        self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
-        assert os.path.exists(self.qpid_ha_path)
-        self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
-        assert os.path.exists(self.qpid_config_path)
-        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-        self.qpid_ha_script=import_script(self.qpid_ha_path)
-        self._agent = None
-        self.client_credentials = client_credentials
-
-    def __str__(self): return Broker.__str__(self)
-
-    def qpid_ha(self, args):
-        cred = self.client_credentials
-        url = self.host_port()
-        if cred:
-            url =cred.add_user(url)
-            args = args + ["--sasl-mechanism", cred.mechanism]
-        self.qpid_ha_script.main_except(["", "-b", url]+args)
-
-    def promote(self): self.qpid_ha(["promote"])
-    def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
-    def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
-    def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
-    def agent(self):
-        if not self._agent:
-            cred = self.client_credentials
-            if cred:
-                self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
-            else:
-                self._agent = QmfAgent(self.host_port())
-        return self._agent
-
-    def ha_status(self):
-        hb = self.agent().getHaBroker()
-        hb.update()
-        return hb.status
-
-    def wait_status(self, status):
-        def try_get_status():
-            # Ignore ConnectionError, the broker may not be up yet.
-            try:
-                self._status = self.ha_status()
-                return self._status == status;
-            except ConnectionError: return False
-        assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status)
-
-    # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
-    def qpid_config(self, args):
-        assert subprocess.call(
-            [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
-
-    def config_replicate(self, from_broker, queue):
-        self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
-
-    def config_declare(self, queue, replication):
-        self.qpid_config(["add", "queue", queue, "--replicate", replication])
-
-    def connect_admin(self, **kwargs):
-        cred = self.client_credentials
-        if cred:
-            return Broker.connect(
-                self, client_properties={"qpid.ha-admin":1},
-                username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
-                **kwargs)
-        else:
-            return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
-
-    def wait_backup(self, address):
-        """Wait for address to become valid on a backup broker."""
-        bs = self.connect_admin().session()
-        try: wait_address(bs, address)
-        finally: bs.connection.close()
-
-    def assert_browse(self, queue, expected, **kwargs):
-        """Verify queue contents by browsing."""
-        bs = self.connect().session()
-        try:
-            wait_address(bs, queue)
-            assert_browse_retry(bs, queue, expected, **kwargs)
-        finally: bs.connection.close()
-
-    def assert_browse_backup(self, queue, expected, **kwargs):
-        """Combines wait_backup and assert_browse_retry."""
-        bs = self.connect_admin().session()
-        try:
-            wait_address(bs, queue)
-            assert_browse_retry(bs, queue, expected, **kwargs)
-        finally: bs.connection.close()
-
-    def assert_connect_fail(self):
-        try:
-            self.connect()
-            self.test.fail("Expected ConnectionError")
-        except ConnectionError: pass
-
-    def try_connect(self):
-        try: return self.connect()
-        except ConnectionError: return None
-
-class HaCluster(object):
-    _cluster_count = 0
-
-    def __init__(self, test, n, promote=True, **kwargs):
-        """Start a cluster of n brokers"""
-        self.test = test
-        self.kwargs = kwargs
-        self._brokers = []
-        self.id = HaCluster._cluster_count
-        self.broker_id = 0
-        HaCluster._cluster_count += 1
-        for i in xrange(n): self.start(False)
-        self.update_urls()
-        self[0].promote()
-
-    def next_name(self):
-        name="cluster%s-%s"%(self.id, self.broker_id)
-        self.broker_id += 1
-        return name
-
-    def start(self, update_urls=True, args=[]):
-        """Start a new broker in the cluster"""
-        b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
-        self._brokers.append(b)
-        if update_urls: self.update_urls()
-        return b
-
-    def update_urls(self):
-        self.url = ",".join([b.host_port() for b in self])
-        if len(self) > 1:       # No failover addresses on a 1 cluster.
-            for b in self: b.set_brokers_url(self.url)
-
-    def connect(self, i):
-        """Connect with reconnect_urls"""
-        return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-
-    def kill(self, i, promote_next=True):
-        """Kill broker i, promote broker i+1"""
-        self[i].expect = EXPECT_EXIT_FAIL
-        self[i].kill()
-        if promote_next: self[(i+1) % len(self)].promote()
-
-    def restart(self, i):
-        """Start a broker with the same port, name and data directory. It will get
-        a separate log file: foo.n.log"""
-        b = self._brokers[i]
-        self._brokers[i] = HaBroker(
-            self.test, name=b.name, port=b.port(), brokers_url=self.url,
-            **self.kwargs)
-
-    def bounce(self, i, promote_next=True):
-        """Stop and restart a broker in a cluster."""
-        self.kill(i, promote_next)
-        self.restart(i)
-
-    # Behave like a list of brokers.
-    def __len__(self): return len(self._brokers)
-    def __getitem__(self,index): return self._brokers[index]
-    def __iter__(self): return self._brokers.__iter__()
-
-def wait_address(session, address):
-    """Wait for an address to become valid."""
-    def check():
-        try:
-            session.sender(address)
-            return True
-        except NotFound: return False
-    assert retry(check), "Timed out waiting for address %s"%(address)
-
-def valid_address(session, address):
-    """Test if an address is valid"""
-    try:
-        session.receiver(address)
-        return True
-    except NotFound: return False
-
-class ReplicationTests(BrokerTest):
+class ReplicationTests(HaBrokerTest):
     """Correctness tests for HA replication."""

     def test_replication(self):
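Note: the helper classes removed in the hunk above (QmfAgent, Credentials, HaBroker, HaCluster, wait_address, valid_address) are not gone; the new "from ha_test import *" at the top of the file indicates they now live in the shared ha_test module, and the tests below keep using them unchanged. A minimal sketch of how a test might combine the new HaBrokerTest base class with those helpers (ExampleTest is a hypothetical name; HaCluster and wait_status are assumed to behave as in the tests below):

    class ExampleTest(HaBrokerTest):
        def test_clean_backup_log(self):
            cluster = HaCluster(self, 2)        # start a primary and one backup
            cluster[1].wait_status("ready")     # wait until the backup catches up
            # Fail the test if the backup's log contains '] error' or '] critical'.
            self.assert_log_no_errors(cluster[1])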
@@ -256,8 +53,9 @@ class ReplicationTests(BrokerTest):
         def queue(name, replicate):
             return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)

-        def exchange(name, replicate, bindq):
-            return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+        def exchange(name, replicate, bindq, key):
+            return "%s/%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'topic'},x-bindings:[{exchange:'%s',queue:'%s',key:'%s'}]}}"%(name, key, replicate, name, bindq, key)
+
         def setup(p, prefix, primary):
             """Create config, send messages on the primary p"""
             s = p.sender(queue(prefix+"q1", "all"))
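For readers unfamiliar with qpid address strings, the queue() and exchange() helpers above simply build create-on-demand addresses. Their expansions, shown purely for illustration:

    queue("q1", "all")
    # -> "q1;{create:always,node:{x-declare:{arguments:{'qpid.replicate':all}}}}"

    exchange("e1", "all", "q1", "key1")
    # -> "e1/key1;{create:always,node:{type:topic,
    #       x-declare:{arguments:{'qpid.replicate':all}, type:'topic'},
    #       x-bindings:[{exchange:'e1',queue:'q1',key:'key1'}]}}"

The new name/key form sends to the exchange with subject key1, which matters now that the test declares a topic exchange with keyed bindings rather than a fanout.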
@@ -267,16 +65,21 @@ class ReplicationTests(BrokerTest):
             p.acknowledge()
             p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
             p.sender(queue(prefix+"q3", "none")).send(Message("3"))
-            p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
-            p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+            p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(Message("4"))
+            p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(Message("5"))
             # Test unbind
             p.sender(queue(prefix+"q4", "all")).send(Message("6"))
-            s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
+            s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4", "key4"))
             s3.send(Message("7"))
             # Use old connection to unbind
             us = primary.connect_old().session(str(uuid4()))
-            us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
+            us.exchange_unbind(exchange=prefix+"e4", binding_key="key4", queue=prefix+"q4")
             p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
+            # Test replication of deletes
+            p.sender(queue(prefix+"dq", "all"))
+            p.sender(exchange(prefix+"de", "all", prefix+"dq", ""))
+            p.sender(prefix+"dq;{delete:always}").close()
+            p.sender(prefix+"de;{delete:always}").close()
             # Need a marker so we can wait till sync is done.
             p.sender(queue(prefix+"x", "configuration"))
@@ -292,50 +95,61 @@ class ReplicationTests(BrokerTest):
             self.assert_browse_retry(b, prefix+"q2", []) # configuration only
             assert not valid_address(b, prefix+"q3")
-            b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+
+            # Verify exchange with replicate=all
+            b.sender(prefix+"e1/key1").send(Message(prefix+"e1"))
             self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
-            b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
+
+            # Verify exchange with replicate=configuration
+            b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
             self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])

-            b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
+            b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
             self.assert_browse_retry(b, prefix+"q4", ["6","7"])

-        primary = HaBroker(self, name="primary")
-        primary.promote()
-        p = primary.connect().session()
+            # Verify deletes
+            assert not valid_address(b, prefix+"dq")
+            assert not valid_address(b, prefix+"de")

-        # Create config, send messages before starting the backup, to test catch-up replication.
-        setup(p, "1", primary)
-        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
-        # Create config, send messages after starting the backup, to test steady-state replication.
-        setup(p, "2", primary)
-
-        # Verify the data on the backup
-        b = backup.connect_admin().session()
-        verify(b, "1", p)
-        verify(b, "2", p)
-        # Test a series of messages, enqueue all then dequeue all.
-        s = p.sender(queue("foo","all"))
-        wait_address(b, "foo")
-        msgs = [str(i) for i in range(10)]
-        for m in msgs: s.send(Message(m))
-        self.assert_browse_retry(p, "foo", msgs)
-        self.assert_browse_retry(b, "foo", msgs)
-        r = p.receiver("foo")
-        for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
-        p.acknowledge()
-        self.assert_browse_retry(p, "foo", [])
-        self.assert_browse_retry(b, "foo", [])
-
-        # Another series, this time verify each dequeue individually.
-        for m in msgs: s.send(Message(m))
-        self.assert_browse_retry(p, "foo", msgs)
-        self.assert_browse_retry(b, "foo", msgs)
-        for i in range(len(msgs)):
-            self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            primary = HaBroker(self, name="primary")
+            primary.promote()
+            p = primary.connect().session()
+
+            # Create config, send messages before starting the backup, to test catch-up replication.
+            setup(p, "1", primary)
+            backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+            # Create config, send messages after starting the backup, to test steady-state replication.
+            setup(p, "2", primary)
+
+            # Verify the data on the backup
+            b = backup.connect_admin().session()
+            verify(b, "1", p)
+            verify(b, "2", p)
+            # Test a series of messages, enqueue all then dequeue all.
+ s = p.sender(queue("foo","all")) + wait_address(b, "foo") + msgs = [str(i) for i in range(10)] + for m in msgs: s.send(Message(m)) + self.assert_browse_retry(p, "foo", msgs) + self.assert_browse_retry(b, "foo", msgs) + r = p.receiver("foo") + for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content) p.acknowledge() - self.assert_browse_retry(p, "foo", msgs[i+1:]) - self.assert_browse_retry(b, "foo", msgs[i+1:]) + self.assert_browse_retry(p, "foo", []) + self.assert_browse_retry(b, "foo", []) + + # Another series, this time verify each dequeue individually. + for m in msgs: s.send(Message(m)) + self.assert_browse_retry(p, "foo", msgs) + self.assert_browse_retry(b, "foo", msgs) + for i in range(len(msgs)): + self.assertEqual(msgs[i], r.fetch(timeout=0).content) + p.acknowledge() + self.assert_browse_retry(p, "foo", msgs[i+1:]) + self.assert_browse_retry(b, "foo", msgs[i+1:]) + finally: l.restore() def test_sync(self): primary = HaBroker(self, name="primary") @@ -361,53 +175,59 @@ class ReplicationTests(BrokerTest): def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" - brokers = HaCluster(self, 3) - sender = self.popen( - ["qpid-send", - "--broker", brokers[0].host_port(), - "--address", "q;{create:always}", - "--messages=1000", - "--content-string=x" - ]) - receiver = self.popen( - ["qpid-receive", - "--broker", brokers[0].host_port(), - "--address", "q;{create:always}", - "--messages=990", - "--timeout=10" - ]) - self.assertEqual(sender.wait(), 0) - self.assertEqual(receiver.wait(), 0) - expect = [long(i) for i in range(991, 1001)] - sn = lambda m: m.properties["sn"] - brokers[1].assert_browse_backup("q", expect, transform=sn) - brokers[2].assert_browse_backup("q", expect, transform=sn) + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + brokers = HaCluster(self, 3) + sender = self.popen( + ["qpid-send", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=1000", + "--content-string=x" + ]) + receiver = self.popen( + ["qpid-receive", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=990", + "--timeout=10" + ]) + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + brokers[1].assert_browse_backup("q", expect, transform=sn) + brokers[2].assert_browse_backup("q", expect, transform=sn) + finally: l.restore() def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" - primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) - primary.promote() - backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) - # Check that backup rejects normal connections + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. try: - backup.connect().session() - self.fail("Expected connection to backup to fail") - except ConnectionError: pass - # Check that admin connections are allowed to backup. 
@@ -361,53 +175,59 @@ class ReplicationTests(BrokerTest):

     def test_send_receive(self):
         """Verify sequence numbers of messages sent by qpid-send"""
-        brokers = HaCluster(self, 3)
-        sender = self.popen(
-            ["qpid-send",
-             "--broker", brokers[0].host_port(),
-             "--address", "q;{create:always}",
-             "--messages=1000",
-             "--content-string=x"
-             ])
-        receiver = self.popen(
-            ["qpid-receive",
-             "--broker", brokers[0].host_port(),
-             "--address", "q;{create:always}",
-             "--messages=990",
-             "--timeout=10"
-             ])
-        self.assertEqual(sender.wait(), 0)
-        self.assertEqual(receiver.wait(), 0)
-        expect = [long(i) for i in range(991, 1001)]
-        sn = lambda m: m.properties["sn"]
-        brokers[1].assert_browse_backup("q", expect, transform=sn)
-        brokers[2].assert_browse_backup("q", expect, transform=sn)
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            brokers = HaCluster(self, 3)
+            sender = self.popen(
+                ["qpid-send",
+                 "--broker", brokers[0].host_port(),
+                 "--address", "q;{create:always}",
+                 "--messages=1000",
+                 "--content-string=x"
+                 ])
+            receiver = self.popen(
+                ["qpid-receive",
+                 "--broker", brokers[0].host_port(),
+                 "--address", "q;{create:always}",
+                 "--messages=990",
+                 "--timeout=10"
+                 ])
+            self.assertEqual(sender.wait(), 0)
+            self.assertEqual(receiver.wait(), 0)
+            expect = [long(i) for i in range(991, 1001)]
+            sn = lambda m: m.properties["sn"]
+            brokers[1].assert_browse_backup("q", expect, transform=sn)
+            brokers[2].assert_browse_backup("q", expect, transform=sn)
+        finally: l.restore()

     def test_failover_python(self):
         """Verify that backups reject connections and that fail-over works in python client"""
-        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
-        primary.promote()
-        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
-        # Check that backup rejects normal connections
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
         try:
-            backup.connect().session()
-            self.fail("Expected connection to backup to fail")
-        except ConnectionError: pass
-        # Check that admin connections are allowed to backup.
-        backup.connect_admin().close()
-
-        # Test discovery: should connect to primary after reject by backup
-        c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
-        s = c.session()
-        sender = s.sender("q;{create:always}")
-        backup.wait_backup("q")
-        sender.send("foo")
-        primary.kill()
-        assert retry(lambda: not is_running(primary.pid))
-        backup.promote()
-        sender.send("bar")
-        self.assert_browse_retry(s, "q", ["foo", "bar"])
-        c.close()
+            primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+            primary.promote()
+            backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+            # Check that backup rejects normal connections
+            try:
+                backup.connect().session()
+                self.fail("Expected connection to backup to fail")
+            except ConnectionError: pass
+            # Check that admin connections are allowed to backup.
+            backup.connect_admin().close()
+
+            # Test discovery: should connect to primary after reject by backup
+            c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+            s = c.session()
+            sender = s.sender("q;{create:always}")
+            backup.wait_backup("q")
+            sender.send("foo")
+            primary.kill()
+            assert retry(lambda: not is_running(primary.pid))
+            backup.promote()
+            sender.send("bar")
+            self.assert_browse_retry(s, "q", ["foo", "bar"])
+            c.close()
+        finally: l.restore()

     def test_failover_cpp(self):
         """Verify that failover works in the C++ client."""
@@ -456,51 +276,61 @@ class ReplicationTests(BrokerTest):

     def test_standalone_queue_replica(self):
         """Test replication of individual queues outside of cluster mode"""
-        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-        primary = HaBroker(self, name="primary", ha_cluster=False)
-        pc = primary.connect()
-        ps = pc.session().sender("q;{create:always}")
-        pr = pc.session().receiver("q;{create:always}")
-        backup = HaBroker(self, name="backup", ha_cluster=False)
-        br = backup.connect().session().receiver("q;{create:always}")
-
-        # Set up replication with qpid-ha
-        backup.replicate(primary.host_port(), "q")
-        ps.send("a")
-        backup.assert_browse_backup("q", ["a"])
-        ps.send("b")
-        backup.assert_browse_backup("q", ["a", "b"])
-        self.assertEqual("a", pr.fetch().content)
-        pr.session.acknowledge()
-        backup.assert_browse_backup("q", ["b"])
-
-        # Set up replication with qpid-config
-        ps2 = pc.session().sender("q2;{create:always}")
-        backup.config_replicate(primary.host_port(), "q2");
-        ps2.send("x")
-        backup.assert_browse_backup("q2", ["x"])
-
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            primary = HaBroker(self, name="primary", ha_cluster=False,
+                               args=["--ha-queue-replication=yes"]);
+            pc = primary.connect()
+            ps = pc.session().sender("q;{create:always}")
+            pr = pc.session().receiver("q;{create:always}")
+            backup = HaBroker(self, name="backup", ha_cluster=False,
+                              args=["--ha-queue-replication=yes"])
+            br = backup.connect().session().receiver("q;{create:always}")
+
+            # Set up replication with qpid-ha
+            backup.replicate(primary.host_port(), "q")
+            ps.send("a")
+            backup.assert_browse_backup("q", ["a"])
+            ps.send("b")
+            backup.assert_browse_backup("q", ["a", "b"])
+            self.assertEqual("a", pr.fetch().content)
+            pr.session.acknowledge()
+            backup.assert_browse_backup("q", ["b"])
+
+            # Set up replication with qpid-config
+            ps2 = pc.session().sender("q2;{create:always}")
+            backup.config_replicate(primary.host_port(), "q2");
+            ps2.send("x")
+            backup.assert_browse_backup("q2", ["x"])
+        finally: l.restore()

     def test_queue_replica_failover(self):
-        """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
-        cluster = HaCluster(self, 2)
-        primary = cluster[0]
-        pc = cluster.connect(0)
-        ps = pc.session().sender("q;{create:always}")
-        pr = pc.session().receiver("q;{create:always}")
-        backup = HaBroker(self, name="backup", ha_cluster=False)
-        br = backup.connect().session().receiver("q;{create:always}")
-        backup.replicate(cluster.url, "q")
-        ps.send("a")
-        backup.assert_browse_backup("q", ["a"])
-        cluster.bounce(0)
-        backup.assert_browse_backup("q", ["a"])
-        ps.send("b")
-        backup.assert_browse_backup("q", ["a", "b"])
-        cluster.bounce(1)
-        self.assertEqual("a", pr.fetch().content)
-        pr.session.acknowledge()
-        backup.assert_browse_backup("q", ["b"])
+        """Test individual queue replication from a cluster to a standalone
+        backup broker, verify it fails over."""
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            cluster = HaCluster(self, 2)
+            primary = cluster[0]
+            pc = cluster.connect(0)
+            ps = pc.session().sender("q;{create:always}")
+            pr = pc.session().receiver("q;{create:always}")
+            backup = HaBroker(self, name="backup", ha_cluster=False,
+                              args=["--ha-queue-replication=yes"])
+            br = backup.connect().session().receiver("q;{create:always}")
+            backup.replicate(cluster.url, "q")
+            ps.send("a")
+            backup.assert_browse_backup("q", ["a"])
+            cluster.bounce(0)
+            backup.assert_browse_backup("q", ["a"])
+            ps.send("b")
+            backup.assert_browse_backup("q", ["a", "b"])
+            cluster.bounce(1)
+            self.assertEqual("a", pr.fetch().content)
+            pr.session.acknowledge()
+            backup.assert_browse_backup("q", ["b"])
+            pc.close()
+            br.close()
        finally: l.restore()

     def test_lvq(self):
         """Verify that we replicate to an LVQ correctly"""
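Both standalone brokers above now opt in with --ha-queue-replication=yes; evidently per-queue replication outside cluster mode is disabled unless that option is set. For reference, the command-line equivalents of backup.replicate() and backup.config_replicate(), as the removed HaBroker helpers built them (host:port values are illustrative):

    qpid-ha -b localhost:5673 replicate localhost:5672 q
    qpid-config --broker localhost:5673 add queue --start-replica localhost:5672 q2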
@@ -634,8 +464,10 @@ class ReplicationTests(BrokerTest):

     def test_replicate_default(self):
         """Make sure we don't replicate if ha-replicate is unspecified or none"""
         cluster1 = HaCluster(self, 2, ha_replicate=None)
+        cluster1[1].wait_status("ready")
         c1 = cluster1[0].connect().session().sender("q;{create:always}")
         cluster2 = HaCluster(self, 2, ha_replicate="none")
+        cluster2[1].wait_status("ready")
         cluster2[0].connect().session().sender("q;{create:always}")
         time.sleep(.1) # Give replication a chance.
         try:
@@ -647,6 +479,23 @@ class ReplicationTests(BrokerTest):
             self.fail("Expected no-such-queue exception")
         except NotFound: pass

+    def test_replicate_binding(self):
+        """Verify that binding replication can be disabled"""
+        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+        primary.promote()
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+        ps = primary.connect().session()
+        ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+        ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
+        backup.wait_backup("q")
+
+        primary.kill()
+        assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
+        backup.promote()
+        bs = backup.connect_admin().session()
+        bs.sender("ex").send(Message("msg"))
+        self.assert_browse_retry(bs, "q", [])
+
     def test_invalid_replication(self):
         """Verify that we reject an attempt to declare a queue with invalid replication value."""
         cluster = HaCluster(self, 1, ha_replicate="all")
@@ -672,20 +521,26 @@ class ReplicationTests(BrokerTest):

     def test_auto_delete_exclusive(self):
         """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
-        cluster = HaCluster(self,2)
-        s = cluster[0].connect().session()
-        s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
-        s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
-        s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
-        s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
-        s.receiver("q;{create:always}")
+        cluster = HaCluster(self, 2)
+        s0 = cluster[0].connect().session()
+        s0.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+        s0.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+        ad = s0.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+        s0.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+        s0.receiver("q;{create:always}")

-        s = cluster[1].connect_admin().session()
+        s1 = cluster[1].connect_admin().session()
         cluster[1].wait_backup("q")
-        assert not valid_address(s, "exad")
-        assert valid_address(s, "ex")
-        assert valid_address(s, "ad")
-        assert valid_address(s, "time")
+        assert not valid_address(s1, "exad")
+        assert valid_address(s1, "ex")
+        assert valid_address(s1, "ad")
+        assert valid_address(s1, "time")
+
+        # Verify that auto-delete queues are not kept alive by
+        # replicating subscriptions
+        ad.close()
+        s0.sync()
+        assert not valid_address(s0, "ad")

     def test_broker_info(self):
         """Check that broker information is correctly published via management"""
@@ -763,18 +618,18 @@ acl deny all all
         s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
         # altq queue bound to altex, collect re-routed messages.
s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") - # 0ex exchange with alternate-exchange altex and no queues bound - s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # ex exchange with alternate-exchange altex and no queues bound + s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") # create queue q with alternate-exchange altex s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") # create a bunch of exchanges to ensure we don't clean up prematurely if the # response comes in multiple fragments. - for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i) def verify(broker): s = broker.connect().session() # Verify unmatched message goes to ex's alternate. - s.sender("0ex").send("foo") + s.sender("ex").send("foo") altq = s.receiver("altq") self.assertEqual("foo", altq.fetch(timeout=0).content) s.acknowledge() @@ -786,20 +641,265 @@ acl deny all all self.assertEqual("bar", altq.fetch(timeout=0).content) s.acknowledge() + def ss(n): return cluster[n].connect().session() + # Sanity check: alternate exchanges on original broker verify(cluster[0]) + # Altex is in use as an alternate exchange. + self.assertRaises(SessionError, + lambda:ss(0).sender("altex;{delete:always}").close()) # Check backup that was connected during setup. - cluster[1].wait_backup("0ex") + cluster[1].wait_status("ready") + cluster[1].wait_backup("ex") cluster[1].wait_backup("q") cluster.bounce(0) verify(cluster[1]) + # Check a newly started backup. cluster.start() - cluster[2].wait_backup("0ex") + cluster[2].wait_status("ready") + cluster[2].wait_backup("ex") cluster[2].wait_backup("q") cluster.bounce(1) verify(cluster[2]) + # Check that alt-exchange in-use count is replicated + s = cluster[2].connect().session(); + + self.assertRaises(SessionError, + lambda:ss(2).sender("altex;{delete:always}").close()) + s.sender("q;{delete:always}").close() + self.assertRaises(SessionError, + lambda:ss(2).sender("altex;{delete:always}").close()) + s.sender("ex;{delete:always}").close() + s.sender("altex;{delete:always}").close() + + def test_priority_reroute(self): + """Regression test for QPID-4262, rerouting messages from a priority queue + to itself causes a crash""" + cluster = HaCluster(self, 2) + primary = cluster[0] + session = primary.connect().session() + s = session.sender("pq; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}},x-bindings:[{exchange:'amq.fanout',queue:pq}]}}") + for m in xrange(100): s.send(Message(str(m), priority=m%10)) + pq = QmfAgent(primary.host_port()).getQueue("pq") + pq.reroute(request=0, useAltExchange=False, exchange="amq.fanout") + # Verify that consuming is in priority order + expect = [str(10*i+p) for p in xrange(9,-1,-1) for i in xrange(0,10) ] + actual = [m.content for m in primary.get_messages("pq", 100)] + self.assertEqual(expect, actual) + + def test_delete_missing_response(self): + """Check that a backup correctly deletes leftover queues and exchanges that are + missing from the initial reponse set.""" + # This test is a bit contrived, we set up the situation on backup brokers + # and then promote one. 
+    def test_delete_missing_response(self):
+        """Check that a backup correctly deletes leftover queues and exchanges that are
+        missing from the initial response set."""
+        # This test is a bit contrived, we set up the situation on backup brokers
+        # and then promote one.
+        cluster = HaCluster(self, 2, promote=False)
+
+        # cluster[0] will be the primary
+        s = cluster[0].connect_admin().session()
+        s.sender("q1;{create:always}")
+        s.sender("e1;{create:always, node:{type:topic}}")
+
+        # cluster[1] will be the backup, has extra queues/exchanges
+        xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+        node = "node:{%s}"%(xdecl)
+        s = cluster[1].connect_admin().session()
+        s.sender("q1;{create:always, %s}"%(node))
+        s.sender("q2;{create:always, %s}"%(node))
+        s.sender("e1;{create:always, node:{type:topic, %s}}"%(xdecl))
+        s.sender("e2;{create:always, node:{type:topic, %s}}"%(xdecl))
+        for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a)
+
+        cluster[0].promote()
+        # Verify the backup deletes the surplus queue and exchange
+        cluster[1].wait_status("ready")
+        s = cluster[1].connect_admin().session()
+        self.assertRaises(NotFound, s.receiver, ("q2"));
+        self.assertRaises(NotFound, s.receiver, ("e2"));
+
+
+    def test_delete_qpid_4285(self):
+        """Regression test for QPID-4285: on deleting a queue it gets stuck in a
+        partially deleted state and causes replication errors."""
+        cluster = HaCluster(self,2)
+        s = cluster[0].connect().session()
+        s.receiver("q;{create:always}")
+        cluster[1].wait_backup("q")
+        cluster.kill(0) # Make the backup take over.
+        s = cluster[1].connect().session()
+        s.receiver("q;{delete:always}").close() # Delete q on new primary
+        try:
+            s.receiver("q")
+            self.fail("Expected NotFound exception") # Should not be available
+        except NotFound: pass
+        assert not cluster[1].agent().getQueue("q") # Should not be in QMF
+
+    def alt_setup(self, session, suffix):
+        # Create exchange to use as alternate and a queue bound to it.
+        # altex exchange: acts as alternate exchange
+        session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
+        # altq queue bound to altex, collect re-routed messages.
+        session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+
+    def test_auto_delete_close(self):
+        """Verify auto-delete queues are deleted on backup if auto-deleted
+        on primary"""
+        cluster=HaCluster(self, 2)
+        p = cluster[0].connect().session()
+        self.alt_setup(p, "1")
+        r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+        s = p.sender("adq1")
+        for m in ["aa","bb","cc"]: s.send(m)
+        p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+        cluster[1].wait_queue("adq1")
+        cluster[1].wait_queue("adq2")
+        r.close()               # trigger auto-delete of adq1
+        cluster[1].wait_no_queue("adq1")
+        cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+        cluster[1].wait_queue("adq2")
+
+    def test_auto_delete_crash(self):
+        """Verify auto-delete queues are deleted on backup if the primary crashes"""
+        cluster=HaCluster(self, 2)
+        p = cluster[0].connect().session()
+        self.alt_setup(p,"1")
+
+        # adq1 is subscribed so will be auto-deleted.
+        r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+        s = p.sender("adq1")
+        for m in ["aa","bb","cc"]: s.send(m)
+        # adq2 is subscribed after cluster[2] starts.
+        p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+        # adq3 is never subscribed.
+ p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}") + + cluster.start() + cluster[2].wait_status("ready") + + p.receiver("adq2") # Subscribed after cluster[2] joined + + for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q) + for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q) + cluster[0].kill() + + cluster[1].wait_no_queue("adq1") + cluster[1].wait_no_queue("adq2") + cluster[1].wait_queue("adq3") + + cluster[2].wait_no_queue("adq1") + cluster[2].wait_no_queue("adq2") + cluster[2].wait_queue("adq3") + + cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"]) + cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"]) + + def test_auto_delete_timeout(self): + cluster = HaCluster(self, 2) + # Test timeout + r1 = cluster[0].connect().session().receiver("q1;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}") + # Test special case of timeout = 0 + r0 = cluster[0].connect().session().receiver("q0;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':0}}}}") + cluster[1].wait_queue("q0") + cluster[1].wait_queue("q1") + cluster[0].kill() + cluster[1].wait_queue("q1") # Not timed out yet + cluster[1].wait_no_queue("q1", timeout=5) # Wait for timeout + cluster[1].wait_no_queue("q0", timeout=5) # Wait for timeout + + def test_alt_exchange_dup(self): + """QPID-4349: if a queue has an alterante exchange and is deleted the + messages appear twice on the alternate, they are rerouted once by the + primary and again by the backup.""" + cluster = HaCluster(self,2) + + # Set up q with alternate exchange altex bound to altq. + s = cluster[0].connect().session() + s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") + s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") + snd = s.sender("q;{create:always,node:{x-declare:{alternate-exchange:'altex'}}}") + messages = [ str(n) for n in xrange(10) ] + for m in messages: snd.send(m) + cluster[1].assert_browse_backup("q", messages) + s.sender("q;{delete:always}").close() + cluster[1].assert_browse_backup("altq", messages) + + def test_expired(self): + """Regression test for QPID-4379: HA does not properly handle expired messages""" + # Race between messages expiring and HA replicating consumer. + cluster = HaCluster(self, 2) + s = cluster[0].connect().session().sender("q;{create:always}", capacity=2) + def send_ttl_messages(): + for i in xrange(100): s.send(Message(str(i), ttl=0.001), timeout=1) + send_ttl_messages() + cluster.start() + send_ttl_messages() + + def test_stale_response(self): + """Check for race condition where a stale response is processed after an + event for the same queue/exchange """ + cluster = HaCluster(self, 2) + s = cluster[0].connect().session() + s.sender("keep;{create:always}") # Leave this queue in place. + for i in xrange(1000): + s.sender("deleteme%s;{create:always,delete:always}"%(i)).close() + # It is possible for the backup to attempt to subscribe after the queue + # is deleted. This is not an error, but is logged as an error on the primary. + # The backup does not log this as an error so we only check the backup log for errors. 
+        self.assert_log_no_errors(cluster[1])
+
+    def test_missed_recreate(self):
+        """If a queue or exchange is destroyed and one with the same name re-created
+        while a backup is disconnected, the backup should also delete/recreate
+        the object when it re-connects"""
+        cluster = HaCluster(self, 3)
+        sn = cluster[0].connect().session()
+        # Create a queue with messages
+        s = sn.sender("qq;{create:always}")
+        msgs = [str(i) for i in xrange(3)]
+        for m in msgs: s.send(m)
+        cluster[1].assert_browse_backup("qq", msgs)
+        cluster[2].assert_browse_backup("qq", msgs)
+        # Set up an exchange with a binding.
+        sn.sender("xx;{create:always,node:{type:topic}}")
+        sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}")
+        cluster[1].wait_address("xx")
+        self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1)
+        cluster[2].wait_address("xx")
+        self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
+
+        # Simulate the race by re-creating the objects before promoting the new primary
+        cluster.kill(0, False)
+        xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+        node = "node:{%s}"%(xdecl)
+        sn = cluster[1].connect_admin().session()
+        sn.sender("qq;{delete:always}").close()
+        s = sn.sender("qq;{create:always, %s}"%(node))
+        s.send("foo")
+        sn.sender("xx;{delete:always}").close()
+        sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl))
+        cluster[1].promote()
+        cluster[1].wait_status("active")
+        # Verify we are not still using the old objects on cluster[2]
+        cluster[2].assert_browse_backup("qq", ["foo"])
+        cluster[2].wait_address("xx")
+        self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0)
+
+    def test_redeclare_exchange(self):
+        """Ensure that re-declaring an exchange is an HA no-op"""
+        cluster = HaCluster(self, 2)
+        ps = cluster[0].connect().session()
+        ps.sender("ex1;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+        ps.sender("ex2;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout', alternate-exchange:'ex1'}}}")
+        cluster[1].wait_backup("ex1")
+        cluster[1].wait_backup("ex2")
+
+        # Use old API to re-declare the exchange
+        old_conn = cluster[0].connect_old()
+        old_sess = old_conn.session(str(qpid.datatypes.uuid4()))
+        old_sess.exchange_declare(exchange='ex1', type='fanout')
+        cluster[1].wait_backup("ex1")
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given fairshare limit
@@ -812,7 +912,7 @@ def fairshare(msgs, limit, levels):
         msgs = postponed
         count = 0
         last_priority = None
-        postponed = []
+        postponed = [ ]
     msg = msgs.pop(0)
     if last_priority and priority_level(msg.priority, levels) == last_priority:
         count += 1
@@ -834,7 +934,7 @@ def priority_level(value, levels):
     offset = 5-math.ceil(levels/2.0)
     return min(max(value - offset, 0), levels-1)

-class LongTests(BrokerTest):
+class LongTests(HaBrokerTest):
    """Tests that can run for a long time if -DDURATION=<minutes> is set"""

    def duration(self):
@@ -860,7 +960,7 @@ class LongTests(BrokerTest):
             """Wait for receiver r to pass n"""
             def check():
                 r.check()       # Verify no exceptions
-                return r.received > n
+                return r.received > n + 100
             assert retry(check), "Stalled %s at %s"%(r.queue, n)

         for r in receivers: wait_passed(r, 0)

@@ -868,87 +968,176 @@
         # Kill and restart brokers in a cycle:
         endtime = time.time() + self.duration()
         i = 0
+        primary = 0
         try:
             while time.time() < endtime or i < 3: # At least 3 iterations
+                # Precondition: All 3 brokers running,
+                # primary = index of promoted primary
+                # one or two backups are running,
                 for s in senders: s.sender.assert_running()
                 for r in receivers: r.receiver.assert_running()
-                checkpoint = [ r.received for r in receivers ]
-                # Don't kill primary till it is active and the next
-                # backup is ready, otherwise we can lose messages.
-                brokers[i%3].wait_status("active")
-                brokers[(i+1)%3].wait_status("ready")
-                brokers.bounce(i%3)
+                checkpoint = [ r.received+100 for r in receivers ]
+                dead = None
+                victim = random.randint(0,2)
+                if victim == primary:
+                    # Don't kill primary till it is active and the next
+                    # backup is ready, otherwise we can lose messages.
+                    brokers[victim].wait_status("active")
+                    next = (victim+1)%3
+                    brokers[next].wait_status("ready")
+                    brokers.bounce(victim) # Next one is promoted
+                    primary = next
+                else:
+                    brokers.kill(victim, False)
+                    dead = victim
+
+                # At this point the primary is running with 1 or 2 backups
+                # Make sure we are not stalled
+                map(wait_passed, receivers, checkpoint)
+                # Run another checkpoint to ensure things work in this configuration
+                checkpoint = [ r.received+100 for r in receivers ]
+                map(wait_passed, receivers, checkpoint)
+
+                if dead is not None:
+                    brokers.restart(dead) # Restart backup
+                    brokers[dead].ready()
+                    dead = None
                 i += 1
-                map(wait_passed, receivers, checkpoint) # Wait for all receivers
         except:
             traceback.print_exc()
             raise
         finally:
             for s in senders: s.stop()
             for r in receivers: r.stop()
-            dead = []
+            unexpected_dead = []
             for i in xrange(3):
-                if not brokers[i].is_running(): dead.append(i)
-                brokers.kill(i, False)
-            if dead: raise Exception("Brokers not running: %s"%dead)
+                if not brokers[i].is_running() and i != dead:
+                    unexpected_dead.append(i)
+                if brokers[i].is_running(): brokers.kill(i, False)
+            if unexpected_dead:
+                raise Exception("Brokers not running: %s"%unexpected_dead)
+
+    def test_qmf_order(self):
+        """QPID 4402: HA QMF events can be out of order.
+        This test mimics the test described in the JIRA. Two threads repeatedly
+        declare the same auto-delete queue and close their connection.
+        """
+        broker = Broker(self)
+        class Receiver(Thread):
+            def __init__(self, qname):
+                Thread.__init__(self)
+                self.qname = qname
+                self.stopped = False
+
+            def run(self):
+                while not self.stopped:
+                    self.connection = broker.connect()
+                    try:
+                        self.connection.session().receiver(
+                            self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}")
+                    except NotFound: pass # Can occur occasionally, not an error.
+                    try: self.connection.close()
+                    except: pass
+
+        class QmfObject(object):
+            """Track existence of an object and validate QMF events"""
+            def __init__(self, type_name, name_field, name):
+                self.type_name, self.name_field, self.name = type_name, name_field, name
+                self.exists = False
+
+            def qmf_event(self, event):
+                content = event.content[0]
+                event_type = content['_schema_id']['_class_name']
+                values = content['_values']
+                if event_type == self.type_name+"Declare" and values[self.name_field] == self.name:
+                    disp = values['disp']
+                    log.debug("Event %s: disp=%s exists=%s"%(
+                        event_type, values['disp'], self.exists))
+                    if self.exists: assert values['disp'] == 'existing'
+                    else: assert values['disp'] == 'created'
+                    self.exists = True
+                elif event_type == self.type_name+"Delete" and values[self.name_field] == self.name:
+                    log.debug("Event %s: exists=%s"%(event_type, self.exists))
+                    assert self.exists
+                    self.exists = False
+
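QmfObject is a small state machine: declare events must alternate with delete events, and a declare seen while the object already exists must carry disp='existing'. An illustration with synthetic events shaped like the fetched QMF map messages (FakeEvent is hypothetical; the field layout is taken from qmf_event above, with QmfObject lifted out of the test for the sake of the sketch):

    class FakeEvent(object):  # stand-in for a message fetched from the QMF event address
        def __init__(self, class_name, disp=None):
            values = {'qName': 'qq'}
            if disp is not None: values['disp'] = disp
            self.content = [{'_schema_id': {'_class_name': class_name},
                             '_values': values}]

    q = QmfObject("queue", "qName", "qq")
    q.qmf_event(FakeEvent("queueDeclare", "created"))  # ok: did not exist yet
    q.qmf_event(FakeEvent("queueDelete"))              # ok: existed
    q.qmf_event(FakeEvent("queueDeclare", "created"))  # ok again
    # An out-of-order stream (the QPID-4402 symptom) trips the assertion:
    q.qmf_event(FakeEvent("queueDeclare", "created"))  # AssertionError: disp should be 'existing'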
+        # Verify order of QMF events.
+        helper = EventHelper()
+        r = broker.connect().session().receiver(helper.eventAddress())
+        threads = [Receiver("qq"), Receiver("qq")]
+        for t in threads: t.start()
+        queue = QmfObject("queue", "qName", "qq")
+        finish = time.time() + self.duration()
+        try:
+            while time.time() < finish:
+                queue.qmf_event(r.fetch())
+        finally:
+            for t in threads: t.stopped = True; t.join()

-class RecoveryTests(BrokerTest):
+class RecoveryTests(HaBrokerTest):
     """Tests for recovery after a failure."""

     def test_queue_hold(self):
         """Verify that the broker holds queues without sufficient backup,
         i.e. does not complete messages sent to those queues."""
-        # We don't want backups to time out for this test, set long timeout.
-        cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
-        # Wait for the primary to be ready
-        cluster[0].wait_status("active")
-        # Create a queue before the failure.
-        s1 = cluster.connect(0).session().sender("q1;{create:always}")
-        for b in cluster: b.wait_backup("q1")
-        for i in xrange(100): s1.send(str(i))
-        # Kill primary and 2 backups
-        for i in [0,1,2]: cluster.kill(i, False)
-        cluster[3].promote() # New primary, backups will be 1 and 2
-        cluster[3].wait_status("recovering")
-
-        def assertSyncTimeout(s):
-            try:
-                s.sync(timeout=.01)
-                self.fail("Expected Timeout exception")
-            except Timeout: pass
-
-        # Create a queue after the failure
-        s2 = cluster.connect(3).session().sender("q2;{create:always}")
-
-        # Verify that messages sent are not completed
-        for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
-        assertSyncTimeout(s1)
-        self.assertEqual(s1.unsettled(), 100)
-        assertSyncTimeout(s2)
-        self.assertEqual(s2.unsettled(), 100)
-
-        # Verify we can receive even if sending is on hold:
-        cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
-
-        # Restart backups, verify queues are released only when both backups are up
-        cluster.restart(1)
-        assertSyncTimeout(s1)
-        self.assertEqual(s1.unsettled(), 100)
-        assertSyncTimeout(s2)
-        self.assertEqual(s2.unsettled(), 100)
-        self.assertEqual(cluster[3].ha_status(), "recovering")
-        cluster.restart(2)
-
-        # Verify everything is up to date and active
-        def settled(sender): sender.sync(); return sender.unsettled() == 0;
-        assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
-        assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
-        cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
-        cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
-        cluster[3].wait_status("active"),
-        s1.session.connection.close()
-        s2.session.connection.close()
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            # We don't want backups to time out for this test, set long timeout.
+            cluster = HaCluster(self, 4, args=["--ha-backup-timeout=120"]);
+            # Wait for the primary to be ready
+            cluster[0].wait_status("active")
+            for b in cluster[1:4]: b.wait_status("ready")
+            # Create a queue before the failure.
+            s1 = cluster.connect(0).session().sender("q1;{create:always}")
+            for b in cluster: b.wait_backup("q1")
+            for i in xrange(100): s1.send(str(i))
+
+            # Kill primary and 2 backups
+            cluster[3].wait_status("ready")
+            for i in [0,1,2]: cluster.kill(i, False)
+            cluster[3].promote() # New primary, backups will be 1 and 2
+            cluster[3].wait_status("recovering")
+
+            def assertSyncTimeout(s):
+                try:
+                    s.sync(timeout=.01)
+                    self.fail("Expected Timeout exception")
+                except Timeout: pass
+
+            # Create a queue after the failure
+            s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+            # Verify that messages sent are not completed
+            for i in xrange(100,200):
+                s1.send(str(i), sync=False);
+                s2.send(str(i), sync=False)
+            assertSyncTimeout(s1)
+            self.assertEqual(s1.unsettled(), 100)
+            assertSyncTimeout(s2)
+            self.assertEqual(s2.unsettled(), 100)
+
+            # Verify we can receive even if sending is on hold:
+            cluster[3].assert_browse("q1", [str(i) for i in range(200)])
+
+            # Restart backups, verify queues are released only when both backups are up
+            cluster.restart(1)
+            assertSyncTimeout(s1)
+            self.assertEqual(s1.unsettled(), 100)
+            assertSyncTimeout(s2)
+            self.assertEqual(s2.unsettled(), 100)
+            cluster.restart(2)
+
+            # Verify everything is up to date and active
+            def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0;
+            assert retry(lambda: settled(s1)), "Unsettled=%s"%(s1.unsettled())
+            assert retry(lambda: settled(s2)), "Unsettled=%s"%(s2.unsettled())
+            cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+            cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+            cluster[3].wait_status("active"),
+            s1.session.connection.close()
+            s2.session.connection.close()
+        finally: l.restore()

     def test_expected_backup_timeout(self):
         """Verify that we time-out expected backups and release held queues
@@ -972,6 +1161,52 @@ class RecoveryTests(BrokerTest):
         s.sync(timeout=1) # And released after the timeout.
         self.assertEqual(cluster[2].ha_status(), "active")

+    def test_join_ready_cluster(self):
+        """If we join a cluster where the primary is dead, the new primary is
+        not yet promoted and there are ready backups then we should refuse
+        promotion so that one of the ready backups can be chosen."""
+        # FIXME aconway 2012-10-05: smaller timeout
+        cluster = HaCluster(self, 2, args=["--link-heartbeat-interval", 1])
+        cluster[0].wait_status("active")
+        cluster[1].wait_status("ready")
+        cluster.bounce(0, promote_next=False)
+        self.assertRaises(Exception, cluster[0].promote)
+        os.kill(cluster[1].pid, signal.SIGSTOP) # Test for timeout if unresponsive.
+        cluster.bounce(0, promote_next=False)
+        cluster[0].promote()
+
+
+class ConfigurationTests(HaBrokerTest):
+    """Tests for configuration settings."""
+
+    def test_client_broker_url(self):
+        """Check that setting of broker and public URLs obeys correct defaulting
+        and precedence"""
+
+        def check(broker, brokers, public):
+            qmf = broker.qmf()
+            self.assertEqual(brokers, qmf.brokersUrl)
+            self.assertEqual(public, qmf.publicUrl)
+
+        def start(brokers, public, known=None):
+            args=[]
+            if brokers: args.append("--ha-brokers-url="+brokers)
+            if public: args.append("--ha-public-url="+public)
+            if known: args.append("--known-hosts-url="+known)
+            return HaBroker(self, args=args)
+
+        # Both set explicitly, no defaulting
+        b = start("foo:123", "bar:456")
+        check(b, "amqp:tcp:foo:123", "amqp:tcp:bar:456")
+        b.set_brokers_url("foo:999")
+        check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:456")
+        b.set_public_url("bar:999")
+        check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:999")
+
+        # Allow "none" to mean "not set"
+        b = start("none", "none")
+        check(b, "", "")

 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     qpid_ha = os.getenv("QPID_HA_EXEC")
@@ -979,5 +1214,5 @@ if __name__ == "__main__":
         os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
     else:
-        print "Skipping ha_tests, qpid_ha not available"
+        print "Skipping ha_tests, %s not available"%(qpid_ha)