summaryrefslogtreecommitdiff
path: root/python/tests_0-10/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests_0-10/message.py')
-rw-r--r--python/tests_0-10/message.py410
1 files changed, 57 insertions, 353 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index 6cf2f3ef89..f08f437a65 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -38,14 +38,14 @@ class MessageTests(TestBase):
channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
#send a message
- channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local")
- channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
#check the queues of the two consumers
excluded = self.client.queue("local_excluded")
included = self.client.queue("local_included")
msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.body)
+ self.assertEqual("consume_no_local", msg.content.body)
try:
excluded.get(timeout=1)
self.fail("Received locally published message though no_local=true")
@@ -125,14 +125,14 @@ class MessageTests(TestBase):
#setup, declare a queue:
channel.queue_declare(queue="test-queue-4", exclusive=True)
channel.message_subscribe(destination="my-consumer", queue="test-queue-4")
- channel.message_transfer(routing_key="test-queue-4", body="One")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
#cancel should stop messages being delivered
channel.message_cancel(destination="my-consumer")
- channel.message_transfer(routing_key="test-queue-4", body="Two")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two"))
myqueue = self.client.queue("my-consumer")
msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.body)
+ self.assertEqual("One", msg.content.body)
try:
msg = myqueue.get(timeout=1)
self.fail("Got message after cancellation: " + msg)
@@ -153,11 +153,11 @@ class MessageTests(TestBase):
channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
- channel.message_transfer(routing_key="test-ack-queue", body="One")
- channel.message_transfer(routing_key="test-ack-queue", body="Two")
- channel.message_transfer(routing_key="test-ack-queue", body="Three")
- channel.message_transfer(routing_key="test-ack-queue", body="Four")
- channel.message_transfer(routing_key="test-ack-queue", body="Five")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five"))
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
@@ -165,11 +165,11 @@ class MessageTests(TestBase):
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
- self.assertEqual("One", msg1.body)
- self.assertEqual("Two", msg2.body)
- self.assertEqual("Three", msg3.body)
- self.assertEqual("Four", msg4.body)
- self.assertEqual("Five", msg5.body)
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
msg2.complete(cumulative=True)#One and Two
msg4.complete(cumulative=False)
@@ -179,12 +179,12 @@ class MessageTests(TestBase):
msg3b = queue.get(timeout=1)
msg5b = queue.get(timeout=1)
- self.assertEqual("Three", msg3b.body)
- self.assertEqual("Five", msg5b.body)
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
+ self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
def test_recover_requeue(self):
@@ -197,11 +197,11 @@ class MessageTests(TestBase):
channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
- channel.message_transfer(routing_key="test-requeue", body="One")
- channel.message_transfer(routing_key="test-requeue", body="Two")
- channel.message_transfer(routing_key="test-requeue", body="Three")
- channel.message_transfer(routing_key="test-requeue", body="Four")
- channel.message_transfer(routing_key="test-requeue", body="Five")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
@@ -209,11 +209,11 @@ class MessageTests(TestBase):
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
- self.assertEqual("One", msg1.body)
- self.assertEqual("Two", msg2.body)
- self.assertEqual("Three", msg3.body)
- self.assertEqual("Four", msg4.body)
- self.assertEqual("Five", msg5.body)
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
msg2.complete(cumulative=True) #One and Two
msg4.complete(cumulative=False) #Four
@@ -221,7 +221,7 @@ class MessageTests(TestBase):
channel.message_cancel(destination="consumer_tag")
#publish a new message
- channel.message_transfer(routing_key="test-requeue", body="Six")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six"))
#requeue unacked messages (Three and Five)
channel.message_recover(requeue=True)
@@ -231,21 +231,21 @@ class MessageTests(TestBase):
msg3b = queue2.get(timeout=1)
msg5b = queue2.get(timeout=1)
- self.assertEqual("Three", msg3b.body)
- self.assertEqual("Five", msg5b.body)
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
- self.assertEqual(True, msg3b.redelivered)
- self.assertEqual(True, msg5b.redelivered)
+ self.assertEqual(True, msg3b.content['redelivered'])
+ self.assertEqual(True, msg5b.content['redelivered'])
- self.assertEqual("Six", queue2.get(timeout=1).body)
+ self.assertEqual("Six", queue2.get(timeout=1).content.body)
try:
extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.body)
+ self.fail("Got unexpected message in second queue: " + extra.content.body)
except Empty: None
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.body)
+ self.fail("Got unexpected message in original queue: " + extra.content.body)
except Empty: None
@@ -264,15 +264,15 @@ class MessageTests(TestBase):
#publish 10 messages:
for i in range(1, 11):
- channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i))
#only 5 messages should have been delivered:
for i in range(1, 6):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.body)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
except Empty: None
#ack messages and check that the next set arrive ok:
@@ -280,13 +280,13 @@ class MessageTests(TestBase):
for i in range(6, 11):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
msg.complete()
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.body)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
except Empty: None
@@ -306,16 +306,16 @@ class MessageTests(TestBase):
#publish 10 messages:
for i in range(1, 11):
- channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i))
#only 5 messages should have been delivered (i.e. 45 bytes worth):
for i in range(1, 6):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.body)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
except Empty: None
#ack messages and check that the next set arrive ok:
@@ -323,328 +323,38 @@ class MessageTests(TestBase):
for i in range(6, 11):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
msg.complete()
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.body)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
except Empty: None
#make sure that a single oversized message still gets delivered
large = "abcdefghijklmnopqrstuvwxyz"
large = large + "-" + large;
- channel.message_transfer(routing_key="test-prefetch-size", body=large)
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large))
msg = queue.get(timeout=1)
- self.assertEqual(large, msg.body)
+ self.assertEqual(large, msg.content.body)
- def test_get(self):
- """
- Test message_get method
- """
- channel = self.channel
- channel.queue_declare(queue="test-get", exclusive=True)
-
- #publish some messages (no_ack=True)
- for i in range(1, 11):
- channel.message_transfer(routing_key="test-get", body="Message %d" % i)
-
- #use message_get to read back the messages, and check that we get an empty at the end
- for i in range(1, 11):
- tag = "queue %d" % i
- reply = channel.message_get(no_ack=True, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- #repeat for confirm_mode=1
- for i in range(11, 21):
- channel.message_transfer(routing_key="test-get", body="Message %d" % i)
-
- for i in range(11, 21):
- tag = "queue %d" % i
- reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- msg = self.client.queue(tag).get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- if (i==13):
- msg.complete()#11, 12 & 13
- if(i in [15, 17, 19]):
- msg.complete(cumulative=False)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- #recover(requeue=True)
- channel.message_recover(requeue=True)
-
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
- tag = "queue %d" % i
- reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- msg = self.client.queue(tag).get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- msg.complete()
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- channel.message_recover(requeue=True)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- def test_reference_simple(self):
- """
- Test basic ability to handle references
- """
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_subscribe(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.message_append(reference=refId, bytes="abcd")
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
- channel.synchronous = True
-
- channel.message_append(reference=refId, bytes="efgh")
- channel.message_append(reference=refId, bytes="ijkl")
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl")
-
-
- def test_reference_large(self):
- """
- Test basic ability to handle references whose content exceeds max frame size
- """
- channel = self.channel
- self.queue_declare(queue="ref_queue")
-
- #generate a big data string (> max frame size of consumer):
- data = "0123456789"
- for i in range(0, 10):
- data += data
- #send it inline
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=data)
- channel.synchronous = True
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- #create a new connection for consumer, with specific max frame size (< data)
- other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
- ch2 = other.channel(1)
- ch2.channel_open()
- ch2.message_subscribe(queue="ref_queue", destination="c1")
- queue = other.queue("c1")
-
- msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg.body, ReferenceId))
- self.assertTrue(msg.reference)
- self.assertEquals(data, msg.reference.get_complete())
-
- def test_reference_completion(self):
- """
- Test that reference transfer are not deemed complete until
- closed (therefore are not acked or routed until that point)
- """
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_subscribe(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.message_append(reference=refId, bytes="abcd")
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
- channel.synchronous = True
-
- try:
- msg = queue.get(timeout=1)
- self.fail("Got unexpected message on queue: " + msg)
- except Empty: None
-
- self.assertTrue(not ack.is_complete())
-
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
- def test_reference_multi_transfer(self):
- """
- Test that multiple transfer requests for the same reference are
- correctly handled.
- """
- channel = self.channel
- #declare and consume from two queues
- channel.queue_declare(queue="q-one", exclusive=True)
- channel.queue_declare(queue="q-two", exclusive=True)
- channel.message_subscribe(queue="q-one", destination="q-one")
- channel.message_subscribe(queue="q-two", destination="q-two")
- queue1 = self.client.queue("q-one")
- queue2 = self.client.queue("q-two")
-
- #transfer a single ref to both queues (in separate commands)
- channel.message_open(reference="my-ref")
- channel.synchronous = False
- ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
- channel.message_append(reference="my-ref", bytes="my data")
- ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- #check that both queues have the message
- self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
- self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
- self.assertEmpty(queue1)
- self.assertEmpty(queue2)
-
- #transfer a single ref to the same queue twice (in separate commands)
- channel.message_open(reference="my-ref")
- channel.synchronous = False
- ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
- channel.message_append(reference="my-ref", bytes="second message")
- ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- msg1 = queue1.get(timeout=1)
- msg2 = queue1.get(timeout=1)
- #order is undefined
- if msg1.message_id == "abc":
- self.assertEquals(msg2.message_id, "xyz")
- else:
- self.assertEquals(msg1.message_id, "xyz")
- self.assertEquals(msg2.message_id, "abc")
-
- #would be legal for the incoming messages to be transfered
- #inline or by reference in any combination
-
- if isinstance(msg1.body, ReferenceId):
- self.assertEquals("second message", msg1.reference.get_complete())
- if isinstance(msg2.body, ReferenceId):
- if msg1.body != msg2.body:
- self.assertEquals("second message", msg2.reference.get_complete())
- #else ok, as same ref as msg1
- else:
- self.assertEquals("second message", msg1.body)
- if isinstance(msg2.body, ReferenceId):
- self.assertEquals("second message", msg2.reference.get_complete())
- else:
- self.assertEquals("second message", msg2.body)
-
- self.assertEmpty(queue1)
-
- def test_reference_unopened_on_append_error(self):
- channel = self.channel
- try:
- channel.message_append(reference="unopened")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_unopened_on_close_error(self):
- channel = self.channel
- try:
- channel.message_close(reference="unopened")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_unopened_on_transfer_error(self):
- channel = self.channel
- try:
- channel.message_transfer(body=ReferenceId("unopened"))
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_already_opened_error(self):
- channel = self.channel
- channel.message_open(reference="a")
- try:
- channel.message_open(reference="a")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_empty_reference(self):
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_subscribe(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
- channel.synchronous = True
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- msg = queue.get(timeout=1)
- self.assertEquals(msg.message_id, "empty-msg")
- self.assertDataEquals(channel, msg, "")
def test_reject(self):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
channel.message_subscribe(queue = "q", destination = "consumer")
- channel.message_transfer(routing_key = "q", body="blah, blah")
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
msg = self.client.queue("consumer").get(timeout = 1)
- self.assertEquals(msg.body, "blah, blah")
+ self.assertEquals(msg.content.body, "blah, blah")
channel.message_cancel(destination = "consumer")
msg.reject()
channel.message_subscribe(queue = "q", destination = "checker")
msg = self.client.queue("checker").get(timeout = 1)
- self.assertEquals(msg.body, "blah, blah")
-
- def test_checkpoint(self):
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
-
- channel.message_open(reference="my-ref")
- channel.message_append(reference="my-ref", bytes="abcdefgh")
- channel.message_append(reference="my-ref", bytes="ijklmnop")
- channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
- channel.channel_close()
-
- channel = self.client.channel(2)
- channel.channel_open()
- channel.message_subscribe(queue = "q", destination = "consumer")
- offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
- self.assertTrue(offset<=16)
- channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
- channel.synchronous = False
- channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
- self.assertEmpty(self.client.queue("consumer"))
+ self.assertEquals(msg.content.body, "blah, blah")
def test_credit_flow_messages(self):
"""
@@ -660,7 +370,7 @@ class MessageTests(TestBase):
channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
channel.message_flow(unit = 0, value = 5, destination = "c")
@@ -692,7 +402,7 @@ class MessageTests(TestBase):
channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(routing_key = "q", body = "abcdefgh")
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
#each message is currently interpreted as requiring 75 bytes of credit
#set byte credit to finite amount (less than enough for all messages)
@@ -726,7 +436,7 @@ class MessageTests(TestBase):
channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
channel.message_flow(unit = 0, value = 5, destination = "c")
@@ -760,7 +470,7 @@ class MessageTests(TestBase):
channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(routing_key = "q", body = "abcdefgh")
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
#each message is currently interpreted as requiring 75 bytes of credit
#set byte credit to finite amount (less than enough for all messages)
@@ -783,11 +493,5 @@ class MessageTests(TestBase):
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
-
-
def assertDataEquals(self, channel, msg, expected):
- if isinstance(msg.body, ReferenceId):
- data = msg.reference.get_complete()
- else:
- data = msg.body
- self.assertEquals(expected, data)
+ self.assertEquals(expected, msg.content.body)