diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
commit | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch) | |
tree | dcfb94e75656c6c239fc3dcb754cd2015126424d /python | |
parent | 5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff) | |
download | qpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz |
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/client.py | 2 | ||||
-rw-r--r-- | python/qpid/codec010.py | 17 | ||||
-rw-r--r-- | python/qpid/messaging/driver.py | 15 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 5 | ||||
-rw-r--r-- | python/qpid/messaging/transports.py | 18 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 25 | ||||
-rw-r--r-- | python/qpid/util.py | 54 | ||||
-rwxr-xr-x | python/setup.py | 2 |
8 files changed, 39 insertions, 99 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py index 5a877bb8d6..45ce8498e8 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -106,7 +106,7 @@ class Client: try: id = None for i in xrange(1, 64*1024): - if not self.sessions.has_key(i): + if not self.sessions.has_key(id): id = i break finally: diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py index 94a1cd4263..0846db6bf7 100644 --- a/python/qpid/codec010.py +++ b/python/qpid/codec010.py @@ -17,7 +17,7 @@ # under the License. # -import datetime, string +import datetime from packer import Packer from datatypes import serial, timestamp, RangedSet, Struct, UUID from ops import Compound, PRIMITIVE, COMPOUND @@ -241,20 +241,15 @@ class Codec(Packer): v = sc.read_primitive(type) result[k] = v return result - - def _write_map_elem(self, k, v): - type = self.encoding(v) - sc = StringCodec() - sc.write_str8(k) - sc.write_uint8(type.CODE) - sc.write_primitive(type, v) - return sc.encoded - def write_map(self, m): sc = StringCodec() if m is not None: sc.write_uint32(len(m)) - sc.write(string.joinfields(map(self._write_map_elem, m.keys(), m.values()), "")) + for k, v in m.items(): + type = self.encoding(v) + sc.write_str8(k) + sc.write_uint8(type.CODE) + sc.write_primitive(type, v) self.write_vbin32(sc.encoded) def read_array(self): diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 7c21388213..2eb2c1863e 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -66,7 +66,7 @@ class Attachment: # XXX -DURABLE_DEFAULT=False +DURABLE_DEFAULT=True # XXX @@ -526,7 +526,7 @@ class Driver: rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) trans = transports.TRANSPORTS.get(self.connection.transport) if trans: - self._transport = trans(self.connection, host, port) + self._transport = trans(host, port) else: raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying and self._reconnect_log: @@ -828,9 +828,8 @@ class Engine: self._closing = True def attach(self, ssn): - if ssn.closed: return sst = self._attachments.get(ssn) - if sst is None: + if sst is None and not ssn.closed: for i in xrange(0, self.channel_max): if not self._sessions.has_key(i): ch = i @@ -931,7 +930,6 @@ class Engine: def resolve_declare(self, sst, lnk, dir, action): declare = lnk.options.get("create") in ("always", dir) - assrt = lnk.options.get("assert") in ("always", dir) def do_resolved(type, subtype): err = None if type is None: @@ -940,12 +938,7 @@ class Engine: else: err = NotFound(text="no such queue: %s" % lnk.name) else: - if assrt: - expected = lnk.options.get("node", {}).get("type") - if expected and type != expected: - err = AssertionFailed(text="expected %s, got %s" % (expected, type)) - if err is None: - action(type, subtype) + action(type, subtype) if err: tgt = lnk.target diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 338ac70ecf..b8101b76e6 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -158,7 +158,6 @@ class Connection(Endpoint): self.reconnect_log = options.get("reconnect_log", True) self.address_ttl = options.get("address_ttl", 60) - self.tcp_nodelay = options.get("tcp_nodelay", False) self.options = options @@ -198,7 +197,7 @@ class Connection(Endpoint): return result def check_closed(self): - if not self._connected: + if self.closed: self._condition.gc() raise ConnectionClosed() @@ -1007,9 +1006,9 @@ class Receiver(Endpoint, object): self.draining = True self._wakeup() self._ecwait(lambda: not self.draining) - msg = self.session._get(self, timeout=0) self._grant() self._wakeup() + msg = self.session._get(self, timeout=0) if msg is None: raise Empty() elif self._capacity not in (0, UNLIMITED.value): diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py index 7abaae12e8..8133a45604 100644 --- a/python/qpid/messaging/transports.py +++ b/python/qpid/messaging/transports.py @@ -17,23 +17,18 @@ # under the License. # -import socket from qpid.util import connect TRANSPORTS = {} -class SocketTransport: +class tcp: - def __init__(self, conn, host, port): + def __init__(self, host, port): self.socket = connect(host, port) - if conn.tcp_nodelay: - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) def fileno(self): return self.socket.fileno() -class tcp(SocketTransport): - def reading(self, reading): return reading @@ -57,14 +52,17 @@ try: except ImportError: pass else: - class tls(SocketTransport): + class tls: - def __init__(self, conn, host, port): - SocketTransport.__init__(self, conn, host, port) + def __init__(self, host, port): + self.socket = connect(host, port) self.tls = wrap_socket(self.socket) self.socket.setblocking(0) self.state = None + def fileno(self): + return self.socket.fileno() + def reading(self, reading): if self.state is None: return reading diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index db5ec03df2..c303ca652a 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -46,10 +46,6 @@ class SetupTests(Base): self.conn.open() self.ping(self.conn.session()) - def testTcpNodelay(self): - self.conn = Connection.establish(self.broker, tcp_nodelay=True) - assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) - def testConnectError(self): try: # Specifying port 0 yields a bad address on Windows; port 4 is unassigned @@ -115,8 +111,8 @@ class SetupTests(Base): class flaky: - def __init__(self, conn, host, port): - self.real = real(conn, host, port) + def __init__(self, host, port): + self.real = real(host, port) self.sent_count = 0 self.recv_count = 0 @@ -190,9 +186,6 @@ class ConnectionTests(Base): def setup_connection(self): return Connection.establish(self.broker, **self.connection_options()) - def testCheckClosed(self): - assert not self.conn.check_closed() - def testSessionAnon(self): ssn1 = self.conn.session() ssn2 = self.conn.session() @@ -255,8 +248,8 @@ class ConnectionTests(Base): class hangable: - def __init__(self, conn, host, port): - self.tcp = TRANSPORTS["tcp"](conn, host, port) + def __init__(self, host, port): + self.tcp = TRANSPORTS["tcp"](host, port) self.hung = False def hang(self): @@ -1186,16 +1179,6 @@ test-link-bindings-queue; { snd.send(m) self.drain(qrcv, expected=msgs) - def testAssert1(self): - try: - snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}") - assert 0, "assertion failed to trigger" - except AssertionFailed, e: - pass - - def testAssert2(self): - snd = self.ssn.sender("amq.topic; {assert: always}") - NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" diff --git a/python/qpid/util.py b/python/qpid/util.py index 89677289e2..e62bebdf24 100644 --- a/python/qpid/util.py +++ b/python/qpid/util.py @@ -39,17 +39,12 @@ except ImportError: self.sock.close() def connect(host, port): - for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): - af, socktype, proto, canonname, sa = res - sock = socket.socket(af, socktype, proto) - try: - sock.connect(sa) - break - except socket.error, msg: - sock.close - else: - # If we got here then we couldn't connect (yet) - raise + sock = socket.socket() + sock.connect((host, port)) + sock.setblocking(1) + # XXX: we could use this on read, but we'd have to put write in a + # loop as well + # sock.settimeout(1) return sock def listen(host, port, predicate = lambda: True, bound = lambda: None): @@ -106,23 +101,15 @@ def fill(text, indent, heading = None): class URL: RE = re.compile(r""" - # [ <scheme>:// ] [ <user> [ / <password> ] @] ( <host4> | \[ <host6> \] ) [ :<port> ] - ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))?$ -""", re.X | re.I) + # [ <scheme>:// ] [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? ([^@:/]+) (?: :([0-9]+))?$ +""", re.X) AMQPS = "amqps" AMQP = "amqp" - def __init__(self, s=None, **kwargs): - if s is None: - self.scheme = kwargs.get('scheme', None) - self.user = kwargs.get('user', None) - self.password = kwargs.get('password', None) - self.host = kwargs.get('host', None) - self.port = kwargs.get('port', None) - if self.host is None: - raise ValueError('Host required for url') - elif isinstance(s, URL): + def __init__(self, s): + if isinstance(s, URL): self.scheme = s.scheme self.user = s.user self.password = s.password @@ -132,8 +119,7 @@ class URL: match = URL.RE.match(s) if match is None: raise ValueError(s) - self.scheme, self.user, self.password, host4, host6, port = match.groups() - self.host = host4 or host6 + self.scheme, self.user, self.password, self.host, port = match.groups() if port is None: self.port = None else: @@ -151,25 +137,11 @@ class URL: if self.password: s += "/%s" % self.password s += "@" - if ':' not in self.host: - s += self.host - else: - s += "[%s]" % self.host + s += self.host if self.port: s += ":%s" % self.port return s - def __eq__(self, url): - if isinstance(url, basestring): - url = URL(url) - return \ - self.scheme==url.scheme and \ - self.user==url.user and self.password==url.password and \ - self.host==url.host and self.port==url.port - - def __ne__(self, url): - return not self.__eq__(url) - def default(value, default): if value is None: return default diff --git a/python/setup.py b/python/setup.py index 65cb72854f..9c04421c51 100755 --- a/python/setup.py +++ b/python/setup.py @@ -298,7 +298,7 @@ class install_lib(_install_lib): return outfiles + extra setup(name="qpid-python", - version="0.13", + version="0.9", author="Apache Qpid", author_email="dev@qpid.apache.org", packages=["mllib", "qpid", "qpid.messaging", "qpid.tests", |