diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/tests/testlib.py | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/testlib.py')
-rw-r--r-- | cpp/src/tests/testlib.py | 766 |
1 files changed, 0 insertions, 766 deletions
diff --git a/cpp/src/tests/testlib.py b/cpp/src/tests/testlib.py deleted file mode 100644 index 71ad59e5c1..0000000000 --- a/cpp/src/tests/testlib.py +++ /dev/null @@ -1,766 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Support library for qpid python tests. -# - -import os, re, signal, subprocess, time, unittest - -class TestBase(unittest.TestCase): - """ - Base class for qpid tests. Provides broker start/stop/kill methods - """ - - """ - The following environment vars control if and how the test is run, and determine where many of the helper - executables/libs are to be found. - """ - _storeLib = os.getenv("STORE_LIB") - _storeEnable = _storeLib != None # Must be True for durability to be enabled during the test - _qpiddExec = os.getenv("QPIDD_EXEC", "/usr/sbin/qpidd") - _tempStoreDir = os.path.abspath(os.getenv("TMP_DATA_DIR", "/tmp/qpid")) - - """Global message counter ensures unique messages""" - _msgCnt = 0 - - # --- Helper functions for parameter handling --- - - def _paramBool(self, key, val, keyOnly = False): - if val == None: - return "" - if keyOnly: - if val: - return " --%s" % key - else: - return "" - else: - if val: - return " --%s yes" % key - else: - return " --%s no" % key - - # --- Helper functions for message creation --- - - def _makeMessage(self, msgSize): - msg = "Message-%04d" % self._msgCnt - self._msgCnt = self._msgCnt + 1 - msgLen = len(msg) - if msgSize > msgLen: - for i in range(msgLen, msgSize): - if i == msgLen: - msg += "-" - else: - msg += chr(ord('a') + (i % 26)) - return msg - - def _makeMessageList(self, numMsgs, msgSize): - if msgSize == None: - msgSize = 12 - msgs = "" - for m in range(0, numMsgs): - msgs += "%s\n" % self._makeMessage(msgSize) - return msgs - - # --- Starting and stopping a broker --- - - def startBroker(self, qpiddArgs, logFile = None): - """Start a single broker daemon, returns tuple (pid, port)""" - if self._qpiddExec == None: - raise Exception("Environment variable QPIDD is not set") - cmd = "%s --daemon --port=0 %s" % (self._qpiddExec, qpiddArgs) - portStr = os.popen(cmd).read() - if len(portStr) == 0: - err = "Broker daemon startup failed." - if logFile != None: - err += " See log file %s" % logFile - raise Exception(err) - port = int(portStr) - pidStr = os.popen("%s -p %d -c" % (self._qpiddExec, port)).read() - try: - pid = int(pidStr) - except: - raise Exception("Unable to get pid: \"%s -p %d -c\" returned %s" % (self._qpiddExec, port, pidStr)) - #print "started broker: pid=%d, port=%d args: %s" % (pid, port, qpiddArgs) - return (pid, port) - - def killBroker(self, nodeTuple, ignoreFailures = False): - """Kill a broker using kill -9""" - try: - os.kill(nodeTuple[self.PID], signal.SIGKILL) - try: - os.waitpid(nodeTuple[self.PID], 0) - except: - pass - #print "killed broker: port=%d pid=%d" % (nodeTuple[self.PORT], nodeTuple[self.PID]) - except: - if ignoreFailures: - print "WARNING: killBroker (port=%d pid=%d) failed - ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID]) - else: - raise - - def stopBroker(self, nodeTuple, ignoreFailures = False): - """Stop a broker using qpidd -q""" - try: - ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % nodeTuple[self.PORT], "--quit", "--no-module-dir") - if ret != 0: - raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (nodeTuple[self.PORT], ret)) - try: - os.waitpid(nodeTuple[self.PID], 0) - except: - pass - #print "stopped broker: port=%d pid=%d" % (nodeTuple[self.PORT], nodeTuple[self.PID]) - except: - if ignoreFailures: - print "WARNING: stopBroker (port=%d pid=%d) failed - ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID]) - else: - raise - - - -class TestBaseCluster(TestBase): - """ - Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes. - """ - - """ - The following environment vars control if and how the test is run, and determine where many of the helper - executables/libs are to be found. - """ - _clusterLib = os.getenv("CLUSTER_LIB") - _clusterTestEnable = _clusterLib != None # Must be True for these cluster tests to run - _xmlLib = os.getenv("XML_LIB") - _xmlEnable = _xmlLib != None - _qpidConfigExec = os.getenv("QPID_CONFIG_EXEC", "/usr/bin/qpid-config") - _qpidRouteExec = os.getenv("QPID_ROUTE_EXEC", "/usr/bin/qpid-route") - _receiverExec = os.getenv("RECEIVER_EXEC", "/usr/libexec/qpid/test/receiver") - _senderExec = os.getenv("SENDER_EXEC", "/usr/libexec/qpid/test/sender") - - - """ - _clusterDict is a dictionary of clusters: - key = cluster name (string) - val = dictionary of node numbers: - key = integer node number - val = tuple containing (pid, port) - For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows: - {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}} - where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively. - """ - _clusterDict = {} - - """Index for (pid, port) tuple""" - PID = 0 - PORT = 1 - - def run(self, res): - """ Skip cluster testing if env var RUN_CLUSTER_TESTS is not defined.""" - if not self._clusterTestEnable: - return - unittest.TestCase.run(self, res) - - # --- Private helper / convenience functions --- - - def _checkPids(self, clusterName = None): - for pid, port in self.getTupleList(): - try: - os.kill(pid, 0) - except: - raise Exception("_checkPids(): Broker with pid %d expected but does not exist! (crashed?)" % pid) - - - # --- Starting cluster node(s) --- - - def createClusterNode(self, nodeNumber, clusterName): - """Create a node and add it to the named cluster""" - if self._tempStoreDir == None: - raise Exception("Environment variable TMP_DATA_DIR is not set") - if self._clusterLib == None: - raise Exception("Environment variable LIBCLUSTER is not set") - name = "%s-%d" % (clusterName, nodeNumber) - dataDir = os.path.join(self._tempStoreDir, "cluster", name) - logFile = "%s.log" % dataDir - args = "--no-module-dir --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=notice+ --log-to-file=%s" % \ - (self._clusterLib, dataDir, clusterName, logFile) - if self._storeEnable: - if self._storeLib == None: - raise Exception("Environment variable LIBSTORE is not set") - args += " --load-module %s" % self._storeLib - self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile) - - def createCluster(self, clusterName, numberNodes = 0): - """Create a cluster containing an initial number of nodes""" - self._clusterDict[clusterName] = {} - for n in range(0, numberNodes): - self.createClusterNode(n, clusterName) - - def waitForNodes(self, clusterName): - """Wait for all nodes to become active (ie finish cluster sync)""" - # TODO - connect to each known node in cluster - # Until this is done, wait a bit (hack) - time.sleep(1) - - # --- Cluster and node status --- - - def getTupleList(self, clusterName = None): - """Get list of (pid, port) tuples of all known cluster brokers""" - tList = [] - for c, l in self._clusterDict.iteritems(): - if clusterName == None or c == clusterName: - for t in l.itervalues(): - tList.append(t) - return tList - - def getNumBrokers(self): - """Get total number of brokers in all known clusters""" - return len(self.getTupleList()) - - def checkNumBrokers(self, expected = None, checkPids = True): - """Check that the total number of brokers in all known clusters is the expected value""" - if expected != None and self.getNumBrokers() != expected: - raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers())) - if checkPids: - self._checkPids() - - def getClusterTupleList(self, clusterName): - """Get list of (pid, port) tuples of all nodes in named cluster""" - if clusterName in self._clusterDict: - return self._clusterDict[clusterName].values() - return [] - - def getNumClusterBrokers(self, clusterName): - """Get total number of brokers in named cluster""" - return len(self.getClusterTupleList(clusterName)) - - def getNodeTuple(self, nodeNumber, clusterName): - """Get the (pid, port) tuple for the given cluster node""" - return self._clusterDict[clusterName][nodeNumber] - - def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True, waitForNodes = True): - """Check that the total number of brokers in the named cluster is the expected value""" - if expected != None and self.getNumClusterBrokers(clusterName) != expected: - raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \ - (clusterName, expected, self.getNumClusterBrokers(clusterName))) - if checkPids: - self._checkPids(clusterName) - if waitForNodes: - self.waitForNodes(clusterName) - - def clusterExists(self, clusterName): - """ Return True if clusterName exists, False otherwise""" - return clusterName in self._clusterDict.keys() - - def clusterNodeExists(self, clusterName, nodeNumber): - """ Return True if nodeNumber in clusterName exists, False otherwise""" - if clusterName in self._clusterDict.keys(): - return nodeNumber in self._clusterDict[nodeName] - return False - - def createCheckCluster(self, clusterName, size): - """Create a cluster using the given name and size, then check the number of brokers""" - self.createCluster(clusterName, size) - self.checkNumClusterBrokers(clusterName, size) - - # --- Kill cluster nodes using signal 9 --- - - def killNode(self, nodeNumber, clusterName, updateDict = True, ignoreFailures = False): - """Kill the given node in the named cluster using kill -9""" - self.killBroker(self.getNodeTuple(nodeNumber, clusterName), ignoreFailures) - if updateDict: - del(self._clusterDict[clusterName][nodeNumber]) - - def killCluster(self, clusterName, updateDict = True, ignoreFailures = False): - """Kill all nodes in the named cluster""" - for n in self._clusterDict[clusterName].iterkeys(): - self.killNode(n, clusterName, False, ignoreFailures) - if updateDict: - del(self._clusterDict[clusterName]) - - def killClusterCheck(self, clusterName): - """Kill the named cluster and check that the name is removed from the cluster dictionary""" - self.killCluster(clusterName) - if self.clusterExists(clusterName): - raise Exception("Unable to kill cluster %s; %d nodes still exist" % \ - (clusterName, self.getNumClusterBrokers(clusterName))) - - def killAllClusters(self, ignoreFailures = False): - """Kill all known clusters""" - for n in self._clusterDict.iterkeys(): - self.killCluster(n, False, ignoreFailures) - self._clusterDict.clear() - - def killAllClustersCheck(self, ignoreFailures = False): - """Kill all known clusters and check that the cluster dictionary is empty""" - self.killAllClusters(ignoreFailures) - self.checkNumBrokers(0) - - # --- Stop cluster nodes using qpidd -q --- - - def stopNode(self, nodeNumber, clusterName, updateDict = True, ignoreFailures = False): - """Stop the given node in the named cluster using qpidd -q""" - self.stopBroker(self.getNodeTuple(nodeNumber, clusterName), ignoreFailures) - if updateDict: - del(self._clusterDict[clusterName][nodeNumber]) - - def stopAllClusters(self, ignoreFailures = False): - """Stop all known clusters""" - for n in self._clusterDict.iterkeys(): - self.stopCluster(n, False, ignoreFailures) - self._clusterDict.clear() - - - def stopCluster(self, clusterName, updateDict = True, ignoreFailures = False): - """Stop all nodes in the named cluster""" - for n in self._clusterDict[clusterName].iterkeys(): - self.stopNode(n, clusterName, False, ignoreFailures) - if updateDict: - del(self._clusterDict[clusterName]) - - def stopCheckCluster(self, clusterName, ignoreFailures = False): - """Stop the named cluster and check that the name is removed from the cluster dictionary""" - self.stopCluster(clusterName, True, ignoreFailures) - if self.clusterExists(clusterName): - raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName))) - - def stopAllCheck(self, ignoreFailures = False): - """Kill all known clusters and check that the cluster dictionary is empty""" - self.stopAllClusters() - self.checkNumBrokers(0) - - # --- qpid-config functions --- - - def _qpidConfig(self, nodeNumber, clusterName, action): - """Configure some aspect of a qpid broker using the qpid_config executable""" - port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] - #print "%s -b localhost:%d %s" % (self._qpidConfigExec, port, action) - ret = os.spawnl(os.P_WAIT, self._qpidConfigExec, self._qpidConfigExec, "-b", "localhost:%d" % port, *action.split()) - if ret != 0: - raise Exception("_qpidConfig(): cluster=\"%s\" nodeNumber=%d port=%d action=\"%s\" returned %d" % \ - (clusterName, nodeNumber, port, action, ret)) - - def addExchange(self, nodeNumber, clusterName, exchangeType, exchangeName, durable = False, sequence = False, \ - ive = False): - """Add a named exchange.""" - action = "add exchange %s %s" % (exchangeType, exchangeName) - action += self._paramBool("durable", durable, True) - action += self._paramBool("sequence", sequence, True) - action += self._paramBool("ive", ive, True) - self._qpidConfig(nodeNumber, clusterName, action) - - def deleteExchange(self, nodeNumber, clusterName, exchangeName): - """Delete a named exchange""" - self._qpidConfig(nodeNumber, clusterName, "del exchange %s" % exchangeName) - - def addQueue(self, nodeNumber, clusterName, queueName, configArgs = None): - """Add a queue using qpid-config.""" - action = "add queue %s" % queueName - if self._storeEnable: - action += " --durable" - if configArgs != None: - action += " %s" % configArgs - self._qpidConfig(nodeNumber, clusterName, action) - - def delQueue(self, nodeNumber, clusterName, queueName): - """Delete a named queue using qpid-config.""" - self._qpidConfig(nodeNumber, clusterName, "del queue %s" % queueName) - - def bind(self, nodeNumber, clusterName, exchangeName, queueName, key): - """Create an exchange-queue binding using qpid-config.""" - self._qpidConfig(nodeNumber, clusterName, "bind %s %s %s" % (exchangeName, queueName, key)) - - def unbind(self, nodeNumber, clusterName, exchangeName, queueName, key): - """Remove an exchange-queue binding using qpid-config.""" - self._qpidConfig(nodeNumber, clusterName, "unbind %s %s %s" % (exchangeName, queueName, key)) - - # --- qpid-route functions (federation) --- - - def brokerDict(self, nodeNumber, clusterName, host = "localhost", user = None, password = None): - """Returns a dictionary containing the broker info to be passed to route functions""" - port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] - return {"cluster": clusterName, "node":nodeNumber, "port":port, "host":host, "user":user, "password":password} - - def _brokerStr(self, brokerDict): - """Set up a broker string in the format [user/password@]host:port""" - str = "" - if brokerDict["user"] !=None and brokerDict["password"] != None: - str = "%s@%s" % (brokerDict["user"], brokerDict["password"]) - str += "%s:%d" % (brokerDict["host"], brokerDict["port"]) - return str - - def _qpidRoute(self, action): - """Set up a route using qpid-route""" - #print "%s %s" % (self._qpidRouteExec, action) - ret = os.spawnl(os.P_WAIT, self._qpidRouteExec, self._qpidRouteExec, *action.split()) - if ret != 0: - raise Exception("_qpidRoute(): action=\"%s\" returned %d" % (action, ret)) - - def routeDynamicAdd(self, destBrokerDict, srcBrokerDict, exchangeName): - self._qpidRoute("dynamic add %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName)) - - def routeDynamicDelete(self, destBrokerDict, srcBrokerDict, exchangeName): - self._qpidRoute("dynamic del %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName)) - - def routeAdd(self, destBrokerDict, srcBrokerDict, exchangeName, routingKey): - self._qpidRoute("route add %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, routingKey)) - - def routeDelete(self, destBrokerDict, srcBrokerDict, exchangeName, routingKey): - self._qpidRoute("route del %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, routingKey)) - - def routeQueueAdd(self, destBrokerDict, srcBrokerDict, exchangeName, queueName): - self._qpidRoute("queue add %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, queueName)) - - def routeQueueDelete(self, destBrokerDict, srcBrokerDict, exchangeName, queueName): - self._qpidRoute("queue del %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, queueName)) - - def routeLinkAdd(self, destBrokerDict, srcBrokerDict): - self._qpidRoute("link add %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict))) - - def routeLinkDelete(self, destBrokerDict, srcBrokerDict): - self._qpidRoute("link del %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict))) - - # --- Message send and receive functions --- - - def _receiver(self, action): - if self._receiverExec == None: - raise Exception("Environment variable RECEIVER is not set") - cmd = "%s %s" % (self._receiverExec, action) - #print cmd - return subprocess.Popen(cmd.split(), stdout = subprocess.PIPE) - - def _sender(self, action): - if self._senderExec == None: - raise Exception("Environment variable SENDER is not set") - cmd = "%s %s" % (self._senderExec, action) - #print cmd - return subprocess.Popen(cmd.split(), stdin = subprocess.PIPE) - - def createReciever(self, nodeNumber, clusterName, queueName, numMsgs = None, receiverArgs = None): - port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] - action = "--port %d --queue %s" % (port, queueName) - if numMsgs != None: - action += " --messages %d" % numMsgs - if receiverArgs != None: - action += " %s" % receiverArgs - return self._receiver(action) - - def createSender(self, nodeNumber, clusterName, exchangeName, routingKey, senderArgs = None): - port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] - action = "--port %d --exchange %s" % (port, exchangeName) - if routingKey != None and len(routingKey) > 0: - action += " --routing-key %s" % routingKey - if self._storeEnable: - action += " --durable yes" - if senderArgs != None: - action += " %s" % senderArgs - return self._sender(action) - - def createBindDirectExchangeQueue(self, nodeNumber, clusterName, exchangeName, queueName): - self.addExchange(nodeNumber, clusterName, "direct", exchangeName) - self.addQueue(nodeNumber, clusterName, queueName) - self.bind(nodeNumber, clusterName, exchangeName, queueName, queueName) - - def createBindTopicExchangeQueues(self, nodeNumber, clusterName, exchangeName, queueNameKeyList): - self.addExchange(nodeNumber, clusterName, "topic", exchangeName) - for queueName, key in queueNameKeyList.iteritems(): - self.addQueue(nodeNumber, clusterName, queueName) - self.bind(nodeNumber, clusterName, exchangeName, queueName, key) - - def createBindFanoutExchangeQueues(self, nodeNumber, clusterName, exchangeName, queueNameList): - self.addExchange(nodeNumber, clusterName, "fanout", exchangeName) - for queueName in queueNameList: - self.addQueue(nodeNumber, clusterName, queueName) - self.bind(nodeNumber, clusterName, exchangeName, queueName, "") - - def sendMsgs(self, nodeNumber, clusterName, exchangeName, routingKey, numMsgs, msgSize = None, wait = True): - msgs = self._makeMessageList(numMsgs, msgSize) - sender = self.createSender(nodeNumber, clusterName, exchangeName, routingKey) - sender.stdin.write(msgs) - sender.stdin.close() - if wait: - sender.wait() - return msgs - - def receiveMsgs(self, nodeNumber, clusterName, queueName, numMsgs, wait = True): - receiver = self.createReciever(nodeNumber, clusterName, queueName, numMsgs) - cnt = 0 - msgs = "" - while cnt < numMsgs: - rx = receiver.stdout.readline() - if rx == "" and receiver.poll() != None: break - msgs += rx - cnt = cnt + 1 - if wait: - receiver.wait() - return msgs - - - # --- Exchange-specific helper inner classes --- - - class TestHelper: - """ - This is a "virtual" superclass for test helpers, and is not useful on its own, but the - per-exchange subclasses are designed to keep track of the messages sent to and received - from queues which have bindings to that exchange type. - """ - - def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList): - - """Dictionary of queues and lists of messages sent to them.""" - self._txMsgs = {} - """Dictionary of queues and lists of messages received from them.""" - self._rxMsgs = {} - """List of node numbers currently in the cluster""" - self._nodes = [] - """List of node numbers which have been killed and can therefore be recovered""" - self._deadNodes = [] - """Last node to be used""" - self._lastNode = None - - self._testBaseCluster = testBaseCluster - self._clusterName = clusterName - self._exchangeName = exchangeName - self._queueNameList = queueNameList - self._addQueues(queueNameList) - self._testBaseCluster.createCheckCluster(clusterName, numNodes) - self._nodes.extend(range(0, numNodes)) - - def _addQueues(self, queueNameList): - for qn in queueNameList: - if not qn in self._txMsgs: - self._txMsgs[qn] = [] - if not qn in self._rxMsgs: - self._rxMsgs[qn] = [] - - def _bindQueue(self, queueName, bindingKey, nodeNumber = None): - """Bind a queue to an exchange using a binding key.""" - if nodeNumber == None: - nodeNumber = self._nodes[0] # first available node - self._testBaseCluster.addQueue(nodeNumber, self._clusterName, queueName) - self._testBaseCluster.bind(nodeNumber, self._clusterName, self._exchangeName, queueName, bindingKey) - - def _highestNodeNumber(self): - """Find the highest node number used so far between the current nodes and those stopped/killed.""" - highestNode = self._nodes[-1] - if len(self._deadNodes) == 0: - return highestNode - highestDeadNode = self._deadNodes[-1] - if highestNode > highestDeadNode: - return highestNode - return highestDeadNode - - def killCluster(self): - """Kill all nodes in the cluster""" - self._testBaseCluster.killCluster(self._clusterName) - self._testBaseCluster.checkNumClusterBrokers(self._clusterName, 0) - self._deadNodes.extend(self._nodes) - self._deadNodes.sort() - del self._nodes[:] - - def restoreCluster(self, lastNode = None, restoreNodes = True): - """Restore a previously killed cluster""" - self._testBaseCluster.createCluster(self._clusterName) - if restoreNodes: - numNodes = len(self._deadNodes) - self.restoreNodes(lastNode) - self._testBaseCluster.checkNumClusterBrokers(self._clusterName, numNodes) - - def addNodes(self, numberOfNodes = 1): - """Add a fixed number of nodes to the cluster.""" - nodeStart = self._highestNodeNumber() + 1 - for i in range(0, numberOfNodes): - nodeNumber = nodeStart + i - self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName) - self._nodes.append(nodeNumber) - self._testBaseCluster.checkNumClusterBrokers(self._clusterName, len(self._nodes)) - self._testBaseCluster.waitForNodes(self._clusterName) - - def restoreNode(self, nodeNumber): - """Restore a cluster node that has been previously killed""" - if nodeNumber not in self._deadNodes: - raise Exception("restoreNode(): Node number %d not in dead node list %s" % (nodeNumber, self._deadNodes)) - self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName) - self._deadNodes.remove(nodeNumber) - self._nodes.append(nodeNumber) - self._nodes.sort() - - def restoreNodes(self, lastNode = None): - """Restore all known cluster nodes that have been previously killed starting with a known last-used node""" - if len(self._nodes) == 0: # restore last-used node first - if lastNode == None: - lastNode = self._lastNode - self.restoreNode(lastNode) - while len(self._deadNodes) > 0: - self.restoreNode(self._deadNodes[0]) - self._testBaseCluster.waitForNodes(self._clusterName) - - def killNode(self, nodeNumber): - """Kill a cluster node (if it is in the _nodes list).""" - if nodeNumber not in self._nodes: - raise Exception("killNode(): Node number %d not in node list %s" % (nodeNumber, self._nodes)) - self._testBaseCluster.killNode(nodeNumber, self._clusterName) - self._nodes.remove(nodeNumber) - self._deadNodes.append(nodeNumber) - self._deadNodes.sort() - - def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = None, wait = True): - """Send a fixed number of messages using the given routing key.""" - if nodeNumber == None: - nodeNumber = self._nodes[0] # Use first available node - msgs = self._testBaseCluster._makeMessageList(numMsgs, msgSize) - sender = self._testBaseCluster.createSender(nodeNumber, self._clusterName, self._exchangeName, routingKey) - sender.stdin.write(msgs) - sender.stdin.close() - if wait: - sender.wait() - self._lastNode = nodeNumber - return msgs.split() - - # TODO - this i/f is messy: one mumMsgs can be given, but a list of queues - # so assuming numMsgs for each queue - # A mechanism is needed to specify a different numMsgs per queue - def receiveMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, wait = True): - """Receive a fixed number of messages from a named queue. If numMsgs == None, get all remaining messages.""" - if nodeNumber == None: - nodeNumber = self._nodes[0] # Use first available node - if queueNameList == None: - queueNameList = self._txMsgs.iterkeys() - for qn in queueNameList: - nm = numMsgs - if nm == None: - nm = len(self._txMsgs[qn]) - len(self._rxMsgs[qn]) # get all remaining messages - if nm > 0: - while nm > 0: - receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm) - cnt = 0 - while cnt < nm: - rx = receiver.stdout.readline().strip() - if rx == "": - if receiver.poll() != None: break - elif rx not in self._rxMsgs[qn]: - self._rxMsgs[qn].append(rx) - cnt = cnt + 1 - nm = nm - cnt - if wait: - receiver.wait() - self._rxMsgs[qn].sort() - self._lastNode = nodeNumber - - def receiveRemainingMsgs(self, nodeNumber = None, queueNameList = None, wait = True): - """Receive all remaining messages on named queue.""" - self.receiveMsgs(None, nodeNumber, queueNameList, wait) - - def checkMsgs(self): - """Return True if all expected messages have been received (ie the transmit and receive list are identical).""" - txMsgTot = 0 - rxMsgTot = 0 - for qn, txMsgList in self._txMsgs.iteritems(): - rxMsgList = self._rxMsgs[qn] - txMsgTot = txMsgTot + len(txMsgList) - rxMsgTot = rxMsgTot + len(rxMsgList) - if len(txMsgList) != len(rxMsgList): - return False - for i, m in enumerate(txMsgList): - if m != rxMsgList[i]: - return False - if txMsgTot == 0 and rxMsgTot == 0: - print "WARNING: No messages were either sent or received" - return True - - def finalizeTest(self): - """Recover all the remaining messages on all queues, then check that all expected messages were received.""" - self.receiveRemainingMsgs() - self._testBaseCluster.stopAllCheck() - if not self.checkMsgs(): - self.printMsgs() - self._testBaseCluster.fail("Send - receive message mismatch") - - def printMsgs(self, txMsgs = True, rxMsgs = True): - """Print all messages transmitted and received.""" - for qn, txMsgList in self._txMsgs.iteritems(): - print "Queue: %s" % qn - if txMsgs: - print " txMsgList = %s" % txMsgList - if rxMsgs: - rxMsgList = self._rxMsgs[qn] - print " rxMsgList = %s" % rxMsgList - - - class DirectExchangeTestHelper(TestHelper): - - def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList): - TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList) - self._testBaseCluster.addExchange(0, clusterName, "direct", exchangeName) - for qn in queueNameList: - self._bindQueue(qn, qn) - - def addQueues(self, queueNameList): - self._addQueues(queueNameList) - for qn in queueNameList: - self._bindQueue(qn, qn) - - def sendMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, msgSize = None, wait = True): - if queueNameList == None: - queueNameList = self._txMsgs.iterkeys() - for qn in queueNameList: - self._txMsgs[qn].extend(TestBaseCluster.TestHelper.sendMsgs(self, qn, numMsgs, nodeNumber, msgSize, wait)) - - - class TopicExchangeTestHelper(TestHelper): - - def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameKeyList): - self._queueNameKeyList = queueNameKeyList - TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameKeyList.iterkeys()) - self._testBaseCluster.addExchange(0, clusterName, "topic", exchangeName) - for qn, bk in queueNameKeyList.iteritems(): - self._bindQueue(qn, bk) - - def addQueues(self, queueNameKeyList): - self._addQueues(queueNameKeyList.iterkeys()) - for qn, bk in queueNameKeyList.iteritems(): - self._bindQueue(qn, bk) - - def _prepareRegex(self, bk): - # This regex conversion is not very complete - there are other chars that should be escaped too - return "^%s$" % bk.replace(".", r"\.").replace("*", r"[^.]*").replace("#", ".*") - - def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = None, wait = True): - msgList = TestBaseCluster.TestHelper.sendMsgs(self, routingKey, numMsgs, nodeNumber, msgSize, wait) - for qn, bk in self._queueNameKeyList.iteritems(): - if re.match(self._prepareRegex(bk), routingKey): - self._txMsgs[qn].extend(msgList) - - - class FanoutExchangeTestHelper(TestHelper): - - def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList): - TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList) - self._testBaseCluster.addExchange(0, clusterName, "fanout", exchangeName) - for qn in queueNameList: - self._bindQueue(qn, "") - - def addQueues(self, queueNameList): - self._addQueues(queueNameList) - for qn in queueNameList: - self._bindQueue(qn, "") - - def sendMsgs(self, numMsgs, nodeNumber = None, msgSize = None, wait = True): - msgList = TestBaseCluster.TestHelper.sendMsgs(self, "", numMsgs, nodeNumber, msgSize, wait) - for ml in self._txMsgs.itervalues(): - ml.extend(msgList) - |