diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-10-14 02:21:59 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-10-14 02:21:59 +0000 |
| commit | e0ba5becd0052cabe0cfa997dd35d7362bf2c472 (patch) | |
| tree | 5c55f1dd6be53c4032be45ba422dacca1ded197f /qpid/python | |
| parent | 2a10deef36eec1b8df9ec52dfb44a72eee7059a8 (diff) | |
| download | qpid-python-e0ba5becd0052cabe0cfa997dd35d7362bf2c472.tar.gz | |
Enabled packed struct encoding in python, cpp, and java. Also fixed computation of required byte credit in Message.cpp.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@584474 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rwxr-xr-x | qpid/python/hello-world | 17 | ||||
| -rw-r--r-- | qpid/python/qpid/__init__.py | 23 | ||||
| -rw-r--r-- | qpid/python/qpid/codec.py | 106 | ||||
| -rw-r--r-- | qpid/python/qpid/connection.py | 7 | ||||
| -rw-r--r-- | qpid/python/qpid/peer.py | 4 | ||||
| -rw-r--r-- | qpid/python/qpid/spec.py | 24 | ||||
| -rw-r--r-- | qpid/python/tests/spec.py | 6 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/message.py | 28 |
8 files changed, 170 insertions, 45 deletions
diff --git a/qpid/python/hello-world b/qpid/python/hello-world index b3170c8e0c..5ba14b0fc1 100755 --- a/qpid/python/hello-world +++ b/qpid/python/hello-world @@ -3,14 +3,25 @@ import qpid from qpid.client import Client from qpid.content import Content -client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-10-preview.xml")) +spec = qpid.spec.load("../specs/amqp.0-10-preview.xml") +client = Client("127.0.0.1", 5672, spec) client.start({"LOGIN": "guest", "PASSWORD": "guest"}) ch = client.channel(1) ch.session_open() ch.queue_declare(queue="test") ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test") -print ch.queue_query(queue="test") -ch.message_subscribe(queue="test", destination="test") +#print ch.queue_query(queue="test") +ch.message_subscribe(queue="test", destination="amq.direct") +ch.message_flow("amq.direct", 0, 0xFFFFFFFF) +ch.message_flow("amq.direct", 1, 0xFFFFFFFF) msg = Content("hello world") msg["content_type"] = "text/plain" +msg["routing_key"] = "test" +msg["reply_to"] = spec.struct("reply_to") +msg["reply_to"].exchange_name = "asdf" +msg["reply_to"].routing_key = "fdsa" +msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"} ch.message_transfer(destination="amq.direct", content=msg) +queue = client.queue("amq.direct") +msg = queue.get(timeout=10) +print msg diff --git a/qpid/python/qpid/__init__.py b/qpid/python/qpid/__init__.py index 3f6d82b89e..7afebaf73b 100644 --- a/qpid/python/qpid/__init__.py +++ b/qpid/python/qpid/__init__.py @@ -31,8 +31,12 @@ class Struct: raise AttributeError(attr) return field - def has(self, name): - return self.type.fields.byname.has_key(name) + def exists(self, attr): + return self.type.fields.byname.has_key(attr) + + def has(self, attr): + self._check(attr) + return self._values.has_key(attr) def set(self, attr, value): self._check(attr) @@ -42,17 +46,30 @@ class Struct: field = self._check(attr) return self._values.get(attr, field.default()) + def clear(self, attr): + self._check(attr) + del self._values[attr] + def __setattr__(self, attr, value): self.set(attr, value) def __getattr__(self, attr): return self.get(attr) + def __delattr__(self, attr): + self.clear(attr) + def __setitem__(self, attr, value): self.set(attr, value) def __getitem__(self, attr): return self.get(attr) + def __delitem__(self, attr): + self.clear(attr) + def __str__(self): - return "%s %s" % (self.type.type, self._values) + return "%s %s" % (self.type, self._values) + + def __repr__(self): + return str(self) diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py index f6b0f980cb..c54d48df52 100644 --- a/qpid/python/qpid/codec.py +++ b/qpid/python/qpid/codec.py @@ -94,6 +94,7 @@ class Codec: """ reads in 'n' bytes from the stream. Can raise EOF exception """ + self.clearbits() data = self.stream.read(n) if n > 0 and len(data) == 0: raise EOF() @@ -130,6 +131,10 @@ class Codec: for byte in bytes: self.encode_octet(byte) + def clearbits(self): + if self.incoming_bits: + self.incoming_bits = [] + def pack(self, fmt, *args): """ packs the data 'args' as per the format 'fmt' and writes it to the stream @@ -237,6 +242,12 @@ class Codec: """ return self.unpack("!L") + def encode_signed_long(self, o): + self.pack("!q", o) + + def decode_signed_long(self): + return self.unpack("!q") + def encode_longlong(self, o): """ encodes long long (64 bits) data 'o' in network byte order @@ -332,14 +343,8 @@ class Codec: w = width(code) if fixed(code): value = self.read(w) - elif w == 1: - value = self.decode_shortstr() - elif w == 2: - value = self.dec_str("!H") - elif w == 4: - value = self.decode_longstr() else: - raise ValueError("illegal width: " + w) + value = self.read(self.dec_num(w)) result[key] = value return result @@ -409,19 +414,88 @@ class Codec: def decode_uuid(self): return self.unpack("16s") + def enc_num(self, width, n): + if width == 1: + self.encode_octet(n) + elif width == 2: + self.encode_short(n) + elif width == 3: + self.encode_long(n) + else: + raise ValueError("invalid width: %s" % width) + + def dec_num(self, width): + if width == 1: + return self.decode_octet() + elif width == 2: + return self.decode_short() + elif width == 4: + return self.decode_long() + else: + raise ValueError("invalid width: %s" % width) + def encode_struct(self, type, s): + if False and type.size: + enc = StringIO() + codec = Codec(enc, self.spec) + codec.encode_struct_body(type, s) + codec.flush() + body = enc.getvalue() + self.enc_num(type.size, len(body)) + self.write(body) + else: + self.encode_struct_body(type, s) + + def decode_struct(self, type): + if False and type.size: + size = self.dec_num(type.size) + if size == 0: + return None + return self.decode_struct_body(type) + + def encode_struct_body(self, type, s): + reserved = 8*type.pack - len(type.fields) + assert reserved >= 0 + for f in type.fields: if s == None: - val = f.default() + self.encode_bit(False) + elif f.type == "bit": + self.encode_bit(s.get(f.name)) else: - val = s.get(f.name) - self.encode(f.type, val) + self.encode_bit(s.has(f.name)) + + for i in range(reserved): + self.encode_bit(False) + + for f in type.fields: + if f.type != "bit" and s != None and s.has(f.name): + self.encode(f.type, s.get(f.name)) + self.flush() - def decode_struct(self, type): + def decode_struct_body(self, type): + reserved = 8*type.pack - len(type.fields) + assert reserved >= 0 + s = qpid.Struct(type) + for f in type.fields: - s.set(f.name, self.decode(f.type)) + if f.type == "bit": + s.set(f.name, self.decode_bit()) + elif self.decode_bit(): + s.set(f.name, None) + + for i in range(reserved): + if self.decode_bit(): + raise ValueError("expecting reserved flag") + + for f in type.fields: + if f.type != "bit" and s.has(f.name): + s.set(f.name, self.decode(f.type)) + + self.clearbits() + return s def encode_long_struct(self, s): @@ -429,13 +503,13 @@ class Codec: codec = Codec(enc, self.spec) type = s.type codec.encode_short(type.type) - codec.encode_struct(type, s) + codec.encode_struct_body(type, s) self.encode_longstr(enc.getvalue()) def decode_long_struct(self): codec = Codec(StringIO(self.decode_longstr()), self.spec) type = self.spec.structs[codec.decode_short()] - return codec.decode_struct(type) + return codec.decode_struct_body(type) def fixed(code): return (code >> 6) != 2 @@ -454,9 +528,9 @@ def width(code): raise ValueError(code) # variable width elif code < 192 and code >= 128: - lenlen = (self.code >> 4) & 3 + lenlen = (code >> 4) & 3 if lenlen == 3: raise ValueError(code) return 2 ** lenlen # fixed width else: - return (self.code >> 4) & 7 + return (code >> 4) & 7 diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index 15d7e69945..1beb60822d 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -163,7 +163,10 @@ class Connection: body_size = frame_size - 12 # TODO: Magic number (frame header size) body = c.read(body_size) dec = codec.Codec(StringIO(body), self.spec) - frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + try: + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + except EOF: + raise "truncated frame body: %r" % body frame.channel = channel frame.subchannel = subchannel end = c.decode_octet() @@ -350,7 +353,7 @@ class Header(Frame): props = self.properties.copy() for k in self.properties: for s in structs: - if s.has(k): + if s.exists(k): s.set(k, props.pop(k)) if props: raise TypeError("no such property: %s" % (", ".join(props))) diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index 5c05e71cf1..b734031798 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -130,8 +130,8 @@ class Peer: content = None self.delegate(channel, Message(channel, frame, content)) - except QueueClosed, e: - self.close(e) + except QueueClosed: + self.close("worker closed") except: self.fatal() diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py index 2542ccc3e1..31c79276aa 100644 --- a/qpid/python/qpid/spec.py +++ b/qpid/python/qpid/spec.py @@ -29,7 +29,7 @@ class so that the generated code can be reused in a variety of situations. """ -import re, textwrap, new, mllib +import re, textwrap, new, mllib, qpid class SpecContainer: @@ -115,6 +115,10 @@ class Spec(Metadata): klass, meth = parts return self.classes.byname[klass].methods.byname[meth] + def struct(self, name): + type = self.domains.byname[name].type + return qpid.Struct(type) + def define_module(self, name, doc = None): module = new.module(name, doc) module.__file__ = self.file @@ -303,14 +307,26 @@ class Field(Metadata): else: return Method.DEFAULTS[self.type] +WIDTHS = { + "octet": 1, + "short": 2, + "long": 4 + } + +def width(st, default=None): + if st in (None, "none", ""): + return default + else: + return WIDTHS[st] + def get_result(nd, spec): result = nd["result"] if not result: return None name = result["@domain"] if name != None: return spec.domains.byname[name] st_nd = result["struct"] - st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 + - int(st_nd["@type"]), st_nd["@pack"]) + st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 + + int(st_nd["@type"]), width(st_nd["@pack"], 2)) spec.structs[st.type] = st load_fields(st_nd, st.fields, spec.domains.byname) return st @@ -366,7 +382,7 @@ def load(specfile, *errata): code = st_nd["@type"] if code not in (None, "", "none"): code = int(code) - type = Struct(st_nd["@size"], code, st_nd["@pack"]) + type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2)) if type.type != None: spec.structs[type.type] = type structs.append((type, st_nd)) diff --git a/qpid/python/tests/spec.py b/qpid/python/tests/spec.py index c00faad3ba..ce03640493 100644 --- a/qpid/python/tests/spec.py +++ b/qpid/python/tests/spec.py @@ -32,13 +32,13 @@ class SpecTest(TestCase): if (spec.major == 0 and spec.minor == 10): assert qdecl_ok == None reply_to = spec.domains.byname["reply_to"] - assert reply_to.type.size == "short" - assert reply_to.type.pack == "short" + assert reply_to.type.size == 2 + assert reply_to.type.pack == 2 assert len(reply_to.type.fields) == 2 qq = spec.method("queue_query") assert qq != None - assert qq.result.size == "long" + assert qq.result.size == 4 assert qq.result.type != None args = qq.result.fields.byname["arguments"] assert args.type == "table" diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index c414b15214..8212c7be67 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -437,23 +437,25 @@ class MessageTests(TestBase): channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") #send batch of messages to queue - for i in range(1, 11): + for i in range(10): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) - #each message is currently interpreted as requiring 75 bytes of credit + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 40 + #set byte credit to finite amount (less than enough for all messages) - channel.message_flow(unit = 1, value = 75*5, destination = "c") + channel.message_flow(unit = 1, value = msg_size*5, destination = "c") #set infinite message credit channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = self.client.queue("c") - for i in range(1, 6): + for i in range(5): self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) - + #increase credit again and check more are received - for i in range(6, 11): - channel.message_flow(unit = 1, value = 75, destination = "c") + for i in range(5): + channel.message_flow(unit = 1, value = msg_size, destination = "c") self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) @@ -501,25 +503,27 @@ class MessageTests(TestBase): channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") #send batch of messages to queue - for i in range(1, 11): + for i in range(10): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) - #each message is currently interpreted as requiring 75 bytes of credit + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 40 + #set byte credit to finite amount (less than enough for all messages) - channel.message_flow(unit = 1, value = 75*5, destination = "c") + channel.message_flow(unit = 1, value = msg_size*5, destination = "c") #set infinite message credit channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = self.client.queue("c") msgs = [] - for i in range(1, 6): + for i in range(5): msg = q.get(timeout = 1) msgs.append(msg) self.assertDataEquals(channel, msg, "abcdefgh") self.assertEmpty(q) #ack each message individually and check more are received - for i in range(6, 11): + for i in range(5): msg = msgs.pop() msg.complete(cumulative=False) self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") |
