diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-07 16:19:30 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-07 16:19:30 +0000 |
| commit | 791cd5a65e193ad4a1645065256a244ee238e9e4 (patch) | |
| tree | 16e0d4af9de12bfe397d9913a34681cb5e537ead /qpid/python | |
| parent | d20ff23e2a578969b1cad084504cd72bcbb38581 (diff) | |
| download | qpid-python-791cd5a65e193ad4a1645065256a244ee238e9e4.tar.gz | |
Converted some more tests to use new client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@634729 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/cpp_failing_0-10.txt | 18 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/example.py | 44 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/exchange.py | 135 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/message.py | 58 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/queue.py | 39 |
5 files changed, 201 insertions, 93 deletions
diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt index 1b937563c6..3d00313d2d 100644 --- a/qpid/python/cpp_failing_0-10.txt +++ b/qpid/python/cpp_failing_0-10.txt @@ -11,20 +11,13 @@ tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_q tests_0-10.alternate_exchange.AlternateExchangeTests.test_queue_delete tests_0-10.alternate_exchange.AlternateExchangeTests.test_unroutable tests_0-10.exchange.DeclareMethodPassiveFieldNotFoundRuleTests.test -tests_0-10.exchange.DefaultExchangeRuleTests.testDefaultExchange tests_0-10.exchange.ExchangeTests.testHeadersBindNoMatchArg tests_0-10.exchange.HeadersExchangeTests.testMatchAll tests_0-10.exchange.HeadersExchangeTests.testMatchAny tests_0-10.exchange.MiscellaneousErrorsTests.testDifferentDeclaredType tests_0-10.exchange.MiscellaneousErrorsTests.testTypeNotKnown -tests_0-10.exchange.RecommendedTypesRuleTests.testDirect -tests_0-10.exchange.RecommendedTypesRuleTests.testFanout tests_0-10.exchange.RecommendedTypesRuleTests.testHeaders -tests_0-10.exchange.RecommendedTypesRuleTests.testTopic -tests_0-10.exchange.RequiredInstancesRuleTests.testAmqDirect -tests_0-10.exchange.RequiredInstancesRuleTests.testAmqFanOut tests_0-10.exchange.RequiredInstancesRuleTests.testAmqMatch -tests_0-10.exchange.RequiredInstancesRuleTests.testAmqTopic tests_0-10.dtx.DtxTests.test_bad_resume tests_0-10.dtx.DtxTests.test_end tests_0-10.dtx.DtxTests.test_end_suspend_and_fail @@ -47,6 +40,7 @@ tests_0-10.dtx.DtxTests.test_start_join_and_resume tests_0-10.dtx.DtxTests.test_suspend_resume tests_0-10.dtx.DtxTests.test_suspend_start_end_resume tests_0-10.message.MessageTests.test_acquire +tests_0-10.message.MessageTests.test_subscribe_not_acquired_3 tests_0-10.message.MessageTests.test_consume_exclusive tests_0-10.message.MessageTests.test_consume_no_local tests_0-10.message.MessageTests.test_consume_no_local_awkward @@ -55,25 +49,17 @@ tests_0-10.message.MessageTests.test_consume_unique_consumers tests_0-10.message.MessageTests.test_no_size tests_0-10.message.MessageTests.test_qos_prefetch_count tests_0-10.message.MessageTests.test_qos_prefetch_size -tests_0-10.message.MessageTests.test_ranged_ack tests_0-10.message.MessageTests.test_recover tests_0-10.message.MessageTests.test_recover_requeue -tests_0-10.message.MessageTests.test_subscribe_not_acquired -tests_0-10.message.MessageTests.test_subscribe_not_acquired_2 -tests_0-10.message.MessageTests.test_subscribe_not_acquired_3 tests_0-10.testlib.TestBaseTest.testAssertEmptyFail tests_0-10.testlib.TestBaseTest.testAssertEmptyPass tests_0-10.testlib.TestBaseTest.testMessageProperties tests_0-10.queue.QueueTests.test_autodelete_shared -tests_0-10.queue.QueueTests.test_bind tests_0-10.queue.QueueTests.test_declare_exclusive tests_0-10.queue.QueueTests.test_declare_passive tests_0-10.queue.QueueTests.test_delete_ifempty tests_0-10.queue.QueueTests.test_delete_ifunused tests_0-10.queue.QueueTests.test_delete_simple tests_0-10.queue.QueueTests.test_purge -tests_0-10.queue.QueueTests.test_unbind_direct -tests_0-10.queue.QueueTests.test_unbind_fanout +tests_0-10.queue.QueueTests.test_bind tests_0-10.queue.QueueTests.test_unbind_headers -tests_0-10.queue.QueueTests.test_unbind_topic -tests_0-10.example.ExampleTest.test_example diff --git a/qpid/python/tests_0-10/example.py b/qpid/python/tests_0-10/example.py index da5ee2441f..fd10a8df4f 100644 --- a/qpid/python/tests_0-10/example.py +++ b/qpid/python/tests_0-10/example.py @@ -17,10 +17,10 @@ # under the License. # -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.datatypes import Message, RangedSet +from qpid.testlib import TestBase010 -class ExampleTest (TestBase): +class ExampleTest (TestBase010): """ An example Qpid test, illustrating the unittest framework and the python Qpid client. The test class must inherit TestBase. The @@ -35,9 +35,9 @@ class ExampleTest (TestBase): """ # By inheriting TestBase, self.client is automatically connected - # and self.channel is automatically opened as channel(1) - # Other channel methods mimic the protocol. - channel = self.channel + # and self.session is automatically opened as session(1) + # Other session methods mimic the protocol. + session = self.session # Now we can send regular commands. If you want to see what the method # arguments mean or what other commands are available, you can use the @@ -57,30 +57,30 @@ class ExampleTest (TestBase): # interact with the server. # Here we use ordinal arguments. - self.exchange_declare(channel, 0, "test", "direct") + session.exchange_declare("test", "direct") # Here we use keyword arguments. - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") + session.queue_declare(session, queue="test-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="test-queue", exchange="test", binding_key="key") - # Call Channel.basic_consume to register as a consumer. + # Call Session.subscribe to register as a consumer. # All the protocol methods return a message object. The message object # has fields corresponding to the reply method fields, plus a content # field that is filled if the reply includes content. In this case the # interesting field is the consumer_tag. - channel.message_subscribe(queue="test-queue", destination="consumer_tag") - channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) - channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) + session.message_subscribe(queue="test-queue", destination="consumer_tag") + session.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) - # We can use the Client.queue(...) method to access the queue - # corresponding to our consumer_tag. - queue = self.client.queue("consumer_tag") + # We can use the session.incoming(...) method to access the messages + # delivered for our consumer_tag. + queue = session.incoming("consumer_tag") # Now lets publish a message and see if our consumer gets it. To do - # this we need to import the Content class. - sent = Content("Hello World!") - sent["routing_key"] = "key" - channel.message_transfer(destination="test", content=sent) + # this we need to import the Message class. + delivery_properties = session.delivery_properties(routing_key="key") + sent = Message(delivery_properties, "Hello World!") + session.message_transfer(destination="test", message=sent) # Now we'll wait for the message to arrive. We can use the timeout # argument in case the server hangs. By default queue.get() will wait @@ -88,8 +88,8 @@ class ExampleTest (TestBase): msg = queue.get(timeout=10) # And check that we got the right response with assertEqual - self.assertEqual(sent.body, msg.content.body) + self.assertEqual(sent.body, msg.body) # Now acknowledge the message. - msg.complete() + session.message_accept(RangedSet(msg.id)) diff --git a/qpid/python/tests_0-10/exchange.py b/qpid/python/tests_0-10/exchange.py index afa7f05a49..151807b045 100644 --- a/qpid/python/tests_0-10/exchange.py +++ b/qpid/python/tests_0-10/exchange.py @@ -23,12 +23,89 @@ Tests for exchange behaviour. Test classes ending in 'RuleTests' are derived from rules in amqp.xml. """ -import Queue, logging -from qpid.testlib import TestBase -from qpid.content import Content +import Queue, logging, traceback +from qpid.testlib import TestBase010 +from qpid.datatypes import Message from qpid.client import Closed +class TestHelper(TestBase010): + def setUp(self): + TestBase010.setUp(self) + self.queues = [] + self.exchanges = [] + + def tearDown(self): + try: + for ssn, q in self.queues: + ssn.queue_delete(queue=q) + for ssn, ex in self.exchanges: + ssn.exchange_delete(exchange=ex) + except: + print "Error on tearDown:" + print traceback.print_exc() + + def createMessage(self, key="", body=""): + return Message(self.session.delivery_properties(routing_key=key), body) + + def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): + """ + Publish to exchange and assert queue.get() returns the same message. + """ + body = self.uniqueString() + dp=self.session.delivery_properties(routing_key=routing_key) + mp=self.session.message_properties(application_headers=properties) + self.session.message_transfer(destination=exchange, message=Message(dp, mp, body)) + msg = queue.get(timeout=1) + self.assertEqual(body, msg.body) + if (properties): + self.assertEqual(properties, msg.content['application_headers']) + + def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): + """ + Publish a message and consume it, assert it comes back intact. + Return the Queue object used to consume. + """ + self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) + + def assertEmpty(self, queue): + """Assert that the queue is empty""" + try: + queue.get(timeout=1) + self.fail("Queue is not empty.") + except Queue.Empty: None # Ignore + + def queue_declare(self, session=None, *args, **keys): + session = session or self.session + reply = session.queue_declare(*args, **keys) + self.queues.append((session, keys["queue"])) + return reply + + def exchange_declare(self, session=None, ticket=0, exchange='', + type='', passive=False, durable=False, + auto_delete=False, + arguments={}): + session = session or self.session + reply = session.exchange_declare(exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments) + self.exchanges.append((session,exchange)) + return reply + + def uniqueString(self): + """Generate a unique string, unique for this TestBase instance""" + if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; + return "Test Message " + str(self.uniqueCounter) + + def consume(self, queueName): + """Consume from named queue returns the Queue object.""" + if not "uniqueTag" in dir(self): self.uniqueTag = 1 + else: self.uniqueTag += 1 + consumer_tag = "tag" + str(self.uniqueTag) + self.session.message_subscribe(queue=queueName, destination=consumer_tag) + self.session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + self.session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + return self.session.incoming(consumer_tag) + + class StandardExchangeVerifier: """Verifies standard exchange behavior. @@ -37,7 +114,7 @@ class StandardExchangeVerifier: def verifyDirectExchange(self, ex): """Verify that ex behaves like a direct exchange.""" self.queue_declare(queue="q") - self.session.exchange_bind(queue="q", exchange=ex, routing_key="k") + self.session.exchange_bind(queue="q", exchange=ex, binding_key="k") self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") try: self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") @@ -55,16 +132,16 @@ class StandardExchangeVerifier: def verifyTopicExchange(self, ex): """Verify that ex behaves like a topic exchange""" self.queue_declare(queue="a") - self.session.exchange_bind(queue="a", exchange=ex, routing_key="a.#.b.*") + self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*") q = self.consume("a") self.assertPublishGet(q, ex, "a.b.x") self.assertPublishGet(q, ex, "a.x.b.x") self.assertPublishGet(q, ex, "a.x.x.b.x") # Shouldn't match - self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) - self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"})) - self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"})) - self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) + self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) + self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y")) + self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x")) + self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) self.assert_(q.empty()) def verifyHeadersExchange(self, ex): @@ -78,7 +155,7 @@ class StandardExchangeVerifier: self.assertEmpty(q); -class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): +class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier): """ The server SHOULD implement these standard exchange types: topic, headers. @@ -106,7 +183,7 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): self.verifyHeadersExchange("h") -class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): +class RequiredInstancesRuleTests(TestHelper, StandardExchangeVerifier): """ The server MUST, in each virtual host, pre-declare an exchange instance for each standard exchange type that it implements, where the name of the @@ -124,7 +201,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): def testAmqMatch(self): self.verifyHeadersExchange("amq.match") -class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): +class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier): """ The server MUST predeclare a direct exchange to act as the default exchange for content Publish methods and for default queue bindings. @@ -144,20 +221,20 @@ class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): # TODO aconway 2006-09-27: Fill in empty tests: -class DefaultAccessRuleTests(TestBase): +class DefaultAccessRuleTests(TestHelper): """ The server MUST NOT allow clients to access the default exchange except by specifying an empty exchange name in the Queue.Bind and content Publish methods. """ -class ExtensionsRuleTests(TestBase): +class ExtensionsRuleTests(TestHelper): """ The server MAY implement other exchange types as wanted. """ -class DeclareMethodMinimumRuleTests(TestBase): +class DeclareMethodMinimumRuleTests(TestHelper): """ The server SHOULD support a minimum of 16 exchanges per virtual host and ideally, impose no limit except as defined by available resources. @@ -168,7 +245,7 @@ class DeclareMethodMinimumRuleTests(TestBase): """ -class DeclareMethodTicketFieldValidityRuleTests(TestBase): +class DeclareMethodTicketFieldValidityRuleTests(TestHelper): """ The client MUST provide a valid access ticket giving "active" access to the realm in which the exchange exists or will be created, or "passive" @@ -179,7 +256,7 @@ class DeclareMethodTicketFieldValidityRuleTests(TestBase): """ -class DeclareMethodExchangeFieldReservedRuleTests(TestBase): +class DeclareMethodExchangeFieldReservedRuleTests(TestHelper): """ Exchange names starting with "amq." are reserved for predeclared and standardised exchanges. The client MUST NOT attempt to create an exchange @@ -189,7 +266,7 @@ class DeclareMethodExchangeFieldReservedRuleTests(TestBase): """ -class DeclareMethodTypeFieldTypedRuleTests(TestBase): +class DeclareMethodTypeFieldTypedRuleTests(TestHelper): """ Exchanges cannot be redeclared with different types. The client MUST not attempt to redeclare an existing exchange with a different type than used @@ -199,7 +276,7 @@ class DeclareMethodTypeFieldTypedRuleTests(TestBase): """ -class DeclareMethodTypeFieldSupportRuleTests(TestBase): +class DeclareMethodTypeFieldSupportRuleTests(TestHelper): """ The client MUST NOT attempt to create an exchange with a type that the server does not support. @@ -208,7 +285,7 @@ class DeclareMethodTypeFieldSupportRuleTests(TestBase): """ -class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): +class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper): """ If set, and the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found). @@ -221,7 +298,7 @@ class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): self.assertChannelException(404, e.args[0]) -class DeclareMethodDurableFieldSupportRuleTests(TestBase): +class DeclareMethodDurableFieldSupportRuleTests(TestHelper): """ The server MUST support both durable and transient exchanges. @@ -229,7 +306,7 @@ class DeclareMethodDurableFieldSupportRuleTests(TestBase): """ -class DeclareMethodDurableFieldStickyRuleTests(TestBase): +class DeclareMethodDurableFieldStickyRuleTests(TestHelper): """ The server MUST ignore the durable field if the exchange already exists. @@ -237,7 +314,7 @@ class DeclareMethodDurableFieldStickyRuleTests(TestBase): """ -class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): +class DeclareMethodAutoDeleteFieldStickyRuleTests(TestHelper): """ The server MUST ignore the auto-delete field if the exchange already exists. @@ -246,7 +323,7 @@ class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): """ -class DeleteMethodTicketFieldValidityRuleTests(TestBase): +class DeleteMethodTicketFieldValidityRuleTests(TestHelper): """ The client MUST provide a valid access ticket giving "active" access rights to the exchange's access realm. @@ -256,18 +333,18 @@ class DeleteMethodTicketFieldValidityRuleTests(TestBase): """ -class DeleteMethodExchangeFieldExistsRuleTests(TestBase): +class DeleteMethodExchangeFieldExistsRuleTests(TestHelper): """ The client MUST NOT attempt to delete an exchange that does not exist. """ -class HeadersExchangeTests(TestBase): +class HeadersExchangeTests(TestHelper): """ Tests for headers exchange functionality. """ def setUp(self): - TestBase.setUp(self) + TestHelper.setUp(self) self.queue_declare(queue="q") self.q = self.consume("q") @@ -301,7 +378,7 @@ class HeadersExchangeTests(TestBase): self.assertEmpty(self.q) -class MiscellaneousErrorsTests(TestBase): +class MiscellaneousErrorsTests(TestHelper): """ Test some miscellaneous error conditions """ @@ -325,7 +402,7 @@ class MiscellaneousErrorsTests(TestBase): c2.session_open() c2.exchange_delete(exchange="test_different_declared_type_exchange") -class ExchangeTests(TestBase): +class ExchangeTests(TestHelper): def testHeadersBindNoMatchArg(self): self.session.queue_declare(queue="q", exclusive=True, auto_delete=True) try: diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index 64e2bc44c4..aaefb52392 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -619,8 +619,10 @@ class MessageTests(TestBase010): for q in [qA, qB]: msg = q.get(timeout = 1) self.assertEquals("Message %s" % i, msg.body) + #TODO: tidy up completion session.receiver._completed.add(msg.id) + #TODO: tidy up completion session.channel.session_completed(session.receiver._completed) #messages should still be on the queue: self.assertEquals(10, session.queue_query(queue = "q").message_count) @@ -708,8 +710,9 @@ class MessageTests(TestBase010): """ Test acking of messages ranges """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.conn.session("alternate-session", timeout=10) + + session.queue_declare(queue = "q", auto_delete=True) delivery_properties = session.delivery_properties(routing_key="q") for i in range (1, 11): session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) @@ -718,26 +721,46 @@ class MessageTests(TestBase010): session.message_flow(unit = 0, value = 10, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = session.incoming("a") + ids = [] for i in range (1, 11): - self.assertEquals("message %s" % (i), queue.get(timeout = 1).body) + msg = queue.get(timeout = 1) + self.assertEquals("message %s" % (i), msg.body) + ids.append(msg.id) + self.assertEmpty(queue) - #ack all but the third message (command id 2) - session.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9]) - session.message_recover() - self.assertEquals("message 3", queue.get(timeout = 1).content.body) + #ack all but the fourth message (command id 2) + accepted = RangedSet() + accepted.add(ids[0], ids[2]) + accepted.add(ids[4], ids[9]) + session.message_accept(accepted) + + #subscribe from second session here to ensure queue is not + #auto-deleted when alternate session closes (no need to ack on these): + self.session.message_subscribe(queue = "q", destination = "checker") + + #now close the session, and see that the unacked messages are + #then redelivered to another subscriber: + session.close(timeout=10) + + session = self.session + session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) + queue = session.incoming("checker") + + self.assertEquals("message 4", queue.get(timeout = 1).body) self.assertEmpty(queue) def test_subscribe_not_acquired_2(self): session = self.session #publish some messages - self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #consume some of them - session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + session.message_subscribe(queue = "q", destination = "a") session.message_set_flow_mode(flow_mode = 0, destination = "a") session.message_flow(unit = 0, value = 5, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") @@ -746,11 +769,15 @@ class MessageTests(TestBase010): for i in range(1, 6): msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) - msg.complete() + #complete and accept + session.message_accept(RangedSet(msg.id)) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) self.assertEmpty(queue) #now create a not-acquired subscriber - session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") #check it gets those not consumed @@ -759,7 +786,10 @@ class MessageTests(TestBase010): for i in range(6, 11): msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) - msg.complete() + session.message_release(RangedSet(msg.id)) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) session.message_flow(unit = 0, value = 1, destination = "b") self.assertEmpty(queue) @@ -790,6 +820,10 @@ class MessageTests(TestBase010): #check that acquire succeeds response = session.control_queue.get(timeout=1) self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + session.message_release(RangedSet(msg.id)) + session.channel._completed.add(msg.id) + session.channel.session_completed(session.channel._completed) + msg.complete() self.assertEmpty(queue) diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py index b972166325..38d7a3291b 100644 --- a/qpid/python/tests_0-10/queue.py +++ b/qpid/python/tests_0-10/queue.py @@ -58,7 +58,7 @@ class QueueTests(TestBase010): session = self.conn.session("error-checker") try: #queue specified but doesn't exist: - session.queue_purge(queue="invalid-queue") + session.queue_purge(queue="invalid-queue") self.fail("Expected failure when purging non-existent queue") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -119,7 +119,7 @@ class QueueTests(TestBase010): session.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") #use the queue name where the routing key is not specified: - session.queue_bind(queue="queue-1", exchange="amq.direct") + session.exchange_bind(queue="queue-1", exchange="amq.direct") #try and bind to non-existant exchange try: @@ -151,32 +151,43 @@ class QueueTests(TestBase010): def test_unbind_headers(self): self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"}) - def unbind_test(self, exchange, routing_key="", args=None, headers={}): + def unbind_test(self, exchange, routing_key="", args=None, headers=None): #bind two queues and consume from them session = self.session session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) - self.subscribe(queue="queue-1", destination="queue-1") - self.subscribe(queue="queue-2", destination="queue-2") + session.message_subscribe(queue="queue-1", destination="queue-1") + session.message_flow(destination="queue-1", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="queue-1", unit=1, value=0xFFFFFFFF) + session.message_subscribe(queue="queue-2", destination="queue-2") + session.message_flow(destination="queue-2", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="queue-2", unit=1, value=0xFFFFFFFF) queue1 = session.incoming("queue-1") queue2 = session.incoming("queue-2") - session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) - + session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args) + session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args) + + dp = session.delivery_properties(routing_key=routing_key) + if (headers): + mp = session.message_properties(application_headers=headers) + msg1 = Message(dp, mp, "one") + msg2 = Message(dp, mp, "two") + else: + msg1 = Message(dp, "one") + msg2 = Message(dp, "two") + #send a message that will match both bindings - session.message_transfer(destination=exchange, - message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "one")) + session.message_transfer(destination=exchange, message=msg1) #unbind first queue - session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key) #send another message - session.message_transfer(destination=exchange, - message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "two", )) + session.message_transfer(destination=exchange, message=msg2) #check one queue has both messages and the other has only one self.assertEquals("one", queue1.get(timeout=1).body) @@ -320,7 +331,7 @@ class QueueTests(TestBase010): #NOTE: this assumes there is no timeout in use - #check that it has gone be declaring passively + #check that it has gone by declaring it passively try: session.queue_declare(queue="auto-delete-me", passive=True) self.fail("Expected queue to have been deleted") |
