summary | refs | log | tree | commit | diff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r-- qpid/cpp/src/tests/brokertest.py 2
-rwxr-xr-x qpid/cpp/src/tests/ha_tests.py 102
2 files changed, 71 insertions, 33 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 1b93504b64..c32b7f2c96 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -248,7 +248,7 @@ class Broker(Popen):
self.log = "%s.log" % self.name
i = 1
while (os.path.exists(self.log)):
- self.log = "%s-%d.log" % (self.name, i)
+ self.log = "%s.%d.log" % (self.name, i)
i += 1
def get_log(self):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 6e270851f0..86679611c4 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -24,7 +24,7 @@ from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Co
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
from uuid import UUID
@@ -75,7 +75,10 @@ class HaBroker(Broker):
if not self._agent: self._agent = QmfAgent(self.host_port())
return self._agent
- def ha_status(self): self.agent().getHaBroker().status
+ def ha_status(self): return self.agent().getHaBroker().status
+
+ def wait_status(self, status):
+ assert retry(lambda: self.ha_status() == status), "%r != %r"%(self.ha_status(), status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -97,6 +100,14 @@ class HaBroker(Broker):
try: wait_address(bs, address)
finally: bs.connection.close()
+ def assert_browse(self, queue, expected, **kwargs):
+ """Verify queue contents by browsing."""
+ bs = self.connect().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
+
def assert_browse_backup(self, queue, expected, **kwargs):
"""Combines wait_backup and assert_browse_retry."""
bs = self.connect_admin().session()
@@ -157,9 +168,11 @@ class HaCluster(object):
if promote_next: self[(i+1) % len(self)].promote()
def restart(self, i):
+ """Start a broker with the same name and data directory. It will get
+ a separate log file: foo.n.log"""
b = self._brokers[i]
self._brokers[i] = HaBroker(
- self.test, name=self.next_name(), port=b.port(), brokers_url=self.url,
+ self.test, name=b.name, port=b.port(), brokers_url=self.url,
**self.kwargs)
def bounce(self, i, promote_next=True):
@@ -347,7 +360,8 @@ class ReplicationTests(BrokerTest):
primary.kill()
assert retry(lambda: not is_running(primary.pid))
backup.promote()
- self.assert_browse_retry(s, "q", ["foo"])
+ sender.send("bar")
+ self.assert_browse_retry(s, "q", ["foo", "bar"])
c.close()
def test_failover_cpp(self):
@@ -630,27 +644,6 @@ class ReplicationTests(BrokerTest):
assert valid_address(s, "ad")
assert valid_address(s, "time")
- def test_recovering(self):
- """Verify that the primary broker does not go active until expected
- backups have connected"""
- cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"])
- c = cluster[0].connect()
- for i in xrange(10):
- s = c.session().sender("q%s;{create:always}"%i)
- for j in xrange(100): s.send(str(j))
- cluster.kill(0) # Fail over to 1
- cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients.
- cluster.restart(0)
- c = retry(cluster[1].try_connect)
- self.assertTrue(c)
- cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]);
-
- # Verify in logs that all queue catch-up happened before the transition to active.
- log = open(cluster[1].log).read()
- i = log.find("Status change: recovering -> active")
- self.failIf(i < 0)
- self.assertEqual(log.find("caught up", i), -1)
-
def test_broker_info(self):
"""Check that broker information is correctly published via management"""
cluster = HaCluster(self, 3)
@@ -667,7 +660,7 @@ class ReplicationTests(BrokerTest):
# Check that all brokers have the same membership as the cluster
for broker in cluster:
qmf = broker.agent().getHaBroker()
- assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
# Add a new broker, check it is updated everywhere
b = cluster.start()
cluster_ports.append(b.port())
@@ -720,12 +713,10 @@ class LongTests(BrokerTest):
def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
- # Start a cluster, all members will be killed during the test.
- # FIXME aconway 2012-05-01: try expected-backups=1, requires catchup-ready fixed.
- brokers = HaCluster(self, 3, args=["--ha-expected-backups=2"])
+ brokers = HaCluster(self, 3)
# Start sender and receiver threads
- sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
+ sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False)
receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
receiver.start()
sender.start()
@@ -744,14 +735,14 @@ class LongTests(BrokerTest):
# otherwise we can lose messages. When we implement non-promotion
# of catchup brokers we can make this stronger: wait only for
# there to be at least one ready backup.
- assert retry(brokers[i%3].try_connect, 1)
+ brokers[i%3].wait_status("active")
brokers.bounce(i%3)
i += 1
def enough(): # Verify we're still running
receiver.check() # Verify no exceptions
return receiver.received > n + 100
# FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
- assert retry(enough, 10)
+ assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n)
except:
traceback.print_exc()
raise
@@ -764,6 +755,53 @@ class LongTests(BrokerTest):
brokers.kill(i, False)
if dead: raise Exception("Brokers not running: %s"%dead)
+
+class RecoveryTests(BrokerTest):
+ """Tests for recovery after a failure."""
+
+ def test_queue_hold(self):
+ """Verify that the broker holds queues without sufficient backup,
+ i.e. does not complete messages sent to those queues."""
+
+ cluster = HaCluster(self, 4);
+ # Wait for the primary to be ready
+ cluster[0].wait_status("active")
+
+ # Create a queue before the failure.
+ s1 = cluster.connect(0).session().sender("q1;{create:always}")
+ for b in cluster: b.wait_backup("q1")
+ for i in xrange(100): s1.send(str(i))
+
+ # Kill primary and 2 backups
+ for i in [0,1,2]: cluster.kill(i, False)
+ cluster[3].promote() # New primary, backups will be 1 and 2
+ cluster[3].wait_status("recovering")
+
+ # Create a queue after the failure
+ s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+ # Verify that messages sent are not completed
+ for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+ self.assertEqual(s1.unsettled(), 100)
+ self.assertEqual(s2.unsettled(), 100)
+
+ # Verify we can receive even if sending is on hold:
+ cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+
+ # Restart backups, verify queues are released only when both backups are up
+ cluster.restart(1)
+ self.assertEqual(s1.unsettled(), 100)
+ self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(cluster[3].ha_status(), "recovering")
+ cluster.restart(2)
+
+ def settled(sender): sender.sync(); return sender.unsettled() == 0;
+ assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
+ assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
+ cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+ cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+ cluster[3].wait_status("active"),
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")