summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-03-30 19:36:48 +0000
committerAlan Conway <aconway@apache.org>2012-03-30 19:36:48 +0000
commite54ef8dc737196343ad974c91a86681efca5fb14 (patch)
treefc1cb9b1d5035dc06795ae877e02b895e86b2a9f /qpid/cpp/src/tests
parent38d1f36fe4238a887f867350adaa56489e53e0e6 (diff)
downloadqpid-python-e54ef8dc737196343ad974c91a86681efca5fb14.tar.gz
QPID-3603: Keep acquired messages on queues for all queue types.
Updated priority and lvq queues to keep acquired messages, and supply them to browsers if requested. This is necessary so replicating subscriptions can back-up these queue types without message loss. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307582 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/brokertest.py11
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py52
2 files changed, 57 insertions, 6 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 3207a51b79..ccf25f35b5 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -513,18 +513,21 @@ class BrokerTest(TestCase):
finally: r.close()
return contents
- def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
+ def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
actual_contents = self.browse(session, queue, timeout, transform=transform)
- self.assertEqual(expect_contents, actual_contents)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ self.assertEqual(expect_contents, actual_contents, msg)
- def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None):
"""Wait up to timeout for contents of queue to match expect_contents"""
test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
retry(test, timeout, delay)
- self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
+ actual_contents = self.browse(session, queue, 0, transform=transform)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ self.assertEqual(expect_contents, actual_contents, msg)
def join(thread, timeout=10):
thread.join(timeout)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 822e07c702..e9d44c21e0 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -100,8 +100,8 @@ class HaCluster(object):
def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-class ShortTests(BrokerTest):
- """Short HA functionality tests."""
+class HaTest(BrokerTest):
+ """Base class for HA test cases, defines convenience functions"""
# Wait for an address to become valid.
def wait(self, session, address):
@@ -135,6 +135,9 @@ class ShortTests(BrokerTest):
"""Connect to a backup broker as an admin connection"""
return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
+class ReplicationTests(HaTest):
+ """Correctness tests for HA replication."""
+
def test_replication(self):
"""Test basic replication of configuration and messages before and
after backup has connected"""
@@ -491,6 +494,51 @@ class ShortTests(BrokerTest):
# self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+ def test_backup_acquired(self):
+ """Verify that acquired messages are backed up, for all queue types."""
+ class Test:
+ def __init__(self, queue, arguments, expect):
+ self.queue = queue
+ self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
+ self.queue, ",".join(arguments + ["'qpid.replicate':all"]))
+ self.expect = [str(i) for i in expect]
+
+ def send(self, connection):
+ """Send messages, then acquire one but don't acknowledge"""
+ s = connection.session()
+ for m in range(10): s.sender(self.address).send(str(m))
+ s.receiver(self.address).fetch()
+
+ def wait(self, brokertest, backup):
+ brokertest.wait_backup(backup, self.queue)
+
+ def verify(self, brokertest, backup):
+ brokertest.assert_browse_backup(
+ backup, self.queue, self.expect, msg=self.queue)
+
+ tests = [
+ Test("plain",[],range(10)),
+ Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
+ Test("priority",["'qpid.priorities':10"], range(10)),
+ Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
+ Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
+ ]
+
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ c = primary.connect()
+ for t in tests: t.send(c) # Send messages, leave one unacknowledged.
+
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ # Wait for backups to catch up.
+ for t in tests:
+ t.wait(self, backup1)
+ t.wait(self, backup2)
+ # Verify acquired message was replicated
+ for t in tests: t.verify(self, backup1)
+ for t in tests: t.verify(self, backup2)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit