summaryrefslogtreecommitdiff
path: root/qpid/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tests')
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py27
1 files changed, 24 insertions, 3 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
index f51923fcf3..a0f0684bc8 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
@@ -226,8 +226,6 @@ class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier):
# Test automatic binding by queue name.
self.queue_declare(queue="d")
self.assertPublishConsume(queue="d", routing_key="d")
- # Test explicit bind to default queue
- self.verifyDirectExchange("")
# TODO aconway 2006-09-27: Fill in empty tests:
@@ -460,7 +458,30 @@ class MiscellaneousErrorsTests(TestHelper):
self.fail("Expected 530 for redeclaration of exchange with different type.")
except SessionException, e:
self.assertEquals(530, e.args[0].error_code)
-
+
+ def testDefaultAccessBind(self):
+ try:
+ self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True)
+ self.session.exchange_bind(exchange="", queue="my-queue", binding_key="another-key")
+ self.fail("Expected 530 (not-allowed) code for bind to default exchange.")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+
+ def testDefaultAccessUnbind(self):
+ try:
+ self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True)
+ self.session.exchange_unbind(exchange="", queue="my-queue", binding_key="my-queue")
+ self.fail("Expected 530 (not-allowed) code for unbind from default exchange.")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+
+ def testDefaultAccessDelete(self):
+ try:
+ self.session.exchange_delete(exchange="")
+ self.fail("Expected 530 (not-allowed) code for delete of default exchange.")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+
class ExchangeTests(TestHelper):
def testHeadersBindNoMatchArg(self):
self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)