summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-15 21:05:34 +0000
committerAlan Conway <aconway@apache.org>2012-05-15 21:05:34 +0000
commit55305747e6e7f931756bfa21460c37e350f5ea0f (patch)
treec6b826cac45092f95c428671a13024aad62b3400 /qpid/cpp/src/tests
parent80a0832a2fe775ff217e6353f003226eb3f18d89 (diff)
downloadqpid-python-55305747e6e7f931756bfa21460c37e350f5ea0f.tar.gz
QPID-3603: HA broker backup/primary ready checks.
- Introduce HA broker state machine - Inform backup queues when ready. - Incomplete implementation of backup ready check. - does not count correctly after a failover, see countUnready. - Existing replicator bridges updated out of sync with BrokerReplicator initialize. - Does not handle multi-message responses. - Newly promoted HA primary waits for backups to be ready before accepting clients. - Uniform log prefixes for HA messages. - qpid-ha tests, call qpid-ha python code directly. - Move excluder from Backup to HaBroker, it is also used in PROMOTING. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338889 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/brokertest.py18
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py164
2 files changed, 125 insertions, 57 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 8255fbe9ac..257ac68b74 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -77,17 +77,19 @@ def error_line(filename, n=1):
return ":\n" + "".join(result)
def retry(function, timeout=10, delay=.01):
- """Call function until it returns True or timeout expires.
- Double the delay for each retry. Return True if function
- returns true, False if timeout expires."""
+ """Call function until it returns a true value or timeout expires.
+ Double the delay for each retry. Returns what function returns if
+ true, None if timeout expires."""
deadline = time.time() + timeout
- while not function():
+ ret = None
+ while not ret:
+ ret = function()
remaining = deadline - time.time()
if remaining <= 0: return False
delay = min(delay, remaining)
time.sleep(delay)
delay *= 2
- return True
+ return ret
class AtomicCounter:
def __init__(self):
@@ -298,9 +300,9 @@ class Broker(Popen):
# Read port from broker process stdout if not already read.
if (self._port == 0):
try: self._port = int(self.stdout.readline())
- except ValueError:
- raise Exception("Can't get port for broker %s (%s)%s" %
- (self.name, self.pname, error_line(self.log,5)))
+ except ValueError, e:
+ raise Exception("Can't get port for broker %s (%s)%s: %s" %
+ (self.name, self.pname, error_line(self.log,5), e))
return self._port
def unexpected(self,msg):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 827cb7dca9..15137a0c5f 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -28,13 +28,23 @@ from qpidtoollibs import BrokerAgent
log = getLogger(__name__)
+class QmfHaBroker(object):
+ def __init__(self, address):
+ self.connection = Connection.establish(
+ address, client_properties={"qpid.ha-admin":1})
+ self.qmf = BrokerAgent(self.connection)
+ self.ha_broker = self.qmf.getHaBroker()
+ if not self.ha_broker:
+ raise Exception("HA module is not loaded on broker at %s"%address)
+
class HaBroker(Broker):
def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
ha_replicate="all", **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=info+", "--log-enable=debug+:ha::",
+ "--log-enable=info+",
+ "--log-enable=debug+:ha::",
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
@@ -42,32 +52,31 @@ class HaBroker(Broker):
args += [ "--ha-replicate=%s"%ha_replicate ]
if broker_url: args.extend([ "--ha-brokers", broker_url ])
Broker.__init__(self, test, args, **kwargs)
- self.commands=os.getenv("PYTHON_COMMANDS")
- assert os.path.isdir(self.commands)
+ self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
+ assert os.path.exists(self.qpid_ha_path)
+ self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
+ assert os.path.exists(self.qpid_config_path)
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ self.qpid_ha_script=import_script(self.qpid_ha_path)
- def promote(self):
- assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
-
- def set_client_url(self, url):
- assert os.system(
- "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
+ def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args)
- def set_broker_url(self, url):
- assert os.system(
- "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
+ def promote(self): self.qpid_ha(["promote"])
+ def set_client_url(self, url): self.qpid_ha(["set", "--public-brokers", url])
+ def set_broker_url(self, url): self.qpid_ha(["set", "--brokers", url])
+ def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+ def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status
- def replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+ def qpid_config(self, args):
+ assert subprocess.call(
+ [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
def config_replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
def config_declare(self, queue, replication):
- assert os.system(
- "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+ self.qpid_config(["add", "queue", queue, "--replicate", replication])
def connect_admin(self, **kwargs):
return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
@@ -86,17 +95,47 @@ class HaBroker(Broker):
assert_browse_retry(bs, queue, expected, **kwargs)
finally: bs.connection.close()
+ def assert_connect_fail(self):
+ try:
+ self.connect()
+ self.test.fail("Expected ConnectionError")
+ except ConnectionError: pass
+
+ def connect_retry(self):
+ def try_connect():
+ try: return self.connect()
+ except ConnectionError: return None
+ c = retry(try_connect)
+ if c: return c
+ else: self.test.fail("Failed to connect")
+
class HaCluster(object):
_cluster_count = 0
def __init__(self, test, n, **kwargs):
"""Start a cluster of n brokers"""
self.test = test
- self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ self.kwargs = kwargs
+ self._brokers = []
+ self.id = HaCluster._cluster_count
HaCluster._cluster_count += 1
+ for i in xrange(n): self.start(False)
+ self.update_urls()
+ self[0].promote()
+
+ def start(self, update_urls=True):
+ """Start a new broker in the cluster"""
+ b = HaBroker(
+ self.test,
+ name="broker%s-%s"%(self.id, len(self._brokers)),
+ **self.kwargs)
+ self._brokers.append(b)
+ if update_urls: self.update_urls()
+ return b
+
+ def update_urls(self):
self.url = ",".join([b.host_port() for b in self])
for b in self: b.set_broker_url(self.url)
- self[0].promote()
def connect(self, i):
"""Connect with reconnect_urls"""
@@ -108,11 +147,15 @@ class HaCluster(object):
self[i].expect = EXPECT_EXIT_FAIL
self[(i+1) % len(self)].promote()
+ def restart(self, i):
+ b = self._brokers[i]
+ self._brokers[i] = HaBroker(
+ self.test, name=b.name, port=b.port(), broker_url=self.url, **self.kwargs)
+
def bounce(self, i):
"""Stop and restart a broker in a cluster."""
self.kill(i)
- b = self[i]
- self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+ self.restart(i)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -344,6 +387,7 @@ class ReplicationTests(BrokerTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary", ha_cluster=False)
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
@@ -559,6 +603,26 @@ class ReplicationTests(BrokerTest):
test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
+ def test_promoting(self):
+ """Verify that the primary broker does not go active until expected
+ backups have connected or timeout expires."""
+ cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"])
+ c = cluster[0].connect()
+ for i in xrange(10):
+ s = c.session().sender("q%s;{create:always}"%i)
+ for j in xrange(100): s.send(str(j))
+ cluster.kill(0) # Fail over to 1
+ cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients.
+ cluster.restart(0)
+ c = cluster[1].connect_retry()
+ cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]);
+
+ # Verify in logs that all queue catch-up happened before the transition to active.
+ log = open(cluster[1].log).read()
+ i = log.find("Status change: promoting -> active")
+ self.failIf(i < 0)
+ self.assertEqual(log.find("caught up", i), -1)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -602,14 +666,15 @@ class LongTests(BrokerTest):
else: return 3 # Default is to be quick
- def disable_test_failover(self):
+ def disable_test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
# FIXME aconway 2012-02-03: fails due to dropped messages,
# known issue: sending messages to new primary before
# backups are ready. Enable when fixed.
# Start a cluster, all members will be killed during the test.
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
+ brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL,
+ args=["--ha-expected-backups=2"])
for name in ["ha0","ha1","ha2"] ]
url = ",".join([b.host_port() for b in brokers])
for b in brokers: b.set_broker_url(url)
@@ -620,30 +685,31 @@ class LongTests(BrokerTest):
receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
receiver.start()
sender.start()
- # Wait for sender & receiver to get up and running
- assert retry(lambda: receiver.received > 100)
- # Kill and restart brokers in a cycle:
- endtime = time.time() + self.duration()
- i = 0
- while time.time() < endtime or i < 3: # At least 3 iterations
- sender.sender.assert_running()
- receiver.receiver.assert_running()
- port = brokers[i].port()
- brokers[i].kill()
- brokers.append(
- HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
- expect=EXPECT_EXIT_FAIL))
- i += 1
- brokers[i].promote()
- n = receiver.received # Verify we're still running
- def enough():
- receiver.check() # Verify no exceptions
- return receiver.received > n + 100
- assert retry(enough, timeout=5)
-
- sender.stop()
- receiver.stop()
- for b in brokers[i:]: b.kill()
+ try:
+ # Wait for sender & receiver to get up and running
+ assert retry(lambda: receiver.received > 100)
+ # Kill and restart brokers in a cycle:
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime or i < 3: # At least 3 iterations
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
+ port = brokers[i].port()
+ brokers[i].kill()
+ brokers.append(
+ HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
+ expect=EXPECT_EXIT_FAIL))
+ i += 1
+ brokers[i].promote()
+ n = receiver.received # Verify we're still running
+ def enough():
+ receiver.check() # Verify no exceptions
+ return receiver.received > n + 100
+ assert retry(enough, timeout=5)
+ finally:
+ sender.stop()
+ receiver.stop()
+ for b in brokers[i:]: b.kill()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)