diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-11 21:56:49 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-11 21:56:49 +0000 |
| commit | 2250ec787ae5fb84fbebfee35bd9925ebd1dd679 (patch) | |
| tree | db1768a5f613ca2a70187c52b96896ca1ccdcfdc /python | |
| parent | ae650ea316f73b40db94a02556fd2cb335a7c816 (diff) | |
| download | qpid-python-2250ec787ae5fb84fbebfee35bd9925ebd1dd679.tar.gz | |
Enabled tx methods on final 0-10 path and converted tests accordingly
Added read/write- uuid to codec010
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@636121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/cpp_failing_0-10.txt | 4 | ||||
| -rw-r--r-- | python/qpid/codec010.py | 6 | ||||
| -rw-r--r-- | python/qpid/testlib.py | 6 | ||||
| -rw-r--r-- | python/tests_0-10/message.py | 32 | ||||
| -rw-r--r-- | python/tests_0-10/tx.py | 218 |
5 files changed, 143 insertions, 123 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index be61fed0a2..8ae41b1d87 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -1,9 +1,6 @@ tests.codec.FieldTableTestCase.test_field_table_decode tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair tests.codec.FieldTableTestCase.test_field_table_name_value_pair -tests_0-10.tx.TxTests.test_auto_rollback -tests_0-10.tx.TxTests.test_commit -tests_0-10.tx.TxTests.test_rollback tests_0-10.execution.ExecutionTests.test_flush tests_0-10.dtx.DtxTests.test_bad_resume tests_0-10.dtx.DtxTests.test_end @@ -26,7 +23,6 @@ tests_0-10.dtx.DtxTests.test_start_join tests_0-10.dtx.DtxTests.test_start_join_and_resume tests_0-10.dtx.DtxTests.test_suspend_resume tests_0-10.dtx.DtxTests.test_suspend_start_end_resume -tests_0-10.message.MessageTests.test_consume_exclusive tests_0-10.message.MessageTests.test_consume_no_local tests_0-10.message.MessageTests.test_consume_no_local_awkward tests_0-10.message.MessageTests.test_no_size diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py index 60adbdc2e4..0e4244fb75 100644 --- a/python/qpid/codec010.py +++ b/python/qpid/codec010.py @@ -221,6 +221,12 @@ class Codec(Packer): attr = "write_uint%d" % (width*8) getattr(self, attr)(n) + def write_uuid(self, s): + self.pack("16s", s) + + def read_uuid(self): + return self.unpack("16s") + class StringCodec(Codec): diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 7e5a2a6b66..f633c4e77d 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -352,6 +352,12 @@ class TestBase010(unittest.TestCase): self.conn.start(timeout=10) self.session = self.conn.session("test-session", timeout=10) + def connect(self): + spec = testrunner.spec + conn = Connection(connect(testrunner.host, testrunner.port), spec) + conn.start(timeout=10) + return conn + def tearDown(self): if not self.session.error(): self.session.close(timeout=10) self.conn.close(timeout=10) diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index d176962b8b..df9fde1593 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -96,31 +96,29 @@ class MessageTests(TestBase010): def test_consume_exclusive(self): """ - Test that the exclusive flag is honoured in the consume method + Test an exclusive consumer prevents other consumer being created """ session = self.session - #setup, declare a queue: session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) - - #check that an exclusive consumer prevents other consumer being created: - self.subscribe(destination="first", queue="test-queue-2", exclusive=True) + session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) try: - self.subscribe(destination="second", queue="test-queue-2") + session.message_subscribe(destination="second", queue="test-queue-2") self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new session and cleanup last consumer: - session = self.client.session(2) - session.session_open() + except SessionException, e: + self.assertEquals(403, e.args[0].error_code) - #check that an exclusive consumer cannot be created if a consumer already exists: - self.subscribe(session, destination="first", queue="test-queue-2") + def test_consume_exclusive2(self): + """ + Check that an exclusive consumer cannot be created if a consumer already exists: + """ + session = self.session + session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) + session.message_subscribe(destination="first", queue="test-queue-2") try: - self.subscribe(destination="second", queue="test-queue-2", exclusive=True) + session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) + except SessionException, e: + self.assertEquals(403, e.args[0].error_code) def test_consume_queue_not_found(self): """ diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index 3fd1065af3..59298bad1b 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -18,10 +18,10 @@ # from qpid.client import Client, Closed from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.datatypes import Message, RangedSet +from qpid.testlib import testrunner, TestBase010 -class TxTests(TestBase): +class TxTests(TestBase010): """ Tests for 'methods' on the amqp tx 'class' """ @@ -30,202 +30,216 @@ class TxTests(TestBase): """ Test that commited publishes are delivered and commited acks are not re-delivered """ - channel2 = self.client.channel(2) - channel2.session_open() - self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel2.tx_commit() - channel2.session_close() + session2 = self.conn.session("worker", 2) + self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + session2.tx_commit() + session2.close() - #use a different channel with new subscriptions to ensure + #use a different session with new subscriptions to ensure #there is no redelivery of acked messages: - channel = self.channel - channel.tx_select() + session = self.session + session.tx_select() - self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1) - queue_a = self.client.queue("qa") + self.subscribe(session, queue="tx-commit-a", destination="qa") + queue_a = session.incoming("qa") - self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1) - queue_b = self.client.queue("qb") + self.subscribe(session, queue="tx-commit-b", destination="qb") + queue_b = session.incoming("qb") - self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1) - queue_c = self.client.queue("qc") + self.subscribe(session, queue="tx-commit-c", destination="qc") + queue_c = session.incoming("qc") #check results for i in range(1, 5): msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.content.body) - msg.complete() + self.assertEqual("TxMessage %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.content.body) - msg.complete() + self.assertEqual("TxMessage 6", msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.content.body) - msg.complete() + self.assertEqual("TxMessage 7", msg.body) + session.message_accept(RangedSet(msg.id)) for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.tx_commit() + session.tx_commit() def test_auto_rollback(self): """ - Test that a channel closed with an open transaction is effectively rolled back + Test that a session closed with an open transaction is effectively rolled back """ - channel2 = self.client.channel(2) - channel2.session_open() - queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") + session2 = self.conn.session("worker", 2) + queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None - channel2.session_close() - channel = self.channel - channel.tx_select() + session2.close() - self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1) - queue_a = self.client.queue("qa") + session = self.session + session.tx_select() - self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1) - queue_b = self.client.queue("qb") + self.subscribe(session, queue="tx-autorollback-a", destination="qa") + queue_a = session.incoming("qa") - self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1) - queue_c = self.client.queue("qc") + self.subscribe(session, queue="tx-autorollback-b", destination="qb") + queue_b = session.incoming("qb") + + self.subscribe(session, queue="tx-autorollback-c", destination="qc") + queue_c = session.incoming("qc") #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - msg.complete() + self.assertEqual("Message %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - msg.complete() + self.assertEqual("Message 6", msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - msg.complete() + self.assertEqual("Message 7", msg.body) + session.message_accept(RangedSet(msg.id)) for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.tx_commit() + session.tx_commit() def test_rollback(self): """ Test that rolled back publishes are not delivered and rolled back acks are re-delivered """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + session = self.session + queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None - #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued) - for d in ["sub_a", "sub_b", "sub_c"]: - channel.message_stop(destination=d) - - channel.tx_rollback() + session.tx_rollback() - #restart susbcriptions - for d in ["sub_a", "sub_b", "sub_c"]: - channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF) - channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF) + #need to release messages to get them redelivered now: + session.message_release(consumed) #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - msg.complete() + self.assertEqual("Message %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - msg.complete() + self.assertEqual("Message 6", msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - msg.complete() + self.assertEqual("Message 7", msg.body) + session.message_accept(RangedSet(msg.id)) for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.tx_commit() + session.tx_commit() - def perform_txn_work(self, channel, name_a, name_b, name_c): + def perform_txn_work(self, session, name_a, name_b, name_c): """ Utility method that does some setup and some work under a transaction. Used for testing both commit and rollback """ #setup: - channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True) - channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True) - channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True) + session.queue_declare(queue=name_a, exclusive=True, auto_delete=True) + session.queue_declare(queue=name_b, exclusive=True, auto_delete=True) + session.queue_declare(queue=name_c, exclusive=True, auto_delete=True) key = "my_key_" + name_b topic = "my_topic_" + name_c - channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) - channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) + session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key) + session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic) + dp = session.delivery_properties(routing_key=name_a) for i in range(1, 5): - channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i)) + mp = session.message_properties(message_id="msg%d" % i) + session.message_transfer(message=Message(dp, mp, "Message %d" % i)) + + dp = session.delivery_properties(routing_key=key) + mp = session.message_properties(message_id="msg6") + session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6")) - channel.message_transfer(destination="amq.direct", - content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6")) - channel.message_transfer(destination="amq.topic", - content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7")) + dp = session.delivery_properties(routing_key=topic) + mp = session.message_properties(message_id="msg7") + session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7")) - channel.tx_select() + session.tx_select() #consume and ack messages - self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1) - queue_a = self.client.queue("sub_a") + acked = RangedSet() + self.subscribe(session, queue=name_a, destination="sub_a") + queue_a = session.incoming("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + acked.add(msg.id) + self.assertEqual("Message %d" % i, msg.body) - msg.complete() - - self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1) - queue_b = self.client.queue("sub_b") + self.subscribe(session, queue=name_b, destination="sub_b") + queue_b = session.incoming("sub_b") msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - msg.complete() + self.assertEqual("Message 6", msg.body) + acked.add(msg.id) - sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1) - queue_c = self.client.queue("sub_c") + sub_c = self.subscribe(session, queue=name_c, destination="sub_c") + queue_c = session.incoming("sub_c") msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - msg.complete() + self.assertEqual("Message 7", msg.body) + acked.add(msg.id) + + session.message_accept(acked) + dp = session.delivery_properties(routing_key=topic) #publish messages for i in range(1, 5): - channel.message_transfer(destination="amq.topic", - content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i}, - body="TxMessage %d" % i)) - - channel.message_transfer(destination="amq.direct", - content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"}, - body="TxMessage 6")) - channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"}, - body="TxMessage 7")) - return queue_a, queue_b, queue_c + mp = session.message_properties(message_id="tx-msg%d" % i) + session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i)) + + dp = session.delivery_properties(routing_key=key) + mp = session.message_properties(message_id="tx-msg6") + session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6")) + + dp = session.delivery_properties(routing_key=name_a) + mp = session.message_properties(message_id="tx-msg7") + session.message_transfer(message=Message(dp, mp, "TxMessage 7")) + return queue_a, queue_b, queue_c, acked + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + + def complete(self, session, msg): + session.receiver._completed.add(msg.id)#TODO: this may be done automatically + session.channel.session_completed(session.receiver._completed) + |
