diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2009-05-06 16:57:38 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2009-05-06 16:57:38 +0000 |
| commit | a7304a1b83f20daa0987cf120ae4feda94de8152 (patch) | |
| tree | b8351fa7e44aad3ec83f039bb74fdcabbf7753a6 /python/qpid/testlib.py | |
| parent | 51aa1c108e8b9c3b72aa1e81bcd9b41b3910c4b4 (diff) | |
| download | qpid-python-a7304a1b83f20daa0987cf120ae4feda94de8152.tar.gz | |
Added the ability to start and stop a test broker from within the python test framework. Also added some cluster test functionality
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@772359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/testlib.py')
| -rw-r--r-- | python/qpid/testlib.py | 236 |
1 files changed, 234 insertions, 2 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 114a56de08..8f5facfec6 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -53,6 +53,7 @@ def default(value, default): class TestRunner: SPEC_FOLDER = "../specs" + qpidd = os.getenv("QPIDD") """Runs unit tests. @@ -73,6 +74,9 @@ Options: 0-10-errata - use the 0-10 specification with qpid errata. -e/--errata <errata.xml> : file containing amqp XML errata -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to + -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD + env to point to broker executable. broker-args will be + prepended with "--daemon --port=0" -v/--verbose : verbose - lists tests as they are run. -d/--debug : enable debug logging. -i/--ignore <test> : ignore the named test. @@ -82,6 +86,34 @@ Options: """ sys.exit(1) + def startBroker(self, brokerArgs): + """Start a single broker daemon""" + if TestRunner.qpidd == None: + self._die("QPIDD environment var must point to qpidd when using -B/--start-broker") + cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs) + portStr = os.popen(cmd).read() + if len(portStr) == 0: + self._die("%s failed to start" % TestRunner.qpidd) + port = int(portStr) + pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read()) + print "Started broker: pid=%d, port=%d" % (pid, port) + self.brokerTuple = (pid, port) + self.setBroker("localhost:%d" % port) + + def stopBroker(self): + """Stop the broker using qpidd -q""" + if self.brokerTuple: + ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q") + if ret != 0: + self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret)) + print "Stopped broker: pid=%d, port=%d" % self.brokerTuple + + def killBroker(self): + """Kill the broker using kill -9 (SIGTERM)""" + if self.brokerTuple: + os.kill(self.brokerTuple[0], signal.SIGTERM) + print "Killed broker: pid=%d, port=%d" % self.brokerTuple + def setBroker(self, broker): try: self.url = URL(broker) @@ -122,17 +154,22 @@ Options: self.skip_self_test = False try: - opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:", + opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:", ["help", "spec", "errata=", "broker=", - "verbose", "skip-self-test", "ignore", + "start-broker=", "verbose", "skip-self-test", "ignore", "ignore-file", "spec-folder"]) except GetoptError, e: self._die(str(e)) + # check for mutually exclusive options + if "-B" in opts or "--start-broker" in opts: + if "-b" in opts or "--broker" in opts: + self._die("Cannot use -B/--start-broker and -b/broker options together") for opt, value in opts: if opt in ("-?", "-h", "--help"): self._die() if opt in ("-s", "--spec"): self.specfile = value if opt in ("-e", "--errata"): self.errata.append(value) if opt in ("-b", "--broker"): self.setBroker(value) + if opt in ("-B", "--start-broker"): self.startBroker(value) if opt in ("-v", "--verbose"): self.verbose = 2 if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) if opt in ("-i", "--ignore"): self.ignore.append(value) @@ -182,6 +219,7 @@ Options: return unittest.defaultTestLoader.loadTestsFromNames(self.tests) def run(self, args=sys.argv[1:]): + self.brokerTuple = None self._parseargs(args) runner = unittest.TextTestRunner(descriptions=False, verbosity=self.verbose) @@ -193,6 +231,7 @@ Options: for t in self.ignore: print t print "=======================================" + self.stopBroker() return result.wasSuccessful() def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): @@ -390,3 +429,196 @@ class TestBase010(unittest.TestCase): session.message_subscribe(**keys) session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) + + +class TestBaseCluster(unittest.TestCase): + """ + Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes. + """ + _tempDir = os.getenv("TMPDIR") + _qpidd = os.getenv("QPIDD") + _storeLib = os.getenv("LIBSTORE") + _clusterLib = os.getenv("LIBCLUSTER") + + # --- Cluster helper functions --- + + """ + _clusterDict is a dictionary of clusters: + key = cluster name (string) + val = dictionary of node numbers: + key = node number (int) + val = tuple containing (pid, port) + For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows: + {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}} + where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively. + """ + _clusterDict = {} + + """Index for (pid, port) tuple""" + PID = 0 + PORT = 1 + + def startBroker(self, qpiddArgs, logFile = None): + """Start a single broker daemon, returns tuple (pid, port)""" + if self._qpidd == None: + raise Exception("Environment variable QPIDD is not set") + cmd = "%s --daemon --port=0 %s" % (self._qpidd, qpiddArgs) + portStr = os.popen(cmd).read() + if len(portStr) == 0: + err = "Broker daemon startup failed." + if logFile != None: + err += " See log file %s" % logFile + raise Exception(err) + port = int(portStr) + pid = int(os.popen("%s -p %d -c" % (self._qpidd, port)).read()) + #print "started broker: pid=%d, port=%d" % (pid, port) + return (pid, port) + + def createClusterNode(self, nodeNumber, clusterName): + """Create a node and add it to the named cluster""" + if self._tempDir == None: + raise Exception("Environment variable TMPDIR is not set") + if self._storeLib == None: + raise Exception("Environment variable LIBSTORE is not set") + if self._clusterLib == None: + raise Exception("Environment variable LIBCLUSTER is not set") + name = "%s-%d" % (clusterName, nodeNumber) + dataDir = os.path.join(self._tempDir, "cluster", name) + logFile = "%s.log" % dataDir + args = "--no-module-dir --load-module=%s --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=error+ --log-to-file=%s" % \ + (self._storeLib, self._clusterLib, dataDir, clusterName, logFile) + self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile) + + def createCluster(self, clusterName, numberNodes): + """Create a cluster containing an initial number of nodes""" + self._clusterDict[clusterName] = {} + for n in range(0, numberNodes): + self.createClusterNode(n, clusterName) + + def getTupleList(self): + """Get list of (pid, port) tuples of all known cluster brokers""" + tList = [] + for l in self._clusterDict.itervalues(): + for t in l.itervalues(): + tList.append(t) + return tList + + def getNumBrokers(self): + """Get total number of brokers in all known clusters""" + return len(self.getTupleList()) + + def checkNumBrokers(self, expected): + """Check that the total number of brokers in all known clusters is the expected value""" + if self.getNumBrokers() != expected: + raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers())) + + def getClusterTupleList(self, clusterName): + """Get list of (pid, port) tuples of all nodes in named cluster""" + return self._clusterDict[clusterName].values() + + def getNumClusterBrokers(self, clusterName): + """Get total number of brokers in named cluster""" + return len(self.getClusterTupleList(clusterName)) + + def getNodeTuple(self, nodeNumber, clusterName): + """Get the (pid, port) tuple for the given cluster node""" + return self._clusterDict[clusterName][nodeNumber] + + def checkNumClusterBrokers(self, clusterName, expected): + """Check that the total number of brokers in the named cluster is the expected value""" + if self.getNumClusterBrokers(clusterName) != expected: + raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \ + (clusterName, expected, self.getNumClusterBrokers(clusterName))) + + def clusterExists(self, clusterName): + """ Return True if clusterName exists, False otherwise""" + return clusterName in self._clusterDict.keys() + + def clusterNodeExists(self, clusterName, nodeNumber): + """ Return True if nodeNumber in clusterName exists, False otherwise""" + if clusterName in self._clusterDict.keys(): + return nodeNumber in self._clusterDict[nodeName] + return False + + def createCheckCluster(self, clusterName, size): + """Create a cluster using the given name and size, then check the number of brokers""" + self.createCluster(clusterName, size) + self.checkNumClusterBrokers(clusterName, size) + + # Kill cluster nodes using signal 9 + + def killNode(self, nodeNumber, clusterName, updateDict = True): + """Kill the given node in the named cluster using kill -9""" + pid = self.getNodeTuple(nodeNumber, clusterName)[self.PID] + os.kill(pid, signal.SIGTERM) + #print "killed broker: pid=%d" % pid + if updateDict: + del(self._clusterDict[clusterName][nodeNumber]) + + def killCluster(self, clusterName, updateDict = True): + """Kill all nodes in the named cluster""" + for n in self._clusterDict[clusterName].iterkeys(): + self.killNode(n, clusterName, False) + if updateDict: + del(self._clusterDict[clusterName]) + + def killClusterCheck(self, clusterName): + """Kill the named cluster and check that the name is removed from the cluster dictionary""" + self.killCluster(clusterName) + if self.clusterExists(clusterName): + raise Exception("Unable to kill cluster %s; %d nodes still exist" % \ + (clusterName, self.getNumClusterBrokers(clusterName))) + + def killAllClusters(self): + """Kill all known clusters""" + for n in self._clusterDict.iterkeys(): + self.killCluster(n, False) + self._clusterDict.clear() + + def killAllClustersCheck(self): + """Kill all known clusters and check that the cluster dictionary is empty""" + self.killAllClusters() + self.checkNumBrokers(0) + + # Stop cluster nodes using qpidd -q + + def stopNode(self, nodeNumber, clusterName, updateDict = True): + """Stop the given node in the named cluster using qpidd -q""" + port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] + ret = os.spawnl(os.P_WAIT, self._qpidd, self._qpidd, "--port=%d" % port, "-q") + if ret != 0: + raise Exception("stop_node(): cluster=\"%s\" nodeNumber=%d pid=%d port=%d: qpidd -q returned %d" % \ + (clusterName, nodeNumber, self.getNodeTuple(nodeNumber, clusterName)[self.PID], port, ret)) + #print "stopped broker: port=%d" % port + if updateDict: + del(self._clusterDict[clusterName][nodeNumber]) + + def stopAllClusters(self): + """Stop all known clusters""" + for n in self._clusterDict.iterkeys(): + self.stopCluster(n, False) + self._clusterDict.clear() + + + def stopCluster(self, clusterName, updateDict = True): + """Stop all nodes in the named cluster""" + for n in self._clusterDict[clusterName].iterkeys(): + self.stopNode(n, clusterName, False) + if updateDict: + del(self._clusterDict[clusterName]) + + def stopCheckCluster(self, clusterName): + """Stop the named cluster and check that the name is removed from the cluster dictionary""" + self.stopCluster(clusterName) + if self.clusterExists(clusterName): + raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName))) + def stopCheckAll(self): + """Kill all known clusters and check that the cluster dictionary is empty""" + self.stopAllClusters() + self.checkNumBrokers(0) + + def setUp(self): + pass + + def tearDown(self): + pass |
