# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # # Support library for qpid python tests. # import sys, re, unittest, os, random, logging, traceback import qpid.client, qpid.spec import Queue from fnmatch import fnmatch from getopt import getopt, GetoptError from qpid.content import Content from qpid.message import Message #0-10 support from qpid.connection import Connection from qpid.spec010 import load from qpid.util import connect def findmodules(root): """Find potential python modules under directory root""" found = [] for dirpath, subdirs, files in os.walk(root): modpath = dirpath.replace(os.sep, '.') if not re.match(r'\.svn$', dirpath): # Avoid SVN directories for f in files: match = re.match(r'(.+)\.py$', f) if match and f != '__init__.py': found.append('.'.join([modpath, match.group(1)])) return found def default(value, default): if (value == None): return default else: return value class TestRunner: SPEC_FOLDER = "../specs" """Runs unit tests. Parses command line arguments, provides utility functions for tests, runs the selected test suite. """ def _die(self, message = None): if message: print message print """ run-tests [options] [test*] The name of a test is package.module.ClassName.testMethod Options: -?/-h/--help : this message -s/--spec : URL of AMQP XML specification or one of these abbreviations: 0-8 - use the default 0-8 specification. 0-9 - use the default 0-9 specification. -e/--errata : file containing amqp XML errata -b/--broker [[/]@][:] : broker to connect to -v/--verbose : verbose - lists tests as they are run. -d/--debug : enable debug logging. -i/--ignore : ignore the named test. -I/--ignore-file : file containing patterns to ignore. -S/--skip-self-test : skips the client self tests in the 'tests folder' -F/--spec-folder : folder that contains the specs to be loaded """ sys.exit(1) def setBroker(self, broker): rex = re.compile(r""" # [ [ / ] @] [ : ] ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) match = rex.match(broker) if not match: self._die("'%s' is not a valid broker" % (broker)) self.user, self.password, self.host, self.port = match.groups() self.port = int(default(self.port, 5672)) self.user = default(self.user, "guest") self.password = default(self.password, "guest") def ignoreFile(self, filename): f = file(filename) for line in f.readlines(): self.ignore.append(line.strip()) f.close() def use08spec(self): "True if we are running with the old 0-8 spec." # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons. return self.spec.major==8 and self.spec.minor==0 def _parseargs(self, args): # Defaults self.setBroker("localhost") self.verbose = 1 self.ignore = [] self.specfile = "0-8" self.errata = [] self.skip_self_test = False try: opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:", ["help", "spec", "errata=", "server", "verbose", "skip-self-test", "ignore", "ignore-file", "spec-folder"]) except GetoptError, e: self._die(str(e)) for opt, value in opts: if opt in ("-?", "-h", "--help"): self._die() if opt in ("-s", "--spec"): self.specfile = value if opt in ("-e", "--errata"): self.errata.append(value) if opt in ("-b", "--broker"): self.setBroker(value) if opt in ("-v", "--verbose"): self.verbose = 2 if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) if opt in ("-i", "--ignore"): self.ignore.append(value) if opt in ("-I", "--ignore-file"): self.ignoreFile(value) if opt in ("-S", "--skip-self-test"): self.skip_self_test = True if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value # Abbreviations for default settings. if (self.specfile == "0-10"): self.spec = load(self.get_spec_file("amqp.0-10.xml")) elif (self.specfile == "0-10-errata"): self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml")) else: if (self.specfile == "0-8"): self.specfile = self.get_spec_file("amqp.0-8.xml") elif (self.specfile == "0-9"): self.specfile = self.get_spec_file("amqp.0-9.xml") self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) if (self.specfile == None): self._die("No XML specification provided") print "Using specification from:", self.specfile self.spec = qpid.spec.load(self.specfile, *self.errata) if len(self.tests) == 0: if not self.skip_self_test: self.tests=findmodules("tests") if self.use08spec(): self.tests+=findmodules("tests_0-8") elif (self.spec.major == 99 and self.spec.minor == 0): self.tests+=findmodules("tests_0-10_preview") elif (self.spec.major == 0 and self.spec.minor == 10): self.tests+=findmodules("tests_0-10") else: self.tests+=findmodules("tests_0-9") def testSuite(self): class IgnoringTestSuite(unittest.TestSuite): def addTest(self, test): if isinstance(test, unittest.TestCase): for pattern in testrunner.ignore: if fnmatch(test.id(), pattern): return unittest.TestSuite.addTest(self, test) # Use our IgnoringTestSuite in the test loader. unittest.TestLoader.suiteClass = IgnoringTestSuite return unittest.defaultTestLoader.loadTestsFromNames(self.tests) def run(self, args=sys.argv[1:]): self._parseargs(args) runner = unittest.TextTestRunner(descriptions=False, verbosity=self.verbose) result = runner.run(self.testSuite()) if (self.ignore): print "=======================================" print "NOTE: the following tests were ignored:" for t in self.ignore: print t print "=======================================" return result.wasSuccessful() def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): """Connect to the broker, returns a qpid.client.Client""" host = host or self.host port = port or self.port spec = spec or self.spec user = user or self.user password = password or self.password client = qpid.client.Client(host, port, spec) if self.use08spec(): client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) else: client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) return client def get_spec_file(self, fname): return TestRunner.SPEC_FOLDER + os.sep + fname # Global instance for tests to call connect. testrunner = TestRunner() class TestBase(unittest.TestCase): """Base class for Qpid test cases. self.client is automatically connected with channel 1 open before the test methods are run. Deletes queues and exchanges after. Tests call self.queue_declare(channel, ...) and self.exchange_declare(chanel, ...) which are wrappers for the Channel functions that note resources to clean up later. """ def setUp(self): self.queues = [] self.exchanges = [] self.client = self.connect() self.channel = self.client.channel(1) self.version = (self.client.spec.major, self.client.spec.minor) if self.version == (8, 0): self.channel.channel_open() else: self.channel.session_open() def tearDown(self): try: for ch, q in self.queues: ch.queue_delete(queue=q) for ch, ex in self.exchanges: ch.exchange_delete(exchange=ex) except: print "Error on tearDown:" print traceback.print_exc() if not self.client.closed: self.client.channel(0).connection_close(reply_code=200) else: self.client.close() def connect(self, *args, **keys): """Create a new connction, return the Client object""" return testrunner.connect(*args, **keys) def queue_declare(self, channel=None, *args, **keys): channel = channel or self.channel reply = channel.queue_declare(*args, **keys) self.queues.append((channel, keys["queue"])) return reply def exchange_declare(self, channel=None, ticket=0, exchange='', type='', passive=False, durable=False, auto_delete=False, arguments={}): channel = channel or self.channel reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments) self.exchanges.append((channel,exchange)) return reply def uniqueString(self): """Generate a unique string, unique for this TestBase instance""" if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; return "Test Message " + str(self.uniqueCounter) def consume(self, queueName): """Consume from named queue returns the Queue object.""" if testrunner.use08spec(): reply = self.channel.basic_consume(queue=queueName, no_ack=True) return self.client.queue(reply.consumer_tag) else: if not "uniqueTag" in dir(self): self.uniqueTag = 1 else: self.uniqueTag += 1 consumer_tag = "tag" + str(self.uniqueTag) self.channel.message_subscribe(queue=queueName, destination=consumer_tag) self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) return self.client.queue(consumer_tag) def subscribe(self, channel=None, **keys): channel = channel or self.channel consumer_tag = keys["destination"] channel.message_subscribe(**keys) channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) def assertEmpty(self, queue): """Assert that the queue is empty""" try: queue.get(timeout=1) self.fail("Queue is not empty.") except Queue.Empty: None # Ignore def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): """ Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() if testrunner.use08spec(): self.channel.basic_publish( exchange=exchange, content=Content(body, properties=properties), routing_key=routing_key) else: self.channel.message_transfer( destination=exchange, content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) msg = queue.get(timeout=1) if testrunner.use08spec(): self.assertEqual(body, msg.content.body) if (properties): self.assertEqual(properties, msg.content.properties) else: self.assertEqual(body, msg.content.body) if (properties): self.assertEqual(properties, msg.content['application_headers']) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ Publish a message and consume it, assert it comes back intact. Return the Queue object used to consume. """ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) def assertChannelException(self, expectedCode, message): if self.version == (8, 0): #or "transitional" in self.client.spec.file: if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) self.assertEqual("channel", message.method.klass.name) self.assertEqual("close", message.method.name) else: if not isinstance(message, Message): self.fail("expected session_closed method, got %s" % (message)) self.assertEqual("session", message.method.klass.name) self.assertEqual("closed", message.method.name) self.assertEqual(expectedCode, message.reply_code) def assertConnectionException(self, expectedCode, message): if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message)) self.assertEqual("connection", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) class TestBase010(unittest.TestCase): """ Base class for Qpid test cases. using the final 0-10 spec """ def setUp(self): spec = testrunner.spec self.conn = Connection(connect(testrunner.host, testrunner.port), spec, username=testrunner.user, password=testrunner.password) self.conn.start(timeout=10) self.session = self.conn.session("test-session", timeout=10) def connect(self, host=None, port=None): spec = testrunner.spec conn = Connection(connect(host or testrunner.host, port or testrunner.port), spec) conn.start(timeout=10) return conn def tearDown(self): if not self.session.error(): self.session.close(timeout=10) self.conn.close(timeout=10) def subscribe(self, session=None, **keys): session = session or self.session consumer_tag = keys["destination"] session.message_subscribe(**keys) session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)