summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-06-22 13:29:52 +0000
committerAlan Conway <aconway@apache.org>2010-06-22 13:29:52 +0000
commita49decc7d56bdb704a5d1580058c0da57e9a9353 (patch)
treeaf0acf1f9e7e5f48336407ae438e11528db75b38 /cpp/src/tests
parent265841a55cca55a7d3f8eea1d9e9c24a5fc2e350 (diff)
downloadqpid-python-a49decc7d56bdb704a5d1580058c0da57e9a9353.tar.gz
Fix cluster broker crashes when management is active.
Cluser brokers were exiting with errors "modified cluster state outside cluster context" and "confirmed < (50+0) but only sent < (49+0)" Fix was to: - delay completion of incoming update till update connection closes. - delay addding new connections to managment until connection is announced. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@956882 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rwxr-xr-xcpp/src/tests/cluster_tests.py2
-rwxr-xr-xcpp/src/tests/run_long_cluster_tests2
-rwxr-xr-xcpp/src/tests/verify_cluster_objects456
3 files changed, 76 insertions, 384 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 974c00b4dc..944de96fb5 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -255,7 +255,7 @@ class LongTests(BrokerTest):
StoppableThread.stop(self)
# def test_management
- args=["--mgmt-pub-interval", 1] # Publish management information every second.
+ args = ["--mgmt-pub-interval", 1] # Publish management information every second.
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
cluster = self.cluster(3, args)
diff --git a/cpp/src/tests/run_long_cluster_tests b/cpp/src/tests/run_long_cluster_tests
index 05c7867e2e..5dce0be585 100755
--- a/cpp/src/tests/run_long_cluster_tests
+++ b/cpp/src/tests/run_long_cluster_tests
@@ -20,5 +20,5 @@
#
srcdir=`dirname $0`
-$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=2
+$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=4
diff --git a/cpp/src/tests/verify_cluster_objects b/cpp/src/tests/verify_cluster_objects
index be6d67d81a..a96c636875 100755
--- a/cpp/src/tests/verify_cluster_objects
+++ b/cpp/src/tests/verify_cluster_objects
@@ -1,6 +1,5 @@
#!/usr/bin/env python
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -19,390 +18,83 @@
# under the License.
#
-import os
-import getopt
-import sys
-import locale
-import socket
-import re
-from qmf.console import Session, SchemaClass
-
-_host = "localhost"
-_connTimeout = 10
-_verbose = 0
-_del_test = False;
-pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
-_debug_recursion = 0
-
-def Usage ():
- print "Usage: verify_cluster_objects [OPTIONS] [broker-addr]"
- print
- print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
- print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
- print
- print " This program contacts every node of a cluster, loads all manageable objects from"
- print " those nodes and verifies that the management data is identical across the clusters."
- print
- print "Options:"
- print " --timeout seconds (10) Maximum time to wait for broker connection"
- print " --verbose level (0) Show details of objects and their IDs"
- print " --delete Delete some objects after creation, to test synchup"
- print
- sys.exit (1)
-
-class IpAddr:
- def __init__(self, text):
- if text.find("@") != -1:
- tokens = text.split("@")
- text = tokens[1]
- if text.find(":") != -1:
- tokens = text.split(":")
- text = tokens[0]
- self.port = int(tokens[1])
- else:
- self.port = 5672
- self.dottedQuad = socket.gethostbyname(text)
- nums = self.dottedQuad.split(".")
- self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
-
- def bestAddr(self, addrPortList):
- bestDiff = 0xFFFFFFFFL
- bestAddr = None
- for addrPort in addrPortList:
- diff = IpAddr(addrPort[0]).addr ^ self.addr
- if diff < bestDiff:
- bestDiff = diff
- bestAddr = addrPort
- return bestAddr
-
-class ObjectId:
- """Object identity, use for dictionaries by object id"""
- def __init__(self, object): self.object = object
- def __eq__(self, other): return self.object is other.object
- def __hash__(self): return hash(id(self.object))
-
-class Broker(object):
- def __init__(self, qmf, broker):
- self.broker = broker
- self.qmf = qmf
-
- agents = qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == '0':
- self.brokerAgent = a
-
- bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker",
- _agent=self.brokerAgent)[0]
- self.currentTime = bobj.getTimestamps()[0]
- try:
- self.uptime = bobj.uptime
- except:
- self.uptime = 0
- self.tablesByName = {}
- self.package = "org.apache.qpid.broker"
- self.id_cache = {} # Cache for getAbstractId
-
- def getUrl(self):
- return self.broker.getUrl()
-
- def getData(self):
- if _verbose > 1:
- print "Broker:", self.broker
-
- classList = self.qmf.getClasses(self.package)
- for cls in classList:
- if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
- self.loadTable(cls)
+# Verify managment objects are consistent in a cluster.
+# Arguments: url of one broker in the cluster.
+import qmf.console, sys, re
- #
- # this should be a method on an object, but is kept here for now, until
- # we finish sorting out the treatment of names in qmfv2
- #
- def getAbstractId(self, object):
- """ return a string the of the hierarchical name """
- if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)]
- global _debug_recursion
- result = u""
- valstr = u""
- _debug_recursion += 1
- debug_prefix = _debug_recursion
- if (_verbose > 9):
- print debug_prefix, " enter gai: props ", object._properties
- for property, value in object._properties:
+class Session(qmf.console.Session):
+ """A qmf.console.Session that caches useful values"""
- # we want to recurse on things which are refs. we tell by
- # asking each property if it's an index. I think...
- if (_verbose > 9):
- print debug_prefix, " prop ", property, " val " , value, " idx ",
- property.index, " type ", property.type
-
- # property is an instance, you can ask its type, name, etc.
-
- # special case system refs, as they will never be the same on
- # distinct cluster nodes. later we probably want a different
- # way of representing these objects, like for instance don't
- # include the system ref in the hierarchy.
-
- if property.name == "systemRef":
- _debug_recursion -= 1
- self.id_cache[ObjectId(object)] = ""
- return ""
-
- if property.index:
- if result != u"":
- result += u":"
- if property.type == 10:
- try:
- recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker)
- if (_verbose > 9):
- print debug_prefix, " r ", recursive_objects[0]
- for rp, rv in recursive_objects[0]._properties:
- print debug_prefix, " rrr ", rp, " idx-p ", rp.index, " v ", rv
- print debug_prefix, " recursing on ", recursive_objects[0]
- valstr = self.getAbstractId(recursive_objects[0])
- if (_verbose > 9):
- print debug_prefix, " recursing on ", recursive_objects[0],
- " -> ", valstr
- except Exception, e:
- if (_verbose > 9):
- print debug_prefix, " except ", e
- valstr = u"<undecodable>"
- else:
- # this yields UUID-blah. not good. try something else
- # valstr = value.__repr__()
- # print debug_prefix, " val ", value
-
- # yetch. this needs to be abstracted someplace? I don't
- # think we have the infrastructure we need to make these id
- # strings be sensible in the general case
- if property.name == "systemId":
- # special case. try to do something sensible about systemref objects
- valstr = object.nodeName
- else:
- valstr = value.__repr__() # I think...
- result += valstr
- if (_verbose > 9):
- print debug_prefix, " id ", self, " -> ", result
- _debug_recursion -= 1
- self.id_cache[ObjectId(object)] = result
- return result
-
- def loadTable(self, cls):
- if _verbose > 1:
- print " Class:", cls.getClassName()
- list = self.qmf.getObjects(_class=cls.getClassName(),
- _package=cls.getPackageName(),
- _agent=self.brokerAgent)
-
- # tables-by-name maps class name to a table by object-name of
- # objects. ie use the class name ("broker", "queue", etc) to
- # index tables-by-name, returning a second table, use the
- # object name to index that to get an object.
-
- self.tablesByName[cls.getClassName()] = {}
- for obj in list:
- # make sure we aren't colliding on name. it's an internal
- # error (ie, the name-generation code is busted) if we do
- key = self.getAbstractId(obj)
- if key in self.tablesByName[cls.getClassName()]:
- raise Exception("internal error: collision for %s on key %s\n"
- % (obj, key))
-
- self.tablesByName[cls.getClassName()][key] = obj
- if _verbose > 1:
- print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key
-
-
-class BrokerManager:
def __init__(self):
- self.brokerName = None
- self.qmf = None
- self.broker = None
- self.brokers = []
- self.cluster = None
-
- def SetBroker(self, brokerUrl):
- self.url = brokerUrl
- self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
- agents = self.qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == '0':
- self.brokerAgent = a
-
- def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
-
- def _getCluster(self):
- packages = self.qmf.getPackages()
- if "org.apache.qpid.cluster" not in packages:
- return None
+ qmf.console.Session.__init__(self)
+ self.classes = None
- clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
- if len(clusters) == 0:
- print "Clustering is installed but not enabled on the broker."
- return None
+ def all_classes(self):
+ if self.classes is None:
+ self.classes = [c for p in self.getPackages() for c in self.getClasses(p)]
+ return self.classes
- self.cluster = clusters[0]
-
- def _getHostList(self, urlList):
- hosts = []
- hostAddr = IpAddr(_host)
- for url in urlList:
- if url.find("amqp:") != 0:
- raise Exception("Invalid URL 1")
- url = url[5:]
- addrs = str(url).split(",")
- addrList = []
- for addr in addrs:
- tokens = addr.split(":")
- if len(tokens) != 3:
- raise Exception("Invalid URL 2")
- addrList.append((tokens[1], tokens[2]))
-
- # Find the address in the list that is most likely to be
- # in the same subnet as the address with which we made the
- # original QMF connection. This increases the probability
- # that we will be able to reach the cluster member.
-
- best = hostAddr.bestAddr(addrList)
- bestUrl = best[0] + ":" + best[1]
- hosts.append(bestUrl)
- return hosts
-
-
- # the main fun which tests for broker state "identity". now that
- # we're using qmf2 style object names across the board, that test
- # means that we are ensuring that for all objects of a given
- # class, an object of that class with the same object name exists
- # on the peer broker.
-
- def verify(self):
- if _verbose > 0:
- print "Connecting to the cluster..."
- self._getCluster()
- if self.cluster:
- memberList = self.cluster.members.split(";")
- hostList = self._getHostList(memberList)
- self.qmf.delBroker(self.broker)
- self.broker = None
- for host in hostList:
- b = self.qmf.addBroker(host, _connTimeout)
- self.brokers.append(Broker(self.qmf, b))
- if _verbose > 0:
- print " ", b
- else:
- raise Exception("Failed - Not a cluster")
-
- failures = []
-
- # Wait until connections to all nodes are established before
- # loading the management data. This will ensure that the
- # objects are all stable and the same.
- if _verbose > 0:
- print "Loading management data from nodes..."
- for broker in self.brokers:
- broker.getData()
-
- # If we're testing delete-some-objects functionality, create a
- # few widgets here and then delete them.
- if _del_test:
- if _verbose > 0:
- print "Running delete test"
- # just stick 'em in the first broker
- b = self.brokers[0]
- session = b.qmf.brokers[0].getAmqpSession()
- session.queue_declare(queue="foo", exclusive=True, auto_delete=True)
- session.exchange_bind(exchange="amq.direct",
- queue="foo", binding_key="foo")
- session.queue_declare(queue="bar", exclusive=True, auto_delete=True)
- session.exchange_bind(exchange="amq.direct",
- queue="bar", binding_key="bar")
- # now delete 'em
- session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo")
- session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar")
- session.queue_delete("bar")
- session.queue_delete("foo")
-
- # Verify that each node has the same set of objects (based on
- # object name).
- if _verbose > 0:
- print "Verifying objects based on object name..."
- base = self.brokers[0]
- for broker in self.brokers[1:]:
-
- # walk over the class names, for each class (with some
- # exceptions) walk over the objects of that class, making
- # sure they match between broker A and broker B
-
- for className in base.tablesByName:
- if className in ["broker", "system", "connection"]:
- continue
-
- tab1 = base.tablesByName[className]
- tab2 = broker.tablesByName[className]
-
- for key in tab1:
- if key not in tab2:
- failures.append("%s key %s not found on node %s" %
- (className, key, broker.getUrl()))
- for key in tab2:
- if key not in tab1:
- failures.append("%s key %s not found on node %s" %
- (className, key, base.getUrl()))
-
- if len(failures) > 0:
- print "Failures:"
- for failure in failures:
- print " %s" % failure
- raise Exception("Failures")
-
- if _verbose > 0:
- print "Success"
-
-##
-## Main Program
-##
-
-try:
- longOpts = ("verbose=", "timeout=", "delete")
- (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
-except:
- Usage()
-
-try:
- encoding = locale.getpreferredencoding()
- cargs = [a.decode(encoding) for a in encArgs]
-except:
- cargs = encArgs
-
-for opt in optlist:
- if opt[0] == "--timeout":
- _connTimeout = int(opt[1])
- if _connTimeout == 0:
- _connTimeout = None
- elif opt[0] == "--verbose":
- _verbose = int(opt[1])
- elif opt[0] == "--delete":
- _del_test = True;
- else:
- Usage()
-
-nargs = len(cargs)
-bm = BrokerManager()
-
-if nargs == 1:
- _host = cargs[0]
-
-try:
- bm.SetBroker(_host)
- bm.verify()
-except KeyboardInterrupt:
- print
-except Exception,e:
- print "Failed: %s - %s" % (e.__class__.__name__, e)
- sys.exit(1)
-
-bm.Disconnect()
+class Broker:
+ def __init__(self, url, qmf):
+ self.url = url
+ self.qmf = qmf
+ self.broker = self.qmf.addBroker(url)
+ self.broker._waitForStable()
+ self.objects = None
+ self.ignore_list = [ re.compile("org.apache.qpid.broker:system:") ]
+
+ def get_objects(self):
+ def ignore(name):
+ for m in (m for m in self.ignore_list if m.match(name)):
+ return True
+ if self.objects is None:
+ obj_list = []
+ for c in self.qmf.all_classes():
+ for o in self.qmf.getObjects(_key=c, _broker=self.broker):
+ name=o.getObjectId().getObject()
+ if not ignore(name): obj_list.append(name)
+ self.objects = set(obj_list)
+ if (len(obj_list) != len(self.objects)):
+ raise Exception("Duplicates in object list for %s"%(self.url))
+ return self.objects
+
+ def compare(self,other):
+ def compare1(x,y):
+ diff = x.get_objects() - y.get_objects()
+ if diff:
+ print "ERROR: found on %s but not %s"%(x, y)
+ for o in diff: print " %s"%(o)
+ return False
+ return True
+
+ so = compare1(self, other)
+ os = compare1(other, self)
+ return so and os
+
+ def __str__(self): return self.url
+
+ def get_cluster(self):
+ """Given one Broker, return list of all brokers in its cluster"""
+ clusters = self.qmf.getObjects(_class="cluster")
+ if not clusters: raise ("%s is not a cluster member"%(self.url))
+ def first_address(url):
+ """Python doesn't understand the brokers URL syntax. Extract a simple addres"""
+ return re.compile("amqp:tcp:([^,]*)").match(url).group(1)
+ return [Broker(first_address(url), self.qmf) for url in clusters[0].members.split(";")]
+
+ def __del__(self): self.qmf.delBroker(self.broker)
+
+def main(argv=None):
+ if argv is None: argv = sys.argv
+ qmf = Session()
+ brokers = Broker(argv[1], qmf).get_cluster()
+ base = brokers.pop(0)
+ result = 0
+ for b in brokers:
+ if not base.compare(b): result = 1
+ del base
+ del brokers
+ return result
+
+if __name__ == "__main__": sys.exit(main())