summaryrefslogtreecommitdiff
path: root/python/tests_0-10
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commit80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch)
tree13677bf773bf25db03144aa72c97a49d2810240d /python/tests_0-10
parenta9232d5a02a19f093f212cb0b76772a20b45cb1b (diff)
downloadqpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
-rw-r--r--python/tests_0-10/broker.py2
-rw-r--r--python/tests_0-10/dtx.py26
-rw-r--r--python/tests_0-10/example.py2
-rw-r--r--python/tests_0-10/message.py24
-rw-r--r--python/tests_0-10/tx.py56
5 files changed, 71 insertions, 39 deletions
diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py
index 684b36597e..6bc2f7ceb8 100644
--- a/python/tests_0-10/broker.py
+++ b/python/tests_0-10/broker.py
@@ -48,7 +48,7 @@ class BrokerTests(TestBase):
body = "test ack"
ch.message_transfer(routing_key = "otherqueue", body = body)
msg = self.client.queue(ctag).get(timeout = 5)
- msg.ok()
+ msg.complete()
self.assert_(msg.body == body)
def test_simple_delivery_immediate(self):
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index c0d1bd2b74..2835d703ae 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -40,6 +40,11 @@ class DtxTests(TestBase):
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
XA_OK = 8
+ tx_counter = 0
+
+ def reset_channel(self):
+ self.channel.channel_close()
+ self.channel.channel_open()
def test_simple_commit(self):
"""
@@ -56,6 +61,9 @@ class DtxTests(TestBase):
#commit
self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+ #should close and reopen channel to ensure no unacked messages are held
+ self.reset_channel()
+
#check result
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(1, "queue-b")
@@ -79,6 +87,8 @@ class DtxTests(TestBase):
#commit
self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(1, "queue-b")
@@ -100,6 +110,8 @@ class DtxTests(TestBase):
#rollback
self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
@@ -123,6 +135,8 @@ class DtxTests(TestBase):
#rollback
self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
@@ -191,6 +205,8 @@ class DtxTests(TestBase):
channel = self.channel
#do some transactional work & complete the transaction
self.test_simple_commit()
+ # channel has been reset, so reselect for use with dtx
+ channel.dtx_demarcation_select()
#start association for the same xid as the previously completed txn
tx = self.xid("my-xid")
@@ -355,7 +371,7 @@ class DtxTests(TestBase):
self.assertEqual("two", msg.message_id)
channel.message_cancel(destination="results")
#ack the message then close the channel
- msg.ok()
+ msg.complete()
channel.channel_close()
channel = self.channel
@@ -446,7 +462,7 @@ class DtxTests(TestBase):
channel2.dtx_demarcation_select()
channel2.dtx_demarcation_start(xid=tx)
channel2.message_get(queue="dummy", destination="dummy")
- self.client.queue("dummy").get(timeout=1).ok()
+ self.client.queue("dummy").get(timeout=1).complete()
channel2.message_transfer(routing_key="dummy", body="whatever")
channel2.channel_close()
@@ -548,7 +564,9 @@ class DtxTests(TestBase):
channel.dtx_coordination_rollback(xid=x)
self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
- def xid(self, txid, branchqual = ''):
+ def xid(self, txid):
+ DtxTests.tx_counter += 1
+ branchqual = "v%s" % DtxTests.tx_counter
return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
def txswap(self, tx, id):
@@ -573,7 +591,7 @@ class DtxTests(TestBase):
#consume from src:
channel.message_get(destination="temp-swap", queue=src)
msg = self.client.queue("temp-swap").get(timeout=1)
- msg.ok();
+ msg.complete();
#re-publish to dest
channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)
diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py
index 7ab4cc7d0a..dc71b0590b 100644
--- a/python/tests_0-10/example.py
+++ b/python/tests_0-10/example.py
@@ -90,5 +90,5 @@ class ExampleTest (TestBase):
self.assertEqual(body, msg.body)
# Now acknowledge the message.
- msg.ok()
+ msg.complete()
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index b25016e680..74e2b6416f 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -171,8 +171,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok(batchoffset=1)#One and Two
- msg4.ok()
+ msg2.complete(cumulative=True)#One and Two
+ msg4.complete(cumulative=False)
channel.message_recover(requeue=False)
@@ -215,8 +215,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok(batchoffset=1) #One and Two
- msg4.ok() #Four
+ msg2.complete(cumulative=True) #One and Two
+ msg4.complete(cumulative=False) #Four
channel.message_cancel(destination="consumer_tag")
@@ -276,14 +276,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- #todo: once batching is implmented, send a single response for all messages
- msg.ok(batchoffset=-4)#1-5
+ msg.complete()
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-4)#6-10
+ msg.complete()
try:
extra = queue.get(timeout=1)
@@ -320,13 +319,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- msg.ok(batchoffset=-4)#1-5
+ msg.complete()
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-4)#6-10
+ msg.complete()
try:
extra = queue.get(timeout=1)
@@ -376,9 +375,9 @@ class MessageTests(TestBase):
self.assertEqual("Message %d" % i, msg.body)
if (i==13):
- msg.ok(batchoffset=-2)#11, 12 & 13
+ msg.complete()#11, 12 & 13
if(i in [15, 17, 19]):
- msg.ok()
+ msg.complete(cumulative=False)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
@@ -395,8 +394,7 @@ class MessageTests(TestBase):
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
- #channel.message_ack(delivery_tag=reply.delivery_tag)
+ msg.complete()
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py
index 0f6b4f5cd1..b499c2d1f9 100644
--- a/python/tests_0-10/tx.py
+++ b/python/tests_0-10/tx.py
@@ -30,23 +30,39 @@ class TxTests(TestBase):
"""
Test that commited publishes are delivered and commited acks are not re-delivered
"""
+ channel2 = self.client.channel(2)
+ channel2.channel_open()
+ self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ channel2.tx_commit()
+ channel2.channel_close()
+
+ #use a different channel with new subscriptions to ensure
+ #there is no redelivery of acked messages:
channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel.tx_commit()
+ channel.tx_select()
+
+ channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False)
+ queue_a = self.client.queue("qa")
+
+ channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False)
+ queue_b = self.client.queue("qb")
+
+ channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False)
+ queue_c = self.client.queue("qc")
#check results
for i in range(1, 5):
msg = queue_c.get(timeout=1)
self.assertEqual("TxMessage %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("TxMessage 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_a.get(timeout=1)
self.assertEqual("TxMessage 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -76,15 +92,15 @@ class TxTests(TestBase):
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -114,15 +130,15 @@ class TxTests(TestBase):
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -150,10 +166,10 @@ class TxTests(TestBase):
channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
for i in range(1, 5):
- channel.message_transfer(routing_key=name_a, body="Message %d" % i)
+ channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i)
- channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6")
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7")
+ channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6")
+ channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7")
channel.tx_select()
@@ -164,25 +180,25 @@ class TxTests(TestBase):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-3)
+ msg.complete()
channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
queue_b = self.client.queue("sub_b")
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False)
queue_c = self.client.queue("sub_c")
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
#publish messages
for i in range(1, 5):
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i)
+ channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i)
- channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6")
- channel.message_transfer(routing_key=name_a, body="TxMessage 7")
+ channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6")
+ channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7")
return queue_a, queue_b, queue_c