diff options
| author | Alan Conway <aconway@apache.org> | 2013-08-05 19:33:35 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-08-05 19:33:35 +0000 |
| commit | 8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4 (patch) | |
| tree | 7961e97040227d8253bb8cdc6bf7de11ec92d8de /qpid/cpp/src/tests | |
| parent | 866bf57249e41266cd713a5f73e3d18d216fad31 (diff) | |
| download | qpid-python-8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4.tar.gz | |
QPID-4327: HA Handle brokers joining and leaving during a transaction.
During a transaction:
- A broker leaving aborts the transaction.
- A broker joining does not participate in the transaction
- but does receive the results of the TX via normal replication.
Clean up tx-queues when the transaction completes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1510678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 9 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 38 |
2 files changed, 40 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index cceb9795eb..ab63602655 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -48,6 +48,9 @@ class QmfAgent(object): address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) + def get_queues(self): + return [q.values['name'] for q in self._agent.getAllQueues()] + def __getattr__(self, name): a = getattr(self._agent, name) return a @@ -107,7 +110,7 @@ class HaBroker(Broker): ha_port = ha_port or HaPort(test) args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=trace+:ha::", # FIXME aconway 2013-07-29: debug + "--log-enable=debug+:ha::", # Non-standard settings for faster tests. "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont @@ -276,8 +279,8 @@ class HaCluster(object): @s_args: args for specific brokers: s_args[i] for broker i. """ self.test = test - self.args = args - self.s_args = s_args + self.args = copy(args) + self.s_args = copy(s_args) self.kwargs = kwargs self._ports = [HaPort(test) for i in xrange(n)] self._set_url() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e97614d785..e1ad5cc4fa 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError from qpid.datatypes import uuid4, UUID from brokertest import * from ha_test import * @@ -1278,7 +1278,7 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q1", ["x","y","z"]) cluster[1].assert_browse_backup("q1", ["x","y","z"]) - sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over! + sn = cluster[0].connect(heartbeat=1).session() sn.sender("ex/k1").send("boo") cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"]) cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"]) @@ -1382,18 +1382,48 @@ class TransactionTests(BrokerTest): self.assertEqual(open_read(cluster[1].store_log), expect) def test_tx_backup_fail(self): - # FIXME aconway 2013-07-31: check exception types, reduce timeout. cluster = HaCluster( self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]]) c = cluster[0].connect() tx = c.session(transactional=True) s = tx.sender("q;{create:always,node:{durable:true}}") for m in ["foo","bang","bar"]: s.send(Message(m, durable=True)) - self.assertRaises(Exception, tx.commit) + self.assertRaises(ServerError, tx.commit) for b in cluster: b.assert_browse_backup("q", []) self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n") self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n") + def test_tx_join_leave(self): + """Test cluster members joining/leaving cluster. + Also check that tx-queues are cleaned up at end of transaction.""" + + def tx_queues(broker): + return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")] + + cluster = HaCluster(self, 3) + + # Leaving + tx = cluster[0].connect().session(transactional=True) + s = tx.sender("q;{create:always}") + s.send("a", sync=True) + self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster]) + cluster[1].kill(final=False) + s.send("b") + self.assertRaises(ServerError, tx.commit) + self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]]) + + # Joining + tx = cluster[0].connect().session(transactional=True) + s = tx.sender("q;{create:always}") + s.send("foo") + tx_q = tx_queues(cluster[0])[0] + cluster.restart(1) + # Verify the new member should not be in the transaction. + # but should receive the result of the transaction via normal replication. + cluster[1].wait_no_queue(tx_q) + tx.commit() + for b in cluster: b.assert_browse_backup("q", ["foo"]) + if __name__ == "__main__": outdir = "ha_tests.tmp" shutil.rmtree(outdir, True) |
