summaryrefslogtreecommitdiff
path: root/python/tests_0-10/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests_0-10/queue.py')
-rw-r--r--python/tests_0-10/queue.py174
1 files changed, 86 insertions, 88 deletions
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index 7b3590d11b..f192a2af90 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -28,66 +28,64 @@ class QueueTests(TestBase):
"""
Test that the purge method removes messages from the queue
"""
- channel = self.channel
+ session = self.session
#setup, declare a queue and add some messages to it:
- channel.exchange_declare(exchange="test-exchange", type="direct")
- channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"}))
- channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"}))
- channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"}))
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"}))
+ session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"}))
+ session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"}))
#check that the queue now reports 3 messages:
- channel.queue_declare(queue="test-queue")
- reply = channel.queue_query(queue="test-queue")
+ session.queue_declare(queue="test-queue")
+ reply = session.queue_query(queue="test-queue")
self.assertEqual(3, reply.message_count)
#now do the purge, then test that three messages are purged and the count drops to 0
- channel.queue_purge(queue="test-queue");
- reply = channel.queue_query(queue="test-queue")
+ session.queue_purge(queue="test-queue");
+ reply = session.queue_query(queue="test-queue")
self.assertEqual(0, reply.message_count)
#send a further message and consume it, ensuring that the other messages are really gone
- channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
+ session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"}))
self.subscribe(queue="test-queue", destination="tag")
queue = self.client.queue("tag")
msg = queue.get(timeout=1)
self.assertEqual("four", msg.content.body)
- #check error conditions (use new channels):
- channel = self.client.channel(2)
- channel.session_open()
+ #check error conditions (use new sessions):
+ session = self.client.session(2)
+ session.session_open()
try:
#queue specified but doesn't exist:
- channel.queue_purge(queue="invalid-queue")
+ session.queue_purge(queue="invalid-queue")
self.fail("Expected failure when purging non-existent queue")
except Closed, e:
self.assertChannelException(404, e.args[0])
- channel = self.client.channel(3)
- channel.session_open()
+ session = self.client.session(3)
+ session.session_open()
try:
#queue not specified and none previously declared for channel:
- channel.queue_purge()
+ session.queue_purge()
self.fail("Expected failure when purging unspecified queue")
except Closed, e:
self.assertConnectionException(530, e.args[0])
#cleanup
other = self.connect()
- channel = other.channel(1)
- channel.session_open()
- channel.exchange_delete(exchange="test-exchange")
+ session = other.session(1)
+ session.session_open()
+ session.exchange_delete(exchange="test-exchange")
def test_declare_exclusive(self):
"""
Test that the exclusive field is honoured in queue.declare
"""
- # TestBase.setUp has already opened channel(1)
- c1 = self.channel
+ # TestBase.setUp has already opened session(1)
+ c1 = self.session
# Here we open a second separate connection:
other = self.connect()
- c2 = other.channel(1)
+ c2 = other.session(1)
c2.session_open()
#declare an exclusive queue:
@@ -104,13 +102,13 @@ class QueueTests(TestBase):
"""
Test that the passive field is honoured in queue.declare
"""
- channel = self.channel
+ session = self.session
#declare an exclusive queue:
- channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="passive-queue-1", passive=True)
+ session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="passive-queue-1", passive=True)
try:
#other connection should not be allowed to declare this:
- channel.queue_declare(queue="passive-queue-2", passive=True)
+ session.queue_declare(queue="passive-queue-2", passive=True)
self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -120,29 +118,29 @@ class QueueTests(TestBase):
"""
Test various permutations of the queue.bind method
"""
- channel = self.channel
- channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
#straightforward case, both exchange & queue exist so no errors expected:
- channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
+ session.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
#use the queue name where the routing key is not specified:
- channel.queue_bind(queue="queue-1", exchange="amq.direct")
+ session.queue_bind(queue="queue-1", exchange="amq.direct")
#try and bind to non-existant exchange
try:
- channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
+ session.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
self.fail("Expected bind to non-existant exchange to fail")
except Closed, e:
self.assertChannelException(404, e.args[0])
- #need to reopen a channel:
- channel = self.client.channel(2)
- channel.session_open()
+ #need to reopen a session:
+ session = self.client.session(2)
+ session.session_open()
#try and bind non-existant queue:
try:
- channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
+ session.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
self.fail("Expected bind of non-existant queue to fail")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -161,10 +159,10 @@ class QueueTests(TestBase):
def unbind_test(self, exchange, routing_key="", args=None, headers={}):
#bind two queues and consume from them
- channel = self.channel
+ session = self.session
- channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
self.subscribe(queue="queue-1", destination="queue-1")
self.subscribe(queue="queue-2", destination="queue-2")
@@ -172,18 +170,18 @@ class QueueTests(TestBase):
queue1 = self.client.queue("queue-1")
queue2 = self.client.queue("queue-2")
- channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
- channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
+ session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+ session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
#send a message that will match both bindings
- channel.message_transfer(destination=exchange,
+ session.message_transfer(destination=exchange,
content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
#unbind first queue
- channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+ session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
#send another message
- channel.message_transfer(destination=exchange,
+ session.message_transfer(destination=exchange,
content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
#check one queue has both messages and the other has only one
@@ -205,26 +203,26 @@ class QueueTests(TestBase):
"""
Test core queue deletion behaviour
"""
- channel = self.channel
+ session = self.session
#straight-forward case:
- channel.queue_declare(queue="delete-me")
- channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
- channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
- channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
- channel.queue_delete(queue="delete-me")
+ session.queue_declare(queue="delete-me")
+ session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
+ session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
+ session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
+ session.queue_delete(queue="delete-me")
#check that it has gone be declaring passively
try:
- channel.queue_declare(queue="delete-me", passive=True)
+ session.queue_declare(queue="delete-me", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
#check attempted deletion of non-existant queue is handled correctly:
- channel = self.client.channel(2)
- channel.session_open()
+ session = self.client.session(2)
+ session.session_open()
try:
- channel.queue_delete(queue="i-dont-exist", if_empty=True)
+ session.queue_delete(queue="i-dont-exist", if_empty=True)
self.fail("Expected delete of non-existant queue to fail")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -235,37 +233,37 @@ class QueueTests(TestBase):
"""
Test that if_empty field of queue_delete is honoured
"""
- channel = self.channel
+ session = self.session
#create a queue and add a message to it (use default binding):
- channel.queue_declare(queue="delete-me-2")
- channel.queue_declare(queue="delete-me-2", passive=True)
- channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
+ session.queue_declare(queue="delete-me-2")
+ session.queue_declare(queue="delete-me-2", passive=True)
+ session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
#try to delete, but only if empty:
try:
- channel.queue_delete(queue="delete-me-2", if_empty=True)
+ session.queue_delete(queue="delete-me-2", if_empty=True)
self.fail("Expected delete if_empty to fail for non-empty queue")
except Closed, e:
self.assertChannelException(406, e.args[0])
#need new channel now:
- channel = self.client.channel(2)
- channel.session_open()
+ session = self.client.session(2)
+ session.session_open()
#empty queue:
- self.subscribe(channel, destination="consumer_tag", queue="delete-me-2")
+ self.subscribe(session, destination="consumer_tag", queue="delete-me-2")
queue = self.client.queue("consumer_tag")
msg = queue.get(timeout=1)
self.assertEqual("message", msg.content.body)
- channel.message_cancel(destination="consumer_tag")
+ session.message_cancel(destination="consumer_tag")
#retry deletion on empty queue:
- channel.queue_delete(queue="delete-me-2", if_empty=True)
+ session.queue_delete(queue="delete-me-2", if_empty=True)
#check that it has gone by declaring passively:
try:
- channel.queue_declare(queue="delete-me-2", passive=True)
+ session.queue_declare(queue="delete-me-2", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -274,29 +272,29 @@ class QueueTests(TestBase):
"""
Test that if_unused field of queue_delete is honoured
"""
- channel = self.channel
+ session = self.channel
#create a queue and register a consumer:
- channel.queue_declare(queue="delete-me-3")
- channel.queue_declare(queue="delete-me-3", passive=True)
+ session.queue_declare(queue="delete-me-3")
+ session.queue_declare(queue="delete-me-3", passive=True)
self.subscribe(destination="consumer_tag", queue="delete-me-3")
- #need new channel now:
- channel2 = self.client.channel(2)
- channel2.session_open()
+ #need new session now:
+ session2 = self.client.session(2)
+ session2.session_open()
#try to delete, but only if empty:
try:
- channel2.queue_delete(queue="delete-me-3", if_unused=True)
+ session2.queue_delete(queue="delete-me-3", if_unused=True)
self.fail("Expected delete if_unused to fail for queue with existing consumer")
except Closed, e:
self.assertChannelException(406, e.args[0])
- channel.message_cancel(destination="consumer_tag")
- channel.queue_delete(queue="delete-me-3", if_unused=True)
+ session.message_cancel(destination="consumer_tag")
+ session.queue_delete(queue="delete-me-3", if_unused=True)
#check that it has gone by declaring passively:
try:
- channel.queue_declare(queue="delete-me-3", passive=True)
+ session.queue_declare(queue="delete-me-3", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -306,31 +304,31 @@ class QueueTests(TestBase):
"""
Test auto-deletion (of non-exclusive queues)
"""
- channel = self.channel
+ session = self.session
other = self.connect()
- channel2 = other.channel(1)
- channel2.session_open()
+ session2 = other.session(1)
+ session2.session_open()
- channel.queue_declare(queue="auto-delete-me", auto_delete=True)
+ session.queue_declare(queue="auto-delete-me", auto_delete=True)
- #consume from both channels
- reply = channel.basic_consume(queue="auto-delete-me")
- channel2.basic_consume(queue="auto-delete-me")
+ #consume from both sessions
+ reply = session.basic_consume(queue="auto-delete-me")
+ session2.basic_consume(queue="auto-delete-me")
#implicit cancel
- channel2.session_close()
+ session2.session_close()
#check it is still there
- channel.queue_declare(queue="auto-delete-me", passive=True)
+ session.queue_declare(queue="auto-delete-me", passive=True)
#explicit cancel => queue is now unused again:
- channel.basic_cancel(consumer_tag=reply.consumer_tag)
+ session.basic_cancel(consumer_tag=reply.consumer_tag)
#NOTE: this assumes there is no timeout in use
#check that it has gone be declaring passively
try:
- channel.queue_declare(queue="auto-delete-me", passive=True)
+ session.queue_declare(queue="auto-delete-me", passive=True)
self.fail("Expected queue to have been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])