diff options
Diffstat (limited to 'python/qpid/connection08.py')
-rw-r--r-- | python/qpid/connection08.py | 505 |
1 files changed, 0 insertions, 505 deletions
diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py deleted file mode 100644 index 654148dad2..0000000000 --- a/python/qpid/connection08.py +++ /dev/null @@ -1,505 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -A Connection class containing socket code that uses the spec metadata -to read and write Frame objects. This could be used by a client, -server, or even a proxy implementation. -""" - -import socket, codec, logging, qpid -from cStringIO import StringIO -from codec import EOF -from compat import SHUT_RDWR -from exceptions import VersionError - -class SockIO: - - def __init__(self, sock): - self.sock = sock - - def write(self, buf): -# print "OUT: %r" % buf - self.sock.sendall(buf) - - def read(self, n): - data = "" - while len(data) < n: - try: - s = self.sock.recv(n - len(data)) - except socket.error: - break - if len(s) == 0: - break -# print "IN: %r" % s - data += s - return data - - def flush(self): - pass - - def close(self): - self.sock.shutdown(SHUT_RDWR) - self.sock.close() - -def connect(host, port): - sock = socket.socket() - sock.connect((host, port)) - sock.setblocking(1) - return SockIO(sock) - -def listen(host, port, predicate = lambda: True): - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((host, port)) - sock.listen(5) - while predicate(): - s, a = sock.accept() - yield SockIO(s) - -class FramingError(Exception): - pass - -class Connection: - - def __init__(self, io, spec): - self.codec = codec.Codec(io, spec) - self.spec = spec - self.FRAME_END = self.spec.constants.byname["frame_end"].id - self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor)) - self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor)) - - def flush(self): - self.codec.flush() - - INIT="!4s4B" - - def init(self): - self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, - self.spec.minor) - - def tini(self): - self.codec.unpack(Connection.INIT) - - def write_8_0(self, frame): - c = self.codec - c.encode_octet(self.spec.constants.byname[frame.type].id) - c.encode_short(frame.channel) - body = StringIO() - enc = codec.Codec(body, self.spec) - frame.encode(enc) - enc.flush() - c.encode_longstr(body.getvalue()) - c.encode_octet(self.FRAME_END) - - def read_8_0(self): - c = self.codec - tid = c.decode_octet() - try: - type = self.spec.constants.byid[tid].name - except KeyError: - if tid == ord('A') and c.unpack("!3s") == "MQP": - _, _, major, minor = c.unpack("4B") - raise VersionError("client: %s-%s, server: %s-%s" % - (self.spec.major, self.spec.minor, major, minor)) - else: - raise FramingError("unknown frame type: %s" % tid) - channel = c.decode_short() - body = c.decode_longstr() - dec = codec.Codec(StringIO(body), self.spec) - frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) - frame.channel = channel - end = c.decode_octet() - if end != self.FRAME_END: - garbage = "" - while end != self.FRAME_END: - garbage += chr(end) - end = c.decode_octet() - raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) - return frame - - def write_0_9(self, frame): - self.write_8_0(frame) - - def read_0_9(self): - return self.read_8_0() - - def write_0_10(self, frame): - c = self.codec - flags = 0 - if frame.bof: flags |= 0x08 - if frame.eof: flags |= 0x04 - if frame.bos: flags |= 0x02 - if frame.eos: flags |= 0x01 - - c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1 - c.encode_octet(self.spec.constants.byname[frame.type].id) - body = StringIO() - enc = codec.Codec(body, self.spec) - frame.encode(enc) - enc.flush() - frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size) - c.encode_short(frame_size) - c.encode_octet(0) # Reserved - c.encode_octet(frame.subchannel & 0x0f) - c.encode_short(frame.channel) - c.encode_long(0) # Reserved - c.write(body.getvalue()) - c.encode_octet(self.FRAME_END) - - def read_0_10(self): - c = self.codec - flags = c.decode_octet() # TODO: currently ignoring flags - framing_version = (flags & 0xc0) >> 6 - if framing_version != 0: - raise "frame error: unknown framing version" - type = self.spec.constants.byid[c.decode_octet()].name - frame_size = c.decode_short() - if frame_size < 12: # TODO: Magic number (frame header size) - raise "frame error: frame size too small" - reserved1 = c.decode_octet() - field = c.decode_octet() - subchannel = field & 0x0f - channel = c.decode_short() - reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0 - if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0: - raise "frame error: reserved bits not all zero" - body_size = frame_size - 12 # TODO: Magic number (frame header size) - body = c.read(body_size) - dec = codec.Codec(StringIO(body), self.spec) - try: - frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) - except EOF: - raise "truncated frame body: %r" % body - frame.channel = channel - frame.subchannel = subchannel - end = c.decode_octet() - if end != self.FRAME_END: - garbage = "" - while end != self.FRAME_END: - garbage += chr(end) - end = c.decode_octet() - raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) - return frame - - def write_99_0(self, frame): - self.write_0_10(frame) - - def read_99_0(self): - return self.read_0_10() - -class Frame: - - DECODERS = {} - - class __metaclass__(type): - - def __new__(cls, name, bases, dict): - for attr in ("encode", "decode", "type"): - if not dict.has_key(attr): - raise TypeError("%s must define %s" % (name, attr)) - dict["decode"] = staticmethod(dict["decode"]) - if dict.has_key("__init__"): - __init__ = dict["__init__"] - def init(self, *args, **kwargs): - args = list(args) - self.init(args, kwargs) - __init__(self, *args, **kwargs) - dict["__init__"] = init - t = type.__new__(cls, name, bases, dict) - if t.type != None: - Frame.DECODERS[t.type] = t - return t - - type = None - - def init(self, args, kwargs): - self.channel = kwargs.pop("channel", 0) - self.subchannel = kwargs.pop("subchannel", 0) - self.bos = True - self.eos = True - self.bof = True - self.eof = True - - def encode(self, enc): abstract - - def decode(spec, dec, size): abstract - -class Method(Frame): - - type = "frame_method" - - def __init__(self, method, args): - if len(args) != len(method.fields): - argspec = ["%s: %s" % (f.name, f.type) - for f in method.fields] - raise TypeError("%s.%s expecting (%s), got %s" % - (method.klass.name, method.name, ", ".join(argspec), - args)) - self.method = method - self.method_type = method - self.args = args - self.eof = not method.content - - def encode(self, c): - version = (c.spec.major, c.spec.minor) - if version == (0, 10) or version == (99, 0): - c.encode_octet(self.method.klass.id) - c.encode_octet(self.method.id) - else: - c.encode_short(self.method.klass.id) - c.encode_short(self.method.id) - for field, arg in zip(self.method.fields, self.args): - c.encode(field.type, arg) - - def decode(spec, c, size): - version = (c.spec.major, c.spec.minor) - if version == (0, 10) or version == (99, 0): - klass = spec.classes.byid[c.decode_octet()] - meth = klass.methods.byid[c.decode_octet()] - else: - klass = spec.classes.byid[c.decode_short()] - meth = klass.methods.byid[c.decode_short()] - args = tuple([c.decode(f.type) for f in meth.fields]) - return Method(meth, args) - - def __str__(self): - return "[%s] %s %s" % (self.channel, self.method, - ", ".join([str(a) for a in self.args])) - -class Request(Frame): - - type = "frame_request" - - def __init__(self, id, response_mark, method): - self.id = id - self.response_mark = response_mark - self.method = method - self.method_type = method.method_type - self.args = method.args - - def encode(self, enc): - enc.encode_longlong(self.id) - enc.encode_longlong(self.response_mark) - # reserved - enc.encode_long(0) - self.method.encode(enc) - - def decode(spec, dec, size): - id = dec.decode_longlong() - mark = dec.decode_longlong() - # reserved - dec.decode_long() - method = Method.decode(spec, dec, size - 20) - return Request(id, mark, method) - - def __str__(self): - return "[%s] Request(%s) %s" % (self.channel, self.id, self.method) - -class Response(Frame): - - type = "frame_response" - - def __init__(self, id, request_id, batch_offset, method): - self.id = id - self.request_id = request_id - self.batch_offset = batch_offset - self.method = method - self.method_type = method.method_type - self.args = method.args - - def encode(self, enc): - enc.encode_longlong(self.id) - enc.encode_longlong(self.request_id) - enc.encode_long(self.batch_offset) - self.method.encode(enc) - - def decode(spec, dec, size): - id = dec.decode_longlong() - request_id = dec.decode_longlong() - batch_offset = dec.decode_long() - method = Method.decode(spec, dec, size - 20) - return Response(id, request_id, batch_offset, method) - - def __str__(self): - return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) - -def uses_struct_encoding(spec): - return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) - -class Header(Frame): - - type = "frame_header" - - def __init__(self, klass, weight, size, properties): - self.klass = klass - self.weight = weight - self.size = size - self.properties = properties - self.eof = size == 0 - self.bof = False - - def __getitem__(self, name): - return self.properties[name] - - def __setitem__(self, name, value): - self.properties[name] = value - - def __delitem__(self, name): - del self.properties[name] - - def encode(self, c): - if uses_struct_encoding(c.spec): - self.encode_structs(c) - else: - self.encode_legacy(c) - - def encode_structs(self, c): - # XXX - structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type), - qpid.Struct(c.spec.domains.byname["message_properties"].type)] - - # XXX - props = self.properties.copy() - for k in self.properties: - for s in structs: - if s.exists(k): - s.set(k, props.pop(k)) - if props: - raise TypeError("no such property: %s" % (", ".join(props))) - - # message properties store the content-length now, and weight is - # deprecated - if self.size != None: - structs[1].content_length = self.size - - for s in structs: - c.encode_long_struct(s) - - def encode_legacy(self, c): - c.encode_short(self.klass.id) - c.encode_short(self.weight) - c.encode_longlong(self.size) - - # property flags - nprops = len(self.klass.fields) - flags = 0 - for i in range(nprops): - f = self.klass.fields.items[i] - flags <<= 1 - if self.properties.get(f.name) != None: - flags |= 1 - # the last bit indicates more flags - if i > 0 and (i % 15) == 0: - flags <<= 1 - if nprops > (i + 1): - flags |= 1 - c.encode_short(flags) - flags = 0 - flags <<= ((16 - (nprops % 15)) % 16) - c.encode_short(flags) - - # properties - for f in self.klass.fields: - v = self.properties.get(f.name) - if v != None: - c.encode(f.type, v) - - def decode(spec, c, size): - if uses_struct_encoding(spec): - return Header.decode_structs(spec, c, size) - else: - return Header.decode_legacy(spec, c, size) - - def decode_structs(spec, c, size): - structs = [] - start = c.nread - while c.nread - start < size: - structs.append(c.decode_long_struct()) - - # XXX - props = {} - length = None - for s in structs: - for f in s.type.fields: - if s.has(f.name): - props[f.name] = s.get(f.name) - if f.name == "content_length": - length = s.get(f.name) - return Header(None, 0, length, props) - - decode_structs = staticmethod(decode_structs) - - def decode_legacy(spec, c, size): - klass = spec.classes.byid[c.decode_short()] - weight = c.decode_short() - size = c.decode_longlong() - - # property flags - bits = [] - while True: - flags = c.decode_short() - for i in range(15, 0, -1): - if flags >> i & 0x1 != 0: - bits.append(True) - else: - bits.append(False) - if flags & 0x1 == 0: - break - - # properties - properties = {} - for b, f in zip(bits, klass.fields): - if b: - # Note: decode returns a unicode u'' string but only - # plain '' strings can be used as keywords so we need to - # stringify the names. - properties[str(f.name)] = c.decode(f.type) - return Header(klass, weight, size, properties) - - decode_legacy = staticmethod(decode_legacy) - - def __str__(self): - return "%s %s %s %s" % (self.klass, self.weight, self.size, - self.properties) - -class Body(Frame): - - type = "frame_body" - - def __init__(self, content): - self.content = content - self.eof = True - self.bof = False - - def encode(self, enc): - enc.write(self.content) - - def decode(spec, dec, size): - return Body(dec.read(size)) - - def __str__(self): - return "Body(%r)" % self.content - -# TODO: -# OOB_METHOD = "frame_oob_method" -# OOB_HEADER = "frame_oob_header" -# OOB_BODY = "frame_oob_body" -# TRACE = "frame_trace" -# HEARTBEAT = "frame_heartbeat" |