diff options
Diffstat (limited to 'qpid/cpp/src/tests/misc.py')
| -rw-r--r-- | qpid/cpp/src/tests/misc.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/misc.py b/qpid/cpp/src/tests/misc.py index 108120f321..257fb9e754 100644 --- a/qpid/cpp/src/tests/misc.py +++ b/qpid/cpp/src/tests/misc.py @@ -50,3 +50,70 @@ class MiscellaneousTests (VersionTest): con.close() other.close() +class AutoDeleteExchangeTests(VersionTest): + def init_test(self, exchange_type="topic"): + rcv = self.ssn.receiver("my-topic; {create:always, node:{type:topic, properties:{'exchange-type':%s, 'auto-delete':True}}}" % exchange_type) + snd = self.ssn.sender("my-topic") + #send some messages + msgs = [Message(content=c) for c in ['a','b','c','d']] + for m in msgs: snd.send(m) + + #verify receipt + for expected in msgs: + msg = rcv.fetch(0) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + return (rcv, snd) + + def on_rcv_detach_test(self, exchange_type="topic"): + rcv, snd = self.init_test(exchange_type) + rcv.close() + #verify exchange is still there + snd.send(Message(content="will be dropped")) + snd.close() + #now verify it is no longer there + try: + self.ssn.sender("my-topic") + assert False, "Attempt to send to deleted exchange should fail" + except MessagingError: None + + def on_snd_detach_test(self, exchange_type="topic"): + rcv, snd = self.init_test(exchange_type) + snd.close() + #verify exchange is still there + snd = self.ssn.sender("my-topic") + snd.send(Message(content="will be dropped")) + snd.close() + rcv.close() + #now verify it is no longer there + try: + self.ssn.sender("my-topic") + assert False, "Attempt to send to deleted exchange should fail" + except MessagingError: None + + def test_autodelete_fanout_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("fanout") + + def test_autodelete_fanout_exchange_on_snd_detach(self): + self.on_snd_detach_test("fanout") + + def test_autodelete_direct_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("direct") + + def test_autodelete_direct_exchange_on_snd_detach(self): + self.on_snd_detach_test("direct") + + def test_autodelete_topic_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("topic") + + def test_autodelete_topic_exchange_on_snd_detach(self): + self.on_snd_detach_test("topic") + + def test_autodelete_headers_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("headers") + + def test_autodelete_headers_exchange_on_snd_detach(self): + self.on_snd_detach_test("headers") + + + |
