summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-11 21:56:49 +0000
committerGordon Sim <gsim@apache.org>2008-03-11 21:56:49 +0000
commit2250ec787ae5fb84fbebfee35bd9925ebd1dd679 (patch)
treedb1768a5f613ca2a70187c52b96896ca1ccdcfdc /python
parentae650ea316f73b40db94a02556fd2cb335a7c816 (diff)
downloadqpid-python-2250ec787ae5fb84fbebfee35bd9925ebd1dd679.tar.gz
Enabled tx methods on final 0-10 path and converted tests accordingly
Added read/write- uuid to codec010 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@636121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/cpp_failing_0-10.txt4
-rw-r--r--python/qpid/codec010.py6
-rw-r--r--python/qpid/testlib.py6
-rw-r--r--python/tests_0-10/message.py32
-rw-r--r--python/tests_0-10/tx.py218
5 files changed, 143 insertions, 123 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index be61fed0a2..8ae41b1d87 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -1,9 +1,6 @@
tests.codec.FieldTableTestCase.test_field_table_decode
tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
tests.codec.FieldTableTestCase.test_field_table_name_value_pair
-tests_0-10.tx.TxTests.test_auto_rollback
-tests_0-10.tx.TxTests.test_commit
-tests_0-10.tx.TxTests.test_rollback
tests_0-10.execution.ExecutionTests.test_flush
tests_0-10.dtx.DtxTests.test_bad_resume
tests_0-10.dtx.DtxTests.test_end
@@ -26,7 +23,6 @@ tests_0-10.dtx.DtxTests.test_start_join
tests_0-10.dtx.DtxTests.test_start_join_and_resume
tests_0-10.dtx.DtxTests.test_suspend_resume
tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
-tests_0-10.message.MessageTests.test_consume_exclusive
tests_0-10.message.MessageTests.test_consume_no_local
tests_0-10.message.MessageTests.test_consume_no_local_awkward
tests_0-10.message.MessageTests.test_no_size
diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py
index 60adbdc2e4..0e4244fb75 100644
--- a/python/qpid/codec010.py
+++ b/python/qpid/codec010.py
@@ -221,6 +221,12 @@ class Codec(Packer):
attr = "write_uint%d" % (width*8)
getattr(self, attr)(n)
+ def write_uuid(self, s):
+ self.pack("16s", s)
+
+ def read_uuid(self):
+ return self.unpack("16s")
+
class StringCodec(Codec):
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 7e5a2a6b66..f633c4e77d 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -352,6 +352,12 @@ class TestBase010(unittest.TestCase):
self.conn.start(timeout=10)
self.session = self.conn.session("test-session", timeout=10)
+ def connect(self):
+ spec = testrunner.spec
+ conn = Connection(connect(testrunner.host, testrunner.port), spec)
+ conn.start(timeout=10)
+ return conn
+
def tearDown(self):
if not self.session.error(): self.session.close(timeout=10)
self.conn.close(timeout=10)
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index d176962b8b..df9fde1593 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -96,31 +96,29 @@ class MessageTests(TestBase010):
def test_consume_exclusive(self):
"""
- Test that the exclusive flag is honoured in the consume method
+ Test an exclusive consumer prevents other consumer being created
"""
session = self.session
- #setup, declare a queue:
session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
+ session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
try:
- self.subscribe(destination="second", queue="test-queue-2")
+ session.message_subscribe(destination="second", queue="test-queue-2")
self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new session and cleanup last consumer:
- session = self.client.session(2)
- session.session_open()
+ except SessionException, e:
+ self.assertEquals(403, e.args[0].error_code)
- #check that an exclusive consumer cannot be created if a consumer already exists:
- self.subscribe(session, destination="first", queue="test-queue-2")
+ def test_consume_exclusive2(self):
+ """
+ Check that an exclusive consumer cannot be created if a consumer already exists:
+ """
+ session = self.session
+ session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+ session.message_subscribe(destination="first", queue="test-queue-2")
try:
- self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
+ session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
+ except SessionException, e:
+ self.assertEquals(403, e.args[0].error_code)
def test_consume_queue_not_found(self):
"""
diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py
index 3fd1065af3..59298bad1b 100644
--- a/python/tests_0-10/tx.py
+++ b/python/tests_0-10/tx.py
@@ -18,10 +18,10 @@
#
from qpid.client import Client, Closed
from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import testrunner, TestBase010
-class TxTests(TestBase):
+class TxTests(TestBase010):
"""
Tests for 'methods' on the amqp tx 'class'
"""
@@ -30,202 +30,216 @@ class TxTests(TestBase):
"""
Test that commited publishes are delivered and commited acks are not re-delivered
"""
- channel2 = self.client.channel(2)
- channel2.session_open()
- self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel2.tx_commit()
- channel2.session_close()
+ session2 = self.conn.session("worker", 2)
+ self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ session2.tx_commit()
+ session2.close()
- #use a different channel with new subscriptions to ensure
+ #use a different session with new subscriptions to ensure
#there is no redelivery of acked messages:
- channel = self.channel
- channel.tx_select()
+ session = self.session
+ session.tx_select()
- self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
- queue_a = self.client.queue("qa")
+ self.subscribe(session, queue="tx-commit-a", destination="qa")
+ queue_a = session.incoming("qa")
- self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
- queue_b = self.client.queue("qb")
+ self.subscribe(session, queue="tx-commit-b", destination="qb")
+ queue_b = session.incoming("qb")
- self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
- queue_c = self.client.queue("qc")
+ self.subscribe(session, queue="tx-commit-c", destination="qc")
+ queue_c = session.incoming("qc")
#check results
for i in range(1, 5):
msg = queue_c.get(timeout=1)
- self.assertEqual("TxMessage %d" % i, msg.content.body)
- msg.complete()
+ self.assertEqual("TxMessage %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_b.get(timeout=1)
- self.assertEqual("TxMessage 6", msg.content.body)
- msg.complete()
+ self.assertEqual("TxMessage 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_a.get(timeout=1)
- self.assertEqual("TxMessage 7", msg.content.body)
- msg.complete()
+ self.assertEqual("TxMessage 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
#cleanup
- channel.tx_commit()
+ session.tx_commit()
def test_auto_rollback(self):
"""
- Test that a channel closed with an open transaction is effectively rolled back
+ Test that a session closed with an open transaction is effectively rolled back
"""
- channel2 = self.client.channel(2)
- channel2.session_open()
- queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+ session2 = self.conn.session("worker", 2)
+ queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
- channel2.session_close()
- channel = self.channel
- channel.tx_select()
+ session2.close()
- self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1)
- queue_a = self.client.queue("qa")
+ session = self.session
+ session.tx_select()
- self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1)
- queue_b = self.client.queue("qb")
+ self.subscribe(session, queue="tx-autorollback-a", destination="qa")
+ queue_a = session.incoming("qa")
- self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1)
- queue_c = self.client.queue("qc")
+ self.subscribe(session, queue="tx-autorollback-b", destination="qb")
+ queue_b = session.incoming("qb")
+
+ self.subscribe(session, queue="tx-autorollback-c", destination="qc")
+ queue_c = session.incoming("qc")
#check results
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- msg.complete()
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
#cleanup
- channel.tx_commit()
+ session.tx_commit()
def test_rollback(self):
"""
Test that rolled back publishes are not delivered and rolled back acks are re-delivered
"""
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+ session = self.session
+ queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
- #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued)
- for d in ["sub_a", "sub_b", "sub_c"]:
- channel.message_stop(destination=d)
-
- channel.tx_rollback()
+ session.tx_rollback()
- #restart susbcriptions
- for d in ["sub_a", "sub_b", "sub_c"]:
- channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF)
- channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF)
+ #need to release messages to get them redelivered now:
+ session.message_release(consumed)
#check results
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- msg.complete()
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
#cleanup
- channel.tx_commit()
+ session.tx_commit()
- def perform_txn_work(self, channel, name_a, name_b, name_c):
+ def perform_txn_work(self, session, name_a, name_b, name_c):
"""
Utility method that does some setup and some work under a transaction. Used for testing both
commit and rollback
"""
#setup:
- channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
- channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
- channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+ session.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
+ session.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
+ session.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
key = "my_key_" + name_b
topic = "my_topic_" + name_c
- channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
- channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
+ session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
+ session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
+ dp = session.delivery_properties(routing_key=name_a)
for i in range(1, 5):
- channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i))
+ mp = session.message_properties(message_id="msg%d" % i)
+ session.message_transfer(message=Message(dp, mp, "Message %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
- channel.message_transfer(destination="amq.direct",
- content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6"))
- channel.message_transfer(destination="amq.topic",
- content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7"))
+ dp = session.delivery_properties(routing_key=topic)
+ mp = session.message_properties(message_id="msg7")
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
- channel.tx_select()
+ session.tx_select()
#consume and ack messages
- self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
- queue_a = self.client.queue("sub_a")
+ acked = RangedSet()
+ self.subscribe(session, queue=name_a, destination="sub_a")
+ queue_a = session.incoming("sub_a")
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
+ acked.add(msg.id)
+ self.assertEqual("Message %d" % i, msg.body)
- msg.complete()
-
- self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
- queue_b = self.client.queue("sub_b")
+ self.subscribe(session, queue=name_b, destination="sub_b")
+ queue_b = session.incoming("sub_b")
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 6", msg.body)
+ acked.add(msg.id)
- sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
- queue_c = self.client.queue("sub_c")
+ sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
+ queue_c = session.incoming("sub_c")
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 7", msg.body)
+ acked.add(msg.id)
+
+ session.message_accept(acked)
+ dp = session.delivery_properties(routing_key=topic)
#publish messages
for i in range(1, 5):
- channel.message_transfer(destination="amq.topic",
- content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i},
- body="TxMessage %d" % i))
-
- channel.message_transfer(destination="amq.direct",
- content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"},
- body="TxMessage 6"))
- channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"},
- body="TxMessage 7"))
- return queue_a, queue_b, queue_c
+ mp = session.message_properties(message_id="tx-msg%d" % i)
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="tx-msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
+
+ dp = session.delivery_properties(routing_key=name_a)
+ mp = session.message_properties(message_id="tx-msg7")
+ session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
+ return queue_a, queue_b, queue_c, acked
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+
+ def complete(self, session, msg):
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+ session.channel.session_completed(session.receiver._completed)
+