diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
| -rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 204 |
1 files changed, 202 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 807e9508c3..e67a691283 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -713,6 +713,204 @@ acl allow all all cluster.start() fetch(cluster[2]) +# Some utility code for transaction tests +XA_RBROLLBACK = 1 +XA_RBTIMEOUT = 2 +XA_OK = 0 +dtx_branch_counter = 0 + +class DtxTestFixture: + """Bundle together some common requirements for dtx tests.""" + def __init__(self, test, broker, name, exclusive=False): + self.test = test + self.broker = broker + self.name = name + # Use old API. DTX is not supported in messaging API. + self.connection = broker.connect_old() + self.session = self.connection.session(name, 1) # 1 second timeout + self.queue = self.session.queue_declare(name, exclusive=exclusive) + self.xid = self.session.xid(format=0, global_id=name) + self.session.dtx_select() + self.consumer = None + + def start(self): + self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status) + + def end(self): + self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status) + + def prepare(self): + self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status) + + def commit(self, one_phase=True): + self.test.assertEqual( + XA_OK, self.session.dtx_commit(xid=self.xid, one_phase=one_phase).status) + + def rollback(self): + self.test.assertEqual(XA_OK, self.session.dtx_rollback(xid=self.xid).status) + + def send(self, messages): + for m in messages: + dp=self.session.delivery_properties(routing_key=self.name) + mp=self.session.message_properties() + self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m)) + + def accept(self): + """Accept 1 message from queue""" + consumer_tag="%s-consumer"%(self.name) + self.session.message_subscribe(queue=self.name, destination=consumer_tag) + self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag) + self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) + msg = self.session.incoming(consumer_tag).get(timeout=1) + self.session.message_cancel(destination=consumer_tag) + self.session.message_accept(qpid.datatypes.RangedSet(msg.id)) + return msg + + + def verify(self, cluster, messages): + for b in cluster: + self.test.assert_browse(b.connect().session(), self.name, messages) + + +class DtxTests(BrokerTest): + + def test_dtx_update(self): + """Verify that DTX transaction state is updated to a new broker. + Start a collection of transactions, then add a new cluster member, + then verify they commit/rollback correctly on the new broker.""" + + # Note: multiple test have been bundled into one to avoid the need to start/stop + # multiple brokers per test. + + cluster=self.cluster(1) + + # Transaction that will be open when new member joins, then committed. + t1 = DtxTestFixture(self, cluster[0], "t1") + t1.start() + t1.send(["1", "2"]) + t1.verify(cluster, []) # Not visible outside of transaction + + # Transaction that will be open when new member joins, then rolled back. + t2 = DtxTestFixture(self, cluster[0], "t2") + t2.start() + t2.send(["1", "2"]) + + # Transaction that will be prepared when new member joins, then committed. + t3 = DtxTestFixture(self, cluster[0], "t3") + t3.start() + t3.send(["1", "2"]) + t3.end() + t3.prepare() + t1.verify(cluster, []) # Not visible outside of transaction + + # Transaction that will be prepared when new member joins, then rolled back. + t4 = DtxTestFixture(self, cluster[0], "t4") + t4.start() + t4.send(["1", "2"]) + t4.end() + t4.prepare() + + # Transaction using an exclusive queue + t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True) + t5.start() + t5.send(["1", "2"]) + + # Accept messages in a transaction before/after join then commit + t6 = DtxTestFixture(self, cluster[0], "t6") + t6.send(["a","b","c"]) + t6.start() + t6.verify(cluster, ["a","b","c"]) + self.assertEqual(t6.accept().body, "a"); + t6.verify(cluster, ["b","c"]) + + # Accept messages in a transaction before/after join then roll back + t7 = DtxTestFixture(self, cluster[0], "t7") + t7.send(["a","b","c"]) + t7.start() + t7.verify(cluster, ["a","b","c"]) + self.assertEqual(t7.accept().body, "a"); + t7.verify(cluster, ["b","c"]) + + # Start new member + cluster.start() + + # Commit t1 + t1.send(["3","4"]) + t1.verify(cluster, []) + t1.end() + t1.commit(one_phase=True) + t1.verify(cluster, ["1","2","3","4"]) + + # Rollback t2 + t2.send(["3","4"]) + t2.verify(cluster, []) + t2.end() + t2.rollback() + t2.verify(cluster, []) + + # Commit t3 + t3.verify(cluster, []) + t3.commit(one_phase=False) + t3.verify(cluster, ["1","2"]) + + # Rollback t4 + t4.verify(cluster, []) + t4.rollback() + t4.verify(cluster, []) + + # Commit t5 + t5.send(["3","4"]) + t5.verify(cluster, []) + t5.end() + t5.commit(one_phase=True) + t5.verify(cluster, ["1","2","3","4"]) + + # Commit t7 + t6.verify(cluster, ["b", "c"]) + self.assertEqual(t6.accept().body, "b"); + t6.verify(cluster, ["c"]) + t6.end() + t6.commit(one_phase=True) + t6.verify(cluster, ["c"]) + t6.session.close() # Make sure they're not requeued by the session. + t6.verify(cluster, ["c"]) + + # Rollback t7 + t7.verify(cluster, ["b", "c"]) + self.assertEqual(t7.accept().body, "b"); + t7.verify(cluster, ["c"]) + t7.end() + t7.rollback() + t7.verify(cluster, ["a", "b", "c"]) + + +class TxTests(BrokerTest): + + def test_tx_update(self): + """Verify that transaction state is updated to a new broker""" + + def make_message(session, body=None, key=None, id=None): + dp=session.delivery_properties(routing_key=key) + mp=session.message_properties(correlation_id=id) + return qpid.datatypes.Message(dp, mp, body) + + cluster=self.cluster(1) + # Use old API. TX is not supported in messaging API. + c = cluster[0].connect_old() + s = c.session("tx-session", 1) + s.queue_declare(queue="q") + # Start transaction + s.tx_select() + s.message_transfer(message=make_message(s, "1", "q")) + # Start new member mid-transaction + cluster.start() + # Do more work + s.message_transfer(message=make_message(s, "2", "q")) + # Commit the transaction and verify the results. + s.tx_commit() + for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"]) + + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -1001,6 +1199,8 @@ class LongTests(BrokerTest): logger = logging.getLogger() log_level = logger.getEffectiveLevel() logger.setLevel(logging.ERROR) + sender = None + receiver = None try: # Start sender and receiver threads receiver = Receiver(cluster[0], "q;{create:always}") @@ -1031,8 +1231,8 @@ class LongTests(BrokerTest): finally: # Detach to avoid slow reconnect attempts during shut-down if test fails. - sender.connection.detach() - receiver.connection.detach() + if sender: sender.connection.detach() + if receiver: receiver.connection.detach() logger.setLevel(log_level) class StoreTests(BrokerTest): |
