diff options
author | Gordon Sim <gsim@apache.org> | 2011-03-03 12:45:25 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-03-03 12:45:25 +0000 |
commit | f59f54a3b6b58af62e5f81c06410aaba2f06ad47 (patch) | |
tree | 65d1d18a755edd5566a0c0232afe9ab294db5ac8 /tests | |
parent | 9194c6e4e67a5559b4577c5b788aa0cbfcf9a3f4 (diff) | |
download | qpid-python-f59f54a3b6b58af62e5f81c06410aaba2f06ad47.tar.gz |
QPID-3107: If queue's alternate-exchange can't route message, try that exchange's alternate-exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1076604 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tests')
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py | 125 |
1 files changed, 117 insertions, 8 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py index 0ffeb57172..8cbb5793d9 100644 --- a/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py +++ b/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py @@ -18,7 +18,7 @@ # import traceback from qpid.queue import Empty -from qpid.datatypes import Message +from qpid.datatypes import Message, RangedSet from qpid.testlib import TestBase010 from qpid.session import SessionException @@ -77,13 +77,7 @@ class AlternateExchangeTests(TestBase010): """ session = self.session #set up a 'dead letter queue': - session.exchange_declare(exchange="dlq", type="fanout") - session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) - session.exchange_bind(exchange="dlq", queue="deleted") - session.message_subscribe(destination="dlq", queue="deleted") - session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - dlq = session.incoming("dlq") + dlq = self.setup_dlq() #create a queue using the dlq as its alternate exchange: session.queue_declare(queue="delete-me", alternate_exchange="dlq") @@ -236,6 +230,121 @@ class AlternateExchangeTests(TestBase010): self.assertEqual("Three", dlq.get(timeout=1).body) self.assertEmpty(dlq) + def test_queue_delete_loop(self): + """ + Test that if a queue is bound to its own alternate exchange, + then on deletion there is no infinite looping + """ + session = self.session + dlq = self.setup_dlq() + + #create a queue using the dlq as its alternate exchange: + session.queue_declare(queue="delete-me", alternate_exchange="dlq") + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delete-me") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delete-me") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + #delete it: + session.queue_delete(queue="delete-me") + #cleanup: + session.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_queue_delete_no_match(self): + """ + Test that on queue deletion, if the queues own alternate + exchange cannot find a match for the message, the + alternate-exchange of that exchange will be tried. Note: + though the spec rules out going to the alternate-exchanges + alternate exchange when sending to an exchange, it does not + cover this case. + """ + session = self.session + dlq = self.setup_dlq() + + #setu up an 'intermediary' exchange + session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq") + + #create a queue using the intermediary as its alternate exchange: + session.queue_declare(queue="delete-me", alternate_exchange="my-exchange") + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delete-me") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delete-me") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + + #delete it: + session.queue_delete(queue="delete-me") + #cleanup: + session.exchange_delete(exchange="my-exchange") + session.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_reject_no_match(self): + """ + Test that on rejecting a message, if the queues own alternate + exchange cannot find a match for the message, the + alternate-exchange of that exchange will be tried. Note: + though the spec rules out going to the alternate-exchanges + alternate exchange when sending to an exchange, it does not + cover this case. + """ + session = self.session + dlq = self.setup_dlq() + + #setu up an 'intermediary' exchange + session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq") + + #create a queue using the intermediary as its alternate exchange: + session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True) + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delivery-queue") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delivery-queue") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + + #get and reject those messages: + session.message_subscribe(destination="a", queue="delivery-queue") + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + incoming = session.incoming("a") + for m in ["One", "Two", "Three"]: + msg = incoming.get(timeout=1) + self.assertEqual(m, msg.body) + session.message_reject(RangedSet(msg.id)) + session.message_cancel(destination="a") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + #cleanup: + session.exchange_delete(exchange="my-exchange") + session.exchange_delete(exchange="dlq") + + def setup_dlq(self): + session = self.session + #set up 'dead-letter' handling: + session.exchange_declare(exchange="dlq", type="fanout") + session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="dlq", queue="deleted") + session.message_subscribe(destination="dlq", queue="deleted") + session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + dlq = session.incoming("dlq") + return dlq def assertEmpty(self, queue): try: |