diff options
| author | Alan Conway <aconway@apache.org> | 2013-11-12 16:58:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-11-12 16:58:52 +0000 |
| commit | 0630ea05003e6c530b9dde889e8296b12e67e41b (patch) | |
| tree | d3c441f0071f398f9ad31f872a95363109bf9212 /qpid/cpp/src/tests | |
| parent | c9b6567bbd2167284d357f4021954e8e5f976b67 (diff) | |
| download | qpid-python-0630ea05003e6c530b9dde889e8296b12e67e41b.tar.gz | |
QPID-5275: HA transactions failing in qpid-cluster-benchmark
The test was failing due to incorrect handling of the transaction lifecycle:
- Failing to handle the automatic rollback of the empty TX at session close.
- Deleting the tx-q before all backups were finished with it.
The fixes include
- Make tx-q auto-delete, deleted only when the TxReplicators cancel their subscriptions.
- Use markInUse/releaseFromUse on the primary to keep the tx-q until the primary is done.
- Count TxReplicators for auto-delete (unlike normal QueueReplicators)
- Improved error handling and log messages
- Handle *incoming* exceptions on a federation link by passing to ErrorListener
- QueueReplicator catches incoming not-found and resource-deleted exceptions
- close the backup bridge, handle race between subscribe and delete.
- Simplify QueueSnapshots, remove need for snapshot map.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541146 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 22 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 63 |
2 files changed, 62 insertions, 23 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index d1f020a945..28e7f6b182 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -381,6 +381,16 @@ class Broker(Popen): "Broker %s not responding: (%s)%s"%( self.name,e,error_line(self.log, 5))) + def assert_log_clean(self, ignore=None): + log = open(self.get_log()) + try: + error = re.compile("] error|] critical") + if ignore: ignore = re.compile(ignore) + else: ignore = re.compile("\000") # Won't match anything + for line in log.readlines(): + assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line) + finally: log.close() + def browse(session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) @@ -549,7 +559,11 @@ class NumberedSender(Thread): self.condition.release() self.write_message(self.sent) self.sent += 1 - except Exception: self.error = RethrownException(self.sender.pname) + except Exception, e: + self.error = RethrownException( + "%s: (%s)%s"%(self.sender.pname,e, + error_line(self.sender.outfile("err")))) + def notify_received(self, count): """Called by receiver to enable flow control. count = messages received so far.""" @@ -612,8 +626,10 @@ class NumberedReceiver(Thread): if self.sender: self.sender.notify_received(self.received) m = self.read_message() - except Exception: - self.error = RethrownException(self.receiver.pname) + except Exception, e: + self.error = RethrownException( + "%s: (%s)%s"%(self.receiver.pname,e, + error_line(self.receiver.outfile("err")))) def check(self): """Raise an exception if there has been an error""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 79024d48e3..138868f64e 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -30,19 +30,10 @@ from qpidtoollibs import BrokerAgent, EventHelper log = getLogger(__name__) -def grep(filename, regexp): - for line in open(filename).readlines(): - if (regexp.search(line)): return True - return False class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" - def assert_log_no_errors(self, broker): - log = broker.get_log() - if grep(log, re.compile("] error|] critical")): - self.fail("Errors in log file %s"%(log)) - class ReplicationTests(HaBrokerTest): """Correctness tests for HA replication.""" @@ -838,7 +829,7 @@ acl deny all all # It is possible for the backup to attempt to subscribe after the queue # is deleted. This is not an error, but is logged as an error on the primary. # The backup does not log this as an error so we only check the backup log for errors. - self.assert_log_no_errors(cluster[1]) + cluster[1].assert_log_clean() def test_missed_recreate(self): """If a queue or exchange is destroyed and one with the same name re-created @@ -1003,6 +994,32 @@ class LongTests(HaBrokerTest): dead = filter(lambda b: not b.is_running(), brokers) if dead: raise Exception("Brokers not running: %s"%dead) + def test_tx_send_receive(self): + brokers = HaCluster(self, 3) + sender = self.popen( + ["qpid-send", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=1000", + "--tx=10" + ]) + receiver = self.popen( + ["qpid-receive", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=990", + "--timeout=10", + "--tx=10" + ]) + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + brokers[0].assert_browse("q", expect, transform=sn) + brokers[1].assert_browse_backup("q", expect, transform=sn) + brokers[2].assert_browse_backup("q", expect, transform=sn) + + def test_qmf_order(self): """QPID 4402: HA QMF events can be out of order. This test mimics the test described in the JIRA. Two threads repeatedly @@ -1352,12 +1369,14 @@ class TransactionTests(HaBrokerTest): def assert_tx_clean(self, b): """Verify that there are no transaction artifacts (exchanges, queues, subscriptions) on b.""" - queues=[] - def txq(): queues = b.agent().tx_queues(); return not queues - assert retry(txq), "%s: unexpected %s"%(b,queues) - subs=[] - def txs(): subs = self.tx_subscriptions(b); return not subs - assert retry(txs), "%s: unexpected %s"%(b,subs) + class FunctionCache: # Call a function and cache the result. + def __init__(self, f): self.f, self.value = f, None + def __call__(self): self.value = self.f(); return self.value + + txq= FunctionCache(b.agent().tx_queues) + assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value) + txsub = FunctionCache(lambda: self.tx_subscriptions(b)) + assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value) # TODO aconway 2013-10-15: TX exchanges don't show up in management. def assert_simple_commit_outcome(self, b, tx_queues): @@ -1462,18 +1481,22 @@ class TransactionTests(HaBrokerTest): self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") - self.assert_commit_raises(tx) - for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b) + tx.commit() + tx.close() + for b in [cluster[0],cluster[2]]: + self.assert_tx_clean(b) + b.assert_browse_backup("q", ["a","b"], msg=b) # Joining tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") - cluster.restart(1) + cluster.restart(1) # Not a part of the current transaction. tx.commit() tx.close() for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. - for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b) + for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) + # FIXME aconway 2013-11-07: assert_log_clean def test_tx_block_threads(self): """Verify that TXs blocked in commit don't deadlock.""" |
