summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/client.py10
-rw-r--r--python/qpid/connection08.py41
-rw-r--r--python/qpid/delegates.py20
-rw-r--r--python/qpid/messaging/driver.py19
-rw-r--r--python/qpid/messaging/endpoints.py2
-rw-r--r--python/qpid/messaging/transports.py36
-rw-r--r--python/qpid/testlib.py6
-rw-r--r--python/qpid/tests/__init__.py1
-rw-r--r--python/qpid/tests/util.py46
-rw-r--r--python/qpid/util.py26
10 files changed, 148 insertions, 59 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 5a877bb8d6..4d42a8b20f 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -18,13 +18,14 @@
#
"""
-An AQMP client implementation that uses a custom delegate for
+An AMQP client implementation that uses a custom delegate for
interacting with the server.
"""
import os, threading
from peer import Peer, Channel, Closed
from delegate import Delegate
+from util import get_client_properties_with_defaults
from connection08 import Connection, Frame, connect
from spec08 import load
from queue import Queue
@@ -76,12 +77,12 @@ class Client:
self.lock.release()
return q
- def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None):
+ def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None, client_properties=None):
self.mechanism = mechanism
self.response = response
self.locale = locale
self.tune_params = tune_params
-
+ self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties)
self.socket = connect(self.host, self.port)
self.conn = Connection(self.socket, self.spec)
self.peer = Peer(self.conn, ClientDelegate(self), Session)
@@ -128,7 +129,8 @@ class ClientDelegate(Delegate):
def connection_start(self, ch, msg):
msg.start_ok(mechanism=self.client.mechanism,
response=self.client.response,
- locale=self.client.locale)
+ locale=self.client.locale,
+ client_properties=self.client.client_properties)
def connection_tune(self, ch, msg):
if self.client.tune_params:
diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py
index 654148dad2..0045e122ea 100644
--- a/python/qpid/connection08.py
+++ b/python/qpid/connection08.py
@@ -28,6 +28,9 @@ from cStringIO import StringIO
from codec import EOF
from compat import SHUT_RDWR
from exceptions import VersionError
+from logging import getLogger, DEBUG
+
+log = getLogger("qpid.connection08")
class SockIO:
@@ -35,7 +38,8 @@ class SockIO:
self.sock = sock
def write(self, buf):
-# print "OUT: %r" % buf
+ if log.isEnabledFor(DEBUG):
+ log.debug("OUT: %r", buf)
self.sock.sendall(buf)
def read(self, n):
@@ -47,8 +51,9 @@ class SockIO:
break
if len(s) == 0:
break
-# print "IN: %r" % s
data += s
+ if log.isEnabledFor(DEBUG):
+ log.debug("IN: %r", data)
return data
def flush(self):
@@ -120,19 +125,25 @@ class Connection:
(self.spec.major, self.spec.minor, major, minor))
else:
raise FramingError("unknown frame type: %s" % tid)
- channel = c.decode_short()
- body = c.decode_longstr()
- dec = codec.Codec(StringIO(body), self.spec)
- frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
- frame.channel = channel
- end = c.decode_octet()
- if end != self.FRAME_END:
- garbage = ""
- while end != self.FRAME_END:
- garbage += chr(end)
- end = c.decode_octet()
- raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
- return frame
+ try:
+ channel = c.decode_short()
+ body = c.decode_longstr()
+ dec = codec.Codec(StringIO(body), self.spec)
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ frame.channel = channel
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
+ return frame
+ except EOF:
+ # An EOF caught here can indicate an error decoding the frame,
+ # rather than that a disconnection occurred,so it's worth logging it.
+ log.exception("Error occurred when reading frame with tid %s" % tid)
+ raise
def write_0_9(self, frame):
self.write_8_0(frame)
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 5e44a3a6dc..ae7ed7f988 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -18,7 +18,7 @@
#
import os, connection, session
-from util import notify
+from util import notify, get_client_properties_with_defaults
from datatypes import RangedSet
from exceptions import VersionError, Closed
from logging import getLogger
@@ -137,24 +137,12 @@ class Server(Delegate):
class Client(Delegate):
- ppid = 0
- try:
- ppid = os.getppid()
- except:
- pass
-
- PROPERTIES = {"product": "qpid python client",
- "version": "development",
- "platform": os.name,
- "qpid.client_process": os.path.basename(sys.argv[0]),
- "qpid.client_pid": os.getpid(),
- "qpid.client_ppid": ppid}
-
def __init__(self, connection, username=None, password=None,
mechanism=None, heartbeat=None, **kwargs):
Delegate.__init__(self, connection)
- self.client_properties=Client.PROPERTIES.copy()
- self.client_properties.update(kwargs.get("client_properties",{}))
+ provided_client_properties = kwargs.get("client_properties")
+ self.client_properties=get_client_properties_with_defaults(provided_client_properties)
+
##
## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to
## use. If it's None, then any mechanism is acceptable.
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 3cb62d75c9..2bd638f327 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -31,7 +31,7 @@ from qpid.messaging.exceptions import *
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
-from qpid.util import URL, default
+from qpid.util import URL, default,get_client_properties_with_defaults
from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
@@ -90,20 +90,6 @@ SUBJECT_DEFAULTS = {
"topic": "#"
}
-# XXX
-ppid = 0
-try:
- ppid = os.getppid()
-except:
- pass
-
-CLIENT_PROPERTIES = {"product": "qpid python client",
- "version": "development",
- "platform": os.name,
- "qpid.client_process": os.path.basename(sys.argv[0]),
- "qpid.client_pid": os.getpid(),
- "qpid.client_ppid": ppid}
-
def noop(): pass
def sync_noop(): pass
@@ -710,8 +696,7 @@ class Engine:
except sasl.SASLError, e:
raise AuthenticationFailure(text=str(e))
- client_properties = CLIENT_PROPERTIES.copy()
- client_properties.update(self.connection.client_properties)
+ client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties);
self.write_op(ConnectionStartOk(client_properties=client_properties,
mechanism=mech, response=initial))
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index e632c0c5b8..95ff5516d0 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -871,7 +871,7 @@ class Sender(Endpoint):
self.queued += 1
if sync:
- self.sync()
+ self.sync(timeout=timeout)
assert message not in self.session.outgoing
else:
self._wakeup()
diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py
index 532c365884..e901e98258 100644
--- a/python/qpid/messaging/transports.py
+++ b/python/qpid/messaging/transports.py
@@ -55,7 +55,41 @@ try:
from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
SSL_ERROR_WANT_WRITE
except ImportError:
- pass
+
+ ## try the older python SSL api:
+ from socket import ssl
+
+ class old_ssl(SocketTransport):
+ def __init__(self, conn, host, port):
+ SocketTransport.__init__(self, conn, host, port)
+ # Bug (QPID-4337): this is the "old" version of python SSL.
+ # The private key is required. If a certificate is given, but no
+ # keyfile, assume the key is contained in the certificate
+ ssl_keyfile = conn.ssl_keyfile
+ ssl_certfile = conn.ssl_certfile
+ if ssl_certfile and not ssl_keyfile:
+ ssl_keyfile = ssl_certfile
+ self.ssl = ssl(self.socket, keyfile=ssl_keyfile, certfile=ssl_certfile)
+ self.socket.setblocking(1)
+
+ def reading(self, reading):
+ return reading
+
+ def writing(self, writing):
+ return writing
+
+ def recv(self, n):
+ return self.ssl.read(n)
+
+ def send(self, s):
+ return self.ssl.write(s)
+
+ def close(self):
+ self.socket.close()
+
+ TRANSPORTS["ssl"] = old_ssl
+ TRANSPORTS["tcp+tls"] = old_ssl
+
else:
class tls(SocketTransport):
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index f9796982f5..2b283f3998 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -73,7 +73,7 @@ class TestBase(unittest.TestCase):
else:
self.client.close()
- def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None):
"""Create a new connction, return the Client object"""
host = host or self.config.broker.host
port = port or self.config.broker.port or 5672
@@ -82,9 +82,9 @@ class TestBase(unittest.TestCase):
client = qpid.client.Client(host, port)
try:
if client.spec.major == 8 and client.spec.minor == 0:
- client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params, client_properties=client_properties)
else:
- client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+ client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params, client_properties=client_properties)
except qpid.client.Closed, e:
if isinstance(e.args[0], VersionError):
raise Skipped(e.args[0])
diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py
index 101a0c3759..dc9988515e 100644
--- a/python/qpid/tests/__init__.py
+++ b/python/qpid/tests/__init__.py
@@ -37,6 +37,7 @@ import qpid.tests.datatypes
import qpid.tests.connection
import qpid.tests.spec010
import qpid.tests.codec010
+import qpid.tests.util
class TestTestsXXX(Test):
diff --git a/python/qpid/tests/util.py b/python/qpid/tests/util.py
new file mode 100644
index 0000000000..9777443720
--- /dev/null
+++ b/python/qpid/tests/util.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from unittest import TestCase
+from qpid.util import get_client_properties_with_defaults
+
+class UtilTest (TestCase):
+
+ def test_get_spec_recommended_client_properties(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+ self.assertTrue("product" in client_properties)
+ self.assertTrue("version" in client_properties)
+ self.assertTrue("platform" in client_properties)
+
+ def test_get_client_properties_with_provided_value(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+ self.assertTrue("product" in client_properties)
+ self.assertTrue("mykey" in client_properties)
+ self.assertEqual("myvalue", client_properties["mykey"])
+
+ def test_get_client_properties_with_no_provided_values(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties=None)
+ self.assertTrue("product" in client_properties)
+
+ client_properties = get_client_properties_with_defaults()
+ self.assertTrue("product" in client_properties)
+
+ def test_get_client_properties_with_provided_value_that_overrides_default(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"version":"myversion"})
+ self.assertEqual("myversion", client_properties["version"])
+
diff --git a/python/qpid/util.py b/python/qpid/util.py
index 39ad1d830e..8da17ce0c6 100644
--- a/python/qpid/util.py
+++ b/python/qpid/util.py
@@ -17,15 +17,19 @@
# under the License.
#
-import os, socket, time, textwrap, re
+import os, socket, time, textwrap, re, sys
try:
from ssl import wrap_socket as ssl
except ImportError:
from socket import ssl as wrap_socket
class ssl:
-
def __init__(self, sock, keyfile=None, certfile=None, trustfile=None):
+ # Bug (QPID-4337): this is the "old" version of python SSL.
+ # The private key is required. If a certificate is given, but no
+ # keyfile, assume the key is contained in the certificate
+ if certfile and not keyfile:
+ keyfile = certfile
self.sock = sock
self.ssl = wrap_socket(sock, keyfile=keyfile, certfile=certfile)
@@ -38,6 +42,24 @@ except ImportError:
def close(self):
self.sock.close()
+def get_client_properties_with_defaults(provided_client_properties={}):
+ ppid = 0
+ try:
+ ppid = os.getppid()
+ except:
+ pass
+
+ client_properties = {"product": "qpid python client",
+ "version": "development",
+ "platform": os.name,
+ "qpid.client_process": os.path.basename(sys.argv[0]),
+ "qpid.client_pid": os.getpid(),
+ "qpid.client_ppid": ppid}
+
+ if provided_client_properties:
+ client_properties.update(provided_client_properties)
+ return client_properties
+
def connect(host, port):
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res