summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-05-01 13:10:58 +0000
committerGordon Sim <gsim@apache.org>2008-05-01 13:10:58 +0000
commit657c6071b11a0319f78dcb96ef2978733c93c1a2 (patch)
tree8819d3095df0fbbcad38635e2026ca55a54d4659 /qpid/python
parent5fe0458b26fdf5ec233d8181201d3673a15006ae (diff)
downloadqpid-python-657c6071b11a0319f78dcb96ef2978733c93c1a2.tar.gz
Remove preview tests (no longer required)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@652506 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/tests_0-10_preview/__init__.py33
-rw-r--r--qpid/python/tests_0-10_preview/alternate_exchange.py179
-rw-r--r--qpid/python/tests_0-10_preview/broker.py111
-rw-r--r--qpid/python/tests_0-10_preview/dtx.py645
-rw-r--r--qpid/python/tests_0-10_preview/example.py95
-rw-r--r--qpid/python/tests_0-10_preview/exchange.py335
-rw-r--r--qpid/python/tests_0-10_preview/execution.py29
-rw-r--r--qpid/python/tests_0-10_preview/message.py834
-rw-r--r--qpid/python/tests_0-10_preview/persistence.py67
-rw-r--r--qpid/python/tests_0-10_preview/query.py227
-rw-r--r--qpid/python/tests_0-10_preview/queue.py338
-rw-r--r--qpid/python/tests_0-10_preview/testlib.py66
-rw-r--r--qpid/python/tests_0-10_preview/tx.py231
13 files changed, 0 insertions, 3190 deletions
diff --git a/qpid/python/tests_0-10_preview/__init__.py b/qpid/python/tests_0-10_preview/__init__.py
deleted file mode 100644
index f0acf9c632..0000000000
--- a/qpid/python/tests_0-10_preview/__init__.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from alternate_exchange import *
-from broker import *
-from dtx import *
-from example import *
-from exchange import *
-from execution import *
-from management import *
-from message import *
-from query import *
-from queue import *
-from testlib import *
-from tx import *
diff --git a/qpid/python/tests_0-10_preview/alternate_exchange.py b/qpid/python/tests_0-10_preview/alternate_exchange.py
deleted file mode 100644
index 83f8d85811..0000000000
--- a/qpid/python/tests_0-10_preview/alternate_exchange.py
+++ /dev/null
@@ -1,179 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class AlternateExchangeTests(TestBase):
- """
- Tests for the new mechanism for message returns introduced in 0-10
- and available in 0-9 for preview
- """
-
- def test_unroutable(self):
- """
- Test that unroutable messages are delivered to the alternate-exchange if specified
- """
- channel = self.channel
- #create an exchange with an alternate defined
- channel.exchange_declare(exchange="secondary", type="fanout")
- channel.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary")
-
- #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
- channel.queue_declare(queue="returns", exclusive=True, auto_delete=True)
- channel.queue_bind(queue="returns", exchange="secondary")
- self.subscribe(destination="a", queue="returns")
- returned = self.client.queue("a")
-
- #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
- channel.queue_declare(queue="processed", exclusive=True, auto_delete=True)
- channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key")
- self.subscribe(destination="b", queue="processed")
- processed = self.client.queue("b")
-
- #publish to the primary exchange
- #...one message that makes it to the 'processed' queue:
- channel.message_transfer(destination="primary", content=Content("Good", properties={'routing_key':"my-key"}))
- #...and one that does not:
- channel.message_transfer(destination="primary", content=Content("Bad", properties={'routing_key':"unused-key"}))
-
- #delete the exchanges
- channel.exchange_delete(exchange="primary")
- channel.exchange_delete(exchange="secondary")
-
- #verify behaviour
- self.assertEqual("Good", processed.get(timeout=1).content.body)
- self.assertEqual("Bad", returned.get(timeout=1).content.body)
- self.assertEmpty(processed)
- self.assertEmpty(returned)
-
- def test_queue_delete(self):
- """
- Test that messages in a queue being deleted are delivered to the alternate-exchange if specified
- """
- channel = self.channel
- #set up a 'dead letter queue':
- channel.exchange_declare(exchange="dlq", type="fanout")
- channel.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="dlq", queue="deleted")
- self.subscribe(destination="dlq", queue="deleted")
- dlq = self.client.queue("dlq")
-
- #create a queue using the dlq as its alternate exchange:
- channel.queue_declare(queue="delete-me", alternate_exchange="dlq")
- #send it some messages:
- channel.message_transfer(content=Content("One", properties={'routing_key':"delete-me"}))
- channel.message_transfer(content=Content("Two", properties={'routing_key':"delete-me"}))
- channel.message_transfer(content=Content("Three", properties={'routing_key':"delete-me"}))
- #delete it:
- channel.queue_delete(queue="delete-me")
- #delete the dlq exchange:
- channel.exchange_delete(exchange="dlq")
-
- #check the messages were delivered to the dlq:
- self.assertEqual("One", dlq.get(timeout=1).content.body)
- self.assertEqual("Two", dlq.get(timeout=1).content.body)
- self.assertEqual("Three", dlq.get(timeout=1).content.body)
- self.assertEmpty(dlq)
-
-
- def test_immediate(self):
- """
- Test that messages in a queue being deleted are delivered to the alternate-exchange if specified
- """
- channel = self.channel
- #set up a 'dead letter queue':
- channel.exchange_declare(exchange="dlq", type="fanout")
- channel.queue_declare(queue="immediate", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="dlq", queue="immediate")
- self.subscribe(destination="dlq", queue="immediate")
- dlq = self.client.queue("dlq")
-
- #create a queue using the dlq as its alternate exchange:
- channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True, auto_delete=True)
- #send it some messages:
- #TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK
- channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"}))
-
- #check the messages were delivered to the dlq:
- self.assertEqual("no one wants me", dlq.get(timeout=1).content.body)
- self.assertEmpty(dlq)
-
- #cleanup:
- channel.queue_delete(queue="no-consumers")
- channel.exchange_delete(exchange="dlq")
-
-
- def test_delete_while_used_by_queue(self):
- """
- Ensure an exchange still in use as an alternate-exchange for a
- queue can't be deleted
- """
- channel = self.channel
- channel.exchange_declare(exchange="alternate", type="fanout")
- channel.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
- try:
- channel.exchange_delete(exchange="alternate")
- self.fail("Expected deletion of in-use alternate-exchange to fail")
- except Closed, e:
- #cleanup:
- other = self.connect()
- channel = other.channel(1)
- channel.session_open()
- channel.exchange_delete(exchange="alternate")
- channel.session_close()
- other.close()
-
- self.assertConnectionException(530, e.args[0])
-
-
-
- def test_delete_while_used_by_exchange(self):
- """
- Ensure an exchange still in use as an alternate-exchange for
- another exchange can't be deleted
- """
- channel = self.channel
- channel.exchange_declare(exchange="alternate", type="fanout")
- channel.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate")
- try:
- channel.exchange_delete(exchange="alternate")
- #cleanup:
- channel.exchange_delete(exchange="e")
- self.fail("Expected deletion of in-use alternate-exchange to fail")
- except Closed, e:
- #cleanup:
- other = self.connect()
- channel = other.channel(1)
- channel.session_open()
- channel.exchange_delete(exchange="e")
- channel.exchange_delete(exchange="alternate")
- channel.session_close()
- other.close()
-
- self.assertConnectionException(530, e.args[0])
-
-
- def assertEmpty(self, queue):
- try:
- msg = queue.get(timeout=1)
- self.fail("Queue not empty: " + msg)
- except Empty: None
-
diff --git a/qpid/python/tests_0-10_preview/broker.py b/qpid/python/tests_0-10_preview/broker.py
deleted file mode 100644
index 99936ba742..0000000000
--- a/qpid/python/tests_0-10_preview/broker.py
+++ /dev/null
@@ -1,111 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class BrokerTests(TestBase):
- """Tests for basic Broker functionality"""
-
- def test_ack_and_no_ack(self):
- """
- First, this test tries to receive a message with a no-ack
- consumer. Second, this test tries to explicitly receive and
- acknowledge a message with an acknowledging consumer.
- """
- ch = self.channel
- self.queue_declare(ch, queue = "myqueue")
-
- # No ack consumer
- ctag = "tag1"
- self.subscribe(ch, queue = "myqueue", destination = ctag)
- body = "test no-ack"
- ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"}))
- msg = self.client.queue(ctag).get(timeout = 5)
- self.assert_(msg.content.body == body)
-
- # Acknowledging consumer
- self.queue_declare(ch, queue = "otherqueue")
- ctag = "tag2"
- self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1)
- ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
- ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
- body = "test ack"
- ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"}))
- msg = self.client.queue(ctag).get(timeout = 5)
- msg.complete()
- self.assert_(msg.content.body == body)
-
- def test_simple_delivery_immediate(self):
- """
- Test simple message delivery where consume is issued before publish
- """
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- consumer_tag = "tag1"
- self.subscribe(queue="test-queue", destination=consumer_tag)
- queue = self.client.queue(consumer_tag)
-
- body = "Immediate Delivery"
- channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
- msg = queue.get(timeout=5)
- self.assert_(msg.content.body == body)
-
- # TODO: Ensure we fail if immediate=True and there's no consumer.
-
-
- def test_simple_delivery_queued(self):
- """
- Test basic message delivery where publish is issued before consume
- (i.e. requires queueing of the message)
- """
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- body = "Queued Delivery"
- channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
-
- consumer_tag = "tag1"
- self.subscribe(queue="test-queue", destination=consumer_tag)
- queue = self.client.queue(consumer_tag)
- msg = queue.get(timeout=5)
- self.assert_(msg.content.body == body)
-
- def test_invalid_channel(self):
- channel = self.client.channel(200)
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for invalid channel")
- except Closed, e:
- self.assertConnectionException(504, e.args[0])
-
- def test_closed_channel(self):
- channel = self.client.channel(200)
- channel.session_open()
- channel.session_close()
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for closed channel")
- except Closed, e:
- if isinstance(e.args[0], str): self.fail(e)
- self.assertConnectionException(504, e.args[0])
diff --git a/qpid/python/tests_0-10_preview/dtx.py b/qpid/python/tests_0-10_preview/dtx.py
deleted file mode 100644
index f84f91c75a..0000000000
--- a/qpid/python/tests_0-10_preview/dtx.py
+++ /dev/null
@@ -1,645 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from struct import pack, unpack
-from time import sleep
-
-class DtxTests(TestBase):
- """
- Tests for the amqp dtx related classes.
-
- Tests of the form test_simple_xxx test the basic transactional
- behaviour. The approach here is to 'swap' a message from one queue
- to another by consuming and re-publishing in the same
- transaction. That transaction is then completed in different ways
- and the appropriate result verified.
-
- The other tests enforce more specific rules and behaviour on a
- per-method or per-field basis.
- """
-
- XA_RBROLLBACK = 1
- XA_RBTIMEOUT = 2
- XA_OK = 0
- tx_counter = 0
-
- def reset_channel(self):
- self.channel.session_close()
- self.channel = self.client.channel(self.channel.id + 1)
- self.channel.session_open()
-
- def test_simple_commit(self):
- """
- Test basic one-phase commit behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "commit")
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
-
- #should close and reopen channel to ensure no unacked messages are held
- self.reset_channel()
-
- #check result
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(1, "queue-b")
- self.assertMessageId("commit", "queue-b")
-
- def test_simple_prepare_commit(self):
- """
- Test basic two-phase commit behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "prepare-commit")
-
- #prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
-
- self.reset_channel()
-
- #check result
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(1, "queue-b")
- self.assertMessageId("prepare-commit", "queue-b")
-
-
- def test_simple_rollback(self):
- """
- Test basic rollback behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "rollback")
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
-
- self.reset_channel()
-
- #check result
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("rollback", "queue-a")
-
- def test_simple_prepare_rollback(self):
- """
- Test basic rollback behaviour after the transaction has been prepared.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "prepare-rollback")
-
- #prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
-
- self.reset_channel()
-
- #check result
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("prepare-rollback", "queue-a")
-
- def test_select_required(self):
- """
- check that an error is flagged if select is not issued before
- start or end
- """
- channel = self.channel
- tx = self.xid("dummy")
- try:
- channel.dtx_demarcation_start(xid=tx)
-
- #if we get here we have failed, but need to do some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
- self.fail("Channel not selected for use with dtx, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_start_already_known(self):
- """
- Verify that an attempt to start an association with a
- transaction that is already known is not allowed (unless the
- join flag is set).
- """
- #create two channels on different connection & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
-
- other = self.connect()
- channel2 = other.channel(1)
- channel2.session_open()
- channel2.dtx_demarcation_select()
-
- #create a xid
- tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
- #then start on the other without the join set
- failed = False
- try:
- channel2.dtx_demarcation_start(xid=tx)
- except Closed, e:
- failed = True
- error = e
-
- #cleanup:
- if not failed:
- channel2.dtx_demarcation_end(xid=tx)
- other.close()
- channel1.dtx_demarcation_end(xid=tx)
- channel1.dtx_coordination_rollback(xid=tx)
-
- #verification:
- if failed: self.assertConnectionException(503, e.args[0])
- else: self.fail("Xid already known, expected exception!")
-
- def test_forget_xid_on_completion(self):
- """
- Verify that a xid is 'forgotten' - and can therefore be used
- again - once it is completed.
- """
- #do some transactional work & complete the transaction
- self.test_simple_commit()
- # channel has been reset, so reselect for use with dtx
- self.channel.dtx_demarcation_select()
-
- #start association for the same xid as the previously completed txn
- tx = self.xid("my-xid")
- self.channel.dtx_demarcation_start(xid=tx)
- self.channel.dtx_demarcation_end(xid=tx)
- self.channel.dtx_coordination_rollback(xid=tx)
-
- def test_start_join_and_resume(self):
- """
- Ensure the correct error is signalled when both the join and
- resume flags are set on starting an association between a
- channel and a transcation.
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("dummy")
- try:
- channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
- #failed, but need some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
- self.fail("Join and resume both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_start_join(self):
- """
- Verify 'join' behaviour, where a channel is associated with a
- transaction that is already associated with another channel.
- """
- #create two channels & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
-
- channel2 = self.client.channel(2)
- channel2.session_open()
- channel2.dtx_demarcation_select()
-
- #setup
- channel1.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel1.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
-
- #create a xid
- tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
- #then start on the other with the join flag set
- channel2.dtx_demarcation_start(xid=tx, join=True)
-
- #do work through each channel
- self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
- self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
-
- #mark end on both channels
- channel1.dtx_demarcation_end(xid=tx)
- channel2.dtx_demarcation_end(xid=tx)
-
- #commit and check
- channel1.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "one")
- self.assertMessageCount(1, "two")
- self.assertMessageId("a", "two")
- self.assertMessageId("b", "one")
-
-
- def test_suspend_resume(self):
- """
- Test suspension and resumption of an association
- """
- channel = self.channel
- channel.dtx_demarcation_select()
-
- #setup
- channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
-
- tx = self.xid("dummy")
-
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
- channel.dtx_demarcation_end(xid=tx, suspend=True)
-
- channel.dtx_demarcation_start(xid=tx, resume=True)
- self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
- channel.dtx_demarcation_end(xid=tx)
-
- #commit and check
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "one")
- self.assertMessageCount(1, "two")
- self.assertMessageId("a", "two")
- self.assertMessageId("b", "one")
-
- def test_suspend_start_end_resume(self):
- """
- Test suspension and resumption of an association with work
- done on another transaction when the first transaction is
- suspended
- """
- channel = self.channel
- channel.dtx_demarcation_select()
-
- #setup
- channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
-
- tx = self.xid("dummy")
-
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
- channel.dtx_demarcation_end(xid=tx, suspend=True)
-
- channel.dtx_demarcation_start(xid=tx, resume=True)
- self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
- channel.dtx_demarcation_end(xid=tx)
-
- #commit and check
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "one")
- self.assertMessageCount(1, "two")
- self.assertMessageId("a", "two")
- self.assertMessageId("b", "one")
-
- def test_end_suspend_and_fail(self):
- """
- Verify that the correct error is signalled if the suspend and
- fail flag are both set when disassociating a transaction from
- the channel
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("suspend_and_fail")
- channel.dtx_demarcation_start(xid=tx)
- try:
- channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
- self.fail("Suspend and fail both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- #cleanup
- other = self.connect()
- channel = other.channel(1)
- channel.session_open()
- channel.dtx_coordination_rollback(xid=tx)
- channel.session_close()
- other.close()
-
-
- def test_end_unknown_xid(self):
- """
- Verifies that the correct exception is thrown when an attempt
- is made to end the association for a xid not previously
- associated with the channel
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("unknown-xid")
- try:
- channel.dtx_demarcation_end(xid=tx)
- self.fail("Attempted to end association with unknown xid, expected exception!")
- except Closed, e:
- #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
- self.assertConnectionException(503, e.args[0])
-
- def test_end(self):
- """
- Verify that the association is terminated by end and subsequent
- operations are non-transactional
- """
- channel = self.client.channel(2)
- channel.session_open()
- channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
-
- #publish a message under a transaction
- channel.dtx_demarcation_select()
- tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage"))
- channel.dtx_demarcation_end(xid=tx)
-
- #now that association with txn is ended, publish another message
- channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage"))
-
- #check the second message is available, but not the first
- self.assertMessageCount(1, "tx-queue")
- self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1)
- msg = self.client.queue("results").get(timeout=1)
- self.assertEqual("two", msg.content['message_id'])
- channel.message_cancel(destination="results")
- #ack the message then close the channel
- msg.complete()
- channel.session_close()
-
- channel = self.channel
- #commit the transaction and check that the first message (and
- #only the first message) is then delivered
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "tx-queue")
- self.assertMessageId("one", "tx-queue")
-
- def test_invalid_commit_one_phase_true(self):
- """
- Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
- """
- other = self.connect()
- tester = other.channel(1)
- tester.session_open()
- tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- tester.dtx_demarcation_select()
- tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- tester.dtx_demarcation_end(xid=tx)
- tester.dtx_coordination_prepare(xid=tx)
- failed = False
- try:
- tester.dtx_coordination_commit(xid=tx, one_phase=True)
- except Closed, e:
- failed = True
- error = e
-
- if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
- else:
- tester.session_close()
- other.close()
- self.fail("Invalid use of one_phase=True, expected exception!")
-
- def test_invalid_commit_one_phase_false(self):
- """
- Test that a commit with one_phase = False is rejected if the
- transaction in question has not yet been prepared.
- """
- """
- Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
- """
- other = self.connect()
- tester = other.channel(1)
- tester.session_open()
- tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- tester.dtx_demarcation_select()
- tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- tester.dtx_demarcation_end(xid=tx)
- failed = False
- try:
- tester.dtx_coordination_commit(xid=tx, one_phase=False)
- except Closed, e:
- failed = True
- error = e
-
- if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
- else:
- tester.session_close()
- other.close()
- self.fail("Invalid use of one_phase=False, expected exception!")
-
- def test_implicit_end(self):
- """
- Test that an association is implicitly ended when the channel
- is closed (whether by exception or explicit client request)
- and the transaction in question is marked as rollback only.
- """
- channel1 = self.channel
- channel2 = self.client.channel(2)
- channel2.session_open()
-
- #setup:
- channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- tx = self.xid("dummy")
-
- channel2.dtx_demarcation_select()
- channel2.dtx_demarcation_start(xid=tx)
- channel2.message_subscribe(queue="dummy", destination="dummy", confirm_mode=1)
- channel2.message_flow(destination="dummy", unit=0, value=1)
- channel2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF)
- self.client.queue("dummy").get(timeout=1).complete()
- channel2.message_cancel(destination="dummy")
- channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- channel2.session_close()
-
- self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status)
- channel1.dtx_coordination_rollback(xid=tx)
-
- def test_get_timeout(self):
- """
- Check that get-timeout returns the correct value, (and that a
- transaction with a timeout can complete normally)
- """
- channel = self.channel
- tx = self.xid("dummy")
-
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
- self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
-
- def test_set_timeout(self):
- """
- Test the timeout of a transaction results in the expected
- behaviour
- """
- #open new channel to allow self.channel to be used in checking te queue
- channel = self.client.channel(2)
- channel.session_open()
- #setup:
- tx = self.xid("dummy")
- channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage"))
-
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "queue-a", "queue-b")
- channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
- sleep(3)
- #check that the work has been rolled back already
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("timeout", "queue-a")
- #check the correct codes are returned when we try to complete the txn
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status)
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status)
-
-
-
- def test_recover(self):
- """
- Test basic recover behaviour
- """
- channel = self.channel
-
- channel.dtx_demarcation_select()
- channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
-
- prepared = []
- for i in range(1, 10):
- tx = self.xid("tx%s" % (i))
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i)))
- channel.dtx_demarcation_end(xid=tx)
- if i in [2, 5, 6, 8]:
- channel.dtx_coordination_prepare(xid=tx)
- prepared.append(tx)
- else:
- channel.dtx_coordination_rollback(xid=tx)
-
- xids = channel.dtx_coordination_recover().in_doubt
-
- #rollback the prepared transactions returned by recover
- for x in xids:
- channel.dtx_coordination_rollback(xid=x)
-
- #validate against the expected list of prepared transactions
- actual = set(xids)
- expected = set(prepared)
- intersection = actual.intersection(expected)
-
- if intersection != expected:
- missing = expected.difference(actual)
- extra = actual.difference(expected)
- for x in missing:
- channel.dtx_coordination_rollback(xid=x)
- self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
-
- def test_bad_resume(self):
- """
- Test that a resume on a session not selected for use with dtx fails
- """
- channel = self.channel
- try:
- channel.dtx_demarcation_start(resume=True)
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def xid(self, txid):
- DtxTests.tx_counter += 1
- branchqual = "v%s" % DtxTests.tx_counter
- return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
-
- def txswap(self, tx, id):
- channel = self.channel
- #declare two queues:
- channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
- #put message with specified id on one queue:
- channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage"))
-
- #start the transaction:
- channel.dtx_demarcation_select()
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
-
- #'swap' the message from one queue to the other, under that transaction:
- self.swap(self.channel, "queue-a", "queue-b")
-
- #mark the end of the transactional work:
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status)
-
- def swap(self, channel, src, dest):
- #consume from src:
- channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1)
- channel.message_flow(destination="temp-swap", unit=0, value=1)
- channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
- msg = self.client.queue("temp-swap").get(timeout=1)
- channel.message_cancel(destination="temp-swap")
- msg.complete();
-
- #re-publish to dest
- channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']},
- body=msg.content.body))
-
- def assertMessageCount(self, expected, queue):
- self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count)
-
- def assertMessageId(self, expected, queue):
- self.channel.message_subscribe(queue=queue, destination="results")
- self.channel.message_flow(destination="results", unit=0, value=1)
- self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
- self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
- self.channel.message_cancel(destination="results")
diff --git a/qpid/python/tests_0-10_preview/example.py b/qpid/python/tests_0-10_preview/example.py
deleted file mode 100644
index da5ee2441f..0000000000
--- a/qpid/python/tests_0-10_preview/example.py
+++ /dev/null
@@ -1,95 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class ExampleTest (TestBase):
- """
- An example Qpid test, illustrating the unittest framework and the
- python Qpid client. The test class must inherit TestBase. The
- test code uses the Qpid client to interact with a qpid broker and
- verify it behaves as expected.
- """
-
- def test_example(self):
- """
- An example test. Note that test functions must start with 'test_'
- to be recognized by the test framework.
- """
-
- # By inheriting TestBase, self.client is automatically connected
- # and self.channel is automatically opened as channel(1)
- # Other channel methods mimic the protocol.
- channel = self.channel
-
- # Now we can send regular commands. If you want to see what the method
- # arguments mean or what other commands are available, you can use the
- # python builtin help() method. For example:
- #help(chan)
- #help(chan.exchange_declare)
-
- # If you want browse the available protocol methods without being
- # connected to a live server you can use the amqp-doc utility:
- #
- # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
- #
- # Options:
- # -e, --regexp use regex instead of glob when matching
-
- # Now that we know what commands are available we can use them to
- # interact with the server.
-
- # Here we use ordinal arguments.
- self.exchange_declare(channel, 0, "test", "direct")
-
- # Here we use keyword arguments.
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
-
- # Call Channel.basic_consume to register as a consumer.
- # All the protocol methods return a message object. The message object
- # has fields corresponding to the reply method fields, plus a content
- # field that is filled if the reply includes content. In this case the
- # interesting field is the consumer_tag.
- channel.message_subscribe(queue="test-queue", destination="consumer_tag")
- channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
- channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
-
- # We can use the Client.queue(...) method to access the queue
- # corresponding to our consumer_tag.
- queue = self.client.queue("consumer_tag")
-
- # Now lets publish a message and see if our consumer gets it. To do
- # this we need to import the Content class.
- sent = Content("Hello World!")
- sent["routing_key"] = "key"
- channel.message_transfer(destination="test", content=sent)
-
- # Now we'll wait for the message to arrive. We can use the timeout
- # argument in case the server hangs. By default queue.get() will wait
- # until a message arrives or the connection to the server dies.
- msg = queue.get(timeout=10)
-
- # And check that we got the right response with assertEqual
- self.assertEqual(sent.body, msg.content.body)
-
- # Now acknowledge the message.
- msg.complete()
-
diff --git a/qpid/python/tests_0-10_preview/exchange.py b/qpid/python/tests_0-10_preview/exchange.py
deleted file mode 100644
index 86c39b7736..0000000000
--- a/qpid/python/tests_0-10_preview/exchange.py
+++ /dev/null
@@ -1,335 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Tests for exchange behaviour.
-
-Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
-"""
-
-import Queue, logging
-from qpid.testlib import TestBase
-from qpid.content import Content
-from qpid.client import Closed
-
-
-class StandardExchangeVerifier:
- """Verifies standard exchange behavior.
-
- Used as base class for classes that test standard exchanges."""
-
- def verifyDirectExchange(self, ex):
- """Verify that ex behaves like a direct exchange."""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
- try:
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
- self.fail("Expected Empty exception")
- except Queue.Empty: None # Expected
-
- def verifyFanOutExchange(self, ex):
- """Verify that ex behaves like a fanout exchange."""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex)
- self.queue_declare(queue="p")
- self.channel.queue_bind(queue="p", exchange=ex)
- for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
-
- def verifyTopicExchange(self, ex):
- """Verify that ex behaves like a topic exchange"""
- self.queue_declare(queue="a")
- self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
- q = self.consume("a")
- self.assertPublishGet(q, ex, "a.b.x")
- self.assertPublishGet(q, ex, "a.x.b.x")
- self.assertPublishGet(q, ex, "a.x.x.b.x")
- # Shouldn't match
- self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
- self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"}))
- self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"}))
- self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
- self.assert_(q.empty())
-
- def verifyHeadersExchange(self, ex):
- """Verify that ex is a headers exchange"""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
- q = self.consume("q")
- headers = {"name":"fred", "age":3}
- self.assertPublishGet(q, exchange=ex, properties=headers)
- self.channel.message_transfer(destination=ex) # No headers, won't deliver
- self.assertEmpty(q);
-
-
-class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server SHOULD implement these standard exchange types: topic, headers.
-
- Client attempts to declare an exchange with each of these standard types.
- """
-
- def testDirect(self):
- """Declare and test a direct exchange"""
- self.exchange_declare(0, exchange="d", type="direct")
- self.verifyDirectExchange("d")
-
- def testFanout(self):
- """Declare and test a fanout exchange"""
- self.exchange_declare(0, exchange="f", type="fanout")
- self.verifyFanOutExchange("f")
-
- def testTopic(self):
- """Declare and test a topic exchange"""
- self.exchange_declare(0, exchange="t", type="topic")
- self.verifyTopicExchange("t")
-
- def testHeaders(self):
- """Declare and test a headers exchange"""
- self.exchange_declare(0, exchange="h", type="headers")
- self.verifyHeadersExchange("h")
-
-
-class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server MUST, in each virtual host, pre-declare an exchange instance
- for each standard exchange type that it implements, where the name of the
- exchange instance is amq. followed by the exchange type name.
-
- Client creates a temporary queue and attempts to bind to each required
- exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
- those types are defined).
- """
- def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
-
- def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
-
- def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
-
- def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
-
-class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server MUST predeclare a direct exchange to act as the default exchange
- for content Publish methods and for default queue bindings.
-
- Client checks that the default exchange is active by specifying a queue
- binding with no exchange name, and publishing a message with a suitable
- routing key but without specifying the exchange name, then ensuring that
- the message arrives in the queue correctly.
- """
- def testDefaultExchange(self):
- # Test automatic binding by queue name.
- self.queue_declare(queue="d")
- self.assertPublishConsume(queue="d", routing_key="d")
- # Test explicit bind to default queue
- self.verifyDirectExchange("")
-
-
-# TODO aconway 2006-09-27: Fill in empty tests:
-
-class DefaultAccessRuleTests(TestBase):
- """
- The server MUST NOT allow clients to access the default exchange except
- by specifying an empty exchange name in the Queue.Bind and content Publish
- methods.
- """
-
-class ExtensionsRuleTests(TestBase):
- """
- The server MAY implement other exchange types as wanted.
- """
-
-
-class DeclareMethodMinimumRuleTests(TestBase):
- """
- The server SHOULD support a minimum of 16 exchanges per virtual host and
- ideally, impose no limit except as defined by available resources.
-
- The client creates as many exchanges as it can until the server reports
- an error; the number of exchanges successfuly created must be at least
- sixteen.
- """
-
-
-class DeclareMethodTicketFieldValidityRuleTests(TestBase):
- """
- The client MUST provide a valid access ticket giving "active" access to
- the realm in which the exchange exists or will be created, or "passive"
- access if the if-exists flag is set.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
- """
- Exchange names starting with "amq." are reserved for predeclared and
- standardised exchanges. The client MUST NOT attempt to create an exchange
- starting with "amq.".
-
-
- """
-
-
-class DeclareMethodTypeFieldTypedRuleTests(TestBase):
- """
- Exchanges cannot be redeclared with different types. The client MUST not
- attempt to redeclare an existing exchange with a different type than used
- in the original Exchange.Declare method.
-
-
- """
-
-
-class DeclareMethodTypeFieldSupportRuleTests(TestBase):
- """
- The client MUST NOT attempt to create an exchange with a type that the
- server does not support.
-
-
- """
-
-
-class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
- """
- If set, and the exchange does not already exist, the server MUST raise a
- channel exception with reply code 404 (not found).
- """
- def test(self):
- try:
- self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
- self.fail("Expected 404 for passive declaration of unknown exchange.")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
-class DeclareMethodDurableFieldSupportRuleTests(TestBase):
- """
- The server MUST support both durable and transient exchanges.
-
-
- """
-
-
-class DeclareMethodDurableFieldStickyRuleTests(TestBase):
- """
- The server MUST ignore the durable field if the exchange already exists.
-
-
- """
-
-
-class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
- """
- The server MUST ignore the auto-delete field if the exchange already
- exists.
-
-
- """
-
-
-class DeleteMethodTicketFieldValidityRuleTests(TestBase):
- """
- The client MUST provide a valid access ticket giving "active" access
- rights to the exchange's access realm.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
- """
- The client MUST NOT attempt to delete an exchange that does not exist.
- """
-
-
-class HeadersExchangeTests(TestBase):
- """
- Tests for headers exchange functionality.
- """
- def setUp(self):
- TestBase.setUp(self)
- self.queue_declare(queue="q")
- self.q = self.consume("q")
-
- def myAssertPublishGet(self, headers):
- self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
-
- def myBasicPublish(self, headers):
- self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers}))
-
- def testMatchAll(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
-
- # None of these should match
- self.myBasicPublish({})
- self.myBasicPublish({"name":"barney"})
- self.myBasicPublish({"name":10})
- self.myBasicPublish({"name":"fred", "age":2})
- self.assertEmpty(self.q)
-
- def testMatchAny(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred"})
- self.myAssertPublishGet({"name":"fred", "ignoreme":10})
- self.myAssertPublishGet({"ignoreme":10, "age":3})
-
- # Wont match
- self.myBasicPublish({})
- self.myBasicPublish({"irrelevant":0})
- self.assertEmpty(self.q)
-
-
-class MiscellaneousErrorsTests(TestBase):
- """
- Test some miscellaneous error conditions
- """
- def testTypeNotKnown(self):
- try:
- self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
- self.fail("Expected 503 for declaration of unknown exchange type.")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def testDifferentDeclaredType(self):
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
- try:
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
- self.fail("Expected 530 for redeclaration of exchange with different type.")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
- #cleanup
- other = self.connect()
- c2 = other.channel(1)
- c2.session_open()
- c2.exchange_delete(exchange="test_different_declared_type_exchange")
-
-class ExchangeTests(TestBase):
- def testHeadersBindNoMatchArg(self):
- self.channel.queue_declare(queue="q", exclusive=True, auto_delete=True)
- try:
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
- self.fail("Expected failure for missing x-match arg.")
- except Closed, e:
- self.assertConnectionException(541, e.args[0])
diff --git a/qpid/python/tests_0-10_preview/execution.py b/qpid/python/tests_0-10_preview/execution.py
deleted file mode 100644
index 3ff6d8ea65..0000000000
--- a/qpid/python/tests_0-10_preview/execution.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class ExecutionTests (TestBase):
- def test_flush(self):
- channel = self.channel
- for i in [1, 2, 3]:
- channel.message_transfer(
- content=Content(properties={'routing_key':str(i)}))
- assert(channel.completion.wait(channel.completion.command_id, timeout=1))
diff --git a/qpid/python/tests_0-10_preview/message.py b/qpid/python/tests_0-10_preview/message.py
deleted file mode 100644
index a3d32bdb2d..0000000000
--- a/qpid/python/tests_0-10_preview/message.py
+++ /dev/null
@@ -1,834 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from qpid.reference import Reference, ReferenceId
-
-class MessageTests(TestBase):
- """Tests for 'methods' on the amqp message 'class'"""
-
- def test_consume_no_local(self):
- """
- Test that the no_local flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
- #establish two consumers one of which excludes delivery of locally sent messages
- self.subscribe(destination="local_included", queue="test-queue-1a")
- self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
-
- #send a message
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
-
- #check the queues of the two consumers
- excluded = self.client.queue("local_excluded")
- included = self.client.queue("local_included")
- msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.content.body)
- try:
- excluded.get(timeout=1)
- self.fail("Received locally published message though no_local=true")
- except Empty: None
-
- def test_consume_no_local_awkward(self):
-
- """
- If an exclusive queue gets a no-local delivered to it, that
- message could 'block' delivery of subsequent messages or it
- could be left on the queue, possibly never being consumed
- (this is the case for example in the qpid JMS mapping of
- topics). This test excercises a Qpid C++ broker hack that
- deletes such messages.
- """
-
- channel = self.channel
- #setup:
- channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
- #establish consumer which excludes delivery of locally sent messages
- self.subscribe(destination="local_excluded", queue="test-queue", no_local=True)
-
- #send a 'local' message
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local"))
-
- #send a non local message
- other = self.connect()
- channel2 = other.channel(1)
- channel2.session_open()
- channel2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign"))
- channel2.session_close()
- other.close()
-
- #check that the second message only is delivered
- excluded = self.client.queue("local_excluded")
- msg = excluded.get(timeout=1)
- self.assertEqual("foreign", msg.content.body)
- try:
- excluded.get(timeout=1)
- self.fail("Received extra message")
- except Empty: None
- #check queue is empty
- self.assertEqual(0, channel.queue_query(queue="test-queue").message_count)
-
-
- def test_consume_exclusive(self):
- """
- Test that the exclusive flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
- try:
- self.subscribe(destination="second", queue="test-queue-2")
- self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new channel and cleanup last consumer:
- channel = self.client.channel(2)
- channel.session_open()
-
- #check that an exclusive consumer cannot be created if a consumer already exists:
- self.subscribe(channel, destination="first", queue="test-queue-2")
- try:
- self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
- self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- def test_consume_queue_errors(self):
- """
- Test error conditions associated with the queue field of the consume method:
- """
- channel = self.channel
- try:
- #queue specified but doesn't exist:
- self.subscribe(queue="invalid-queue", destination="")
- self.fail("Expected failure when consuming from non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(2)
- channel.session_open()
- try:
- #queue not specified and none previously declared for channel:
- self.subscribe(channel, queue="", destination="")
- self.fail("Expected failure when consuming from unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_consume_unique_consumers(self):
- """
- Ensure unique consumer tags are enforced
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
-
- #check that attempts to use duplicate tags are detected and prevented:
- self.subscribe(destination="first", queue="test-queue-3")
- try:
- self.subscribe(destination="first", queue="test-queue-3")
- self.fail("Expected consume request to fail due to non-unique tag")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_cancel(self):
- """
- Test compliance of the basic.cancel method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
- self.subscribe(destination="my-consumer", queue="test-queue-4")
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
-
- #cancel should stop messages being delivered
- channel.message_cancel(destination="my-consumer")
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two"))
- myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.content.body)
- try:
- msg = myqueue.get(timeout=1)
- self.fail("Got message after cancellation: " + msg)
- except Empty: None
-
- #cancellation of non-existant consumers should be handled without error
- channel.message_cancel(destination="my-consumer")
- channel.message_cancel(destination="this-never-existed")
-
-
- def test_ack(self):
- """
- Test basic ack/recover behaviour
- """
- channel = self.channel
- channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True)
-
- self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
- queue = self.client.queue("consumer_tag")
-
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- msg2.complete(cumulative=True)#One and Two
- msg4.complete(cumulative=False)
-
- channel.message_recover(requeue=False)
-
- msg3b = queue.get(timeout=1)
- msg5b = queue.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
-
- def test_recover(self):
- """
- Test recover behaviour
- """
- channel = self.channel
- channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="amq.fanout", queue="queue-a")
- channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="amq.fanout", queue="queue-b")
-
- self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
- self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0)
- confirmed = self.client.queue("confirmed")
- unconfirmed = self.client.queue("unconfirmed")
-
- data = ["One", "Two", "Three", "Four", "Five"]
- for d in data:
- channel.message_transfer(destination="amq.fanout", content=Content(body=d))
-
- for q in [confirmed, unconfirmed]:
- for d in data:
- self.assertEqual(d, q.get(timeout=1).content.body)
- self.assertEmpty(q)
-
- channel.message_recover(requeue=False)
-
- self.assertEmpty(confirmed)
-
- while len(data):
- msg = None
- for d in data:
- msg = unconfirmed.get(timeout=1)
- self.assertEqual(d, msg.content.body)
- self.assertEqual(True, msg.content['redelivered'])
- self.assertEmpty(unconfirmed)
- data.remove(msg.content.body)
- msg.complete(cumulative=False)
- channel.message_recover(requeue=False)
-
-
- def test_recover_requeue(self):
- """
- Test requeing on recovery
- """
- channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True)
-
- self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
- queue = self.client.queue("consumer_tag")
-
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- msg2.complete(cumulative=True) #One and Two
- msg4.complete(cumulative=False) #Four
-
- channel.message_cancel(destination="consumer_tag")
-
- #publish a new message
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six"))
- #requeue unacked messages (Three and Five)
- channel.message_recover(requeue=True)
-
- self.subscribe(queue="test-requeue", destination="consumer_tag")
- queue2 = self.client.queue("consumer_tag")
-
- msg3b = queue2.get(timeout=1)
- msg5b = queue2.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- self.assertEqual(True, msg3b.content['redelivered'])
- self.assertEqual(True, msg5b.content['redelivered'])
-
- self.assertEqual("Six", queue2.get(timeout=1).content.body)
-
- try:
- extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.content.body)
- except Empty: None
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.content.body)
- except Empty: None
-
-
- def test_qos_prefetch_count(self):
- """
- Test that the prefetch count specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True)
- subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
- queue = self.client.queue("consumer_tag")
-
- #set prefetch to 5:
- channel.message_qos(prefetch_count=5)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i))
-
- #only 5 messages should have been delivered:
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- msg.complete()
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- msg.complete()
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
-
-
- def test_qos_prefetch_size(self):
- """
- Test that the prefetch size specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True)
- subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
- queue = self.client.queue("consumer_tag")
-
- #set prefetch to 50 bytes (each message is 9 or 10 bytes):
- channel.message_qos(prefetch_size=50)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i))
-
- #only 5 messages should have been delivered (i.e. 45 bytes worth):
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- msg.complete()
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- msg.complete()
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
- #make sure that a single oversized message still gets delivered
- large = "abcdefghijklmnopqrstuvwxyz"
- large = large + "-" + large;
- channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large))
- msg = queue.get(timeout=1)
- self.assertEqual(large, msg.content.body)
-
- def test_reject(self):
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
- channel.queue_declare(queue = "r", exclusive=True, auto_delete=True)
- channel.queue_bind(queue = "r", exchange = "amq.fanout")
-
- self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
- msg = self.client.queue("consumer").get(timeout = 1)
- self.assertEquals(msg.content.body, "blah, blah")
- channel.message_reject([msg.command_id, msg.command_id])
-
- self.subscribe(queue = "r", destination = "checker")
- msg = self.client.queue("checker").get(timeout = 1)
- self.assertEquals(msg.content.body, "blah, blah")
-
- def test_credit_flow_messages(self):
- """
- Test basic credit based flow control with unit = message
- """
- #declare an exclusive queue
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- #create consumer (for now that defaults to infinite credit)
- channel.message_subscribe(queue = "q", destination = "c")
- channel.message_flow_mode(mode = 0, destination = "c")
- #send batch of messages to queue
- for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
-
- #set message credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 0, value = 5, destination = "c")
- #set infinite byte credit
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
- #check that expected number were received
- q = self.client.queue("c")
- for i in range(1, 6):
- self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
- self.assertEmpty(q)
-
- #increase credit again and check more are received
- for i in range(6, 11):
- channel.message_flow(unit = 0, value = 1, destination = "c")
- self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
- self.assertEmpty(q)
-
- def test_credit_flow_bytes(self):
- """
- Test basic credit based flow control with unit = bytes
- """
- #declare an exclusive queue
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- #create consumer (for now that defaults to infinite credit)
- channel.message_subscribe(queue = "q", destination = "c")
- channel.message_flow_mode(mode = 0, destination = "c")
- #send batch of messages to queue
- for i in range(10):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
-
- #each message is currently interpreted as requiring msg_size bytes of credit
- msg_size = 35
-
- #set byte credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
- #set infinite message credit
- channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
- #check that expected number were received
- q = self.client.queue("c")
- for i in range(5):
- self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
- self.assertEmpty(q)
-
- #increase credit again and check more are received
- for i in range(5):
- channel.message_flow(unit = 1, value = msg_size, destination = "c")
- self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
- self.assertEmpty(q)
-
-
- def test_window_flow_messages(self):
- """
- Test basic window based flow control with unit = message
- """
- #declare an exclusive queue
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- #create consumer (for now that defaults to infinite credit)
- channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- channel.message_flow_mode(mode = 1, destination = "c")
- #send batch of messages to queue
- for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
-
- #set message credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 0, value = 5, destination = "c")
- #set infinite byte credit
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
- #check that expected number were received
- q = self.client.queue("c")
- for i in range(1, 6):
- msg = q.get(timeout = 1)
- self.assertDataEquals(channel, msg, "Message %d" % i)
- self.assertEmpty(q)
-
- #acknowledge messages and check more are received
- msg.complete(cumulative=True)
- for i in range(6, 11):
- self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
- self.assertEmpty(q)
-
-
- def test_window_flow_bytes(self):
- """
- Test basic window based flow control with unit = bytes
- """
- #declare an exclusive queue
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- #create consumer (for now that defaults to infinite credit)
- channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- channel.message_flow_mode(mode = 1, destination = "c")
- #send batch of messages to queue
- for i in range(10):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
-
- #each message is currently interpreted as requiring msg_size bytes of credit
- msg_size = 40
-
- #set byte credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
- #set infinite message credit
- channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
- #check that expected number were received
- q = self.client.queue("c")
- msgs = []
- for i in range(5):
- msg = q.get(timeout = 1)
- msgs.append(msg)
- self.assertDataEquals(channel, msg, "abcdefgh")
- self.assertEmpty(q)
-
- #ack each message individually and check more are received
- for i in range(5):
- msg = msgs.pop()
- msg.complete(cumulative=False)
- self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
- self.assertEmpty(q)
-
- def test_subscribe_not_acquired(self):
- """
- Test the not-acquired modes works as expected for a simple case
- """
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- for i in range(1, 6):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
-
- self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
- self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
-
- for i in range(6, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
-
- #both subscribers should see all messages
- qA = self.client.queue("a")
- qB = self.client.queue("b")
- for i in range(1, 11):
- for q in [qA, qB]:
- msg = q.get(timeout = 1)
- self.assertEquals("Message %s" % i, msg.content.body)
- msg.complete()
-
- #messages should still be on the queue:
- self.assertEquals(10, channel.queue_query(queue = "q").message_count)
-
- def test_acquire(self):
- """
- Test explicit acquire function
- """
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
-
- self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
- msg = self.client.queue("a").get(timeout = 1)
- #message should still be on the queue:
- self.assertEquals(1, channel.queue_query(queue = "q").message_count)
-
- channel.message_acquire([msg.command_id, msg.command_id])
- #check that we get notification (i.e. message_acquired)
- response = channel.control_queue.get(timeout=1)
- self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
- #message should have been removed from the queue:
- self.assertEquals(0, channel.queue_query(queue = "q").message_count)
- msg.complete()
-
-
-
-
- def test_release(self):
- """
- Test explicit release function
- """
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
-
- self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
- msg = self.client.queue("a").get(timeout = 1)
- channel.message_cancel(destination = "a")
- channel.message_release([msg.command_id, msg.command_id])
- msg.complete()
-
- #message should not have been removed from the queue:
- self.assertEquals(1, channel.queue_query(queue = "q").message_count)
-
- def test_release_ordering(self):
- """
- Test order of released messages is as expected
- """
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- for i in range (1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i)))
-
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- channel.message_flow(unit = 0, value = 10, destination = "a")
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- queue = self.client.queue("a")
- first = queue.get(timeout = 1)
- for i in range (2, 10):
- self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
- last = queue.get(timeout = 1)
- self.assertEmpty(queue)
- channel.message_release([first.command_id, last.command_id])
- last.complete()#will re-allocate credit, as in window mode
- for i in range (1, 11):
- self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
-
- def test_ranged_ack(self):
- """
- Test acking of messages ranges
- """
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- for i in range (1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i)))
-
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- channel.message_flow(unit = 0, value = 10, destination = "a")
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- queue = self.client.queue("a")
- for i in range (1, 11):
- self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body)
- self.assertEmpty(queue)
-
- #ack all but the third message (command id 2)
- channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9])
- channel.message_recover()
- self.assertEquals("message 3", queue.get(timeout = 1).content.body)
- self.assertEmpty(queue)
-
- def test_subscribe_not_acquired_2(self):
- channel = self.channel
-
- #publish some messages
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
-
- #consume some of them
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- channel.message_flow_mode(mode = 0, destination = "a")
- channel.message_flow(unit = 0, value = 5, destination = "a")
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
-
- queue = self.client.queue("a")
- for i in range(1, 6):
- msg = queue.get(timeout = 1)
- self.assertEquals("message-%d" % (i), msg.content.body)
- msg.complete()
- self.assertEmpty(queue)
-
- #now create a not-acquired subscriber
- channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
-
- #check it gets those not consumed
- queue = self.client.queue("b")
- channel.message_flow(unit = 0, value = 1, destination = "b")
- for i in range(6, 11):
- msg = queue.get(timeout = 1)
- self.assertEquals("message-%d" % (i), msg.content.body)
- msg.complete()
- channel.message_flow(unit = 0, value = 1, destination = "b")
- self.assertEmpty(queue)
-
- #check all 'browsed' messages are still on the queue
- self.assertEqual(5, channel.queue_query(queue="q").message_count)
-
- def test_subscribe_not_acquired_3(self):
- channel = self.channel
-
- #publish some messages
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
-
- #create a not-acquired subscriber
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- channel.message_flow(unit = 0, value = 10, destination = "a")
-
- #browse through messages
- queue = self.client.queue("a")
- for i in range(1, 11):
- msg = queue.get(timeout = 1)
- self.assertEquals("message-%d" % (i), msg.content.body)
- if (i % 2):
- #try to acquire every second message
- channel.message_acquire([msg.command_id, msg.command_id])
- #check that acquire succeeds
- response = channel.control_queue.get(timeout=1)
- self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
- msg.complete()
- self.assertEmpty(queue)
-
- #create a second not-acquired subscriber
- channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
- channel.message_flow(unit = 0, value = 1, destination = "b")
- #check it gets those not consumed
- queue = self.client.queue("b")
- for i in [2,4,6,8,10]:
- msg = queue.get(timeout = 1)
- self.assertEquals("message-%d" % (i), msg.content.body)
- msg.complete()
- channel.message_flow(unit = 0, value = 1, destination = "b")
- self.assertEmpty(queue)
-
- #check all 'browsed' messages are still on the queue
- self.assertEqual(5, channel.queue_query(queue="q").message_count)
-
- def test_release_unacquired(self):
- channel = self.channel
-
- #create queue
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True)
-
- #send message
- channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
-
- #create two 'browsers'
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- channel.message_flow(unit = 0, value = 10, destination = "a")
- queueA = self.client.queue("a")
-
- channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
- channel.message_flow(unit = 0, value = 10, destination = "b")
- queueB = self.client.queue("b")
-
- #have each browser release the message
- msgA = queueA.get(timeout = 1)
- channel.message_release([msgA.command_id, msgA.command_id])
-
- msgB = queueB.get(timeout = 1)
- channel.message_release([msgB.command_id, msgB.command_id])
-
- #cancel browsers
- channel.message_cancel(destination = "a")
- channel.message_cancel(destination = "b")
-
- #create consumer
- channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
- channel.message_flow(unit = 0, value = 10, destination = "c")
- queueC = self.client.queue("c")
- #consume the message then ack it
- msgC = queueC.get(timeout = 1)
- msgC.complete()
- #ensure there are no other messages
- self.assertEmpty(queueC)
-
- def test_no_size(self):
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
-
- ch = self.channel
- ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body"))
-
- ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0)
- ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d")
- ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d")
-
- queue = self.client.queue("d")
- msg = queue.get(timeout = 3)
- self.assertEquals("message-body", msg.content.body)
-
- def assertDataEquals(self, channel, msg, expected):
- self.assertEquals(expected, msg.content.body)
-
- def assertEmpty(self, queue):
- try:
- extra = queue.get(timeout=1)
- self.fail("Queue not empty, contains: " + extra.content.body)
- except Empty: None
-
-class SizelessContent(Content):
-
- def size(self):
- return None
diff --git a/qpid/python/tests_0-10_preview/persistence.py b/qpid/python/tests_0-10_preview/persistence.py
deleted file mode 100644
index ad578474eb..0000000000
--- a/qpid/python/tests_0-10_preview/persistence.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class PersistenceTests(TestBase):
- def test_delete_queue_after_publish(self):
- channel = self.channel
- channel.synchronous = False
-
- #create queue
- channel.queue_declare(queue = "q", auto_delete=True, durable=True)
-
- #send message
- for i in range(1, 10):
- channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
-
- channel.synchronous = True
- #explicitly delete queue
- channel.queue_delete(queue = "q")
-
- def test_ack_message_from_deleted_queue(self):
- channel = self.channel
- channel.synchronous = False
-
- #create queue
- channel.queue_declare(queue = "q", auto_delete=True, durable=True)
-
- #send message
- channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
-
- #create consumer
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=0)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- channel.message_flow(unit = 0, value = 10, destination = "a")
- queue = self.client.queue("a")
-
- #consume the message, cancel subscription (triggering auto-delete), then ack it
- msg = queue.get(timeout = 5)
- channel.message_cancel(destination = "a")
- msg.complete()
-
- def test_queue_deletion(self):
- channel = self.channel
- channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
- channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
- channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
- channel.queue_delete(queue = "durable-subscriber-queue")
-
diff --git a/qpid/python/tests_0-10_preview/query.py b/qpid/python/tests_0-10_preview/query.py
deleted file mode 100644
index eba2ee6dd1..0000000000
--- a/qpid/python/tests_0-10_preview/query.py
+++ /dev/null
@@ -1,227 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class QueryTests(TestBase):
- """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
-
- def test_exchange_query(self):
- """
- Test that the exchange_query method works as expected
- """
- channel = self.channel
- #check returned type for the standard exchanges
- self.assert_type("direct", channel.exchange_query(name="amq.direct"))
- self.assert_type("topic", channel.exchange_query(name="amq.topic"))
- self.assert_type("fanout", channel.exchange_query(name="amq.fanout"))
- self.assert_type("headers", channel.exchange_query(name="amq.match"))
- self.assert_type("direct", channel.exchange_query(name=""))
- #declare an exchange
- channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
- #check that the result of a query is as expected
- response = channel.exchange_query(name="my-test-exchange")
- self.assert_type("direct", response)
- self.assertEqual(False, response.durable)
- self.assertEqual(False, response.not_found)
- #delete the exchange
- channel.exchange_delete(exchange="my-test-exchange")
- #check that the query now reports not-found
- self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
-
- def assert_type(self, expected_type, response):
- self.assertEqual(expected_type, response.__getattr__("type"))
-
- def test_binding_query_direct(self):
- """
- Test that the binding_query method works as expected with the direct exchange
- """
- self.binding_query_with_key("amq.direct")
-
- def test_binding_query_topic(self):
- """
- Test that the binding_query method works as expected with the direct exchange
- """
- self.binding_query_with_key("amq.topic")
-
- def binding_query_with_key(self, exchange_name):
- channel = self.channel
- #setup: create two queues
- channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
-
- channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
-
- # test detection of any binding to specific queue
- response = channel.binding_query(exchange=exchange_name, queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
-
- # test detection of specific binding to any queue
- response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.key_not_matched)
-
- # test detection of specific binding to specific queue
- response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(False, response.key_not_matched)
-
- # test unmatched queue, unspecified binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
-
- # test unspecified queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.key_not_matched)
-
- # test matched queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(True, response.key_not_matched)
-
- # test unmatched queue, matched binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(False, response.key_not_matched)
-
- # test unmatched queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(True, response.key_not_matched)
-
- #test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
-
- #test queue not found
- self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
-
-
- def test_binding_query_fanout(self):
- """
- Test that the binding_query method works as expected with fanout exchange
- """
- channel = self.channel
- #setup
- channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="amq.fanout", queue="used-queue")
-
- # test detection of any binding to specific queue
- response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
-
- # test unmatched queue, unspecified binding
- response = channel.binding_query(exchange="amq.fanout", queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
-
- #test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
-
- #test queue not found
- self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
-
- def test_binding_query_header(self):
- """
- Test that the binding_query method works as expected with headers exchanges
- """
- channel = self.channel
- #setup
- channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
-
- # test detection of any binding to specific queue
- response = channel.binding_query(exchange="amq.match", queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
-
- # test detection of specific binding to any queue
- response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.args_not_matched)
-
- # test detection of specific binding to specific queue
- response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(False, response.args_not_matched)
-
- # test unmatched queue, unspecified binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
-
- # test unspecified queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.args_not_matched)
-
- # test matched queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(True, response.args_not_matched)
-
- # test unmatched queue, matched binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(False, response.args_not_matched)
-
- # test unmatched queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(True, response.args_not_matched)
-
- #test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
-
- #test queue not found
- self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found)
-
diff --git a/qpid/python/tests_0-10_preview/queue.py b/qpid/python/tests_0-10_preview/queue.py
deleted file mode 100644
index 7b3590d11b..0000000000
--- a/qpid/python/tests_0-10_preview/queue.py
+++ /dev/null
@@ -1,338 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class QueueTests(TestBase):
- """Tests for 'methods' on the amqp queue 'class'"""
-
- def test_purge(self):
- """
- Test that the purge method removes messages from the queue
- """
- channel = self.channel
- #setup, declare a queue and add some messages to it:
- channel.exchange_declare(exchange="test-exchange", type="direct")
- channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"}))
- channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"}))
- channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"}))
-
- #check that the queue now reports 3 messages:
- channel.queue_declare(queue="test-queue")
- reply = channel.queue_query(queue="test-queue")
- self.assertEqual(3, reply.message_count)
-
- #now do the purge, then test that three messages are purged and the count drops to 0
- channel.queue_purge(queue="test-queue");
- reply = channel.queue_query(queue="test-queue")
- self.assertEqual(0, reply.message_count)
-
- #send a further message and consume it, ensuring that the other messages are really gone
- channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
- self.subscribe(queue="test-queue", destination="tag")
- queue = self.client.queue("tag")
- msg = queue.get(timeout=1)
- self.assertEqual("four", msg.content.body)
-
- #check error conditions (use new channels):
- channel = self.client.channel(2)
- channel.session_open()
- try:
- #queue specified but doesn't exist:
- channel.queue_purge(queue="invalid-queue")
- self.fail("Expected failure when purging non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(3)
- channel.session_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.queue_purge()
- self.fail("Expected failure when purging unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- #cleanup
- other = self.connect()
- channel = other.channel(1)
- channel.session_open()
- channel.exchange_delete(exchange="test-exchange")
-
- def test_declare_exclusive(self):
- """
- Test that the exclusive field is honoured in queue.declare
- """
- # TestBase.setUp has already opened channel(1)
- c1 = self.channel
- # Here we open a second separate connection:
- other = self.connect()
- c2 = other.channel(1)
- c2.session_open()
-
- #declare an exclusive queue:
- c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
- try:
- #other connection should not be allowed to declare this:
- c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
- self.fail("Expected second exclusive queue_declare to raise a channel exception")
- except Closed, e:
- self.assertChannelException(405, e.args[0])
-
-
- def test_declare_passive(self):
- """
- Test that the passive field is honoured in queue.declare
- """
- channel = self.channel
- #declare an exclusive queue:
- channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="passive-queue-1", passive=True)
- try:
- #other connection should not be allowed to declare this:
- channel.queue_declare(queue="passive-queue-2", passive=True)
- self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
- def test_bind(self):
- """
- Test various permutations of the queue.bind method
- """
- channel = self.channel
- channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
-
- #straightforward case, both exchange & queue exist so no errors expected:
- channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
-
- #use the queue name where the routing key is not specified:
- channel.queue_bind(queue="queue-1", exchange="amq.direct")
-
- #try and bind to non-existant exchange
- try:
- channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
- self.fail("Expected bind to non-existant exchange to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- #need to reopen a channel:
- channel = self.client.channel(2)
- channel.session_open()
-
- #try and bind non-existant queue:
- try:
- channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
- self.fail("Expected bind of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- def test_unbind_direct(self):
- self.unbind_test(exchange="amq.direct", routing_key="key")
-
- def test_unbind_topic(self):
- self.unbind_test(exchange="amq.topic", routing_key="key")
-
- def test_unbind_fanout(self):
- self.unbind_test(exchange="amq.fanout")
-
- def test_unbind_headers(self):
- self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
-
- def unbind_test(self, exchange, routing_key="", args=None, headers={}):
- #bind two queues and consume from them
- channel = self.channel
-
- channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
-
- self.subscribe(queue="queue-1", destination="queue-1")
- self.subscribe(queue="queue-2", destination="queue-2")
-
- queue1 = self.client.queue("queue-1")
- queue2 = self.client.queue("queue-2")
-
- channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
- channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
-
- #send a message that will match both bindings
- channel.message_transfer(destination=exchange,
- content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
-
- #unbind first queue
- channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
-
- #send another message
- channel.message_transfer(destination=exchange,
- content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
-
- #check one queue has both messages and the other has only one
- self.assertEquals("one", queue1.get(timeout=1).content.body)
- try:
- msg = queue1.get(timeout=1)
- self.fail("Got extra message: %s" % msg.content.body)
- except Empty: pass
-
- self.assertEquals("one", queue2.get(timeout=1).content.body)
- self.assertEquals("two", queue2.get(timeout=1).content.body)
- try:
- msg = queue2.get(timeout=1)
- self.fail("Got extra message: " + msg)
- except Empty: pass
-
-
- def test_delete_simple(self):
- """
- Test core queue deletion behaviour
- """
- channel = self.channel
-
- #straight-forward case:
- channel.queue_declare(queue="delete-me")
- channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
- channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
- channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
- channel.queue_delete(queue="delete-me")
- #check that it has gone be declaring passively
- try:
- channel.queue_declare(queue="delete-me", passive=True)
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- #check attempted deletion of non-existant queue is handled correctly:
- channel = self.client.channel(2)
- channel.session_open()
- try:
- channel.queue_delete(queue="i-dont-exist", if_empty=True)
- self.fail("Expected delete of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
-
- def test_delete_ifempty(self):
- """
- Test that if_empty field of queue_delete is honoured
- """
- channel = self.channel
-
- #create a queue and add a message to it (use default binding):
- channel.queue_declare(queue="delete-me-2")
- channel.queue_declare(queue="delete-me-2", passive=True)
- channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
-
- #try to delete, but only if empty:
- try:
- channel.queue_delete(queue="delete-me-2", if_empty=True)
- self.fail("Expected delete if_empty to fail for non-empty queue")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
- #need new channel now:
- channel = self.client.channel(2)
- channel.session_open()
-
- #empty queue:
- self.subscribe(channel, destination="consumer_tag", queue="delete-me-2")
- queue = self.client.queue("consumer_tag")
- msg = queue.get(timeout=1)
- self.assertEqual("message", msg.content.body)
- channel.message_cancel(destination="consumer_tag")
-
- #retry deletion on empty queue:
- channel.queue_delete(queue="delete-me-2", if_empty=True)
-
- #check that it has gone by declaring passively:
- try:
- channel.queue_declare(queue="delete-me-2", passive=True)
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- def test_delete_ifunused(self):
- """
- Test that if_unused field of queue_delete is honoured
- """
- channel = self.channel
-
- #create a queue and register a consumer:
- channel.queue_declare(queue="delete-me-3")
- channel.queue_declare(queue="delete-me-3", passive=True)
- self.subscribe(destination="consumer_tag", queue="delete-me-3")
-
- #need new channel now:
- channel2 = self.client.channel(2)
- channel2.session_open()
- #try to delete, but only if empty:
- try:
- channel2.queue_delete(queue="delete-me-3", if_unused=True)
- self.fail("Expected delete if_unused to fail for queue with existing consumer")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
-
- channel.message_cancel(destination="consumer_tag")
- channel.queue_delete(queue="delete-me-3", if_unused=True)
- #check that it has gone by declaring passively:
- try:
- channel.queue_declare(queue="delete-me-3", passive=True)
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
- def test_autodelete_shared(self):
- """
- Test auto-deletion (of non-exclusive queues)
- """
- channel = self.channel
- other = self.connect()
- channel2 = other.channel(1)
- channel2.session_open()
-
- channel.queue_declare(queue="auto-delete-me", auto_delete=True)
-
- #consume from both channels
- reply = channel.basic_consume(queue="auto-delete-me")
- channel2.basic_consume(queue="auto-delete-me")
-
- #implicit cancel
- channel2.session_close()
-
- #check it is still there
- channel.queue_declare(queue="auto-delete-me", passive=True)
-
- #explicit cancel => queue is now unused again:
- channel.basic_cancel(consumer_tag=reply.consumer_tag)
-
- #NOTE: this assumes there is no timeout in use
-
- #check that it has gone be declaring passively
- try:
- channel.queue_declare(queue="auto-delete-me", passive=True)
- self.fail("Expected queue to have been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
diff --git a/qpid/python/tests_0-10_preview/testlib.py b/qpid/python/tests_0-10_preview/testlib.py
deleted file mode 100644
index a0355c4ce0..0000000000
--- a/qpid/python/tests_0-10_preview/testlib.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-#
-# Tests for the testlib itself.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from Queue import Empty
-
-import sys
-from traceback import *
-
-def mytrace(frame, event, arg):
- print_stack(frame);
- print "===="
- return mytrace
-
-class TestBaseTest(TestBase):
- """Verify TestBase functions work as expected"""
-
- def testAssertEmptyPass(self):
- """Test assert empty works"""
- self.queue_declare(queue="empty")
- q = self.consume("empty")
- self.assertEmpty(q)
- try:
- q.get(timeout=1)
- self.fail("Queue is not empty.")
- except Empty: None # Ignore
-
- def testAssertEmptyFail(self):
- self.queue_declare(queue="full")
- q = self.consume("full")
- self.channel.message_transfer(content=Content("", properties={'routing_key':"full"}))
- try:
- self.assertEmpty(q);
- self.fail("assertEmpty did not assert on non-empty queue")
- except AssertionError: None # Ignore
-
- def testMessageProperties(self):
- """Verify properties are passed with message"""
- props={"x":1, "y":2}
- self.queue_declare(queue="q")
- q = self.consume("q")
- self.assertPublishGet(q, routing_key="q", properties=props)
-
-
-
diff --git a/qpid/python/tests_0-10_preview/tx.py b/qpid/python/tests_0-10_preview/tx.py
deleted file mode 100644
index 3fd1065af3..0000000000
--- a/qpid/python/tests_0-10_preview/tx.py
+++ /dev/null
@@ -1,231 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class TxTests(TestBase):
- """
- Tests for 'methods' on the amqp tx 'class'
- """
-
- def test_commit(self):
- """
- Test that commited publishes are delivered and commited acks are not re-delivered
- """
- channel2 = self.client.channel(2)
- channel2.session_open()
- self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel2.tx_commit()
- channel2.session_close()
-
- #use a different channel with new subscriptions to ensure
- #there is no redelivery of acked messages:
- channel = self.channel
- channel.tx_select()
-
- self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
- queue_a = self.client.queue("qa")
-
- self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
- queue_b = self.client.queue("qb")
-
- self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
- queue_c = self.client.queue("qc")
-
- #check results
- for i in range(1, 5):
- msg = queue_c.get(timeout=1)
- self.assertEqual("TxMessage %d" % i, msg.content.body)
- msg.complete()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("TxMessage 6", msg.content.body)
- msg.complete()
-
- msg = queue_a.get(timeout=1)
- self.assertEqual("TxMessage 7", msg.content.body)
- msg.complete()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def test_auto_rollback(self):
- """
- Test that a channel closed with an open transaction is effectively rolled back
- """
- channel2 = self.client.channel(2)
- channel2.session_open()
- queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- channel2.session_close()
- channel = self.channel
- channel.tx_select()
-
- self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1)
- queue_a = self.client.queue("qa")
-
- self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1)
- queue_b = self.client.queue("qb")
-
- self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1)
- queue_c = self.client.queue("qc")
-
- #check results
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- msg.complete()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
-
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def test_rollback(self):
- """
- Test that rolled back publishes are not delivered and rolled back acks are re-delivered
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued)
- for d in ["sub_a", "sub_b", "sub_c"]:
- channel.message_stop(destination=d)
-
- channel.tx_rollback()
-
- #restart susbcriptions
- for d in ["sub_a", "sub_b", "sub_c"]:
- channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF)
- channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF)
-
- #check results
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- msg.complete()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
-
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def perform_txn_work(self, channel, name_a, name_b, name_c):
- """
- Utility method that does some setup and some work under a transaction. Used for testing both
- commit and rollback
- """
- #setup:
- channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
- channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
- channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
-
- key = "my_key_" + name_b
- topic = "my_topic_" + name_c
-
- channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
- channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
-
- for i in range(1, 5):
- channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i))
-
- channel.message_transfer(destination="amq.direct",
- content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6"))
- channel.message_transfer(destination="amq.topic",
- content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7"))
-
- channel.tx_select()
-
- #consume and ack messages
- self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
- queue_a = self.client.queue("sub_a")
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- msg.complete()
-
- self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
- queue_b = self.client.queue("sub_b")
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
-
- sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
- queue_c = self.client.queue("sub_c")
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
-
- #publish messages
- for i in range(1, 5):
- channel.message_transfer(destination="amq.topic",
- content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i},
- body="TxMessage %d" % i))
-
- channel.message_transfer(destination="amq.direct",
- content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"},
- body="TxMessage 6"))
- channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"},
- body="TxMessage 7"))
- return queue_a, queue_b, queue_c