summaryrefslogtreecommitdiff
path: root/python/tests_0-10
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-07 13:20:02 +0000
committerGordon Sim <gsim@apache.org>2008-03-07 13:20:02 +0000
commit5d8a9df4ec3a4f030ed80e143ce6986c19ab800a (patch)
tree8417c3abe9dd81e6a73084aa36371981e06f9e27 /python/tests_0-10
parent9fd4909832e16734c47c13eebbe4aca66640b1b0 (diff)
downloadqpid-python-5d8a9df4ec3a4f030ed80e143ce6986c19ab800a.tar.gz
Altered management of delivery records to support separateion of completion (which drives flow control) and acceptance.
Converted flow control python tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
-rw-r--r--python/tests_0-10/message.py60
-rw-r--r--python/tests_0-10/queue.py62
2 files changed, 63 insertions, 59 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index 5f97d6c705..64e2bc44c4 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -464,10 +464,10 @@ class MessageTests(TestBase010):
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
session.message_subscribe(queue = "q", destination = "c")
- session.message_flow_mode(mode = 0, destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
session.message_flow(unit = 0, value = 5, destination = "c")
@@ -494,13 +494,13 @@ class MessageTests(TestBase010):
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
session.message_subscribe(queue = "q", destination = "c")
- session.message_flow_mode(mode = 0, destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
#send batch of messages to queue
for i in range(10):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of credit
- msg_size = 35
+ msg_size = 21
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = 1, value = msg_size*5, destination = "c")
@@ -527,11 +527,11 @@ class MessageTests(TestBase010):
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- session.message_flow_mode(mode = 1, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
session.message_flow(unit = 0, value = 5, destination = "c")
@@ -539,13 +539,16 @@ class MessageTests(TestBase010):
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
q = session.incoming("c")
- for i in range(1, 6):
+ for i in range(1, 6):
msg = q.get(timeout = 1)
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
self.assertDataEquals(session, msg, "Message %d" % i)
self.assertEmpty(q)
#acknowledge messages and check more are received
- msg.complete(cumulative=True)
+ #TODO: there may be a nicer way of doing this
+ session.channel.session_completed(session.receiver._completed)
+
for i in range(6, 11):
self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
self.assertEmpty(q)
@@ -559,14 +562,14 @@ class MessageTests(TestBase010):
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- session.message_flow_mode(mode = 1, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
#send batch of messages to queue
for i in range(10):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of credit
- msg_size = 40
+ msg_size = 19
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = 1, value = msg_size*5, destination = "c")
@@ -584,7 +587,9 @@ class MessageTests(TestBase010):
#ack each message individually and check more are received
for i in range(5):
msg = msgs.pop()
- msg.complete(cumulative=False)
+ #TODO: there may be a nicer way of doing this
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
@@ -595,13 +600,17 @@ class MessageTests(TestBase010):
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 6):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
- self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
- self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
for i in range(6, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
#both subscribers should see all messages
qA = session.incoming("a")
@@ -610,8 +619,9 @@ class MessageTests(TestBase010):
for q in [qA, qB]:
msg = q.get(timeout = 1)
self.assertEquals("Message %s" % i, msg.body)
- msg.complete()
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
#messages should still be on the queue:
self.assertEquals(10, session.queue_query(queue = "q").message_count)
@@ -625,7 +635,7 @@ class MessageTests(TestBase010):
#use fanout for now:
session.exchange_bind(exchange="amq.fanout", queue="q")
session.message_transfer(destination="amq.fanout", message=Message("acquire me"))
- #session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+ #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
@@ -724,11 +734,11 @@ class MessageTests(TestBase010):
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#consume some of them
session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- session.message_flow_mode(mode = 0, destination = "a")
+ session.message_set_flow_mode(flow_mode = 0, destination = "a")
session.message_flow(unit = 0, value = 5, destination = "a")
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
@@ -762,7 +772,7 @@ class MessageTests(TestBase010):
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#create a not-acquired subscriber
session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
@@ -861,7 +871,7 @@ class MessageTests(TestBase010):
def assertEmpty(self, queue):
try:
extra = queue.get(timeout=1)
- self.fail("Queue not empty, contains: " + extra.content.body)
+ self.fail("Queue not empty, contains: " + extra.body)
except Empty: None
class SizelessContent(Content):
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index f192a2af90..b972166325 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -18,10 +18,10 @@
#
from qpid.client import Client, Closed
from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
-class QueueTests(TestBase):
+class QueueTests(TestBase010):
"""Tests for 'methods' on the amqp queue 'class'"""
def test_purge(self):
@@ -31,9 +31,9 @@ class QueueTests(TestBase):
session = self.session
#setup, declare a queue and add some messages to it:
session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
- session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"}))
- session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"}))
- session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"}))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three"))
#check that the queue now reports 3 messages:
session.queue_declare(queue="test-queue")
@@ -46,15 +46,16 @@ class QueueTests(TestBase):
self.assertEqual(0, reply.message_count)
#send a further message and consume it, ensuring that the other messages are really gone
- session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"}))
- self.subscribe(queue="test-queue", destination="tag")
- queue = self.client.queue("tag")
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four"))
+ session.message_subscribe(queue="test-queue", destination="tag")
+ session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF)
+ queue = session.incoming("tag")
msg = queue.get(timeout=1)
- self.assertEqual("four", msg.content.body)
+ self.assertEqual("four", msg.body)
#check error conditions (use new sessions):
- session = self.client.session(2)
- session.session_open()
+ session = self.conn.session("error-checker")
try:
#queue specified but doesn't exist:
session.queue_purge(queue="invalid-queue")
@@ -62,8 +63,7 @@ class QueueTests(TestBase):
except Closed, e:
self.assertChannelException(404, e.args[0])
- session = self.client.session(3)
- session.session_open()
+ session = self.conn.session("error-checker")
try:
#queue not specified and none previously declared for channel:
session.queue_purge()
@@ -71,12 +71,6 @@ class QueueTests(TestBase):
except Closed, e:
self.assertConnectionException(530, e.args[0])
- #cleanup
- other = self.connect()
- session = other.session(1)
- session.session_open()
- session.exchange_delete(exchange="test-exchange")
-
def test_declare_exclusive(self):
"""
Test that the exclusive field is honoured in queue.declare
@@ -167,32 +161,32 @@ class QueueTests(TestBase):
self.subscribe(queue="queue-1", destination="queue-1")
self.subscribe(queue="queue-2", destination="queue-2")
- queue1 = self.client.queue("queue-1")
- queue2 = self.client.queue("queue-2")
+ queue1 = session.incoming("queue-1")
+ queue2 = session.incoming("queue-2")
session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
#send a message that will match both bindings
session.message_transfer(destination=exchange,
- content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
+ message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "one"))
#unbind first queue
session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
#send another message
session.message_transfer(destination=exchange,
- content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
+ message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "two", ))
#check one queue has both messages and the other has only one
- self.assertEquals("one", queue1.get(timeout=1).content.body)
+ self.assertEquals("one", queue1.get(timeout=1).body)
try:
msg = queue1.get(timeout=1)
- self.fail("Got extra message: %s" % msg.content.body)
+ self.fail("Got extra message: %s" % msg.body)
except Empty: pass
- self.assertEquals("one", queue2.get(timeout=1).content.body)
- self.assertEquals("two", queue2.get(timeout=1).content.body)
+ self.assertEquals("one", queue2.get(timeout=1).body)
+ self.assertEquals("two", queue2.get(timeout=1).body)
try:
msg = queue2.get(timeout=1)
self.fail("Got extra message: " + msg)
@@ -207,9 +201,9 @@ class QueueTests(TestBase):
#straight-forward case:
session.queue_declare(queue="delete-me")
- session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
- session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
- session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
+ session.message_transfer(message=Message("a", session.delivery_properties(routing_key="delete-me")))
+ session.message_transfer(message=Message("b", session.delivery_properties(routing_key="delete-me")))
+ session.message_transfer(message=Message("c", session.delivery_properties(routing_key="delete-me")))
session.queue_delete(queue="delete-me")
#check that it has gone be declaring passively
try:
@@ -238,7 +232,7 @@ class QueueTests(TestBase):
#create a queue and add a message to it (use default binding):
session.queue_declare(queue="delete-me-2")
session.queue_declare(queue="delete-me-2", passive=True)
- session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
+ session.message_transfer(message=Message("message", session.delivery_properties(routing_key="delete-me-2")))
#try to delete, but only if empty:
try:
@@ -253,9 +247,9 @@ class QueueTests(TestBase):
#empty queue:
self.subscribe(session, destination="consumer_tag", queue="delete-me-2")
- queue = self.client.queue("consumer_tag")
+ queue = session.incoming("consumer_tag")
msg = queue.get(timeout=1)
- self.assertEqual("message", msg.content.body)
+ self.assertEqual("message", msg.body)
session.message_cancel(destination="consumer_tag")
#retry deletion on empty queue: