summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-09-14 18:48:09 +0000
committerAlan Conway <aconway@apache.org>2012-09-14 18:48:09 +0000
commit5c73472bc29bb0dfcfda168ad81e0bb5025a3077 (patch)
tree3eb45e481a828824e6c8e6589ad79150b67ecdeb /qpid/cpp/src/tests
parentcef26777e4d9ed0874e21344994abfaf8fef9a60 (diff)
downloadqpid-python-5c73472bc29bb0dfcfda168ad81e0bb5025a3077.tar.gz
QPID-4223: HA Completion isn't sent when queue that has acquired but unacknowledged messages is deleted
- Extended ha_test.py test_failover_send_receive to kill backup as well as primary - QueueRegistry::destroy was not calling observer. - Primary removes disconnected brokers backups and expectedBackups - Primary calls checkReady in all cases where broker is removed from expectedBackups git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1384882 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/brokertest.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py52
2 files changed, 41 insertions, 15 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index aea4460e5a..dd09e8aa27 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -565,7 +565,7 @@ class NumberedSender(Thread):
def __init__(self, broker, max_depth=None, queue="test-queue",
connection_options=Cluster.CONNECTION_OPTIONS,
- failover_updates=True, url=None):
+ failover_updates=True, url=None, args=[]):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
@@ -576,7 +576,7 @@ class NumberedSender(Thread):
"--address", "%s;{create:always}"%queue,
"--connection-options", "{%s}"%(connection_options),
"--content-stdin"
- ]
+ ] + args
if failover_updates: cmd += ["--failover-updates"]
self.sender = broker.test.popen(
cmd, expect=EXPECT_RUNNING, stdin=PIPE)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index cf5beb467c..92442b465a 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -18,7 +18,7 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
import traceback
from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from qpid.datatypes import uuid4
@@ -114,7 +114,8 @@ class HaBroker(Broker):
self._status = self.ha_status()
return self._status == status;
except ConnectionError: return False
- assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status)
+ assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
+ self, status, self._status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -887,29 +888,54 @@ class LongTests(BrokerTest):
# Kill and restart brokers in a cycle:
endtime = time.time() + self.duration()
i = 0
+ primary = 0
try:
while time.time() < endtime or i < 3: # At least 3 iterations
+ # Precondition: All 3 brokers running,
+ # primary = index of promoted primary
+ # one or two backups are running,
for s in senders: s.sender.assert_running()
for r in receivers: r.receiver.assert_running()
- checkpoint = [ r.received for r in receivers ]
- # Don't kill primary till it is active and the next
- # backup is ready, otherwise we can lose messages.
- brokers[i%3].wait_status("active")
- brokers[(i+1)%3].wait_status("ready")
- brokers.bounce(i%3)
+ checkpoint = [ r.received+100 for r in receivers ]
+ dead = None
+ victim = random.randint(0,2)
+ if victim == primary:
+ # Don't kill primary till it is active and the next
+ # backup is ready, otherwise we can lose messages.
+ brokers[victim].wait_status("active")
+ next = (victim+1)%3
+ brokers[next].wait_status("ready")
+ brokers.bounce(victim) # Next one is promoted
+ primary = next
+ else:
+ brokers.kill(victim, False)
+ dead = victim
+
+ # At this point the primary is running with 1 or 2 backups
+ # Make sure we are not stalled
+ map(wait_passed, receivers, checkpoint)
+ # Run another checkpoint to ensure things work in this configuration
+ checkpoint = [ r.received+100 for r in receivers ]
+ map(wait_passed, receivers, checkpoint)
+
+ if dead is not None:
+ brokers.restart(dead) # Restart backup
+ brokers[dead].ready(client_properties={"qpid.ha-admin":1})
+ dead = None
i += 1
- map(wait_passed, receivers, checkpoint) # Wait for all receivers
except:
traceback.print_exc()
raise
finally:
for s in senders: s.stop()
for r in receivers: r.stop()
- dead = []
+ unexpected_dead = []
for i in xrange(3):
- if not brokers[i].is_running(): dead.append(i)
- brokers.kill(i, False)
- if dead: raise Exception("Brokers not running: %s"%dead)
+ if not brokers[i].is_running() and i != dead:
+ unexpected_dead.append(i)
+ if brokers[i].is_running(): brokers.kill(i, False)
+ if unexpected_dead:
+ raise Exception("Brokers not running: %s"%unexpected_dead)
class RecoveryTests(BrokerTest):
"""Tests for recovery after a failure."""