summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-12 21:20:07 +0000
committerAlan Conway <aconway@apache.org>2012-06-12 21:20:07 +0000
commit2c26294c60daa02e19189cbbd935e2441f2c541c (patch)
treeb62b1f4dc8ce0627d9fc28782772ee9f2e503147 /qpid/cpp/src/tests/ha_tests.py
parentbf69fd2f69325dd660454e6b6c8399c51cacea2c (diff)
downloadqpid-python-2c26294c60daa02e19189cbbd935e2441f2c541c.tar.gz
QPID-3603: Introduced RemoteBackup to track backup status.
The primary creates RemoteBackup object for each connected or expected backup. On first being promoted, the new primary has a RemoteBackup for each of the known backups at the time of the failure. The RemoteBackup manages queue guards for its backup and tracks it's readiness. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349540 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py50
1 files changed, 31 insertions, 19 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 86679611c4..e43d8bcb91 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
@@ -493,6 +493,8 @@ class ReplicationTests(BrokerTest):
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
+ # Detach, don't close as there is a broken session
+ s.session.connection.detach()
def test_priority(self):
"""Verify priority queues replicate correctly"""
@@ -716,21 +718,24 @@ class LongTests(BrokerTest):
brokers = HaCluster(self, 3)
# Start sender and receiver threads
- sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False)
- receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
- receiver.start()
- sender.start()
+ senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+ queue="test%s"%(i)) for i in xrange(10)]
+ receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+ failover_updates=False,
+ queue="test%s"%(i)) for i in xrange(10)]
+ for r in receivers: r.start()
+ for s in senders: s.start()
# Wait for sender & receiver to get up and running
- assert retry(lambda: receiver.received > 100)
+ assert retry(lambda: receivers[0].received > 100)
# Kill and restart brokers in a cycle:
endtime = time.time() + self.duration()
i = 0
try:
while time.time() < endtime or i < 3: # At least 3 iterations
- sender.sender.assert_running()
- receiver.receiver.assert_running()
- n = receiver.received
+ for s in senders: s.sender.assert_running()
+ for r in receivers: r.receiver.assert_running()
+ n = receivers[0].received
# FIXME aconway 2012-05-01: don't kill primary till it's active
# otherwise we can lose messages. When we implement non-promotion
# of catchup brokers we can make this stronger: wait only for
@@ -739,23 +744,22 @@ class LongTests(BrokerTest):
brokers.bounce(i%3)
i += 1
def enough(): # Verify we're still running
- receiver.check() # Verify no exceptions
- return receiver.received > n + 100
+ receivers[0].check() # Verify no exceptions
+ return receivers[0].received > n + 100
# FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
- assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n)
+ assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n)
except:
traceback.print_exc()
raise
finally:
- sender.stop()
- receiver.stop()
+ for s in senders: s.stop()
+ for r in receivers: r.stop()
dead = []
for i in xrange(3):
if not brokers[i].is_running(): dead.append(i)
brokers.kill(i, False)
if dead: raise Exception("Brokers not running: %s"%dead)
-
class RecoveryTests(BrokerTest):
"""Tests for recovery after a failure."""
@@ -766,31 +770,37 @@ class RecoveryTests(BrokerTest):
cluster = HaCluster(self, 4);
# Wait for the primary to be ready
cluster[0].wait_status("active")
-
# Create a queue before the failure.
s1 = cluster.connect(0).session().sender("q1;{create:always}")
for b in cluster: b.wait_backup("q1")
for i in xrange(100): s1.send(str(i))
-
# Kill primary and 2 backups
for i in [0,1,2]: cluster.kill(i, False)
cluster[3].promote() # New primary, backups will be 1 and 2
cluster[3].wait_status("recovering")
+ def trySync(s):
+ try:
+ s.sync(timeout=.1)
+ self.fail("Expected Timeout exception")
+ except Timeout: pass
+
# Create a queue after the failure
s2 = cluster.connect(3).session().sender("q2;{create:always}")
-
# Verify that messages sent are not completed
for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+ trySync(s1)
self.assertEqual(s1.unsettled(), 100)
+ trySync(s2)
self.assertEqual(s2.unsettled(), 100)
# Verify we can receive even if sending is on hold:
cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
-
# Restart backups, verify queues are released only when both backups are up
cluster.restart(1)
+ trySync(s1)
self.assertEqual(s1.unsettled(), 100)
+ trySync(s2)
self.assertEqual(s2.unsettled(), 100)
self.assertEqual(cluster[3].ha_status(), "recovering")
cluster.restart(2)
@@ -801,6 +811,8 @@ class RecoveryTests(BrokerTest):
cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
cluster[3].wait_status("active"),
+ s1.session.connection.close()
+ s2.session.connection.close()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)