From 73b7ec00e546b27d2bed22d30ad48a8735cfe6f0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 28 Mar 2011 23:18:16 +0000 Subject: Merge branch 'trunk' into qpid-2920 Conflicts: qpid/cpp/src/cluster.mk qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1086439 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qpid/messaging/driver.py | 12 +++++++++--- qpid/python/qpid/messaging/endpoints.py | 1 + qpid/python/qpid/messaging/transports.py | 18 ++++++++++-------- qpid/python/qpid/tests/messaging/endpoints.py | 22 ++++++++++++++++++---- 4 files changed, 38 insertions(+), 15 deletions(-) (limited to 'qpid/python') diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 2eb2c1863e..78af2827df 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -66,7 +66,7 @@ class Attachment: # XXX -DURABLE_DEFAULT=True +DURABLE_DEFAULT=False # XXX @@ -526,7 +526,7 @@ class Driver: rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) trans = transports.TRANSPORTS.get(self.connection.transport) if trans: - self._transport = trans(host, port) + self._transport = trans(self.connection, host, port) else: raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying and self._reconnect_log: @@ -930,6 +930,7 @@ class Engine: def resolve_declare(self, sst, lnk, dir, action): declare = lnk.options.get("create") in ("always", dir) + assrt = lnk.options.get("assert") in ("always", dir) def do_resolved(type, subtype): err = None if type is None: @@ -938,7 +939,12 @@ class Engine: else: err = NotFound(text="no such queue: %s" % lnk.name) else: - action(type, subtype) + if assrt: + expected = lnk.options.get("node", {}).get("type") + if expected and type != expected: + err = AssertionFailed(text="expected %s, got %s" % (expected, type)) + if err is None: + action(type, subtype) if err: tgt = lnk.target diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 30c5850397..cfc89d4e2b 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -158,6 +158,7 @@ class Connection(Endpoint): self.reconnect_log = options.get("reconnect_log", True) self.address_ttl = options.get("address_ttl", 60) + self.tcp_nodelay = options.get("tcp_nodelay", False) self.options = options diff --git a/qpid/python/qpid/messaging/transports.py b/qpid/python/qpid/messaging/transports.py index 8133a45604..7abaae12e8 100644 --- a/qpid/python/qpid/messaging/transports.py +++ b/qpid/python/qpid/messaging/transports.py @@ -17,18 +17,23 @@ # under the License. # +import socket from qpid.util import connect TRANSPORTS = {} -class tcp: +class SocketTransport: - def __init__(self, host, port): + def __init__(self, conn, host, port): self.socket = connect(host, port) + if conn.tcp_nodelay: + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) def fileno(self): return self.socket.fileno() +class tcp(SocketTransport): + def reading(self, reading): return reading @@ -52,17 +57,14 @@ try: except ImportError: pass else: - class tls: + class tls(SocketTransport): - def __init__(self, host, port): - self.socket = connect(host, port) + def __init__(self, conn, host, port): + SocketTransport.__init__(self, conn, host, port) self.tls = wrap_socket(self.socket) self.socket.setblocking(0) self.state = None - def fileno(self): - return self.socket.fileno() - def reading(self, reading): if self.state is None: return reading diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 0977b2ab3a..db5ec03df2 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -46,6 +46,10 @@ class SetupTests(Base): self.conn.open() self.ping(self.conn.session()) + def testTcpNodelay(self): + self.conn = Connection.establish(self.broker, tcp_nodelay=True) + assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) + def testConnectError(self): try: # Specifying port 0 yields a bad address on Windows; port 4 is unassigned @@ -111,8 +115,8 @@ class SetupTests(Base): class flaky: - def __init__(self, host, port): - self.real = real(host, port) + def __init__(self, conn, host, port): + self.real = real(conn, host, port) self.sent_count = 0 self.recv_count = 0 @@ -251,8 +255,8 @@ class ConnectionTests(Base): class hangable: - def __init__(self, host, port): - self.tcp = TRANSPORTS["tcp"](host, port) + def __init__(self, conn, host, port): + self.tcp = TRANSPORTS["tcp"](conn, host, port) self.hung = False def hang(self): @@ -1182,6 +1186,16 @@ test-link-bindings-queue; { snd.send(m) self.drain(qrcv, expected=msgs) + def testAssert1(self): + try: + snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}") + assert 0, "assertion failed to trigger" + except AssertionFailed, e: + pass + + def testAssert2(self): + snd = self.ssn.sender("amq.topic; {assert: always}") + NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" -- cgit v1.2.1