summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/misc.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/misc.py')
-rw-r--r--qpid/cpp/src/tests/misc.py67
1 files changed, 67 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/misc.py b/qpid/cpp/src/tests/misc.py
index 108120f321..257fb9e754 100644
--- a/qpid/cpp/src/tests/misc.py
+++ b/qpid/cpp/src/tests/misc.py
@@ -50,3 +50,70 @@ class MiscellaneousTests (VersionTest):
con.close()
other.close()
+class AutoDeleteExchangeTests(VersionTest):
+ def init_test(self, exchange_type="topic"):
+ rcv = self.ssn.receiver("my-topic; {create:always, node:{type:topic, properties:{'exchange-type':%s, 'auto-delete':True}}}" % exchange_type)
+ snd = self.ssn.sender("my-topic")
+ #send some messages
+ msgs = [Message(content=c) for c in ['a','b','c','d']]
+ for m in msgs: snd.send(m)
+
+ #verify receipt
+ for expected in msgs:
+ msg = rcv.fetch(0)
+ assert msg.content == expected.content
+ self.ssn.acknowledge(msg)
+ return (rcv, snd)
+
+ def on_rcv_detach_test(self, exchange_type="topic"):
+ rcv, snd = self.init_test(exchange_type)
+ rcv.close()
+ #verify exchange is still there
+ snd.send(Message(content="will be dropped"))
+ snd.close()
+ #now verify it is no longer there
+ try:
+ self.ssn.sender("my-topic")
+ assert False, "Attempt to send to deleted exchange should fail"
+ except MessagingError: None
+
+ def on_snd_detach_test(self, exchange_type="topic"):
+ rcv, snd = self.init_test(exchange_type)
+ snd.close()
+ #verify exchange is still there
+ snd = self.ssn.sender("my-topic")
+ snd.send(Message(content="will be dropped"))
+ snd.close()
+ rcv.close()
+ #now verify it is no longer there
+ try:
+ self.ssn.sender("my-topic")
+ assert False, "Attempt to send to deleted exchange should fail"
+ except MessagingError: None
+
+ def test_autodelete_fanout_exchange_on_rcv_detach(self):
+ self.on_rcv_detach_test("fanout")
+
+ def test_autodelete_fanout_exchange_on_snd_detach(self):
+ self.on_snd_detach_test("fanout")
+
+ def test_autodelete_direct_exchange_on_rcv_detach(self):
+ self.on_rcv_detach_test("direct")
+
+ def test_autodelete_direct_exchange_on_snd_detach(self):
+ self.on_snd_detach_test("direct")
+
+ def test_autodelete_topic_exchange_on_rcv_detach(self):
+ self.on_rcv_detach_test("topic")
+
+ def test_autodelete_topic_exchange_on_snd_detach(self):
+ self.on_snd_detach_test("topic")
+
+ def test_autodelete_headers_exchange_on_rcv_detach(self):
+ self.on_rcv_detach_test("headers")
+
+ def test_autodelete_headers_exchange_on_snd_detach(self):
+ self.on_snd_detach_test("headers")
+
+
+