diff options
Diffstat (limited to 'python/qpid')
| -rw-r--r-- | python/qpid/client.py | 11 | ||||
| -rw-r--r-- | python/qpid/connection.py | 3 | ||||
| -rw-r--r-- | python/qpid/peer.py | 11 | ||||
| -rw-r--r-- | python/qpid/reference.py | 2 | ||||
| -rw-r--r-- | python/qpid/spec.py | 5 | ||||
| -rw-r--r-- | python/qpid/testlib.py | 53 |
6 files changed, 56 insertions, 29 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py index ea6aa7901a..e548ef0e99 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -76,7 +76,8 @@ class Client: self.locale = locale self.tune_params = tune_params - self.conn = Connection(connect(self.host, self.port), self.spec) + self.socket = connect(self.host, self.port) + self.conn = Connection(self.socket, self.spec) self.peer = Peer(self.conn, ClientDelegate(self), self.opened) self.conn.init() @@ -90,6 +91,9 @@ class Client: def opened(self, ch): ch.references = References() + def close(self): + self.socket.close() + class ClientDelegate(Delegate): def __init__(self, client): @@ -112,9 +116,8 @@ class ClientDelegate(Delegate): def message_transfer(self, ch, msg): if isinstance(msg.body, ReferenceId): - self.client.queue(msg.destination).put(ch.references.get(msg.body.id)) - else: - self.client.queue(msg.destination).put(msg) + msg.reference = ch.references.get(msg.body.id) + self.client.queue(msg.destination).put(msg) def message_open(self, ch, msg): ch.references.open(msg.reference) diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 0785fe8774..cdfa2c2dc0 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -53,6 +53,9 @@ class SockIO: def flush(self): pass + def close(self): + self.sock.shutdown(socket.SHUT_RDWR) + def connect(host, port): sock = socket.socket() sock.connect((host, port)) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 6c8c6647c9..28db20f187 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -97,9 +97,12 @@ class Peer: self.fatal() def close(self, reason): + # We must close the delegate first because closing channels + # may wake up waiting threads and we don't want them to see + # the delegate as open. + self.delegate.close(reason) for ch in self.channels.values(): ch.close(reason) - self.delegate.close(reason) def writer(self): try: @@ -144,7 +147,7 @@ class Requester: self.write(frame, content) def receive(self, channel, frame): - listener = self.outstanding.pop(frame.id) + listener = self.outstanding.pop(frame.request_id) listener(channel, frame) class Responder: @@ -178,8 +181,8 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) - # XXX: better switch - self.reliable = False + # Use reliable framing if version == 0-9. + self.reliable = (spec.major == 0 and spec.minor == 9) self.synchronous = True def close(self, reason): diff --git a/python/qpid/reference.py b/python/qpid/reference.py index d357560390..48ecb67656 100644 --- a/python/qpid/reference.py +++ b/python/qpid/reference.py @@ -111,7 +111,7 @@ class References: self.get(id).close() self.lock.acquire() try: - del map[id] + self.map.pop(id) finally: self.lock.release() diff --git a/python/qpid/spec.py b/python/qpid/spec.py index e430c45b96..4f0661bcbc 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -79,12 +79,11 @@ class Spec(Metadata): PRINT=["major", "minor", "file"] - def __init__(self, major, minor, file, errata): + def __init__(self, major, minor, file): Metadata.__init__(self) self.major = major self.minor = minor self.file = file - self.errata = errata self.constants = SpecContainer() self.classes = SpecContainer() # methods indexed by classname_methname @@ -275,7 +274,7 @@ def load_fields(nd, l, domains): def load(specfile, *errata): doc = xmlutil.parse(specfile) spec_root = doc["amqp"][0] - spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile, errata) + spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile) for root in [spec_root] + map(lambda x: xmlutil.parse(x)["amqp"][0], errata): # constants diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index dcbf0ed91c..05641bce7e 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -26,6 +26,7 @@ import qpid.client, qpid.spec import Queue from getopt import getopt, GetoptError from qpid.content import Content +from qpid.message import Message def findmodules(root): """Find potential python modules under directory root""" @@ -56,15 +57,16 @@ class TestRunner: run-tests [options] [test*] The name of a test is package.module.ClassName.testMethod Options: - -?/-h/--help : this message + -?/-h/--help : this message -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations: 0-8 - use the default 0-8 specification. 0-9 - use the default 0-9 specification. + -e/--errata <errata.xml> : file containing amqp XML errata -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to - -v/--verbose : verbose - lists tests as they are run. - -d/--debug : enable debug logging. - -i/--ignore <test> : ignore the named test. - -I/--ignore-file : file containing patterns to ignore. + -v/--verbose : verbose - lists tests as they are run. + -d/--debug : enable debug logging. + -i/--ignore <test> : ignore the named test. + -I/--ignore-file : file containing patterns to ignore. """ sys.exit(1) @@ -103,24 +105,27 @@ Options: for opt, value in opts: if opt in ("-?", "-h", "--help"): self._die() if opt in ("-s", "--spec"): self.specfile = value + if opt in ("-e", "--errata"): self.errata.append(value) if opt in ("-b", "--broker"): self.setBroker(value) if opt in ("-v", "--verbose"): self.verbose = 2 if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) if opt in ("-i", "--ignore"): self.ignore.append(value) if opt in ("-I", "--ignore-file"): self.ignoreFile(value) + # Abbreviations for default settings. if (self.specfile == "0-8"): - self.specfile = "../specs/amqp.0-8.xml" + self.specfile = "../specs/amqp.0-8.xml" if (self.specfile == "0-9"): - self.specfile = "../specs/amqp.0-9.xml" - self.errata = ["../specs/amqp-errata.0-9.xml"] + self.specfile = "../specs/amqp.0-9.xml" + self.errata.append("../specs/amqp-errata.0-9.xml") + if (self.specfile == None): + self._die("No XML specification provided") print "Using specification from:", self.specfile self.spec = qpid.spec.load(self.specfile, *self.errata) if len(self.tests) == 0: if self.use08spec(): - testdir="tests_0-8" + self.tests=findmodules("tests_0-8") else: - testdir="tests" - self.tests=findmodules(testdir) + self.tests=findmodules("tests") def testSuite(self): class IgnoringTestSuite(unittest.TestSuite): @@ -137,7 +142,11 @@ Options: self._parseargs(args) runner = unittest.TextTestRunner(descriptions=False, verbosity=self.verbose) - result = runner.run(self.testSuite()) + try: + result = runner.run(self.testSuite()) + except: + print "Unhandled error in test:", sys.exc_info() + if (self.ignore): print "=======================================" print "NOTE: the following tests were ignored:" @@ -181,10 +190,18 @@ class TestBase(unittest.TestCase): self.channel.channel_open() def tearDown(self): - for ch, q in self.queues: - ch.queue_delete(queue=q) - for ch, ex in self.exchanges: - ch.exchange_delete(exchange=ex) + try: + for ch, q in self.queues: + ch.queue_delete(queue=q) + for ch, ex in self.exchanges: + ch.exchange_delete(exchange=ex) + except: + print "Error on tearDown:", sys.exc_info() + + if not self.client.closed: + self.client.channel(0).connection_close(reply_code=200) + else: + self.client.close() def connect(self, *args, **keys): """Create a new connction, return the Client object""" @@ -261,13 +278,15 @@ class TestBase(unittest.TestCase): """ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) - def assertChannelException(self, expectedCode, message): + def assertChannelException(self, expectedCode, message): + if not isinstance(message, Message): self.fail("expected channel_close method") self.assertEqual("channel", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) def assertConnectionException(self, expectedCode, message): + if not isinstance(message, Message): self.fail("expected connection_close method") self.assertEqual("connection", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) |
