summary | refs | log | tree | commit | diff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2013-11-12 16:58:52 +0000
committer: Alan Conway <aconway@apache.org> 2013-11-12 16:58:52 +0000
commit: 0630ea05003e6c530b9dde889e8296b12e67e41b (patch)
tree: d3c441f0071f398f9ad31f872a95363109bf9212 /qpid/cpp/src/tests
parent: c9b6567bbd2167284d357f4021954e8e5f976b67 (diff)
download: qpid-python-0630ea05003e6c530b9dde889e8296b12e67e41b.tar.gz
QPID-5275: HA transactions failing in qpid-cluster-benchmark
The test was failing due to incorrect handling of the transaction lifecycle:
- Failing to handle the automatic rollback of the empty TX at session close.
- Deleting the tx-q before all backups were finished with it.

The fixes include
- Make tx-q auto-delete, deleted only when the TxReplicators cancel their subscriptions.
- Use markInUse/releaseFromUse on the primary to keep the tx-q until the primary is done.
- Count TxReplicators for auto-delete (unlike normal QueueReplicators)
- Improved error handling and log messages
- Handle *incoming* exceptions on a federation link by passing to ErrorListener
- QueueReplicator catches incoming not-found and resource-deleted exceptions
- close the backup bridge, handle race between subscribe and delete.
- Simplify QueueSnapshots, remove need for snapshot map.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541146 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r-- qpid/cpp/src/tests/brokertest.py 22
-rwxr-xr-x qpid/cpp/src/tests/ha_tests.py 63
2 files changed, 62 insertions, 23 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index d1f020a945..28e7f6b182 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -381,6 +381,16 @@ class Broker(Popen):
"Broker %s not responding: (%s)%s"%(
self.name,e,error_line(self.log, 5)))
+ def assert_log_clean(self, ignore=None):
+ log = open(self.get_log())
+ try:
+ error = re.compile("] error|] critical")
+ if ignore: ignore = re.compile(ignore)
+ else: ignore = re.compile("\000") # Won't match anything
+ for line in log.readlines():
+ assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line)
+ finally: log.close()
+
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
@@ -549,7 +559,11 @@ class NumberedSender(Thread):
self.condition.release()
self.write_message(self.sent)
self.sent += 1
- except Exception: self.error = RethrownException(self.sender.pname)
+ except Exception, e:
+ self.error = RethrownException(
+ "%s: (%s)%s"%(self.sender.pname,e,
+ error_line(self.sender.outfile("err"))))
+
def notify_received(self, count):
"""Called by receiver to enable flow control. count = messages received so far."""
@@ -612,8 +626,10 @@ class NumberedReceiver(Thread):
if self.sender:
self.sender.notify_received(self.received)
m = self.read_message()
- except Exception:
- self.error = RethrownException(self.receiver.pname)
+ except Exception, e:
+ self.error = RethrownException(
+ "%s: (%s)%s"%(self.receiver.pname,e,
+ error_line(self.receiver.outfile("err"))))
def check(self):
"""Raise an exception if there has been an error"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 79024d48e3..138868f64e 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -30,19 +30,10 @@ from qpidtoollibs import BrokerAgent, EventHelper
log = getLogger(__name__)
-def grep(filename, regexp):
- for line in open(filename).readlines():
- if (regexp.search(line)): return True
- return False
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
- def assert_log_no_errors(self, broker):
- log = broker.get_log()
- if grep(log, re.compile("] error|] critical")):
- self.fail("Errors in log file %s"%(log))
-
class ReplicationTests(HaBrokerTest):
"""Correctness tests for HA replication."""
@@ -838,7 +829,7 @@ acl deny all all
# It is possible for the backup to attempt to subscribe after the queue
# is deleted. This is not an error, but is logged as an error on the primary.
# The backup does not log this as an error so we only check the backup log for errors.
- self.assert_log_no_errors(cluster[1])
+ cluster[1].assert_log_clean()
def test_missed_recreate(self):
"""If a queue or exchange is destroyed and one with the same name re-created
@@ -1003,6 +994,32 @@ class LongTests(HaBrokerTest):
dead = filter(lambda b: not b.is_running(), brokers)
if dead: raise Exception("Brokers not running: %s"%dead)
+ def test_tx_send_receive(self):
+ brokers = HaCluster(self, 3)
+ sender = self.popen(
+ ["qpid-send",
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
+ "--messages=1000",
+ "--tx=10"
+ ])
+ receiver = self.popen(
+ ["qpid-receive",
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
+ "--messages=990",
+ "--timeout=10",
+ "--tx=10"
+ ])
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ brokers[0].assert_browse("q", expect, transform=sn)
+ brokers[1].assert_browse_backup("q", expect, transform=sn)
+ brokers[2].assert_browse_backup("q", expect, transform=sn)
+
+
def test_qmf_order(self):
"""QPID 4402: HA QMF events can be out of order.
This test mimics the test described in the JIRA. Two threads repeatedly
@@ -1352,12 +1369,14 @@ class TransactionTests(HaBrokerTest):
def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
(exchanges, queues, subscriptions) on b."""
- queues=[]
- def txq(): queues = b.agent().tx_queues(); return not queues
- assert retry(txq), "%s: unexpected %s"%(b,queues)
- subs=[]
- def txs(): subs = self.tx_subscriptions(b); return not subs
- assert retry(txs), "%s: unexpected %s"%(b,subs)
+ class FunctionCache: # Call a function and cache the result.
+ def __init__(self, f): self.f, self.value = f, None
+ def __call__(self): self.value = self.f(); return self.value
+
+ txq= FunctionCache(b.agent().tx_queues)
+ assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value)
+ txsub = FunctionCache(lambda: self.tx_subscriptions(b))
+ assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value)
# TODO aconway 2013-10-15: TX exchanges don't show up in management.
def assert_simple_commit_outcome(self, b, tx_queues):
@@ -1462,18 +1481,22 @@ class TransactionTests(HaBrokerTest):
self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
- self.assert_commit_raises(tx)
- for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
+ tx.commit()
+ tx.close()
+ for b in [cluster[0],cluster[2]]:
+ self.assert_tx_clean(b)
+ b.assert_browse_backup("q", ["a","b"], msg=b)
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
- cluster.restart(1)
+ cluster.restart(1) # Not a part of the current transaction.
tx.commit()
tx.close()
for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
- for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
+ for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
+ # FIXME aconway 2013-11-07: assert_log_clean
def test_tx_block_threads(self):
"""Verify that TXs blocked in commit don't deadlock."""