summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-15 21:06:15 +0000
committerAlan Conway <aconway@apache.org>2012-05-15 21:06:15 +0000
commit7f358c5dc2c54e9ba3d076e141704b0b72f1d885 (patch)
tree5ee5a007d5e740c63883f6841c0650902318b51a
parent22a03fed2d7d17e7c9bb81e2dcfff8e77393e293 (diff)
downloadqpid-python-7f358c5dc2c54e9ba3d076e141704b0b72f1d885.tar.gz
QPID-3603: Test fixes to ha_tests.py
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338893 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py88
2 files changed, 46 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 8eb7e441a2..b0bf1ce194 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -126,7 +126,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
// Clear out any old messages, reset the queue to start replicating fresh.
- queue->purge();
+ queue->purge(); // FIXME aconway 2012-05-02: race
queue->setPosition(0);
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index b5dce0e4d7..96cf843775 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -19,6 +19,7 @@
#
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
+import traceback
from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
from qpid.datatypes import uuid4
from brokertest import *
@@ -100,13 +101,9 @@ class HaBroker(Broker):
self.test.fail("Expected ConnectionError")
except ConnectionError: pass
- def connect_retry(self):
- def try_connect():
- try: return self.connect()
- except ConnectionError: return None
- c = retry(try_connect)
- if c: return c
- else: self.test.fail("Failed to connect")
+ def try_connect(self):
+ try: return self.connect()
+ except ConnectionError: return None
class HaCluster(object):
_cluster_count = 0
@@ -117,17 +114,20 @@ class HaCluster(object):
self.kwargs = kwargs
self._brokers = []
self.id = HaCluster._cluster_count
+ self.broker_id = 0
HaCluster._cluster_count += 1
for i in xrange(n): self.start(False)
self.update_urls()
self[0].promote()
+ def next_name(self):
+ name="cluster%s-%s"%(self.id, self.broker_id)
+ self.broker_id += 1
+ return name
+
def start(self, update_urls=True):
"""Start a new broker in the cluster"""
- b = HaBroker(
- self.test,
- name="broker%s-%s"%(self.id, len(self._brokers)),
- **self.kwargs)
+ b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
self._brokers.append(b)
if update_urls: self.update_urls()
return b
@@ -140,20 +140,21 @@ class HaCluster(object):
"""Connect with reconnect_urls"""
return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
- def kill(self, i):
+ def kill(self, i, promote_next=True):
"""Kill broker i, promote broker i+1"""
- self[i].kill()
self[i].expect = EXPECT_EXIT_FAIL
- self[(i+1) % len(self)].promote()
+ self[i].kill()
+ if promote_next: self[(i+1) % len(self)].promote()
def restart(self, i):
b = self._brokers[i]
self._brokers[i] = HaBroker(
- self.test, name=b.name, port=b.port(), broker_url=self.url, **self.kwargs)
+ self.test, name=self.next_name(), port=b.port(), broker_url=self.url,
+ **self.kwargs)
- def bounce(self, i):
+ def bounce(self, i, promote_next=True):
"""Stop and restart a broker in a cluster."""
- self.kill(i)
+ self.kill(i, promote_next)
self.restart(i)
# Behave like a list of brokers.
@@ -613,7 +614,8 @@ class ReplicationTests(BrokerTest):
cluster.kill(0) # Fail over to 1
cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients.
cluster.restart(0)
- c = cluster[1].connect_retry()
+ c = retry(cluster[1].try_connect)
+ self.assertTrue(c)
cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]);
# Verify in logs that all queue catch-up happened before the transition to active.
@@ -665,50 +667,50 @@ class LongTests(BrokerTest):
else: return 3 # Default is to be quick
+ # FIXME aconway 2012-05-15: disabled till functionality fixed.
def disable_test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
- # FIXME aconway 2012-02-03: fails due to dropped messages,
- # known issue: sending messages to new primary before
- # backups are ready. Enable when fixed.
-
# Start a cluster, all members will be killed during the test.
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL,
- args=["--ha-expected-backups=2"])
- for name in ["ha0","ha1","ha2"] ]
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
- brokers[0].promote()
+ # FIXME aconway 2012-05-01: try expected-backups=1, requires catchup-ready fixed.
+ brokers = HaCluster(self, 3, args=["--ha-expected-backups=2"])
# Start sender and receiver threads
sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
receiver.start()
sender.start()
+
+ # Wait for sender & receiver to get up and running
+ assert retry(lambda: receiver.received > 100)
+ # Kill and restart brokers in a cycle:
+ endtime = time.time() + self.duration()
+ i = 0
try:
- # Wait for sender & receiver to get up and running
- assert retry(lambda: receiver.received > 100)
- # Kill and restart brokers in a cycle:
- endtime = time.time() + self.duration()
- i = 0
while time.time() < endtime or i < 3: # At least 3 iterations
sender.sender.assert_running()
receiver.receiver.assert_running()
- port = brokers[i].port()
- brokers[i].kill()
- brokers.append(
- HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
- expect=EXPECT_EXIT_FAIL))
+ n = receiver.received
+ # FIXME aconway 2012-05-01: don't kill primary till it's active
+ # otherwise we can lose messages. This is in lieu of not
+ # promoting catchup brokers.
+ assert retry(brokers[i%3].try_connect, 1)
+ brokers.bounce(i%3)
i += 1
- brokers[i].promote()
- n = receiver.received # Verify we're still running
- def enough():
+ def enough(): # Verify we're still running
receiver.check() # Verify no exceptions
return receiver.received > n + 100
- assert retry(enough, timeout=5)
+ assert retry(enough, 1)
+ except:
+ traceback.print_exc()
+ raise
finally:
sender.stop()
receiver.stop()
- for b in brokers[i:]: b.kill()
+ dead = []
+ for i in xrange(3):
+ if not brokers[i].is_running(): dead.append(i)
+ brokers.kill(i, False)
+ if dead: raise Exception("Brokers not running: %s"%dead)
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)