diff options
| author | Gordon Sim <gsim@apache.org> | 2013-11-12 13:42:36 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-11-12 13:42:36 +0000 |
| commit | b955a41e69f69a1ada69c780d9fb7260c0bfc3f2 (patch) | |
| tree | 4adb557c6b9325233acf2b48961e174325e28424 /qpid/cpp/src/tests/misc.py | |
| parent | 50595b9c333b08327462fa3693a5ba4cdced76e6 (diff) | |
| download | qpid-python-b955a41e69f69a1ada69c780d9fb7260c0bfc3f2.tar.gz | |
QPID-5301: support autodeleted exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541058 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/misc.py')
| -rw-r--r-- | qpid/cpp/src/tests/misc.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/misc.py b/qpid/cpp/src/tests/misc.py index 108120f321..257fb9e754 100644 --- a/qpid/cpp/src/tests/misc.py +++ b/qpid/cpp/src/tests/misc.py @@ -50,3 +50,70 @@ class MiscellaneousTests (VersionTest): con.close() other.close() +class AutoDeleteExchangeTests(VersionTest): + def init_test(self, exchange_type="topic"): + rcv = self.ssn.receiver("my-topic; {create:always, node:{type:topic, properties:{'exchange-type':%s, 'auto-delete':True}}}" % exchange_type) + snd = self.ssn.sender("my-topic") + #send some messages + msgs = [Message(content=c) for c in ['a','b','c','d']] + for m in msgs: snd.send(m) + + #verify receipt + for expected in msgs: + msg = rcv.fetch(0) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + return (rcv, snd) + + def on_rcv_detach_test(self, exchange_type="topic"): + rcv, snd = self.init_test(exchange_type) + rcv.close() + #verify exchange is still there + snd.send(Message(content="will be dropped")) + snd.close() + #now verify it is no longer there + try: + self.ssn.sender("my-topic") + assert False, "Attempt to send to deleted exchange should fail" + except MessagingError: None + + def on_snd_detach_test(self, exchange_type="topic"): + rcv, snd = self.init_test(exchange_type) + snd.close() + #verify exchange is still there + snd = self.ssn.sender("my-topic") + snd.send(Message(content="will be dropped")) + snd.close() + rcv.close() + #now verify it is no longer there + try: + self.ssn.sender("my-topic") + assert False, "Attempt to send to deleted exchange should fail" + except MessagingError: None + + def test_autodelete_fanout_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("fanout") + + def test_autodelete_fanout_exchange_on_snd_detach(self): + self.on_snd_detach_test("fanout") + + def test_autodelete_direct_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("direct") + + def test_autodelete_direct_exchange_on_snd_detach(self): + self.on_snd_detach_test("direct") + + def test_autodelete_topic_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("topic") + + def test_autodelete_topic_exchange_on_snd_detach(self): + self.on_snd_detach_test("topic") + + def test_autodelete_headers_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("headers") + + def test_autodelete_headers_exchange_on_snd_detach(self): + self.on_snd_detach_test("headers") + + + |
