diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-03-31 21:17:09 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-03-31 21:17:09 +0000 |
| commit | 30ff4dcc85eaf5a2ea52cad9d965086c8062a4ce (patch) | |
| tree | f316eee10c4613d74f2b768e043cf91a084e896f /qpid/python | |
| parent | c53c4cc94e121c0fc3df6010cffa1bbb49a779db (diff) | |
| download | qpid-python-30ff4dcc85eaf5a2ea52cad9d965086c8062a4ce.tar.gz | |
added SSL support to API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@929717 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rwxr-xr-x | qpid/python/examples/api/drain | 2 | ||||
| -rwxr-xr-x | qpid/python/examples/api/server | 2 | ||||
| -rwxr-xr-x | qpid/python/examples/api/spout | 2 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/driver.py | 34 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 7 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/transports.py | 107 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/__init__.py | 11 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 19 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/message.py | 2 |
9 files changed, 159 insertions, 27 deletions
diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain index c244cbc09c..d7ac03afa6 100755 --- a/qpid/python/examples/api/drain +++ b/qpid/python/examples/api/drain @@ -73,7 +73,7 @@ class Formatter: return eval(st, self.environ) # XXX: should make URL default the port for us -conn = Connection(url.host, url.port or AMQP_PORT, +conn = Connection(url.host, url.port, username=url.user, password=url.password, reconnect=opts.reconnect, diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server index d7cd53de4b..f0bf1c2a4b 100755 --- a/qpid/python/examples/api/server +++ b/qpid/python/examples/api/server @@ -51,7 +51,7 @@ else: parser.error("address is required") # XXX: should make URL default the port for us -conn = Connection(url.host, url.port or AMQP_PORT, +conn = Connection(url.host, url.port, username=url.user, password=url.password, reconnect=opts.reconnect, diff --git a/qpid/python/examples/api/spout b/qpid/python/examples/api/spout index 9606c3501f..0d37ede512 100755 --- a/qpid/python/examples/api/spout +++ b/qpid/python/examples/api/spout @@ -93,7 +93,7 @@ else: content = text # XXX: should make URL default the port for us -conn = Connection(url.host, url.port or AMQP_PORT, +conn = Connection(url.host, url.port, username=url.user, password=url.password, reconnect=opts.reconnect, diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 01393d6d70..cd86542860 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -26,13 +26,12 @@ from qpid.datatypes import RangedSet, Serial from qpid.exceptions import Timeout, VersionError from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder -from qpid.messaging import address +from qpid.messaging import address, transports from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED from qpid.messaging.exceptions import ConnectError from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector -from qpid.util import connect from qpid.validator import And, Context, List, Map, Types, Values from threading import Condition, Thread @@ -328,7 +327,7 @@ class Driver: self.connection.backups self._host = 0 self._retrying = False - self._socket = None + self._transport = None self._timeout = None @@ -346,15 +345,17 @@ class Driver: self._selector.unregister(self) def fileno(self): - return self._socket.fileno() + return self._transport.fileno() @synchronized def reading(self): - return self._socket is not None + return self._transport is not None and \ + self._transport.reading(True) @synchronized def writing(self): - return self._socket is not None and self.engine.pending() + return self._transport is not None and \ + self._transport.writing(self.engine.pending()) @synchronized def timing(self): @@ -363,8 +364,10 @@ class Driver: @synchronized def readable(self): try: - data = self._socket.recv(64*1024) - if data: + data = self._transport.recv(64*1024) + if data is None: + return + elif data: rawlog.debug("READ[%s]: %r", self.log_id, data) self.engine.write(data) else: @@ -404,8 +407,8 @@ class Driver: def st_closed(self): # XXX: this log statement seems to sometimes hit when the socket is not connected # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) - self._socket.close() - self._socket = None + self._transport.close() + self._transport = None self.engine = None return True @@ -416,7 +419,8 @@ class Driver: def writeable(self): notify = False try: - n = self._socket.send(self.engine.peek()) + n = self._transport.send(self.engine.peek()) + if n == 0: return sent = self.engine.read(n) rawlog.debug("SENT[%s]: %r", self.log_id, sent) except socket.error, e: @@ -433,7 +437,7 @@ class Driver: def dispatch(self): try: - if self._socket is None: + if self._transport is None: if self.connection._connected: self.connect() else: @@ -454,7 +458,11 @@ class Driver: self.engine = Engine(self.connection) self.engine.open() rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) - self._socket = connect(host, port) + trans = getattr(transports, self.connection.transport, None) + if trans: + self._transport = trans(host, port) + else: + raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying: log.warn("reconnect succeeded: %s:%s", host, port) self._timeout = None diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 3a23cd6e28..c3f76e4adc 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -79,7 +79,6 @@ class Connection: @return: a disconnected Connection """ self.host = host - self.port = default(port, AMQP_PORT) self.username = username self.password = password self.mechanisms = options.get("mechanisms") @@ -87,9 +86,15 @@ class Connection: self.reconnect = options.get("reconnect", False) self.reconnect_delay = options.get("reconnect_delay", 3) self.reconnect_limit = options.get("reconnect_limit") + self.transport = options.get("transport", "plain") self.backups = options.get("backups", []) self.options = options + if self.transport == "tls": + self.port = default(port, AMQPS_PORT) + else: + self.port = default(port, AMQP_PORT) + self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} diff --git a/qpid/python/qpid/messaging/transports.py b/qpid/python/qpid/messaging/transports.py new file mode 100644 index 0000000000..1dea469fe5 --- /dev/null +++ b/qpid/python/qpid/messaging/transports.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.util import connect + +class plain: + + def __init__(self, host, port): + self.socket = connect(host, port) + + def fileno(self): + return self.socket.fileno() + + def reading(self, reading): + return reading + + def writing(self, writing): + return writing + + def send(self, bytes): + return self.socket.send(bytes) + + def recv(self, n): + return self.socket.recv(n) + + def close(self): + self.socket.close() + +try: + from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \ + SSL_ERROR_WANT_WRITE +except ImportError: + pass +else: + class tls: + + def __init__(self, host, port): + self.socket = connect(host, port) + self.tls = wrap_socket(self.socket) + self.socket.setblocking(0) + self.state = None + + def fileno(self): + return self.socket.fileno() + + def reading(self, reading): + if self.state is None: + return reading + else: + return self.state == SSL_ERROR_WANT_READ + + def writing(self, writing): + if self.state is None: + return writing + else: + return self.state == SSL_ERROR_WANT_WRITE + + def send(self, bytes): + self._clear_state() + try: + return self.tls.write(bytes) + except SSLError, e: + if self._update_state(e.args[0]): + return 0 + else: + raise + + def recv(self, n): + self._clear_state() + try: + return self.tls.read(n) + except SSLError, e: + if self._update_state(e.args[0]): + return None + else: + raise + + def _clear_state(self): + self.state = None + + def _update_state(self, code): + if code in (SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE): + self.state = code + return True + else: + return False + + def close(self): + self.socket.setblocking(1) + # this closes the underlying socket + self.tls.close() diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py index c3581efb9d..b2a600982a 100644 --- a/qpid/python/qpid/tests/messaging/__init__.py +++ b/qpid/python/qpid/tests/messaging/__init__.py @@ -143,4 +143,15 @@ class Base(Test): def reconnect(self): return self.get_bool("reconnect") + + def transport(self): + if self.broker.scheme == self.broker.AMQPS: + return "tls" + else: + return "plain" + + def connection_options(self): + return {"reconnect": self.reconnect(), + "transport": self.transport()} + import address, endpoints, message diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index b1f00b680c..aced76feac 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -30,13 +30,13 @@ class SetupTests(Base): def testOpen(self): # XXX: need to flesh out URL support/syntax self.conn = Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) self.ping(self.conn.session()) def testConnect(self): # XXX: need to flesh out URL support/syntax self.conn = Connection(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) self.conn.connect() self.ping(self.conn.session()) @@ -65,7 +65,8 @@ class SetupTests(Base): for i in range(32): if fds: os.close(fds.pop()) for i in xrange(64): - conn = Connection.open(self.broker.host, self.broker.port) + conn = Connection.open(self.broker.host, self.broker.port, + **self.connection_options()) conn.close() finally: while fds: @@ -75,7 +76,7 @@ class ConnectionTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) def testSessionAnon(self): ssn1 = self.conn.session() @@ -118,7 +119,7 @@ class SessionTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) def setup_session(self): return self.conn.session() @@ -405,7 +406,7 @@ class ReceiverTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) def setup_session(self): return self.conn.session() @@ -575,7 +576,7 @@ class AddressTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) def setup_session(self): return self.conn.session() @@ -846,7 +847,7 @@ class AddressErrorTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) def setup_session(self): return self.conn.session() @@ -913,7 +914,7 @@ class SenderTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) def setup_session(self): return self.conn.session() diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py index f2701af64b..9272be7fa4 100644 --- a/qpid/python/qpid/tests/messaging/message.py +++ b/qpid/python/qpid/tests/messaging/message.py @@ -54,7 +54,7 @@ class MessageEchoTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) + **self.connection_options()) def setup_session(self): return self.conn.session() |
