author     Stephen D. Huston <shuston@apache.org>   2011-10-21 14:42:12 +0000
committer  Stephen D. Huston <shuston@apache.org>   2011-10-21 14:42:12 +0000
commit     f83677056891e436bf5ba99e79240df2a44528cd (patch)
tree       625bfd644b948e89105630759cf6decb0435354d /cpp/src/tests/cluster_tests.py
parent     ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff)
download   qpid-python-QPID-2519.tar.gz
Merged out from trunk (QPID-2519)
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x  cpp/src/tests/cluster_tests.py  1033
1 file changed, 987 insertions, 46 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index cbad4010b4..0e80e06d34 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -18,12 +18,13 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message, Empty
-from threading import Thread, Lock
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
+from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
from tempfile import NamedTemporaryFile
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
destination="amq.direct",
message=qpid.datatypes.Message(props, "content"))
+ # Try messages with TTL and different headers/properties
+ cluster[0].send_message("q", Message(durable=True, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
assert readfile("direct.dump") == readfile("updatee.dump")
+
os.remove("direct.dump")
os.remove("updatee.dump")
@@ -108,19 +115,22 @@ class ShortTests(BrokerTest):
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
aclf.write("""
-acl deny zag@QPID create queue
-acl allow all all
+acl allow zig@QPID all all
+acl deny all all
""")
aclf.close()
- cluster = self.cluster(2, args=["--auth", "yes",
+ cluster = self.cluster(1, args=["--auth", "yes",
"--sasl-config", sasl_config,
"--load-module", os.getenv("ACL_LIB"),
"--acl-file", acl])
# Valid user/password, ensure queue is created.
c = cluster[0].connect(username="zig", password="zig")
- c.session().sender("ziggy;{create:always}")
+ c.session().sender("ziggy;{create:always,node:{x-declare:{exclusive:true}}}")
c.close()
+ cluster.start() # Start second node.
+
+ # Check queue is created on second node.
c = cluster[1].connect(username="zig", password="zig")
c.session().receiver("ziggy;{assert:always}")
c.close()
@@ -149,7 +159,7 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.UnauthorizedAccess: pass
# make sure the queue was not created at the other node.
- c = cluster[0].connect(username="zag", password="zag")
+ c = cluster[1].connect(username="zig", password="zig")
try:
s = c.session()
s.sender("zaggy;{assert:always}")
@@ -157,6 +167,35 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.NotFound: pass
+ def test_sasl_join(self):
+ """Verify SASL authentication between brokers when joining a cluster."""
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ # Test with a valid username/password
+ cluster = self.cluster(1, args=["--auth", "yes",
+ "--sasl-config", sasl_config,
+ "--load-module", os.getenv("ACL_LIB"),
+ "--cluster-username=zig",
+ "--cluster-password=zig",
+ "--cluster-mechanism=PLAIN"
+ ])
+ cluster.start()
+ cluster.ready()
+ c = cluster[1].connect(username="zag", password="zag")
+
+ # Test with an invalid username/password
+ cluster = self.cluster(1, args=["--auth", "yes",
+ "--sasl-config", sasl_config,
+ "--load-module", os.getenv("ACL_LIB"),
+ "--cluster-username=x",
+ "--cluster-password=y",
+ "--cluster-mechanism=PLAIN"
+ ])
+ try:
+ cluster.start(expect=EXPECT_EXIT_OK)
+ cluster[1].ready()
+ self.fail("Expected exception")
+ except: pass
+
def test_user_id_update(self):
"""Ensure that user-id of an open session is updated to new cluster members"""
sasl_config=os.path.join(self.rootdir, "sasl_config")
@@ -246,25 +285,6 @@ acl allow all all
session1 = cluster[1].connect().session()
for q in queues: self.assert_browse(session1, "q1", ["foo"])
- def test_dr_no_message(self):
- """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141
- Joining broker crashes with 'error deliveryRecord no update message'
- """
-
- cluster = self.cluster(1)
- session0 = cluster[0].connect().session()
- s = session0.sender("q1;{create:always}")
- s.send(Message("a", ttl=0.05), sync=False)
- s.send(Message("b", ttl=0.05), sync=False)
- r1 = session0.receiver("q1")
- self.assertEqual("a", r1.fetch(timeout=0).content)
- r2 = session0.receiver("q1;{mode:browse}")
- self.assertEqual("b", r2.fetch(timeout=0).content)
- # Leave messages un-acknowledged, let them expire, then start new broker.
- time.sleep(.1)
- cluster.start()
- self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
-
def test_route_update(self):
"""Regression test for https://issues.apache.org/jira/browse/QPID-2982
Links and bridges associated with routes were not replicated on update.
@@ -272,6 +292,7 @@ acl allow all all
client was attached.
"""
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ # First broker will be killed.
cluster0 = self.cluster(1, args=args)
cluster1 = self.cluster(1, args=args)
assert 0 == subprocess.call(
@@ -301,9 +322,695 @@ acl allow all all
qpid_tool.wait()
scanner.join()
assert scanner.found
+ # Regression test for https://issues.apache.org/jira/browse/QPID-3235
+ # Inconsistent stats when changing elder.
+
+ # Force a change of elder
+ cluster0.start()
+ cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
+ cluster0[0].kill()
+ time.sleep(2) # Allow a management interval to pass.
# Verify logs are consistent
cluster_test_logs.verify_logs()
+ def test_redelivered(self):
+ """Verify that redelivered flag is set correctly on replayed messages"""
+ cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
+ url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port())
+ queue = "my-queue"
+ cluster[0].declare_queue(queue)
+ self.sender = self.popen(
+ ["qpid-send",
+ "--broker", url,
+ "--address", queue,
+ "--sequence=true",
+ "--send-eos=1",
+ "--messages=100000",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS)
+ ])
+ self.receiver = self.popen(
+ ["qpid-receive",
+ "--broker", url,
+ "--address", queue,
+ "--ignore-duplicates",
+ "--check-redelivered",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+ "--forever"
+ ])
+ time.sleep(1) # Give the sender enough time to have some messages to replay
+ cluster[0].kill()
+ self.sender.wait()
+ self.receiver.wait()
+ cluster[1].kill()
+
+ class BlockedSend(Thread):
+ """Send a message, send is expected to block.
+ Verify that it does block (for a given timeout), then allow
+ waiting till it unblocks when it is expected to do so."""
+ def __init__(self, sender, msg):
+ self.sender, self.msg = sender, msg
+ self.blocked = True
+ self.condition = Condition()
+ self.timeout = 0.1 # Time to wait for expected results.
+ Thread.__init__(self)
+ def run(self):
+ try:
+ self.sender.send(self.msg, sync=True)
+ self.condition.acquire()
+ try:
+ self.blocked = False
+ self.condition.notify()
+ finally: self.condition.release()
+ except Exception,e: print "BlockedSend exception: %s"%e
+ def start(self):
+ Thread.start(self)
+ time.sleep(self.timeout)
+ assert self.blocked # Expected to block
+ def assert_blocked(self): assert self.blocked
+ def wait(self): # Now expecting to unblock
+ self.condition.acquire()
+ try:
+ while self.blocked:
+ self.condition.wait(self.timeout)
+ if self.blocked: raise Exception("Timed out waiting for send to unblock")
+ finally: self.condition.release()
+ self.join()
+
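+ # Minimal usage sketch for BlockedSend, assuming a sender s0 whose queue
+ # is already at its qpid.flow_stop_count threshold:
+ #
+ #   blocked = ShortTests.BlockedSend(s0, Message("overflow"))
+ #   blocked.start()   # asserts the send really does block
+ #   ... drain the queue from any cluster member ...
+ #   blocked.wait()    # raises if the send never unblocks
+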
+ def queue_flowlimit_test(self, brokers):
+ """Verify that the queue's flowlimit configuration and state are
+ correctly replicated.
+ The brokers argument allows this test to run on a single broker, a
+ cluster of 2 pre-started brokers, or a cluster where the second broker
+ starts after the queue is in flow control.
+ """
+ # configure a queue with a specific flow limit on first broker
+ ssn0 = brokers.first().connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ brokers.first().startQmf()
+ q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q1.getObjectId()
+ self.assertEqual(q1.name, "flq")
+ self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 0)
+
+ # fill the queue on one broker until flow control is active
+ for x in range(5): s0.send(Message(str(x)))
+ sender = ShortTests.BlockedSend(s0, Message(str(6)))
+ sender.start() # Tests that sender does block
+ # Verify the broker queue goes into a flowStopped state
+ deadline = time.time() + 1
+ while not q1.flowStopped and time.time() < deadline: q1.update()
+ assert q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 1)
+ sender.assert_blocked() # Still blocked
+
+ # Now verify that both brokers in the cluster have the same configuration
+ brokers.second().startQmf()
+ qs = brokers.second().qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q2 = qs[0]
+ self.assertEqual(q2.name, "flq")
+ self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q2.flowStopped
+ self.assertEqual(q2.flowStoppedCount, 1)
+
+ # now drain the queue using a session to the other broker
+ ssn1 = brokers.second().connect().session()
+ r1 = ssn1.receiver("flq", capacity=6)
+ for x in range(4):
+ r1.fetch(timeout=0)
+ ssn1.acknowledge()
+ sender.wait() # Verify no longer blocked.
+
+ # and re-verify state of queue on both brokers
+ q1.update()
+ assert not q1.flowStopped
+ q2.update()
+ assert not q2.flowStopped
+
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
+
+ def test_queue_flowlimit(self):
+ """Test flow limits on a standalone broker"""
+ broker = self.broker()
+ class Brokers:
+ def first(self): return broker
+ def second(self): return broker
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_cluster(self):
+ cluster = self.cluster(2)
+ class Brokers:
+ def first(self): return cluster[0]
+ def second(self): return cluster[1]
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_cluster_join(self):
+ cluster = self.cluster(1)
+ class Brokers:
+ def first(self): return cluster[0]
+ def second(self):
+ if len(cluster) == 1: cluster.start()
+ return cluster[1]
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_replicate(self):
+ """ Verify that a queue which is in flow control BUT has drained BELOW
+ the flow control 'stop' threshold, is correctly replicated when a new
+ broker is added to the cluster.
+ """
+
+ class AsyncSender(Thread):
+ """Send a fixed number of msgs from a sender in a separate thread
+ so it may block without blocking the test.
+ """
+ def __init__(self, broker, address, count=1, size=4):
+ Thread.__init__(self)
+ self.daemon = True
+ self.broker = broker
+ self.queue = address
+ self.count = count
+ self.size = size
+ self.done = False
+
+ def run(self):
+ self.sender = subprocess.Popen(["qpid-send",
+ "--capacity=1",
+ "--content-size=%s" % self.size,
+ "--messages=%s" % self.count,
+ "--failover-updates",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+ "--address=%s" % self.queue,
+ "--broker=%s" % self.broker.host_port()])
+ self.sender.wait()
+ self.done = True
+
+ cluster = self.cluster(2)
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}")
+
+ # fire off the sending thread to broker[0], and wait until the queue
+ # hits flow control on broker[1]
+ sender = AsyncSender(cluster[0], "flq", count=110)
+ sender.start()
+
+ cluster[1].startQmf()
+ q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ deadline = time.time() + 10
+ while not q_obj.flowStopped and time.time() < deadline:
+ q_obj.update()
+ assert q_obj.flowStopped
+ assert not sender.done
+ assert q_obj.msgDepth < 110
+
+ # Now drain enough messages on broker[1] to drop below the flow stop
+ # threshold, but not relieve flow control...
+ receiver = subprocess.Popen(["qpid-receive",
+ "--messages=15",
+ "--timeout=1",
+ "--print-content=no",
+ "--failover-updates",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+ "--ack-frequency=1",
+ "--address=flq",
+ "--broker=%s" % cluster[1].host_port()])
+ receiver.wait()
+ q_obj.update()
+ assert q_obj.flowStopped
+ assert not sender.done
+ current_depth = q_obj.msgDepth
+
+ # add a new broker to the cluster, and verify that the queue is in flow
+ # control on that broker
+ cluster.start()
+ cluster[2].startQmf()
+ q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ assert q_obj.flowStopped
+ assert q_obj.msgDepth == current_depth
+
+ # now drain the queue on broker[2], and verify that the sender becomes
+ # unblocked
+ receiver = subprocess.Popen(["qpid-receive",
+ "--messages=95",
+ "--timeout=1",
+ "--print-content=no",
+ "--failover-updates",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+ "--ack-frequency=1",
+ "--address=flq",
+ "--broker=%s" % cluster[2].host_port()])
+ receiver.wait()
+ q_obj.update()
+ assert not q_obj.flowStopped
+ self.assertEqual(q_obj.msgDepth, 0)
+
+ # verify that the sender has become unblocked
+ sender.join(timeout=5)
+ assert not sender.isAlive()
+ assert sender.done
+
+ def test_blocked_queue_delete(self):
+ """Verify that producers which are blocked on a queue due to flow
+ control are unblocked when that queue is deleted.
+ """
+
+ cluster = self.cluster(2)
+ cluster[0].startQmf()
+ cluster[1].startQmf()
+
+ # configure a queue with a specific flow limit on first broker
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q1.getObjectId()
+ self.assertEqual(q1.name, "flq")
+ self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 0)
+
+ # fill the queue on one broker until flow control is active
+ for x in range(5): s0.send(Message(str(x)))
+ sender = ShortTests.BlockedSend(s0, Message(str(6)))
+ sender.start() # Tests that sender does block
+ # Verify the broker queue goes into a flowStopped state
+ deadline = time.time() + 1
+ while not q1.flowStopped and time.time() < deadline: q1.update()
+ assert q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 1)
+ sender.assert_blocked() # Still blocked
+
+ # Now verify that both brokers in the cluster have the same configuration
+ qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q2 = qs[0]
+ self.assertEqual(q2.name, "flq")
+ self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q2.flowStopped
+ self.assertEqual(q2.flowStoppedCount, 1)
+
+ # now delete the blocked queue from other broker
+ ssn1 = cluster[1].connect().session()
+ self.evaluate_address(ssn1, "flq;{delete:always}")
+ sender.wait() # Verify no longer blocked.
+
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
+
+
+ def test_alternate_exchange_update(self):
+ """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """
+ cluster = self.cluster(1)
+ s0 = cluster[0].connect().session()
+ # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges
+ self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}")
+ # create direct exchange ex with alternate-exchange amq.fanout and no queues bound
+ self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}")
+ # create queue q with alternate-exchange amq.fanout
+ self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}")
+
+ def verify(broker):
+ s = broker.connect().session()
+ # Verify unmatched message goes to ex's alternate.
+ s.sender("ex").send("foo")
+ self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content)
+ # Verify rejected message goes to q's alternate.
+ s.sender("q").send("bar")
+ msg = s.receiver("q").fetch(timeout=0)
+ self.assertEqual("bar", msg.content)
+ s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+ self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content)
+
+ verify(cluster[0])
+ cluster.start()
+ verify(cluster[1])
+
+ def test_binding_order(self):
+ """Regression test for binding order inconsistency in cluster"""
+ cluster = self.cluster(1)
+ c0 = cluster[0].connect()
+ s0 = c0.session()
+ # Declare multiple queues bound to same key on amq.topic
+ def declare(q,max=0):
+ if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max
+ else: declare = 'x-declare:{}'
+ bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
+ s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
+ declare('d',max=4) # Only one with a limit
+ for q in ['c', 'b','a']: declare(q)
+ # Add a cluster member, send enough messages to exceed the max count
+ cluster.start()
+ try:
+ s = s0.sender('amq.topic/key')
+ for m in xrange(1,6): s.send(Message(str(m)))
+ self.fail("Expected capacity exceeded exception")
+ except messaging.exceptions.TargetCapacityExceeded: pass
+ c1 = cluster[1].connect()
+ s1 = c1.session()
+ s0 = c0.session() # Old session s0 is broken by exception.
+ # Verify queue contents are consistent.
+ for q in ['a','b','c','d']:
+ self.assertEqual(self.browse(s0, q), self.browse(s1, q))
+ # Verify queue contents are "best effort"
+ for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
+ self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
+
+ def test_deleted_exchange(self):
+ """QPID-3215: cached exchange reference can cause cluster inconsistencies
+ if exchange is deleted/recreated
+ Verify stand-alone case
+ """
+ cluster = self.cluster()
+ # Verify we do not route message via an exchange that has been destroyed.
+ cluster.start()
+ s0 = cluster[0].connect().session()
+ self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+ self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+ send0 = s0.sender("ex/foo")
+ send0.send("foo")
+ self.assert_browse(s0, "q", ["foo"])
+ self.evaluate_address(s0, "ex;{delete:always}")
+ try:
+ send0.send("bar") # Should fail, exchange is deleted.
+ self.fail("Expected not-found exception")
+ except qpid.messaging.NotFound: pass
+ self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
+
+ def test_deleted_exchange_inconsistent(self):
+ """QPID-3215: cached exchange reference can cause cluster inconsistencies
+ if exchange is deleted/recreated
+
+ Verify cluster inconsistency.
+ """
+ cluster = self.cluster()
+ cluster.start()
+ s0 = cluster[0].connect().session()
+ self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+ self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+ send0 = s0.sender("ex/foo")
+ send0.send("foo")
+ self.assert_browse(s0, "q", ["foo"])
+
+ cluster.start()
+ s1 = cluster[1].connect().session()
+ self.evaluate_address(s0, "ex;{delete:always}")
+ try:
+ send0.send("bar")
+ self.fail("Expected not-found exception")
+ except qpid.messaging.NotFound: pass
+
+ self.assert_browse(s1, "q", ["foo"])
+
+
+ def test_ttl_consistent(self):
+ """Ensure we don't get inconsistent errors with message that have TTL very close together"""
+ messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+ messages.append(Message("x"))
+ cluster = self.cluster(2)
+ sender = cluster[0].connect().session().sender("q;{create:always}")
+
+ def fetch(b):
+ receiver = b.connect().session().receiver("q;{create:always}")
+ while receiver.fetch().content != "x": pass
+
+ for m in messages: sender.send(m, sync=False)
+ for m in messages: sender.send(m, sync=False)
+ fetch(cluster[0])
+ fetch(cluster[1])
+ for m in messages: sender.send(m, sync=False)
+ cluster.start()
+ fetch(cluster[2])
+
+# Some utility code for transaction tests
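+# The XA_* values mirror the AMQP 0-10 xa-status result codes
+# (xa-ok=0, xa-rbrollback=1, xa-rbtimeout=2).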
+XA_RBROLLBACK = 1
+XA_RBTIMEOUT = 2
+XA_OK = 0
+dtx_branch_counter = 0
+
+class DtxStatusException(Exception):
+ def __init__(self, expect, actual):
+ self.expect = expect
+ self.actual = actual
+
+ def __str__(self):
+ return "DtxStatusException(expect=%s, actual=%s)"%(self.expect, self.actual)
+
+class DtxTestFixture:
+ """Bundle together some common requirements for dtx tests."""
+ def __init__(self, test, broker, name, exclusive=False):
+ self.test = test
+ self.broker = broker
+ self.name = name
+ # Use old API. DTX is not supported in messaging API.
+ self.connection = broker.connect_old()
+ self.session = self.connection.session(name, 1) # 1 second timeout
+ self.queue = self.session.queue_declare(name, exclusive=exclusive)
+ self.session.dtx_select()
+ self.consumer = None
+
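+ # xid() builds an AMQP 0-10 xid struct via the old API; defaulting the
+ # global id to the fixture name keeps each test's branches distinct.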
+ def xid(self, id=None):
+ if id is None: id = self.name
+ return self.session.xid(format=0, global_id=id)
+
+ def check_status(self, expect, actual):
+ if expect != actual: raise DtxStatusException(expect, actual)
+
+ def start(self, id=None, resume=False):
+ self.check_status(XA_OK, self.session.dtx_start(xid=self.xid(id), resume=resume).status)
+
+ def end(self, id=None, suspend=False):
+ self.check_status(XA_OK, self.session.dtx_end(xid=self.xid(id), suspend=suspend).status)
+
+ def prepare(self, id=None):
+ self.check_status(XA_OK, self.session.dtx_prepare(xid=self.xid(id)).status)
+
+ def commit(self, id=None, one_phase=True):
+ self.check_status(
+ XA_OK, self.session.dtx_commit(xid=self.xid(id), one_phase=one_phase).status)
+
+ def rollback(self, id=None):
+ self.check_status(XA_OK, self.session.dtx_rollback(xid=self.xid(id)).status)
+
+ def set_timeout(self, timeout, id=None):
+ self.session.dtx_set_timeout(xid=self.xid(id),timeout=timeout)
+
+ def send(self, messages):
+ for m in messages:
+ dp=self.session.delivery_properties(routing_key=self.name)
+ mp=self.session.message_properties()
+ self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m))
+
+ def accept(self):
+ """Accept 1 message from queue"""
+ consumer_tag="%s-consumer"%(self.name)
+ self.session.message_subscribe(queue=self.name, destination=consumer_tag)
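+ # Grant credit for exactly one message (and effectively unlimited bytes),
+ # so incoming().get() below yields at most one message.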
+ self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ msg = self.session.incoming(consumer_tag).get(timeout=1)
+ self.session.message_cancel(destination=consumer_tag)
+ self.session.message_accept(qpid.datatypes.RangedSet(msg.id))
+ return msg
+
+
+ def verify(self, sessions, messages):
+ for s in sessions:
+ self.test.assert_browse(s, self.name, messages)
+
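+# A sketch of the XA call sequences DtxTestFixture wraps (illustrative
+# only, not from this commit):
+#
+#   t = DtxTestFixture(self, broker, "txq")
+#   t.start(); t.send(["m"]); t.end()
+#   t.commit(one_phase=True)      # one-phase commit, no prepare
+#
+#   t.start(); t.send(["m"]); t.end()
+#   t.prepare()                   # two-phase: prepare, then
+#   t.commit(one_phase=False)     # commit (or t.rollback())
+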
+class DtxTests(BrokerTest):
+
+ def test_dtx_update(self):
+ """Verify that DTX transaction state is updated to a new broker.
+ Start a collection of transactions, then add a new cluster member,
+ then verify they commit/rollback correctly on the new broker."""
+
+ # Note: multiple test have been bundled into one to avoid the need to start/stop
+ # multiple brokers per test.
+
+ cluster=self.cluster(1)
+ sessions = [cluster[0].connect().session()] # For verify
+
+ # Transaction that will be open when new member joins, then committed.
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.start()
+ t1.send(["1", "2"])
+ t1.verify(sessions, []) # Not visible outside of transaction
+
+ # Transaction that will be open when new member joins, then rolled back.
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.start()
+ t2.send(["1", "2"])
+
+ # Transaction that will be prepared when new member joins, then committed.
+ t3 = DtxTestFixture(self, cluster[0], "t3")
+ t3.start()
+ t3.send(["1", "2"])
+ t3.end()
+ t3.prepare()
+ t1.verify(sessions, []) # Not visible outside of transaction
+
+ # Transaction that will be prepared when new member joins, then rolled back.
+ t4 = DtxTestFixture(self, cluster[0], "t4")
+ t4.start()
+ t4.send(["1", "2"])
+ t4.end()
+ t4.prepare()
+
+ # Transaction using an exclusive queue
+ t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
+ t5.start()
+ t5.send(["1", "2"])
+
+ # Accept messages in a transaction before/after join then commit
+ t6 = DtxTestFixture(self, cluster[0], "t6")
+ t6.send(["a","b","c"])
+ t6.start()
+ self.assertEqual(t6.accept().body, "a")
+
+ # Accept messages in a transaction before/after join then roll back
+ t7 = DtxTestFixture(self, cluster[0], "t7")
+ t7.send(["a","b","c"])
+ t7.start()
+ self.assertEqual(t7.accept().body, "a")
+
+ # Ended, suspended transactions across join.
+ t8 = DtxTestFixture(self, cluster[0], "t8")
+ t8.start(id="1")
+ t8.send(["x"])
+ t8.end(id="1", suspend=True)
+ t8.start(id="2")
+ t8.send(["y"])
+ t8.end(id="2")
+ t8.start()
+ t8.send("z")
+
+
+ # Start new cluster member
+ cluster.start()
+ sessions.append(cluster[1].connect().session())
+
+ # Commit t1
+ t1.send(["3","4"])
+ t1.verify(sessions, [])
+ t1.end()
+ t1.commit(one_phase=True)
+ t1.verify(sessions, ["1","2","3","4"])
+
+ # Rollback t2
+ t2.send(["3","4"])
+ t2.end()
+ t2.rollback()
+ t2.verify(sessions, [])
+
+ # Commit t3
+ t3.commit(one_phase=False)
+ t3.verify(sessions, ["1","2"])
+
+ # Rollback t4
+ t4.rollback()
+ t4.verify(sessions, [])
+
+ # Commit t5
+ t5.send(["3","4"])
+ t5.verify(sessions, [])
+ t5.end()
+ t5.commit(one_phase=True)
+ t5.verify(sessions, ["1","2","3","4"])
+
+ # Commit t6
+ self.assertEqual(t6.accept().body, "b")
+ t6.verify(sessions, ["c"])
+ t6.end()
+ t6.commit(one_phase=True)
+ t6.session.close() # Make sure they're not requeued by the session.
+ t6.verify(sessions, ["c"])
+
+ # Rollback t7
+ self.assertEqual(t7.accept().body, "b")
+ t7.end()
+ t7.rollback()
+ t7.verify(sessions, ["a", "b", "c"])
+
+ # Resume t8
+ t8.end()
+ t8.commit(one_phase=True)
+ t8.start("1", resume=True)
+ t8.end("1")
+ t8.commit("1", one_phase=True)
+ t8.commit("2", one_phase=True)
+ t8.verify(sessions, ["z", "x","y"])
+
+
+ def test_dtx_failover_rollback(self):
+ """Kill a broker during a transaction, verify we roll back correctly"""
+ cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ cluster.start(expect=EXPECT_RUNNING)
+
+ # Test unprepared at crash
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.send(["a"]) # Not in transaction
+ t1.start()
+ t1.send(["b"]) # In transaction
+
+ # Test prepared at crash
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.send(["a"]) # Not in transaction
+ t2.start()
+ t2.send(["b"]) # In transaction
+ t2.end()
+ t2.prepare()
+
+ # Crash the broker
+ cluster[0].kill()
+
+ # Transactional changes should not appear
+ s = cluster[1].connect().session()
+ self.assert_browse(s, "t1", ["a"])
+ self.assert_browse(s, "t2", ["a"])
+
+ def test_dtx_timeout(self):
+ """Verify that dtx timeout works"""
+ cluster = self.cluster(1)
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.start()
+ t1.set_timeout(1)
+ time.sleep(1.1)
+ try:
+ t1.end()
+ self.fail("Expected rollback timeout.")
+ except DtxStatusException, e:
+ self.assertEqual(e.actual, XA_RBTIMEOUT)
+
+class TxTests(BrokerTest):
+
+ def test_tx_update(self):
+ """Verify that transaction state is updated to a new broker"""
+
+ def make_message(session, body=None, key=None, id=None):
+ dp=session.delivery_properties(routing_key=key)
+ mp=session.message_properties(correlation_id=id)
+ return qpid.datatypes.Message(dp, mp, body)
+
+ cluster=self.cluster(1)
+ # Use old API. TX is not supported in messaging API.
+ c = cluster[0].connect_old()
+ s = c.session("tx-session", 1)
+ s.queue_declare(queue="q")
+ # Start transaction
+ s.tx_select()
+ s.message_transfer(message=make_message(s, "1", "q"))
+ # Start new member mid-transaction
+ cluster.start()
+ # Do more work
+ s.message_transfer(message=make_message(s, "2", "q"))
+ # Commit the transaction and verify the results.
+ s.tx_commit()
+ for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"])
+
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -316,22 +1023,28 @@ class LongTests(BrokerTest):
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+ for b in cluster: b.ready() # Wait for brokers to be ready
for b in cluster: ErrorGenerator(b)
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[1], 1000) # Max queue depth
- receiver = NumberedReceiver(cluster[2], sender)
+ sender = NumberedSender(cluster[0], 1000) # Max queue depth
+ receiver = NumberedReceiver(cluster[0], sender)
receiver.start()
sender.start()
+ # Wait for sender & receiver to get up and running
+ retry(lambda: receiver.received > 0)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration()
i = 0
while time.time() < endtime:
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ for b in cluster[i:]: b.ready()
ErrorGenerator(b)
time.sleep(5)
sender.stop()
@@ -362,24 +1075,24 @@ class LongTests(BrokerTest):
if self.stopped: break
self.process = self.broker.test.popen(
self.cmd, expect=EXPECT_UNKNOWN)
- finally: self.lock.release()
- try: exit = self.process.wait()
+ finally:
+ self.lock.release()
+ try:
+ exit = self.process.wait()
except OSError, e:
- # Seems to be a race in wait(), it throws
- # "no such process" during test shutdown.
- # Doesn't indicate a test error, ignore.
- return
+ # Process may already have been killed by self.stop()
+ break
except Exception, e:
self.process.unexpected(
"client of %s: %s"%(self.broker.name, e))
self.lock.acquire()
try:
- # Quit and ignore errors if stopped or expecting failure.
if self.stopped: break
if exit != 0:
self.process.unexpected(
"client of %s exit code %s"%(self.broker.name, exit))
- finally: self.lock.release()
+ finally:
+ self.lock.release()
except Exception, e:
self.error = RethrownException("Error in ClientLoop.run")
@@ -401,7 +1114,7 @@ class LongTests(BrokerTest):
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
- cluster = self.cluster(3, args)
+ cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
clients = [] # Per-broker list of clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
@@ -410,10 +1123,12 @@ class LongTests(BrokerTest):
"""Start ordinary clients for a broker."""
cmds=[
["qpid-tool", "localhost:%s"%(broker.port())],
- ["qpid-perftest", "--count", 50000,
+ ["qpid-perftest", "--count=5000", "--durable=yes",
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
- ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
- ["testagent", "localhost", str(broker.port())] ]
+ ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()),
+ "--port", broker.port()],
+ ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())]
+ ]
clients.append([ClientLoop(broker, cmd) for cmd in cmds])
def start_mclients(broker):
@@ -422,7 +1137,8 @@ class LongTests(BrokerTest):
mclients.append(ClientLoop(broker, cmd))
endtime = time.time() + self.duration()
- runtime = self.duration() / 4 # First run is longer, use quarter of duration.
+ # First run is a third of the duration, capped at 5 seconds.
+ runtime = min(5.0, self.duration() / 3.0)
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
@@ -433,7 +1149,7 @@ class LongTests(BrokerTest):
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
b = cluster[alive]
- b.expect = EXPECT_EXIT_FAIL
+ b.ready()
b.kill()
# Stop the brokers clients and all the mclients.
for c in clients[alive] + mclients:
@@ -443,26 +1159,251 @@ class LongTests(BrokerTest):
mclients = []
# Start another broker and clients
alive += 1
- cluster.start()
+ cluster.start(expect=EXPECT_EXIT_FAIL)
+ cluster[-1].ready() # Wait till its ready
start_clients(cluster[-1])
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
-
+ for b in cluster[alive:]:
+ b.ready() # Verify still alive
+ b.kill()
# Verify that logs are consistent
cluster_test_logs.verify_logs()
def test_management_qmf2(self):
self.test_management(args=["--mgmt-qmf2=yes"])
- def test_connect_consistent(self): # FIXME aconway 2011-01-18:
+ def test_connect_consistent(self):
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
cluster = self.cluster(2, args=args)
end = time.time() + self.duration()
while (time.time() < end): # Get a management interval
for i in xrange(1000): cluster[0].connect().close()
+ cluster_test_logs.verify_logs()
+
+ def test_flowlimit_failover(self):
+ """Test fail-over during continuous send-receive with flow control
+ active.
+ """
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
+
+ receiver = NumberedReceiver(cluster[0])
+ receiver.start()
+ senders = [NumberedSender(cluster[0]) for i in range(1,3)]
+ for s in senders:
+ s.start()
+ # Wait for senders & receiver to get up and running
+ retry(lambda: receiver.received > 2*len(senders))
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime:
+ for s in senders: s.sender.assert_running()
+ receiver.receiver.assert_running()
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ time.sleep(5)
+ for s in senders:
+ s.stop()
+ receiver.stop()
+ for i in range(i, len(cluster)): cluster[i].kill()
+
+ def test_ttl_failover(self):
+ """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+ class Client(StoppableThread):
+
+ def __init__(self, broker):
+ StoppableThread.__init__(self)
+ self.connection = broker.connect(reconnect=True)
+ self.auto_fetch_reconnect_urls(self.connection)
+ self.session = self.connection.session()
+
+ def auto_fetch_reconnect_urls(self, conn):
+ """Replacment for qpid.messaging.util version which is noisy"""
+ ssn = conn.session("auto-fetch-reconnect-urls")
+ rcv = ssn.receiver("amq.failover")
+ rcv.capacity = 10
+
+ def main():
+ while True:
+ try:
+ msg = rcv.fetch()
+ qpid.messaging.util.set_reconnect_urls(conn, msg)
+ ssn.acknowledge(msg, sync=False)
+ except messaging.exceptions.LinkClosed: return
+ except messaging.exceptions.ConnectionError: return
+
+ thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+ thread.setDaemon(True)
+ thread.start()
+
+ def stop(self):
+ StoppableThread.stop(self)
+ self.connection.detach()
+
+ class Sender(Client):
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.sent = 0 # Number of messages _reliably_ sent.
+ self.sender = self.session.sender(address, capacity=1000)
+
+ def send_counted(self, ttl):
+ self.sender.send(Message(str(self.sent), ttl=ttl))
+ self.sent += 1
+
+ def run(self):
+ while not self.stopped:
+ choice = random.randint(0,4)
+ if choice == 0: self.send_counted(None) # No ttl
+ elif choice == 1: self.send_counted(100000) # Large ttl
+ else: # Small ttl, might expire
+ self.sender.send(Message("", ttl=random.random()/10))
+ self.sender.send(Message("z"), sync=True) # Chaser.
+
+ class Receiver(Client):
+
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.received = 0 # Number of non-empty (reliable) messages received.
+ self.receiver = self.session.receiver(address, capacity=1000)
+ def run(self):
+ try:
+ while True:
+ m = self.receiver.fetch(1)
+ if m.content == "z": break
+ if m.content: # Ignore unreliable messages
+ # Ignore duplicates
+ if int(m.content) == self.received: self.received += 1
+ except Exception,e: self.error = e
+
+ # def test_ttl_failover
+
+ # Original cluster will all be killed so expect exit with failure
+ # Set small purge interval.
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # Python client failover produces noisy WARN logs, disable temporarily
+ logger = logging.getLogger()
+ log_level = logger.getEffectiveLevel()
+ logger.setLevel(logging.ERROR)
+ sender = None
+ receiver = None
+ try:
+ # Start sender and receiver threads
+ receiver = Receiver(cluster[0], "q;{create:always}")
+ receiver.start()
+ sender = Sender(cluster[0], "q;{create:always}")
+ sender.start()
+ # Wait for sender & receiver to get up and running
+ retry(lambda: receiver.received > 0)
+
+ # Kill brokers in a cycle.
+ endtime = time.time() + self.duration()
+ runtime = min(5.0, self.duration() / 4.0)
+ i = 0
+ while time.time() < endtime:
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ b.ready()
+ time.sleep(runtime)
+ sender.stop()
+ receiver.stop()
+ for b in cluster[i:]:
+ b.ready() # Check it didn't crash
+ b.kill()
+ self.assertEqual(sender.sent, receiver.received)
cluster_test_logs.verify_logs()
+ finally:
+ # Detach to avoid slow reconnect attempts during shut-down if test fails.
+ if sender: sender.connection.detach()
+ if receiver: receiver.connection.detach()
+ logger.setLevel(log_level)
+
+ def test_msg_group_failover(self):
+ """Test fail-over during continuous send-receive of grouped messages.
+ """
+
+ class GroupedTrafficGenerator(Thread):
+ def __init__(self, url, queue, group_key):
+ Thread.__init__(self)
+ self.url = url
+ self.queue = queue
+ self.group_key = group_key
+ self.status = -1
+
+ def run(self):
+ # generate traffic for approx 10 seconds (2011 msgs / 200 per sec)
+ cmd = ["msg_group_test",
+ "--broker=%s" % self.url,
+ "--address=%s" % self.queue,
+ "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
+ "--group-key=%s" % self.group_key,
+ "--receivers=2",
+ "--senders=3",
+ "--messages=2011",
+ "--send-rate=200",
+ "--capacity=11",
+ "--ack-frequency=23",
+ "--allow-duplicates",
+ "--group-size=37",
+ "--randomize-group-size",
+ "--interleave=13"]
+ # "--trace"]
+ self.generator = Popen(cmd)
+ self.status = self.generator.wait()
+ return self.status
+
+ def results(self):
+ self.join(timeout=30) # 3x assumed duration
+ if self.isAlive(): return -1
+ return self.status
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
+ s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime:
+ traffic = GroupedTrafficGenerator( cluster[i].host_port(),
+ "test-group-q", "group-id" )
+ traffic.start()
+ time.sleep(1)
+
+ for x in range(2):
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ time.sleep(1)
+
+ # wait for traffic to finish, verify success
+ self.assertEqual(0, traffic.results())
+
+ for i in range(i, len(cluster)): cluster[i].kill()
+
class StoreTests(BrokerTest):
"""