summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py240
1 files changed, 214 insertions, 26 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 293712fe80..6941a2b545 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError
from qpid.datatypes import uuid4, UUID
from brokertest import *
from ha_test import *
@@ -37,6 +37,7 @@ def grep(filename, regexp):
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
+
def assert_log_no_errors(self, broker):
log = broker.get_log()
if grep(log, re.compile("] error|] critical")):
@@ -219,7 +220,8 @@ class ReplicationTests(HaBrokerTest):
backup.connect_admin().close()
# Test discovery: should connect to primary after reject by backup
- c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+ c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()],
+ reconnect=True)
s = c.session()
sender = s.sender("q;{create:always}")
backup.wait_backup("q")
@@ -561,9 +563,9 @@ class ReplicationTests(HaBrokerTest):
return
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
- # Verify that replication works with auth=yes and HA user has at least the following
- # privileges:
+ # Minimum set of privileges required for the HA user.
aclf.write("""
+# HA user
acl allow zag@QPID access queue
acl allow zag@QPID create queue
acl allow zag@QPID consume queue
@@ -575,6 +577,9 @@ acl allow zag@QPID publish exchange
acl allow zag@QPID delete exchange
acl allow zag@QPID access method
acl allow zag@QPID create link
+# Normal user
+acl allow zig@QPID all all
+
acl deny all all
""")
aclf.close()
@@ -585,14 +590,16 @@ acl deny all all
"--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
],
client_credentials=Credentials("zag", "zag", "PLAIN"))
- s0 = cluster[0].connect(username="zag", password="zag").session();
- s0.receiver("q;{create:always}")
- s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
- cluster[1].wait_backup("q")
- cluster[1].wait_backup("ex")
- s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
- s1.sender("ex").send("foo");
- self.assertEqual(s1.receiver("q").fetch().content, "foo")
+ c = cluster[0].connect(username="zig", password="zig")
+ s0 = c.session();
+ s0.sender("q;{create:always}")
+ s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ s0.sender("ex").send("foo");
+ s1 = c.session(transactional=True)
+ s1.sender("ex").send("foo-tx");
+ cluster[1].assert_browse_backup("q", ["foo"])
+ s1.commit()
+ cluster[1].assert_browse_backup("q", ["foo", "foo-tx"])
def test_alternate_exchange(self):
"""Verify that alternate-exchange on exchanges and queues is propagated
@@ -927,20 +934,22 @@ class LongTests(HaBrokerTest):
if d: return float(d)*60
else: return 3 # Default is to be quick
- # FIXME aconway 2013-06-27: skip this test pending a fix for
- # https://issues.apache.org/jira/browse/QPID-4944
- def skip_test_failover_send_receive(self):
+ def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
brokers = HaCluster(self, 3)
# Start sender and receiver threads
n = 10
- senders = [NumberedSender(brokers[0], url=brokers.url,
- max_depth=1024, failover_updates=False,
- queue="test%s"%(i)) for i in xrange(n)]
- receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
- failover_updates=False,
- queue="test%s"%(i)) for i in xrange(n)]
+ senders = [
+ NumberedSender(
+ brokers[0], url=brokers.url,max_depth=50, failover_updates=False,
+ queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
+
+ receivers = [
+ NumberedReceiver(
+ brokers[0], url=brokers.url, sender=senders[i],failover_updates=False,
+ queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
+
for r in receivers: r.start()
for s in senders: s.start()
@@ -991,7 +1000,7 @@ class LongTests(HaBrokerTest):
finally:
for s in senders: s.stop()
for r in receivers: r.stop()
- dead = filter(lambda i: not brokers[i].is_running(), xrange(3))
+ dead = filter(lambda b: not b.is_running(), brokers)
if dead: raise Exception("Brokers not running: %s"%dead)
def test_qmf_order(self):
@@ -1200,7 +1209,7 @@ class ConfigurationTests(HaBrokerTest):
cluster[0].set_brokers_url(cluster.url+",xxx:1234")
self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL
-class StoreTests(BrokerTest):
+class StoreTests(HaBrokerTest):
"""Test for HA with persistence."""
def check_skip(self):
@@ -1248,7 +1257,7 @@ class StoreTests(BrokerTest):
doing catch-up from the primary."""
if self.check_skip(): return
cluster = HaCluster(self, 2)
- sn = cluster[0].connect(heartbeat=1).session()
+ sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(Message(m, durable=True))
s2 = sn.sender("q2;{create:always,node:{durable:true}}")
@@ -1259,7 +1268,7 @@ class StoreTests(BrokerTest):
cluster[1].assert_browse_backup("q2", ["hello"])
# Make changes that the backup doesn't see
cluster.kill(1, promote_next=False, final=False)
- r1 = cluster[0].connect(heartbeat=1).session().receiver("q1")
+ r1 = cluster[0].connect(heartbeat=HaBroker.heartbeat).session().receiver("q1")
for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
r1.session.acknowledge()
for m in ["x","y","z"]: s1.send(Message(m, durable=True))
@@ -1278,7 +1287,7 @@ class StoreTests(BrokerTest):
cluster[0].assert_browse("q1", ["x","y","z"])
cluster[1].assert_browse_backup("q1", ["x","y","z"])
- sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over!
+ sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
sn.sender("ex/k1").send("boo")
cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
@@ -1287,6 +1296,185 @@ class StoreTests(BrokerTest):
cluster[0].assert_browse("q2", ["hello", "end"])
cluster[1].assert_browse_backup("q2", ["hello", "end"])
+def open_read(name):
+ try:
+ f = open(name)
+ return f.read()
+ finally: f.close()
+
+class TransactionTests(HaBrokerTest):
+
+ load_store=["--load-module", BrokerTest.test_store_lib]
+
+ def tx_simple_setup(self, broker):
+ """Start a transaction, remove messages from queue a, add messages to queue b"""
+ c = broker.connect()
+ # Send messages to a, no transaction.
+ sa = c.session().sender("a;{create:always,node:{durable:true}}")
+ tx_msgs = ["x","y","z"]
+ for m in tx_msgs: sa.send(Message(content=m, durable=True))
+
+ # Receive messages from a, in transaction.
+ tx = c.session(transactional=True)
+ txr = tx.receiver("a")
+ tx_msgs2 = [txr.fetch(1).content for i in xrange(3)]
+ self.assertEqual(tx_msgs, tx_msgs2)
+
+ # Send messages to b, transactional, mixed with non-transactional.
+ sb = c.session().sender("b;{create:always,node:{durable:true}}")
+ txs = tx.sender("b")
+ msgs = [str(i) for i in xrange(3)]
+ for tx_m,m in zip(tx_msgs2, msgs):
+ txs.send(tx_m);
+ sb.send(m)
+ return tx
+
+ def tx_subscriptions(self, broker):
+ """Return list of queue names for tx subscriptions"""
+ return [q for q in broker.agent().repsub_queues()
+ if q.startswith("qpid.ha-tx")]
+
+ def test_tx_simple_commit(self):
+ cluster = HaCluster(self, 2, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+
+ # NOTE: backup does not process transactional dequeues until prepare
+ cluster[1].assert_browse_backup("a", ["x","y","z"])
+ cluster[1].assert_browse_backup("b", ['0', '1', '2'])
+
+ tx.acknowledge()
+ tx.commit()
+ tx.sync()
+
+ for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+
+ def assert_tx_cleanup(self, b, tx_queues):
+ """Verify that there are no transaction artifacts
+ (exchanges, queues, subscriptions) on b."""
+
+ self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
+ self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
+
+ # TX exchanges don't show up in management so test for existence by name.
+ s = b.connect_admin().session()
+ try:
+ for q in tx_queues:
+ try:
+ s.sender("%s;{node:{type:topic}}"%q)
+ self.fail("Found tx exchange %s on %s "%(q,b))
+ except NotFound: pass
+ finally: s.connection.close()
+
+ def assert_simple_commit_outcome(self, b, tx_queues):
+ b.assert_browse_backup("a", [], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+<begin tx 1>
+<dequeue a x tx=1>
+<dequeue a y tx=1>
+<dequeue a z tx=1>
+<commit tx=1>
+"""
+ self.assertEqual(expect, open_read(b.store_log), msg=b)
+ self.assert_tx_cleanup(b, tx_queues)
+
+ def test_tx_simple_rollback(self):
+ cluster = HaCluster(self, 2, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+ tx.acknowledge()
+ tx.rollback()
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+ def assert_simple_rollback_outcome(self, b, tx_queues):
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+ self.assertEqual(open_read(b.store_log), expect, msg=b)
+ self.assert_tx_cleanup(b, tx_queues)
+
+ def test_tx_simple_failover(self):
+ cluster = HaCluster(self, 3, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+ tx.acknowledge()
+ cluster.bounce(0) # Should cause roll-back
+ cluster[0].wait_status("ready") # Restarted.
+ cluster[1].wait_status("active") # Promoted.
+ cluster[2].wait_status("ready") # Failed over.
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+ def test_tx_no_backups(self):
+ """Test the special case of a TX where there are no backups"""
+
+ # Test commit
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
+ tx.commit()
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+ self.assert_simple_commit_outcome(cluster[0], tx_queues)
+
+ # Test rollback
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+ tx.acknowledge()
+ tx.rollback()
+ tx.sync()
+ self.assert_simple_rollback_outcome(cluster[0], tx_queues)
+
+
+ def test_tx_backup_fail(self):
+ cluster = HaCluster(
+ self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
+ c = cluster[0].connect()
+ tx = c.session(transactional=True)
+ s = tx.sender("q;{create:always,node:{durable:true}}")
+ for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
+ self.assertRaises(ServerError, tx.commit)
+ for b in cluster: b.assert_browse_backup("q", [])
+ self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
+
+ def test_tx_join_leave(self):
+ """Test cluster members joining/leaving cluster.
+ Also check that tx-queues are cleaned up at end of transaction."""
+
+ cluster = HaCluster(self, 3)
+
+ # Leaving
+ tx = cluster[0].connect().session(transactional=True)
+ s = tx.sender("q;{create:always}")
+ s.send("a", sync=True)
+ self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
+ cluster[1].kill(final=False)
+ s.send("b")
+ self.assertRaises(ServerError, tx.commit)
+ self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
+
+ # Joining
+ tx = cluster[0].connect().session(transactional=True)
+ s = tx.sender("q;{create:always}")
+ s.send("foo")
+ cluster.restart(1)
+ tx.commit()
+ # The new member is not in the tx but receives the results normal replication.
+ for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
+
if __name__ == "__main__":
outdir = "ha_tests.tmp"
shutil.rmtree(outdir, True)