author    | Alan Conway <aconway@apache.org> | 2012-05-15 21:06:15 +0000
committer | Alan Conway <aconway@apache.org> | 2012-05-15 21:06:15 +0000
commit    | 7f358c5dc2c54e9ba3d076e141704b0b72f1d885
tree      | 5ee5a007d5e740c63883f6841c0650902318b51a
parent    | 22a03fed2d7d17e7c9bb81e2dcfff8e77393e293
download  | qpid-python-7f358c5dc2c54e9ba3d076e141704b0b72f1d885.tar.gz
QPID-3603: Test fixes to ha_tests.py
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338893 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp |  2
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py           | 88
2 files changed, 46 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 8eb7e441a2..b0bf1ce194 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -126,7 +126,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
     // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
 
     // Clear out any old messages, reset the queue to start replicating fresh.
-    queue->purge();
+    queue->purge();             // FIXME aconway 2012-05-02: race
     queue->setPosition(0);
 
     settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index b5dce0e4d7..96cf843775 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -19,6 +19,7 @@
 #
 
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
+import traceback
 from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
 from qpid.datatypes import uuid4
 from brokertest import *
@@ -100,13 +101,9 @@ class HaBroker(Broker):
             self.test.fail("Expected ConnectionError")
         except ConnectionError: pass
 
-    def connect_retry(self):
-        def try_connect():
-            try: return self.connect()
-            except ConnectionError: return None
-        c = retry(try_connect)
-        if c: return c
-        else: self.test.fail("Failed to connect")
+    def try_connect(self):
+        try: return self.connect()
+        except ConnectionError: return None
 
 class HaCluster(object):
     _cluster_count = 0
@@ -117,17 +114,20 @@ class HaCluster(object):
         self.kwargs = kwargs
         self._brokers = []
         self.id = HaCluster._cluster_count
+        self.broker_id = 0
         HaCluster._cluster_count += 1
         for i in xrange(n): self.start(False)
         self.update_urls()
         self[0].promote()
 
+    def next_name(self):
+        name="cluster%s-%s"%(self.id, self.broker_id)
+        self.broker_id += 1
+        return name
+
     def start(self, update_urls=True):
         """Start a new broker in the cluster"""
-        b = HaBroker(
-            self.test,
-            name="broker%s-%s"%(self.id, len(self._brokers)),
-            **self.kwargs)
+        b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
         self._brokers.append(b)
         if update_urls: self.update_urls()
         return b
@@ -140,20 +140,21 @@ class HaCluster(object):
         """Connect with reconnect_urls"""
         return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
 
-    def kill(self, i):
+    def kill(self, i, promote_next=True):
         """Kill broker i, promote broker i+1"""
-        self[i].kill()
         self[i].expect = EXPECT_EXIT_FAIL
-        self[(i+1) % len(self)].promote()
+        self[i].kill()
+        if promote_next: self[(i+1) % len(self)].promote()
 
     def restart(self, i):
         b = self._brokers[i]
         self._brokers[i] = HaBroker(
-            self.test, name=b.name, port=b.port(), broker_url=self.url, **self.kwargs)
+            self.test, name=self.next_name(), port=b.port(), broker_url=self.url,
+            **self.kwargs)
 
-    def bounce(self, i):
+    def bounce(self, i, promote_next=True):
         """Stop and restart a broker in a cluster."""
-        self.kill(i)
+        self.kill(i, promote_next)
         self.restart(i)
 
     # Behave like a list of brokers.
@@ -613,7 +614,8 @@ class ReplicationTests(BrokerTest):
         cluster.kill(0) # Fail over to 1
         cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients.
         cluster.restart(0)
-        c = cluster[1].connect_retry()
+        c = retry(cluster[1].try_connect)
+        self.assertTrue(c)
         cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]);
         # Verify in logs that all queue catch-up happened before the transition to active.
@@ -665,50 +667,50 @@ class LongTests(BrokerTest):
         else: return 3                  # Default is to be quick
 
+    # FIXME aconway 2012-05-15: disabled till functionality fixed.
     def disable_test_failover_send_receive(self):
         """Test failover with continuous send-receive"""
-        # FIXME aconway 2012-02-03: fails due to dropped messages,
-        # known issue: sending messages to new primary before
-        # backups are ready. Enable when fixed.
-        # Start a cluster, all members will be killed during the test.
-        brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL,
-                             args=["--ha-expected-backups=2"])
-                    for name in ["ha0","ha1","ha2"] ]
-        url = ",".join([b.host_port() for b in brokers])
-        for b in brokers: b.set_broker_url(url)
-        brokers[0].promote()
+        # FIXME aconway 2012-05-01: try expected-backups=1, requires catchup-ready fixed.
+        brokers = HaCluster(self, 3, args=["--ha-expected-backups=2"])
         # Start sender and receiver threads
         sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
         receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
         receiver.start()
         sender.start()
+
+        # Wait for sender & receiver to get up and running
+        assert retry(lambda: receiver.received > 100)
+        # Kill and restart brokers in a cycle:
+        endtime = time.time() + self.duration()
+        i = 0
         try:
-            # Wait for sender & receiver to get up and running
-            assert retry(lambda: receiver.received > 100)
-            # Kill and restart brokers in a cycle:
-            endtime = time.time() + self.duration()
-            i = 0
             while time.time() < endtime or i < 3: # At least 3 iterations
                 sender.sender.assert_running()
                 receiver.receiver.assert_running()
-                port = brokers[i].port()
-                brokers[i].kill()
-                brokers.append(
-                    HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
-                             expect=EXPECT_EXIT_FAIL))
+                n = receiver.received
+                # FIXME aconway 2012-05-01: don't kill primary till it's active
+                # otherwise we can lose messages. This is in lieu of not
+                # promoting catchup brokers.
+                assert retry(brokers[i%3].try_connect, 1)
+                brokers.bounce(i%3)
                 i += 1
-                brokers[i].promote()
-                n = receiver.received   # Verify we're still running
-                def enough():
+                def enough(): # Verify we're still running
                     receiver.check()    # Verify no exceptions
                     return receiver.received > n + 100
-                assert retry(enough, timeout=5)
+                assert retry(enough, 1)
+        except:
+            traceback.print_exc()
+            raise
         finally:
             sender.stop()
             receiver.stop()
-            for b in brokers[i:]: b.kill()
+            dead = []
+            for i in xrange(3):
+                if not brokers[i].is_running(): dead.append(i)
+                brokers.kill(i, False)
+            if dead: raise Exception("Brokers not running: %s"%dead)
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
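For context on the `retry(...)` idiom these test changes lean on (`retry(cluster[1].try_connect)`, `retry(lambda: receiver.received > 100)`, `retry(enough, 1)`): `retry` is defined in `brokertest`, which is not part of this diff. Below is a minimal, self-contained sketch of what such a poll-until-true helper might look like; it is an illustrative stand-in, not the actual brokertest implementation, and the signature is assumed.

```python
import time

def retry(predicate, timeout=10, delay=0.01):
    """Poll predicate() until it returns a truthy value or the timeout expires.

    Returns the last value produced by predicate(), so callers can write
    `c = retry(broker.try_connect)` and then check or use the connection.
    Hypothetical stand-in for the brokertest helper used in ha_tests.py.
    """
    deadline = time.time() + timeout
    result = predicate()
    while not result and time.time() < deadline:
        time.sleep(delay)
        delay = min(delay * 2, 1.0)   # exponential back-off, capped at 1 second
        result = predicate()
    return result

# Usage mirroring the test changes above (try_connect() returns a
# connection on success, or None on ConnectionError):
#   c = retry(cluster[1].try_connect)
#   assert c, "Failed to connect after failover"
```

The point of replacing `connect_retry` with `try_connect` in the diff is that the helper now returns `None` instead of failing the test outright, so each caller decides whether a connection timeout is fatal (`self.assertTrue(c)`) or simply part of the kill/bounce cycle in the long-running failover test.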