diff options
| author | Alan Conway <aconway@apache.org> | 2014-08-08 09:24:15 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-08-08 09:24:15 +0000 |
| commit | 2602ecaf16a3ddf424383214da2ea846634c083f (patch) | |
| tree | cbe7e6a423e2d521c2ebce63a479f2a4e3074ae9 /qpid/cpp/src/tests/brokertest.py | |
| parent | a833f714a4de983bce8fb1c2f6b87070bd3b4309 (diff) | |
| download | qpid-python-2602ecaf16a3ddf424383214da2ea846634c083f.tar.gz | |
QPID-5966: HA mixing tx enqueue and non-tx dequeue leaves extra messages on backup.
There were several problems:
1. Positions of transactionally enqueued messages not known to QueueReplicator, so not dequeued
on backup if dequeued outside a TX on primary.
2. Race condition if tx created immediately after queue could cause duplication of TX message.
3. Replication IDs were not being set during recovery from store (regression, store change?)
Fix:
1. Update positions QueueReplicator positions via QueueObserver::enqueued to see all enqueues.
2. Check for duplicate replication-ids on backup in QueueReplicator::route.
3. Set replication-id in publish() if not already set in record().
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 44824fe67e..461ef0de9a 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -428,17 +428,22 @@ class Broker(Popen): assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line) finally: log.close() +def receiver_iter(receiver, timeout=0): + """Make an iterator out of a receiver. Returns messages till Empty is raised.""" + try: + while True: + yield receiver.fetch(timeout=timeout) + except qm.Empty: + pass + def browse(session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) r.capacity = 100 try: - contents = [] - try: - while True: contents.append(transform(r.fetch(timeout=timeout))) - except qm.Empty: pass - finally: r.close() - return contents + return [transform(m) for m in receiver_iter(r, timeout)] + finally: + r.close() def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"): """Assert that the contents of messages on queue (as retrieved |
