diff options
Diffstat (limited to 'python/qpid')
-rw-r--r-- | python/qpid/client.py | 2 | ||||
-rw-r--r-- | python/qpid/codec010.py | 17 | ||||
-rw-r--r-- | python/qpid/messaging/driver.py | 15 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 5 | ||||
-rw-r--r-- | python/qpid/messaging/transports.py | 18 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 25 | ||||
-rw-r--r-- | python/qpid/util.py | 54 |
7 files changed, 98 insertions, 38 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py index 45ce8498e8..5a877bb8d6 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -106,7 +106,7 @@ class Client: try: id = None for i in xrange(1, 64*1024): - if not self.sessions.has_key(id): + if not self.sessions.has_key(i): id = i break finally: diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py index 0846db6bf7..94a1cd4263 100644 --- a/python/qpid/codec010.py +++ b/python/qpid/codec010.py @@ -17,7 +17,7 @@ # under the License. # -import datetime +import datetime, string from packer import Packer from datatypes import serial, timestamp, RangedSet, Struct, UUID from ops import Compound, PRIMITIVE, COMPOUND @@ -241,15 +241,20 @@ class Codec(Packer): v = sc.read_primitive(type) result[k] = v return result + + def _write_map_elem(self, k, v): + type = self.encoding(v) + sc = StringCodec() + sc.write_str8(k) + sc.write_uint8(type.CODE) + sc.write_primitive(type, v) + return sc.encoded + def write_map(self, m): sc = StringCodec() if m is not None: sc.write_uint32(len(m)) - for k, v in m.items(): - type = self.encoding(v) - sc.write_str8(k) - sc.write_uint8(type.CODE) - sc.write_primitive(type, v) + sc.write(string.joinfields(map(self._write_map_elem, m.keys(), m.values()), "")) self.write_vbin32(sc.encoded) def read_array(self): diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 2eb2c1863e..7c21388213 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -66,7 +66,7 @@ class Attachment: # XXX -DURABLE_DEFAULT=True +DURABLE_DEFAULT=False # XXX @@ -526,7 +526,7 @@ class Driver: rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) trans = transports.TRANSPORTS.get(self.connection.transport) if trans: - self._transport = trans(host, port) + self._transport = trans(self.connection, host, port) else: raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying and self._reconnect_log: @@ -828,8 +828,9 @@ class Engine: self._closing = True def attach(self, ssn): + if ssn.closed: return sst = self._attachments.get(ssn) - if sst is None and not ssn.closed: + if sst is None: for i in xrange(0, self.channel_max): if not self._sessions.has_key(i): ch = i @@ -930,6 +931,7 @@ class Engine: def resolve_declare(self, sst, lnk, dir, action): declare = lnk.options.get("create") in ("always", dir) + assrt = lnk.options.get("assert") in ("always", dir) def do_resolved(type, subtype): err = None if type is None: @@ -938,7 +940,12 @@ class Engine: else: err = NotFound(text="no such queue: %s" % lnk.name) else: - action(type, subtype) + if assrt: + expected = lnk.options.get("node", {}).get("type") + if expected and type != expected: + err = AssertionFailed(text="expected %s, got %s" % (expected, type)) + if err is None: + action(type, subtype) if err: tgt = lnk.target diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index b8101b76e6..338ac70ecf 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -158,6 +158,7 @@ class Connection(Endpoint): self.reconnect_log = options.get("reconnect_log", True) self.address_ttl = options.get("address_ttl", 60) + self.tcp_nodelay = options.get("tcp_nodelay", False) self.options = options @@ -197,7 +198,7 @@ class Connection(Endpoint): return result def check_closed(self): - if self.closed: + if not self._connected: self._condition.gc() raise ConnectionClosed() @@ -1006,9 +1007,9 @@ class Receiver(Endpoint, object): self.draining = True self._wakeup() self._ecwait(lambda: not self.draining) + msg = self.session._get(self, timeout=0) self._grant() self._wakeup() - msg = self.session._get(self, timeout=0) if msg is None: raise Empty() elif self._capacity not in (0, UNLIMITED.value): diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py index 8133a45604..7abaae12e8 100644 --- a/python/qpid/messaging/transports.py +++ b/python/qpid/messaging/transports.py @@ -17,18 +17,23 @@ # under the License. # +import socket from qpid.util import connect TRANSPORTS = {} -class tcp: +class SocketTransport: - def __init__(self, host, port): + def __init__(self, conn, host, port): self.socket = connect(host, port) + if conn.tcp_nodelay: + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) def fileno(self): return self.socket.fileno() +class tcp(SocketTransport): + def reading(self, reading): return reading @@ -52,17 +57,14 @@ try: except ImportError: pass else: - class tls: + class tls(SocketTransport): - def __init__(self, host, port): - self.socket = connect(host, port) + def __init__(self, conn, host, port): + SocketTransport.__init__(self, conn, host, port) self.tls = wrap_socket(self.socket) self.socket.setblocking(0) self.state = None - def fileno(self): - return self.socket.fileno() - def reading(self, reading): if self.state is None: return reading diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index c303ca652a..db5ec03df2 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -46,6 +46,10 @@ class SetupTests(Base): self.conn.open() self.ping(self.conn.session()) + def testTcpNodelay(self): + self.conn = Connection.establish(self.broker, tcp_nodelay=True) + assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) + def testConnectError(self): try: # Specifying port 0 yields a bad address on Windows; port 4 is unassigned @@ -111,8 +115,8 @@ class SetupTests(Base): class flaky: - def __init__(self, host, port): - self.real = real(host, port) + def __init__(self, conn, host, port): + self.real = real(conn, host, port) self.sent_count = 0 self.recv_count = 0 @@ -186,6 +190,9 @@ class ConnectionTests(Base): def setup_connection(self): return Connection.establish(self.broker, **self.connection_options()) + def testCheckClosed(self): + assert not self.conn.check_closed() + def testSessionAnon(self): ssn1 = self.conn.session() ssn2 = self.conn.session() @@ -248,8 +255,8 @@ class ConnectionTests(Base): class hangable: - def __init__(self, host, port): - self.tcp = TRANSPORTS["tcp"](host, port) + def __init__(self, conn, host, port): + self.tcp = TRANSPORTS["tcp"](conn, host, port) self.hung = False def hang(self): @@ -1179,6 +1186,16 @@ test-link-bindings-queue; { snd.send(m) self.drain(qrcv, expected=msgs) + def testAssert1(self): + try: + snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}") + assert 0, "assertion failed to trigger" + except AssertionFailed, e: + pass + + def testAssert2(self): + snd = self.ssn.sender("amq.topic; {assert: always}") + NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" diff --git a/python/qpid/util.py b/python/qpid/util.py index e62bebdf24..89677289e2 100644 --- a/python/qpid/util.py +++ b/python/qpid/util.py @@ -39,12 +39,17 @@ except ImportError: self.sock.close() def connect(host, port): - sock = socket.socket() - sock.connect((host, port)) - sock.setblocking(1) - # XXX: we could use this on read, but we'd have to put write in a - # loop as well - # sock.settimeout(1) + for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + sock = socket.socket(af, socktype, proto) + try: + sock.connect(sa) + break + except socket.error, msg: + sock.close + else: + # If we got here then we couldn't connect (yet) + raise return sock def listen(host, port, predicate = lambda: True, bound = lambda: None): @@ -101,15 +106,23 @@ def fill(text, indent, heading = None): class URL: RE = re.compile(r""" - # [ <scheme>:// ] [ <user> [ / <password> ] @] <host> [ :<port> ] - ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? ([^@:/]+) (?: :([0-9]+))?$ -""", re.X) + # [ <scheme>:// ] [ <user> [ / <password> ] @] ( <host4> | \[ <host6> \] ) [ :<port> ] + ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))?$ +""", re.X | re.I) AMQPS = "amqps" AMQP = "amqp" - def __init__(self, s): - if isinstance(s, URL): + def __init__(self, s=None, **kwargs): + if s is None: + self.scheme = kwargs.get('scheme', None) + self.user = kwargs.get('user', None) + self.password = kwargs.get('password', None) + self.host = kwargs.get('host', None) + self.port = kwargs.get('port', None) + if self.host is None: + raise ValueError('Host required for url') + elif isinstance(s, URL): self.scheme = s.scheme self.user = s.user self.password = s.password @@ -119,7 +132,8 @@ class URL: match = URL.RE.match(s) if match is None: raise ValueError(s) - self.scheme, self.user, self.password, self.host, port = match.groups() + self.scheme, self.user, self.password, host4, host6, port = match.groups() + self.host = host4 or host6 if port is None: self.port = None else: @@ -137,11 +151,25 @@ class URL: if self.password: s += "/%s" % self.password s += "@" - s += self.host + if ':' not in self.host: + s += self.host + else: + s += "[%s]" % self.host if self.port: s += ":%s" % self.port return s + def __eq__(self, url): + if isinstance(url, basestring): + url = URL(url) + return \ + self.scheme==url.scheme and \ + self.user==url.user and self.password==url.password and \ + self.host==url.host and self.port==url.port + + def __ne__(self, url): + return not self.__eq__(url) + def default(value, default): if value is None: return default |