summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-10-29 15:23:25 +0000
committerAlan Conway <aconway@apache.org>2013-10-29 15:23:25 +0000
commit8303ede7e98c64c44d8186552873c091130485a9 (patch)
tree0c033d3686d5b9a890e4683be8675cf2ed1b9e55 /qpid/cpp/src
parente541155a1235c3d010a837a9e1f7806d03836854 (diff)
downloadqpid-python-8303ede7e98c64c44d8186552873c091130485a9.tar.gz
QPID-5139: Add unit test for deadlock caused by blocking HA commit.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1536751 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py1
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py27
2 files changed, 24 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 3b5874875a..da28ff6712 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -128,6 +128,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
+ "--log-enable=info+",
"--log-enable=debug+:ha::",
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 6941a2b545..ad546afc62 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1304,8 +1304,6 @@ def open_read(name):
class TransactionTests(HaBrokerTest):
- load_store=["--load-module", BrokerTest.test_store_lib]
-
def tx_simple_setup(self, broker):
"""Start a transaction, remove messages from queue a, add messages to queue b"""
c = broker.connect()
@@ -1437,6 +1435,9 @@ class TransactionTests(HaBrokerTest):
tx.sync()
self.assert_simple_rollback_outcome(cluster[0], tx_queues)
+ def assert_commit_raises(self, tx):
+ def commit_sync(): tx.commit(timeout=1); tx.sync(timeout=1)
+ self.assertRaises(ServerError, commit_sync)
def test_tx_backup_fail(self):
cluster = HaCluster(
@@ -1445,7 +1446,7 @@ class TransactionTests(HaBrokerTest):
tx = c.session(transactional=True)
s = tx.sender("q;{create:always,node:{durable:true}}")
for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
- self.assertRaises(ServerError, tx.commit)
+ self.assert_commit_raises(tx)
for b in cluster: b.assert_browse_backup("q", [])
self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
@@ -1463,7 +1464,7 @@ class TransactionTests(HaBrokerTest):
self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
- self.assertRaises(ServerError, tx.commit)
+ self.assert_commit_raises(tx)
self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
# Joining
@@ -1475,6 +1476,24 @@ class TransactionTests(HaBrokerTest):
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
+ def test_tx_block_threads(self):
+ """Verify that TXs blocked in commit don't deadlock."""
+ cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True)
+ n = 10 # Number of concurrent transactions
+ sessions = [cluster[0].connect().session(transactional=True) for i in xrange(n)]
+ # Have the store delay the response for 10s
+ for s in sessions:
+ sn = s.sender("qq;{create:always,node:{durable:true}}")
+ sn.send(Message("foo", durable=True))
+ self.assertEqual(n, len(cluster[1].agent().tx_queues()))
+ threads = [ Thread(target=s.commit) for s in sessions]
+ for t in threads: t.start()
+ cluster[0].ready(timeout=1) # Check for deadlock
+ for b in cluster: b.assert_browse_backup('qq', ['foo']*n)
+ for t in threads: t.join()
+ for s in sessions: s.connection.close()
+
+
if __name__ == "__main__":
outdir = "ha_tests.tmp"
shutil.rmtree(outdir, True)