diff options
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 17 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 57 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 12 |
3 files changed, 76 insertions, 10 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 44824fe67e..461ef0de9a 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -428,17 +428,22 @@ class Broker(Popen): assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line) finally: log.close() +def receiver_iter(receiver, timeout=0): + """Make an iterator out of a receiver. Returns messages till Empty is raised.""" + try: + while True: + yield receiver.fetch(timeout=timeout) + except qm.Empty: + pass + def browse(session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) r.capacity = 100 try: - contents = [] - try: - while True: contents.append(transform(r.fetch(timeout=timeout))) - except qm.Empty: pass - finally: r.close() - return contents + return [transform(m) for m in receiver_iter(r, timeout)] + finally: + r.close() def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"): """Assert that the contents of messages on queue (as retrieved diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f71560dffb..58b3ff2802 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1217,7 +1217,6 @@ class RecoveryTests(HaBrokerTest): def test_stalled_backup(self): """Make sure that a stalled backup broker does not stall the primary""" - # FIXME aconway 2014-04-15: merge with test_join_ready_cluster? cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"]) os.kill(cluster[1].pid, signal.SIGSTOP) s = cluster[0].connect().session() @@ -1272,7 +1271,7 @@ class StoreTests(HaBrokerTest): """Verify that a backup erases queue data from store recovery before doing catch-up from the primary.""" if self.check_skip(): return - cluster = HaCluster(self, 2) + cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store']) sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() s1 = sn.sender("q1;{create:always,node:{durable:true}}") for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True)) @@ -1362,9 +1361,61 @@ class TransactionTests(HaBrokerTest): tx.acknowledge() tx.commit() tx.sync() + tx.close() + + for b in cluster: + self.assert_simple_commit_outcome(b, tx_queues) + + # Verify non-tx dequeue is replicated correctly + c = cluster.connect(0, protocol=self.tx_protocol) + r = c.session().receiver("b") + ri = receiver_iter(r, timeout=1) + self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri]) + r.session.acknowledge() + for b in cluster: b.assert_browse_backup("b", [], msg=b) + + def check_enq_deq(self, cluster, queue, expect): + for b in cluster: + q = b.agent.getQueue(queue) + self.assertEqual( + (b.name,)+expect, + (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues)) + + def test_tx_enq_notx_deq(self): + """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" + cluster = HaCluster(self, 2, test_store=True) + c = cluster.connect(0, protocol=self.tx_protocol) + tx = c.session(transactional=True) + c.session().sender("qq;{create:always}").send("m1") + tx.sender("qq;{create:always}").send("tx") + tx.commit() tx.close() - for b in cluster: self.assert_simple_commit_outcome(b, tx_queues) + c.session().sender("qq;{create:always}").send("m2") + self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0)) + + notx = c.session() + self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))]) + notx.acknowledge() + self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0)) + for b in cluster: b.assert_browse_backup('qq', [], msg=b) + for b in cluster: self.assert_tx_clean(b) + + def test_tx_enq_notx_deq_qpid_send(self): + """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" + cluster = HaCluster(self, 2, test_store=True) + + self.popen( + ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1', + '--content-string=foo'] + ).assert_exit_ok() + for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b) + self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0)) + + self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok() + self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0)) + for b in cluster: b.assert_browse_backup('qq', [], msg=b) + for b in cluster: self.assert_tx_clean(b) def assert_tx_clean(self, b): """Verify that there are no transaction artifacts diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index a744d07a12..3d9941baee 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -205,7 +205,7 @@ struct Transfer : public TransactionalClient, public Runnable } session.commit(); t++; - if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; + if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; } catch (const TransactionAborted&) { std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl; session = connection.createTransactionalSession(); @@ -246,6 +246,16 @@ struct Controller : public Client for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE); + + // Clear out any garbage on queues. + Receiver receiver = session.createReceiver(address); + Message rmsg; + uint count(0); + while (receiver.fetch(rmsg, Duration::IMMEDIATE)) ++count; + session.acknowledge(); + receiver.close(); + if (!opts.quiet) std::cout << "Cleaned up " << count << " messages from " << *i << std::endl; + Sender sender = session.createSender(address); if (i == queues.begin()) { for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { |
