summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/misc.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/misc.py')
-rw-r--r--qpid/cpp/src/tests/misc.py119
1 files changed, 0 insertions, 119 deletions
diff --git a/qpid/cpp/src/tests/misc.py b/qpid/cpp/src/tests/misc.py
deleted file mode 100644
index 257fb9e754..0000000000
--- a/qpid/cpp/src/tests/misc.py
+++ /dev/null
@@ -1,119 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.tests.messaging.implementation import *
-from qpid.tests.messaging import VersionTest
-
-class MiscellaneousTests (VersionTest):
- """
- Tests for various aspects of qpidd behaviour
- """
- def test_exclusive(self):
- con = self.create_connection("amqp1.0", True)
- rcv = con.session().receiver("q; {create:always, node:{properties:{exclusive:True,auto-delete:True}}}")
-
- other = self.create_connection("amqp1.0", True)
- try:
- #can send to the queue
- snd = other.session().sender("q")
-
- #can browse the queue
- browser = other.session().receiver("q; {mode:browse}")
-
- #can't consume from the queue
- try:
- consumer = other.session().receiver("q")
- assert False, ("Should not be able to consume from exclusively owned queue")
- except LinkError, e: None
- try:
- exclusive = other.session().receiver("q; {create: always, node:{properties:{exclusive:True}}}")
- assert False, ("Should not be able to consume exclusively from exclusively owned queue")
- except LinkError, e: None
- finally:
- rcv.close()
- con.close()
- other.close()
-
-class AutoDeleteExchangeTests(VersionTest):
- def init_test(self, exchange_type="topic"):
- rcv = self.ssn.receiver("my-topic; {create:always, node:{type:topic, properties:{'exchange-type':%s, 'auto-delete':True}}}" % exchange_type)
- snd = self.ssn.sender("my-topic")
- #send some messages
- msgs = [Message(content=c) for c in ['a','b','c','d']]
- for m in msgs: snd.send(m)
-
- #verify receipt
- for expected in msgs:
- msg = rcv.fetch(0)
- assert msg.content == expected.content
- self.ssn.acknowledge(msg)
- return (rcv, snd)
-
- def on_rcv_detach_test(self, exchange_type="topic"):
- rcv, snd = self.init_test(exchange_type)
- rcv.close()
- #verify exchange is still there
- snd.send(Message(content="will be dropped"))
- snd.close()
- #now verify it is no longer there
- try:
- self.ssn.sender("my-topic")
- assert False, "Attempt to send to deleted exchange should fail"
- except MessagingError: None
-
- def on_snd_detach_test(self, exchange_type="topic"):
- rcv, snd = self.init_test(exchange_type)
- snd.close()
- #verify exchange is still there
- snd = self.ssn.sender("my-topic")
- snd.send(Message(content="will be dropped"))
- snd.close()
- rcv.close()
- #now verify it is no longer there
- try:
- self.ssn.sender("my-topic")
- assert False, "Attempt to send to deleted exchange should fail"
- except MessagingError: None
-
- def test_autodelete_fanout_exchange_on_rcv_detach(self):
- self.on_rcv_detach_test("fanout")
-
- def test_autodelete_fanout_exchange_on_snd_detach(self):
- self.on_snd_detach_test("fanout")
-
- def test_autodelete_direct_exchange_on_rcv_detach(self):
- self.on_rcv_detach_test("direct")
-
- def test_autodelete_direct_exchange_on_snd_detach(self):
- self.on_snd_detach_test("direct")
-
- def test_autodelete_topic_exchange_on_rcv_detach(self):
- self.on_rcv_detach_test("topic")
-
- def test_autodelete_topic_exchange_on_snd_detach(self):
- self.on_snd_detach_test("topic")
-
- def test_autodelete_headers_exchange_on_rcv_detach(self):
- self.on_rcv_detach_test("headers")
-
- def test_autodelete_headers_exchange_on_snd_detach(self):
- self.on_snd_detach_test("headers")
-
-
-