diff options
Diffstat (limited to 'python/qpid/messaging')
| -rw-r--r-- | python/qpid/messaging/driver.py | 34 | ||||
| -rw-r--r-- | python/qpid/messaging/endpoints.py | 7 | ||||
| -rw-r--r-- | python/qpid/messaging/transports.py | 107 |
3 files changed, 134 insertions, 14 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 01393d6d70..cd86542860 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -26,13 +26,12 @@ from qpid.datatypes import RangedSet, Serial from qpid.exceptions import Timeout, VersionError from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder -from qpid.messaging import address +from qpid.messaging import address, transports from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED from qpid.messaging.exceptions import ConnectError from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector -from qpid.util import connect from qpid.validator import And, Context, List, Map, Types, Values from threading import Condition, Thread @@ -328,7 +327,7 @@ class Driver: self.connection.backups self._host = 0 self._retrying = False - self._socket = None + self._transport = None self._timeout = None @@ -346,15 +345,17 @@ class Driver: self._selector.unregister(self) def fileno(self): - return self._socket.fileno() + return self._transport.fileno() @synchronized def reading(self): - return self._socket is not None + return self._transport is not None and \ + self._transport.reading(True) @synchronized def writing(self): - return self._socket is not None and self.engine.pending() + return self._transport is not None and \ + self._transport.writing(self.engine.pending()) @synchronized def timing(self): @@ -363,8 +364,10 @@ class Driver: @synchronized def readable(self): try: - data = self._socket.recv(64*1024) - if data: + data = self._transport.recv(64*1024) + if data is None: + return + elif data: rawlog.debug("READ[%s]: %r", self.log_id, data) self.engine.write(data) else: @@ -404,8 +407,8 @@ class Driver: def st_closed(self): # XXX: this log statement seems to sometimes hit when the socket is not connected # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) - self._socket.close() - self._socket = None + self._transport.close() + self._transport = None self.engine = None return True @@ -416,7 +419,8 @@ class Driver: def writeable(self): notify = False try: - n = self._socket.send(self.engine.peek()) + n = self._transport.send(self.engine.peek()) + if n == 0: return sent = self.engine.read(n) rawlog.debug("SENT[%s]: %r", self.log_id, sent) except socket.error, e: @@ -433,7 +437,7 @@ class Driver: def dispatch(self): try: - if self._socket is None: + if self._transport is None: if self.connection._connected: self.connect() else: @@ -454,7 +458,11 @@ class Driver: self.engine = Engine(self.connection) self.engine.open() rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) - self._socket = connect(host, port) + trans = getattr(transports, self.connection.transport, None) + if trans: + self._transport = trans(host, port) + else: + raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying: log.warn("reconnect succeeded: %s:%s", host, port) self._timeout = None diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 3a23cd6e28..c3f76e4adc 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -79,7 +79,6 @@ class Connection: @return: a disconnected Connection """ self.host = host - self.port = default(port, AMQP_PORT) self.username = username self.password = password self.mechanisms = options.get("mechanisms") @@ -87,9 +86,15 @@ class Connection: self.reconnect = options.get("reconnect", False) self.reconnect_delay = options.get("reconnect_delay", 3) self.reconnect_limit = options.get("reconnect_limit") + self.transport = options.get("transport", "plain") self.backups = options.get("backups", []) self.options = options + if self.transport == "tls": + self.port = default(port, AMQPS_PORT) + else: + self.port = default(port, AMQP_PORT) + self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py new file mode 100644 index 0000000000..1dea469fe5 --- /dev/null +++ b/python/qpid/messaging/transports.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.util import connect + +class plain: + + def __init__(self, host, port): + self.socket = connect(host, port) + + def fileno(self): + return self.socket.fileno() + + def reading(self, reading): + return reading + + def writing(self, writing): + return writing + + def send(self, bytes): + return self.socket.send(bytes) + + def recv(self, n): + return self.socket.recv(n) + + def close(self): + self.socket.close() + +try: + from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \ + SSL_ERROR_WANT_WRITE +except ImportError: + pass +else: + class tls: + + def __init__(self, host, port): + self.socket = connect(host, port) + self.tls = wrap_socket(self.socket) + self.socket.setblocking(0) + self.state = None + + def fileno(self): + return self.socket.fileno() + + def reading(self, reading): + if self.state is None: + return reading + else: + return self.state == SSL_ERROR_WANT_READ + + def writing(self, writing): + if self.state is None: + return writing + else: + return self.state == SSL_ERROR_WANT_WRITE + + def send(self, bytes): + self._clear_state() + try: + return self.tls.write(bytes) + except SSLError, e: + if self._update_state(e.args[0]): + return 0 + else: + raise + + def recv(self, n): + self._clear_state() + try: + return self.tls.read(n) + except SSLError, e: + if self._update_state(e.args[0]): + return None + else: + raise + + def _clear_state(self): + self.state = None + + def _update_state(self, code): + if code in (SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE): + self.state = code + return True + else: + return False + + def close(self): + self.socket.setblocking(1) + # this closes the underlying socket + self.tls.close() |
