diff options
| author | Keith Wall <kwall@apache.org> | 2012-11-07 12:27:56 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-11-07 12:27:56 +0000 |
| commit | cc7fe438f414323d659692f753ba197cf80489ba (patch) | |
| tree | 3ecc9bb4d7ab9cfe591da72851fd002253f41f9d /qpid/tests/src | |
| parent | bc2f214dbd631358da84f4ebbe315555b9661f6b (diff) | |
| download | qpid-python-cc7fe438f414323d659692f753ba197cf80489ba.tar.gz | |
QPID-4422: Python Client (0-8..0-9) now allows "instance" client property to be passed in order to allow re-subscribing to durable subscriptions. Centralised the creation of client properties such that this is only done in one place across all protocols. Also increased Python Client (0-8..0-9)'s diagnostic logging.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1406584 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tests/src')
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_8/basic.py | 45 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py | 2 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py | 35 |
3 files changed, 81 insertions, 1 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py b/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py index d5837fc19c..f04d581750 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py @@ -79,6 +79,51 @@ class BasicTests(TestBase): except Closed, e: self.assertChannelException(403, e.args[0]) + def test_reconnect_to_durable_subscription(self): + try: + publisherchannel = self.channel + my_id = "my_id" + consumer_connection_properties_with_instance = {"instance": my_id} + queue_for_subscription = "queue_for_subscription_%s" % my_id + topic_name = "my_topic_name" + test_message = self.uniqueString() + + durable_subscription_client = self.connect(client_properties=consumer_connection_properties_with_instance) + consumerchannel = durable_subscription_client.channel(1) + consumerchannel.channel_open() + + self._declare_and_bind_exclusive_queue_on_topic_exchange(consumerchannel, queue_for_subscription, topic_name) + + # disconnect + durable_subscription_client.close() + + # send message to topic + publisherchannel.basic_publish(routing_key=topic_name, exchange="amq.topic", content=Content(test_message)) + + # reconnect and consume message + durable_subscription_client = self.connect(client_properties=consumer_connection_properties_with_instance) + consumerchannel = durable_subscription_client.channel(1) + consumerchannel.channel_open() + + self._declare_and_bind_exclusive_queue_on_topic_exchange(consumerchannel, queue_for_subscription, topic_name) + + # Create consumer and consume the message that was sent whilst subscriber was disconnected. By convention we + # declare the consumer as exclusive to forbid concurrent access. + subscription = consumerchannel.basic_consume(queue=queue_for_subscription, exclusive=True) + queue = durable_subscription_client.queue(subscription.consumer_tag) + + # consume and verify message content + msg = queue.get(timeout=1) + self.assertEqual(test_message, msg.content.body) + consumerchannel.basic_ack(delivery_tag=msg.delivery_tag) + finally: + publisherchannel.queue_delete(queue=queue_for_subscription) + durable_subscription_client.close() + + def _declare_and_bind_exclusive_queue_on_topic_exchange(self, channel, queue, topic_name): + channel.queue_declare(queue=queue, exclusive=True, auto_delete=False, durable=True) + channel.queue_bind(exchange="amq.topic", queue=queue, routing_key=topic_name) + def test_consume_queue_errors(self): """ Test error conditions associated with the queue field of the consume method: diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py index d9f2ed7dbb..6b46b96b1d 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py @@ -19,4 +19,4 @@ # under the License. # -import query, queue +import query, queue, messageheader diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py b/qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py new file mode 100644 index 0000000000..3526cf37af --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.testlib import TestBase + +class MessageHeaderTests(TestBase): + """Verify that messages with headers work as expected""" + + def test_message_with_integer_header(self): + props={"headers":{"one":1, "zero":0}} + self.queue_declare(queue="q") + q = self.consume("q") + self.assertPublishGet(q, routing_key="q", properties=props) + + def test_message_with_string_header(self): + props={"headers":{"mystr":"hello world", "myempty":""}} + self.queue_declare(queue="q") + q = self.consume("q") + self.assertPublishGet(q, routing_key="q", properties=props) |
