summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/client.py11
-rw-r--r--python/qpid/connection.py3
-rw-r--r--python/qpid/peer.py11
-rw-r--r--python/qpid/reference.py2
-rw-r--r--python/qpid/spec.py5
-rw-r--r--python/qpid/testlib.py53
6 files changed, 56 insertions, 29 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py
index ea6aa7901a..e548ef0e99 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -76,7 +76,8 @@ class Client:
self.locale = locale
self.tune_params = tune_params
- self.conn = Connection(connect(self.host, self.port), self.spec)
+ self.socket = connect(self.host, self.port)
+ self.conn = Connection(self.socket, self.spec)
self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
self.conn.init()
@@ -90,6 +91,9 @@ class Client:
def opened(self, ch):
ch.references = References()
+ def close(self):
+ self.socket.close()
+
class ClientDelegate(Delegate):
def __init__(self, client):
@@ -112,9 +116,8 @@ class ClientDelegate(Delegate):
def message_transfer(self, ch, msg):
if isinstance(msg.body, ReferenceId):
- self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
- else:
- self.client.queue(msg.destination).put(msg)
+ msg.reference = ch.references.get(msg.body.id)
+ self.client.queue(msg.destination).put(msg)
def message_open(self, ch, msg):
ch.references.open(msg.reference)
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 0785fe8774..cdfa2c2dc0 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -53,6 +53,9 @@ class SockIO:
def flush(self):
pass
+ def close(self):
+ self.sock.shutdown(socket.SHUT_RDWR)
+
def connect(host, port):
sock = socket.socket()
sock.connect((host, port))
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 6c8c6647c9..28db20f187 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -97,9 +97,12 @@ class Peer:
self.fatal()
def close(self, reason):
+ # We must close the delegate first because closing channels
+ # may wake up waiting threads and we don't want them to see
+ # the delegate as open.
+ self.delegate.close(reason)
for ch in self.channels.values():
ch.close(reason)
- self.delegate.close(reason)
def writer(self):
try:
@@ -144,7 +147,7 @@ class Requester:
self.write(frame, content)
def receive(self, channel, frame):
- listener = self.outstanding.pop(frame.id)
+ listener = self.outstanding.pop(frame.request_id)
listener(channel, frame)
class Responder:
@@ -178,8 +181,8 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
- # XXX: better switch
- self.reliable = False
+ # Use reliable framing if version == 0-9.
+ self.reliable = (spec.major == 0 and spec.minor == 9)
self.synchronous = True
def close(self, reason):
diff --git a/python/qpid/reference.py b/python/qpid/reference.py
index d357560390..48ecb67656 100644
--- a/python/qpid/reference.py
+++ b/python/qpid/reference.py
@@ -111,7 +111,7 @@ class References:
self.get(id).close()
self.lock.acquire()
try:
- del map[id]
+ self.map.pop(id)
finally:
self.lock.release()
diff --git a/python/qpid/spec.py b/python/qpid/spec.py
index e430c45b96..4f0661bcbc 100644
--- a/python/qpid/spec.py
+++ b/python/qpid/spec.py
@@ -79,12 +79,11 @@ class Spec(Metadata):
PRINT=["major", "minor", "file"]
- def __init__(self, major, minor, file, errata):
+ def __init__(self, major, minor, file):
Metadata.__init__(self)
self.major = major
self.minor = minor
self.file = file
- self.errata = errata
self.constants = SpecContainer()
self.classes = SpecContainer()
# methods indexed by classname_methname
@@ -275,7 +274,7 @@ def load_fields(nd, l, domains):
def load(specfile, *errata):
doc = xmlutil.parse(specfile)
spec_root = doc["amqp"][0]
- spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile, errata)
+ spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
for root in [spec_root] + map(lambda x: xmlutil.parse(x)["amqp"][0], errata):
# constants
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index dcbf0ed91c..05641bce7e 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -26,6 +26,7 @@ import qpid.client, qpid.spec
import Queue
from getopt import getopt, GetoptError
from qpid.content import Content
+from qpid.message import Message
def findmodules(root):
"""Find potential python modules under directory root"""
@@ -56,15 +57,16 @@ class TestRunner:
run-tests [options] [test*]
The name of a test is package.module.ClassName.testMethod
Options:
- -?/-h/--help : this message
+ -?/-h/--help : this message
-s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
0-8 - use the default 0-8 specification.
0-9 - use the default 0-9 specification.
+ -e/--errata <errata.xml> : file containing amqp XML errata
-b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
+ -v/--verbose : verbose - lists tests as they are run.
+ -d/--debug : enable debug logging.
+ -i/--ignore <test> : ignore the named test.
+ -I/--ignore-file : file containing patterns to ignore.
"""
sys.exit(1)
@@ -103,24 +105,27 @@ Options:
for opt, value in opts:
if opt in ("-?", "-h", "--help"): self._die()
if opt in ("-s", "--spec"): self.specfile = value
+ if opt in ("-e", "--errata"): self.errata.append(value)
if opt in ("-b", "--broker"): self.setBroker(value)
if opt in ("-v", "--verbose"): self.verbose = 2
if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
if opt in ("-i", "--ignore"): self.ignore.append(value)
if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
+ # Abbreviations for default settings.
if (self.specfile == "0-8"):
- self.specfile = "../specs/amqp.0-8.xml"
+ self.specfile = "../specs/amqp.0-8.xml"
if (self.specfile == "0-9"):
- self.specfile = "../specs/amqp.0-9.xml"
- self.errata = ["../specs/amqp-errata.0-9.xml"]
+ self.specfile = "../specs/amqp.0-9.xml"
+ self.errata.append("../specs/amqp-errata.0-9.xml")
+ if (self.specfile == None):
+ self._die("No XML specification provided")
print "Using specification from:", self.specfile
self.spec = qpid.spec.load(self.specfile, *self.errata)
if len(self.tests) == 0:
if self.use08spec():
- testdir="tests_0-8"
+ self.tests=findmodules("tests_0-8")
else:
- testdir="tests"
- self.tests=findmodules(testdir)
+ self.tests=findmodules("tests")
def testSuite(self):
class IgnoringTestSuite(unittest.TestSuite):
@@ -137,7 +142,11 @@ Options:
self._parseargs(args)
runner = unittest.TextTestRunner(descriptions=False,
verbosity=self.verbose)
- result = runner.run(self.testSuite())
+ try:
+ result = runner.run(self.testSuite())
+ except:
+ print "Unhandled error in test:", sys.exc_info()
+
if (self.ignore):
print "======================================="
print "NOTE: the following tests were ignored:"
@@ -181,10 +190,18 @@ class TestBase(unittest.TestCase):
self.channel.channel_open()
def tearDown(self):
- for ch, q in self.queues:
- ch.queue_delete(queue=q)
- for ch, ex in self.exchanges:
- ch.exchange_delete(exchange=ex)
+ try:
+ for ch, q in self.queues:
+ ch.queue_delete(queue=q)
+ for ch, ex in self.exchanges:
+ ch.exchange_delete(exchange=ex)
+ except:
+ print "Error on tearDown:", sys.exc_info()
+
+ if not self.client.closed:
+ self.client.channel(0).connection_close(reply_code=200)
+ else:
+ self.client.close()
def connect(self, *args, **keys):
"""Create a new connction, return the Client object"""
@@ -261,13 +278,15 @@ class TestBase(unittest.TestCase):
"""
self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
- def assertChannelException(self, expectedCode, message):
+ def assertChannelException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected channel_close method")
self.assertEqual("channel", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
def assertConnectionException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected connection_close method")
self.assertEqual("connection", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)