Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 50 |
1 file changed, 31 insertions(+), 19 deletions(-)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 86679611c4..e43d8bcb91 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
 import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
 from qpid.datatypes import uuid4
 from brokertest import *
 from threading import Thread, Lock, Condition
@@ -493,6 +493,8 @@ class ReplicationTests(BrokerTest):
             for i in range(10): s.send(Message(str(i)), sync=False)
         except qpid.messaging.exceptions.TargetCapacityExceeded: pass
         backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
+        # Detach, don't close as there is a broken session
+        s.session.connection.detach()
 
     def test_priority(self):
         """Verify priority queues replicate correctly"""
@@ -716,21 +718,24 @@ class LongTests(BrokerTest):
         brokers = HaCluster(self, 3)
         # Start sender and receiver threads
-        sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False)
-        receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
-        receiver.start()
-        sender.start()
+        senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+                                  queue="test%s"%(i)) for i in xrange(10)]
+        receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+                                      failover_updates=False,
+                                      queue="test%s"%(i)) for i in xrange(10)]
+        for r in receivers: r.start()
+        for s in senders: s.start()
         # Wait for sender & receiver to get up and running
-        assert retry(lambda: receiver.received > 100)
+        assert retry(lambda: receivers[0].received > 100)
         # Kill and restart brokers in a cycle:
         endtime = time.time() + self.duration()
         i = 0
         try:
             while time.time() < endtime or i < 3: # At least 3 iterations
-                sender.sender.assert_running()
-                receiver.receiver.assert_running()
-                n = receiver.received
+                for s in senders: s.sender.assert_running()
+                for r in receivers: r.receiver.assert_running()
+                n = receivers[0].received
                 # FIXME aconway 2012-05-01: don't kill primary till it's active
                 # otherwise we can lose messages. When we implement non-promotion
                 # of catchup brokers we can make this stronger: wait only for
@@ -739,23 +744,22 @@ class LongTests(BrokerTest):
                 brokers.bounce(i%3)
                 i += 1
                 def enough(): # Verify we're still running
-                    receiver.check()    # Verify no exceptions
-                    return receiver.received > n + 100
+                    receivers[0].check()    # Verify no exceptions
+                    return receivers[0].received > n + 100
                 # FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
-                assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n)
+                assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n)
         except:
             traceback.print_exc()
             raise
         finally:
-            sender.stop()
-            receiver.stop()
+            for s in senders: s.stop()
+            for r in receivers: r.stop()
             dead = []
             for i in xrange(3):
                 if not brokers[i].is_running(): dead.append(i)
                 brokers.kill(i, False)
             if dead: raise Exception("Brokers not running: %s"%dead)
-
 class RecoveryTests(BrokerTest):
     """Tests for recovery after a failure."""
@@ -766,31 +770,37 @@ class RecoveryTests(BrokerTest):
         cluster = HaCluster(self, 4);
         # Wait for the primary to be ready
         cluster[0].wait_status("active")
-
         # Create a queue before the failure.
         s1 = cluster.connect(0).session().sender("q1;{create:always}")
         for b in cluster: b.wait_backup("q1")
         for i in xrange(100): s1.send(str(i))
-
         # Kill primary and 2 backups
         for i in [0,1,2]: cluster.kill(i, False)
         cluster[3].promote() # New primary, backups will be 1 and 2
         cluster[3].wait_status("recovering")
+        def trySync(s):
+            try:
+                s.sync(timeout=.1)
+                self.fail("Expected Timeout exception")
+            except Timeout: pass
+
         # Create a queue after the failure
         s2 = cluster.connect(3).session().sender("q2;{create:always}")
-
         # Verify that messages sent are not completed
         for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+        trySync(s1)
         self.assertEqual(s1.unsettled(), 100)
+        trySync(s2)
         self.assertEqual(s2.unsettled(), 100)
         # Verify we can receive even if sending is on hold:
         cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
-
         # Restart backups, verify queues are released only when both backups are up
         cluster.restart(1)
+        trySync(s1)
         self.assertEqual(s1.unsettled(), 100)
+        trySync(s2)
         self.assertEqual(s2.unsettled(), 100)
         self.assertEqual(cluster[3].ha_status(), "recovering")
         cluster.restart(2)
@@ -801,6 +811,8 @@ class RecoveryTests(BrokerTest):
         cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
         cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
         cluster[3].wait_status("active"),
+        s1.session.connection.close()
+        s2.session.connection.close()
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
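
The core pattern this patch adds to the recovery test is an expect-a-timeout check: while the new primary is still "recovering" it holds queues without sufficient backup, so an unsettled send must not complete and sender.sync() with a short timeout should raise Timeout rather than return. Below is a minimal standalone sketch of that idea outside the test harness. It only uses qpid.messaging calls that appear in the patch (sync, unsettled, Timeout); the helper name expect_sync_timeout, the broker address localhost:5672, and the queue name are placeholders, and the assertions only hold against a primary that is actually holding the queue, as in the test.

    from qpid.messaging import Connection, Timeout

    def expect_sync_timeout(sender, timeout=0.1):
        # Mirrors the patch's trySync() helper: sync() should time out while
        # the recovering primary is holding (not completing) sent messages.
        try:
            sender.sync(timeout=timeout)
            raise AssertionError("sync() completed; expected Timeout")
        except Timeout:
            pass

    conn = Connection("localhost:5672")  # assumed broker address
    conn.open()
    try:
        snd = conn.session().sender("q1;{create:always}")  # placeholder queue
        for i in xrange(100): snd.send(str(i), sync=False)
        expect_sync_timeout(snd)           # only passes while the broker holds the queue
        assert snd.unsettled() == 100      # none of the sends were completed
    finally:
        conn.close()

The same sync/unsettled pair is what the patch checks twice: once right after the failure and once after only one of the two backups has been restarted, since release requires both backups to be ready.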
