Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--  qpid/cpp/src/tests/brokertest.py |   2
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py   | 102
2 files changed, 71 insertions, 33 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 1b93504b64..c32b7f2c96 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -248,7 +248,7 @@ class Broker(Popen):
         self.log = "%s.log" % self.name
         i = 1
         while (os.path.exists(self.log)):
-            self.log = "%s-%d.log" % (self.name, i)
+            self.log = "%s.%d.log" % (self.name, i)
             i += 1
 
     def get_log(self):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 6e270851f0..86679611c4 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -24,7 +24,7 @@ from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Co
 from qpid.datatypes import uuid4
 from brokertest import *
 from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
 from qpidtoollibs import BrokerAgent
 from uuid import UUID
 
@@ -75,7 +75,10 @@ class HaBroker(Broker):
         if not self._agent:
             self._agent = QmfAgent(self.host_port())
         return self._agent
-    def ha_status(self): self.agent().getHaBroker().status
+    def ha_status(self): return self.agent().getHaBroker().status
+
+    def wait_status(self, status):
+        assert retry(lambda: self.ha_status() == status), "%r != %r"%(self.ha_status(), status)
 
     # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
@@ -97,6 +100,14 @@ class HaBroker(Broker):
         try: wait_address(bs, address)
         finally: bs.connection.close()
 
+    def assert_browse(self, queue, expected, **kwargs):
+        """Verify queue contents by browsing."""
+        bs = self.connect().session()
+        try:
+            wait_address(bs, queue)
+            assert_browse_retry(bs, queue, expected, **kwargs)
+        finally: bs.connection.close()
+
     def assert_browse_backup(self, queue, expected, **kwargs):
         """Combines wait_backup and assert_browse_retry."""
         bs = self.connect_admin().session()
@@ -157,9 +168,11 @@ class HaCluster(object):
         if promote_next: self[(i+1) % len(self)].promote()
 
     def restart(self, i):
+        """Start a broker with the same name and data directory. It will get
+        a separate log file: foo.n.log"""
         b = self._brokers[i]
         self._brokers[i] = HaBroker(
-            self.test, name=self.next_name(), port=b.port(), brokers_url=self.url,
+            self.test, name=b.name, port=b.port(), brokers_url=self.url,
             **self.kwargs)
 
     def bounce(self, i, promote_next=True):
@@ -347,7 +360,8 @@ class ReplicationTests(BrokerTest):
         primary.kill()
         assert retry(lambda: not is_running(primary.pid))
         backup.promote()
-        self.assert_browse_retry(s, "q", ["foo"])
+        sender.send("bar")
+        self.assert_browse_retry(s, "q", ["foo", "bar"])
         c.close()
 
     def test_failover_cpp(self):
@@ -630,27 +644,6 @@ class ReplicationTests(BrokerTest):
         assert valid_address(s, "ad")
         assert valid_address(s, "time")
 
-    def test_recovering(self):
-        """Verify that the primary broker does not go active until expected
-        backups have connected"""
-        cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"])
-        c = cluster[0].connect()
-        for i in xrange(10):
-            s = c.session().sender("q%s;{create:always}"%i)
-            for j in xrange(100): s.send(str(j))
-        cluster.kill(0) # Fail over to 1
-        cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients.
-        cluster.restart(0)
-        c = retry(cluster[1].try_connect)
-        self.assertTrue(c)
-        cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]);
-
-        # Verify in logs that all queue catch-up happened before the transition to active.
-        log = open(cluster[1].log).read()
-        i = log.find("Status change: recovering -> active")
-        self.failIf(i < 0)
-        self.assertEqual(log.find("caught up", i), -1)
-
     def test_broker_info(self):
         """Check that broker information is correctly published via management"""
         cluster = HaCluster(self, 3)
@@ -667,7 +660,7 @@ class ReplicationTests(BrokerTest):
         # Check that all brokers have the same membership as the cluster
         for broker in cluster:
             qmf = broker.agent().getHaBroker()
-            assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+            assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
         # Add a new broker, check it is updated everywhere
         b = cluster.start()
         cluster_ports.append(b.port())
@@ -720,12 +713,10 @@ class LongTests(BrokerTest):
 
     def test_failover_send_receive(self):
         """Test failover with continuous send-receive"""
-        # Start a cluster, all members will be killed during the test.
-        # FIXME aconway 2012-05-01: try expected-backups=1, requires catchup-ready fixed.
-        brokers = HaCluster(self, 3, args=["--ha-expected-backups=2"])
+        brokers = HaCluster(self, 3)
 
         # Start sender and receiver threads
-        sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
+        sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False)
         receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
         receiver.start()
         sender.start()
@@ -744,14 +735,14 @@ class LongTests(BrokerTest):
                 # otherwise we can lose messages. When we implement non-promotion
                 # of catchup brokers we can make this stronger: wait only for
                 # there to be at least one ready backup.
-                assert retry(brokers[i%3].try_connect, 1)
+                brokers[i%3].wait_status("active")
                 brokers.bounce(i%3)
                 i += 1
                 def enough(): # Verify we're still running
                     receiver.check() # Verify no exceptions
                     return receiver.received > n + 100
                 # FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
-                assert retry(enough, 10)
+                assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n)
         except:
             traceback.print_exc()
             raise
@@ -764,6 +755,53 @@ class LongTests(BrokerTest):
                 brokers.kill(i, False)
         if dead: raise Exception("Brokers not running: %s"%dead)
 
+
+class RecoveryTests(BrokerTest):
+    """Tests for recovery after a failure."""
+
+    def test_queue_hold(self):
+        """Verify that the broker holds queues without sufficient backup,
+        i.e. does not complete messages sent to those queues."""
+
+        cluster = HaCluster(self, 4);
+        # Wait for the primary to be ready
+        cluster[0].wait_status("active")
+
+        # Create a queue before the failure.
+        s1 = cluster.connect(0).session().sender("q1;{create:always}")
+        for b in cluster: b.wait_backup("q1")
+        for i in xrange(100): s1.send(str(i))
+
+        # Kill primary and 2 backups
+        for i in [0,1,2]: cluster.kill(i, False)
+        cluster[3].promote() # New primary, backups will be 1 and 2
+        cluster[3].wait_status("recovering")
+
+        # Create a queue after the failure
+        s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+        # Verify that messages sent are not completed
+        for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+        self.assertEqual(s1.unsettled(), 100)
+        self.assertEqual(s2.unsettled(), 100)
+
+        # Verify we can receive even if sending is on hold:
+        cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+
+        # Restart backups, verify queues are released only when both backups are up
+        cluster.restart(1)
+        self.assertEqual(s1.unsettled(), 100)
+        self.assertEqual(s2.unsettled(), 100)
+        self.assertEqual(cluster[3].ha_status(), "recovering")
+        cluster.restart(2)
+
+        def settled(sender): sender.sync(); return sender.unsettled() == 0;
+        assert retry(lambda: settled(s1)), "Unsettled=%s"%(s1.unsettled())
+        assert retry(lambda: settled(s2)), "Unsettled=%s"%(s2.unsettled())
+        cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+        cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+        cluster[3].wait_status("active")
+
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     qpid_ha = os.getenv("QPID_HA_EXEC")
