diff options
Diffstat (limited to 'python/tests/tx.py')
| -rw-r--r-- | python/tests/tx.py | 104 |
1 files changed, 41 insertions, 63 deletions
diff --git a/python/tests/tx.py b/python/tests/tx.py index 054fb8d8b7..55a5eaeade 100644 --- a/python/tests/tx.py +++ b/python/tests/tx.py @@ -37,22 +37,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.content.body) + self.assertEqual("TxMessage %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.content.body) + self.assertEqual("TxMessage 6", msg.body) + msg.ok() msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.content.body) + self.assertEqual("TxMessage 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def test_auto_rollback(self): @@ -65,7 +67,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None channel.tx_rollback() @@ -73,22 +75,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) + self.assertEqual("Message 6", msg.body) + msg.ok() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) + self.assertEqual("Message 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def test_rollback(self): @@ -101,7 +105,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None channel.tx_rollback() @@ -109,22 +113,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) + self.assertEqual("Message 6", msg.body) + msg.ok() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) + self.assertEqual("Message 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def perform_txn_work(self, channel, name_a, name_b, name_c): @@ -144,66 +150,38 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i)) + channel.message_transfer(routing_key=name_a, body="Message %d" % i) - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6")) - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7")) + channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") + channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") channel.tx_select() #consume and ack messages - sub_a = channel.basic_consume(queue=name_a, no_ack=False) - queue_a = self.client.queue(sub_a.consumer_tag) + channel.message_consume(queue=name_a, destination="sub_a", no_ack=False) + queue_a = self.client.queue("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() - sub_b = channel.basic_consume(queue=name_b, no_ack=False) - queue_b = self.client.queue(sub_b.consumer_tag) + channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) + queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) + self.assertEqual("Message 6", msg.body) + msg.ok() - sub_c = channel.basic_consume(queue=name_c, no_ack=False) - queue_c = self.client.queue(sub_c.consumer_tag) + sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) + queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) + self.assertEqual("Message 7", msg.body) + msg.ok() #publish messages for i in range(1, 5): - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i)) + channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6")) - channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7")) + channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") + channel.message_transfer(routing_key=name_a, body="TxMessage 7") return queue_a, queue_b, queue_c - - def test_commit_overlapping_acks(self): - """ - Test that logically 'overlapping' acks do not cause errors on commit - """ - channel = self.channel - channel.queue_declare(queue="commit-overlapping", exclusive=True) - for i in range(1, 10): - channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i)) - - - channel.tx_select() - - sub = channel.basic_consume(queue="commit-overlapping", no_ack=False) - queue = self.client.queue(sub.consumer_tag) - for i in range(1, 10): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - if i in [3, 6, 10]: - channel.basic_ack(delivery_tag=msg.delivery_tag) - - channel.tx_commit() - - #check all have been acked: - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None |
