From bf69fd2f69325dd660454e6b6c8399c51cacea2c Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 12 Jun 2012 21:19:48 +0000 Subject: QPID-3603: Separate QueueGuard from ReplicatingSubscription. QueueGuard: implements QueueObserver to delay completion of new messages. ReplicatingSubscription: Implements subscription, sends messages & events to backup. These were previously combined as one. QueueGuard is now separated out so that it can be created before the ReplicatingSubscription, in anticipation of an expected backup connecting. This is needed for 2 reasons: - new queues must be guarded until they are backuped up. - after a failover, all queues must be guarded until backups are ready. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349538 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/brokertest.py | 2 +- qpid/cpp/src/tests/ha_tests.py | 102 +++++++++++++++++++++++++++------------ 2 files changed, 71 insertions(+), 33 deletions(-) (limited to 'qpid/cpp/src/tests') diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 1b93504b64..c32b7f2c96 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -248,7 +248,7 @@ class Broker(Popen): self.log = "%s.log" % self.name i = 1 while (os.path.exists(self.log)): - self.log = "%s-%d.log" % (self.name, i) + self.log = "%s.%d.log" % (self.name, i) i += 1 def get_log(self): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 6e270851f0..86679611c4 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -24,7 +24,7 @@ from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Co from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition -from logging import getLogger, WARN, ERROR, DEBUG +from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent from uuid import UUID @@ -75,7 +75,10 @@ class HaBroker(Broker): if not self._agent: self._agent = QmfAgent(self.host_port()) return self._agent - def ha_status(self): self.agent().getHaBroker().status + def ha_status(self): return self.agent().getHaBroker().status + + def wait_status(self, status): + assert retry(lambda: self.ha_status() == status), "%r != %r"%(self.ha_status(), status) # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -97,6 +100,14 @@ class HaBroker(Broker): try: wait_address(bs, address) finally: bs.connection.close() + def assert_browse(self, queue, expected, **kwargs): + """Verify queue contents by browsing.""" + bs = self.connect().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + def assert_browse_backup(self, queue, expected, **kwargs): """Combines wait_backup and assert_browse_retry.""" bs = self.connect_admin().session() @@ -157,9 +168,11 @@ class HaCluster(object): if promote_next: self[(i+1) % len(self)].promote() def restart(self, i): + """Start a broker with the same name and data directory. It will get + a separate log file: foo.n.log""" b = self._brokers[i] self._brokers[i] = HaBroker( - self.test, name=self.next_name(), port=b.port(), brokers_url=self.url, + self.test, name=b.name, port=b.port(), brokers_url=self.url, **self.kwargs) def bounce(self, i, promote_next=True): @@ -347,7 +360,8 @@ class ReplicationTests(BrokerTest): primary.kill() assert retry(lambda: not is_running(primary.pid)) backup.promote() - self.assert_browse_retry(s, "q", ["foo"]) + sender.send("bar") + self.assert_browse_retry(s, "q", ["foo", "bar"]) c.close() def test_failover_cpp(self): @@ -630,27 +644,6 @@ class ReplicationTests(BrokerTest): assert valid_address(s, "ad") assert valid_address(s, "time") - def test_recovering(self): - """Verify that the primary broker does not go active until expected - backups have connected""" - cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"]) - c = cluster[0].connect() - for i in xrange(10): - s = c.session().sender("q%s;{create:always}"%i) - for j in xrange(100): s.send(str(j)) - cluster.kill(0) # Fail over to 1 - cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients. - cluster.restart(0) - c = retry(cluster[1].try_connect) - self.assertTrue(c) - cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]); - - # Verify in logs that all queue catch-up happened before the transition to active. - log = open(cluster[1].log).read() - i = log.find("Status change: recovering -> active") - self.failIf(i < 0) - self.assertEqual(log.find("caught up", i), -1) - def test_broker_info(self): """Check that broker information is correctly published via management""" cluster = HaCluster(self, 3) @@ -667,7 +660,7 @@ class ReplicationTests(BrokerTest): # Check that all brokers have the same membership as the cluster for broker in cluster: qmf = broker.agent().getHaBroker() - assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker) # Add a new broker, check it is updated everywhere b = cluster.start() cluster_ports.append(b.port()) @@ -720,12 +713,10 @@ class LongTests(BrokerTest): def test_failover_send_receive(self): """Test failover with continuous send-receive""" - # Start a cluster, all members will be killed during the test. - # FIXME aconway 2012-05-01: try expected-backups=1, requires catchup-ready fixed. - brokers = HaCluster(self, 3, args=["--ha-expected-backups=2"]) + brokers = HaCluster(self, 3) # Start sender and receiver threads - sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False) + sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False) receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) receiver.start() sender.start() @@ -744,14 +735,14 @@ class LongTests(BrokerTest): # otherwise we can lose messages. When we implement non-promotion # of catchup brokers we can make this stronger: wait only for # there to be at least one ready backup. - assert retry(brokers[i%3].try_connect, 1) + brokers[i%3].wait_status("active") brokers.bounce(i%3) i += 1 def enough(): # Verify we're still running receiver.check() # Verify no exceptions return receiver.received > n + 100 # FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec. - assert retry(enough, 10) + assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n) except: traceback.print_exc() raise @@ -764,6 +755,53 @@ class LongTests(BrokerTest): brokers.kill(i, False) if dead: raise Exception("Brokers not running: %s"%dead) + +class RecoveryTests(BrokerTest): + """Tests for recovery after a failure.""" + + def test_queue_hold(self): + """Verify that the broker holds queues without sufficient backup, + i.e. does not complete messages sent to those queues.""" + + cluster = HaCluster(self, 4); + # Wait for the primary to be ready + cluster[0].wait_status("active") + + # Create a queue before the failure. + s1 = cluster.connect(0).session().sender("q1;{create:always}") + for b in cluster: b.wait_backup("q1") + for i in xrange(100): s1.send(str(i)) + + # Kill primary and 2 backups + for i in [0,1,2]: cluster.kill(i, False) + cluster[3].promote() # New primary, backups will be 1 and 2 + cluster[3].wait_status("recovering") + + # Create a queue after the failure + s2 = cluster.connect(3).session().sender("q2;{create:always}") + + # Verify that messages sent are not completed + for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) + self.assertEqual(s1.unsettled(), 100) + self.assertEqual(s2.unsettled(), 100) + + # Verify we can receive even if sending is on hold: + cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) + + # Restart backups, verify queues are released only when both backups are up + cluster.restart(1) + self.assertEqual(s1.unsettled(), 100) + self.assertEqual(s2.unsettled(), 100) + self.assertEqual(cluster[3].ha_status(), "recovering") + cluster.restart(2) + + def settled(sender): sender.sync(); return sender.unsettled() == 0; + assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled()) + assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled()) + cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)]) + cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)]) + cluster[3].wait_status("active"), + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) qpid_ha = os.getenv("QPID_HA_EXEC") -- cgit v1.2.1