summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-10-14 02:21:59 +0000
committerRafael H. Schloming <rhs@apache.org>2007-10-14 02:21:59 +0000
commite0ba5becd0052cabe0cfa997dd35d7362bf2c472 (patch)
tree5c55f1dd6be53c4032be45ba422dacca1ded197f /qpid/python
parent2a10deef36eec1b8df9ec52dfb44a72eee7059a8 (diff)
downloadqpid-python-e0ba5becd0052cabe0cfa997dd35d7362bf2c472.tar.gz
Enabled packed struct encoding in python, cpp, and java. Also fixed computation of required byte credit in Message.cpp.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@584474 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rwxr-xr-xqpid/python/hello-world17
-rw-r--r--qpid/python/qpid/__init__.py23
-rw-r--r--qpid/python/qpid/codec.py106
-rw-r--r--qpid/python/qpid/connection.py7
-rw-r--r--qpid/python/qpid/peer.py4
-rw-r--r--qpid/python/qpid/spec.py24
-rw-r--r--qpid/python/tests/spec.py6
-rw-r--r--qpid/python/tests_0-10/message.py28
8 files changed, 170 insertions, 45 deletions
diff --git a/qpid/python/hello-world b/qpid/python/hello-world
index b3170c8e0c..5ba14b0fc1 100755
--- a/qpid/python/hello-world
+++ b/qpid/python/hello-world
@@ -3,14 +3,25 @@ import qpid
from qpid.client import Client
from qpid.content import Content
-client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-10-preview.xml"))
+spec = qpid.spec.load("../specs/amqp.0-10-preview.xml")
+client = Client("127.0.0.1", 5672, spec)
client.start({"LOGIN": "guest", "PASSWORD": "guest"})
ch = client.channel(1)
ch.session_open()
ch.queue_declare(queue="test")
ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
-print ch.queue_query(queue="test")
-ch.message_subscribe(queue="test", destination="test")
+#print ch.queue_query(queue="test")
+ch.message_subscribe(queue="test", destination="amq.direct")
+ch.message_flow("amq.direct", 0, 0xFFFFFFFF)
+ch.message_flow("amq.direct", 1, 0xFFFFFFFF)
msg = Content("hello world")
msg["content_type"] = "text/plain"
+msg["routing_key"] = "test"
+msg["reply_to"] = spec.struct("reply_to")
+msg["reply_to"].exchange_name = "asdf"
+msg["reply_to"].routing_key = "fdsa"
+msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
ch.message_transfer(destination="amq.direct", content=msg)
+queue = client.queue("amq.direct")
+msg = queue.get(timeout=10)
+print msg
diff --git a/qpid/python/qpid/__init__.py b/qpid/python/qpid/__init__.py
index 3f6d82b89e..7afebaf73b 100644
--- a/qpid/python/qpid/__init__.py
+++ b/qpid/python/qpid/__init__.py
@@ -31,8 +31,12 @@ class Struct:
raise AttributeError(attr)
return field
- def has(self, name):
- return self.type.fields.byname.has_key(name)
+ def exists(self, attr):
+ return self.type.fields.byname.has_key(attr)
+
+ def has(self, attr):
+ self._check(attr)
+ return self._values.has_key(attr)
def set(self, attr, value):
self._check(attr)
@@ -42,17 +46,30 @@ class Struct:
field = self._check(attr)
return self._values.get(attr, field.default())
+ def clear(self, attr):
+ self._check(attr)
+ del self._values[attr]
+
def __setattr__(self, attr, value):
self.set(attr, value)
def __getattr__(self, attr):
return self.get(attr)
+ def __delattr__(self, attr):
+ self.clear(attr)
+
def __setitem__(self, attr, value):
self.set(attr, value)
def __getitem__(self, attr):
return self.get(attr)
+ def __delitem__(self, attr):
+ self.clear(attr)
+
def __str__(self):
- return "%s %s" % (self.type.type, self._values)
+ return "%s %s" % (self.type, self._values)
+
+ def __repr__(self):
+ return str(self)
diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py
index f6b0f980cb..c54d48df52 100644
--- a/qpid/python/qpid/codec.py
+++ b/qpid/python/qpid/codec.py
@@ -94,6 +94,7 @@ class Codec:
"""
reads in 'n' bytes from the stream. Can raise EOF exception
"""
+ self.clearbits()
data = self.stream.read(n)
if n > 0 and len(data) == 0:
raise EOF()
@@ -130,6 +131,10 @@ class Codec:
for byte in bytes:
self.encode_octet(byte)
+ def clearbits(self):
+ if self.incoming_bits:
+ self.incoming_bits = []
+
def pack(self, fmt, *args):
"""
packs the data 'args' as per the format 'fmt' and writes it to the stream
@@ -237,6 +242,12 @@ class Codec:
"""
return self.unpack("!L")
+ def encode_signed_long(self, o):
+ self.pack("!q", o)
+
+ def decode_signed_long(self):
+ return self.unpack("!q")
+
def encode_longlong(self, o):
"""
encodes long long (64 bits) data 'o' in network byte order
@@ -332,14 +343,8 @@ class Codec:
w = width(code)
if fixed(code):
value = self.read(w)
- elif w == 1:
- value = self.decode_shortstr()
- elif w == 2:
- value = self.dec_str("!H")
- elif w == 4:
- value = self.decode_longstr()
else:
- raise ValueError("illegal width: " + w)
+ value = self.read(self.dec_num(w))
result[key] = value
return result
@@ -409,19 +414,88 @@ class Codec:
def decode_uuid(self):
return self.unpack("16s")
+ def enc_num(self, width, n):
+ if width == 1:
+ self.encode_octet(n)
+ elif width == 2:
+ self.encode_short(n)
+ elif width == 3:
+ self.encode_long(n)
+ else:
+ raise ValueError("invalid width: %s" % width)
+
+ def dec_num(self, width):
+ if width == 1:
+ return self.decode_octet()
+ elif width == 2:
+ return self.decode_short()
+ elif width == 4:
+ return self.decode_long()
+ else:
+ raise ValueError("invalid width: %s" % width)
+
def encode_struct(self, type, s):
+ if False and type.size:
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ codec.encode_struct_body(type, s)
+ codec.flush()
+ body = enc.getvalue()
+ self.enc_num(type.size, len(body))
+ self.write(body)
+ else:
+ self.encode_struct_body(type, s)
+
+ def decode_struct(self, type):
+ if False and type.size:
+ size = self.dec_num(type.size)
+ if size == 0:
+ return None
+ return self.decode_struct_body(type)
+
+ def encode_struct_body(self, type, s):
+ reserved = 8*type.pack - len(type.fields)
+ assert reserved >= 0
+
for f in type.fields:
if s == None:
- val = f.default()
+ self.encode_bit(False)
+ elif f.type == "bit":
+ self.encode_bit(s.get(f.name))
else:
- val = s.get(f.name)
- self.encode(f.type, val)
+ self.encode_bit(s.has(f.name))
+
+ for i in range(reserved):
+ self.encode_bit(False)
+
+ for f in type.fields:
+ if f.type != "bit" and s != None and s.has(f.name):
+ self.encode(f.type, s.get(f.name))
+
self.flush()
- def decode_struct(self, type):
+ def decode_struct_body(self, type):
+ reserved = 8*type.pack - len(type.fields)
+ assert reserved >= 0
+
s = qpid.Struct(type)
+
for f in type.fields:
- s.set(f.name, self.decode(f.type))
+ if f.type == "bit":
+ s.set(f.name, self.decode_bit())
+ elif self.decode_bit():
+ s.set(f.name, None)
+
+ for i in range(reserved):
+ if self.decode_bit():
+ raise ValueError("expecting reserved flag")
+
+ for f in type.fields:
+ if f.type != "bit" and s.has(f.name):
+ s.set(f.name, self.decode(f.type))
+
+ self.clearbits()
+
return s
def encode_long_struct(self, s):
@@ -429,13 +503,13 @@ class Codec:
codec = Codec(enc, self.spec)
type = s.type
codec.encode_short(type.type)
- codec.encode_struct(type, s)
+ codec.encode_struct_body(type, s)
self.encode_longstr(enc.getvalue())
def decode_long_struct(self):
codec = Codec(StringIO(self.decode_longstr()), self.spec)
type = self.spec.structs[codec.decode_short()]
- return codec.decode_struct(type)
+ return codec.decode_struct_body(type)
def fixed(code):
return (code >> 6) != 2
@@ -454,9 +528,9 @@ def width(code):
raise ValueError(code)
# variable width
elif code < 192 and code >= 128:
- lenlen = (self.code >> 4) & 3
+ lenlen = (code >> 4) & 3
if lenlen == 3: raise ValueError(code)
return 2 ** lenlen
# fixed width
else:
- return (self.code >> 4) & 7
+ return (code >> 4) & 7
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py
index 15d7e69945..1beb60822d 100644
--- a/qpid/python/qpid/connection.py
+++ b/qpid/python/qpid/connection.py
@@ -163,7 +163,10 @@ class Connection:
body_size = frame_size - 12 # TODO: Magic number (frame header size)
body = c.read(body_size)
dec = codec.Codec(StringIO(body), self.spec)
- frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ try:
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ except EOF:
+ raise "truncated frame body: %r" % body
frame.channel = channel
frame.subchannel = subchannel
end = c.decode_octet()
@@ -350,7 +353,7 @@ class Header(Frame):
props = self.properties.copy()
for k in self.properties:
for s in structs:
- if s.has(k):
+ if s.exists(k):
s.set(k, props.pop(k))
if props:
raise TypeError("no such property: %s" % (", ".join(props)))
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 5c05e71cf1..b734031798 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -130,8 +130,8 @@ class Peer:
content = None
self.delegate(channel, Message(channel, frame, content))
- except QueueClosed, e:
- self.close(e)
+ except QueueClosed:
+ self.close("worker closed")
except:
self.fatal()
diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py
index 2542ccc3e1..31c79276aa 100644
--- a/qpid/python/qpid/spec.py
+++ b/qpid/python/qpid/spec.py
@@ -29,7 +29,7 @@ class so that the generated code can be reused in a variety of
situations.
"""
-import re, textwrap, new, mllib
+import re, textwrap, new, mllib, qpid
class SpecContainer:
@@ -115,6 +115,10 @@ class Spec(Metadata):
klass, meth = parts
return self.classes.byname[klass].methods.byname[meth]
+ def struct(self, name):
+ type = self.domains.byname[name].type
+ return qpid.Struct(type)
+
def define_module(self, name, doc = None):
module = new.module(name, doc)
module.__file__ = self.file
@@ -303,14 +307,26 @@ class Field(Metadata):
else:
return Method.DEFAULTS[self.type]
+WIDTHS = {
+ "octet": 1,
+ "short": 2,
+ "long": 4
+ }
+
+def width(st, default=None):
+ if st in (None, "none", ""):
+ return default
+ else:
+ return WIDTHS[st]
+
def get_result(nd, spec):
result = nd["result"]
if not result: return None
name = result["@domain"]
if name != None: return spec.domains.byname[name]
st_nd = result["struct"]
- st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 +
- int(st_nd["@type"]), st_nd["@pack"])
+ st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 +
+ int(st_nd["@type"]), width(st_nd["@pack"], 2))
spec.structs[st.type] = st
load_fields(st_nd, st.fields, spec.domains.byname)
return st
@@ -366,7 +382,7 @@ def load(specfile, *errata):
code = st_nd["@type"]
if code not in (None, "", "none"):
code = int(code)
- type = Struct(st_nd["@size"], code, st_nd["@pack"])
+ type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2))
if type.type != None:
spec.structs[type.type] = type
structs.append((type, st_nd))
diff --git a/qpid/python/tests/spec.py b/qpid/python/tests/spec.py
index c00faad3ba..ce03640493 100644
--- a/qpid/python/tests/spec.py
+++ b/qpid/python/tests/spec.py
@@ -32,13 +32,13 @@ class SpecTest(TestCase):
if (spec.major == 0 and spec.minor == 10):
assert qdecl_ok == None
reply_to = spec.domains.byname["reply_to"]
- assert reply_to.type.size == "short"
- assert reply_to.type.pack == "short"
+ assert reply_to.type.size == 2
+ assert reply_to.type.pack == 2
assert len(reply_to.type.fields) == 2
qq = spec.method("queue_query")
assert qq != None
- assert qq.result.size == "long"
+ assert qq.result.size == 4
assert qq.result.type != None
args = qq.result.fields.byname["arguments"]
assert args.type == "table"
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index c414b15214..8212c7be67 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -437,23 +437,25 @@ class MessageTests(TestBase):
channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
#send batch of messages to queue
- for i in range(1, 11):
+ for i in range(10):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
- #each message is currently interpreted as requiring 75 bytes of credit
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 40
+
#set byte credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 1, value = 75*5, destination = "c")
+ channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
#set infinite message credit
channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
q = self.client.queue("c")
- for i in range(1, 6):
+ for i in range(5):
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
-
+
#increase credit again and check more are received
- for i in range(6, 11):
- channel.message_flow(unit = 1, value = 75, destination = "c")
+ for i in range(5):
+ channel.message_flow(unit = 1, value = msg_size, destination = "c")
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
@@ -501,25 +503,27 @@ class MessageTests(TestBase):
channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
#send batch of messages to queue
- for i in range(1, 11):
+ for i in range(10):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
- #each message is currently interpreted as requiring 75 bytes of credit
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 40
+
#set byte credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 1, value = 75*5, destination = "c")
+ channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
#set infinite message credit
channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
q = self.client.queue("c")
msgs = []
- for i in range(1, 6):
+ for i in range(5):
msg = q.get(timeout = 1)
msgs.append(msg)
self.assertDataEquals(channel, msg, "abcdefgh")
self.assertEmpty(q)
#ack each message individually and check more are received
- for i in range(6, 11):
+ for i in range(5):
msg = msgs.pop()
msg.complete(cumulative=False)
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")