diff options
author | Ted Ross <tross@apache.org> | 2012-02-02 17:14:51 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2012-02-02 17:14:51 +0000 |
commit | 675ecbf53fb51ebe9f124568a800e1c33642e4a1 (patch) | |
tree | 01eddec70eb51d6ef97b382e328be0c22a1ac467 | |
parent | e010c8adf575dd98b1ca0f579b9c7740124c75b6 (diff) | |
download | qpid-python-675ecbf53fb51ebe9f124568a800e1c33642e4a1.tar.gz |
QPID-3481 - After queue deletion, route re-queued messages to the alternate exchange.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1239728 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 29 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/__init__.py | 1 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/new_api.py | 124 |
3 files changed, 145 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 329fd1cb8c..dd23760922 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -213,15 +213,26 @@ void Queue::requeue(const QueuedMessage& msg){ { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; - messages->reinsert(msg); - listeners.populate(copy); - - // for persistLastNode - don't force a message twice to disk, but force it if no force before - if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { - msg.payload->forcePersistent(); - if (msg.payload->isForcedPersistent() ){ - boost::intrusive_ptr<Message> payload = msg.payload; - enqueue(0, payload); + if (deleted) { + // + // If the queue has been deleted, requeued messages must be sent to the alternate exchange + // if one is configured. + // + if (alternateExchange.get()) { + DeliverableMessage dmsg(msg.payload); + alternateExchange->routeWithAlternate(dmsg); + } + } else { + messages->reinsert(msg); + listeners.populate(copy); + + // for persistLastNode - don't force a message twice to disk, but force it if no force before + if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { + msg.payload->forcePersistent(); + if (msg.payload->isForcedPersistent() ){ + boost::intrusive_ptr<Message> payload = msg.payload; + enqueue(0, payload); + } } } observeRequeue(msg, locker); diff --git a/tests/src/py/qpid_tests/broker_0_10/__init__.py b/tests/src/py/qpid_tests/broker_0_10/__init__.py index 7b779df5f4..5b1964ae98 100644 --- a/tests/src/py/qpid_tests/broker_0_10/__init__.py +++ b/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -34,3 +34,4 @@ from priority import * from threshold import * from extensions import * from msg_groups import * +from new_api import * diff --git a/tests/src/py/qpid_tests/broker_0_10/new_api.py b/tests/src/py/qpid_tests/broker_0_10/new_api.py new file mode 100644 index 0000000000..3b924e9985 --- /dev/null +++ b/tests/src/py/qpid_tests/broker_0_10/new_api.py @@ -0,0 +1,124 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base +import qmf.console +from time import sleep + +# +# Tests the Broker's support for message groups +# + +class GeneralTests(Base): + """ + Tests of the API and broker via the new API. + """ + + def assertEqual(self, left, right, text=None): + if not left == right: + print "assertEqual failure: %r != %r" % (left, right) + if text: + print " %r" % text + assert None + + def fail(self, text=None): + if text: + print "Fail: %r" % text + assert None + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def test_qpid_3481_acquired_to_alt_exchange(self): + """ + Verify that acquired messages are routed to the alternate when the queue is deleted. + """ + sess1 = self.setup_session() + sess2 = self.setup_session() + + tx = sess1.sender("amq.direct/key") + rx_main = sess1.receiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}") + rx_alt = sess2.receiver("amq.fanout") + rx_alt.capacity = 10 + + tx.send("DATA") + tx.send("DATA") + tx.send("DATA") + tx.send("DATA") + tx.send("DATA") + + msg = rx_main.fetch() + msg = rx_main.fetch() + msg = rx_main.fetch() + + self.assertEqual(rx_alt.available(), 0, "No messages should have been routed to the alt_exchange") + + sess1.close() + + self.assertEqual(rx_alt.available(), 5, "All 5 messages should have been routed to the alt_exchange") + + sess2.close() + + def test_qpid_3481_acquired_to_alt_exchange_2_consumers(self): + """ + Verify that acquired messages are routed to the alternate when the queue is deleted. + """ + sess1 = self.setup_session() + sess2 = self.setup_session() + sess3 = self.setup_session() + sess4 = self.setup_session() + + tx = sess1.sender("test_acquired;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}") + rx_main1 = sess2.receiver("test_acquired") + rx_main2 = sess3.receiver("test_acquired") + rx_alt = sess4.receiver("amq.fanout") + rx_alt.capacity = 10 + + tx.send("DATA") + tx.send("DATA") + tx.send("DATA") + tx.send("DATA") + tx.send("DATA") + + msg = rx_main1.fetch() + msg = rx_main1.fetch() + msg = rx_main1.fetch() + + self.assertEqual(rx_alt.available(), 0, "No messages should have been routed to the alt_exchange") + + # Close sess1; This will cause the queue to be deleted + sess1.close() + self.assertEqual(rx_alt.available(), 2, "2 of the messages should have been routed to the alt_exchange") + + # Close sess2; This will cause the acquired messages to be requeued and routed to the alternate + sess2.close() + for i in range(5): + try: + m = rx_alt.fetch(0) + except: + self.fail("failed to receive all 5 messages via alternate exchange") + + sess3.close() + self.assertEqual(rx_alt.available(), 0, "No further messages should be received via the alternate exchange") + + sess4.close() |