From 55a530448b4107edcb3bb8543b562c7208080995 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 16 Mar 2007 20:26:11 +0000 Subject: Merged revisions 496593 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9 ........ r496593 | rhs | 2007-01-16 00:28:25 -0500 (Tue, 16 Jan 2007) | 1 line 0-9 request/response framing for python ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@519129 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/connection.py | 204 ++++++++++++++++++++++++++++++---------------- 1 file changed, 133 insertions(+), 71 deletions(-) (limited to 'python/qpid/connection.py') diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 0b788e091b..75fb134760 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -25,7 +25,7 @@ server, or even a proxy implementation. import socket, codec,logging from cStringIO import StringIO -from spec import load, pythonize +from spec import load from codec import EOF class SockIO: @@ -53,19 +53,27 @@ class SockIO: def flush(self): pass +def connect(host, port): + sock = socket.socket() + sock.connect((host, port)) + sock.setblocking(1) + return SockIO(sock) + +def listen(host, port, predicate = lambda: True): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.listen(5) + while predicate(): + s, a = sock.accept() + yield SockIO(s) + class Connection: - def __init__(self, host, port, spec): - self.host = host - self.port = port + def __init__(self, io, spec): + self.codec = codec.Codec(io) self.spec = spec - self.FRAME_END = self.spec.constants.bypyname["frame_end"].id - - def connect(self): - sock = socket.socket() - sock.connect((self.host, self.port)) - sock.setblocking(1) - self.codec = codec.Codec(SockIO(sock)) + self.FRAME_END = self.spec.constants.byname["frame_end"].id def flush(self): self.codec.flush() @@ -76,53 +84,55 @@ class Connection: self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, self.spec.minor) + def tini(self): + self.codec.unpack(Connection.INIT) + def write(self, frame): c = self.codec - c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id) + c.encode_octet(self.spec.constants.byname[frame.type].id) c.encode_short(frame.channel) - frame.payload.encode(c) + body = StringIO() + enc = codec.Codec(body) + frame.encode(enc) + enc.flush() + c.encode_longstr(body.getvalue()) c.encode_octet(self.FRAME_END) def read(self): c = self.codec - type = pythonize(self.spec.constants.byid[c.decode_octet()].name) + type = self.spec.constants.byid[c.decode_octet()].name channel = c.decode_short() - payload = Frame.DECODERS[type].decode(self.spec, c) + body = c.decode_longstr() + dec = codec.Codec(StringIO(body)) + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + frame.channel = channel end = c.decode_octet() if end != self.FRAME_END: - raise "frame error: expected %r, got %r" % (self.FRAME_END, end) - frame = Frame(channel, payload) + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) return frame class Frame: - METHOD = "frame_method" - HEADER = "frame_header" - BODY = "frame_body" - OOB_METHOD = "frame_oob_method" - OOB_HEADER = "frame_oob_header" - OOB_BODY = "frame_oob_body" - TRACE = "frame_trace" - HEARTBEAT = "frame_heartbeat" - DECODERS = {} - def __init__(self, channel, payload): - self.channel = channel - self.payload = payload - - def __str__(self): - return "[%d] %s" % (self.channel, self.payload) - -class Payload: - class __metaclass__(type): def __new__(cls, name, bases, dict): - for req in ("encode", "decode", "type"): - if not dict.has_key(req): - raise TypeError("%s must define %s" % (name, req)) + for attr in ("encode", "decode", "type"): + if not dict.has_key(attr): + raise TypeError("%s must define %s" % (name, attr)) dict["decode"] = staticmethod(dict["decode"]) + if dict.has_key("__init__"): + __init__ = dict["__init__"] + def init(self, *args, **kwargs): + args = list(args) + self.init(args, kwargs) + __init__(self, *args, **kwargs) + dict["__init__"] = init t = type.__new__(cls, name, bases, dict) if t.type != None: Frame.DECODERS[t.type] = t @@ -130,50 +140,100 @@ class Payload: type = None + def init(self, args, kwargs): + self.channel = kwargs.pop("channel", 0) + def encode(self, enc): abstract - def decode(spec, dec): abstract + def decode(spec, dec, size): abstract -class Method(Payload): +class Method(Frame): - type = Frame.METHOD + type = "frame_method" - def __init__(self, method, *args): + def __init__(self, method, args): if len(args) != len(method.fields): - argspec = ["%s: %s" % (pythonize(f.name), f.type) + argspec = ["%s: %s" % (f.name, f.type) for f in method.fields] raise TypeError("%s.%s expecting (%s), got %s" % - (pythonize(method.klass.name), - pythonize(method.name), ", ".join(argspec), args)) + (method.klass.name, method.name, ", ".join(argspec), + args)) self.method = method + self.method_type = method self.args = args - def encode(self, enc): - buf = StringIO() - c = codec.Codec(buf) + def encode(self, c): c.encode_short(self.method.klass.id) c.encode_short(self.method.id) for field, arg in zip(self.method.fields, self.args): c.encode(field.type, arg) - c.flush() - enc.encode_longstr(buf.getvalue()) - def decode(spec, dec): - enc = dec.decode_longstr() - c = codec.Codec(StringIO(enc)) + def decode(spec, c, size): klass = spec.classes.byid[c.decode_short()] meth = klass.methods.byid[c.decode_short()] args = tuple([c.decode(f.type) for f in meth.fields]) - return Method(meth, *args) + return Method(meth, args) def __str__(self): - return "%s %s" % (self.method, ", ".join([str(a) for a in self.args])) + return "[%s] %s %s" % (self.channel, self.method, + ", ".join([str(a) for a in self.args])) + +class Request(Frame): -class Header(Payload): + type = "frame_request" - type = Frame.HEADER + def __init__(self, id, response_mark, method): + self.id = id + self.response_mark = response_mark + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.response_mark) + # reserved + enc.encode_long(0) + self.method.encode(enc) + + def decode(spec, dec, size): + id = dec.decode_longlong() + mark = dec.decode_longlong() + # reserved + dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Request(id, mark, method) + +class Response(Frame): + + type = "frame_response" + + def __init__(self, id, request_id, batch_offset, method): + self.id = id + self.request_id = request_id + self.batch_offset = batch_offset + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.request_id) + enc.encode_long(self.batch_offset) + self.method.encode(enc) - def __init__(self, klass, weight, size, **properties): + def decode(spec, dec, size): + id = dec.decode_longlong() + request_id = dec.decode_longlong() + batch_offset = dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Response(id, request_id, batch_offset, method) + +class Header(Frame): + + type = "frame_header" + + def __init__(self, klass, weight, size, properties): self.klass = klass self.weight = weight self.size = size @@ -188,9 +248,7 @@ class Header(Payload): def __delitem__(self, name): del self.properties[name] - def encode(self, enc): - buf = StringIO() - c = codec.Codec(buf) + def encode(self, c): c.encode_short(self.klass.id) c.encode_short(self.weight) c.encode_longlong(self.size) @@ -218,11 +276,8 @@ class Header(Payload): v = self.properties.get(f.name) if v != None: c.encode(f.type, v) - c.flush() - enc.encode_longstr(buf.getvalue()) - def decode(spec, dec): - c = codec.Codec(StringIO(dec.decode_longstr())) + def decode(spec, c, size): klass = spec.classes.byid[c.decode_short()] weight = c.decode_short() size = c.decode_longlong() @@ -247,24 +302,31 @@ class Header(Payload): # plain '' strings can be used as keywords so we need to # stringify the names. properties[str(f.name)] = c.decode(f.type) - return Header(klass, weight, size, **properties) + return Header(klass, weight, size, properties) def __str__(self): return "%s %s %s %s" % (self.klass, self.weight, self.size, self.properties) -class Body(Payload): +class Body(Frame): - type = Frame.BODY + type = "frame_body" def __init__(self, content): self.content = content def encode(self, enc): - enc.encode_longstr(self.content) + enc.write(self.content) - def decode(spec, dec): - return Body(dec.decode_longstr()) + def decode(spec, dec, size): + return Body(dec.read(size)) def __str__(self): return "Body(%r)" % self.content + +# TODO: +# OOB_METHOD = "frame_oob_method" +# OOB_HEADER = "frame_oob_header" +# OOB_BODY = "frame_oob_body" +# TRACE = "frame_trace" +# HEARTBEAT = "frame_heartbeat" -- cgit v1.2.1