summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/cpp_failing_0-10.txt18
-rw-r--r--qpid/python/tests_0-10/example.py44
-rw-r--r--qpid/python/tests_0-10/exchange.py135
-rw-r--r--qpid/python/tests_0-10/message.py58
-rw-r--r--qpid/python/tests_0-10/queue.py39
5 files changed, 201 insertions, 93 deletions
diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt
index 1b937563c6..3d00313d2d 100644
--- a/qpid/python/cpp_failing_0-10.txt
+++ b/qpid/python/cpp_failing_0-10.txt
@@ -11,20 +11,13 @@ tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_q
tests_0-10.alternate_exchange.AlternateExchangeTests.test_queue_delete
tests_0-10.alternate_exchange.AlternateExchangeTests.test_unroutable
tests_0-10.exchange.DeclareMethodPassiveFieldNotFoundRuleTests.test
-tests_0-10.exchange.DefaultExchangeRuleTests.testDefaultExchange
tests_0-10.exchange.ExchangeTests.testHeadersBindNoMatchArg
tests_0-10.exchange.HeadersExchangeTests.testMatchAll
tests_0-10.exchange.HeadersExchangeTests.testMatchAny
tests_0-10.exchange.MiscellaneousErrorsTests.testDifferentDeclaredType
tests_0-10.exchange.MiscellaneousErrorsTests.testTypeNotKnown
-tests_0-10.exchange.RecommendedTypesRuleTests.testDirect
-tests_0-10.exchange.RecommendedTypesRuleTests.testFanout
tests_0-10.exchange.RecommendedTypesRuleTests.testHeaders
-tests_0-10.exchange.RecommendedTypesRuleTests.testTopic
-tests_0-10.exchange.RequiredInstancesRuleTests.testAmqDirect
-tests_0-10.exchange.RequiredInstancesRuleTests.testAmqFanOut
tests_0-10.exchange.RequiredInstancesRuleTests.testAmqMatch
-tests_0-10.exchange.RequiredInstancesRuleTests.testAmqTopic
tests_0-10.dtx.DtxTests.test_bad_resume
tests_0-10.dtx.DtxTests.test_end
tests_0-10.dtx.DtxTests.test_end_suspend_and_fail
@@ -47,6 +40,7 @@ tests_0-10.dtx.DtxTests.test_start_join_and_resume
tests_0-10.dtx.DtxTests.test_suspend_resume
tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
tests_0-10.message.MessageTests.test_acquire
+tests_0-10.message.MessageTests.test_subscribe_not_acquired_3
tests_0-10.message.MessageTests.test_consume_exclusive
tests_0-10.message.MessageTests.test_consume_no_local
tests_0-10.message.MessageTests.test_consume_no_local_awkward
@@ -55,25 +49,17 @@ tests_0-10.message.MessageTests.test_consume_unique_consumers
tests_0-10.message.MessageTests.test_no_size
tests_0-10.message.MessageTests.test_qos_prefetch_count
tests_0-10.message.MessageTests.test_qos_prefetch_size
-tests_0-10.message.MessageTests.test_ranged_ack
tests_0-10.message.MessageTests.test_recover
tests_0-10.message.MessageTests.test_recover_requeue
-tests_0-10.message.MessageTests.test_subscribe_not_acquired
-tests_0-10.message.MessageTests.test_subscribe_not_acquired_2
-tests_0-10.message.MessageTests.test_subscribe_not_acquired_3
tests_0-10.testlib.TestBaseTest.testAssertEmptyFail
tests_0-10.testlib.TestBaseTest.testAssertEmptyPass
tests_0-10.testlib.TestBaseTest.testMessageProperties
tests_0-10.queue.QueueTests.test_autodelete_shared
-tests_0-10.queue.QueueTests.test_bind
tests_0-10.queue.QueueTests.test_declare_exclusive
tests_0-10.queue.QueueTests.test_declare_passive
tests_0-10.queue.QueueTests.test_delete_ifempty
tests_0-10.queue.QueueTests.test_delete_ifunused
tests_0-10.queue.QueueTests.test_delete_simple
tests_0-10.queue.QueueTests.test_purge
-tests_0-10.queue.QueueTests.test_unbind_direct
-tests_0-10.queue.QueueTests.test_unbind_fanout
+tests_0-10.queue.QueueTests.test_bind
tests_0-10.queue.QueueTests.test_unbind_headers
-tests_0-10.queue.QueueTests.test_unbind_topic
-tests_0-10.example.ExampleTest.test_example
diff --git a/qpid/python/tests_0-10/example.py b/qpid/python/tests_0-10/example.py
index da5ee2441f..fd10a8df4f 100644
--- a/qpid/python/tests_0-10/example.py
+++ b/qpid/python/tests_0-10/example.py
@@ -17,10 +17,10 @@
# under the License.
#
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
-class ExampleTest (TestBase):
+class ExampleTest (TestBase010):
"""
An example Qpid test, illustrating the unittest framework and the
python Qpid client. The test class must inherit TestBase. The
@@ -35,9 +35,9 @@ class ExampleTest (TestBase):
"""
# By inheriting TestBase, self.client is automatically connected
- # and self.channel is automatically opened as channel(1)
- # Other channel methods mimic the protocol.
- channel = self.channel
+ # and self.session is automatically opened as session(1)
+ # Other session methods mimic the protocol.
+ session = self.session
# Now we can send regular commands. If you want to see what the method
# arguments mean or what other commands are available, you can use the
@@ -57,30 +57,30 @@ class ExampleTest (TestBase):
# interact with the server.
# Here we use ordinal arguments.
- self.exchange_declare(channel, 0, "test", "direct")
+ session.exchange_declare("test", "direct")
# Here we use keyword arguments.
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
+ session.queue_declare(session, queue="test-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="test-queue", exchange="test", binding_key="key")
- # Call Channel.basic_consume to register as a consumer.
+ # Call Session.subscribe to register as a consumer.
# All the protocol methods return a message object. The message object
# has fields corresponding to the reply method fields, plus a content
# field that is filled if the reply includes content. In this case the
# interesting field is the consumer_tag.
- channel.message_subscribe(queue="test-queue", destination="consumer_tag")
- channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
- channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
+ session.message_subscribe(queue="test-queue", destination="consumer_tag")
+ session.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
- # We can use the Client.queue(...) method to access the queue
- # corresponding to our consumer_tag.
- queue = self.client.queue("consumer_tag")
+ # We can use the session.incoming(...) method to access the messages
+ # delivered for our consumer_tag.
+ queue = session.incoming("consumer_tag")
# Now lets publish a message and see if our consumer gets it. To do
- # this we need to import the Content class.
- sent = Content("Hello World!")
- sent["routing_key"] = "key"
- channel.message_transfer(destination="test", content=sent)
+ # this we need to import the Message class.
+ delivery_properties = session.delivery_properties(routing_key="key")
+ sent = Message(delivery_properties, "Hello World!")
+ session.message_transfer(destination="test", message=sent)
# Now we'll wait for the message to arrive. We can use the timeout
# argument in case the server hangs. By default queue.get() will wait
@@ -88,8 +88,8 @@ class ExampleTest (TestBase):
msg = queue.get(timeout=10)
# And check that we got the right response with assertEqual
- self.assertEqual(sent.body, msg.content.body)
+ self.assertEqual(sent.body, msg.body)
# Now acknowledge the message.
- msg.complete()
+ session.message_accept(RangedSet(msg.id))
diff --git a/qpid/python/tests_0-10/exchange.py b/qpid/python/tests_0-10/exchange.py
index afa7f05a49..151807b045 100644
--- a/qpid/python/tests_0-10/exchange.py
+++ b/qpid/python/tests_0-10/exchange.py
@@ -23,12 +23,89 @@ Tests for exchange behaviour.
Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
"""
-import Queue, logging
-from qpid.testlib import TestBase
-from qpid.content import Content
+import Queue, logging, traceback
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
from qpid.client import Closed
+class TestHelper(TestBase010):
+ def setUp(self):
+ TestBase010.setUp(self)
+ self.queues = []
+ self.exchanges = []
+
+ def tearDown(self):
+ try:
+ for ssn, q in self.queues:
+ ssn.queue_delete(queue=q)
+ for ssn, ex in self.exchanges:
+ ssn.exchange_delete(exchange=ex)
+ except:
+ print "Error on tearDown:"
+ print traceback.print_exc()
+
+ def createMessage(self, key="", body=""):
+ return Message(self.session.delivery_properties(routing_key=key), body)
+
+ def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
+ """
+ Publish to exchange and assert queue.get() returns the same message.
+ """
+ body = self.uniqueString()
+ dp=self.session.delivery_properties(routing_key=routing_key)
+ mp=self.session.message_properties(application_headers=properties)
+ self.session.message_transfer(destination=exchange, message=Message(dp, mp, body))
+ msg = queue.get(timeout=1)
+ self.assertEqual(body, msg.body)
+ if (properties):
+ self.assertEqual(properties, msg.content['application_headers'])
+
+ def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
+ """
+ Publish a message and consume it, assert it comes back intact.
+ Return the Queue object used to consume.
+ """
+ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
+
+ def assertEmpty(self, queue):
+ """Assert that the queue is empty"""
+ try:
+ queue.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Queue.Empty: None # Ignore
+
+ def queue_declare(self, session=None, *args, **keys):
+ session = session or self.session
+ reply = session.queue_declare(*args, **keys)
+ self.queues.append((session, keys["queue"]))
+ return reply
+
+ def exchange_declare(self, session=None, ticket=0, exchange='',
+ type='', passive=False, durable=False,
+ auto_delete=False,
+ arguments={}):
+ session = session or self.session
+ reply = session.exchange_declare(exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments)
+ self.exchanges.append((session,exchange))
+ return reply
+
+ def uniqueString(self):
+ """Generate a unique string, unique for this TestBase instance"""
+ if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
+ return "Test Message " + str(self.uniqueCounter)
+
+ def consume(self, queueName):
+ """Consume from named queue returns the Queue object."""
+ if not "uniqueTag" in dir(self): self.uniqueTag = 1
+ else: self.uniqueTag += 1
+ consumer_tag = "tag" + str(self.uniqueTag)
+ self.session.message_subscribe(queue=queueName, destination=consumer_tag)
+ self.session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ self.session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+ return self.session.incoming(consumer_tag)
+
+
class StandardExchangeVerifier:
"""Verifies standard exchange behavior.
@@ -37,7 +114,7 @@ class StandardExchangeVerifier:
def verifyDirectExchange(self, ex):
"""Verify that ex behaves like a direct exchange."""
self.queue_declare(queue="q")
- self.session.exchange_bind(queue="q", exchange=ex, routing_key="k")
+ self.session.exchange_bind(queue="q", exchange=ex, binding_key="k")
self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
try:
self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
@@ -55,16 +132,16 @@ class StandardExchangeVerifier:
def verifyTopicExchange(self, ex):
"""Verify that ex behaves like a topic exchange"""
self.queue_declare(queue="a")
- self.session.exchange_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+ self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*")
q = self.consume("a")
self.assertPublishGet(q, ex, "a.b.x")
self.assertPublishGet(q, ex, "a.x.b.x")
self.assertPublishGet(q, ex, "a.x.x.b.x")
# Shouldn't match
- self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
- self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"}))
- self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"}))
- self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
self.assert_(q.empty())
def verifyHeadersExchange(self, ex):
@@ -78,7 +155,7 @@ class StandardExchangeVerifier:
self.assertEmpty(q);
-class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
+class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier):
"""
The server SHOULD implement these standard exchange types: topic, headers.
@@ -106,7 +183,7 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
self.verifyHeadersExchange("h")
-class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
+class RequiredInstancesRuleTests(TestHelper, StandardExchangeVerifier):
"""
The server MUST, in each virtual host, pre-declare an exchange instance
for each standard exchange type that it implements, where the name of the
@@ -124,7 +201,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
-class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
+class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier):
"""
The server MUST predeclare a direct exchange to act as the default exchange
for content Publish methods and for default queue bindings.
@@ -144,20 +221,20 @@ class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
# TODO aconway 2006-09-27: Fill in empty tests:
-class DefaultAccessRuleTests(TestBase):
+class DefaultAccessRuleTests(TestHelper):
"""
The server MUST NOT allow clients to access the default exchange except
by specifying an empty exchange name in the Queue.Bind and content Publish
methods.
"""
-class ExtensionsRuleTests(TestBase):
+class ExtensionsRuleTests(TestHelper):
"""
The server MAY implement other exchange types as wanted.
"""
-class DeclareMethodMinimumRuleTests(TestBase):
+class DeclareMethodMinimumRuleTests(TestHelper):
"""
The server SHOULD support a minimum of 16 exchanges per virtual host and
ideally, impose no limit except as defined by available resources.
@@ -168,7 +245,7 @@ class DeclareMethodMinimumRuleTests(TestBase):
"""
-class DeclareMethodTicketFieldValidityRuleTests(TestBase):
+class DeclareMethodTicketFieldValidityRuleTests(TestHelper):
"""
The client MUST provide a valid access ticket giving "active" access to
the realm in which the exchange exists or will be created, or "passive"
@@ -179,7 +256,7 @@ class DeclareMethodTicketFieldValidityRuleTests(TestBase):
"""
-class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
+class DeclareMethodExchangeFieldReservedRuleTests(TestHelper):
"""
Exchange names starting with "amq." are reserved for predeclared and
standardised exchanges. The client MUST NOT attempt to create an exchange
@@ -189,7 +266,7 @@ class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
"""
-class DeclareMethodTypeFieldTypedRuleTests(TestBase):
+class DeclareMethodTypeFieldTypedRuleTests(TestHelper):
"""
Exchanges cannot be redeclared with different types. The client MUST not
attempt to redeclare an existing exchange with a different type than used
@@ -199,7 +276,7 @@ class DeclareMethodTypeFieldTypedRuleTests(TestBase):
"""
-class DeclareMethodTypeFieldSupportRuleTests(TestBase):
+class DeclareMethodTypeFieldSupportRuleTests(TestHelper):
"""
The client MUST NOT attempt to create an exchange with a type that the
server does not support.
@@ -208,7 +285,7 @@ class DeclareMethodTypeFieldSupportRuleTests(TestBase):
"""
-class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper):
"""
If set, and the exchange does not already exist, the server MUST raise a
channel exception with reply code 404 (not found).
@@ -221,7 +298,7 @@ class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
self.assertChannelException(404, e.args[0])
-class DeclareMethodDurableFieldSupportRuleTests(TestBase):
+class DeclareMethodDurableFieldSupportRuleTests(TestHelper):
"""
The server MUST support both durable and transient exchanges.
@@ -229,7 +306,7 @@ class DeclareMethodDurableFieldSupportRuleTests(TestBase):
"""
-class DeclareMethodDurableFieldStickyRuleTests(TestBase):
+class DeclareMethodDurableFieldStickyRuleTests(TestHelper):
"""
The server MUST ignore the durable field if the exchange already exists.
@@ -237,7 +314,7 @@ class DeclareMethodDurableFieldStickyRuleTests(TestBase):
"""
-class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestHelper):
"""
The server MUST ignore the auto-delete field if the exchange already
exists.
@@ -246,7 +323,7 @@ class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
"""
-class DeleteMethodTicketFieldValidityRuleTests(TestBase):
+class DeleteMethodTicketFieldValidityRuleTests(TestHelper):
"""
The client MUST provide a valid access ticket giving "active" access
rights to the exchange's access realm.
@@ -256,18 +333,18 @@ class DeleteMethodTicketFieldValidityRuleTests(TestBase):
"""
-class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
+class DeleteMethodExchangeFieldExistsRuleTests(TestHelper):
"""
The client MUST NOT attempt to delete an exchange that does not exist.
"""
-class HeadersExchangeTests(TestBase):
+class HeadersExchangeTests(TestHelper):
"""
Tests for headers exchange functionality.
"""
def setUp(self):
- TestBase.setUp(self)
+ TestHelper.setUp(self)
self.queue_declare(queue="q")
self.q = self.consume("q")
@@ -301,7 +378,7 @@ class HeadersExchangeTests(TestBase):
self.assertEmpty(self.q)
-class MiscellaneousErrorsTests(TestBase):
+class MiscellaneousErrorsTests(TestHelper):
"""
Test some miscellaneous error conditions
"""
@@ -325,7 +402,7 @@ class MiscellaneousErrorsTests(TestBase):
c2.session_open()
c2.exchange_delete(exchange="test_different_declared_type_exchange")
-class ExchangeTests(TestBase):
+class ExchangeTests(TestHelper):
def testHeadersBindNoMatchArg(self):
self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)
try:
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index 64e2bc44c4..aaefb52392 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -619,8 +619,10 @@ class MessageTests(TestBase010):
for q in [qA, qB]:
msg = q.get(timeout = 1)
self.assertEquals("Message %s" % i, msg.body)
+ #TODO: tidy up completion
session.receiver._completed.add(msg.id)
+ #TODO: tidy up completion
session.channel.session_completed(session.receiver._completed)
#messages should still be on the queue:
self.assertEquals(10, session.queue_query(queue = "q").message_count)
@@ -708,8 +710,9 @@ class MessageTests(TestBase010):
"""
Test acking of messages ranges
"""
- session = self.session
- session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.conn.session("alternate-session", timeout=10)
+
+ session.queue_declare(queue = "q", auto_delete=True)
delivery_properties = session.delivery_properties(routing_key="q")
for i in range (1, 11):
session.message_transfer(message=Message(delivery_properties, "message %s" % (i)))
@@ -718,26 +721,46 @@ class MessageTests(TestBase010):
session.message_flow(unit = 0, value = 10, destination = "a")
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
queue = session.incoming("a")
+ ids = []
for i in range (1, 11):
- self.assertEquals("message %s" % (i), queue.get(timeout = 1).body)
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message %s" % (i), msg.body)
+ ids.append(msg.id)
+
self.assertEmpty(queue)
- #ack all but the third message (command id 2)
- session.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9])
- session.message_recover()
- self.assertEquals("message 3", queue.get(timeout = 1).content.body)
+ #ack all but the fourth message (command id 2)
+ accepted = RangedSet()
+ accepted.add(ids[0], ids[2])
+ accepted.add(ids[4], ids[9])
+ session.message_accept(accepted)
+
+ #subscribe from second session here to ensure queue is not
+ #auto-deleted when alternate session closes (no need to ack on these):
+ self.session.message_subscribe(queue = "q", destination = "checker")
+
+ #now close the session, and see that the unacked messages are
+ #then redelivered to another subscriber:
+ session.close(timeout=10)
+
+ session = self.session
+ session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF)
+ queue = session.incoming("checker")
+
+ self.assertEquals("message 4", queue.get(timeout = 1).body)
self.assertEmpty(queue)
def test_subscribe_not_acquired_2(self):
session = self.session
#publish some messages
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#consume some of them
- session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ session.message_subscribe(queue = "q", destination = "a")
session.message_set_flow_mode(flow_mode = 0, destination = "a")
session.message_flow(unit = 0, value = 5, destination = "a")
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
@@ -746,11 +769,15 @@ class MessageTests(TestBase010):
for i in range(1, 6):
msg = queue.get(timeout = 1)
self.assertEquals("message-%d" % (i), msg.body)
- msg.complete()
+ #complete and accept
+ session.message_accept(RangedSet(msg.id))
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
self.assertEmpty(queue)
#now create a not-acquired subscriber
- session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
#check it gets those not consumed
@@ -759,7 +786,10 @@ class MessageTests(TestBase010):
for i in range(6, 11):
msg = queue.get(timeout = 1)
self.assertEquals("message-%d" % (i), msg.body)
- msg.complete()
+ session.message_release(RangedSet(msg.id))
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
session.message_flow(unit = 0, value = 1, destination = "b")
self.assertEmpty(queue)
@@ -790,6 +820,10 @@ class MessageTests(TestBase010):
#check that acquire succeeds
response = session.control_queue.get(timeout=1)
self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ session.message_release(RangedSet(msg.id))
+ session.channel._completed.add(msg.id)
+ session.channel.session_completed(session.channel._completed)
+
msg.complete()
self.assertEmpty(queue)
diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py
index b972166325..38d7a3291b 100644
--- a/qpid/python/tests_0-10/queue.py
+++ b/qpid/python/tests_0-10/queue.py
@@ -58,7 +58,7 @@ class QueueTests(TestBase010):
session = self.conn.session("error-checker")
try:
#queue specified but doesn't exist:
- session.queue_purge(queue="invalid-queue")
+ session.queue_purge(queue="invalid-queue")
self.fail("Expected failure when purging non-existent queue")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -119,7 +119,7 @@ class QueueTests(TestBase010):
session.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
#use the queue name where the routing key is not specified:
- session.queue_bind(queue="queue-1", exchange="amq.direct")
+ session.exchange_bind(queue="queue-1", exchange="amq.direct")
#try and bind to non-existant exchange
try:
@@ -151,32 +151,43 @@ class QueueTests(TestBase010):
def test_unbind_headers(self):
self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
- def unbind_test(self, exchange, routing_key="", args=None, headers={}):
+ def unbind_test(self, exchange, routing_key="", args=None, headers=None):
#bind two queues and consume from them
session = self.session
session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
- self.subscribe(queue="queue-1", destination="queue-1")
- self.subscribe(queue="queue-2", destination="queue-2")
+ session.message_subscribe(queue="queue-1", destination="queue-1")
+ session.message_flow(destination="queue-1", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="queue-1", unit=1, value=0xFFFFFFFF)
+ session.message_subscribe(queue="queue-2", destination="queue-2")
+ session.message_flow(destination="queue-2", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="queue-2", unit=1, value=0xFFFFFFFF)
queue1 = session.incoming("queue-1")
queue2 = session.incoming("queue-2")
- session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
- session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
-
+ session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args)
+ session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args)
+
+ dp = session.delivery_properties(routing_key=routing_key)
+ if (headers):
+ mp = session.message_properties(application_headers=headers)
+ msg1 = Message(dp, mp, "one")
+ msg2 = Message(dp, mp, "two")
+ else:
+ msg1 = Message(dp, "one")
+ msg2 = Message(dp, "two")
+
#send a message that will match both bindings
- session.message_transfer(destination=exchange,
- message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "one"))
+ session.message_transfer(destination=exchange, message=msg1)
#unbind first queue
- session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+ session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key)
#send another message
- session.message_transfer(destination=exchange,
- message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "two", ))
+ session.message_transfer(destination=exchange, message=msg2)
#check one queue has both messages and the other has only one
self.assertEquals("one", queue1.get(timeout=1).body)
@@ -320,7 +331,7 @@ class QueueTests(TestBase010):
#NOTE: this assumes there is no timeout in use
- #check that it has gone be declaring passively
+ #check that it has gone by declaring it passively
try:
session.queue_declare(queue="auto-delete-me", passive=True)
self.fail("Expected queue to have been deleted")