summaryrefslogtreecommitdiff
path: root/python/qpid/testlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/testlib.py')
-rw-r--r--python/qpid/testlib.py300
1 files changed, 75 insertions, 225 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index b5aa59f586..1439b892ea 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -21,191 +21,13 @@
# Support library for qpid python tests.
#
-import sys, re, unittest, os, random, logging, traceback
-import qpid.client, qpid.spec
+import unittest, traceback, socket
+import qpid.client, qmf.console
import Queue
-from fnmatch import fnmatch
-from getopt import getopt, GetoptError
from qpid.content import Content
from qpid.message import Message
-
-#0-10 support
-from qpid.connection import Connection
-from qpid.spec010 import load
-from qpid.util import connect
-
-def findmodules(root):
- """Find potential python modules under directory root"""
- found = []
- for dirpath, subdirs, files in os.walk(root):
- modpath = dirpath.replace(os.sep, '.')
- if not re.match(r'\.svn$', dirpath): # Avoid SVN directories
- for f in files:
- match = re.match(r'(.+)\.py$', f)
- if match and f != '__init__.py':
- found.append('.'.join([modpath, match.group(1)]))
- return found
-
-def default(value, default):
- if (value == None): return default
- else: return value
-
-class TestRunner:
-
- SPEC_FOLDER = "../specs"
-
- """Runs unit tests.
-
- Parses command line arguments, provides utility functions for tests,
- runs the selected test suite.
- """
-
- def _die(self, message = None):
- if message: print message
- print """
-run-tests [options] [test*]
-The name of a test is package.module.ClassName.testMethod
-Options:
- -?/-h/--help : this message
- -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
- 0-8 - use the default 0-8 specification.
- 0-9 - use the default 0-9 specification.
- -e/--errata <errata.xml> : file containing amqp XML errata
- -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
- -S/--skip-self-test : skips the client self tests in the 'tests folder'
- -F/--spec-folder : folder that contains the specs to be loaded
- """
- sys.exit(1)
-
- def setBroker(self, broker):
- rex = re.compile(r"""
- # [ <user> [ / <password> ] @] <host> [ :<port> ]
- ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
- match = rex.match(broker)
- if not match: self._die("'%s' is not a valid broker" % (broker))
- self.user, self.password, self.host, self.port = match.groups()
- self.port = int(default(self.port, 5672))
- self.user = default(self.user, "guest")
- self.password = default(self.password, "guest")
-
- def ignoreFile(self, filename):
- f = file(filename)
- for line in f.readlines(): self.ignore.append(line.strip())
- f.close()
-
- def use08spec(self):
- "True if we are running with the old 0-8 spec."
- # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons.
- return self.spec.major==8 and self.spec.minor==0
-
- def _parseargs(self, args):
- # Defaults
- self.setBroker("localhost")
- self.verbose = 1
- self.ignore = []
- self.specfile = "0-8"
- self.errata = []
- self.skip_self_test = False
-
- try:
- opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:",
- ["help", "spec", "errata=", "server",
- "verbose", "skip-self-test", "ignore",
- "ignore-file", "spec-folder"])
- except GetoptError, e:
- self._die(str(e))
- for opt, value in opts:
- if opt in ("-?", "-h", "--help"): self._die()
- if opt in ("-s", "--spec"): self.specfile = value
- if opt in ("-e", "--errata"): self.errata.append(value)
- if opt in ("-b", "--broker"): self.setBroker(value)
- if opt in ("-v", "--verbose"): self.verbose = 2
- if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
- if opt in ("-i", "--ignore"): self.ignore.append(value)
- if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
- if opt in ("-S", "--skip-self-test"): self.skip_self_test = True
- if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value
- # Abbreviations for default settings.
- if (self.specfile == "0-10"):
- self.spec = load(self.get_spec_file("amqp.0-10.xml"))
- elif (self.specfile == "0-10-errata"):
- self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml"))
- else:
- if (self.specfile == "0-8"):
- self.specfile = self.get_spec_file("amqp.0-8.xml")
- elif (self.specfile == "0-9"):
- self.specfile = self.get_spec_file("amqp.0-9.xml")
- self.errata.append(self.get_spec_file("amqp-errata.0-9.xml"))
-
- if (self.specfile == None):
- self._die("No XML specification provided")
- print "Using specification from:", self.specfile
-
- self.spec = qpid.spec.load(self.specfile, *self.errata)
-
- if len(self.tests) == 0:
- if not self.skip_self_test:
- self.tests=findmodules("tests")
- if self.use08spec():
- self.tests+=findmodules("tests_0-8")
- elif (self.spec.major == 99 and self.spec.minor == 0):
- self.tests+=findmodules("tests_0-10_preview")
- elif (self.spec.major == 0 and self.spec.minor == 10):
- self.tests+=findmodules("tests_0-10")
- else:
- self.tests+=findmodules("tests_0-9")
-
- def testSuite(self):
- class IgnoringTestSuite(unittest.TestSuite):
- def addTest(self, test):
- if isinstance(test, unittest.TestCase):
- for pattern in testrunner.ignore:
- if fnmatch(test.id(), pattern):
- return
- unittest.TestSuite.addTest(self, test)
-
- # Use our IgnoringTestSuite in the test loader.
- unittest.TestLoader.suiteClass = IgnoringTestSuite
- return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
-
- def run(self, args=sys.argv[1:]):
- self._parseargs(args)
- runner = unittest.TextTestRunner(descriptions=False,
- verbosity=self.verbose)
- result = runner.run(self.testSuite())
-
- if (self.ignore):
- print "======================================="
- print "NOTE: the following tests were ignored:"
- for t in self.ignore: print t
- print "======================================="
-
- return result.wasSuccessful()
-
- def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None):
- """Connect to the broker, returns a qpid.client.Client"""
- host = host or self.host
- port = port or self.port
- spec = spec or self.spec
- user = user or self.user
- password = password or self.password
- client = qpid.client.Client(host, port, spec)
- if self.use08spec():
- client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
- else:
- client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
- return client
-
- def get_spec_file(self, fname):
- return TestRunner.SPEC_FOLDER + os.sep + fname
-
-# Global instance for tests to call connect.
-testrunner = TestRunner()
-
+from qpid.harness import Skipped
+from qpid.exceptions import VersionError
class TestBase(unittest.TestCase):
"""Base class for Qpid test cases.
@@ -219,13 +41,16 @@ class TestBase(unittest.TestCase):
resources to clean up later.
"""
+ def configure(self, config):
+ self.config = config
+
def setUp(self):
self.queues = []
self.exchanges = []
self.client = self.connect()
self.channel = self.client.channel(1)
self.version = (self.client.spec.major, self.client.spec.minor)
- if self.version == (8, 0):
+ if self.version == (8, 0) or self.version == (0, 9):
self.channel.channel_open()
else:
self.channel.session_open()
@@ -245,9 +70,26 @@ class TestBase(unittest.TestCase):
else:
self.client.close()
- def connect(self, *args, **keys):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
"""Create a new connction, return the Client object"""
- return testrunner.connect(*args, **keys)
+ host = host or self.config.broker.host
+ port = port or self.config.broker.port or 5672
+ user = user or "guest"
+ password = password or "guest"
+ client = qpid.client.Client(host, port)
+ try:
+ if client.spec.major == 8 and client.spec.minor == 0:
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+ else:
+ client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+ except qpid.client.Closed, e:
+ if isinstance(e.args[0], VersionError):
+ raise Skipped(e.args[0])
+ else:
+ raise e
+ except socket.error, e:
+ raise Skipped(e)
+ return client
def queue_declare(self, channel=None, *args, **keys):
channel = channel or self.channel
@@ -271,24 +113,15 @@ class TestBase(unittest.TestCase):
def consume(self, queueName):
"""Consume from named queue returns the Queue object."""
- if testrunner.use08spec():
- reply = self.channel.basic_consume(queue=queueName, no_ack=True)
- return self.client.queue(reply.consumer_tag)
- else:
- if not "uniqueTag" in dir(self): self.uniqueTag = 1
- else: self.uniqueTag += 1
- consumer_tag = "tag" + str(self.uniqueTag)
- self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
- self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
- self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
- return self.client.queue(consumer_tag)
+ reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+ return self.client.queue(reply.consumer_tag)
def subscribe(self, channel=None, **keys):
channel = channel or self.channel
consumer_tag = keys["destination"]
channel.message_subscribe(**keys)
- channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
- channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+ channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
+ channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
def assertEmpty(self, queue):
"""Assert that the queue is empty"""
@@ -302,24 +135,14 @@ class TestBase(unittest.TestCase):
Publish to exchange and assert queue.get() returns the same message.
"""
body = self.uniqueString()
- if testrunner.use08spec():
- self.channel.basic_publish(
- exchange=exchange,
- content=Content(body, properties=properties),
- routing_key=routing_key)
- else:
- self.channel.message_transfer(
- destination=exchange,
- content=Content(body, properties={'application_headers':properties,'routing_key':routing_key}))
+ self.channel.basic_publish(
+ exchange=exchange,
+ content=Content(body, properties=properties),
+ routing_key=routing_key)
msg = queue.get(timeout=1)
- if testrunner.use08spec():
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content.properties)
- else:
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content['application_headers'])
+ self.assertEqual(body, msg.content.body)
+ if (properties):
+ self.assertEqual(properties, msg.content.properties)
def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
@@ -329,7 +152,7 @@ class TestBase(unittest.TestCase):
self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
def assertChannelException(self, expectedCode, message):
- if self.version == (8, 0): #or "transitional" in self.client.spec.file:
+ if self.version == (8, 0) or self.version == (0, 9):
if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message))
self.assertEqual("channel", message.method.klass.name)
self.assertEqual("close", message.method.name)
@@ -346,31 +169,58 @@ class TestBase(unittest.TestCase):
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
+#0-10 support
+from qpid.connection import Connection
+from qpid.util import connect, ssl, URL
+
class TestBase010(unittest.TestCase):
"""
Base class for Qpid test cases. using the final 0-10 spec
"""
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
def setUp(self):
- spec = testrunner.spec
- self.conn = Connection(connect(testrunner.host, testrunner.port), spec,
- username=testrunner.user, password=testrunner.password)
- self.conn.start(timeout=10)
+ self.conn = self.connect()
self.session = self.conn.session("test-session", timeout=10)
+ self.qmf = None
+
+ def startQmf(self, handler=None):
+ self.qmf = qmf.console.Session(handler)
+ self.qmf_broker = self.qmf.addBroker(str(self.broker))
def connect(self, host=None, port=None):
- spec = testrunner.spec
- conn = Connection(connect(host or testrunner.host, port or testrunner.port), spec)
- conn.start(timeout=10)
+ url = self.broker
+ if url.scheme == URL.AMQPS:
+ default_port = 5671
+ else:
+ default_port = 5672
+ try:
+ sock = connect(host or url.host, port or url.port or default_port)
+ except socket.error, e:
+ raise Skipped(e)
+ if url.scheme == URL.AMQPS:
+ sock = ssl(sock)
+ conn = Connection(sock, username=url.user or "guest",
+ password=url.password or "guest")
+ try:
+ conn.start(timeout=10)
+ except VersionError, e:
+ raise Skipped(e)
return conn
def tearDown(self):
if not self.session.error(): self.session.close(timeout=10)
self.conn.close(timeout=10)
+ if self.qmf:
+ self.qmf.delBroker(self.qmf_broker)
def subscribe(self, session=None, **keys):
session = session or self.session
consumer_tag = keys["destination"]
session.message_subscribe(**keys)
- session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
- session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)