summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-03-31 21:17:09 +0000
committerRafael H. Schloming <rhs@apache.org>2010-03-31 21:17:09 +0000
commit30ff4dcc85eaf5a2ea52cad9d965086c8062a4ce (patch)
treef316eee10c4613d74f2b768e043cf91a084e896f /qpid/python
parentc53c4cc94e121c0fc3df6010cffa1bbb49a779db (diff)
downloadqpid-python-30ff4dcc85eaf5a2ea52cad9d965086c8062a4ce.tar.gz
added SSL support to API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@929717 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rwxr-xr-xqpid/python/examples/api/drain2
-rwxr-xr-xqpid/python/examples/api/server2
-rwxr-xr-xqpid/python/examples/api/spout2
-rw-r--r--qpid/python/qpid/messaging/driver.py34
-rw-r--r--qpid/python/qpid/messaging/endpoints.py7
-rw-r--r--qpid/python/qpid/messaging/transports.py107
-rw-r--r--qpid/python/qpid/tests/messaging/__init__.py11
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py19
-rw-r--r--qpid/python/qpid/tests/messaging/message.py2
9 files changed, 159 insertions, 27 deletions
diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain
index c244cbc09c..d7ac03afa6 100755
--- a/qpid/python/examples/api/drain
+++ b/qpid/python/examples/api/drain
@@ -73,7 +73,7 @@ class Formatter:
return eval(st, self.environ)
# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server
index d7cd53de4b..f0bf1c2a4b 100755
--- a/qpid/python/examples/api/server
+++ b/qpid/python/examples/api/server
@@ -51,7 +51,7 @@ else:
parser.error("address is required")
# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
diff --git a/qpid/python/examples/api/spout b/qpid/python/examples/api/spout
index 9606c3501f..0d37ede512 100755
--- a/qpid/python/examples/api/spout
+++ b/qpid/python/examples/api/spout
@@ -93,7 +93,7 @@ else:
content = text
# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 01393d6d70..cd86542860 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -26,13 +26,12 @@ from qpid.datatypes import RangedSet, Serial
from qpid.exceptions import Timeout, VersionError
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
FrameDecoder, SegmentDecoder, OpDecoder
-from qpid.messaging import address
+from qpid.messaging import address, transports
from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
from qpid.messaging.exceptions import ConnectError
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
-from qpid.util import connect
from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
@@ -328,7 +327,7 @@ class Driver:
self.connection.backups
self._host = 0
self._retrying = False
- self._socket = None
+ self._transport = None
self._timeout = None
@@ -346,15 +345,17 @@ class Driver:
self._selector.unregister(self)
def fileno(self):
- return self._socket.fileno()
+ return self._transport.fileno()
@synchronized
def reading(self):
- return self._socket is not None
+ return self._transport is not None and \
+ self._transport.reading(True)
@synchronized
def writing(self):
- return self._socket is not None and self.engine.pending()
+ return self._transport is not None and \
+ self._transport.writing(self.engine.pending())
@synchronized
def timing(self):
@@ -363,8 +364,10 @@ class Driver:
@synchronized
def readable(self):
try:
- data = self._socket.recv(64*1024)
- if data:
+ data = self._transport.recv(64*1024)
+ if data is None:
+ return
+ elif data:
rawlog.debug("READ[%s]: %r", self.log_id, data)
self.engine.write(data)
else:
@@ -404,8 +407,8 @@ class Driver:
def st_closed(self):
# XXX: this log statement seems to sometimes hit when the socket is not connected
# XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
- self._socket.close()
- self._socket = None
+ self._transport.close()
+ self._transport = None
self.engine = None
return True
@@ -416,7 +419,8 @@ class Driver:
def writeable(self):
notify = False
try:
- n = self._socket.send(self.engine.peek())
+ n = self._transport.send(self.engine.peek())
+ if n == 0: return
sent = self.engine.read(n)
rawlog.debug("SENT[%s]: %r", self.log_id, sent)
except socket.error, e:
@@ -433,7 +437,7 @@ class Driver:
def dispatch(self):
try:
- if self._socket is None:
+ if self._transport is None:
if self.connection._connected:
self.connect()
else:
@@ -454,7 +458,11 @@ class Driver:
self.engine = Engine(self.connection)
self.engine.open()
rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
- self._socket = connect(host, port)
+ trans = getattr(transports, self.connection.transport, None)
+ if trans:
+ self._transport = trans(host, port)
+ else:
+ raise ConnectError("no such transport: %s" % self.connection.transport)
if self._retrying:
log.warn("reconnect succeeded: %s:%s", host, port)
self._timeout = None
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index 3a23cd6e28..c3f76e4adc 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -79,7 +79,6 @@ class Connection:
@return: a disconnected Connection
"""
self.host = host
- self.port = default(port, AMQP_PORT)
self.username = username
self.password = password
self.mechanisms = options.get("mechanisms")
@@ -87,9 +86,15 @@ class Connection:
self.reconnect = options.get("reconnect", False)
self.reconnect_delay = options.get("reconnect_delay", 3)
self.reconnect_limit = options.get("reconnect_limit")
+ self.transport = options.get("transport", "plain")
self.backups = options.get("backups", [])
self.options = options
+ if self.transport == "tls":
+ self.port = default(port, AMQPS_PORT)
+ else:
+ self.port = default(port, AMQP_PORT)
+
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
diff --git a/qpid/python/qpid/messaging/transports.py b/qpid/python/qpid/messaging/transports.py
new file mode 100644
index 0000000000..1dea469fe5
--- /dev/null
+++ b/qpid/python/qpid/messaging/transports.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.util import connect
+
+class plain:
+
+ def __init__(self, host, port):
+ self.socket = connect(host, port)
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def reading(self, reading):
+ return reading
+
+ def writing(self, writing):
+ return writing
+
+ def send(self, bytes):
+ return self.socket.send(bytes)
+
+ def recv(self, n):
+ return self.socket.recv(n)
+
+ def close(self):
+ self.socket.close()
+
+try:
+ from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
+ SSL_ERROR_WANT_WRITE
+except ImportError:
+ pass
+else:
+ class tls:
+
+ def __init__(self, host, port):
+ self.socket = connect(host, port)
+ self.tls = wrap_socket(self.socket)
+ self.socket.setblocking(0)
+ self.state = None
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def reading(self, reading):
+ if self.state is None:
+ return reading
+ else:
+ return self.state == SSL_ERROR_WANT_READ
+
+ def writing(self, writing):
+ if self.state is None:
+ return writing
+ else:
+ return self.state == SSL_ERROR_WANT_WRITE
+
+ def send(self, bytes):
+ self._clear_state()
+ try:
+ return self.tls.write(bytes)
+ except SSLError, e:
+ if self._update_state(e.args[0]):
+ return 0
+ else:
+ raise
+
+ def recv(self, n):
+ self._clear_state()
+ try:
+ return self.tls.read(n)
+ except SSLError, e:
+ if self._update_state(e.args[0]):
+ return None
+ else:
+ raise
+
+ def _clear_state(self):
+ self.state = None
+
+ def _update_state(self, code):
+ if code in (SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE):
+ self.state = code
+ return True
+ else:
+ return False
+
+ def close(self):
+ self.socket.setblocking(1)
+ # this closes the underlying socket
+ self.tls.close()
diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py
index c3581efb9d..b2a600982a 100644
--- a/qpid/python/qpid/tests/messaging/__init__.py
+++ b/qpid/python/qpid/tests/messaging/__init__.py
@@ -143,4 +143,15 @@ class Base(Test):
def reconnect(self):
return self.get_bool("reconnect")
+
+ def transport(self):
+ if self.broker.scheme == self.broker.AMQPS:
+ return "tls"
+ else:
+ return "plain"
+
+ def connection_options(self):
+ return {"reconnect": self.reconnect(),
+ "transport": self.transport()}
+
import address, endpoints, message
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index b1f00b680c..aced76feac 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -30,13 +30,13 @@ class SetupTests(Base):
def testOpen(self):
# XXX: need to flesh out URL support/syntax
self.conn = Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
self.ping(self.conn.session())
def testConnect(self):
# XXX: need to flesh out URL support/syntax
self.conn = Connection(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
self.conn.connect()
self.ping(self.conn.session())
@@ -65,7 +65,8 @@ class SetupTests(Base):
for i in range(32):
if fds: os.close(fds.pop())
for i in xrange(64):
- conn = Connection.open(self.broker.host, self.broker.port)
+ conn = Connection.open(self.broker.host, self.broker.port,
+ **self.connection_options())
conn.close()
finally:
while fds:
@@ -75,7 +76,7 @@ class ConnectionTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def testSessionAnon(self):
ssn1 = self.conn.session()
@@ -118,7 +119,7 @@ class SessionTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -405,7 +406,7 @@ class ReceiverTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -575,7 +576,7 @@ class AddressTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -846,7 +847,7 @@ class AddressErrorTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -913,7 +914,7 @@ class SenderTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py
index f2701af64b..9272be7fa4 100644
--- a/qpid/python/qpid/tests/messaging/message.py
+++ b/qpid/python/qpid/tests/messaging/message.py
@@ -54,7 +54,7 @@ class MessageEchoTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()