diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-10-14 02:21:59 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-10-14 02:21:59 +0000 |
| commit | e0ba5becd0052cabe0cfa997dd35d7362bf2c472 (patch) | |
| tree | 5c55f1dd6be53c4032be45ba422dacca1ded197f | |
| parent | 2a10deef36eec1b8df9ec52dfb44a72eee7059a8 (diff) | |
| download | qpid-python-e0ba5becd0052cabe0cfa997dd35d7362bf2c472.tar.gz | |
Enabled packed struct encoding in python, cpp, and java. Also fixed computation of required byte credit in Message.cpp.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@584474 13f79535-47bb-0310-9956-ffa450edef68
24 files changed, 576 insertions, 211 deletions
diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb index 21bc05c651..5952b494df 100755 --- a/qpid/cpp/rubygen/amqpgen.rb +++ b/qpid/cpp/rubygen/amqpgen.rb @@ -41,10 +41,25 @@ class Module # Add attribute reader for XML attribute. def amqp_attr_reader(*attrs) attrs.each { |a| - define_method(mangle(a)) { - @amqp_attr_reader||={ } - @amqp_attr_reader[a] ||= xml.attributes[a.to_s] - } + case a + when Symbol + define_method(mangle(a)) { + @amqp_attr_reader||={ } + @amqp_attr_reader[a] ||= xml.attributes[a.to_s] + } + when Hash + a.each { |attr, default| + define_method(mangle(attr)) { + @amqp_attr_reader||={ } + value = xml.attributes[attr.to_s] + if value + @amqp_attr_reader[attr] ||= value + else + @amqp_attr_reader[attr] ||= default + end + } + } + end } end @@ -151,7 +166,7 @@ end class AmqpStruct < AmqpElement def initialize(xml, parent) super; end - amqp_attr_reader :size, :type, :pack + amqp_attr_reader :size, :type, :pack => "short" amqp_child_reader :field def result?() parent.xml.name == "result"; end diff --git a/qpid/cpp/rubygen/templates/structs.rb b/qpid/cpp/rubygen/templates/structs.rb index 15962af4df..2f5a3d8365 100644 --- a/qpid/cpp/rubygen/templates/structs.rb +++ b/qpid/cpp/rubygen/templates/structs.rb @@ -23,17 +23,17 @@ class StructGen < CppGen "long-struct"=>"LongString" } SizeMap={ - "octet"=>"1", - "short"=>"2", - "long"=>"4", - "longlong"=>"8", - "timestamp"=>"8" + "octet"=>1, + "short"=>2, + "long"=>4, + "longlong"=>8, + "timestamp"=>8 } ValueTypes=["octet", "short", "long", "longlong", "timestamp"] def is_packed(s) - false and s.kind_of? AmqpStruct and s.pack + s.kind_of? AmqpStruct end def execution_header?(s) @@ -62,55 +62,60 @@ class StructGen < CppGen end end + def flag_mask(s, i) + pos = SizeMap[s.pack]*8 - 8 - (i/8)*8 + (i % 8) + return "(1 << #{pos})" + end + def get_flags_impl(s) genl "#{s.cpp_pack_type.name} flags = 0;" - process_packed_fields(s) { |f, i| set_field_flag(f, i) } + process_packed_fields(s) { |f, i| set_field_flag(s, f, i) } genl "return flags;" end - def set_field_flag(f, i) + def set_field_flag(s, f, i) if (ValueTypes.include?(f.domain.type_) || f.domain.type_ == "bit") - genl "if (#{f.cppname}) flags |= (1 << #{i});" + genl "if (#{f.cppname}) flags |= #{flag_mask(s, i)};" else - genl "if (#{f.cppname}.size()) flags |= (1 << #{i});" + genl "if (#{f.cppname}.size()) flags |= #{flag_mask(s, i)};" end end def encode_packed_struct(s) genl "#{s.cpp_pack_type.name} flags = getFlags();" genl s.cpp_pack_type.encode('flags', 'buffer') - process_packed_fields(s) { |f, i| encode_packed_field(f, i) unless f.domain.type_ == "bit" } + process_packed_fields(s) { |f, i| encode_packed_field(s, f, i) unless f.domain.type_ == "bit" } end def decode_packed_struct(s) genl "#{s.cpp_pack_type.name} #{s.cpp_pack_type.decode('flags', 'buffer')}" - process_packed_fields(s) { |f, i| decode_packed_field(f, i) unless f.domain.type_ == "bit" } - process_packed_fields(s) { |f, i| set_bitfield(f, i) if f.domain.type_ == "bit" } + process_packed_fields(s) { |f, i| decode_packed_field(s, f, i) unless f.domain.type_ == "bit" } + process_packed_fields(s) { |f, i| set_bitfield(s, f, i) if f.domain.type_ == "bit" } end def size_packed_struct(s) genl "#{s.cpp_pack_type.name} flags = getFlags();" unless has_bitfields_only(s) genl "total += #{SizeMap[s.pack]};" - process_packed_fields(s) { |f, i| size_packed_field(f, i) unless f.domain.type_ == "bit" } + process_packed_fields(s) { |f, i| size_packed_field(s, f, i) unless f.domain.type_ == "bit" } end - def encode_packed_field(f, i) - genl "if (flags & (1 << #{i}))" + def encode_packed_field(s, f, i) + genl "if (flags & #{flag_mask(s, i)})" indent { genl f.domain.cpptype.encode(f.cppname,"buffer") } end - def decode_packed_field(f, i) - genl "if (flags & (1 << #{i}))" + def decode_packed_field(s, f, i) + genl "if (flags & #{flag_mask(s, i)})" indent { genl f.domain.cpptype.decode(f.cppname,"buffer") } end - def size_packed_field(f, i) - genl "if (flags & (1 << #{i}))" + def size_packed_field(s, f, i) + genl "if (flags & #{flag_mask(s, i)})" indent { generate_size(f, []) } end - def set_bitfield(f, i) - genl "#{f.cppname} = (flags & (1 << #{i}));" + def set_bitfield(s, f, i) + genl "#{f.cppname} = (flags & #{flag_mask(s, i)});" end def generate_encode(f, combined) diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 3dbd192d3c..5d572283ce 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -75,7 +75,7 @@ uint32_t Message::getRequiredCredit() const { //add up payload for all header and content frames in the frameset SumBodySize sum; - frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY)); + frames.map_if(sum, TypeFilter(HEADER_BODY, CONTENT_BODY)); return sum.getSize(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index dbe01a96c6..08eba25d16 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -148,7 +148,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; // if there is a replyto destination then we need to request the exchange info - if (!message.getMessageProperties().getReplyTo().getExchangeName().equals("")) + ReplyTo replyTo = message.getMessageProperties().getReplyTo(); + if (replyTo != null && + replyTo.getExchangeName() != null && + !replyTo.getExchangeName().equals("")) { Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession() .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 9313149442..e037c3ed3e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -127,7 +127,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory props.setContentType(mprop.getContentType()); props.setCorrelationId(mprop.getCorrelationId()); String encoding = mprop.getContentEncoding(); - if (!encoding.equals("")) + if (encoding != null && !encoding.equals("")) { props.setEncoding(encoding); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 98c893eddd..75e50ee09b 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -203,12 +203,12 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase for (int msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(_consumer1.receive() != null); + assertTrue(_consumer1.receive(3000) != null); } for (int msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(consumer2.receive() != null); + assertTrue(consumer2.receive(3000) != null); } } else @@ -218,12 +218,12 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase for (int msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(_consumer1.receive() != null); + assertTrue(_consumer1.receive(3000) != null); } for (int msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(_consumer2.receive() != null); + assertTrue(_consumer2.receive(3000) != null); } } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index b151aea8ae..c2524c366c 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -199,19 +199,27 @@ public class ChannelCloseOkTest extends QpidTestCase private void waitFor(List<Message> received, int count) throws InterruptedException { + long timeout = 3000; + synchronized (received) { + long start = System.currentTimeMillis(); while (received.size() < count) { + if (System.currentTimeMillis() - start > timeout) + { + fail("timeout expired waiting for messages"); + } try { - received.wait(); + received.wait(timeout); } catch (InterruptedException e) { _log.info("Interrupted: " + e); throw e; } + } } } diff --git a/qpid/java/common/generate b/qpid/java/common/generate index f3a53ee8da..701efe03a9 100755 --- a/qpid/java/common/generate +++ b/qpid/java/common/generate @@ -11,6 +11,12 @@ spec_file = sys.argv[3] spec = mllib.xml_parse(spec_file) +def jbool(b): + if b: + return "true" + else: + return "false" + class Output: def __init__(self, dir, package, name): @@ -37,11 +43,12 @@ class Output: def line(self, l = ""): self.lines.append(l) - def getter(self, type, method, variable, pre = None): + def getter(self, type, method, value, pre = None): + self.line() self.line(" public final %s %s() {" % (type, method)) if pre: self.line(" %s;" % pre) - self.line(" return %s;" % variable) + self.line(" return %s;" % value) self.line(" }") def setter(self, type, method, variable, value = None, pre = None, @@ -52,6 +59,7 @@ class Output: params = "%s value" % type value = "value" + self.line() self.line(" public final %s %s(%s) {" % (self.name, method, params)) if pre: self.line(" %s;" % pre) @@ -301,37 +309,15 @@ class Struct: def impl(self, out): out.line("public class %s extends %s {" % (self.name, self.base)) - if self.type != None: - out.line() - out.line(" public static final int TYPE = %d;" % self.type) - - out.line() - if self.type == None: - pre = "if (true) throw new UnsupportedOperationException()" - value = "0" - else: - pre = None - value = "TYPE" - out.getter("int", "getEncodedType", value, pre = pre) - out.line() + out.line(" public static final int TYPE = %d;" % self.type) + out.getter("int", "getStructType", "TYPE") out.getter("int", "getSizeWidth", self.size) - out.line() out.getter("int", "getPackWidth", self.pack) - - if self.ticket: - out.getter("boolean", "hasTicket", "true") - else: - out.getter("boolean", "hasTicket", "false"); + out.getter("boolean", "hasTicket", jbool(self.ticket)) if self.base == "Method": - out.line() - if self.content: - out.getter("boolean", "hasPayload", "true") - else: - out.getter("boolean", "hasPayload", "false") - - out.line() + out.getter("boolean", "hasPayload", jbool(self.content)) out.getter("byte", "getEncodedTrack", self.track) out.line() @@ -382,7 +368,6 @@ class Struct: index = 0 for type, name in self.fields: - out.line() out.getter("boolean", camel(1, "has", name), "has_" + name) out.setter("boolean", camel(1, "clear", name), "has_" + name, "false", post = "this.%s = %s" % (name, DEFAULTS.get(type, "null"))) @@ -409,8 +394,8 @@ class Struct: if TYPES.has_key(type): out.line(' check(struct).%s = dec.read%s();' % (name, camel(0, type))) elif STRUCTS.has_key(type): - out.line(' check(struct).%s = new %s();' % (name, STRUCTS[type])) - out.line(' check(struct).%s.read(dec);' % name) + out.line(' check(struct).%s = (%s) dec.readStruct(%s.TYPE);' % + (name, STRUCTS[type], STRUCTS[type])) else: raise Exception("unknown type: %s" % type) out.line(' }') @@ -418,11 +403,8 @@ class Struct: if TYPES.has_key(type): out.line(' enc.write%s(check(struct).%s);' % (camel(0, type), name)) elif STRUCTS.has_key(type): - out.line(" if (check(struct).%s == null) {" % name) - out.line(" new %s().write(enc);" % jtype(type)) - out.line(" } else {") - out.line(' check(struct).%s.write(enc);' % name) - out.line(" }") + out.line(' enc.writeStruct(%s.TYPE, check(struct).%s);' % + (STRUCTS[type], name)) else: raise Exception("unknown type: %s" % type) out.line(' }') @@ -473,6 +455,7 @@ class Visitor(mllib.transforms.Visitor): def __init__(self): self.structs = [] + self.untyped = -1 def do_method(self, m): if CLASSES.get(m.parent["@name"], True): @@ -487,11 +470,12 @@ class Visitor(mllib.transforms.Visitor): name = camel(0, d["@name"]) st = s["@type"] if st in (None, "none", ""): - type = None + type = self.untyped + self.untyped -= 1 else: type = int(st) - self.structs.append((name, "Struct", type, SIZE_WIDTHS[s["size"]], - PACK_WIDTHS[s["pack"]], s)) + self.structs.append((name, "Struct", type, SIZE_WIDTHS[s["@size"]], + PACK_WIDTHS[s["@pack"]], s)) self.descend(d) def do_result(self, r): @@ -499,8 +483,8 @@ class Visitor(mllib.transforms.Visitor): if s: name = camel(0, r.parent.parent["@name"], r.parent["@name"], "Result") type = int(r.parent.parent["@index"]) * 256 + int(s["@type"]) - self.structs.append((name, "Result", type, SIZE_WIDTHS[s["size"]], - PACK_WIDTHS[s["pack"]], s)) + self.structs.append((name, "Result", type, SIZE_WIDTHS[s["@size"]], + PACK_WIDTHS[s["@pack"]], s)) self.descend(r) v = Visitor() @@ -540,7 +524,6 @@ fct.line("class StructFactory {") fct.line(" public static Struct create(int type) {") fct.line(" switch (type) {") for s in structs: - if s.type == None: continue fct.line(" case %s.TYPE:" % s.name) fct.line(" return new %s();" % s.name) fct.line(" default:") diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java new file mode 100644 index 0000000000..03a684dd47 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.qpidity.transport.network.mina.MinaHandler; + + +/** + * Echo + * + */ + +public class Echo extends SessionDelegate +{ + + private MessageTransfer xfr = null; + + public void messageTransfer(Session ssn, MessageTransfer xfr) + { + this.xfr = xfr; + ssn.invoke(xfr); + } + + public void header(Session ssn, Header hdr) + { + ssn.header(hdr); + } + + public void data(Session ssn, Data data) + { + for (ByteBuffer buf : data.getFragments()) + { + ssn.data(buf); + } + if (data.isLast()) + { + ssn.endData(); + } + + // XXX: should be able to get command-id from any segment + ssn.processed(xfr); + } + + public static final void main(String[] args) throws IOException + { + ConnectionDelegate delegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new Echo(); + } + }; + + //hack + delegate.setUsername("guest"); + delegate.setPassword("guest"); + + MinaHandler.accept("0.0.0.0", 5672, delegate); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java index b87512284f..d13af88127 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java @@ -41,14 +41,24 @@ public abstract class Struct implements Encodable return StructFactory.create(type); } - public abstract List<Field<?,?>> getFields(); + public abstract int getStructType(); - public abstract int getEncodedType(); + public abstract List<Field<?,?>> getFields(); public abstract int getSizeWidth(); public abstract int getPackWidth(); + public final int getEncodedType() + { + int type = getStructType(); + if (type < 0) + { + throw new UnsupportedOperationException(); + } + return type; + } + public abstract boolean hasTicket(); private final boolean isBit(Field<?,?> f) @@ -56,11 +66,21 @@ public abstract class Struct implements Encodable return f.getType().equals(Boolean.class); } + private final boolean packed() + { + if (this instanceof Method) + { + return false; + } + else + { + return true; + } + } + private final boolean encoded(Field<?,?> f) { - // XXX: remove to enable packed encoding - if (true) { return true; } - return !isBit(f) && f.has(this); + return !packed() || !isBit(f) && f.has(this); } private final int getFlagWidth() @@ -75,14 +95,23 @@ public abstract class Struct implements Encodable return pw; } + private final int getFlagCount() + { + return 8*getPackWidth(); + } + + private final int getReservedFlagCount() + { + return getFlagCount() - getFields().size(); + } + public final void read(Decoder dec) { List<Field<?,?>> fields = getFields(); - assert fields.size() <= 8*getPackWidth(); + assert fields.size() <= getFlagCount(); - // XXX: remove to enable packed encoding - if (false) + if (packed()) { for (Field<?,?> f : fields) { @@ -97,12 +126,11 @@ public abstract class Struct implements Encodable } } - for (int i = 0; i < getPaddWidth(); i++) + for (int i = 0; i < getReservedFlagCount(); i++) { - short padd = dec.readOctet(); - if (padd != 0x0) + if (dec.readBit()) { - throw new IllegalStateException("urecognized value in reserved bytes: " + padd); + throw new IllegalStateException("reserved flag true"); } } } @@ -125,10 +153,9 @@ public abstract class Struct implements Encodable { List<Field<?,?>> fields = getFields(); - assert fields.size() <= 8*getPackWidth(); + assert fields.size() <= getFlagCount(); - // XXX: remove to enable packed encoding - if (false) + if (packed()) { for (Field<?,?> f : fields) { @@ -142,9 +169,9 @@ public abstract class Struct implements Encodable } } - for (int i = 0; i < getPaddWidth(); i++) + for (int i = 0; i < getReservedFlagCount(); i++) { - enc.writeOctet((short) 0x0); + enc.writeBit(false); } } @@ -171,8 +198,7 @@ public abstract class Struct implements Encodable boolean first = true; for (Field<?,?> f : getFields()) { - // XXX: remove when packed encoding is enabled - if (false) + if (packed()) { if (!f.has(this)) { diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java index e5997d6642..0e06b9e88c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java @@ -186,6 +186,22 @@ abstract class AbstractDecoder implements Decoder throw new Error("Deprecated"); } + public Struct readStruct(int type) + { + Struct st = Struct.create(type); + int width = st.getSizeWidth(); + if (false && width > 0) + { + long size = readSize(width); + if (size == 0) + { + return null; + } + } + st.read(this); + return st; + } + public Struct readLongStruct() { long size = readLong(); @@ -270,17 +286,22 @@ abstract class AbstractDecoder implements Decoder } else { - switch (t.width) - { - case 1: - return readOctet(); - case 2: - return readShort(); - case 4: - return readLong(); - default: - throw new IllegalStateException("irregular width: " + t); - } + return readSize(t.width); + } + } + + private long readSize(int width) + { + switch (width) + { + case 1: + return readOctet(); + case 2: + return readShort(); + case 4: + return readLong(); + default: + throw new IllegalStateException("illegal width: " + width); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java index 06e44fb1c6..e2b5cd41b8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java @@ -61,20 +61,19 @@ abstract class AbstractEncoder implements Encoder ENCODINGS.put(byte[].class, Type.LONG_BINARY); } + // XXX: no longer need major/minor private final byte major; private final byte minor; - private final boolean calcsize; - protected AbstractEncoder(byte major, byte minor, boolean calcsize) + protected AbstractEncoder(byte major, byte minor) { this.major = major; this.minor = minor; - this.calcsize = calcsize; } - protected AbstractEncoder(byte major, byte minor) + protected Sizer sizer() { - this(major, minor, true); + return new SizeEncoder(major, minor); } protected abstract void doPut(byte b); @@ -224,25 +223,45 @@ abstract class AbstractEncoder implements Encoder throw new Error("Deprecated"); } - public void writeLongStruct(Struct s) + public void writeStruct(int type, Struct s) { + boolean empty = false; if (s == null) { - writeLong(0); + s = Struct.create(type); + empty = true; } - else + + int width = s.getSizeWidth(); + if (false && width > 0) { - int size = 0; - if (calcsize) + if (empty) { - SizeEncoder sizer = new SizeEncoder(major, minor); - sizer.writeShort(s.getEncodedType()); + writeSize(width, 0); + } + else + { + Sizer sizer = sizer(); s.write(sizer); - sizer.flush(); - size = sizer.getSize(); + writeSize(width, sizer.size()); } + } - writeLong(size); + s.write(this); + } + + public void writeLongStruct(Struct s) + { + if (s == null) + { + writeLong(0); + } + else + { + Sizer sizer = sizer(); + sizer.writeShort(s.getEncodedType()); + s.write(sizer); + writeLong(sizer.size()); writeShort(s.getEncodedType()); s.write(this); } @@ -308,15 +327,10 @@ abstract class AbstractEncoder implements Encoder return; } - int size = 0; - if (calcsize) - { - SizeEncoder sizer = new SizeEncoder(major, minor); - sizer.writeTableEntries(table); - size = sizer.getSize(); - } - - writeLong(size); + Sizer sizer = sizer(); + sizer.writeTable(table); + // XXX: - 4 + writeLong(sizer.size() - 4); writeTableEntries(table); } @@ -335,15 +349,10 @@ abstract class AbstractEncoder implements Encoder public void writeSequence(List<Object> sequence) { - int size = 0; - if (calcsize) - { - SizeEncoder sizer = new SizeEncoder(major, minor); - sizer.writeSequenceEntries(sequence); - size = sizer.getSize(); - } - - writeLong(size); + Sizer sizer = sizer(); + sizer.writeSequence(sequence); + // XXX: - 4 + writeLong(sizer.size() - 4); writeSequenceEntries(sequence); } @@ -359,15 +368,10 @@ abstract class AbstractEncoder implements Encoder public void writeArray(List<Object> array) { - int size = 0; - if (calcsize) - { - SizeEncoder sizer = new SizeEncoder(major, minor); - sizer.writeArrayEntries(array); - size = sizer.getSize(); - } - - writeLong(size); + Sizer sizer = sizer(); + sizer.writeArray(array); + // XXX: -4 + writeLong(sizer.size() - 4); writeArrayEntries(array); } @@ -405,21 +409,26 @@ abstract class AbstractEncoder implements Encoder } else { - // XXX: should check lengths - switch (t.width) - { - case 1: - writeOctet((short) size); - break; - case 2: - writeShort(size); - break; - case 4: - writeLong(size); - break; - default: - throw new IllegalStateException("irregular width: " + t); - } + writeSize(t.width, size); + } + } + + private void writeSize(int width, int size) + { + // XXX: should check lengths + switch (width) + { + case 1: + writeOctet((short) size); + break; + case 2: + writeShort(size); + break; + case 4: + writeLong(size); + break; + default: + throw new IllegalStateException("illegal width: " + width); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java index 36cc13b7db..f0738e0a91 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java @@ -53,6 +53,7 @@ public interface Decoder String readContent(); + Struct readStruct(int type); Struct readLongStruct(); Map<String,Object> readTable(); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java index 5490bdd904..1b2fe0213e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java @@ -55,6 +55,7 @@ public interface Encoder void writeContent(String c); + void writeStruct(int type, Struct s); void writeLongStruct(Struct s); void writeTable(Map<String,Object> table); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java index 4f39709554..aba269ef08 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java @@ -31,7 +31,7 @@ import java.util.Map; * @author Rafael H. Schloming */ -public class SizeEncoder extends AbstractEncoder +public class SizeEncoder extends AbstractEncoder implements Sizer { private int size; @@ -41,10 +41,15 @@ public class SizeEncoder extends AbstractEncoder } public SizeEncoder(byte major, byte minor, int size) { - super(major, minor, false); + super(major, minor); this.size = size; } + protected Sizer sizer() + { + return Sizer.NULL; + } + public int getSize() { return size; } @@ -53,6 +58,12 @@ public class SizeEncoder extends AbstractEncoder this.size = size; } + public int size() + { + flush(); + return getSize(); + } + protected void doPut(byte b) { size += 1; diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java new file mode 100644 index 0000000000..b98bf98239 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.codec; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; + + +/** + * Sizer + * + */ + +public interface Sizer extends Encoder +{ + + public static final Sizer NULL = new Sizer() + { + public void flush() {}; + + public void writeBit(boolean b) {}; + public void writeOctet(short b) {}; + public void writeShort(int s) {}; + public void writeLong(long i) {}; + public void writeLonglong(long l) {}; + + public void writeTimestamp(long l) {}; + + public void writeShortstr(String s) {}; + public void writeLongstr(String s) {}; + + public void writeRfc1982LongSet(RangeSet ranges) {}; + public void writeUuid(UUID uuid) {}; + + public void writeContent(String c) {}; + + public void writeStruct(int type, Struct s) {}; + public void writeLongStruct(Struct s) {}; + + public void writeTable(Map<String,Object> table) {}; + public void writeSequence(List<Object> sequence) {}; + public void writeArray(List<Object> array) {}; + + public int getSize() { return 0; } + + public int size() { return 0; } + }; + + int getSize(); + + int size(); + +} diff --git a/qpid/python/hello-world b/qpid/python/hello-world index b3170c8e0c..5ba14b0fc1 100755 --- a/qpid/python/hello-world +++ b/qpid/python/hello-world @@ -3,14 +3,25 @@ import qpid from qpid.client import Client from qpid.content import Content -client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-10-preview.xml")) +spec = qpid.spec.load("../specs/amqp.0-10-preview.xml") +client = Client("127.0.0.1", 5672, spec) client.start({"LOGIN": "guest", "PASSWORD": "guest"}) ch = client.channel(1) ch.session_open() ch.queue_declare(queue="test") ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test") -print ch.queue_query(queue="test") -ch.message_subscribe(queue="test", destination="test") +#print ch.queue_query(queue="test") +ch.message_subscribe(queue="test", destination="amq.direct") +ch.message_flow("amq.direct", 0, 0xFFFFFFFF) +ch.message_flow("amq.direct", 1, 0xFFFFFFFF) msg = Content("hello world") msg["content_type"] = "text/plain" +msg["routing_key"] = "test" +msg["reply_to"] = spec.struct("reply_to") +msg["reply_to"].exchange_name = "asdf" +msg["reply_to"].routing_key = "fdsa" +msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"} ch.message_transfer(destination="amq.direct", content=msg) +queue = client.queue("amq.direct") +msg = queue.get(timeout=10) +print msg diff --git a/qpid/python/qpid/__init__.py b/qpid/python/qpid/__init__.py index 3f6d82b89e..7afebaf73b 100644 --- a/qpid/python/qpid/__init__.py +++ b/qpid/python/qpid/__init__.py @@ -31,8 +31,12 @@ class Struct: raise AttributeError(attr) return field - def has(self, name): - return self.type.fields.byname.has_key(name) + def exists(self, attr): + return self.type.fields.byname.has_key(attr) + + def has(self, attr): + self._check(attr) + return self._values.has_key(attr) def set(self, attr, value): self._check(attr) @@ -42,17 +46,30 @@ class Struct: field = self._check(attr) return self._values.get(attr, field.default()) + def clear(self, attr): + self._check(attr) + del self._values[attr] + def __setattr__(self, attr, value): self.set(attr, value) def __getattr__(self, attr): return self.get(attr) + def __delattr__(self, attr): + self.clear(attr) + def __setitem__(self, attr, value): self.set(attr, value) def __getitem__(self, attr): return self.get(attr) + def __delitem__(self, attr): + self.clear(attr) + def __str__(self): - return "%s %s" % (self.type.type, self._values) + return "%s %s" % (self.type, self._values) + + def __repr__(self): + return str(self) diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py index f6b0f980cb..c54d48df52 100644 --- a/qpid/python/qpid/codec.py +++ b/qpid/python/qpid/codec.py @@ -94,6 +94,7 @@ class Codec: """ reads in 'n' bytes from the stream. Can raise EOF exception """ + self.clearbits() data = self.stream.read(n) if n > 0 and len(data) == 0: raise EOF() @@ -130,6 +131,10 @@ class Codec: for byte in bytes: self.encode_octet(byte) + def clearbits(self): + if self.incoming_bits: + self.incoming_bits = [] + def pack(self, fmt, *args): """ packs the data 'args' as per the format 'fmt' and writes it to the stream @@ -237,6 +242,12 @@ class Codec: """ return self.unpack("!L") + def encode_signed_long(self, o): + self.pack("!q", o) + + def decode_signed_long(self): + return self.unpack("!q") + def encode_longlong(self, o): """ encodes long long (64 bits) data 'o' in network byte order @@ -332,14 +343,8 @@ class Codec: w = width(code) if fixed(code): value = self.read(w) - elif w == 1: - value = self.decode_shortstr() - elif w == 2: - value = self.dec_str("!H") - elif w == 4: - value = self.decode_longstr() else: - raise ValueError("illegal width: " + w) + value = self.read(self.dec_num(w)) result[key] = value return result @@ -409,19 +414,88 @@ class Codec: def decode_uuid(self): return self.unpack("16s") + def enc_num(self, width, n): + if width == 1: + self.encode_octet(n) + elif width == 2: + self.encode_short(n) + elif width == 3: + self.encode_long(n) + else: + raise ValueError("invalid width: %s" % width) + + def dec_num(self, width): + if width == 1: + return self.decode_octet() + elif width == 2: + return self.decode_short() + elif width == 4: + return self.decode_long() + else: + raise ValueError("invalid width: %s" % width) + def encode_struct(self, type, s): + if False and type.size: + enc = StringIO() + codec = Codec(enc, self.spec) + codec.encode_struct_body(type, s) + codec.flush() + body = enc.getvalue() + self.enc_num(type.size, len(body)) + self.write(body) + else: + self.encode_struct_body(type, s) + + def decode_struct(self, type): + if False and type.size: + size = self.dec_num(type.size) + if size == 0: + return None + return self.decode_struct_body(type) + + def encode_struct_body(self, type, s): + reserved = 8*type.pack - len(type.fields) + assert reserved >= 0 + for f in type.fields: if s == None: - val = f.default() + self.encode_bit(False) + elif f.type == "bit": + self.encode_bit(s.get(f.name)) else: - val = s.get(f.name) - self.encode(f.type, val) + self.encode_bit(s.has(f.name)) + + for i in range(reserved): + self.encode_bit(False) + + for f in type.fields: + if f.type != "bit" and s != None and s.has(f.name): + self.encode(f.type, s.get(f.name)) + self.flush() - def decode_struct(self, type): + def decode_struct_body(self, type): + reserved = 8*type.pack - len(type.fields) + assert reserved >= 0 + s = qpid.Struct(type) + for f in type.fields: - s.set(f.name, self.decode(f.type)) + if f.type == "bit": + s.set(f.name, self.decode_bit()) + elif self.decode_bit(): + s.set(f.name, None) + + for i in range(reserved): + if self.decode_bit(): + raise ValueError("expecting reserved flag") + + for f in type.fields: + if f.type != "bit" and s.has(f.name): + s.set(f.name, self.decode(f.type)) + + self.clearbits() + return s def encode_long_struct(self, s): @@ -429,13 +503,13 @@ class Codec: codec = Codec(enc, self.spec) type = s.type codec.encode_short(type.type) - codec.encode_struct(type, s) + codec.encode_struct_body(type, s) self.encode_longstr(enc.getvalue()) def decode_long_struct(self): codec = Codec(StringIO(self.decode_longstr()), self.spec) type = self.spec.structs[codec.decode_short()] - return codec.decode_struct(type) + return codec.decode_struct_body(type) def fixed(code): return (code >> 6) != 2 @@ -454,9 +528,9 @@ def width(code): raise ValueError(code) # variable width elif code < 192 and code >= 128: - lenlen = (self.code >> 4) & 3 + lenlen = (code >> 4) & 3 if lenlen == 3: raise ValueError(code) return 2 ** lenlen # fixed width else: - return (self.code >> 4) & 7 + return (code >> 4) & 7 diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index 15d7e69945..1beb60822d 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -163,7 +163,10 @@ class Connection: body_size = frame_size - 12 # TODO: Magic number (frame header size) body = c.read(body_size) dec = codec.Codec(StringIO(body), self.spec) - frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + try: + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + except EOF: + raise "truncated frame body: %r" % body frame.channel = channel frame.subchannel = subchannel end = c.decode_octet() @@ -350,7 +353,7 @@ class Header(Frame): props = self.properties.copy() for k in self.properties: for s in structs: - if s.has(k): + if s.exists(k): s.set(k, props.pop(k)) if props: raise TypeError("no such property: %s" % (", ".join(props))) diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index 5c05e71cf1..b734031798 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -130,8 +130,8 @@ class Peer: content = None self.delegate(channel, Message(channel, frame, content)) - except QueueClosed, e: - self.close(e) + except QueueClosed: + self.close("worker closed") except: self.fatal() diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py index 2542ccc3e1..31c79276aa 100644 --- a/qpid/python/qpid/spec.py +++ b/qpid/python/qpid/spec.py @@ -29,7 +29,7 @@ class so that the generated code can be reused in a variety of situations. """ -import re, textwrap, new, mllib +import re, textwrap, new, mllib, qpid class SpecContainer: @@ -115,6 +115,10 @@ class Spec(Metadata): klass, meth = parts return self.classes.byname[klass].methods.byname[meth] + def struct(self, name): + type = self.domains.byname[name].type + return qpid.Struct(type) + def define_module(self, name, doc = None): module = new.module(name, doc) module.__file__ = self.file @@ -303,14 +307,26 @@ class Field(Metadata): else: return Method.DEFAULTS[self.type] +WIDTHS = { + "octet": 1, + "short": 2, + "long": 4 + } + +def width(st, default=None): + if st in (None, "none", ""): + return default + else: + return WIDTHS[st] + def get_result(nd, spec): result = nd["result"] if not result: return None name = result["@domain"] if name != None: return spec.domains.byname[name] st_nd = result["struct"] - st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 + - int(st_nd["@type"]), st_nd["@pack"]) + st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 + + int(st_nd["@type"]), width(st_nd["@pack"], 2)) spec.structs[st.type] = st load_fields(st_nd, st.fields, spec.domains.byname) return st @@ -366,7 +382,7 @@ def load(specfile, *errata): code = st_nd["@type"] if code not in (None, "", "none"): code = int(code) - type = Struct(st_nd["@size"], code, st_nd["@pack"]) + type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2)) if type.type != None: spec.structs[type.type] = type structs.append((type, st_nd)) diff --git a/qpid/python/tests/spec.py b/qpid/python/tests/spec.py index c00faad3ba..ce03640493 100644 --- a/qpid/python/tests/spec.py +++ b/qpid/python/tests/spec.py @@ -32,13 +32,13 @@ class SpecTest(TestCase): if (spec.major == 0 and spec.minor == 10): assert qdecl_ok == None reply_to = spec.domains.byname["reply_to"] - assert reply_to.type.size == "short" - assert reply_to.type.pack == "short" + assert reply_to.type.size == 2 + assert reply_to.type.pack == 2 assert len(reply_to.type.fields) == 2 qq = spec.method("queue_query") assert qq != None - assert qq.result.size == "long" + assert qq.result.size == 4 assert qq.result.type != None args = qq.result.fields.byname["arguments"] assert args.type == "table" diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index c414b15214..8212c7be67 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -437,23 +437,25 @@ class MessageTests(TestBase): channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") #send batch of messages to queue - for i in range(1, 11): + for i in range(10): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) - #each message is currently interpreted as requiring 75 bytes of credit + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 40 + #set byte credit to finite amount (less than enough for all messages) - channel.message_flow(unit = 1, value = 75*5, destination = "c") + channel.message_flow(unit = 1, value = msg_size*5, destination = "c") #set infinite message credit channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = self.client.queue("c") - for i in range(1, 6): + for i in range(5): self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) - + #increase credit again and check more are received - for i in range(6, 11): - channel.message_flow(unit = 1, value = 75, destination = "c") + for i in range(5): + channel.message_flow(unit = 1, value = msg_size, destination = "c") self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) @@ -501,25 +503,27 @@ class MessageTests(TestBase): channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") #send batch of messages to queue - for i in range(1, 11): + for i in range(10): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) - #each message is currently interpreted as requiring 75 bytes of credit + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 40 + #set byte credit to finite amount (less than enough for all messages) - channel.message_flow(unit = 1, value = 75*5, destination = "c") + channel.message_flow(unit = 1, value = msg_size*5, destination = "c") #set infinite message credit channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = self.client.queue("c") msgs = [] - for i in range(1, 6): + for i in range(5): msg = q.get(timeout = 1) msgs.append(msg) self.assertDataEquals(channel, msg, "abcdefgh") self.assertEmpty(q) #ack each message individually and check more are received - for i in range(6, 11): + for i in range(5): msg = msgs.pop() msg.complete(cumulative=False) self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") |
