summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/brokertest.py17
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py57
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp12
3 files changed, 76 insertions, 10 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 44824fe67e..461ef0de9a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -428,17 +428,22 @@ class Broker(Popen):
assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line)
finally: log.close()
+def receiver_iter(receiver, timeout=0):
+ """Make an iterator out of a receiver. Returns messages till Empty is raised."""
+ try:
+ while True:
+ yield receiver.fetch(timeout=timeout)
+ except qm.Empty:
+ pass
+
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
r.capacity = 100
try:
- contents = []
- try:
- while True: contents.append(transform(r.fetch(timeout=timeout)))
- except qm.Empty: pass
- finally: r.close()
- return contents
+ return [transform(m) for m in receiver_iter(r, timeout)]
+ finally:
+ r.close()
def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
"""Assert that the contents of messages on queue (as retrieved
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f71560dffb..58b3ff2802 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1217,7 +1217,6 @@ class RecoveryTests(HaBrokerTest):
def test_stalled_backup(self):
"""Make sure that a stalled backup broker does not stall the primary"""
- # FIXME aconway 2014-04-15: merge with test_join_ready_cluster?
cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"])
os.kill(cluster[1].pid, signal.SIGSTOP)
s = cluster[0].connect().session()
@@ -1272,7 +1271,7 @@ class StoreTests(HaBrokerTest):
"""Verify that a backup erases queue data from store recovery before
doing catch-up from the primary."""
if self.check_skip(): return
- cluster = HaCluster(self, 2)
+ cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store'])
sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
@@ -1362,9 +1361,61 @@ class TransactionTests(HaBrokerTest):
tx.acknowledge()
tx.commit()
tx.sync()
+ tx.close()
+
+ for b in cluster:
+ self.assert_simple_commit_outcome(b, tx_queues)
+
+ # Verify non-tx dequeue is replicated correctly
+ c = cluster.connect(0, protocol=self.tx_protocol)
+ r = c.session().receiver("b")
+ ri = receiver_iter(r, timeout=1)
+ self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri])
+ r.session.acknowledge()
+ for b in cluster: b.assert_browse_backup("b", [], msg=b)
+
+ def check_enq_deq(self, cluster, queue, expect):
+ for b in cluster:
+ q = b.agent.getQueue(queue)
+ self.assertEqual(
+ (b.name,)+expect,
+ (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues))
+
+ def test_tx_enq_notx_deq(self):
+ """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
+ cluster = HaCluster(self, 2, test_store=True)
+ c = cluster.connect(0, protocol=self.tx_protocol)
+ tx = c.session(transactional=True)
+ c.session().sender("qq;{create:always}").send("m1")
+ tx.sender("qq;{create:always}").send("tx")
+ tx.commit()
tx.close()
- for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+ c.session().sender("qq;{create:always}").send("m2")
+ self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0))
+
+ notx = c.session()
+ self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))])
+ notx.acknowledge()
+ self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0))
+ for b in cluster: b.assert_browse_backup('qq', [], msg=b)
+ for b in cluster: self.assert_tx_clean(b)
+
+ def test_tx_enq_notx_deq_qpid_send(self):
+ """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
+ cluster = HaCluster(self, 2, test_store=True)
+
+ self.popen(
+ ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1',
+ '--content-string=foo']
+ ).assert_exit_ok()
+ for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b)
+ self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0))
+
+ self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok()
+ self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0))
+ for b in cluster: b.assert_browse_backup('qq', [], msg=b)
+ for b in cluster: self.assert_tx_clean(b)
def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index a744d07a12..3d9941baee 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -205,7 +205,7 @@ struct Transfer : public TransactionalClient, public Runnable
}
session.commit();
t++;
- if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+ if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
} catch (const TransactionAborted&) {
std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
session = connection.createTransactionalSession();
@@ -246,6 +246,16 @@ struct Controller : public Client
for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE);
+
+ // Clear out any garbage on queues.
+ Receiver receiver = session.createReceiver(address);
+ Message rmsg;
+ uint count(0);
+ while (receiver.fetch(rmsg, Duration::IMMEDIATE)) ++count;
+ session.acknowledge();
+ receiver.close();
+ if (!opts.quiet) std::cout << "Cleaned up " << count << " messages from " << *i << std::endl;
+
Sender sender = session.createSender(address);
if (i == queues.begin()) {
for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) {