author    | Alan Conway <aconway@apache.org> | 2010-06-22 13:29:52 +0000
committer | Alan Conway <aconway@apache.org> | 2010-06-22 13:29:52 +0000
commit    | a49decc7d56bdb704a5d1580058c0da57e9a9353 (patch)
tree      | af0acf1f9e7e5f48336407ae438e11528db75b38 /cpp/src/tests
parent    | 265841a55cca55a7d3f8eea1d9e9c24a5fc2e350 (diff)
download  | qpid-python-a49decc7d56bdb704a5d1580058c0da57e9a9353.tar.gz
Fix cluster broker crashes when management is active.
Cluster brokers were exiting with errors "modified cluster state
outside cluster context" and "confirmed < (50+0) but only sent < (49+0)"
Fix was to:
- delay completion of the incoming update until the update connection closes.
- delay adding new connections to management until the connection is announced.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@956882 13f79535-47bb-0310-9956-ffa450edef68
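
The rewritten cpp/src/tests/verify_cluster_objects (diff below) reduces the consistency check to comparing the set of management object names seen on a base node against each other cluster node. A minimal sketch of that pairwise set comparison follows; the function and variable names here are illustrative only, not taken from the patch:

    # Sketch only: compare management object names from two cluster nodes.
    # base_names/other_names would come from QMF queries like the ones in
    # the rewritten script's Broker.get_objects().
    def diff_object_names(base_names, other_names, base_url, other_url):
        failures = []
        for name in base_names - other_names:
            failures.append("%s found on %s but not %s" % (name, base_url, other_url))
        for name in other_names - base_names:
            failures.append("%s found on %s but not %s" % (name, other_url, base_url))
        return failures

    # Example: two nodes that disagree about one queue object.
    print("\n".join(diff_object_names(
        set(["org.apache.qpid.broker:queue:q1"]), set(),
        "node1:5672", "node2:5672")))

An empty result means the two nodes expose identical management state; the script applies this check to every node against the first cluster member.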
Diffstat (limited to 'cpp/src/tests')
-rwxr-xr-x | cpp/src/tests/cluster_tests.py       |   2
-rwxr-xr-x | cpp/src/tests/run_long_cluster_tests |   2
-rwxr-xr-x | cpp/src/tests/verify_cluster_objects | 456
3 files changed, 76 insertions, 384 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 974c00b4dc..944de96fb5 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -255,7 +255,7 @@ class LongTests(BrokerTest):
             StoppableThread.stop(self)
     # def test_management
-        args=["--mgmt-pub-interval", 1] # Publish management information every second.
+        args = ["--mgmt-pub-interval", 1] # Publish management information every second.
         # Use store if present.
         if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
         cluster = self.cluster(3, args)
diff --git a/cpp/src/tests/run_long_cluster_tests b/cpp/src/tests/run_long_cluster_tests
index 05c7867e2e..5dce0be585 100755
--- a/cpp/src/tests/run_long_cluster_tests
+++ b/cpp/src/tests/run_long_cluster_tests
@@ -20,5 +20,5 @@
 #
 srcdir=`dirname $0`
-$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=2
+$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=4
diff --git a/cpp/src/tests/verify_cluster_objects b/cpp/src/tests/verify_cluster_objects
index be6d67d81a..a96c636875 100755
--- a/cpp/src/tests/verify_cluster_objects
+++ b/cpp/src/tests/verify_cluster_objects
@@ -1,6 +1,5 @@
 #!/usr/bin/env python
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
@@ -19,390 +18,83 @@
 # under the License.
 #
-import os
-import getopt
-import sys
-import locale
-import socket
-import re
-from qmf.console import Session, SchemaClass
-
-_host = "localhost"
-_connTimeout = 10
-_verbose = 0
-_del_test = False; pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
-_debug_recursion = 0
-
-def Usage ():
-    print "Usage: verify_cluster_objects [OPTIONS] [broker-addr]"
-    print
-    print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
-    print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
-    print
-    print " This program contacts every node of a cluster, loads all manageable objects from"
-    print " those nodes and verifies that the management data is identical across the clusters."
-    print
-    print "Options:"
-    print " --timeout seconds (10) Maximum time to wait for broker connection"
-    print " --verbose level (0) Show details of objects and their IDs"
-    print " --delete Delete some objects after creation, to test synchup"
-    print
-    sys.exit (1)
-
-class IpAddr:
-    def __init__(self, text):
-        if text.find("@") != -1:
-            tokens = text.split("@")
-            text = tokens[1]
-        if text.find(":") != -1:
-            tokens = text.split(":")
-            text = tokens[0]
-            self.port = int(tokens[1])
-        else:
-            self.port = 5672
-        self.dottedQuad = socket.gethostbyname(text)
-        nums = self.dottedQuad.split(".")
-        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
-
-    def bestAddr(self, addrPortList):
-        bestDiff = 0xFFFFFFFFL
-        bestAddr = None
-        for addrPort in addrPortList:
-            diff = IpAddr(addrPort[0]).addr ^ self.addr
-            if diff < bestDiff:
-                bestDiff = diff
-                bestAddr = addrPort
-        return bestAddr
-
-class ObjectId:
-    """Object identity, use for dictionaries by object id"""
-    def __init__(self, object): self.object = object
-    def __eq__(self, other): return self.object is other.object
-    def __hash__(self): return hash(id(self.object))
-
-class Broker(object):
-    def __init__(self, qmf, broker):
-        self.broker = broker
-        self.qmf = qmf
-
-        agents = qmf.getAgents()
-        for a in agents:
-            if a.getAgentBank() == '0':
-                self.brokerAgent = a
-
-        bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker",
-                              _agent=self.brokerAgent)[0]
-        self.currentTime = bobj.getTimestamps()[0]
-        try:
-            self.uptime = bobj.uptime
-        except:
-            self.uptime = 0
-        self.tablesByName = {}
-        self.package = "org.apache.qpid.broker"
-        self.id_cache = {} # Cache for getAbstractId
-
-    def getUrl(self):
-        return self.broker.getUrl()
-
-    def getData(self):
-        if _verbose > 1:
-            print "Broker:", self.broker
-
-        classList = self.qmf.getClasses(self.package)
-        for cls in classList:
-            if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
-                self.loadTable(cls)
+# Verify managment objects are consistent in a cluster.
+# Arguments: url of one broker in the cluster.
+import qmf.console, sys, re
-    #
-    # this should be a method on an object, but is kept here for now, until
-    # we finish sorting out the treatment of names in qmfv2
-    #
-    def getAbstractId(self, object):
-        """ return a string the of the hierarchical name """
-        if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)]
-        global _debug_recursion
-        result = u""
-        valstr = u""
-        _debug_recursion += 1
-        debug_prefix = _debug_recursion
-        if (_verbose > 9):
-            print debug_prefix, " enter gai: props ", object._properties
-        for property, value in object._properties:
+class Session(qmf.console.Session):
+    """A qmf.console.Session that caches useful values"""
-            # we want to recurse on things which are refs. we tell by
-            # asking each property if it's an index. I think...
-            if (_verbose > 9):
-                print debug_prefix, " prop ", property, " val " , value, " idx ",
-                    property.index, " type ", property.type
-
-            # property is an instance, you can ask its type, name, etc.
-
-            # special case system refs, as they will never be the same on
-            # distinct cluster nodes. later we probably want a different
-            # way of representing these objects, like for instance don't
-            # include the system ref in the hierarchy.
-
-            if property.name == "systemRef":
-                _debug_recursion -= 1
-                self.id_cache[ObjectId(object)] = ""
-                return ""
-
-            if property.index:
-                if result != u"":
-                    result += u":"
-                if property.type == 10:
-                    try:
-                        recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker)
-                        if (_verbose > 9):
-                            print debug_prefix, " r ", recursive_objects[0]
-                            for rp, rv in recursive_objects[0]._properties:
-                                print debug_prefix, " rrr ", rp, " idx-p ", rp.index, " v ", rv
-                            print debug_prefix, " recursing on ", recursive_objects[0]
-                        valstr = self.getAbstractId(recursive_objects[0])
-                        if (_verbose > 9):
-                            print debug_prefix, " recursing on ", recursive_objects[0],
-                                " -> ", valstr
-                    except Exception, e:
-                        if (_verbose > 9):
-                            print debug_prefix, " except ", e
-                        valstr = u"<undecodable>"
-                else:
-                    # this yields UUID-blah. not good. try something else
-                    # valstr = value.__repr__()
-                    # print debug_prefix, " val ", value
-
-                    # yetch. this needs to be abstracted someplace? I don't
-                    # think we have the infrastructure we need to make these id
-                    # strings be sensible in the general case
-                    if property.name == "systemId":
-                        # special case. try to do something sensible about systemref objects
-                        valstr = object.nodeName
-                    else:
-                        valstr = value.__repr__() # I think...
-                result += valstr
-        if (_verbose > 9):
-            print debug_prefix, " id ", self, " -> ", result
-        _debug_recursion -= 1
-        self.id_cache[ObjectId(object)] = result
-        return result
-
-    def loadTable(self, cls):
-        if _verbose > 1:
-            print " Class:", cls.getClassName()
-        list = self.qmf.getObjects(_class=cls.getClassName(),
-                                   _package=cls.getPackageName(),
-                                   _agent=self.brokerAgent)
-
-        # tables-by-name maps class name to a table by object-name of
-        # objects. ie use the class name ("broker", "queue", etc) to
-        # index tables-by-name, returning a second table, use the
-        # object name to index that to get an object.
-
-        self.tablesByName[cls.getClassName()] = {}
-        for obj in list:
-            # make sure we aren't colliding on name. it's an internal
-            # error (ie, the name-generation code is busted) if we do
-            key = self.getAbstractId(obj)
-            if key in self.tablesByName[cls.getClassName()]:
-                raise Exception("internal error: collision for %s on key %s\n"
-                                % (obj, key))
-
-            self.tablesByName[cls.getClassName()][key] = obj
-            if _verbose > 1:
-                print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key
-
-
-class BrokerManager:
     def __init__(self):
-        self.brokerName = None
-        self.qmf = None
-        self.broker = None
-        self.brokers = []
-        self.cluster = None
-
-    def SetBroker(self, brokerUrl):
-        self.url = brokerUrl
-        self.qmf = Session()
-        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
-        agents = self.qmf.getAgents()
-        for a in agents:
-            if a.getAgentBank() == '0':
-                self.brokerAgent = a
-
-    def Disconnect(self):
-        if self.broker:
-            self.qmf.delBroker(self.broker)
-
-    def _getCluster(self):
-        packages = self.qmf.getPackages()
-        if "org.apache.qpid.cluster" not in packages:
-            return None
+        qmf.console.Session.__init__(self)
+        self.classes = None
-        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
-        if len(clusters) == 0:
-            print "Clustering is installed but not enabled on the broker."
-            return None
+    def all_classes(self):
+        if self.classes is None:
+            self.classes = [c for p in self.getPackages() for c in self.getClasses(p)]
+        return self.classes
-        self.cluster = clusters[0]
-
-    def _getHostList(self, urlList):
-        hosts = []
-        hostAddr = IpAddr(_host)
-        for url in urlList:
-            if url.find("amqp:") != 0:
-                raise Exception("Invalid URL 1")
-            url = url[5:]
-            addrs = str(url).split(",")
-            addrList = []
-            for addr in addrs:
-                tokens = addr.split(":")
-                if len(tokens) != 3:
-                    raise Exception("Invalid URL 2")
-                addrList.append((tokens[1], tokens[2]))
-
-            # Find the address in the list that is most likely to be
-            # in the same subnet as the address with which we made the
-            # original QMF connection. This increases the probability
-            # that we will be able to reach the cluster member.
-
-            best = hostAddr.bestAddr(addrList)
-            bestUrl = best[0] + ":" + best[1]
-            hosts.append(bestUrl)
-        return hosts
-
-
-    # the main fun which tests for broker state "identity". now that
-    # we're using qmf2 style object names across the board, that test
-    # means that we are ensuring that for all objects of a given
-    # class, an object of that class with the same object name exists
-    # on the peer broker.
-
-    def verify(self):
-        if _verbose > 0:
-            print "Connecting to the cluster..."
-        self._getCluster()
-        if self.cluster:
-            memberList = self.cluster.members.split(";")
-            hostList = self._getHostList(memberList)
-            self.qmf.delBroker(self.broker)
-            self.broker = None
-            for host in hostList:
-                b = self.qmf.addBroker(host, _connTimeout)
-                self.brokers.append(Broker(self.qmf, b))
-                if _verbose > 0:
-                    print " ", b
-        else:
-            raise Exception("Failed - Not a cluster")
-
-        failures = []
-
-        # Wait until connections to all nodes are established before
-        # loading the management data. This will ensure that the
-        # objects are all stable and the same.
-        if _verbose > 0:
-            print "Loading management data from nodes..."
-        for broker in self.brokers:
-            broker.getData()
-
-        # If we're testing delete-some-objects functionality, create a
-        # few widgets here and then delete them.
-        if _del_test:
-            if _verbose > 0:
-                print "Running delete test"
-            # just stick 'em in the first broker
-            b = self.brokers[0]
-            session = b.qmf.brokers[0].getAmqpSession()
-            session.queue_declare(queue="foo", exclusive=True, auto_delete=True)
-            session.exchange_bind(exchange="amq.direct",
-                                  queue="foo", binding_key="foo")
-            session.queue_declare(queue="bar", exclusive=True, auto_delete=True)
-            session.exchange_bind(exchange="amq.direct",
-                                  queue="bar", binding_key="bar")
-            # now delete 'em
-            session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo")
-            session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar")
-            session.queue_delete("bar")
-            session.queue_delete("foo")
-
-        # Verify that each node has the same set of objects (based on
-        # object name).
-        if _verbose > 0:
-            print "Verifying objects based on object name..."
-        base = self.brokers[0]
-        for broker in self.brokers[1:]:
-
-            # walk over the class names, for each class (with some
-            # exceptions) walk over the objects of that class, making
-            # sure they match between broker A and broker B
-
-            for className in base.tablesByName:
-                if className in ["broker", "system", "connection"]:
-                    continue
-
-                tab1 = base.tablesByName[className]
-                tab2 = broker.tablesByName[className]
-
-                for key in tab1:
-                    if key not in tab2:
-                        failures.append("%s key %s not found on node %s" %
-                                        (className, key, broker.getUrl()))
-                for key in tab2:
-                    if key not in tab1:
-                        failures.append("%s key %s not found on node %s" %
-                                        (className, key, base.getUrl()))
-
-        if len(failures) > 0:
-            print "Failures:"
-            for failure in failures:
-                print " %s" % failure
-            raise Exception("Failures")
-
-        if _verbose > 0:
-            print "Success"
-
-##
-## Main Program
-##
-
-try:
-    longOpts = ("verbose=", "timeout=", "delete")
-    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
-except:
-    Usage()
-
-try:
-    encoding = locale.getpreferredencoding()
-    cargs = [a.decode(encoding) for a in encArgs]
-except:
-    cargs = encArgs
-
-for opt in optlist:
-    if opt[0] == "--timeout":
-        _connTimeout = int(opt[1])
-        if _connTimeout == 0:
-            _connTimeout = None
-    elif opt[0] == "--verbose":
-        _verbose = int(opt[1])
-    elif opt[0] == "--delete":
-        _del_test = True;
-    else:
-        Usage()
-
-nargs = len(cargs)
-bm = BrokerManager()
-
-if nargs == 1:
-    _host = cargs[0]
-
-try:
-    bm.SetBroker(_host)
-    bm.verify()
-except KeyboardInterrupt:
-    print
-except Exception,e:
-    print "Failed: %s - %s" % (e.__class__.__name__, e)
-    sys.exit(1)
-
-bm.Disconnect()
+class Broker:
+    def __init__(self, url, qmf):
+        self.url = url
+        self.qmf = qmf
+        self.broker = self.qmf.addBroker(url)
+        self.broker._waitForStable()
+        self.objects = None
+        self.ignore_list = [ re.compile("org.apache.qpid.broker:system:") ]
+
+    def get_objects(self):
+        def ignore(name):
+            for m in (m for m in self.ignore_list if m.match(name)):
+                return True
+        if self.objects is None:
+            obj_list = []
+            for c in self.qmf.all_classes():
+                for o in self.qmf.getObjects(_key=c, _broker=self.broker):
+                    name=o.getObjectId().getObject()
+                    if not ignore(name): obj_list.append(name)
+            self.objects = set(obj_list)
+            if (len(obj_list) != len(self.objects)):
+                raise Exception("Duplicates in object list for %s"%(self.url))
+        return self.objects
+
+    def compare(self,other):
+        def compare1(x,y):
+            diff = x.get_objects() - y.get_objects()
+            if diff:
+                print "ERROR: found on %s but not %s"%(x, y)
+                for o in diff: print " %s"%(o)
+                return False
+            return True
+
+        so = compare1(self, other)
+        os = compare1(other, self)
+        return so and os
+
+    def __str__(self): return self.url
+
+    def get_cluster(self):
+        """Given one Broker, return list of all brokers in its cluster"""
+        clusters = self.qmf.getObjects(_class="cluster")
+        if not clusters: raise ("%s is not a cluster member"%(self.url))
+        def first_address(url):
+            """Python doesn't understand the brokers URL syntax. Extract a simple addres"""
+            return re.compile("amqp:tcp:([^,]*)").match(url).group(1)
+        return [Broker(first_address(url), self.qmf) for url in clusters[0].members.split(";")]
+
+    def __del__(self): self.qmf.delBroker(self.broker)
+
+def main(argv=None):
+    if argv is None: argv = sys.argv
+    qmf = Session()
+    brokers = Broker(argv[1], qmf).get_cluster()
+    base = brokers.pop(0)
+    result = 0
+    for b in brokers:
+        if not base.compare(b): result = 1
+    del base
+    del brokers
+    return result
+
+if __name__ == "__main__": sys.exit(main())
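
For reference, a small standalone sketch of how the same qmf.console calls used by the new script can list the management object names on a single broker; the address localhost:5672 is an assumption for illustration:

    # Sketch only: dump management object names from one broker using the
    # qmf.console calls the rewritten script relies on.
    from qmf.console import Session

    session = Session()
    broker = session.addBroker("localhost:5672")  # assumed local cluster member
    broker._waitForStable()                       # wait for the agent to settle, as the script does
    names = []
    for pkg in session.getPackages():
        for cls in session.getClasses(pkg):
            for obj in session.getObjects(_key=cls, _broker=broker):
                names.append(obj.getObjectId().getObject())
    print("\n".join(sorted(names)))
    session.delBroker(broker)

Running this against each cluster member and diffing the output is essentially what verify_cluster_objects automates; a non-zero exit status from the script means at least one node disagreed with the base node.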