diff options
Diffstat (limited to 'qpid/python/tests_0-10/broker.py')
| -rw-r--r-- | qpid/python/tests_0-10/broker.py | 92 |
1 files changed, 37 insertions, 55 deletions
diff --git a/qpid/python/tests_0-10/broker.py b/qpid/python/tests_0-10/broker.py index 99936ba742..25cf1241ec 100644 --- a/qpid/python/tests_0-10/broker.py +++ b/qpid/python/tests_0-10/broker.py @@ -18,10 +18,10 @@ # from qpid.client import Closed from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase010 +from qpid.datatypes import Message, RangedSet -class BrokerTests(TestBase): +class BrokerTests(TestBase010): """Tests for basic Broker functionality""" def test_ack_and_no_ack(self): @@ -30,82 +30,64 @@ class BrokerTests(TestBase): consumer. Second, this test tries to explicitly receive and acknowledge a message with an acknowledging consumer. """ - ch = self.channel - self.queue_declare(ch, queue = "myqueue") + session = self.session + session.queue_declare(queue = "myqueue", exclusive=True, auto_delete=True) # No ack consumer ctag = "tag1" - self.subscribe(ch, queue = "myqueue", destination = ctag) + session.message_subscribe(queue = "myqueue", destination = ctag) + session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) body = "test no-ack" - ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"})) - msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.content.body == body) + session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"), body)) + msg = session.incoming(ctag).get(timeout = 5) + self.assert_(msg.body == body) # Acknowledging consumer - self.queue_declare(ch, queue = "otherqueue") + session.queue_declare(queue = "otherqueue", exclusive=True, auto_delete=True) ctag = "tag2" - self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1) - ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) - ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) + session.message_subscribe(queue = "otherqueue", destination = ctag, accept_mode = 1) + session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) body = "test ack" - ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"})) - msg = self.client.queue(ctag).get(timeout = 5) - msg.complete() - self.assert_(msg.content.body == body) + session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"), body)) + msg = session.incoming(ctag).get(timeout = 5) + session.message_accept(RangedSet(msg.id)) + self.assert_(msg.body == body) def test_simple_delivery_immediate(self): """ Test simple message delivery where consume is issued before publish """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + session = self.session + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="test-queue", exchange="amq.fanout") consumer_tag = "tag1" - self.subscribe(queue="test-queue", destination=consumer_tag) - queue = self.client.queue(consumer_tag) + session.message_subscribe(queue="test-queue", destination=consumer_tag) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + queue = session.incoming(consumer_tag) body = "Immediate Delivery" - channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) + session.message_transfer("amq.fanout", None, None, Message(body)) msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) - - # TODO: Ensure we fail if immediate=True and there's no consumer. - + self.assert_(msg.body == body) def test_simple_delivery_queued(self): """ Test basic message delivery where publish is issued before consume (i.e. requires queueing of the message) """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + session = self.session + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="test-queue", exchange="amq.fanout") body = "Queued Delivery" - channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) + session.message_transfer("amq.fanout", None, None, Message(body)) consumer_tag = "tag1" - self.subscribe(queue="test-queue", destination=consumer_tag) - queue = self.client.queue(consumer_tag) + session.message_subscribe(queue="test-queue", destination=consumer_tag) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + queue = session.incoming(consumer_tag) msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) - - def test_invalid_channel(self): - channel = self.client.channel(200) - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for invalid channel") - except Closed, e: - self.assertConnectionException(504, e.args[0]) - - def test_closed_channel(self): - channel = self.client.channel(200) - channel.session_open() - channel.session_close() - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for closed channel") - except Closed, e: - if isinstance(e.args[0], str): self.fail(e) - self.assertConnectionException(504, e.args[0]) + self.assert_(msg.body == body) |
