diff options
Diffstat (limited to 'python/qpid/testlib.py')
| -rw-r--r-- | python/qpid/testlib.py | 300 |
1 files changed, 75 insertions, 225 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index b5aa59f586..1439b892ea 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -21,191 +21,13 @@ # Support library for qpid python tests. # -import sys, re, unittest, os, random, logging, traceback -import qpid.client, qpid.spec +import unittest, traceback, socket +import qpid.client, qmf.console import Queue -from fnmatch import fnmatch -from getopt import getopt, GetoptError from qpid.content import Content from qpid.message import Message - -#0-10 support -from qpid.connection import Connection -from qpid.spec010 import load -from qpid.util import connect - -def findmodules(root): - """Find potential python modules under directory root""" - found = [] - for dirpath, subdirs, files in os.walk(root): - modpath = dirpath.replace(os.sep, '.') - if not re.match(r'\.svn$', dirpath): # Avoid SVN directories - for f in files: - match = re.match(r'(.+)\.py$', f) - if match and f != '__init__.py': - found.append('.'.join([modpath, match.group(1)])) - return found - -def default(value, default): - if (value == None): return default - else: return value - -class TestRunner: - - SPEC_FOLDER = "../specs" - - """Runs unit tests. - - Parses command line arguments, provides utility functions for tests, - runs the selected test suite. - """ - - def _die(self, message = None): - if message: print message - print """ -run-tests [options] [test*] -The name of a test is package.module.ClassName.testMethod -Options: - -?/-h/--help : this message - -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations: - 0-8 - use the default 0-8 specification. - 0-9 - use the default 0-9 specification. - -e/--errata <errata.xml> : file containing amqp XML errata - -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to - -v/--verbose : verbose - lists tests as they are run. - -d/--debug : enable debug logging. - -i/--ignore <test> : ignore the named test. - -I/--ignore-file : file containing patterns to ignore. - -S/--skip-self-test : skips the client self tests in the 'tests folder' - -F/--spec-folder : folder that contains the specs to be loaded - """ - sys.exit(1) - - def setBroker(self, broker): - rex = re.compile(r""" - # [ <user> [ / <password> ] @] <host> [ :<port> ] - ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) - match = rex.match(broker) - if not match: self._die("'%s' is not a valid broker" % (broker)) - self.user, self.password, self.host, self.port = match.groups() - self.port = int(default(self.port, 5672)) - self.user = default(self.user, "guest") - self.password = default(self.password, "guest") - - def ignoreFile(self, filename): - f = file(filename) - for line in f.readlines(): self.ignore.append(line.strip()) - f.close() - - def use08spec(self): - "True if we are running with the old 0-8 spec." - # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons. - return self.spec.major==8 and self.spec.minor==0 - - def _parseargs(self, args): - # Defaults - self.setBroker("localhost") - self.verbose = 1 - self.ignore = [] - self.specfile = "0-8" - self.errata = [] - self.skip_self_test = False - - try: - opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:", - ["help", "spec", "errata=", "server", - "verbose", "skip-self-test", "ignore", - "ignore-file", "spec-folder"]) - except GetoptError, e: - self._die(str(e)) - for opt, value in opts: - if opt in ("-?", "-h", "--help"): self._die() - if opt in ("-s", "--spec"): self.specfile = value - if opt in ("-e", "--errata"): self.errata.append(value) - if opt in ("-b", "--broker"): self.setBroker(value) - if opt in ("-v", "--verbose"): self.verbose = 2 - if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) - if opt in ("-i", "--ignore"): self.ignore.append(value) - if opt in ("-I", "--ignore-file"): self.ignoreFile(value) - if opt in ("-S", "--skip-self-test"): self.skip_self_test = True - if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value - # Abbreviations for default settings. - if (self.specfile == "0-10"): - self.spec = load(self.get_spec_file("amqp.0-10.xml")) - elif (self.specfile == "0-10-errata"): - self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml")) - else: - if (self.specfile == "0-8"): - self.specfile = self.get_spec_file("amqp.0-8.xml") - elif (self.specfile == "0-9"): - self.specfile = self.get_spec_file("amqp.0-9.xml") - self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) - - if (self.specfile == None): - self._die("No XML specification provided") - print "Using specification from:", self.specfile - - self.spec = qpid.spec.load(self.specfile, *self.errata) - - if len(self.tests) == 0: - if not self.skip_self_test: - self.tests=findmodules("tests") - if self.use08spec(): - self.tests+=findmodules("tests_0-8") - elif (self.spec.major == 99 and self.spec.minor == 0): - self.tests+=findmodules("tests_0-10_preview") - elif (self.spec.major == 0 and self.spec.minor == 10): - self.tests+=findmodules("tests_0-10") - else: - self.tests+=findmodules("tests_0-9") - - def testSuite(self): - class IgnoringTestSuite(unittest.TestSuite): - def addTest(self, test): - if isinstance(test, unittest.TestCase): - for pattern in testrunner.ignore: - if fnmatch(test.id(), pattern): - return - unittest.TestSuite.addTest(self, test) - - # Use our IgnoringTestSuite in the test loader. - unittest.TestLoader.suiteClass = IgnoringTestSuite - return unittest.defaultTestLoader.loadTestsFromNames(self.tests) - - def run(self, args=sys.argv[1:]): - self._parseargs(args) - runner = unittest.TextTestRunner(descriptions=False, - verbosity=self.verbose) - result = runner.run(self.testSuite()) - - if (self.ignore): - print "=======================================" - print "NOTE: the following tests were ignored:" - for t in self.ignore: print t - print "=======================================" - - return result.wasSuccessful() - - def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): - """Connect to the broker, returns a qpid.client.Client""" - host = host or self.host - port = port or self.port - spec = spec or self.spec - user = user or self.user - password = password or self.password - client = qpid.client.Client(host, port, spec) - if self.use08spec(): - client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) - else: - client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) - return client - - def get_spec_file(self, fname): - return TestRunner.SPEC_FOLDER + os.sep + fname - -# Global instance for tests to call connect. -testrunner = TestRunner() - +from qpid.harness import Skipped +from qpid.exceptions import VersionError class TestBase(unittest.TestCase): """Base class for Qpid test cases. @@ -219,13 +41,16 @@ class TestBase(unittest.TestCase): resources to clean up later. """ + def configure(self, config): + self.config = config + def setUp(self): self.queues = [] self.exchanges = [] self.client = self.connect() self.channel = self.client.channel(1) self.version = (self.client.spec.major, self.client.spec.minor) - if self.version == (8, 0): + if self.version == (8, 0) or self.version == (0, 9): self.channel.channel_open() else: self.channel.session_open() @@ -245,9 +70,26 @@ class TestBase(unittest.TestCase): else: self.client.close() - def connect(self, *args, **keys): + def connect(self, host=None, port=None, user=None, password=None, tune_params=None): """Create a new connction, return the Client object""" - return testrunner.connect(*args, **keys) + host = host or self.config.broker.host + port = port or self.config.broker.port or 5672 + user = user or "guest" + password = password or "guest" + client = qpid.client.Client(host, port) + try: + if client.spec.major == 8 and client.spec.minor == 0: + client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) + else: + client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) + except qpid.client.Closed, e: + if isinstance(e.args[0], VersionError): + raise Skipped(e.args[0]) + else: + raise e + except socket.error, e: + raise Skipped(e) + return client def queue_declare(self, channel=None, *args, **keys): channel = channel or self.channel @@ -271,24 +113,15 @@ class TestBase(unittest.TestCase): def consume(self, queueName): """Consume from named queue returns the Queue object.""" - if testrunner.use08spec(): - reply = self.channel.basic_consume(queue=queueName, no_ack=True) - return self.client.queue(reply.consumer_tag) - else: - if not "uniqueTag" in dir(self): self.uniqueTag = 1 - else: self.uniqueTag += 1 - consumer_tag = "tag" + str(self.uniqueTag) - self.channel.message_subscribe(queue=queueName, destination=consumer_tag) - self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) - self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) - return self.client.queue(consumer_tag) + reply = self.channel.basic_consume(queue=queueName, no_ack=True) + return self.client.queue(reply.consumer_tag) def subscribe(self, channel=None, **keys): channel = channel or self.channel consumer_tag = keys["destination"] channel.message_subscribe(**keys) - channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) - channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) + channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) def assertEmpty(self, queue): """Assert that the queue is empty""" @@ -302,24 +135,14 @@ class TestBase(unittest.TestCase): Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() - if testrunner.use08spec(): - self.channel.basic_publish( - exchange=exchange, - content=Content(body, properties=properties), - routing_key=routing_key) - else: - self.channel.message_transfer( - destination=exchange, - content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) + self.channel.basic_publish( + exchange=exchange, + content=Content(body, properties=properties), + routing_key=routing_key) msg = queue.get(timeout=1) - if testrunner.use08spec(): - self.assertEqual(body, msg.content.body) - if (properties): - self.assertEqual(properties, msg.content.properties) - else: - self.assertEqual(body, msg.content.body) - if (properties): - self.assertEqual(properties, msg.content['application_headers']) + self.assertEqual(body, msg.content.body) + if (properties): + self.assertEqual(properties, msg.content.properties) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ @@ -329,7 +152,7 @@ class TestBase(unittest.TestCase): self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) def assertChannelException(self, expectedCode, message): - if self.version == (8, 0): #or "transitional" in self.client.spec.file: + if self.version == (8, 0) or self.version == (0, 9): if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) self.assertEqual("channel", message.method.klass.name) self.assertEqual("close", message.method.name) @@ -346,31 +169,58 @@ class TestBase(unittest.TestCase): self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) +#0-10 support +from qpid.connection import Connection +from qpid.util import connect, ssl, URL + class TestBase010(unittest.TestCase): """ Base class for Qpid test cases. using the final 0-10 spec """ + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + def setUp(self): - spec = testrunner.spec - self.conn = Connection(connect(testrunner.host, testrunner.port), spec, - username=testrunner.user, password=testrunner.password) - self.conn.start(timeout=10) + self.conn = self.connect() self.session = self.conn.session("test-session", timeout=10) + self.qmf = None + + def startQmf(self, handler=None): + self.qmf = qmf.console.Session(handler) + self.qmf_broker = self.qmf.addBroker(str(self.broker)) def connect(self, host=None, port=None): - spec = testrunner.spec - conn = Connection(connect(host or testrunner.host, port or testrunner.port), spec) - conn.start(timeout=10) + url = self.broker + if url.scheme == URL.AMQPS: + default_port = 5671 + else: + default_port = 5672 + try: + sock = connect(host or url.host, port or url.port or default_port) + except socket.error, e: + raise Skipped(e) + if url.scheme == URL.AMQPS: + sock = ssl(sock) + conn = Connection(sock, username=url.user or "guest", + password=url.password or "guest") + try: + conn.start(timeout=10) + except VersionError, e: + raise Skipped(e) return conn def tearDown(self): if not self.session.error(): self.session.close(timeout=10) self.conn.close(timeout=10) + if self.qmf: + self.qmf.delBroker(self.qmf_broker) def subscribe(self, session=None, **keys): session = session or self.session consumer_tag = keys["destination"] session.message_subscribe(**keys) - session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) - session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) + session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) |
