summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/brokertest.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-08 09:24:15 +0000
committerAlan Conway <aconway@apache.org>2014-08-08 09:24:15 +0000
commit2602ecaf16a3ddf424383214da2ea846634c083f (patch)
treecbe7e6a423e2d521c2ebce63a479f2a4e3074ae9 /qpid/cpp/src/tests/brokertest.py
parenta833f714a4de983bce8fb1c2f6b87070bd3b4309 (diff)
downloadqpid-python-2602ecaf16a3ddf424383214da2ea846634c083f.tar.gz
QPID-5966: HA mixing tx enqueue and non-tx dequeue leaves extra messages on backup.
There were several problems: 1. Positions of transactionally enqueued messages not known to QueueReplicator, so not dequeued on backup if dequeued outside a TX on primary. 2. Race condition if tx created immediately after queue could cause duplication of TX message. 3. Replication IDs were not being set during recovery from store (regression, store change?) Fix: 1. Update positions QueueReplicator positions via QueueObserver::enqueued to see all enqueues. 2. Check for duplicate replication-ids on backup in QueueReplicator::route. 3. Set replication-id in publish() if not already set in record(). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r--qpid/cpp/src/tests/brokertest.py17
1 files changed, 11 insertions, 6 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 44824fe67e..461ef0de9a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -428,17 +428,22 @@ class Broker(Popen):
assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line)
finally: log.close()
+def receiver_iter(receiver, timeout=0):
+ """Make an iterator out of a receiver. Returns messages till Empty is raised."""
+ try:
+ while True:
+ yield receiver.fetch(timeout=timeout)
+ except qm.Empty:
+ pass
+
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
r.capacity = 100
try:
- contents = []
- try:
- while True: contents.append(transform(r.fetch(timeout=timeout)))
- except qm.Empty: pass
- finally: r.close()
- return contents
+ return [transform(m) for m in receiver_iter(r, timeout)]
+ finally:
+ r.close()
def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
"""Assert that the contents of messages on queue (as retrieved