summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-03 17:35:35 +0000
committerGordon Sim <gsim@apache.org>2007-09-03 17:35:35 +0000
commit331b0e84ae06da0c057a82c0f5b67f550bcd636b (patch)
tree91342743f16ad473b456a5ef336409e4af38cd5a /python
parentcadb67eb27e00abb493793363dcf37f4d2f563dc (diff)
downloadqpid-python-331b0e84ae06da0c057a82c0f5b67f550bcd636b.tar.gz
Initial implementation (plus very simple tests) of message.acquire, message.release, message.reject and message.flush.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@572394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/cpp_failing_0-10.txt1
-rw-r--r--python/qpid/client.py3
-rw-r--r--python/qpid/peer.py1
-rw-r--r--python/tests_0-10/message.py78
4 files changed, 75 insertions, 8 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 97cf420717..5b2fb593e1 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -1,4 +1,3 @@
tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate
-tests_0-10.message.MessageTests.test_reject
tests_0-10.basic.BasicTests.test_get
diff --git a/python/qpid/client.py b/python/qpid/client.py
index edcd1b8ad2..5734873c6f 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -126,6 +126,9 @@ class ClientDelegate(Delegate):
def message_append(self, ch, msg):
ch.references.get(msg.reference).append(msg.bytes)
+ def message_acquired(self, ch, msg):
+ ch.control_queue.put(msg)
+
def basic_deliver(self, ch, msg):
self.client.queue(msg.consumer_tag).put(msg)
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 6ad5482f09..b9dd4e466a 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -190,6 +190,7 @@ class Channel:
self.completion = OutgoingCompletion()
self.incoming_completion = IncomingCompletion(self)
self.futures = {}
+ self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves
# Use reliable framing if version == 0-9.
if spec.major == 0 and spec.minor == 9:
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index f08f437a65..8089709314 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -339,20 +339,19 @@ class MessageTests(TestBase):
msg = queue.get(timeout=1)
self.assertEqual(large, msg.content.body)
-
-
def test_reject(self):
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, alternate_exchange="amq.fanout")
+ channel.queue_declare(queue = "r", exclusive=True)
+ channel.queue_bind(queue = "r", exchange = "amq.fanout")
- channel.message_subscribe(queue = "q", destination = "consumer")
+ channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
msg = self.client.queue("consumer").get(timeout = 1)
self.assertEquals(msg.content.body, "blah, blah")
- channel.message_cancel(destination = "consumer")
- msg.reject()
+ channel.message_reject([msg.command_id, msg.command_id])
- channel.message_subscribe(queue = "q", destination = "checker")
+ channel.message_subscribe(queue = "r", destination = "checker")
msg = self.client.queue("checker").get(timeout = 1)
self.assertEquals(msg.content.body, "blah, blah")
@@ -493,5 +492,70 @@ class MessageTests(TestBase):
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
+ def test_subscribe_not_acquired(self):
+ """
+ Test the not-acquired modes works as expected for a simple case
+ """
+ #NOTE: I'm using not-acquired == 1 and pre-acquired == 0 as
+ #that keeps the default behaviour as expected. This was
+ #discussed by the SIG, and I'd rather not change all the
+ #existing tests twice.
+
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ for i in range(1, 6):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+ channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+
+ for i in range(6, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+ #both subscribers should see all messages
+ qA = self.client.queue("a")
+ qB = self.client.queue("b")
+ for i in range(1, 11):
+ for q in [qA, qB]:
+ msg = q.get(timeout = 1)
+ self.assertEquals("Message %s" % i, msg.content.body)
+ msg.complete()
+
+ #messages should still be on the queue:
+ self.assertEquals(10, channel.queue_query(queue = "q").message_count)
+
+ def test_acquire(self):
+ """
+ Test explicit acquire function
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+
+ channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+ msg = self.client.queue("a").get(timeout = 1)
+ channel.message_acquire([msg.command_id, msg.command_id])
+ msg.complete()
+
+ #message should have been removed from the queue:
+ self.assertEquals(0, channel.queue_query(queue = "q").message_count)
+
+ def test_release(self):
+ """
+ Test explicit release function
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
+
+ channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+ msg = self.client.queue("a").get(timeout = 1)
+ channel.message_cancel(destination = "a")
+ channel.message_release([msg.command_id, msg.command_id])
+ msg.complete()
+
+ #message should not have been removed from the queue:
+ self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+
def assertDataEquals(self, channel, msg, expected):
self.assertEquals(expected, msg.content.body)