summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-10-14 02:21:59 +0000
committerRafael H. Schloming <rhs@apache.org>2007-10-14 02:21:59 +0000
commite0ba5becd0052cabe0cfa997dd35d7362bf2c472 (patch)
tree5c55f1dd6be53c4032be45ba422dacca1ded197f
parent2a10deef36eec1b8df9ec52dfb44a72eee7059a8 (diff)
downloadqpid-python-e0ba5becd0052cabe0cfa997dd35d7362bf2c472.tar.gz
Enabled packed struct encoding in python, cpp, and java. Also fixed computation of required byte credit in Message.cpp.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@584474 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/rubygen/amqpgen.rb25
-rw-r--r--qpid/cpp/rubygen/templates/structs.rb49
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java8
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java10
-rwxr-xr-xqpid/java/common/generate67
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java82
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java64
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java43
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java123
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java75
-rwxr-xr-xqpid/python/hello-world17
-rw-r--r--qpid/python/qpid/__init__.py23
-rw-r--r--qpid/python/qpid/codec.py106
-rw-r--r--qpid/python/qpid/connection.py7
-rw-r--r--qpid/python/qpid/peer.py4
-rw-r--r--qpid/python/qpid/spec.py24
-rw-r--r--qpid/python/tests/spec.py6
-rw-r--r--qpid/python/tests_0-10/message.py28
24 files changed, 576 insertions, 211 deletions
diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb
index 21bc05c651..5952b494df 100755
--- a/qpid/cpp/rubygen/amqpgen.rb
+++ b/qpid/cpp/rubygen/amqpgen.rb
@@ -41,10 +41,25 @@ class Module
# Add attribute reader for XML attribute.
def amqp_attr_reader(*attrs)
attrs.each { |a|
- define_method(mangle(a)) {
- @amqp_attr_reader||={ }
- @amqp_attr_reader[a] ||= xml.attributes[a.to_s]
- }
+ case a
+ when Symbol
+ define_method(mangle(a)) {
+ @amqp_attr_reader||={ }
+ @amqp_attr_reader[a] ||= xml.attributes[a.to_s]
+ }
+ when Hash
+ a.each { |attr, default|
+ define_method(mangle(attr)) {
+ @amqp_attr_reader||={ }
+ value = xml.attributes[attr.to_s]
+ if value
+ @amqp_attr_reader[attr] ||= value
+ else
+ @amqp_attr_reader[attr] ||= default
+ end
+ }
+ }
+ end
}
end
@@ -151,7 +166,7 @@ end
class AmqpStruct < AmqpElement
def initialize(xml, parent) super; end
- amqp_attr_reader :size, :type, :pack
+ amqp_attr_reader :size, :type, :pack => "short"
amqp_child_reader :field
def result?() parent.xml.name == "result"; end
diff --git a/qpid/cpp/rubygen/templates/structs.rb b/qpid/cpp/rubygen/templates/structs.rb
index 15962af4df..2f5a3d8365 100644
--- a/qpid/cpp/rubygen/templates/structs.rb
+++ b/qpid/cpp/rubygen/templates/structs.rb
@@ -23,17 +23,17 @@ class StructGen < CppGen
"long-struct"=>"LongString"
}
SizeMap={
- "octet"=>"1",
- "short"=>"2",
- "long"=>"4",
- "longlong"=>"8",
- "timestamp"=>"8"
+ "octet"=>1,
+ "short"=>2,
+ "long"=>4,
+ "longlong"=>8,
+ "timestamp"=>8
}
ValueTypes=["octet", "short", "long", "longlong", "timestamp"]
def is_packed(s)
- false and s.kind_of? AmqpStruct and s.pack
+ s.kind_of? AmqpStruct
end
def execution_header?(s)
@@ -62,55 +62,60 @@ class StructGen < CppGen
end
end
+ def flag_mask(s, i)
+ pos = SizeMap[s.pack]*8 - 8 - (i/8)*8 + (i % 8)
+ return "(1 << #{pos})"
+ end
+
def get_flags_impl(s)
genl "#{s.cpp_pack_type.name} flags = 0;"
- process_packed_fields(s) { |f, i| set_field_flag(f, i) }
+ process_packed_fields(s) { |f, i| set_field_flag(s, f, i) }
genl "return flags;"
end
- def set_field_flag(f, i)
+ def set_field_flag(s, f, i)
if (ValueTypes.include?(f.domain.type_) || f.domain.type_ == "bit")
- genl "if (#{f.cppname}) flags |= (1 << #{i});"
+ genl "if (#{f.cppname}) flags |= #{flag_mask(s, i)};"
else
- genl "if (#{f.cppname}.size()) flags |= (1 << #{i});"
+ genl "if (#{f.cppname}.size()) flags |= #{flag_mask(s, i)};"
end
end
def encode_packed_struct(s)
genl "#{s.cpp_pack_type.name} flags = getFlags();"
genl s.cpp_pack_type.encode('flags', 'buffer')
- process_packed_fields(s) { |f, i| encode_packed_field(f, i) unless f.domain.type_ == "bit" }
+ process_packed_fields(s) { |f, i| encode_packed_field(s, f, i) unless f.domain.type_ == "bit" }
end
def decode_packed_struct(s)
genl "#{s.cpp_pack_type.name} #{s.cpp_pack_type.decode('flags', 'buffer')}"
- process_packed_fields(s) { |f, i| decode_packed_field(f, i) unless f.domain.type_ == "bit" }
- process_packed_fields(s) { |f, i| set_bitfield(f, i) if f.domain.type_ == "bit" }
+ process_packed_fields(s) { |f, i| decode_packed_field(s, f, i) unless f.domain.type_ == "bit" }
+ process_packed_fields(s) { |f, i| set_bitfield(s, f, i) if f.domain.type_ == "bit" }
end
def size_packed_struct(s)
genl "#{s.cpp_pack_type.name} flags = getFlags();" unless has_bitfields_only(s)
genl "total += #{SizeMap[s.pack]};"
- process_packed_fields(s) { |f, i| size_packed_field(f, i) unless f.domain.type_ == "bit" }
+ process_packed_fields(s) { |f, i| size_packed_field(s, f, i) unless f.domain.type_ == "bit" }
end
- def encode_packed_field(f, i)
- genl "if (flags & (1 << #{i}))"
+ def encode_packed_field(s, f, i)
+ genl "if (flags & #{flag_mask(s, i)})"
indent { genl f.domain.cpptype.encode(f.cppname,"buffer") }
end
- def decode_packed_field(f, i)
- genl "if (flags & (1 << #{i}))"
+ def decode_packed_field(s, f, i)
+ genl "if (flags & #{flag_mask(s, i)})"
indent { genl f.domain.cpptype.decode(f.cppname,"buffer") }
end
- def size_packed_field(f, i)
- genl "if (flags & (1 << #{i}))"
+ def size_packed_field(s, f, i)
+ genl "if (flags & #{flag_mask(s, i)})"
indent { generate_size(f, []) }
end
- def set_bitfield(f, i)
- genl "#{f.cppname} = (flags & (1 << #{i}));"
+ def set_bitfield(s, f, i)
+ genl "#{f.cppname} = (flags & #{flag_mask(s, i)});"
end
def generate_encode(f, combined)
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 3dbd192d3c..5d572283ce 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -75,7 +75,7 @@ uint32_t Message::getRequiredCredit() const
{
//add up payload for all header and content frames in the frameset
SumBodySize sum;
- frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY));
+ frames.map_if(sum, TypeFilter(HEADER_BODY, CONTENT_BODY));
return sum.getSize();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index dbe01a96c6..08eba25d16 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -148,7 +148,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
// if there is a replyto destination then we need to request the exchange info
- if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+ ReplyTo replyTo = message.getMessageProperties().getReplyTo();
+ if (replyTo != null &&
+ replyTo.getExchangeName() != null &&
+ !replyTo.getExchangeName().equals(""))
{
Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
.exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 9313149442..e037c3ed3e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -127,7 +127,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
props.setContentType(mprop.getContentType());
props.setCorrelationId(mprop.getCorrelationId());
String encoding = mprop.getContentEncoding();
- if (!encoding.equals(""))
+ if (encoding != null && !encoding.equals(""))
{
props.setEncoding(encoding);
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 98c893eddd..75e50ee09b 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -203,12 +203,12 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase
for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(_consumer1.receive() != null);
+ assertTrue(_consumer1.receive(3000) != null);
}
for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(consumer2.receive() != null);
+ assertTrue(consumer2.receive(3000) != null);
}
}
else
@@ -218,12 +218,12 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase
for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(_consumer1.receive() != null);
+ assertTrue(_consumer1.receive(3000) != null);
}
for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(_consumer2.receive() != null);
+ assertTrue(_consumer2.receive(3000) != null);
}
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index b151aea8ae..c2524c366c 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -199,19 +199,27 @@ public class ChannelCloseOkTest extends QpidTestCase
private void waitFor(List<Message> received, int count) throws InterruptedException
{
+ long timeout = 3000;
+
synchronized (received)
{
+ long start = System.currentTimeMillis();
while (received.size() < count)
{
+ if (System.currentTimeMillis() - start > timeout)
+ {
+ fail("timeout expired waiting for messages");
+ }
try
{
- received.wait();
+ received.wait(timeout);
}
catch (InterruptedException e)
{
_log.info("Interrupted: " + e);
throw e;
}
+
}
}
}
diff --git a/qpid/java/common/generate b/qpid/java/common/generate
index f3a53ee8da..701efe03a9 100755
--- a/qpid/java/common/generate
+++ b/qpid/java/common/generate
@@ -11,6 +11,12 @@ spec_file = sys.argv[3]
spec = mllib.xml_parse(spec_file)
+def jbool(b):
+ if b:
+ return "true"
+ else:
+ return "false"
+
class Output:
def __init__(self, dir, package, name):
@@ -37,11 +43,12 @@ class Output:
def line(self, l = ""):
self.lines.append(l)
- def getter(self, type, method, variable, pre = None):
+ def getter(self, type, method, value, pre = None):
+ self.line()
self.line(" public final %s %s() {" % (type, method))
if pre:
self.line(" %s;" % pre)
- self.line(" return %s;" % variable)
+ self.line(" return %s;" % value)
self.line(" }")
def setter(self, type, method, variable, value = None, pre = None,
@@ -52,6 +59,7 @@ class Output:
params = "%s value" % type
value = "value"
+ self.line()
self.line(" public final %s %s(%s) {" % (self.name, method, params))
if pre:
self.line(" %s;" % pre)
@@ -301,37 +309,15 @@ class Struct:
def impl(self, out):
out.line("public class %s extends %s {" % (self.name, self.base))
- if self.type != None:
- out.line()
- out.line(" public static final int TYPE = %d;" % self.type)
-
- out.line()
- if self.type == None:
- pre = "if (true) throw new UnsupportedOperationException()"
- value = "0"
- else:
- pre = None
- value = "TYPE"
- out.getter("int", "getEncodedType", value, pre = pre)
-
out.line()
+ out.line(" public static final int TYPE = %d;" % self.type)
+ out.getter("int", "getStructType", "TYPE")
out.getter("int", "getSizeWidth", self.size)
- out.line()
out.getter("int", "getPackWidth", self.pack)
-
- if self.ticket:
- out.getter("boolean", "hasTicket", "true")
- else:
- out.getter("boolean", "hasTicket", "false");
+ out.getter("boolean", "hasTicket", jbool(self.ticket))
if self.base == "Method":
- out.line()
- if self.content:
- out.getter("boolean", "hasPayload", "true")
- else:
- out.getter("boolean", "hasPayload", "false")
-
- out.line()
+ out.getter("boolean", "hasPayload", jbool(self.content))
out.getter("byte", "getEncodedTrack", self.track)
out.line()
@@ -382,7 +368,6 @@ class Struct:
index = 0
for type, name in self.fields:
- out.line()
out.getter("boolean", camel(1, "has", name), "has_" + name)
out.setter("boolean", camel(1, "clear", name), "has_" + name, "false",
post = "this.%s = %s" % (name, DEFAULTS.get(type, "null")))
@@ -409,8 +394,8 @@ class Struct:
if TYPES.has_key(type):
out.line(' check(struct).%s = dec.read%s();' % (name, camel(0, type)))
elif STRUCTS.has_key(type):
- out.line(' check(struct).%s = new %s();' % (name, STRUCTS[type]))
- out.line(' check(struct).%s.read(dec);' % name)
+ out.line(' check(struct).%s = (%s) dec.readStruct(%s.TYPE);' %
+ (name, STRUCTS[type], STRUCTS[type]))
else:
raise Exception("unknown type: %s" % type)
out.line(' }')
@@ -418,11 +403,8 @@ class Struct:
if TYPES.has_key(type):
out.line(' enc.write%s(check(struct).%s);' % (camel(0, type), name))
elif STRUCTS.has_key(type):
- out.line(" if (check(struct).%s == null) {" % name)
- out.line(" new %s().write(enc);" % jtype(type))
- out.line(" } else {")
- out.line(' check(struct).%s.write(enc);' % name)
- out.line(" }")
+ out.line(' enc.writeStruct(%s.TYPE, check(struct).%s);' %
+ (STRUCTS[type], name))
else:
raise Exception("unknown type: %s" % type)
out.line(' }')
@@ -473,6 +455,7 @@ class Visitor(mllib.transforms.Visitor):
def __init__(self):
self.structs = []
+ self.untyped = -1
def do_method(self, m):
if CLASSES.get(m.parent["@name"], True):
@@ -487,11 +470,12 @@ class Visitor(mllib.transforms.Visitor):
name = camel(0, d["@name"])
st = s["@type"]
if st in (None, "none", ""):
- type = None
+ type = self.untyped
+ self.untyped -= 1
else:
type = int(st)
- self.structs.append((name, "Struct", type, SIZE_WIDTHS[s["size"]],
- PACK_WIDTHS[s["pack"]], s))
+ self.structs.append((name, "Struct", type, SIZE_WIDTHS[s["@size"]],
+ PACK_WIDTHS[s["@pack"]], s))
self.descend(d)
def do_result(self, r):
@@ -499,8 +483,8 @@ class Visitor(mllib.transforms.Visitor):
if s:
name = camel(0, r.parent.parent["@name"], r.parent["@name"], "Result")
type = int(r.parent.parent["@index"]) * 256 + int(s["@type"])
- self.structs.append((name, "Result", type, SIZE_WIDTHS[s["size"]],
- PACK_WIDTHS[s["pack"]], s))
+ self.structs.append((name, "Result", type, SIZE_WIDTHS[s["@size"]],
+ PACK_WIDTHS[s["@pack"]], s))
self.descend(r)
v = Visitor()
@@ -540,7 +524,6 @@ fct.line("class StructFactory {")
fct.line(" public static Struct create(int type) {")
fct.line(" switch (type) {")
for s in structs:
- if s.type == None: continue
fct.line(" case %s.TYPE:" % s.name)
fct.line(" return new %s();" % s.name)
fct.line(" default:")
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
new file mode 100644
index 0000000000..03a684dd47
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.network.mina.MinaHandler;
+
+
+/**
+ * Echo
+ *
+ */
+
+public class Echo extends SessionDelegate
+{
+
+ private MessageTransfer xfr = null;
+
+ public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ this.xfr = xfr;
+ ssn.invoke(xfr);
+ }
+
+ public void header(Session ssn, Header hdr)
+ {
+ ssn.header(hdr);
+ }
+
+ public void data(Session ssn, Data data)
+ {
+ for (ByteBuffer buf : data.getFragments())
+ {
+ ssn.data(buf);
+ }
+ if (data.isLast())
+ {
+ ssn.endData();
+ }
+
+ // XXX: should be able to get command-id from any segment
+ ssn.processed(xfr);
+ }
+
+ public static final void main(String[] args) throws IOException
+ {
+ ConnectionDelegate delegate = new ConnectionDelegate()
+ {
+ public SessionDelegate getSessionDelegate()
+ {
+ return new Echo();
+ }
+ };
+
+ //hack
+ delegate.setUsername("guest");
+ delegate.setPassword("guest");
+
+ MinaHandler.accept("0.0.0.0", 5672, delegate);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
index b87512284f..d13af88127 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
@@ -41,14 +41,24 @@ public abstract class Struct implements Encodable
return StructFactory.create(type);
}
- public abstract List<Field<?,?>> getFields();
+ public abstract int getStructType();
- public abstract int getEncodedType();
+ public abstract List<Field<?,?>> getFields();
public abstract int getSizeWidth();
public abstract int getPackWidth();
+ public final int getEncodedType()
+ {
+ int type = getStructType();
+ if (type < 0)
+ {
+ throw new UnsupportedOperationException();
+ }
+ return type;
+ }
+
public abstract boolean hasTicket();
private final boolean isBit(Field<?,?> f)
@@ -56,11 +66,21 @@ public abstract class Struct implements Encodable
return f.getType().equals(Boolean.class);
}
+ private final boolean packed()
+ {
+ if (this instanceof Method)
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
private final boolean encoded(Field<?,?> f)
{
- // XXX: remove to enable packed encoding
- if (true) { return true; }
- return !isBit(f) && f.has(this);
+ return !packed() || !isBit(f) && f.has(this);
}
private final int getFlagWidth()
@@ -75,14 +95,23 @@ public abstract class Struct implements Encodable
return pw;
}
+ private final int getFlagCount()
+ {
+ return 8*getPackWidth();
+ }
+
+ private final int getReservedFlagCount()
+ {
+ return getFlagCount() - getFields().size();
+ }
+
public final void read(Decoder dec)
{
List<Field<?,?>> fields = getFields();
- assert fields.size() <= 8*getPackWidth();
+ assert fields.size() <= getFlagCount();
- // XXX: remove to enable packed encoding
- if (false)
+ if (packed())
{
for (Field<?,?> f : fields)
{
@@ -97,12 +126,11 @@ public abstract class Struct implements Encodable
}
}
- for (int i = 0; i < getPaddWidth(); i++)
+ for (int i = 0; i < getReservedFlagCount(); i++)
{
- short padd = dec.readOctet();
- if (padd != 0x0)
+ if (dec.readBit())
{
- throw new IllegalStateException("urecognized value in reserved bytes: " + padd);
+ throw new IllegalStateException("reserved flag true");
}
}
}
@@ -125,10 +153,9 @@ public abstract class Struct implements Encodable
{
List<Field<?,?>> fields = getFields();
- assert fields.size() <= 8*getPackWidth();
+ assert fields.size() <= getFlagCount();
- // XXX: remove to enable packed encoding
- if (false)
+ if (packed())
{
for (Field<?,?> f : fields)
{
@@ -142,9 +169,9 @@ public abstract class Struct implements Encodable
}
}
- for (int i = 0; i < getPaddWidth(); i++)
+ for (int i = 0; i < getReservedFlagCount(); i++)
{
- enc.writeOctet((short) 0x0);
+ enc.writeBit(false);
}
}
@@ -171,8 +198,7 @@ public abstract class Struct implements Encodable
boolean first = true;
for (Field<?,?> f : getFields())
{
- // XXX: remove when packed encoding is enabled
- if (false)
+ if (packed())
{
if (!f.has(this))
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
index e5997d6642..0e06b9e88c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
@@ -186,6 +186,22 @@ abstract class AbstractDecoder implements Decoder
throw new Error("Deprecated");
}
+ public Struct readStruct(int type)
+ {
+ Struct st = Struct.create(type);
+ int width = st.getSizeWidth();
+ if (false && width > 0)
+ {
+ long size = readSize(width);
+ if (size == 0)
+ {
+ return null;
+ }
+ }
+ st.read(this);
+ return st;
+ }
+
public Struct readLongStruct()
{
long size = readLong();
@@ -270,17 +286,22 @@ abstract class AbstractDecoder implements Decoder
}
else
{
- switch (t.width)
- {
- case 1:
- return readOctet();
- case 2:
- return readShort();
- case 4:
- return readLong();
- default:
- throw new IllegalStateException("irregular width: " + t);
- }
+ return readSize(t.width);
+ }
+ }
+
+ private long readSize(int width)
+ {
+ switch (width)
+ {
+ case 1:
+ return readOctet();
+ case 2:
+ return readShort();
+ case 4:
+ return readLong();
+ default:
+ throw new IllegalStateException("illegal width: " + width);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
index 06e44fb1c6..e2b5cd41b8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
@@ -61,20 +61,19 @@ abstract class AbstractEncoder implements Encoder
ENCODINGS.put(byte[].class, Type.LONG_BINARY);
}
+ // XXX: no longer need major/minor
private final byte major;
private final byte minor;
- private final boolean calcsize;
- protected AbstractEncoder(byte major, byte minor, boolean calcsize)
+ protected AbstractEncoder(byte major, byte minor)
{
this.major = major;
this.minor = minor;
- this.calcsize = calcsize;
}
- protected AbstractEncoder(byte major, byte minor)
+ protected Sizer sizer()
{
- this(major, minor, true);
+ return new SizeEncoder(major, minor);
}
protected abstract void doPut(byte b);
@@ -224,25 +223,45 @@ abstract class AbstractEncoder implements Encoder
throw new Error("Deprecated");
}
- public void writeLongStruct(Struct s)
+ public void writeStruct(int type, Struct s)
{
+ boolean empty = false;
if (s == null)
{
- writeLong(0);
+ s = Struct.create(type);
+ empty = true;
}
- else
+
+ int width = s.getSizeWidth();
+ if (false && width > 0)
{
- int size = 0;
- if (calcsize)
+ if (empty)
{
- SizeEncoder sizer = new SizeEncoder(major, minor);
- sizer.writeShort(s.getEncodedType());
+ writeSize(width, 0);
+ }
+ else
+ {
+ Sizer sizer = sizer();
s.write(sizer);
- sizer.flush();
- size = sizer.getSize();
+ writeSize(width, sizer.size());
}
+ }
- writeLong(size);
+ s.write(this);
+ }
+
+ public void writeLongStruct(Struct s)
+ {
+ if (s == null)
+ {
+ writeLong(0);
+ }
+ else
+ {
+ Sizer sizer = sizer();
+ sizer.writeShort(s.getEncodedType());
+ s.write(sizer);
+ writeLong(sizer.size());
writeShort(s.getEncodedType());
s.write(this);
}
@@ -308,15 +327,10 @@ abstract class AbstractEncoder implements Encoder
return;
}
- int size = 0;
- if (calcsize)
- {
- SizeEncoder sizer = new SizeEncoder(major, minor);
- sizer.writeTableEntries(table);
- size = sizer.getSize();
- }
-
- writeLong(size);
+ Sizer sizer = sizer();
+ sizer.writeTable(table);
+ // XXX: - 4
+ writeLong(sizer.size() - 4);
writeTableEntries(table);
}
@@ -335,15 +349,10 @@ abstract class AbstractEncoder implements Encoder
public void writeSequence(List<Object> sequence)
{
- int size = 0;
- if (calcsize)
- {
- SizeEncoder sizer = new SizeEncoder(major, minor);
- sizer.writeSequenceEntries(sequence);
- size = sizer.getSize();
- }
-
- writeLong(size);
+ Sizer sizer = sizer();
+ sizer.writeSequence(sequence);
+ // XXX: - 4
+ writeLong(sizer.size() - 4);
writeSequenceEntries(sequence);
}
@@ -359,15 +368,10 @@ abstract class AbstractEncoder implements Encoder
public void writeArray(List<Object> array)
{
- int size = 0;
- if (calcsize)
- {
- SizeEncoder sizer = new SizeEncoder(major, minor);
- sizer.writeArrayEntries(array);
- size = sizer.getSize();
- }
-
- writeLong(size);
+ Sizer sizer = sizer();
+ sizer.writeArray(array);
+ // XXX: -4
+ writeLong(sizer.size() - 4);
writeArrayEntries(array);
}
@@ -405,21 +409,26 @@ abstract class AbstractEncoder implements Encoder
}
else
{
- // XXX: should check lengths
- switch (t.width)
- {
- case 1:
- writeOctet((short) size);
- break;
- case 2:
- writeShort(size);
- break;
- case 4:
- writeLong(size);
- break;
- default:
- throw new IllegalStateException("irregular width: " + t);
- }
+ writeSize(t.width, size);
+ }
+ }
+
+ private void writeSize(int width, int size)
+ {
+ // XXX: should check lengths
+ switch (width)
+ {
+ case 1:
+ writeOctet((short) size);
+ break;
+ case 2:
+ writeShort(size);
+ break;
+ case 4:
+ writeLong(size);
+ break;
+ default:
+ throw new IllegalStateException("illegal width: " + width);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
index 36cc13b7db..f0738e0a91 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
@@ -53,6 +53,7 @@ public interface Decoder
String readContent();
+ Struct readStruct(int type);
Struct readLongStruct();
Map<String,Object> readTable();
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
index 5490bdd904..1b2fe0213e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
@@ -55,6 +55,7 @@ public interface Encoder
void writeContent(String c);
+ void writeStruct(int type, Struct s);
void writeLongStruct(Struct s);
void writeTable(Map<String,Object> table);
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
index 4f39709554..aba269ef08 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
@@ -31,7 +31,7 @@ import java.util.Map;
* @author Rafael H. Schloming
*/
-public class SizeEncoder extends AbstractEncoder
+public class SizeEncoder extends AbstractEncoder implements Sizer
{
private int size;
@@ -41,10 +41,15 @@ public class SizeEncoder extends AbstractEncoder
}
public SizeEncoder(byte major, byte minor, int size) {
- super(major, minor, false);
+ super(major, minor);
this.size = size;
}
+ protected Sizer sizer()
+ {
+ return Sizer.NULL;
+ }
+
public int getSize() {
return size;
}
@@ -53,6 +58,12 @@ public class SizeEncoder extends AbstractEncoder
this.size = size;
}
+ public int size()
+ {
+ flush();
+ return getSize();
+ }
+
protected void doPut(byte b)
{
size += 1;
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
new file mode 100644
index 0000000000..b98bf98239
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.codec;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
+
+
+/**
+ * Sizer
+ *
+ */
+
+public interface Sizer extends Encoder
+{
+
+ public static final Sizer NULL = new Sizer()
+ {
+ public void flush() {};
+
+ public void writeBit(boolean b) {};
+ public void writeOctet(short b) {};
+ public void writeShort(int s) {};
+ public void writeLong(long i) {};
+ public void writeLonglong(long l) {};
+
+ public void writeTimestamp(long l) {};
+
+ public void writeShortstr(String s) {};
+ public void writeLongstr(String s) {};
+
+ public void writeRfc1982LongSet(RangeSet ranges) {};
+ public void writeUuid(UUID uuid) {};
+
+ public void writeContent(String c) {};
+
+ public void writeStruct(int type, Struct s) {};
+ public void writeLongStruct(Struct s) {};
+
+ public void writeTable(Map<String,Object> table) {};
+ public void writeSequence(List<Object> sequence) {};
+ public void writeArray(List<Object> array) {};
+
+ public int getSize() { return 0; }
+
+ public int size() { return 0; }
+ };
+
+ int getSize();
+
+ int size();
+
+}
diff --git a/qpid/python/hello-world b/qpid/python/hello-world
index b3170c8e0c..5ba14b0fc1 100755
--- a/qpid/python/hello-world
+++ b/qpid/python/hello-world
@@ -3,14 +3,25 @@ import qpid
from qpid.client import Client
from qpid.content import Content
-client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-10-preview.xml"))
+spec = qpid.spec.load("../specs/amqp.0-10-preview.xml")
+client = Client("127.0.0.1", 5672, spec)
client.start({"LOGIN": "guest", "PASSWORD": "guest"})
ch = client.channel(1)
ch.session_open()
ch.queue_declare(queue="test")
ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
-print ch.queue_query(queue="test")
-ch.message_subscribe(queue="test", destination="test")
+#print ch.queue_query(queue="test")
+ch.message_subscribe(queue="test", destination="amq.direct")
+ch.message_flow("amq.direct", 0, 0xFFFFFFFF)
+ch.message_flow("amq.direct", 1, 0xFFFFFFFF)
msg = Content("hello world")
msg["content_type"] = "text/plain"
+msg["routing_key"] = "test"
+msg["reply_to"] = spec.struct("reply_to")
+msg["reply_to"].exchange_name = "asdf"
+msg["reply_to"].routing_key = "fdsa"
+msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
ch.message_transfer(destination="amq.direct", content=msg)
+queue = client.queue("amq.direct")
+msg = queue.get(timeout=10)
+print msg
diff --git a/qpid/python/qpid/__init__.py b/qpid/python/qpid/__init__.py
index 3f6d82b89e..7afebaf73b 100644
--- a/qpid/python/qpid/__init__.py
+++ b/qpid/python/qpid/__init__.py
@@ -31,8 +31,12 @@ class Struct:
raise AttributeError(attr)
return field
- def has(self, name):
- return self.type.fields.byname.has_key(name)
+ def exists(self, attr):
+ return self.type.fields.byname.has_key(attr)
+
+ def has(self, attr):
+ self._check(attr)
+ return self._values.has_key(attr)
def set(self, attr, value):
self._check(attr)
@@ -42,17 +46,30 @@ class Struct:
field = self._check(attr)
return self._values.get(attr, field.default())
+ def clear(self, attr):
+ self._check(attr)
+ del self._values[attr]
+
def __setattr__(self, attr, value):
self.set(attr, value)
def __getattr__(self, attr):
return self.get(attr)
+ def __delattr__(self, attr):
+ self.clear(attr)
+
def __setitem__(self, attr, value):
self.set(attr, value)
def __getitem__(self, attr):
return self.get(attr)
+ def __delitem__(self, attr):
+ self.clear(attr)
+
def __str__(self):
- return "%s %s" % (self.type.type, self._values)
+ return "%s %s" % (self.type, self._values)
+
+ def __repr__(self):
+ return str(self)
diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py
index f6b0f980cb..c54d48df52 100644
--- a/qpid/python/qpid/codec.py
+++ b/qpid/python/qpid/codec.py
@@ -94,6 +94,7 @@ class Codec:
"""
reads in 'n' bytes from the stream. Can raise EOF exception
"""
+ self.clearbits()
data = self.stream.read(n)
if n > 0 and len(data) == 0:
raise EOF()
@@ -130,6 +131,10 @@ class Codec:
for byte in bytes:
self.encode_octet(byte)
+ def clearbits(self):
+ if self.incoming_bits:
+ self.incoming_bits = []
+
def pack(self, fmt, *args):
"""
packs the data 'args' as per the format 'fmt' and writes it to the stream
@@ -237,6 +242,12 @@ class Codec:
"""
return self.unpack("!L")
+ def encode_signed_long(self, o):
+ self.pack("!q", o)
+
+ def decode_signed_long(self):
+ return self.unpack("!q")
+
def encode_longlong(self, o):
"""
encodes long long (64 bits) data 'o' in network byte order
@@ -332,14 +343,8 @@ class Codec:
w = width(code)
if fixed(code):
value = self.read(w)
- elif w == 1:
- value = self.decode_shortstr()
- elif w == 2:
- value = self.dec_str("!H")
- elif w == 4:
- value = self.decode_longstr()
else:
- raise ValueError("illegal width: " + w)
+ value = self.read(self.dec_num(w))
result[key] = value
return result
@@ -409,19 +414,88 @@ class Codec:
def decode_uuid(self):
return self.unpack("16s")
+ def enc_num(self, width, n):
+ if width == 1:
+ self.encode_octet(n)
+ elif width == 2:
+ self.encode_short(n)
+ elif width == 3:
+ self.encode_long(n)
+ else:
+ raise ValueError("invalid width: %s" % width)
+
+ def dec_num(self, width):
+ if width == 1:
+ return self.decode_octet()
+ elif width == 2:
+ return self.decode_short()
+ elif width == 4:
+ return self.decode_long()
+ else:
+ raise ValueError("invalid width: %s" % width)
+
def encode_struct(self, type, s):
+ if False and type.size:
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ codec.encode_struct_body(type, s)
+ codec.flush()
+ body = enc.getvalue()
+ self.enc_num(type.size, len(body))
+ self.write(body)
+ else:
+ self.encode_struct_body(type, s)
+
+ def decode_struct(self, type):
+ if False and type.size:
+ size = self.dec_num(type.size)
+ if size == 0:
+ return None
+ return self.decode_struct_body(type)
+
+ def encode_struct_body(self, type, s):
+ reserved = 8*type.pack - len(type.fields)
+ assert reserved >= 0
+
for f in type.fields:
if s == None:
- val = f.default()
+ self.encode_bit(False)
+ elif f.type == "bit":
+ self.encode_bit(s.get(f.name))
else:
- val = s.get(f.name)
- self.encode(f.type, val)
+ self.encode_bit(s.has(f.name))
+
+ for i in range(reserved):
+ self.encode_bit(False)
+
+ for f in type.fields:
+ if f.type != "bit" and s != None and s.has(f.name):
+ self.encode(f.type, s.get(f.name))
+
self.flush()
- def decode_struct(self, type):
+ def decode_struct_body(self, type):
+ reserved = 8*type.pack - len(type.fields)
+ assert reserved >= 0
+
s = qpid.Struct(type)
+
for f in type.fields:
- s.set(f.name, self.decode(f.type))
+ if f.type == "bit":
+ s.set(f.name, self.decode_bit())
+ elif self.decode_bit():
+ s.set(f.name, None)
+
+ for i in range(reserved):
+ if self.decode_bit():
+ raise ValueError("expecting reserved flag")
+
+ for f in type.fields:
+ if f.type != "bit" and s.has(f.name):
+ s.set(f.name, self.decode(f.type))
+
+ self.clearbits()
+
return s
def encode_long_struct(self, s):
@@ -429,13 +503,13 @@ class Codec:
codec = Codec(enc, self.spec)
type = s.type
codec.encode_short(type.type)
- codec.encode_struct(type, s)
+ codec.encode_struct_body(type, s)
self.encode_longstr(enc.getvalue())
def decode_long_struct(self):
codec = Codec(StringIO(self.decode_longstr()), self.spec)
type = self.spec.structs[codec.decode_short()]
- return codec.decode_struct(type)
+ return codec.decode_struct_body(type)
def fixed(code):
return (code >> 6) != 2
@@ -454,9 +528,9 @@ def width(code):
raise ValueError(code)
# variable width
elif code < 192 and code >= 128:
- lenlen = (self.code >> 4) & 3
+ lenlen = (code >> 4) & 3
if lenlen == 3: raise ValueError(code)
return 2 ** lenlen
# fixed width
else:
- return (self.code >> 4) & 7
+ return (code >> 4) & 7
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py
index 15d7e69945..1beb60822d 100644
--- a/qpid/python/qpid/connection.py
+++ b/qpid/python/qpid/connection.py
@@ -163,7 +163,10 @@ class Connection:
body_size = frame_size - 12 # TODO: Magic number (frame header size)
body = c.read(body_size)
dec = codec.Codec(StringIO(body), self.spec)
- frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ try:
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ except EOF:
+ raise "truncated frame body: %r" % body
frame.channel = channel
frame.subchannel = subchannel
end = c.decode_octet()
@@ -350,7 +353,7 @@ class Header(Frame):
props = self.properties.copy()
for k in self.properties:
for s in structs:
- if s.has(k):
+ if s.exists(k):
s.set(k, props.pop(k))
if props:
raise TypeError("no such property: %s" % (", ".join(props)))
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 5c05e71cf1..b734031798 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -130,8 +130,8 @@ class Peer:
content = None
self.delegate(channel, Message(channel, frame, content))
- except QueueClosed, e:
- self.close(e)
+ except QueueClosed:
+ self.close("worker closed")
except:
self.fatal()
diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py
index 2542ccc3e1..31c79276aa 100644
--- a/qpid/python/qpid/spec.py
+++ b/qpid/python/qpid/spec.py
@@ -29,7 +29,7 @@ class so that the generated code can be reused in a variety of
situations.
"""
-import re, textwrap, new, mllib
+import re, textwrap, new, mllib, qpid
class SpecContainer:
@@ -115,6 +115,10 @@ class Spec(Metadata):
klass, meth = parts
return self.classes.byname[klass].methods.byname[meth]
+ def struct(self, name):
+ type = self.domains.byname[name].type
+ return qpid.Struct(type)
+
def define_module(self, name, doc = None):
module = new.module(name, doc)
module.__file__ = self.file
@@ -303,14 +307,26 @@ class Field(Metadata):
else:
return Method.DEFAULTS[self.type]
+WIDTHS = {
+ "octet": 1,
+ "short": 2,
+ "long": 4
+ }
+
+def width(st, default=None):
+ if st in (None, "none", ""):
+ return default
+ else:
+ return WIDTHS[st]
+
def get_result(nd, spec):
result = nd["result"]
if not result: return None
name = result["@domain"]
if name != None: return spec.domains.byname[name]
st_nd = result["struct"]
- st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 +
- int(st_nd["@type"]), st_nd["@pack"])
+ st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 +
+ int(st_nd["@type"]), width(st_nd["@pack"], 2))
spec.structs[st.type] = st
load_fields(st_nd, st.fields, spec.domains.byname)
return st
@@ -366,7 +382,7 @@ def load(specfile, *errata):
code = st_nd["@type"]
if code not in (None, "", "none"):
code = int(code)
- type = Struct(st_nd["@size"], code, st_nd["@pack"])
+ type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2))
if type.type != None:
spec.structs[type.type] = type
structs.append((type, st_nd))
diff --git a/qpid/python/tests/spec.py b/qpid/python/tests/spec.py
index c00faad3ba..ce03640493 100644
--- a/qpid/python/tests/spec.py
+++ b/qpid/python/tests/spec.py
@@ -32,13 +32,13 @@ class SpecTest(TestCase):
if (spec.major == 0 and spec.minor == 10):
assert qdecl_ok == None
reply_to = spec.domains.byname["reply_to"]
- assert reply_to.type.size == "short"
- assert reply_to.type.pack == "short"
+ assert reply_to.type.size == 2
+ assert reply_to.type.pack == 2
assert len(reply_to.type.fields) == 2
qq = spec.method("queue_query")
assert qq != None
- assert qq.result.size == "long"
+ assert qq.result.size == 4
assert qq.result.type != None
args = qq.result.fields.byname["arguments"]
assert args.type == "table"
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index c414b15214..8212c7be67 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -437,23 +437,25 @@ class MessageTests(TestBase):
channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
#send batch of messages to queue
- for i in range(1, 11):
+ for i in range(10):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
- #each message is currently interpreted as requiring 75 bytes of credit
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 40
+
#set byte credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 1, value = 75*5, destination = "c")
+ channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
#set infinite message credit
channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
q = self.client.queue("c")
- for i in range(1, 6):
+ for i in range(5):
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
-
+
#increase credit again and check more are received
- for i in range(6, 11):
- channel.message_flow(unit = 1, value = 75, destination = "c")
+ for i in range(5):
+ channel.message_flow(unit = 1, value = msg_size, destination = "c")
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
@@ -501,25 +503,27 @@ class MessageTests(TestBase):
channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
#send batch of messages to queue
- for i in range(1, 11):
+ for i in range(10):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
- #each message is currently interpreted as requiring 75 bytes of credit
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 40
+
#set byte credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 1, value = 75*5, destination = "c")
+ channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
#set infinite message credit
channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
q = self.client.queue("c")
msgs = []
- for i in range(1, 6):
+ for i in range(5):
msg = q.get(timeout = 1)
msgs.append(msg)
self.assertDataEquals(channel, msg, "abcdefgh")
self.assertEmpty(q)
#ack each message individually and check more are received
- for i in range(6, 11):
+ for i in range(5):
msg = msgs.pop()
msg.complete(cumulative=False)
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")