summaryrefslogtreecommitdiff
path: root/python/tests_0-10/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests_0-10/message.py')
-rw-r--r--python/tests_0-10/message.py60
1 files changed, 35 insertions, 25 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index 5f97d6c705..64e2bc44c4 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -464,10 +464,10 @@ class MessageTests(TestBase010):
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
session.message_subscribe(queue = "q", destination = "c")
- session.message_flow_mode(mode = 0, destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
session.message_flow(unit = 0, value = 5, destination = "c")
@@ -494,13 +494,13 @@ class MessageTests(TestBase010):
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
session.message_subscribe(queue = "q", destination = "c")
- session.message_flow_mode(mode = 0, destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
#send batch of messages to queue
for i in range(10):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of credit
- msg_size = 35
+ msg_size = 21
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = 1, value = msg_size*5, destination = "c")
@@ -527,11 +527,11 @@ class MessageTests(TestBase010):
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- session.message_flow_mode(mode = 1, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
session.message_flow(unit = 0, value = 5, destination = "c")
@@ -539,13 +539,16 @@ class MessageTests(TestBase010):
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
q = session.incoming("c")
- for i in range(1, 6):
+ for i in range(1, 6):
msg = q.get(timeout = 1)
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
self.assertDataEquals(session, msg, "Message %d" % i)
self.assertEmpty(q)
#acknowledge messages and check more are received
- msg.complete(cumulative=True)
+ #TODO: there may be a nicer way of doing this
+ session.channel.session_completed(session.receiver._completed)
+
for i in range(6, 11):
self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
self.assertEmpty(q)
@@ -559,14 +562,14 @@ class MessageTests(TestBase010):
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- session.message_flow_mode(mode = 1, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
#send batch of messages to queue
for i in range(10):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of credit
- msg_size = 40
+ msg_size = 19
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = 1, value = msg_size*5, destination = "c")
@@ -584,7 +587,9 @@ class MessageTests(TestBase010):
#ack each message individually and check more are received
for i in range(5):
msg = msgs.pop()
- msg.complete(cumulative=False)
+ #TODO: there may be a nicer way of doing this
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
@@ -595,13 +600,17 @@ class MessageTests(TestBase010):
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 6):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
- self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
- self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
for i in range(6, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
#both subscribers should see all messages
qA = session.incoming("a")
@@ -610,8 +619,9 @@ class MessageTests(TestBase010):
for q in [qA, qB]:
msg = q.get(timeout = 1)
self.assertEquals("Message %s" % i, msg.body)
- msg.complete()
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
#messages should still be on the queue:
self.assertEquals(10, session.queue_query(queue = "q").message_count)
@@ -625,7 +635,7 @@ class MessageTests(TestBase010):
#use fanout for now:
session.exchange_bind(exchange="amq.fanout", queue="q")
session.message_transfer(destination="amq.fanout", message=Message("acquire me"))
- #session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+ #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
@@ -724,11 +734,11 @@ class MessageTests(TestBase010):
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#consume some of them
session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- session.message_flow_mode(mode = 0, destination = "a")
+ session.message_set_flow_mode(flow_mode = 0, destination = "a")
session.message_flow(unit = 0, value = 5, destination = "a")
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
@@ -762,7 +772,7 @@ class MessageTests(TestBase010):
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#create a not-acquired subscriber
session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
@@ -861,7 +871,7 @@ class MessageTests(TestBase010):
def assertEmpty(self, queue):
try:
extra = queue.get(timeout=1)
- self.fail("Queue not empty, contains: " + extra.content.body)
+ self.fail("Queue not empty, contains: " + extra.body)
except Empty: None
class SizelessContent(Content):