summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-05 19:33:35 +0000
committerAlan Conway <aconway@apache.org>2013-08-05 19:33:35 +0000
commit8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4 (patch)
tree7961e97040227d8253bb8cdc6bf7de11ec92d8de /qpid/cpp/src/tests
parent866bf57249e41266cd713a5f73e3d18d216fad31 (diff)
downloadqpid-python-8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4.tar.gz
QPID-4327: HA Handle brokers joining and leaving during a transaction.
During a transaction: - A broker leaving aborts the transaction. - A broker joining does not participate in the transaction - but does receive the results of the TX via normal replication. Clean up tx-queues when the transaction completes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1510678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py9
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py38
2 files changed, 40 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index cceb9795eb..ab63602655 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -48,6 +48,9 @@ class QmfAgent(object):
address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
+ def get_queues(self):
+ return [q.values['name'] for q in self._agent.getAllQueues()]
+
def __getattr__(self, name):
a = getattr(self._agent, name)
return a
@@ -107,7 +110,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=trace+:ha::", # FIXME aconway 2013-07-29: debug
+ "--log-enable=debug+:ha::",
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
@@ -276,8 +279,8 @@ class HaCluster(object):
@s_args: args for specific brokers: s_args[i] for broker i.
"""
self.test = test
- self.args = args
- self.s_args = s_args
+ self.args = copy(args)
+ self.s_args = copy(s_args)
self.kwargs = kwargs
self._ports = [HaPort(test) for i in xrange(n)]
self._set_url()
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e97614d785..e1ad5cc4fa 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError
from qpid.datatypes import uuid4, UUID
from brokertest import *
from ha_test import *
@@ -1278,7 +1278,7 @@ class StoreTests(BrokerTest):
cluster[0].assert_browse("q1", ["x","y","z"])
cluster[1].assert_browse_backup("q1", ["x","y","z"])
- sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over!
+ sn = cluster[0].connect(heartbeat=1).session()
sn.sender("ex/k1").send("boo")
cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
@@ -1382,18 +1382,48 @@ class TransactionTests(BrokerTest):
self.assertEqual(open_read(cluster[1].store_log), expect)
def test_tx_backup_fail(self):
- # FIXME aconway 2013-07-31: check exception types, reduce timeout.
cluster = HaCluster(
self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
c = cluster[0].connect()
tx = c.session(transactional=True)
s = tx.sender("q;{create:always,node:{durable:true}}")
for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
- self.assertRaises(Exception, tx.commit)
+ self.assertRaises(ServerError, tx.commit)
for b in cluster: b.assert_browse_backup("q", [])
self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
+ def test_tx_join_leave(self):
+ """Test cluster members joining/leaving cluster.
+ Also check that tx-queues are cleaned up at end of transaction."""
+
+ def tx_queues(broker):
+ return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")]
+
+ cluster = HaCluster(self, 3)
+
+ # Leaving
+ tx = cluster[0].connect().session(transactional=True)
+ s = tx.sender("q;{create:always}")
+ s.send("a", sync=True)
+ self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster])
+ cluster[1].kill(final=False)
+ s.send("b")
+ self.assertRaises(ServerError, tx.commit)
+ self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]])
+
+ # Joining
+ tx = cluster[0].connect().session(transactional=True)
+ s = tx.sender("q;{create:always}")
+ s.send("foo")
+ tx_q = tx_queues(cluster[0])[0]
+ cluster.restart(1)
+ # Verify the new member should not be in the transaction.
+ # but should receive the result of the transaction via normal replication.
+ cluster[1].wait_no_queue(tx_q)
+ tx.commit()
+ for b in cluster: b.assert_browse_backup("q", ["foo"])
+
if __name__ == "__main__":
outdir = "ha_tests.tmp"
shutil.rmtree(outdir, True)