diff options
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r-- | python/qpid/messaging/driver.py | 15 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 5 | ||||
-rw-r--r-- | python/qpid/messaging/transports.py | 18 |
3 files changed, 24 insertions, 14 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 2eb2c1863e..7c21388213 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -66,7 +66,7 @@ class Attachment: # XXX -DURABLE_DEFAULT=True +DURABLE_DEFAULT=False # XXX @@ -526,7 +526,7 @@ class Driver: rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) trans = transports.TRANSPORTS.get(self.connection.transport) if trans: - self._transport = trans(host, port) + self._transport = trans(self.connection, host, port) else: raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying and self._reconnect_log: @@ -828,8 +828,9 @@ class Engine: self._closing = True def attach(self, ssn): + if ssn.closed: return sst = self._attachments.get(ssn) - if sst is None and not ssn.closed: + if sst is None: for i in xrange(0, self.channel_max): if not self._sessions.has_key(i): ch = i @@ -930,6 +931,7 @@ class Engine: def resolve_declare(self, sst, lnk, dir, action): declare = lnk.options.get("create") in ("always", dir) + assrt = lnk.options.get("assert") in ("always", dir) def do_resolved(type, subtype): err = None if type is None: @@ -938,7 +940,12 @@ class Engine: else: err = NotFound(text="no such queue: %s" % lnk.name) else: - action(type, subtype) + if assrt: + expected = lnk.options.get("node", {}).get("type") + if expected and type != expected: + err = AssertionFailed(text="expected %s, got %s" % (expected, type)) + if err is None: + action(type, subtype) if err: tgt = lnk.target diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index b8101b76e6..338ac70ecf 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -158,6 +158,7 @@ class Connection(Endpoint): self.reconnect_log = options.get("reconnect_log", True) self.address_ttl = options.get("address_ttl", 60) + self.tcp_nodelay = options.get("tcp_nodelay", False) self.options = options @@ -197,7 +198,7 @@ class Connection(Endpoint): return result def check_closed(self): - if self.closed: + if not self._connected: self._condition.gc() raise ConnectionClosed() @@ -1006,9 +1007,9 @@ class Receiver(Endpoint, object): self.draining = True self._wakeup() self._ecwait(lambda: not self.draining) + msg = self.session._get(self, timeout=0) self._grant() self._wakeup() - msg = self.session._get(self, timeout=0) if msg is None: raise Empty() elif self._capacity not in (0, UNLIMITED.value): diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py index 8133a45604..7abaae12e8 100644 --- a/python/qpid/messaging/transports.py +++ b/python/qpid/messaging/transports.py @@ -17,18 +17,23 @@ # under the License. # +import socket from qpid.util import connect TRANSPORTS = {} -class tcp: +class SocketTransport: - def __init__(self, host, port): + def __init__(self, conn, host, port): self.socket = connect(host, port) + if conn.tcp_nodelay: + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) def fileno(self): return self.socket.fileno() +class tcp(SocketTransport): + def reading(self, reading): return reading @@ -52,17 +57,14 @@ try: except ImportError: pass else: - class tls: + class tls(SocketTransport): - def __init__(self, host, port): - self.socket = connect(host, port) + def __init__(self, conn, host, port): + SocketTransport.__init__(self, conn, host, port) self.tls = wrap_socket(self.socket) self.socket.setblocking(0) self.state = None - def fileno(self): - return self.socket.fileno() - def reading(self, reading): if self.state is None: return reading |