diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 240 |
1 files changed, 214 insertions, 26 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 293712fe80..6941a2b545 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError from qpid.datatypes import uuid4, UUID from brokertest import * from ha_test import * @@ -37,6 +37,7 @@ def grep(filename, regexp): class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" + def assert_log_no_errors(self, broker): log = broker.get_log() if grep(log, re.compile("] error|] critical")): @@ -219,7 +220,8 @@ class ReplicationTests(HaBrokerTest): backup.connect_admin().close() # Test discovery: should connect to primary after reject by backup - c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) + c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], + reconnect=True) s = c.session() sender = s.sender("q;{create:always}") backup.wait_backup("q") @@ -561,9 +563,9 @@ class ReplicationTests(HaBrokerTest): return acl=os.path.join(os.getcwd(), "policy.acl") aclf=file(acl,"w") - # Verify that replication works with auth=yes and HA user has at least the following - # privileges: + # Minimum set of privileges required for the HA user. aclf.write(""" +# HA user acl allow zag@QPID access queue acl allow zag@QPID create queue acl allow zag@QPID consume queue @@ -575,6 +577,9 @@ acl allow zag@QPID publish exchange acl allow zag@QPID delete exchange acl allow zag@QPID access method acl allow zag@QPID create link +# Normal user +acl allow zig@QPID all all + acl deny all all """) aclf.close() @@ -585,14 +590,16 @@ acl deny all all "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN" ], client_credentials=Credentials("zag", "zag", "PLAIN")) - s0 = cluster[0].connect(username="zag", password="zag").session(); - s0.receiver("q;{create:always}") - s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") - cluster[1].wait_backup("q") - cluster[1].wait_backup("ex") - s1 = cluster[1].connect_admin().session(); # Uses Credentials above. - s1.sender("ex").send("foo"); - self.assertEqual(s1.receiver("q").fetch().content, "foo") + c = cluster[0].connect(username="zig", password="zig") + s0 = c.session(); + s0.sender("q;{create:always}") + s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") + s0.sender("ex").send("foo"); + s1 = c.session(transactional=True) + s1.sender("ex").send("foo-tx"); + cluster[1].assert_browse_backup("q", ["foo"]) + s1.commit() + cluster[1].assert_browse_backup("q", ["foo", "foo-tx"]) def test_alternate_exchange(self): """Verify that alternate-exchange on exchanges and queues is propagated @@ -927,20 +934,22 @@ class LongTests(HaBrokerTest): if d: return float(d)*60 else: return 3 # Default is to be quick - # FIXME aconway 2013-06-27: skip this test pending a fix for - # https://issues.apache.org/jira/browse/QPID-4944 - def skip_test_failover_send_receive(self): + def test_failover_send_receive(self): """Test failover with continuous send-receive""" brokers = HaCluster(self, 3) # Start sender and receiver threads n = 10 - senders = [NumberedSender(brokers[0], url=brokers.url, - max_depth=1024, failover_updates=False, - queue="test%s"%(i)) for i in xrange(n)] - receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i], - failover_updates=False, - queue="test%s"%(i)) for i in xrange(n)] + senders = [ + NumberedSender( + brokers[0], url=brokers.url,max_depth=50, failover_updates=False, + queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)] + + receivers = [ + NumberedReceiver( + brokers[0], url=brokers.url, sender=senders[i],failover_updates=False, + queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)] + for r in receivers: r.start() for s in senders: s.start() @@ -991,7 +1000,7 @@ class LongTests(HaBrokerTest): finally: for s in senders: s.stop() for r in receivers: r.stop() - dead = filter(lambda i: not brokers[i].is_running(), xrange(3)) + dead = filter(lambda b: not b.is_running(), brokers) if dead: raise Exception("Brokers not running: %s"%dead) def test_qmf_order(self): @@ -1200,7 +1209,7 @@ class ConfigurationTests(HaBrokerTest): cluster[0].set_brokers_url(cluster.url+",xxx:1234") self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL -class StoreTests(BrokerTest): +class StoreTests(HaBrokerTest): """Test for HA with persistence.""" def check_skip(self): @@ -1248,7 +1257,7 @@ class StoreTests(BrokerTest): doing catch-up from the primary.""" if self.check_skip(): return cluster = HaCluster(self, 2) - sn = cluster[0].connect(heartbeat=1).session() + sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() s1 = sn.sender("q1;{create:always,node:{durable:true}}") for m in ["foo","bar"]: s1.send(Message(m, durable=True)) s2 = sn.sender("q2;{create:always,node:{durable:true}}") @@ -1259,7 +1268,7 @@ class StoreTests(BrokerTest): cluster[1].assert_browse_backup("q2", ["hello"]) # Make changes that the backup doesn't see cluster.kill(1, promote_next=False, final=False) - r1 = cluster[0].connect(heartbeat=1).session().receiver("q1") + r1 = cluster[0].connect(heartbeat=HaBroker.heartbeat).session().receiver("q1") for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m) r1.session.acknowledge() for m in ["x","y","z"]: s1.send(Message(m, durable=True)) @@ -1278,7 +1287,7 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q1", ["x","y","z"]) cluster[1].assert_browse_backup("q1", ["x","y","z"]) - sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over! + sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() sn.sender("ex/k1").send("boo") cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"]) cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"]) @@ -1287,6 +1296,185 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q2", ["hello", "end"]) cluster[1].assert_browse_backup("q2", ["hello", "end"]) +def open_read(name): + try: + f = open(name) + return f.read() + finally: f.close() + +class TransactionTests(HaBrokerTest): + + load_store=["--load-module", BrokerTest.test_store_lib] + + def tx_simple_setup(self, broker): + """Start a transaction, remove messages from queue a, add messages to queue b""" + c = broker.connect() + # Send messages to a, no transaction. + sa = c.session().sender("a;{create:always,node:{durable:true}}") + tx_msgs = ["x","y","z"] + for m in tx_msgs: sa.send(Message(content=m, durable=True)) + + # Receive messages from a, in transaction. + tx = c.session(transactional=True) + txr = tx.receiver("a") + tx_msgs2 = [txr.fetch(1).content for i in xrange(3)] + self.assertEqual(tx_msgs, tx_msgs2) + + # Send messages to b, transactional, mixed with non-transactional. + sb = c.session().sender("b;{create:always,node:{durable:true}}") + txs = tx.sender("b") + msgs = [str(i) for i in xrange(3)] + for tx_m,m in zip(tx_msgs2, msgs): + txs.send(tx_m); + sb.send(m) + return tx + + def tx_subscriptions(self, broker): + """Return list of queue names for tx subscriptions""" + return [q for q in broker.agent().repsub_queues() + if q.startswith("qpid.ha-tx")] + + def test_tx_simple_commit(self): + cluster = HaCluster(self, 2, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + + # NOTE: backup does not process transactional dequeues until prepare + cluster[1].assert_browse_backup("a", ["x","y","z"]) + cluster[1].assert_browse_backup("b", ['0', '1', '2']) + + tx.acknowledge() + tx.commit() + tx.sync() + + for b in cluster: self.assert_simple_commit_outcome(b, tx_queues) + + def assert_tx_cleanup(self, b, tx_queues): + """Verify that there are no transaction artifacts + (exchanges, queues, subscriptions) on b.""" + + self.assertEqual(0, len(b.agent().tx_queues()), msg=b) + self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b) + + # TX exchanges don't show up in management so test for existence by name. + s = b.connect_admin().session() + try: + for q in tx_queues: + try: + s.sender("%s;{node:{type:topic}}"%q) + self.fail("Found tx exchange %s on %s "%(q,b)) + except NotFound: pass + finally: s.connection.close() + + def assert_simple_commit_outcome(self, b, tx_queues): + b.assert_browse_backup("a", [], msg=b) + b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) + # Check for expected actions on the store + expect = """<enqueue a x> +<enqueue a y> +<enqueue a z> +<begin tx 1> +<dequeue a x tx=1> +<dequeue a y tx=1> +<dequeue a z tx=1> +<commit tx=1> +""" + self.assertEqual(expect, open_read(b.store_log), msg=b) + self.assert_tx_cleanup(b, tx_queues) + + def test_tx_simple_rollback(self): + cluster = HaCluster(self, 2, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + tx.acknowledge() + tx.rollback() + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + + def assert_simple_rollback_outcome(self, b, tx_queues): + b.assert_browse_backup("a", ["x","y","z"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + # Check for expected actions on the store + expect = """<enqueue a x> +<enqueue a y> +<enqueue a z> +""" + self.assertEqual(open_read(b.store_log), expect, msg=b) + self.assert_tx_cleanup(b, tx_queues) + + def test_tx_simple_failover(self): + cluster = HaCluster(self, 3, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + tx.acknowledge() + cluster.bounce(0) # Should cause roll-back + cluster[0].wait_status("ready") # Restarted. + cluster[1].wait_status("active") # Promoted. + cluster[2].wait_status("ready") # Failed over. + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + + def test_tx_no_backups(self): + """Test the special case of a TX where there are no backups""" + + # Test commit + cluster = HaCluster(self, 1, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() + tx.commit() + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + self.assert_simple_commit_outcome(cluster[0], tx_queues) + + # Test rollback + cluster = HaCluster(self, 1, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() + tx.acknowledge() + tx.rollback() + tx.sync() + self.assert_simple_rollback_outcome(cluster[0], tx_queues) + + + def test_tx_backup_fail(self): + cluster = HaCluster( + self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]]) + c = cluster[0].connect() + tx = c.session(transactional=True) + s = tx.sender("q;{create:always,node:{durable:true}}") + for m in ["foo","bang","bar"]: s.send(Message(m, durable=True)) + self.assertRaises(ServerError, tx.commit) + for b in cluster: b.assert_browse_backup("q", []) + self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n") + self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n") + + def test_tx_join_leave(self): + """Test cluster members joining/leaving cluster. + Also check that tx-queues are cleaned up at end of transaction.""" + + cluster = HaCluster(self, 3) + + # Leaving + tx = cluster[0].connect().session(transactional=True) + s = tx.sender("q;{create:always}") + s.send("a", sync=True) + self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster]) + cluster[1].kill(final=False) + s.send("b") + self.assertRaises(ServerError, tx.commit) + self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]]) + + # Joining + tx = cluster[0].connect().session(transactional=True) + s = tx.sender("q;{create:always}") + s.send("foo") + cluster.restart(1) + tx.commit() + # The new member is not in the tx but receives the results normal replication. + for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b) + if __name__ == "__main__": outdir = "ha_tests.tmp" shutil.rmtree(outdir, True) |
