diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /python | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- | python/examples/README.txt | 5 | ||||
-rwxr-xr-x | python/examples/api/receive | 194 | ||||
-rwxr-xr-x | python/examples/api/send | 281 | ||||
-rw-r--r-- | python/examples/api/statistics.py | 139 | ||||
-rw-r--r-- | python/qpid/client.py | 10 | ||||
-rw-r--r-- | python/qpid/connection08.py | 41 | ||||
-rw-r--r-- | python/qpid/delegates.py | 20 | ||||
-rw-r--r-- | python/qpid/messaging/driver.py | 19 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 2 | ||||
-rw-r--r-- | python/qpid/messaging/transports.py | 36 | ||||
-rw-r--r-- | python/qpid/testlib.py | 6 | ||||
-rw-r--r-- | python/qpid/tests/__init__.py | 1 | ||||
-rw-r--r-- | python/qpid/tests/util.py | 46 | ||||
-rw-r--r-- | python/qpid/util.py | 26 | ||||
-rwxr-xr-x | python/setup.py | 2 |
15 files changed, 768 insertions, 60 deletions
diff --git a/python/examples/README.txt b/python/examples/README.txt index 4395160fec..3a3e421a1e 100644 --- a/python/examples/README.txt +++ b/python/examples/README.txt @@ -14,6 +14,11 @@ api/spout -- A simple messaging client that sends messages to the target specified on the command line. +api/send -- Sends messages to a specified queue. + +api/receive -- Receives messages from a specified queue. + Use with the send example above. + api/server -- An example server that process incoming messages and sends replies. diff --git a/python/examples/api/receive b/python/examples/api/receive new file mode 100755 index 0000000000..f14df277ac --- /dev/null +++ b/python/examples/api/receive @@ -0,0 +1,194 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, time +import statistics +from qpid.messaging import * + +SECOND = 1000 +TIME_SEC = 1000000000 + +op = optparse.OptionParser(usage="usage: %prog [options]", description="Drains messages from the specified address") +op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to") +op.add_option("-a", "--address", type="str", help="address to receive from") +op.add_option("--connection-options", default={}, help="options for the connection") +op.add_option("-m", "--messages", default=0, type="int", help="stop after N messages have been received, 0 means no limit") +op.add_option("--timeout", default=0, type="int", help="timeout in seconds to wait before exiting") +op.add_option("-f", "--forever", default=False, action="store_true", help="ignore timeout and wait forever") +op.add_option("--ignore-duplicates", default=False, action="store_true", help="Detect and ignore duplicates (by checking 'sn' header)") +op.add_option("--verify-sequence", default=False, action="store_true", help="Verify there are no gaps in the message sequence (by checking 'sn' header)") +op.add_option("--check-redelivered", default=False, action="store_true", help="Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") +op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue") +op.add_option("--ack-frequency", default=100, type="int", help="Ack frequency (0 implies none of the messages will get accepted)") +op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)") +op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)") +op.add_option("--print-content", type="str", default="yes", help="print out message content") +op.add_option("--print-headers", type="str", default="no", help="print out message headers") +op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover") +op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics") +op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages") +op.add_option("--report-header", type="str", default="yes", help="Headers on report") +op.add_option("--ready-address", type="str", help="send a message to this address when ready to receive") +op.add_option("--receive-rate", default=0, type="int", help="Receive at rate of N messages/second. 0 means receive as fast as possible") +#op.add_option("--help", default=False, action="store_true", help="print this usage statement") + +def getTimeout(timeout, forever): + if forever: + return None + else: + return SECOND*timeout + + +EOS = "eos" +SN = "sn" + +# Check for duplicate or dropped messages by sequence number +class SequenceTracker: + def __init__(self, opts): + self.opts = opts + self.lastSn = 0 + + # Return True if the message should be procesed, false if it should be ignored. + def track(self, message): + if not(self.opts.verify_sequence) or (self.opts.ignore_duplicates): + return True + sn = message.properties[SN] + duplicate = (sn <= lastSn) + dropped = (sn > lastSn+1) + if self.opts.verify_sequence and dropped: + raise Exception("Gap in sequence numbers %s-%s" %(lastSn, sn)) + ignore = (duplicate and self.opts.ignore_duplicates) + if ignore and self.opts.check_redelivered and (not msg.redelivered): + raise Exception("duplicate sequence number received, message not marked as redelivered!") + if not duplicate: + lastSn = sn + return (not(ignore)) + + +def main(): + opts, args = op.parse_args() + if not opts.address: + raise Exception("Address must be specified!") + + broker = opts.broker + address = opts.address + connection = Connection(opts.broker, **opts.connection_options) + + try: + connection.open() + if opts.failover_updates: + auto_fetch_reconnect_urls(connection) + session = connection.session(transactional=(opts.tx)) + receiver = session.receiver(opts.address) + if opts.capacity > 0: + receiver.capacity = opts.capacity + msg = Message() + count = 0 + txCount = 0 + sequenceTracker = SequenceTracker(opts) + timeout = getTimeout(opts.timeout, opts.forever) + done = False + stats = statistics.ThroughputAndLatency() + reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats) + + if opts.ready_address is not None: + session.sender(opts.ready_address).send(msg) + if opts.tx > 0: + session.commit() + # For receive rate calculation + start = time.time()*TIME_SEC + interval = 0 + if opts.receive_rate > 0: + interval = TIME_SEC / opts.receive_rate + + replyTo = {} # a dictionary of reply-to address -> sender mapping + + while (not done): + try: + msg = receiver.fetch(timeout=timeout) + reporter.message(msg) + if sequenceTracker.track(msg): + if msg.content == EOS: + done = True + else: + count+=1 + if opts.print_headers == "yes": + if msg.subject is not None: + print "Subject: %s" %msg.subject + if msg.reply_to is not None: + print "ReplyTo: %s" %msg.reply_to + if msg.correlation_id is not None: + print "CorrelationId: %s" %msg.correlation_id + if msg.user_id is not None: + print "UserId: %s" %msg.user_id + if msg.ttl is not None: + print "TTL: %s" %msg.ttl + if msg.priority is not None: + print "Priority: %s" %msg.priority + if msg.durable: + print "Durable: true" + if msg.redelivered: + print "Redelivered: true" + print "Properties: %s" %msg.properties + print + if opts.print_content == "yes": + print msg.content + if (opts.messages > 0) and (count >= opts.messages): + done = True + # end of "if sequenceTracker.track(msg):" + if (opts.tx > 0) and (count % opts.tx == 0): + txCount+=1 + if (opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0): + session.rollback() + else: + session.commit() + elif (opts.ack_frequency > 0) and (count % opts.ack_frequency == 0): + session.acknowledge() + if msg.reply_to is not None: # Echo message back to reply-to address. + if msg.reply_to not in replyTo: + replyTo[msg.reply_to] = session.sender(msg.reply_to) + replyTo[msg.reply_to].capacity = opts.capacity + replyTo[msg.reply_to].send(msg) + if opts.receive_rate > 0: + delay = start + count*interval - time.time()*TIME_SEC + if delay > 0: + time.sleep(delay) + # Clear out message properties & content for next iteration. + msg = Message() + except Empty: # no message fetched => break the while cycle + break + # end of while cycle + if opts.report_total: + reporter.report() + if opts.tx > 0: + txCount+=1 + if opts.rollback_frequency and (txCount % opts.rollback_frequency == 0): + session.rollback() + else: + session.commit() + else: + session.acknowledge() + session.close() + connection.close() + except Exception,e: + print e + connection.close() + +if __name__ == "__main__": main() diff --git a/python/examples/api/send b/python/examples/api/send new file mode 100755 index 0000000000..b0105e41a6 --- /dev/null +++ b/python/examples/api/send @@ -0,0 +1,281 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, random, os, time, uuid +from qpid.messaging import * +import statistics + +EOS = "eos" +SN = "sn" +TS = "ts" + +TIME_SEC = 1000000000 +SECOND = 1000 + +def nameval(st): + idx = st.find("=") + if idx >= 0: + name = st[0:idx] + value = st[idx+1:] + else: + name = st + value = None + return name, value + + +op = optparse.OptionParser(usage="usage: %prog [options]", description="Spouts messages to the specified address") +op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to") +op.add_option("-a", "--address", type="str", help="address to send to") +op.add_option("--connection-options", default={}, help="options for the connection") +op.add_option("-m", "--messages", default=1, type="int", help="stop after N messages have been sent, 0 means no limit") +op.add_option("-i", "--id", type="str", help="use the supplied id instead of generating one") +op.add_option("--reply-to", type="str", help="specify reply-to address") +op.add_option("--send-eos", default=0, type="int", help="Send N EOS messages to mark end of input") +op.add_option("--durable", default=False, action="store_true", help="Mark messages as durable") +op.add_option("--ttl", default=0, type="int", help="Time-to-live for messages, in milliseconds") +op.add_option("--priority", default=0, type="int", help="Priority for messages (higher value implies higher priority)") +op.add_option("-P", "--property", default=[], action="append", type="str", help="specify message property") +op.add_option("--correlation-id", type="str", help="correlation-id for message") +op.add_option("--user-id", type="str", help="userid for message") +op.add_option("--content-string", type="str", help="use CONTENT as message content") +op.add_option("--content-size", default=0, type="int", help="create an N-byte message content") +op.add_option("-M", "--content-map", default=[], action="append", type="str", help="specify entry for map content") +op.add_option("--content-stdin", default=False, action="store_true", help="read message content from stdin, one line per message") +op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue") +op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)") +op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)") +op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover") +op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics") +op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages") +op.add_option("--report-header", type="str", default="yes", help="Headers on report") +op.add_option("--send-rate", default=0, type="int", help="Send at rate of N messages/second. 0 means send as fast as possible") +op.add_option("--flow-control", default=0, type="int", help="Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.") +op.add_option("--sequence", type="str", default="yes", help="Add a sequence number messages property (required for duplicate/lost message detection)") +op.add_option("--timestamp", type="str", default="yes", help="Add a time stamp messages property (required for latency measurement)") +op.add_option("--group-key", type="str", help="Generate groups of messages using message header 'KEY' to hold the group identifier") +op.add_option("--group-prefix", default="GROUP-", type="str", help="Generate group identifers with 'STRING' prefix (if group-key specified)") +op.add_option("--group-size", default=10, type="int", help="Number of messages per a group (if group-key specified)") +op.add_option("--group-randomize-size", default=False, action="store_true", help="Randomize the number of messages per group to [1...group-size] (if group-key specified)") +op.add_option("--group-interleave", default=1, type="int", help="Simultaineously interleave messages from N different groups (if group-key specified)") + + +class ContentGenerator: + def setContent(self, msg): + return + +class GetlineContentGenerator(ContentGenerator): + def setContent(self, msg): + content = sys.stdin.readline() + got = (not line) + if (got): + msg.content = content + return got + +class FixedContentGenerator(ContentGenerator): + def __init__(self, content=None): + self.content = content + + def setContent(self, msg): + msg.content = self.content + return True + +class MapContentGenerator(ContentGenerator): + def __init__(self, opts=None): + self.opts = opts + + def setContent(self, msg): + self.content = {} + for e in self.opts.content_map: + name, val = nameval(p) + content[name] = val + msg.content = self.content + return True + + +# tag each generated message with a group identifer +class GroupGenerator: + def __init__(self, key, prefix, size, randomize, interleave): + groupKey = key + groupPrefix = prefix + groupSize = size + randomizeSize = randomize + groupSuffix = 0 + if (randomize > 0): + random.seed(os.getpid()) + + for i in range(0, interleave): + self.newGroup() + current = 0 + + def setGroupInfo(self, msg): + if (current == len(groups)): + current = 0 + my_group = groups[current] + msg.properties[groupKey] = my_group[id]; + # print "SENDING GROUPID=[%s]\n" % my_group[id] + my_group[count]=my_group[count]+1 + if (my_group[count] == my_group[size]): + self.newGroup() + del groups[current] + else: + current+=1 + + def newGroup(self): + groupId = "%s%s" % (groupPrefix, groupSuffix) + groupSuffix+=1 + size = groupSize + if (randomizeSize == True): + size = random.randint(1,groupSize) + # print "New group: GROUPID=["%s] size=%s" % (groupId, size) + groups.append({'id':groupId, 'size':size, 'count':0}) + + + +def main(): + opts, args = op.parse_args() + if not opts.address: + raise Exception("Address must be specified!") + + broker = opts.broker + address = opts.address + connection = Connection(opts.broker, **opts.connection_options) + + try: + connection.open() + if (opts.failover_updates): + auto_fetch_reconnect_urls(connection) + session = connection.session(transactional=(opts.tx)) + sender = session.sender(opts.address) + if (opts.capacity>0): + sender.capacity = opts.capacity + sent = 0 + txCount = 0 + stats = statistics.Throughput() + reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats) + + contentGen = ContentGenerator() + content = "" # auxiliary variable for determining content type of message - needs to be changed to {} for Map message + if opts.content_stdin: + opts.messages = 0 # Don't limit number of messages sent. + contentGen = GetlineContentGenerator() + elif opts.content_map is not None: + contentGen = MapContentGenerator(opts) + content = {} + elif opts.content_size is not None: + contentGen = FixedContentGenerator('X' * opts.content_size) + else: + contentGen = FixedContentGenerator(opts.content_string) + if opts.group_key is not None: + groupGen = GroupGenerator(opts.group_key, opts.group_prefix, opts.group_size, opts.group_random_size, opts.group_interleave) + + msg = Message(content=content) + msg.durable = opts.durable + if opts.ttl: + msg.ttl = opts.ttl/1000.0 + if opts.priority: + msg.priority = opts.priority + if opts.reply_to is not None: + if opts.flow_control > 0: + raise Exception("Can't use reply-to and flow-control together") + msg.reply_to = opts.reply_to + if opts.user_id is not None: + msg.user_id = opts.user_id + if opts.correlation_id is not None: + msg.correlation_id = opts.correlation_id + for p in opts.property: + name, val = nameval(p) + msg.properties[name] = val + + start = time.time()*TIME_SEC + interval = 0 + if opts.send_rate > 0: + interval = TIME_SEC/opts.send_rate + + flowControlAddress = "flow-" + str(uuid.uuid1()) + ";{create:always,delete:always}" + flowSent = 0 + if opts.flow_control > 0: + flowControlReceiver = session.receiver(flowControlAddress) + flowControlReceiver.capacity = 2 + + while (contentGen.setContent(msg) == True): + sent+=1 + if opts.sequence == "yes": + msg.properties[SN] = sent + + if opts.flow_control > 0: + if (sent % opts.flow_control == 0): + msg.reply_to = flowControlAddress + flowSent+=1 + else: + msg.reply_to = "" # Clear the reply address. + + if 'groupGen' in vars(): + groupGen.setGroupInfo(msg) + + if (opts.timestamp == "yes"): + msg.properties[TS] = int(time.time()*TIME_SEC) + sender.send(msg) + reporter.message(msg) + + if ((opts.tx > 0) and (sent % opts.tx == 0)): + txCount+=1 + if ((opts.rollbackFrequency > 0) and (txCount % opts.rollbackFrequency == 0)): + session.rollback() + else: + session.commit() + if ((opts.messages > 0) and (sent >= opts.messages)): + break + + if (opts.flow_control > 0) and (flowSent == 2): + flowControlReceiver.fetch(timeout=SECOND) + flowSent -= 1 + + if (opts.send_rate > 0): + delay = start + sent*interval - time.time()*TIME_SEC + if (delay > 0): + time.sleep(delay) + #end of while + + while flowSent > 0: + flowControlReceiver.fetch(timeout=SECOND) + flowSent -= 1 + + if (opts.report_total): + reporter.report() + for i in reversed(range(1,opts.send_eos+1)): + if (opts.sequence == "yes"): + sent+=1 + msg.properties[SN] = sent + msg.properties[EOS] = True #TODO (also in C++ client): add in ability to send digest or similar + sender.send(msg) + if ((opts.tx > 0) and (sent % opts.tx == 0)): + txCount+=1 + if ((opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0)): + session.rollback() + else: + session.commit() + session.sync() + session.close() + connection.close() + except Exception,e: + print e + connection.close() + +if __name__ == "__main__": main() diff --git a/python/examples/api/statistics.py b/python/examples/api/statistics.py new file mode 100644 index 0000000000..e095920e90 --- /dev/null +++ b/python/examples/api/statistics.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time + +TS = "ts" +TIME_SEC = 1000000000 +MILLISECOND = 1000 + +class Statistic: + def message(self, msg): + return + def report(self): + return "" + def header(self): + return "" + + +class Throughput(Statistic): + def __init__(self): + self.messages = 0 + self.started = False + + def message(self, m): + self.messages += 1 + if not self.started: + self.start = time.time() + self.started = True + + def header(self): + return "tp(m/s)" + + def report(self): + if self.started: + elapsed = time.time() - self.start + return str(int(self.messages/elapsed)) + else: + return "0" + + +class ThroughputAndLatency(Throughput): + def __init__(self): + Throughput.__init__(self) + self.total = 0.0 + self.min = float('inf') + self.max = -float('inf') + self.samples = 0 + + def message(self, m): + Throughput.message(self, m) + if TS in m.properties: + self.samples+=1 + latency = MILLISECOND * (time.time() - float(m.properties[TS])/TIME_SEC) + if latency > 0: + self.total += latency + if latency < self.min: + self.min = latency + if latency > self.max: + self.max = latency + + def header(self): +# Throughput.header(self) + return "%s\tl-min\tl-max\tl-avg" % Throughput.header(self) + + def report(self): + output = Throughput.report(self) + if (self.samples > 0): + output += "\t%.2f\t%.2f\t%.2f" %(self.min, self.max, self.total/self.samples) + return output + + +# Report batch and overall statistics +class ReporterBase: + def __init__(self, batch, wantHeader): + self.batchSize = batch + self.batchCount = 0 + self.headerPrinted = not wantHeader + self.overall = None + self.batch = None + + def create(self): + return + + # Count message in the statistics + def message(self, m): + if self.overall == None: + self.overall = self.create() + self.overall.message(m) + if self.batchSize: + if self.batch == None: + self.batch = self.create() + self.batch.message(m) + self.batchCount+=1 + if self.batchCount == self.batchSize: + self.header() + print self.batch.report() + self.create() + self.batchCount = 0 + + # Print overall report. + def report(self): + if self.overall == None: + self.overall = self.create() + self.header() + print self.overall.report() + + def header(self): + if not self.headerPrinted: + if self.overall == None: + self.overall = self.create() + print self.overall.header() + self.headerPrinted = True + + +class Reporter(ReporterBase): + def __init__(self, batchSize, wantHeader, Stats): + ReporterBase.__init__(self, batchSize, wantHeader) + self.__stats = Stats + + def create(self): + ClassName = self.__stats.__class__ + return ClassName() diff --git a/python/qpid/client.py b/python/qpid/client.py index 5a877bb8d6..4d42a8b20f 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -18,13 +18,14 @@ # """ -An AQMP client implementation that uses a custom delegate for +An AMQP client implementation that uses a custom delegate for interacting with the server. """ import os, threading from peer import Peer, Channel, Closed from delegate import Delegate +from util import get_client_properties_with_defaults from connection08 import Connection, Frame, connect from spec08 import load from queue import Queue @@ -76,12 +77,12 @@ class Client: self.lock.release() return q - def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None): + def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None, client_properties=None): self.mechanism = mechanism self.response = response self.locale = locale self.tune_params = tune_params - + self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties) self.socket = connect(self.host, self.port) self.conn = Connection(self.socket, self.spec) self.peer = Peer(self.conn, ClientDelegate(self), Session) @@ -128,7 +129,8 @@ class ClientDelegate(Delegate): def connection_start(self, ch, msg): msg.start_ok(mechanism=self.client.mechanism, response=self.client.response, - locale=self.client.locale) + locale=self.client.locale, + client_properties=self.client.client_properties) def connection_tune(self, ch, msg): if self.client.tune_params: diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py index 654148dad2..0045e122ea 100644 --- a/python/qpid/connection08.py +++ b/python/qpid/connection08.py @@ -28,6 +28,9 @@ from cStringIO import StringIO from codec import EOF from compat import SHUT_RDWR from exceptions import VersionError +from logging import getLogger, DEBUG + +log = getLogger("qpid.connection08") class SockIO: @@ -35,7 +38,8 @@ class SockIO: self.sock = sock def write(self, buf): -# print "OUT: %r" % buf + if log.isEnabledFor(DEBUG): + log.debug("OUT: %r", buf) self.sock.sendall(buf) def read(self, n): @@ -47,8 +51,9 @@ class SockIO: break if len(s) == 0: break -# print "IN: %r" % s data += s + if log.isEnabledFor(DEBUG): + log.debug("IN: %r", data) return data def flush(self): @@ -120,19 +125,25 @@ class Connection: (self.spec.major, self.spec.minor, major, minor)) else: raise FramingError("unknown frame type: %s" % tid) - channel = c.decode_short() - body = c.decode_longstr() - dec = codec.Codec(StringIO(body), self.spec) - frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) - frame.channel = channel - end = c.decode_octet() - if end != self.FRAME_END: - garbage = "" - while end != self.FRAME_END: - garbage += chr(end) - end = c.decode_octet() - raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) - return frame + try: + channel = c.decode_short() + body = c.decode_longstr() + dec = codec.Codec(StringIO(body), self.spec) + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + frame.channel = channel + end = c.decode_octet() + if end != self.FRAME_END: + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) + return frame + except EOF: + # An EOF caught here can indicate an error decoding the frame, + # rather than that a disconnection occurred,so it's worth logging it. + log.exception("Error occurred when reading frame with tid %s" % tid) + raise def write_0_9(self, frame): self.write_8_0(frame) diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py index 5e44a3a6dc..ae7ed7f988 100644 --- a/python/qpid/delegates.py +++ b/python/qpid/delegates.py @@ -18,7 +18,7 @@ # import os, connection, session -from util import notify +from util import notify, get_client_properties_with_defaults from datatypes import RangedSet from exceptions import VersionError, Closed from logging import getLogger @@ -137,24 +137,12 @@ class Server(Delegate): class Client(Delegate): - ppid = 0 - try: - ppid = os.getppid() - except: - pass - - PROPERTIES = {"product": "qpid python client", - "version": "development", - "platform": os.name, - "qpid.client_process": os.path.basename(sys.argv[0]), - "qpid.client_pid": os.getpid(), - "qpid.client_ppid": ppid} - def __init__(self, connection, username=None, password=None, mechanism=None, heartbeat=None, **kwargs): Delegate.__init__(self, connection) - self.client_properties=Client.PROPERTIES.copy() - self.client_properties.update(kwargs.get("client_properties",{})) + provided_client_properties = kwargs.get("client_properties") + self.client_properties=get_client_properties_with_defaults(provided_client_properties) + ## ## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to ## use. If it's None, then any mechanism is acceptable. diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 3cb62d75c9..2bd638f327 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -31,7 +31,7 @@ from qpid.messaging.exceptions import * from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector -from qpid.util import URL, default +from qpid.util import URL, default,get_client_properties_with_defaults from qpid.validator import And, Context, List, Map, Types, Values from threading import Condition, Thread @@ -90,20 +90,6 @@ SUBJECT_DEFAULTS = { "topic": "#" } -# XXX -ppid = 0 -try: - ppid = os.getppid() -except: - pass - -CLIENT_PROPERTIES = {"product": "qpid python client", - "version": "development", - "platform": os.name, - "qpid.client_process": os.path.basename(sys.argv[0]), - "qpid.client_pid": os.getpid(), - "qpid.client_ppid": ppid} - def noop(): pass def sync_noop(): pass @@ -710,8 +696,7 @@ class Engine: except sasl.SASLError, e: raise AuthenticationFailure(text=str(e)) - client_properties = CLIENT_PROPERTIES.copy() - client_properties.update(self.connection.client_properties) + client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties); self.write_op(ConnectionStartOk(client_properties=client_properties, mechanism=mech, response=initial)) diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index e632c0c5b8..95ff5516d0 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -871,7 +871,7 @@ class Sender(Endpoint): self.queued += 1 if sync: - self.sync() + self.sync(timeout=timeout) assert message not in self.session.outgoing else: self._wakeup() diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py index 532c365884..e901e98258 100644 --- a/python/qpid/messaging/transports.py +++ b/python/qpid/messaging/transports.py @@ -55,7 +55,41 @@ try: from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \ SSL_ERROR_WANT_WRITE except ImportError: - pass + + ## try the older python SSL api: + from socket import ssl + + class old_ssl(SocketTransport): + def __init__(self, conn, host, port): + SocketTransport.__init__(self, conn, host, port) + # Bug (QPID-4337): this is the "old" version of python SSL. + # The private key is required. If a certificate is given, but no + # keyfile, assume the key is contained in the certificate + ssl_keyfile = conn.ssl_keyfile + ssl_certfile = conn.ssl_certfile + if ssl_certfile and not ssl_keyfile: + ssl_keyfile = ssl_certfile + self.ssl = ssl(self.socket, keyfile=ssl_keyfile, certfile=ssl_certfile) + self.socket.setblocking(1) + + def reading(self, reading): + return reading + + def writing(self, writing): + return writing + + def recv(self, n): + return self.ssl.read(n) + + def send(self, s): + return self.ssl.write(s) + + def close(self): + self.socket.close() + + TRANSPORTS["ssl"] = old_ssl + TRANSPORTS["tcp+tls"] = old_ssl + else: class tls(SocketTransport): diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index f9796982f5..2b283f3998 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -73,7 +73,7 @@ class TestBase(unittest.TestCase): else: self.client.close() - def connect(self, host=None, port=None, user=None, password=None, tune_params=None): + def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None): """Create a new connction, return the Client object""" host = host or self.config.broker.host port = port or self.config.broker.port or 5672 @@ -82,9 +82,9 @@ class TestBase(unittest.TestCase): client = qpid.client.Client(host, port) try: if client.spec.major == 8 and client.spec.minor == 0: - client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) + client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params, client_properties=client_properties) else: - client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) + client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params, client_properties=client_properties) except qpid.client.Closed, e: if isinstance(e.args[0], VersionError): raise Skipped(e.args[0]) diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py index 101a0c3759..dc9988515e 100644 --- a/python/qpid/tests/__init__.py +++ b/python/qpid/tests/__init__.py @@ -37,6 +37,7 @@ import qpid.tests.datatypes import qpid.tests.connection import qpid.tests.spec010 import qpid.tests.codec010 +import qpid.tests.util class TestTestsXXX(Test): diff --git a/python/qpid/tests/util.py b/python/qpid/tests/util.py new file mode 100644 index 0000000000..9777443720 --- /dev/null +++ b/python/qpid/tests/util.py @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from unittest import TestCase +from qpid.util import get_client_properties_with_defaults + +class UtilTest (TestCase): + + def test_get_spec_recommended_client_properties(self): + client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"}) + self.assertTrue("product" in client_properties) + self.assertTrue("version" in client_properties) + self.assertTrue("platform" in client_properties) + + def test_get_client_properties_with_provided_value(self): + client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"}) + self.assertTrue("product" in client_properties) + self.assertTrue("mykey" in client_properties) + self.assertEqual("myvalue", client_properties["mykey"]) + + def test_get_client_properties_with_no_provided_values(self): + client_properties = get_client_properties_with_defaults(provided_client_properties=None) + self.assertTrue("product" in client_properties) + + client_properties = get_client_properties_with_defaults() + self.assertTrue("product" in client_properties) + + def test_get_client_properties_with_provided_value_that_overrides_default(self): + client_properties = get_client_properties_with_defaults(provided_client_properties={"version":"myversion"}) + self.assertEqual("myversion", client_properties["version"]) + diff --git a/python/qpid/util.py b/python/qpid/util.py index 39ad1d830e..8da17ce0c6 100644 --- a/python/qpid/util.py +++ b/python/qpid/util.py @@ -17,15 +17,19 @@ # under the License. # -import os, socket, time, textwrap, re +import os, socket, time, textwrap, re, sys try: from ssl import wrap_socket as ssl except ImportError: from socket import ssl as wrap_socket class ssl: - def __init__(self, sock, keyfile=None, certfile=None, trustfile=None): + # Bug (QPID-4337): this is the "old" version of python SSL. + # The private key is required. If a certificate is given, but no + # keyfile, assume the key is contained in the certificate + if certfile and not keyfile: + keyfile = certfile self.sock = sock self.ssl = wrap_socket(sock, keyfile=keyfile, certfile=certfile) @@ -38,6 +42,24 @@ except ImportError: def close(self): self.sock.close() +def get_client_properties_with_defaults(provided_client_properties={}): + ppid = 0 + try: + ppid = os.getppid() + except: + pass + + client_properties = {"product": "qpid python client", + "version": "development", + "platform": os.name, + "qpid.client_process": os.path.basename(sys.argv[0]), + "qpid.client_pid": os.getpid(), + "qpid.client_ppid": ppid} + + if provided_client_properties: + client_properties.update(provided_client_properties) + return client_properties + def connect(host, port): for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): af, socktype, proto, canonname, sa = res diff --git a/python/setup.py b/python/setup.py index 0b9d99a1af..56af530b43 100755 --- a/python/setup.py +++ b/python/setup.py @@ -298,7 +298,7 @@ class install_lib(_install_lib): return outfiles + extra setup(name="qpid-python", - version="0.19", + version="0.21", author="Apache Qpid", author_email="dev@qpid.apache.org", packages=["mllib", "qpid", "qpid.messaging", "qpid.tests", |