diff options
Diffstat (limited to 'python/tests_0-10/message.py')
-rw-r--r-- | python/tests_0-10/message.py | 410 |
1 files changed, 57 insertions, 353 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index 6cf2f3ef89..f08f437a65 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -38,14 +38,14 @@ class MessageTests(TestBase): channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message - channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local") - channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) #check the queues of the two consumers excluded = self.client.queue("local_excluded") included = self.client.queue("local_included") msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.body) + self.assertEqual("consume_no_local", msg.content.body) try: excluded.get(timeout=1) self.fail("Received locally published message though no_local=true") @@ -125,14 +125,14 @@ class MessageTests(TestBase): #setup, declare a queue: channel.queue_declare(queue="test-queue-4", exclusive=True) channel.message_subscribe(destination="my-consumer", queue="test-queue-4") - channel.message_transfer(routing_key="test-queue-4", body="One") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) #cancel should stop messages being delivered channel.message_cancel(destination="my-consumer") - channel.message_transfer(routing_key="test-queue-4", body="Two") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two")) myqueue = self.client.queue("my-consumer") msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.body) + self.assertEqual("One", msg.content.body) try: msg = myqueue.get(timeout=1) self.fail("Got message after cancellation: " + msg) @@ -153,11 +153,11 @@ class MessageTests(TestBase): channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") - channel.message_transfer(routing_key="test-ack-queue", body="One") - channel.message_transfer(routing_key="test-ack-queue", body="Two") - channel.message_transfer(routing_key="test-ack-queue", body="Three") - channel.message_transfer(routing_key="test-ack-queue", body="Four") - channel.message_transfer(routing_key="test-ack-queue", body="Five") + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five")) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -165,11 +165,11 @@ class MessageTests(TestBase): msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) msg2.complete(cumulative=True)#One and Two msg4.complete(cumulative=False) @@ -179,12 +179,12 @@ class MessageTests(TestBase): msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None def test_recover_requeue(self): @@ -197,11 +197,11 @@ class MessageTests(TestBase): channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") - channel.message_transfer(routing_key="test-requeue", body="One") - channel.message_transfer(routing_key="test-requeue", body="Two") - channel.message_transfer(routing_key="test-requeue", body="Three") - channel.message_transfer(routing_key="test-requeue", body="Four") - channel.message_transfer(routing_key="test-requeue", body="Five") + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -209,11 +209,11 @@ class MessageTests(TestBase): msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) msg2.complete(cumulative=True) #One and Two msg4.complete(cumulative=False) #Four @@ -221,7 +221,7 @@ class MessageTests(TestBase): channel.message_cancel(destination="consumer_tag") #publish a new message - channel.message_transfer(routing_key="test-requeue", body="Six") + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six")) #requeue unacked messages (Three and Five) channel.message_recover(requeue=True) @@ -231,21 +231,21 @@ class MessageTests(TestBase): msg3b = queue2.get(timeout=1) msg5b = queue2.get(timeout=1) - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) + self.assertEqual(True, msg3b.content['redelivered']) + self.assertEqual(True, msg5b.content['redelivered']) - self.assertEqual("Six", queue2.get(timeout=1).body) + self.assertEqual("Six", queue2.get(timeout=1).content.body) try: extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.body) + self.fail("Got unexpected message in second queue: " + extra.content.body) except Empty: None try: extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.body) + self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None @@ -264,15 +264,15 @@ class MessageTests(TestBase): #publish 10 messages: for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i)) #only 5 messages should have been delivered: for i in range(1, 6): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None #ack messages and check that the next set arrive ok: @@ -280,13 +280,13 @@ class MessageTests(TestBase): for i in range(6, 11): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() try: extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None @@ -306,16 +306,16 @@ class MessageTests(TestBase): #publish 10 messages: for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i)) #only 5 messages should have been delivered (i.e. 45 bytes worth): for i in range(1, 6): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None #ack messages and check that the next set arrive ok: @@ -323,328 +323,38 @@ class MessageTests(TestBase): for i in range(6, 11): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() try: extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None #make sure that a single oversized message still gets delivered large = "abcdefghijklmnopqrstuvwxyz" large = large + "-" + large; - channel.message_transfer(routing_key="test-prefetch-size", body=large) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large)) msg = queue.get(timeout=1) - self.assertEqual(large, msg.body) + self.assertEqual(large, msg.content.body) - def test_get(self): - """ - Test message_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - #use message_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - tag = "queue %d" % i - reply = channel.message_get(no_ack=True, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #repeat for confirm_mode=1 - for i in range(11, 21): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - for i in range(11, 21): - tag = "queue %d" % i - reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - if (i==13): - msg.complete()#11, 12 & 13 - if(i in [15, 17, 19]): - msg.complete(cumulative=False) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #recover(requeue=True) - channel.message_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - tag = "queue %d" % i - reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.complete() - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - channel.message_recover(requeue=True) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - def test_reference_simple(self): - """ - Test basic ability to handle references - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - channel.message_append(reference=refId, bytes="efgh") - channel.message_append(reference=refId, bytes="ijkl") - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl") - - - def test_reference_large(self): - """ - Test basic ability to handle references whose content exceeds max frame size - """ - channel = self.channel - self.queue_declare(queue="ref_queue") - - #generate a big data string (> max frame size of consumer): - data = "0123456789" - for i in range(0, 10): - data += data - #send it inline - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=data) - channel.synchronous = True - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - #create a new connection for consumer, with specific max frame size (< data) - other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0}) - ch2 = other.channel(1) - ch2.channel_open() - ch2.message_subscribe(queue="ref_queue", destination="c1") - queue = other.queue("c1") - - msg = queue.get(timeout=1) - self.assertTrue(isinstance(msg.body, ReferenceId)) - self.assertTrue(msg.reference) - self.assertEquals(data, msg.reference.get_complete()) - - def test_reference_completion(self): - """ - Test that reference transfer are not deemed complete until - closed (therefore are not acked or routed until that point) - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - try: - msg = queue.get(timeout=1) - self.fail("Got unexpected message on queue: " + msg) - except Empty: None - - self.assertTrue(not ack.is_complete()) - - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcd") - def test_reference_multi_transfer(self): - """ - Test that multiple transfer requests for the same reference are - correctly handled. - """ - channel = self.channel - #declare and consume from two queues - channel.queue_declare(queue="q-one", exclusive=True) - channel.queue_declare(queue="q-two", exclusive=True) - channel.message_subscribe(queue="q-one", destination="q-one") - channel.message_subscribe(queue="q-two", destination="q-two") - queue1 = self.client.queue("q-one") - queue2 = self.client.queue("q-two") - - #transfer a single ref to both queues (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="my data") - ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - #check that both queues have the message - self.assertDataEquals(channel, queue1.get(timeout=1), "my data") - self.assertDataEquals(channel, queue2.get(timeout=1), "my data") - self.assertEmpty(queue1) - self.assertEmpty(queue2) - - #transfer a single ref to the same queue twice (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="second message") - ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - msg1 = queue1.get(timeout=1) - msg2 = queue1.get(timeout=1) - #order is undefined - if msg1.message_id == "abc": - self.assertEquals(msg2.message_id, "xyz") - else: - self.assertEquals(msg1.message_id, "xyz") - self.assertEquals(msg2.message_id, "abc") - - #would be legal for the incoming messages to be transfered - #inline or by reference in any combination - - if isinstance(msg1.body, ReferenceId): - self.assertEquals("second message", msg1.reference.get_complete()) - if isinstance(msg2.body, ReferenceId): - if msg1.body != msg2.body: - self.assertEquals("second message", msg2.reference.get_complete()) - #else ok, as same ref as msg1 - else: - self.assertEquals("second message", msg1.body) - if isinstance(msg2.body, ReferenceId): - self.assertEquals("second message", msg2.reference.get_complete()) - else: - self.assertEquals("second message", msg2.body) - - self.assertEmpty(queue1) - - def test_reference_unopened_on_append_error(self): - channel = self.channel - try: - channel.message_append(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_close_error(self): - channel = self.channel - try: - channel.message_close(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_transfer_error(self): - channel = self.channel - try: - channel.message_transfer(body=ReferenceId("unopened")) - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_already_opened_error(self): - channel = self.channel - channel.message_open(reference="a") - try: - channel.message_open(reference="a") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_empty_reference(self): - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) - channel.synchronous = True - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - msg = queue.get(timeout=1) - self.assertEquals(msg.message_id, "empty-msg") - self.assertDataEquals(channel, msg, "") def test_reject(self): channel = self.channel channel.queue_declare(queue = "q", exclusive=True) channel.message_subscribe(queue = "q", destination = "consumer") - channel.message_transfer(routing_key = "q", body="blah, blah") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) msg = self.client.queue("consumer").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") + self.assertEquals(msg.content.body, "blah, blah") channel.message_cancel(destination = "consumer") msg.reject() channel.message_subscribe(queue = "q", destination = "checker") msg = self.client.queue("checker").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - - def test_checkpoint(self): - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True) - - channel.message_open(reference="my-ref") - channel.message_append(reference="my-ref", bytes="abcdefgh") - channel.message_append(reference="my-ref", bytes="ijklmnop") - channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint") - channel.channel_close() - - channel = self.client.channel(2) - channel.channel_open() - channel.message_subscribe(queue = "q", destination = "consumer") - offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value - self.assertTrue(offset<=16) - channel.message_append(reference="my-ref", bytes="qrstuvwxyz") - channel.synchronous = False - channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") - self.assertEmpty(self.client.queue("consumer")) + self.assertEquals(msg.content.body, "blah, blah") def test_credit_flow_messages(self): """ @@ -660,7 +370,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") @@ -692,7 +402,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "abcdefgh") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) #each message is currently interpreted as requiring 75 bytes of credit #set byte credit to finite amount (less than enough for all messages) @@ -726,7 +436,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") @@ -760,7 +470,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "abcdefgh") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) #each message is currently interpreted as requiring 75 bytes of credit #set byte credit to finite amount (less than enough for all messages) @@ -783,11 +493,5 @@ class MessageTests(TestBase): self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) - - def assertDataEquals(self, channel, msg, expected): - if isinstance(msg.body, ReferenceId): - data = msg.reference.get_complete() - else: - data = msg.body - self.assertEquals(expected, data) + self.assertEquals(expected, msg.content.body) |