summaryrefslogtreecommitdiff
path: root/python/qpid/connection.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-16 20:26:11 +0000
committerAlan Conway <aconway@apache.org>2007-03-16 20:26:11 +0000
commit55a530448b4107edcb3bb8543b562c7208080995 (patch)
tree23be2798e546f641ff4652f7255c090a39cd3010 /python/qpid/connection.py
parentf3cb9466b4b969747f97ab6716964179db96f124 (diff)
downloadqpid-python-55a530448b4107edcb3bb8543b562c7208080995.tar.gz
Merged revisions 496593 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9 ........ r496593 | rhs | 2007-01-16 00:28:25 -0500 (Tue, 16 Jan 2007) | 1 line 0-9 request/response framing for python ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@519129 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r--python/qpid/connection.py204
1 files changed, 133 insertions, 71 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 0b788e091b..75fb134760 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -25,7 +25,7 @@ server, or even a proxy implementation.
import socket, codec,logging
from cStringIO import StringIO
-from spec import load, pythonize
+from spec import load
from codec import EOF
class SockIO:
@@ -53,19 +53,27 @@ class SockIO:
def flush(self):
pass
+def connect(host, port):
+ sock = socket.socket()
+ sock.connect((host, port))
+ sock.setblocking(1)
+ return SockIO(sock)
+
+def listen(host, port, predicate = lambda: True):
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((host, port))
+ sock.listen(5)
+ while predicate():
+ s, a = sock.accept()
+ yield SockIO(s)
+
class Connection:
- def __init__(self, host, port, spec):
- self.host = host
- self.port = port
+ def __init__(self, io, spec):
+ self.codec = codec.Codec(io)
self.spec = spec
- self.FRAME_END = self.spec.constants.bypyname["frame_end"].id
-
- def connect(self):
- sock = socket.socket()
- sock.connect((self.host, self.port))
- sock.setblocking(1)
- self.codec = codec.Codec(SockIO(sock))
+ self.FRAME_END = self.spec.constants.byname["frame_end"].id
def flush(self):
self.codec.flush()
@@ -76,53 +84,55 @@ class Connection:
self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
self.spec.minor)
+ def tini(self):
+ self.codec.unpack(Connection.INIT)
+
def write(self, frame):
c = self.codec
- c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id)
+ c.encode_octet(self.spec.constants.byname[frame.type].id)
c.encode_short(frame.channel)
- frame.payload.encode(c)
+ body = StringIO()
+ enc = codec.Codec(body)
+ frame.encode(enc)
+ enc.flush()
+ c.encode_longstr(body.getvalue())
c.encode_octet(self.FRAME_END)
def read(self):
c = self.codec
- type = pythonize(self.spec.constants.byid[c.decode_octet()].name)
+ type = self.spec.constants.byid[c.decode_octet()].name
channel = c.decode_short()
- payload = Frame.DECODERS[type].decode(self.spec, c)
+ body = c.decode_longstr()
+ dec = codec.Codec(StringIO(body))
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ frame.channel = channel
end = c.decode_octet()
if end != self.FRAME_END:
- raise "frame error: expected %r, got %r" % (self.FRAME_END, end)
- frame = Frame(channel, payload)
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
return frame
class Frame:
- METHOD = "frame_method"
- HEADER = "frame_header"
- BODY = "frame_body"
- OOB_METHOD = "frame_oob_method"
- OOB_HEADER = "frame_oob_header"
- OOB_BODY = "frame_oob_body"
- TRACE = "frame_trace"
- HEARTBEAT = "frame_heartbeat"
-
DECODERS = {}
- def __init__(self, channel, payload):
- self.channel = channel
- self.payload = payload
-
- def __str__(self):
- return "[%d] %s" % (self.channel, self.payload)
-
-class Payload:
-
class __metaclass__(type):
def __new__(cls, name, bases, dict):
- for req in ("encode", "decode", "type"):
- if not dict.has_key(req):
- raise TypeError("%s must define %s" % (name, req))
+ for attr in ("encode", "decode", "type"):
+ if not dict.has_key(attr):
+ raise TypeError("%s must define %s" % (name, attr))
dict["decode"] = staticmethod(dict["decode"])
+ if dict.has_key("__init__"):
+ __init__ = dict["__init__"]
+ def init(self, *args, **kwargs):
+ args = list(args)
+ self.init(args, kwargs)
+ __init__(self, *args, **kwargs)
+ dict["__init__"] = init
t = type.__new__(cls, name, bases, dict)
if t.type != None:
Frame.DECODERS[t.type] = t
@@ -130,50 +140,100 @@ class Payload:
type = None
+ def init(self, args, kwargs):
+ self.channel = kwargs.pop("channel", 0)
+
def encode(self, enc): abstract
- def decode(spec, dec): abstract
+ def decode(spec, dec, size): abstract
-class Method(Payload):
+class Method(Frame):
- type = Frame.METHOD
+ type = "frame_method"
- def __init__(self, method, *args):
+ def __init__(self, method, args):
if len(args) != len(method.fields):
- argspec = ["%s: %s" % (pythonize(f.name), f.type)
+ argspec = ["%s: %s" % (f.name, f.type)
for f in method.fields]
raise TypeError("%s.%s expecting (%s), got %s" %
- (pythonize(method.klass.name),
- pythonize(method.name), ", ".join(argspec), args))
+ (method.klass.name, method.name, ", ".join(argspec),
+ args))
self.method = method
+ self.method_type = method
self.args = args
- def encode(self, enc):
- buf = StringIO()
- c = codec.Codec(buf)
+ def encode(self, c):
c.encode_short(self.method.klass.id)
c.encode_short(self.method.id)
for field, arg in zip(self.method.fields, self.args):
c.encode(field.type, arg)
- c.flush()
- enc.encode_longstr(buf.getvalue())
- def decode(spec, dec):
- enc = dec.decode_longstr()
- c = codec.Codec(StringIO(enc))
+ def decode(spec, c, size):
klass = spec.classes.byid[c.decode_short()]
meth = klass.methods.byid[c.decode_short()]
args = tuple([c.decode(f.type) for f in meth.fields])
- return Method(meth, *args)
+ return Method(meth, args)
def __str__(self):
- return "%s %s" % (self.method, ", ".join([str(a) for a in self.args]))
+ return "[%s] %s %s" % (self.channel, self.method,
+ ", ".join([str(a) for a in self.args]))
+
+class Request(Frame):
-class Header(Payload):
+ type = "frame_request"
- type = Frame.HEADER
+ def __init__(self, id, response_mark, method):
+ self.id = id
+ self.response_mark = response_mark
+ self.method = method
+ self.method_type = method.method_type
+ self.args = method.args
+
+ def encode(self, enc):
+ enc.encode_longlong(self.id)
+ enc.encode_longlong(self.response_mark)
+ # reserved
+ enc.encode_long(0)
+ self.method.encode(enc)
+
+ def decode(spec, dec, size):
+ id = dec.decode_longlong()
+ mark = dec.decode_longlong()
+ # reserved
+ dec.decode_long()
+ method = Method.decode(spec, dec, size - 20)
+ return Request(id, mark, method)
+
+class Response(Frame):
+
+ type = "frame_response"
+
+ def __init__(self, id, request_id, batch_offset, method):
+ self.id = id
+ self.request_id = request_id
+ self.batch_offset = batch_offset
+ self.method = method
+ self.method_type = method.method_type
+ self.args = method.args
+
+ def encode(self, enc):
+ enc.encode_longlong(self.id)
+ enc.encode_longlong(self.request_id)
+ enc.encode_long(self.batch_offset)
+ self.method.encode(enc)
- def __init__(self, klass, weight, size, **properties):
+ def decode(spec, dec, size):
+ id = dec.decode_longlong()
+ request_id = dec.decode_longlong()
+ batch_offset = dec.decode_long()
+ method = Method.decode(spec, dec, size - 20)
+ return Response(id, request_id, batch_offset, method)
+
+class Header(Frame):
+
+ type = "frame_header"
+
+ def __init__(self, klass, weight, size, properties):
self.klass = klass
self.weight = weight
self.size = size
@@ -188,9 +248,7 @@ class Header(Payload):
def __delitem__(self, name):
del self.properties[name]
- def encode(self, enc):
- buf = StringIO()
- c = codec.Codec(buf)
+ def encode(self, c):
c.encode_short(self.klass.id)
c.encode_short(self.weight)
c.encode_longlong(self.size)
@@ -218,11 +276,8 @@ class Header(Payload):
v = self.properties.get(f.name)
if v != None:
c.encode(f.type, v)
- c.flush()
- enc.encode_longstr(buf.getvalue())
- def decode(spec, dec):
- c = codec.Codec(StringIO(dec.decode_longstr()))
+ def decode(spec, c, size):
klass = spec.classes.byid[c.decode_short()]
weight = c.decode_short()
size = c.decode_longlong()
@@ -247,24 +302,31 @@ class Header(Payload):
# plain '' strings can be used as keywords so we need to
# stringify the names.
properties[str(f.name)] = c.decode(f.type)
- return Header(klass, weight, size, **properties)
+ return Header(klass, weight, size, properties)
def __str__(self):
return "%s %s %s %s" % (self.klass, self.weight, self.size,
self.properties)
-class Body(Payload):
+class Body(Frame):
- type = Frame.BODY
+ type = "frame_body"
def __init__(self, content):
self.content = content
def encode(self, enc):
- enc.encode_longstr(self.content)
+ enc.write(self.content)
- def decode(spec, dec):
- return Body(dec.decode_longstr())
+ def decode(spec, dec, size):
+ return Body(dec.read(size))
def __str__(self):
return "Body(%r)" % self.content
+
+# TODO:
+# OOB_METHOD = "frame_oob_method"
+# OOB_HEADER = "frame_oob_header"
+# OOB_BODY = "frame_oob_body"
+# TRACE = "frame_trace"
+# HEARTBEAT = "frame_heartbeat"