summaryrefslogtreecommitdiff
path: root/python/qpid/testlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/testlib.py')
-rw-r--r--python/qpid/testlib.py226
1 files changed, 0 insertions, 226 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
deleted file mode 100644
index 1439b892ea..0000000000
--- a/python/qpid/testlib.py
+++ /dev/null
@@ -1,226 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-#
-# Support library for qpid python tests.
-#
-
-import unittest, traceback, socket
-import qpid.client, qmf.console
-import Queue
-from qpid.content import Content
-from qpid.message import Message
-from qpid.harness import Skipped
-from qpid.exceptions import VersionError
-
-class TestBase(unittest.TestCase):
- """Base class for Qpid test cases.
-
- self.client is automatically connected with channel 1 open before
- the test methods are run.
-
- Deletes queues and exchanges after. Tests call
- self.queue_declare(channel, ...) and self.exchange_declare(chanel,
- ...) which are wrappers for the Channel functions that note
- resources to clean up later.
- """
-
- def configure(self, config):
- self.config = config
-
- def setUp(self):
- self.queues = []
- self.exchanges = []
- self.client = self.connect()
- self.channel = self.client.channel(1)
- self.version = (self.client.spec.major, self.client.spec.minor)
- if self.version == (8, 0) or self.version == (0, 9):
- self.channel.channel_open()
- else:
- self.channel.session_open()
-
- def tearDown(self):
- try:
- for ch, q in self.queues:
- ch.queue_delete(queue=q)
- for ch, ex in self.exchanges:
- ch.exchange_delete(exchange=ex)
- except:
- print "Error on tearDown:"
- print traceback.print_exc()
-
- if not self.client.closed:
- self.client.channel(0).connection_close(reply_code=200)
- else:
- self.client.close()
-
- def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
- """Create a new connction, return the Client object"""
- host = host or self.config.broker.host
- port = port or self.config.broker.port or 5672
- user = user or "guest"
- password = password or "guest"
- client = qpid.client.Client(host, port)
- try:
- if client.spec.major == 8 and client.spec.minor == 0:
- client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
- else:
- client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
- except qpid.client.Closed, e:
- if isinstance(e.args[0], VersionError):
- raise Skipped(e.args[0])
- else:
- raise e
- except socket.error, e:
- raise Skipped(e)
- return client
-
- def queue_declare(self, channel=None, *args, **keys):
- channel = channel or self.channel
- reply = channel.queue_declare(*args, **keys)
- self.queues.append((channel, keys["queue"]))
- return reply
-
- def exchange_declare(self, channel=None, ticket=0, exchange='',
- type='', passive=False, durable=False,
- auto_delete=False,
- arguments={}):
- channel = channel or self.channel
- reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments)
- self.exchanges.append((channel,exchange))
- return reply
-
- def uniqueString(self):
- """Generate a unique string, unique for this TestBase instance"""
- if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
- return "Test Message " + str(self.uniqueCounter)
-
- def consume(self, queueName):
- """Consume from named queue returns the Queue object."""
- reply = self.channel.basic_consume(queue=queueName, no_ack=True)
- return self.client.queue(reply.consumer_tag)
-
- def subscribe(self, channel=None, **keys):
- channel = channel or self.channel
- consumer_tag = keys["destination"]
- channel.message_subscribe(**keys)
- channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
- channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
-
- def assertEmpty(self, queue):
- """Assert that the queue is empty"""
- try:
- queue.get(timeout=1)
- self.fail("Queue is not empty.")
- except Queue.Empty: None # Ignore
-
- def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
- """
- Publish to exchange and assert queue.get() returns the same message.
- """
- body = self.uniqueString()
- self.channel.basic_publish(
- exchange=exchange,
- content=Content(body, properties=properties),
- routing_key=routing_key)
- msg = queue.get(timeout=1)
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content.properties)
-
- def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
- """
- Publish a message and consume it, assert it comes back intact.
- Return the Queue object used to consume.
- """
- self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
-
- def assertChannelException(self, expectedCode, message):
- if self.version == (8, 0) or self.version == (0, 9):
- if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message))
- self.assertEqual("channel", message.method.klass.name)
- self.assertEqual("close", message.method.name)
- else:
- if not isinstance(message, Message): self.fail("expected session_closed method, got %s" % (message))
- self.assertEqual("session", message.method.klass.name)
- self.assertEqual("closed", message.method.name)
- self.assertEqual(expectedCode, message.reply_code)
-
-
- def assertConnectionException(self, expectedCode, message):
- if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message))
- self.assertEqual("connection", message.method.klass.name)
- self.assertEqual("close", message.method.name)
- self.assertEqual(expectedCode, message.reply_code)
-
-#0-10 support
-from qpid.connection import Connection
-from qpid.util import connect, ssl, URL
-
-class TestBase010(unittest.TestCase):
- """
- Base class for Qpid test cases. using the final 0-10 spec
- """
-
- def configure(self, config):
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- self.conn = self.connect()
- self.session = self.conn.session("test-session", timeout=10)
- self.qmf = None
-
- def startQmf(self, handler=None):
- self.qmf = qmf.console.Session(handler)
- self.qmf_broker = self.qmf.addBroker(str(self.broker))
-
- def connect(self, host=None, port=None):
- url = self.broker
- if url.scheme == URL.AMQPS:
- default_port = 5671
- else:
- default_port = 5672
- try:
- sock = connect(host or url.host, port or url.port or default_port)
- except socket.error, e:
- raise Skipped(e)
- if url.scheme == URL.AMQPS:
- sock = ssl(sock)
- conn = Connection(sock, username=url.user or "guest",
- password=url.password or "guest")
- try:
- conn.start(timeout=10)
- except VersionError, e:
- raise Skipped(e)
- return conn
-
- def tearDown(self):
- if not self.session.error(): self.session.close(timeout=10)
- self.conn.close(timeout=10)
- if self.qmf:
- self.qmf.delBroker(self.qmf_broker)
-
- def subscribe(self, session=None, **keys):
- session = session or self.session
- consumer_tag = keys["destination"]
- session.message_subscribe(**keys)
- session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
- session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)