diff options
| author | Gordon Sim <gsim@apache.org> | 2007-09-03 17:35:35 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-09-03 17:35:35 +0000 |
| commit | 331b0e84ae06da0c057a82c0f5b67f550bcd636b (patch) | |
| tree | 91342743f16ad473b456a5ef336409e4af38cd5a /python | |
| parent | cadb67eb27e00abb493793363dcf37f4d2f563dc (diff) | |
| download | qpid-python-331b0e84ae06da0c057a82c0f5b67f550bcd636b.tar.gz | |
Initial implementation (plus very simple tests) of message.acquire, message.release, message.reject and message.flush.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@572394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/cpp_failing_0-10.txt | 1 | ||||
| -rw-r--r-- | python/qpid/client.py | 3 | ||||
| -rw-r--r-- | python/qpid/peer.py | 1 | ||||
| -rw-r--r-- | python/tests_0-10/message.py | 78 |
4 files changed, 75 insertions, 8 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 97cf420717..5b2fb593e1 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -1,4 +1,3 @@ tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate -tests_0-10.message.MessageTests.test_reject tests_0-10.basic.BasicTests.test_get diff --git a/python/qpid/client.py b/python/qpid/client.py index edcd1b8ad2..5734873c6f 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -126,6 +126,9 @@ class ClientDelegate(Delegate): def message_append(self, ch, msg): ch.references.get(msg.reference).append(msg.bytes) + def message_acquired(self, ch, msg): + ch.control_queue.put(msg) + def basic_deliver(self, ch, msg): self.client.queue(msg.consumer_tag).put(msg) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 6ad5482f09..b9dd4e466a 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -190,6 +190,7 @@ class Channel: self.completion = OutgoingCompletion() self.incoming_completion = IncomingCompletion(self) self.futures = {} + self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves # Use reliable framing if version == 0-9. if spec.major == 0 and spec.minor == 9: diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index f08f437a65..8089709314 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -339,20 +339,19 @@ class MessageTests(TestBase): msg = queue.get(timeout=1) self.assertEqual(large, msg.content.body) - - def test_reject(self): channel = self.channel - channel.queue_declare(queue = "q", exclusive=True) + channel.queue_declare(queue = "q", exclusive=True, alternate_exchange="amq.fanout") + channel.queue_declare(queue = "r", exclusive=True) + channel.queue_bind(queue = "r", exchange = "amq.fanout") - channel.message_subscribe(queue = "q", destination = "consumer") + channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) msg = self.client.queue("consumer").get(timeout = 1) self.assertEquals(msg.content.body, "blah, blah") - channel.message_cancel(destination = "consumer") - msg.reject() + channel.message_reject([msg.command_id, msg.command_id]) - channel.message_subscribe(queue = "q", destination = "checker") + channel.message_subscribe(queue = "r", destination = "checker") msg = self.client.queue("checker").get(timeout = 1) self.assertEquals(msg.content.body, "blah, blah") @@ -493,5 +492,70 @@ class MessageTests(TestBase): self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) + def test_subscribe_not_acquired(self): + """ + Test the not-acquired modes works as expected for a simple case + """ + #NOTE: I'm using not-acquired == 1 and pre-acquired == 0 as + #that keeps the default behaviour as expected. This was + #discussed by the SIG, and I'd rather not change all the + #existing tests twice. + + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + for i in range(1, 6): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + + channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) + channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) + + for i in range(6, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + + #both subscribers should see all messages + qA = self.client.queue("a") + qB = self.client.queue("b") + for i in range(1, 11): + for q in [qA, qB]: + msg = q.get(timeout = 1) + self.assertEquals("Message %s" % i, msg.content.body) + msg.complete() + + #messages should still be on the queue: + self.assertEquals(10, channel.queue_query(queue = "q").message_count) + + def test_acquire(self): + """ + Test explicit acquire function + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) + + channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) + msg = self.client.queue("a").get(timeout = 1) + channel.message_acquire([msg.command_id, msg.command_id]) + msg.complete() + + #message should have been removed from the queue: + self.assertEquals(0, channel.queue_query(queue = "q").message_count) + + def test_release(self): + """ + Test explicit release function + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me")) + + channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) + msg = self.client.queue("a").get(timeout = 1) + channel.message_cancel(destination = "a") + channel.message_release([msg.command_id, msg.command_id]) + msg.complete() + + #message should not have been removed from the queue: + self.assertEquals(1, channel.queue_query(queue = "q").message_count) + def assertDataEquals(self, channel, msg, expected): self.assertEquals(expected, msg.content.body) |
