diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/python/qpid-python-test | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qpid-python-test')
-rwxr-xr-x | qpid/python/qpid-python-test | 587 |
1 files changed, 587 insertions, 0 deletions
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test new file mode 100755 index 0000000000..a47f633565 --- /dev/null +++ b/qpid/python/qpid-python-test @@ -0,0 +1,587 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# TODO: summarize, test harness preconditions (e.g. broker is alive) + +import logging, optparse, os, struct, sys, time, traceback, types +from fnmatch import fnmatchcase as match +from getopt import GetoptError +from logging import getLogger, StreamHandler, Formatter, Filter, \ + WARN, DEBUG, ERROR +from qpid.harness import Skipped +from qpid.util import URL + +levels = { + "DEBUG": DEBUG, + "WARN": WARN, + "ERROR": ERROR + } + +sorted_levels = [(v, k) for k, v in levels.items()] +sorted_levels.sort() +sorted_levels = [v for k, v in sorted_levels] + +parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...", + description="Run tests matching the specified PATTERNs.") +parser.add_option("-l", "--list", action="store_true", default=False, + help="list tests instead of executing them") +parser.add_option("-b", "--broker", default="localhost", + help="run tests against BROKER (default %default)") +parser.add_option("-f", "--log-file", metavar="FILE", help="log output to FILE") +parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN", + help="only display log messages of LEVEL or higher severity: " + "%s (default %%default)" % ", ".join(sorted_levels)) +parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append", + dest="log_categories", default=[], + help="log only categories matching CATEGORY pattern") +parser.add_option("-m", "--module", action="append", default=[], + dest="modules", help="add module to test search path") +parser.add_option("-i", "--ignore", action="append", default=[], + help="ignore tests matching IGNORE pattern") +parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append", + default=[], + help="ignore tests matching patterns in IFILE") +parser.add_option("-H", "--halt-on-error", action="store_true", default=False, + dest="hoe", help="halt if an error is encountered") +parser.add_option("-t", "--time", action="store_true", default=False, + help="report timing information on test run") +parser.add_option("-D", "--define", metavar="DEFINE", dest="defines", + action="append", default=[], help="define test parameters") + +class Config: + + def __init__(self): + self.broker = URL("localhost") + self.defines = {} + self.log_file = None + self.log_level = WARN + self.log_categories = [] + +opts, args = parser.parse_args() + +includes = [] +excludes = ["*__*__"] +config = Config() +list_only = opts.list +config.broker = URL(opts.broker) +for d in opts.defines: + try: + idx = d.index("=") + name = d[:idx] + value = d[idx+1:] + config.defines[name] = value + except ValueError: + config.defines[d] = None +config.log_file = opts.log_file +config.log_level = levels[opts.log_level.upper()] +config.log_categories = opts.log_categories +excludes.extend([v.strip() for v in opts.ignore]) +for v in opts.ignore_file: + f = open(v) + for line in f: + line = line.strip() + if line.startswith("#"): + continue + excludes.append(line) + f.close() + +for a in args: + includes.append(a.strip()) + +if not includes: + if opts.modules: + includes.append("*") + else: + includes.extend(["qpid.tests.*"]) + +def is_ignored(path): + for p in excludes: + if match(path, p): + return True + return False + +def is_included(path): + if is_ignored(path): + return False + for p in includes: + if match(path, p): + return True + return False + +def is_smart(): + return sys.stdout.isatty() and os.environ.get("TERM", "dumb") != "dumb" + +try: + import fcntl, termios + + def width(): + if is_smart(): + s = struct.pack("HHHH", 0, 0, 0, 0) + fd_stdout = sys.stdout.fileno() + x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s) + rows, cols, xpx, ypx = struct.unpack("HHHH", x) + return cols + else: + try: + return int(os.environ.get("COLUMNS", "80")) + except ValueError: + return 80 + + WIDTH = width() + + def resize(sig, frm): + global WIDTH + WIDTH = width() + + import signal + signal.signal(signal.SIGWINCH, resize) + +except ImportError: + WIDTH = 80 + +def vt100_attrs(*attrs): + return "\x1B[%sm" % ";".join(map(str, attrs)) + +vt100_reset = vt100_attrs(0) + +KEYWORDS = {"pass": (32,), + "skip": (33,), + "fail": (31,), + "start": (34,), + "total": (34,), + "ignored": (33,), + "selected": (34,), + "elapsed": (34,), + "average": (34,)} + +COLORIZE = is_smart() + +def colorize_word(word, text=None): + if text is None: + text = word + return colorize(text, *KEYWORDS.get(word, ())) + +def colorize(text, *attrs): + if attrs and COLORIZE: + return "%s%s%s" % (vt100_attrs(*attrs), text, vt100_reset) + else: + return text + +def indent(text): + lines = text.split("\n") + return " %s" % "\n ".join(lines) + +class Interceptor: + + def __init__(self): + self.newline = False + self.indent = False + self.passthrough = True + self.dirty = False + self.last = None + + def begin(self): + self.newline = True + self.indent = True + self.passthrough = False + self.dirty = False + self.last = None + + def reset(self): + self.newline = False + self.indent = False + self.passthrough = True + +class StreamWrapper: + + def __init__(self, interceptor, stream, prefix=" "): + self.interceptor = interceptor + self.stream = stream + self.prefix = prefix + + def fileno(self): + return self.stream.fileno() + + def isatty(self): + return self.stream.isatty() + + def write(self, s): + if self.interceptor.passthrough: + self.stream.write(s) + return + + if s: + self.interceptor.dirty = True + + if self.interceptor.newline: + self.interceptor.newline = False + self.stream.write(" %s\n" % colorize_word("start")) + self.interceptor.indent = True + if self.interceptor.indent: + self.stream.write(self.prefix) + if s.endswith("\n"): + s = s.replace("\n", "\n%s" % self.prefix)[:-2] + self.interceptor.indent = True + else: + s = s.replace("\n", "\n%s" % self.prefix) + self.interceptor.indent = False + self.stream.write(s) + + if s: + self.interceptor.last = s[-1] + + def flush(self): + self.stream.flush() + +interceptor = Interceptor() + +out_wrp = StreamWrapper(interceptor, sys.stdout) +err_wrp = StreamWrapper(interceptor, sys.stderr) + +out = sys.stdout +err = sys.stderr +sys.stdout = out_wrp +sys.stderr = err_wrp + +class PatternFilter(Filter): + + def __init__(self, *patterns): + Filter.__init__(self, patterns) + self.patterns = patterns + + def filter(self, record): + if not self.patterns: + return True + for p in self.patterns: + if match(record.name, p): + return True + return False + +root = getLogger() +handler = StreamHandler(sys.stdout) +filter = PatternFilter(*config.log_categories) +handler.addFilter(filter) +handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s")) +root.addHandler(handler) +root.setLevel(WARN) + +log = getLogger("qpid.test") + +PASS = "pass" +SKIP = "skip" +FAIL = "fail" + +class Runner: + + def __init__(self): + self.exceptions = [] + self.skip = False + + def passed(self): + return not self.exceptions + + def skipped(self): + return self.skip + + def failed(self): + return self.exceptions and not self.skip + + def halt(self): + return self.exceptions or self.skip + + def run(self, name, phase): + try: + phase() + except KeyboardInterrupt: + raise + except: + exi = sys.exc_info() + if issubclass(exi[0], Skipped): + self.skip = True + self.exceptions.append((name, exi)) + + def status(self): + if self.passed(): + return PASS + elif self.skipped(): + return SKIP + elif self.failed(): + return FAIL + else: + return None + + def print_exceptions(self): + for name, info in self.exceptions: + if issubclass(info[0], Skipped): + print indent("".join(traceback.format_exception_only(*info[:2]))).rstrip() + else: + print "Error during %s:" % name + print indent("".join(traceback.format_exception(*info))).rstrip() + +ST_WIDTH = 8 + +def run_test(name, test, config): + patterns = filter.patterns + level = root.level + filter.patterns = config.log_categories + root.setLevel(config.log_level) + + parts = name.split(".") + line = None + output = "" + for part in parts: + if line: + if len(line) + len(part) >= (WIDTH - ST_WIDTH - 1): + output += "%s. \\\n" % line + line = " %s" % part + else: + line = "%s.%s" % (line, part) + else: + line = part + + if line: + output += "%s %s" % (line, (((WIDTH - ST_WIDTH) - len(line))*".")) + sys.stdout.write(output) + sys.stdout.flush() + interceptor.begin() + try: + runner = test() + finally: + interceptor.reset() + if interceptor.dirty: + if interceptor.last != "\n": + sys.stdout.write("\n") + sys.stdout.write(output) + print " %s" % colorize_word(runner.status()) + if runner.failed() or runner.skipped(): + runner.print_exceptions() + root.setLevel(level) + filter.patterns = patterns + return runner.status() + +class FunctionTest: + + def __init__(self, test): + self.test = test + + def name(self): + return "%s.%s" % (self.test.__module__, self.test.__name__) + + def run(self): + return run_test(self.name(), self._run, config) + + def _run(self): + runner = Runner() + runner.run("test", lambda: self.test(config)) + return runner + + def __repr__(self): + return "FunctionTest(%r)" % self.test + +class MethodTest: + + def __init__(self, cls, method): + self.cls = cls + self.method = method + + def name(self): + return "%s.%s.%s" % (self.cls.__module__, self.cls.__name__, self.method) + + def run(self): + return run_test(self.name(), self._run, config) + + def _run(self): + runner = Runner() + inst = self.cls(self.method) + test = getattr(inst, self.method) + + if hasattr(inst, "configure"): + runner.run("configure", lambda: inst.configure(config)) + if runner.halt(): return runner + if hasattr(inst, "setUp"): + runner.run("setup", inst.setUp) + if runner.halt(): return runner + elif hasattr(inst, "setup"): + runner.run("setup", inst.setup) + if runner.halt(): return runner + + runner.run("test", test) + + if hasattr(inst, "tearDown"): + runner.run("teardown", inst.tearDown) + elif hasattr(inst, "teardown"): + runner.run("teardown", inst.teardown) + + return runner + + def __repr__(self): + return "MethodTest(%r, %r)" % (self.cls, self.method) + +class PatternMatcher: + + def __init__(self, *patterns): + self.patterns = patterns + + def matches(self, name): + for p in self.patterns: + if match(name, p): + return True + return False + +class FunctionScanner(PatternMatcher): + + def inspect(self, obj): + return type(obj) == types.FunctionType and self.matches(name) + + def descend(self, func): + # the None is required for older versions of python + return; yield None + + def extract(self, func): + yield FunctionTest(func) + +class ClassScanner(PatternMatcher): + + def inspect(self, obj): + return type(obj) in (types.ClassType, types.TypeType) and self.matches(obj.__name__) + + def descend(self, cls): + # the None is required for older versions of python + return; yield None + + def extract(self, cls): + names = dir(cls) + names.sort() + for name in names: + obj = getattr(cls, name) + t = type(obj) + if t == types.MethodType and name.startswith("test"): + yield MethodTest(cls, name) + +class ModuleScanner: + + def inspect(self, obj): + return type(obj) == types.ModuleType + + def descend(self, obj): + names = dir(obj) + names.sort() + for name in names: + yield getattr(obj, name) + + def extract(self, obj): + # the None is required for older versions of python + return; yield None + +class Harness: + + def __init__(self): + self.scanners = [ + ModuleScanner(), + ClassScanner("*Test", "*Tests", "*TestCase"), + FunctionScanner("test_*") + ] + self.tests = [] + self.scanned = [] + + def scan(self, *roots): + objects = list(roots) + + while objects: + obj = objects.pop(0) + for s in self.scanners: + if s.inspect(obj): + self.tests.extend(s.extract(obj)) + for child in s.descend(obj): + if not (child in self.scanned or child in objects): + objects.append(child) + self.scanned.append(obj) + +modules = opts.modules +if not modules: + modules.extend(["qpid.tests"]) +h = Harness() +for name in modules: + m = __import__(name, None, None, ["dummy"]) + h.scan(m) + +filtered = [t for t in h.tests if is_included(t.name())] +ignored = [t for t in h.tests if is_ignored(t.name())] +total = len(filtered) + len(ignored) + +passed = 0 +failed = 0 +skipped = 0 +start = time.time() +for t in filtered: + if list_only: + print t.name() + else: + st = t.run() + if st == PASS: + passed += 1 + elif st == SKIP: + skipped += 1 + elif st == FAIL: + failed += 1 + if opts.hoe: + break +end = time.time() + +run = passed + failed + +if not list_only: + if passed: + _pass = "pass" + else: + _pass = "fail" + if failed: + outcome = "fail" + else: + outcome = "pass" + if ignored: + ign = "ignored" + else: + ign = "pass" + if skipped: + skip = "skip" + else: + skip = "pass" + print colorize("Totals:", 1), + totals = [colorize_word("total", "%s tests" % total), + colorize_word(_pass, "%s passed" % passed), + colorize_word(skip, "%s skipped" % skipped), + colorize_word(ign, "%s ignored" % len(ignored)), + colorize_word(outcome, "%s failed" % failed)] + print ", ".join(totals), + if opts.hoe and failed > 0: + print " -- (halted after %s)" % run + else: + print + if opts.time and run > 0: + print colorize("Timing:", 1), + timing = [colorize_word("elapsed", "%.2fs elapsed" % (end - start)), + colorize_word("average", "%.2fs average" % ((end - start)/run))] + print ", ".join(timing) + +if failed or skipped: + sys.exit(1) +else: + sys.exit(0) |