diff options
Diffstat (limited to 'python/tests_0-10/queue.py')
| -rw-r--r-- | python/tests_0-10/queue.py | 174 |
1 files changed, 86 insertions, 88 deletions
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index 7b3590d11b..f192a2af90 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -28,66 +28,64 @@ class QueueTests(TestBase): """ Test that the purge method removes messages from the queue """ - channel = self.channel + session = self.session #setup, declare a queue and add some messages to it: - channel.exchange_declare(exchange="test-exchange", type="direct") - channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"})) - channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"})) - channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"})) + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"})) + session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"})) + session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"})) #check that the queue now reports 3 messages: - channel.queue_declare(queue="test-queue") - reply = channel.queue_query(queue="test-queue") + session.queue_declare(queue="test-queue") + reply = session.queue_query(queue="test-queue") self.assertEqual(3, reply.message_count) #now do the purge, then test that three messages are purged and the count drops to 0 - channel.queue_purge(queue="test-queue"); - reply = channel.queue_query(queue="test-queue") + session.queue_purge(queue="test-queue"); + reply = session.queue_query(queue="test-queue") self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"})) + session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"})) self.subscribe(queue="test-queue", destination="tag") queue = self.client.queue("tag") msg = queue.get(timeout=1) self.assertEqual("four", msg.content.body) - #check error conditions (use new channels): - channel = self.client.channel(2) - channel.session_open() + #check error conditions (use new sessions): + session = self.client.session(2) + session.session_open() try: #queue specified but doesn't exist: - channel.queue_purge(queue="invalid-queue") + session.queue_purge(queue="invalid-queue") self.fail("Expected failure when purging non-existent queue") except Closed, e: self.assertChannelException(404, e.args[0]) - channel = self.client.channel(3) - channel.session_open() + session = self.client.session(3) + session.session_open() try: #queue not specified and none previously declared for channel: - channel.queue_purge() + session.queue_purge() self.fail("Expected failure when purging unspecified queue") except Closed, e: self.assertConnectionException(530, e.args[0]) #cleanup other = self.connect() - channel = other.channel(1) - channel.session_open() - channel.exchange_delete(exchange="test-exchange") + session = other.session(1) + session.session_open() + session.exchange_delete(exchange="test-exchange") def test_declare_exclusive(self): """ Test that the exclusive field is honoured in queue.declare """ - # TestBase.setUp has already opened channel(1) - c1 = self.channel + # TestBase.setUp has already opened session(1) + c1 = self.session # Here we open a second separate connection: other = self.connect() - c2 = other.channel(1) + c2 = other.session(1) c2.session_open() #declare an exclusive queue: @@ -104,13 +102,13 @@ class QueueTests(TestBase): """ Test that the passive field is honoured in queue.declare """ - channel = self.channel + session = self.session #declare an exclusive queue: - channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) - channel.queue_declare(queue="passive-queue-1", passive=True) + session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) + session.queue_declare(queue="passive-queue-1", passive=True) try: #other connection should not be allowed to declare this: - channel.queue_declare(queue="passive-queue-2", passive=True) + session.queue_declare(queue="passive-queue-2", passive=True) self.fail("Expected passive declaration of non-existant queue to raise a channel exception") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -120,29 +118,29 @@ class QueueTests(TestBase): """ Test various permutations of the queue.bind method """ - channel = self.channel - channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) #straightforward case, both exchange & queue exist so no errors expected: - channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + session.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") #use the queue name where the routing key is not specified: - channel.queue_bind(queue="queue-1", exchange="amq.direct") + session.queue_bind(queue="queue-1", exchange="amq.direct") #try and bind to non-existant exchange try: - channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + session.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") self.fail("Expected bind to non-existant exchange to fail") except Closed, e: self.assertChannelException(404, e.args[0]) - #need to reopen a channel: - channel = self.client.channel(2) - channel.session_open() + #need to reopen a session: + session = self.client.session(2) + session.session_open() #try and bind non-existant queue: try: - channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + session.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") self.fail("Expected bind of non-existant queue to fail") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -161,10 +159,10 @@ class QueueTests(TestBase): def unbind_test(self, exchange, routing_key="", args=None, headers={}): #bind two queues and consume from them - channel = self.channel + session = self.session - channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) - channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) + session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) self.subscribe(queue="queue-1", destination="queue-1") self.subscribe(queue="queue-2", destination="queue-2") @@ -172,18 +170,18 @@ class QueueTests(TestBase): queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") - channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) + session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings - channel.message_transfer(destination=exchange, + session.message_transfer(destination=exchange, content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers})) #unbind first queue - channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) #send another message - channel.message_transfer(destination=exchange, + session.message_transfer(destination=exchange, content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers})) #check one queue has both messages and the other has only one @@ -205,26 +203,26 @@ class QueueTests(TestBase): """ Test core queue deletion behaviour """ - channel = self.channel + session = self.session #straight-forward case: - channel.queue_declare(queue="delete-me") - channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) - channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) - channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) - channel.queue_delete(queue="delete-me") + session.queue_declare(queue="delete-me") + session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) + session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) + session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) + session.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: - channel.queue_declare(queue="delete-me", passive=True) + session.queue_declare(queue="delete-me", passive=True) self.fail("Queue has not been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) #check attempted deletion of non-existant queue is handled correctly: - channel = self.client.channel(2) - channel.session_open() + session = self.client.session(2) + session.session_open() try: - channel.queue_delete(queue="i-dont-exist", if_empty=True) + session.queue_delete(queue="i-dont-exist", if_empty=True) self.fail("Expected delete of non-existant queue to fail") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -235,37 +233,37 @@ class QueueTests(TestBase): """ Test that if_empty field of queue_delete is honoured """ - channel = self.channel + session = self.session #create a queue and add a message to it (use default binding): - channel.queue_declare(queue="delete-me-2") - channel.queue_declare(queue="delete-me-2", passive=True) - channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) + session.queue_declare(queue="delete-me-2") + session.queue_declare(queue="delete-me-2", passive=True) + session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) #try to delete, but only if empty: try: - channel.queue_delete(queue="delete-me-2", if_empty=True) + session.queue_delete(queue="delete-me-2", if_empty=True) self.fail("Expected delete if_empty to fail for non-empty queue") except Closed, e: self.assertChannelException(406, e.args[0]) #need new channel now: - channel = self.client.channel(2) - channel.session_open() + session = self.client.session(2) + session.session_open() #empty queue: - self.subscribe(channel, destination="consumer_tag", queue="delete-me-2") + self.subscribe(session, destination="consumer_tag", queue="delete-me-2") queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.content.body) - channel.message_cancel(destination="consumer_tag") + session.message_cancel(destination="consumer_tag") #retry deletion on empty queue: - channel.queue_delete(queue="delete-me-2", if_empty=True) + session.queue_delete(queue="delete-me-2", if_empty=True) #check that it has gone by declaring passively: try: - channel.queue_declare(queue="delete-me-2", passive=True) + session.queue_declare(queue="delete-me-2", passive=True) self.fail("Queue has not been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -274,29 +272,29 @@ class QueueTests(TestBase): """ Test that if_unused field of queue_delete is honoured """ - channel = self.channel + session = self.channel #create a queue and register a consumer: - channel.queue_declare(queue="delete-me-3") - channel.queue_declare(queue="delete-me-3", passive=True) + session.queue_declare(queue="delete-me-3") + session.queue_declare(queue="delete-me-3", passive=True) self.subscribe(destination="consumer_tag", queue="delete-me-3") - #need new channel now: - channel2 = self.client.channel(2) - channel2.session_open() + #need new session now: + session2 = self.client.session(2) + session2.session_open() #try to delete, but only if empty: try: - channel2.queue_delete(queue="delete-me-3", if_unused=True) + session2.queue_delete(queue="delete-me-3", if_unused=True) self.fail("Expected delete if_unused to fail for queue with existing consumer") except Closed, e: self.assertChannelException(406, e.args[0]) - channel.message_cancel(destination="consumer_tag") - channel.queue_delete(queue="delete-me-3", if_unused=True) + session.message_cancel(destination="consumer_tag") + session.queue_delete(queue="delete-me-3", if_unused=True) #check that it has gone by declaring passively: try: - channel.queue_declare(queue="delete-me-3", passive=True) + session.queue_declare(queue="delete-me-3", passive=True) self.fail("Queue has not been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -306,31 +304,31 @@ class QueueTests(TestBase): """ Test auto-deletion (of non-exclusive queues) """ - channel = self.channel + session = self.session other = self.connect() - channel2 = other.channel(1) - channel2.session_open() + session2 = other.session(1) + session2.session_open() - channel.queue_declare(queue="auto-delete-me", auto_delete=True) + session.queue_declare(queue="auto-delete-me", auto_delete=True) - #consume from both channels - reply = channel.basic_consume(queue="auto-delete-me") - channel2.basic_consume(queue="auto-delete-me") + #consume from both sessions + reply = session.basic_consume(queue="auto-delete-me") + session2.basic_consume(queue="auto-delete-me") #implicit cancel - channel2.session_close() + session2.session_close() #check it is still there - channel.queue_declare(queue="auto-delete-me", passive=True) + session.queue_declare(queue="auto-delete-me", passive=True) #explicit cancel => queue is now unused again: - channel.basic_cancel(consumer_tag=reply.consumer_tag) + session.basic_cancel(consumer_tag=reply.consumer_tag) #NOTE: this assumes there is no timeout in use #check that it has gone be declaring passively try: - channel.queue_declare(queue="auto-delete-me", passive=True) + session.queue_declare(queue="auto-delete-me", passive=True) self.fail("Expected queue to have been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) |
