summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
commitf83677056891e436bf5ba99e79240df2a44528cd (patch)
tree625bfd644b948e89105630759cf6decb0435354d /python
parentebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff)
downloadqpid-python-QPID-2519.tar.gz
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/client.py2
-rw-r--r--python/qpid/codec010.py17
-rw-r--r--python/qpid/messaging/driver.py15
-rw-r--r--python/qpid/messaging/endpoints.py5
-rw-r--r--python/qpid/messaging/transports.py18
-rw-r--r--python/qpid/tests/messaging/endpoints.py25
-rw-r--r--python/qpid/util.py54
-rwxr-xr-xpython/setup.py2
8 files changed, 99 insertions, 39 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 45ce8498e8..5a877bb8d6 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -106,7 +106,7 @@ class Client:
try:
id = None
for i in xrange(1, 64*1024):
- if not self.sessions.has_key(id):
+ if not self.sessions.has_key(i):
id = i
break
finally:
diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py
index 0846db6bf7..94a1cd4263 100644
--- a/python/qpid/codec010.py
+++ b/python/qpid/codec010.py
@@ -17,7 +17,7 @@
# under the License.
#
-import datetime
+import datetime, string
from packer import Packer
from datatypes import serial, timestamp, RangedSet, Struct, UUID
from ops import Compound, PRIMITIVE, COMPOUND
@@ -241,15 +241,20 @@ class Codec(Packer):
v = sc.read_primitive(type)
result[k] = v
return result
+
+ def _write_map_elem(self, k, v):
+ type = self.encoding(v)
+ sc = StringCodec()
+ sc.write_str8(k)
+ sc.write_uint8(type.CODE)
+ sc.write_primitive(type, v)
+ return sc.encoded
+
def write_map(self, m):
sc = StringCodec()
if m is not None:
sc.write_uint32(len(m))
- for k, v in m.items():
- type = self.encoding(v)
- sc.write_str8(k)
- sc.write_uint8(type.CODE)
- sc.write_primitive(type, v)
+ sc.write(string.joinfields(map(self._write_map_elem, m.keys(), m.values()), ""))
self.write_vbin32(sc.encoded)
def read_array(self):
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 2eb2c1863e..7c21388213 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -66,7 +66,7 @@ class Attachment:
# XXX
-DURABLE_DEFAULT=True
+DURABLE_DEFAULT=False
# XXX
@@ -526,7 +526,7 @@ class Driver:
rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
trans = transports.TRANSPORTS.get(self.connection.transport)
if trans:
- self._transport = trans(host, port)
+ self._transport = trans(self.connection, host, port)
else:
raise ConnectError("no such transport: %s" % self.connection.transport)
if self._retrying and self._reconnect_log:
@@ -828,8 +828,9 @@ class Engine:
self._closing = True
def attach(self, ssn):
+ if ssn.closed: return
sst = self._attachments.get(ssn)
- if sst is None and not ssn.closed:
+ if sst is None:
for i in xrange(0, self.channel_max):
if not self._sessions.has_key(i):
ch = i
@@ -930,6 +931,7 @@ class Engine:
def resolve_declare(self, sst, lnk, dir, action):
declare = lnk.options.get("create") in ("always", dir)
+ assrt = lnk.options.get("assert") in ("always", dir)
def do_resolved(type, subtype):
err = None
if type is None:
@@ -938,7 +940,12 @@ class Engine:
else:
err = NotFound(text="no such queue: %s" % lnk.name)
else:
- action(type, subtype)
+ if assrt:
+ expected = lnk.options.get("node", {}).get("type")
+ if expected and type != expected:
+ err = AssertionFailed(text="expected %s, got %s" % (expected, type))
+ if err is None:
+ action(type, subtype)
if err:
tgt = lnk.target
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index b8101b76e6..338ac70ecf 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -158,6 +158,7 @@ class Connection(Endpoint):
self.reconnect_log = options.get("reconnect_log", True)
self.address_ttl = options.get("address_ttl", 60)
+ self.tcp_nodelay = options.get("tcp_nodelay", False)
self.options = options
@@ -197,7 +198,7 @@ class Connection(Endpoint):
return result
def check_closed(self):
- if self.closed:
+ if not self._connected:
self._condition.gc()
raise ConnectionClosed()
@@ -1006,9 +1007,9 @@ class Receiver(Endpoint, object):
self.draining = True
self._wakeup()
self._ecwait(lambda: not self.draining)
+ msg = self.session._get(self, timeout=0)
self._grant()
self._wakeup()
- msg = self.session._get(self, timeout=0)
if msg is None:
raise Empty()
elif self._capacity not in (0, UNLIMITED.value):
diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py
index 8133a45604..7abaae12e8 100644
--- a/python/qpid/messaging/transports.py
+++ b/python/qpid/messaging/transports.py
@@ -17,18 +17,23 @@
# under the License.
#
+import socket
from qpid.util import connect
TRANSPORTS = {}
-class tcp:
+class SocketTransport:
- def __init__(self, host, port):
+ def __init__(self, conn, host, port):
self.socket = connect(host, port)
+ if conn.tcp_nodelay:
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
def fileno(self):
return self.socket.fileno()
+class tcp(SocketTransport):
+
def reading(self, reading):
return reading
@@ -52,17 +57,14 @@ try:
except ImportError:
pass
else:
- class tls:
+ class tls(SocketTransport):
- def __init__(self, host, port):
- self.socket = connect(host, port)
+ def __init__(self, conn, host, port):
+ SocketTransport.__init__(self, conn, host, port)
self.tls = wrap_socket(self.socket)
self.socket.setblocking(0)
self.state = None
- def fileno(self):
- return self.socket.fileno()
-
def reading(self, reading):
if self.state is None:
return reading
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index c303ca652a..db5ec03df2 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -46,6 +46,10 @@ class SetupTests(Base):
self.conn.open()
self.ping(self.conn.session())
+ def testTcpNodelay(self):
+ self.conn = Connection.establish(self.broker, tcp_nodelay=True)
+ assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+
def testConnectError(self):
try:
# Specifying port 0 yields a bad address on Windows; port 4 is unassigned
@@ -111,8 +115,8 @@ class SetupTests(Base):
class flaky:
- def __init__(self, host, port):
- self.real = real(host, port)
+ def __init__(self, conn, host, port):
+ self.real = real(conn, host, port)
self.sent_count = 0
self.recv_count = 0
@@ -186,6 +190,9 @@ class ConnectionTests(Base):
def setup_connection(self):
return Connection.establish(self.broker, **self.connection_options())
+ def testCheckClosed(self):
+ assert not self.conn.check_closed()
+
def testSessionAnon(self):
ssn1 = self.conn.session()
ssn2 = self.conn.session()
@@ -248,8 +255,8 @@ class ConnectionTests(Base):
class hangable:
- def __init__(self, host, port):
- self.tcp = TRANSPORTS["tcp"](host, port)
+ def __init__(self, conn, host, port):
+ self.tcp = TRANSPORTS["tcp"](conn, host, port)
self.hung = False
def hang(self):
@@ -1179,6 +1186,16 @@ test-link-bindings-queue; {
snd.send(m)
self.drain(qrcv, expected=msgs)
+ def testAssert1(self):
+ try:
+ snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}")
+ assert 0, "assertion failed to trigger"
+ except AssertionFailed, e:
+ pass
+
+ def testAssert2(self):
+ snd = self.ssn.sender("amq.topic; {assert: always}")
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
diff --git a/python/qpid/util.py b/python/qpid/util.py
index e62bebdf24..89677289e2 100644
--- a/python/qpid/util.py
+++ b/python/qpid/util.py
@@ -39,12 +39,17 @@ except ImportError:
self.sock.close()
def connect(host, port):
- sock = socket.socket()
- sock.connect((host, port))
- sock.setblocking(1)
- # XXX: we could use this on read, but we'd have to put write in a
- # loop as well
- # sock.settimeout(1)
+ for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
+ af, socktype, proto, canonname, sa = res
+ sock = socket.socket(af, socktype, proto)
+ try:
+ sock.connect(sa)
+ break
+ except socket.error, msg:
+ sock.close
+ else:
+ # If we got here then we couldn't connect (yet)
+ raise
return sock
def listen(host, port, predicate = lambda: True, bound = lambda: None):
@@ -101,15 +106,23 @@ def fill(text, indent, heading = None):
class URL:
RE = re.compile(r"""
- # [ <scheme>:// ] [ <user> [ / <password> ] @] <host> [ :<port> ]
- ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? ([^@:/]+) (?: :([0-9]+))?$
-""", re.X)
+ # [ <scheme>:// ] [ <user> [ / <password> ] @] ( <host4> | \[ <host6> \] ) [ :<port> ]
+ ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))?$
+""", re.X | re.I)
AMQPS = "amqps"
AMQP = "amqp"
- def __init__(self, s):
- if isinstance(s, URL):
+ def __init__(self, s=None, **kwargs):
+ if s is None:
+ self.scheme = kwargs.get('scheme', None)
+ self.user = kwargs.get('user', None)
+ self.password = kwargs.get('password', None)
+ self.host = kwargs.get('host', None)
+ self.port = kwargs.get('port', None)
+ if self.host is None:
+ raise ValueError('Host required for url')
+ elif isinstance(s, URL):
self.scheme = s.scheme
self.user = s.user
self.password = s.password
@@ -119,7 +132,8 @@ class URL:
match = URL.RE.match(s)
if match is None:
raise ValueError(s)
- self.scheme, self.user, self.password, self.host, port = match.groups()
+ self.scheme, self.user, self.password, host4, host6, port = match.groups()
+ self.host = host4 or host6
if port is None:
self.port = None
else:
@@ -137,11 +151,25 @@ class URL:
if self.password:
s += "/%s" % self.password
s += "@"
- s += self.host
+ if ':' not in self.host:
+ s += self.host
+ else:
+ s += "[%s]" % self.host
if self.port:
s += ":%s" % self.port
return s
+ def __eq__(self, url):
+ if isinstance(url, basestring):
+ url = URL(url)
+ return \
+ self.scheme==url.scheme and \
+ self.user==url.user and self.password==url.password and \
+ self.host==url.host and self.port==url.port
+
+ def __ne__(self, url):
+ return not self.__eq__(url)
+
def default(value, default):
if value is None:
return default
diff --git a/python/setup.py b/python/setup.py
index 9c04421c51..65cb72854f 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -298,7 +298,7 @@ class install_lib(_install_lib):
return outfiles + extra
setup(name="qpid-python",
- version="0.9",
+ version="0.13",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
packages=["mllib", "qpid", "qpid.messaging", "qpid.tests",