summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-03-28 23:18:16 +0000
committerAlan Conway <aconway@apache.org>2011-03-28 23:18:16 +0000
commit73b7ec00e546b27d2bed22d30ad48a8735cfe6f0 (patch)
tree45900f0e0a964b814207fd0adfbacc36b0bbab26 /qpid/python
parentea98033c3df7fbc1364df69c4edb5cf1e808e87e (diff)
downloadqpid-python-73b7ec00e546b27d2bed22d30ad48a8735cfe6f0.tar.gz
Merge branch 'trunk' into qpid-2920
Conflicts: qpid/cpp/src/cluster.mk qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1086439 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/messaging/driver.py12
-rw-r--r--qpid/python/qpid/messaging/endpoints.py1
-rw-r--r--qpid/python/qpid/messaging/transports.py18
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py22
4 files changed, 38 insertions, 15 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 2eb2c1863e..78af2827df 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -66,7 +66,7 @@ class Attachment:
# XXX
-DURABLE_DEFAULT=True
+DURABLE_DEFAULT=False
# XXX
@@ -526,7 +526,7 @@ class Driver:
rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
trans = transports.TRANSPORTS.get(self.connection.transport)
if trans:
- self._transport = trans(host, port)
+ self._transport = trans(self.connection, host, port)
else:
raise ConnectError("no such transport: %s" % self.connection.transport)
if self._retrying and self._reconnect_log:
@@ -930,6 +930,7 @@ class Engine:
def resolve_declare(self, sst, lnk, dir, action):
declare = lnk.options.get("create") in ("always", dir)
+ assrt = lnk.options.get("assert") in ("always", dir)
def do_resolved(type, subtype):
err = None
if type is None:
@@ -938,7 +939,12 @@ class Engine:
else:
err = NotFound(text="no such queue: %s" % lnk.name)
else:
- action(type, subtype)
+ if assrt:
+ expected = lnk.options.get("node", {}).get("type")
+ if expected and type != expected:
+ err = AssertionFailed(text="expected %s, got %s" % (expected, type))
+ if err is None:
+ action(type, subtype)
if err:
tgt = lnk.target
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index 30c5850397..cfc89d4e2b 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -158,6 +158,7 @@ class Connection(Endpoint):
self.reconnect_log = options.get("reconnect_log", True)
self.address_ttl = options.get("address_ttl", 60)
+ self.tcp_nodelay = options.get("tcp_nodelay", False)
self.options = options
diff --git a/qpid/python/qpid/messaging/transports.py b/qpid/python/qpid/messaging/transports.py
index 8133a45604..7abaae12e8 100644
--- a/qpid/python/qpid/messaging/transports.py
+++ b/qpid/python/qpid/messaging/transports.py
@@ -17,18 +17,23 @@
# under the License.
#
+import socket
from qpid.util import connect
TRANSPORTS = {}
-class tcp:
+class SocketTransport:
- def __init__(self, host, port):
+ def __init__(self, conn, host, port):
self.socket = connect(host, port)
+ if conn.tcp_nodelay:
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
def fileno(self):
return self.socket.fileno()
+class tcp(SocketTransport):
+
def reading(self, reading):
return reading
@@ -52,17 +57,14 @@ try:
except ImportError:
pass
else:
- class tls:
+ class tls(SocketTransport):
- def __init__(self, host, port):
- self.socket = connect(host, port)
+ def __init__(self, conn, host, port):
+ SocketTransport.__init__(self, conn, host, port)
self.tls = wrap_socket(self.socket)
self.socket.setblocking(0)
self.state = None
- def fileno(self):
- return self.socket.fileno()
-
def reading(self, reading):
if self.state is None:
return reading
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 0977b2ab3a..db5ec03df2 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -46,6 +46,10 @@ class SetupTests(Base):
self.conn.open()
self.ping(self.conn.session())
+ def testTcpNodelay(self):
+ self.conn = Connection.establish(self.broker, tcp_nodelay=True)
+ assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+
def testConnectError(self):
try:
# Specifying port 0 yields a bad address on Windows; port 4 is unassigned
@@ -111,8 +115,8 @@ class SetupTests(Base):
class flaky:
- def __init__(self, host, port):
- self.real = real(host, port)
+ def __init__(self, conn, host, port):
+ self.real = real(conn, host, port)
self.sent_count = 0
self.recv_count = 0
@@ -251,8 +255,8 @@ class ConnectionTests(Base):
class hangable:
- def __init__(self, host, port):
- self.tcp = TRANSPORTS["tcp"](host, port)
+ def __init__(self, conn, host, port):
+ self.tcp = TRANSPORTS["tcp"](conn, host, port)
self.hung = False
def hang(self):
@@ -1182,6 +1186,16 @@ test-link-bindings-queue; {
snd.send(m)
self.drain(qrcv, expected=msgs)
+ def testAssert1(self):
+ try:
+ snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}")
+ assert 0, "assertion failed to trigger"
+ except AssertionFailed, e:
+ pass
+
+ def testAssert2(self):
+ snd = self.ssn.sender("amq.topic; {assert: always}")
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"