diff options
| author | Alan Conway <aconway@apache.org> | 2013-12-10 14:11:36 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-12-10 14:11:36 +0000 |
| commit | c52b4b966e4127aa2c965a4d29560b13cefcdc20 (patch) | |
| tree | 5cf018888147d382e6777649b0f396c1086e7aa7 /qpid/cpp/src/tests/ha_tests.py | |
| parent | 077619a135faf171ba85baf1c8acb250c93f2657 (diff) | |
| download | qpid-python-c52b4b966e4127aa2c965a4d29560b13cefcdc20.tar.gz | |
QPID-5404: HA broker message duplication when deleting a queue with an alt-exchange
The old code ran auto-delete on the backup on disconnect. This reroutes
messages onto the alt queue with incorrect replication IDs from the original
queue, and then replicates duplicate rerouted messages from the primary. The
solution is to process auto deletes on the new primary and let them replicate to
the backups.
- Move all auto-delete logic into QueueReplicator
- Primary process auto-delete on QueueReplicator as part of promotion.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1549844 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 112 |
1 files changed, 40 insertions, 72 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 138868f64e..1a5d6ddff8 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -34,6 +34,14 @@ log = getLogger(__name__) class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" +def alt_setup(session, suffix): + # Create exchange to use as alternate and a queue bound to it. + # altex exchange: acts as alternate exchange + session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix)) + # altq queue bound to altex, collect re-routed messages. + session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix)) + + class ReplicationTests(HaBrokerTest): """Correctness tests for HA replication.""" @@ -718,19 +726,44 @@ acl deny all all except NotFound: pass assert not cluster[1].agent().getQueue("q") # Should not be in QMF - def alt_setup(self, session, suffix): - # Create exchange to use as alternate and a queue bound to it. - # altex exchange: acts as alternate exchange - session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix)) - # altq queue bound to altex, collect re-routed messages. - session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix)) + def test_auto_delete_failover(self): + """Test auto-delete queues. Verify that: + - queues auto-deleted on the primary are deleted on the backup. + - auto-delete queues with/without timeout are deleted after a failover. + - messages are correctly routed to the alternate exchange. + """ + cluster = HaCluster(self, 3) + s = cluster[0].connect().session() + def setup(q, timeout=""): + if timeout: timeout = ",arguments:{'qpid.auto_delete_timeout':%s}"%timeout + # Create alternate exchange, auto-delete queue and queue bound to alt. ex. + s.sender("%s-altex;{create:always,node:{type:topic,x-declare:{type:fanout}}}"%q) + qs = s.sender("%s;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:%s-altex%s}}}"%(q,q,timeout)) + s.sender("%s-altq;{create:always,node:{x-bindings:[{exchange:%s-altex,queue:%s-altq}]}}"%(q,q,q)) + qs.send(q) # Send a message to the auto-delete queue + return s + + for args in [("q1",""),("q2","0"),("q3","1"),("q4",""),("q5","")]: setup(*args) + receivers = [s.receiver("q%s"%i) for i in [1,2,3,4]] # Subscribe to queues + # Note q5 is never subscribed to, so should not be auto-deleted. + receivers[3].close() # Trigger auto-delete for q4 + cluster[0].kill(final=False) + cluster[2].promote() + cluster.restart(0) + cluster[2].assert_browse("q3",["q3"]) # Not yet auto-deleted, 1 sec timeout. + for i in [2,1,0]: + for q in ["q1", "q2", "q3","q4"]: + cluster[i].wait_no_queue(q,timeout=2) # auto-deleted + cluster[i].assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate + cluster[i].assert_browse_backup("q5", ["q5"]) # Never subscribed, not deleted. + cluster[i].assert_browse_backup("q5-altq", []) def test_auto_delete_close(self): """Verify auto-delete queues are deleted on backup if auto-deleted on primary""" cluster=HaCluster(self, 2) p = cluster[0].connect().session() - self.alt_setup(p, "1") + alt_setup(p, "1") r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1) s = p.sender("adq1") for m in ["aa","bb","cc"]: s.send(m) @@ -742,71 +775,6 @@ acl deny all all cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"]) cluster[1].wait_queue("adq2") - def test_auto_delete_crash(self): - """Verify auto-delete queues are deleted on backup if the primary crashes""" - cluster=HaCluster(self, 2) - p = cluster[0].connect().session() - self.alt_setup(p,"1") - - # adq1 is subscribed so will be auto-deleted. - r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1) - s = p.sender("adq1") - for m in ["aa","bb","cc"]: s.send(m) - # adq2 is subscribed after cluster[2] starts. - p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}") - # adq3 is never subscribed. - p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}") - - cluster.start() - cluster[2].wait_status("ready") - - p.receiver("adq2") # Subscribed after cluster[2] joined - - for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q) - for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q) - cluster[0].kill() - - cluster[1].wait_no_queue("adq1") - cluster[1].wait_no_queue("adq2") - cluster[1].wait_queue("adq3") - - cluster[2].wait_no_queue("adq1") - cluster[2].wait_no_queue("adq2") - cluster[2].wait_queue("adq3") - - cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"]) - cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"]) - - def test_auto_delete_timeout(self): - cluster = HaCluster(self, 2) - # Test timeout - r1 = cluster[0].connect().session().receiver("q1;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}") - # Test special case of timeout = 0 - r0 = cluster[0].connect().session().receiver("q0;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':0}}}}") - cluster[1].wait_queue("q0") - cluster[1].wait_queue("q1") - cluster[0].kill() - cluster[1].wait_queue("q1") # Not timed out yet - cluster[1].wait_no_queue("q1", timeout=5) # Wait for timeout - cluster[1].wait_no_queue("q0", timeout=5) # Wait for timeout - - def test_alt_exchange_dup(self): - """QPID-4349: if a queue has an alterante exchange and is deleted the - messages appear twice on the alternate, they are rerouted once by the - primary and again by the backup.""" - cluster = HaCluster(self,2) - - # Set up q with alternate exchange altex bound to altq. - s = cluster[0].connect().session() - s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") - s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") - snd = s.sender("q;{create:always,node:{x-declare:{alternate-exchange:'altex'}}}") - messages = [ str(n) for n in xrange(10) ] - for m in messages: snd.send(m) - cluster[1].assert_browse_backup("q", messages) - s.sender("q;{delete:always}").close() - cluster[1].assert_browse_backup("altq", messages) - def test_expired(self): """Regression test for QPID-4379: HA does not properly handle expired messages""" # Race between messages expiring and HA replicating consumer. |
