diff options
| author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
| commit | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch) | |
| tree | 13677bf773bf25db03144aa72c97a49d2810240d /python/tests_0-10 | |
| parent | a9232d5a02a19f093f212cb0b76772a20b45cb1b (diff) | |
| download | qpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz | |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
| -rw-r--r-- | python/tests_0-10/broker.py | 2 | ||||
| -rw-r--r-- | python/tests_0-10/dtx.py | 26 | ||||
| -rw-r--r-- | python/tests_0-10/example.py | 2 | ||||
| -rw-r--r-- | python/tests_0-10/message.py | 24 | ||||
| -rw-r--r-- | python/tests_0-10/tx.py | 56 |
5 files changed, 71 insertions, 39 deletions
diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 684b36597e..6bc2f7ceb8 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -48,7 +48,7 @@ class BrokerTests(TestBase): body = "test ack" ch.message_transfer(routing_key = "otherqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) - msg.ok() + msg.complete() self.assert_(msg.body == body) def test_simple_delivery_immediate(self): diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index c0d1bd2b74..2835d703ae 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -40,6 +40,11 @@ class DtxTests(TestBase): XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 XA_OK = 8 + tx_counter = 0 + + def reset_channel(self): + self.channel.channel_close() + self.channel.channel_open() def test_simple_commit(self): """ @@ -56,6 +61,9 @@ class DtxTests(TestBase): #commit self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status) + #should close and reopen channel to ensure no unacked messages are held + self.reset_channel() + #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") @@ -79,6 +87,8 @@ class DtxTests(TestBase): #commit self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status) + self.reset_channel() + #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") @@ -100,6 +110,8 @@ class DtxTests(TestBase): #rollback self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.reset_channel() + #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") @@ -123,6 +135,8 @@ class DtxTests(TestBase): #rollback self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.reset_channel() + #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") @@ -191,6 +205,8 @@ class DtxTests(TestBase): channel = self.channel #do some transactional work & complete the transaction self.test_simple_commit() + # channel has been reset, so reselect for use with dtx + channel.dtx_demarcation_select() #start association for the same xid as the previously completed txn tx = self.xid("my-xid") @@ -355,7 +371,7 @@ class DtxTests(TestBase): self.assertEqual("two", msg.message_id) channel.message_cancel(destination="results") #ack the message then close the channel - msg.ok() + msg.complete() channel.channel_close() channel = self.channel @@ -446,7 +462,7 @@ class DtxTests(TestBase): channel2.dtx_demarcation_select() channel2.dtx_demarcation_start(xid=tx) channel2.message_get(queue="dummy", destination="dummy") - self.client.queue("dummy").get(timeout=1).ok() + self.client.queue("dummy").get(timeout=1).complete() channel2.message_transfer(routing_key="dummy", body="whatever") channel2.channel_close() @@ -548,7 +564,9 @@ class DtxTests(TestBase): channel.dtx_coordination_rollback(xid=x) self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) - def xid(self, txid, branchqual = ''): + def xid(self, txid): + DtxTests.tx_counter += 1 + branchqual = "v%s" % DtxTests.tx_counter return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual def txswap(self, tx, id): @@ -573,7 +591,7 @@ class DtxTests(TestBase): #consume from src: channel.message_get(destination="temp-swap", queue=src) msg = self.client.queue("temp-swap").get(timeout=1) - msg.ok(); + msg.complete(); #re-publish to dest channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py index 7ab4cc7d0a..dc71b0590b 100644 --- a/python/tests_0-10/example.py +++ b/python/tests_0-10/example.py @@ -90,5 +90,5 @@ class ExampleTest (TestBase): self.assertEqual(body, msg.body) # Now acknowledge the message. - msg.ok() + msg.complete() diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index b25016e680..74e2b6416f 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -171,8 +171,8 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok(batchoffset=1)#One and Two - msg4.ok() + msg2.complete(cumulative=True)#One and Two + msg4.complete(cumulative=False) channel.message_recover(requeue=False) @@ -215,8 +215,8 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok(batchoffset=1) #One and Two - msg4.ok() #Four + msg2.complete(cumulative=True) #One and Two + msg4.complete(cumulative=False) #Four channel.message_cancel(destination="consumer_tag") @@ -276,14 +276,13 @@ class MessageTests(TestBase): except Empty: None #ack messages and check that the next set arrive ok: - #todo: once batching is implmented, send a single response for all messages - msg.ok(batchoffset=-4)#1-5 + msg.complete() for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-4)#6-10 + msg.complete() try: extra = queue.get(timeout=1) @@ -320,13 +319,13 @@ class MessageTests(TestBase): except Empty: None #ack messages and check that the next set arrive ok: - msg.ok(batchoffset=-4)#1-5 + msg.complete() for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-4)#6-10 + msg.complete() try: extra = queue.get(timeout=1) @@ -376,9 +375,9 @@ class MessageTests(TestBase): self.assertEqual("Message %d" % i, msg.body) if (i==13): - msg.ok(batchoffset=-2)#11, 12 & 13 + msg.complete()#11, 12 & 13 if(i in [15, 17, 19]): - msg.ok() + msg.complete(cumulative=False) reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") @@ -395,8 +394,7 @@ class MessageTests(TestBase): self.assertEqual(reply.method.name, "ok") msg = self.client.queue(tag).get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() - #channel.message_ack(delivery_tag=reply.delivery_tag) + msg.complete() reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index 0f6b4f5cd1..b499c2d1f9 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -30,23 +30,39 @@ class TxTests(TestBase): """ Test that commited publishes are delivered and commited acks are not re-delivered """ + channel2 = self.client.channel(2) + channel2.channel_open() + self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel2.tx_commit() + channel2.channel_close() + + #use a different channel with new subscriptions to ensure + #there is no redelivery of acked messages: channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel.tx_commit() + channel.tx_select() + + channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False) + queue_a = self.client.queue("qa") + + channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False) + queue_b = self.client.queue("qb") + + channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False) + queue_c = self.client.queue("qc") #check results for i in range(1, 5): msg = queue_c.get(timeout=1) self.assertEqual("TxMessage %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("TxMessage 6", msg.body) - msg.ok() + msg.complete() msg = queue_a.get(timeout=1) self.assertEqual("TxMessage 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -76,15 +92,15 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -114,15 +130,15 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -150,10 +166,10 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.message_transfer(routing_key=name_a, body="Message %d" % i) + channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i) - channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") - channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") + channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6") + channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7") channel.tx_select() @@ -164,25 +180,25 @@ class TxTests(TestBase): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-3) + msg.complete() channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() #publish messages for i in range(1, 5): - channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) + channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i) - channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") - channel.message_transfer(routing_key=name_a, body="TxMessage 7") + channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6") + channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7") return queue_a, queue_b, queue_c |
