diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-09-24 14:26:14 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-09-24 14:26:14 +0000 |
| commit | d1e7aed5b442767e4a81a64f21266b181047690d (patch) | |
| tree | f6c1f63f9796e33d4ee100a0ce9d38a48d09ad77 /java | |
| parent | 31f50b898ce632d5879bf5cd68c84fe2a5eaa87c (diff) | |
| download | qpid-python-d1e7aed5b442767e4a81a64f21266b181047690d.tar.gz | |
added field table encoding/decoding
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@578827 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
17 files changed, 742 insertions, 113 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index f2e28eb8e7..b99e1f2f2a 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -260,7 +260,7 @@ public interface Session * on the providers implementation. */ public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, - MessagePartListener listener, Map<String, ?> filter, Option... options); + MessagePartListener listener, Map<String, Object> filter, Option... options); /** * This method cancels a consumer. This does not affect already delivered messages, but it does @@ -481,7 +481,7 @@ public interface Session * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NO_OPTION}) * @see Option */ - public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options); + public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments, Option... options); /** * Bind a queue with an exchange. @@ -498,7 +498,7 @@ public interface Session * routing keys depends on the exchange implementation. * @param arguments Used for backward compatibility */ - public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments); + public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments); /** * Unbind a queue from an exchange. @@ -508,7 +508,7 @@ public interface Session * @param routingKey Specifies the routing key of the binding to unbind. * @param arguments Used for backward compatibility */ - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments); + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments); /** * This method removes all messages from a queue. It does not cancel consumers. Purged messages @@ -573,7 +573,7 @@ public interface Session * @param arguments Used for backward compatibility * @see Option */ - public void exchangeDeclare(String exchangeName, String type, String alternateExchange, Map<String, ?> arguments, + public void exchangeDeclare(String exchangeName, String type, String alternateExchange, Map<String, Object> arguments, Option... options); /** diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index f6aa444881..69b04490ff 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -35,7 +35,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen } } - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options) + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options) { setMessageListener(destination,listener); super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); diff --git a/java/common/generate b/java/common/generate index 502b0a4706..6d3c3dea2c 100755 --- a/java/common/generate +++ b/java/common/generate @@ -21,6 +21,7 @@ class Output: self.line("package %s;" % self.package) self.line() + self.line("import java.util.List;") self.line("import java.util.Map;") self.line("import java.util.UUID;") self.line() @@ -54,12 +55,55 @@ TYPES = { "short": "int", "octet": "short", "bit": "boolean", - "table": "Map<String,?>", + "table": "Map<String,Object>", "timestamp": "long", "content": "String", "uuid": "UUID", "rfc1982-long-set": "RangeSet", - "long-struct": "Struct" + "long-struct": "Struct", + "signed-byte": "byte", + "unsigned-byte": "short", + "char": "char", + "boolean": "boolean", + "two-octets": "short", + "signed-short": "short", + "unsigned-short": "int", + "four-octets": "int", + "signed-int": "int", + "unsigned-int": "long", + "float": "float", + "utf32-char": "char", + "eight-octets": "long", + "signed-long": "long", + "unsigned-long": "long", + "double": "double", + "datetime": "long", + "sixteen-octets": "byte[]", + "thirty-two-octets": "byte[]", + "sixty-four-octets": "byte[]", + "_128-octets": "byte[]", + "short-binary": "byte[]", + "short-string": "String", + "short-utf8-string": "String", + "short-utf16-string": "String", + "short-utf32-string": "String", + "binary": "byte[]", + "string": "String", + "utf8-string": "String", + "utf16-string": "String", + "utf32-string": "String", + "long-binary": "byte[]", + "long-string": "String", + "long-utf8-string": "String", + "long-utf16-string": "String", + "long-utf32-string": "String", + "sequence": "List<Object>", + "array": "List<Object>", + "five-octets": "byte[]", + "decimal": "byte[]", + "nine-octets": "byte[]", + "long-decimal": "byte[]", + "void": "Void" } TRACKS = { @@ -81,6 +125,54 @@ def dromedary(s): def scream(*args): return "_".join([a.replace("-", "_").upper() for a in args]) + +types = Output(out_dir, out_pkg, "Type") +types.line("public enum Type") +types.line("{") +codes = {} +for c in spec.query["amqp/constant"]: + if c["@class"] == "field-table-type": + name = c["@name"] + if name.startswith("field-table-"): + name = name[12:] + if name[0].isdigit(): + name = "_" + name + val = c["@value"] + codes[val] = name + if c["@width"] != None: + width = c["@width"] + fixed = "true" + if c["@lfwidth"] != None: + width = c["@lfwidth"] + fixed = "false" + types.line(" %s((byte) %s, %s, %s)," % + (scream(name), val, width, fixed)) +types.line(" ;") + +types.line(" public byte code;") +types.line(" public int width;") +types.line(" public boolean fixed;") + +types.line(" Type(byte code, int width, boolean fixed)") +types.line(" {") +for arg in ("code", "width", "fixed"): + types.line(" this.%s = %s;" % (arg, arg)) +types.line(" }") + +types.line(" public static Type get(byte code)") +types.line(" {") +types.line(" switch (code)") +types.line(" {") +for code, name in codes.items(): + types.line(" case (byte) %s: return %s;" % (code, scream(name))) +types.line(" default: return null;") +types.line(" }") +types.line(" }") + +types.line("}") +types.write() + + const = Output(out_dir, out_pkg, "Constant") const.line("public interface Constant") const.line("{") diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 5aff49cb16..f927765166 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -122,6 +122,12 @@ class ToyBroker extends SessionDelegate { System.out.println("received headers routing_key " + props.getRoutingKey()); } + MessageProperties mp = header.get(MessageProperties.class); + System.out.println("MP: " + mp); + if (mp != null) + { + System.out.println(mp.getApplicationHeaders()); + } this.header = header; } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index d924c55c7b..ec2af0dc1a 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -20,6 +20,8 @@ */ package org.apache.qpidity; +import java.util.*; + import org.apache.qpidity.transport.*; import org.apache.qpidity.transport.network.mina.MinaHandler; @@ -78,9 +80,32 @@ class ToyClient extends SessionDelegate ssn.queueDeclare("asdf", null, null); ssn.sync(); + Map<String,Object> nested = new LinkedHashMap<String,Object>(); + nested.put("list", Arrays.asList("one", "two", "three")); + Map<String,Object> map = new LinkedHashMap<String,Object>(); + + map.put("str", "this is a string"); + + map.put("+int", 3); + map.put("-int", -3); + map.put("maxint", Integer.MAX_VALUE); + map.put("minint", Integer.MIN_VALUE); + + map.put("+short", (short) 1); + map.put("-short", (short) -1); + map.put("maxshort", (short) Short.MAX_VALUE); + map.put("minshort", (short) Short.MIN_VALUE); + + map.put("float", (float) 3.3); + map.put("double", 4.9); + map.put("char", 'c'); + + map.put("table", nested); + map.put("list", Arrays.asList(1, 2, 3)); + ssn.messageTransfer("asdf", (short) 0, (short) 1); ssn.header(new DeliveryProperties(), - new MessageProperties()); + new MessageProperties().setApplicationHeaders(map)); ssn.data("this is the data"); ssn.endData(); diff --git a/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java index cfb9dfbe92..9c33d9f754 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java @@ -20,11 +20,15 @@ */ package org.apache.qpidity.codec; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.transport.Struct; +import org.apache.qpidity.transport.Type; import static org.apache.qpidity.transport.util.Functions.*; @@ -40,20 +44,37 @@ abstract class AbstractDecoder implements Decoder private final byte major; private final byte minor; + private int count; protected AbstractDecoder(byte major, byte minor) { this.major = major; this.minor = minor; + this.count = 0; } - protected abstract byte get(); + protected abstract byte doGet(); - protected abstract void get(byte[] bytes); + protected abstract void doGet(byte[] bytes); + + protected byte get() + { + clearBits(); + byte b = doGet(); + count += 1; + return b; + } + + protected void get(byte[] bytes) + { + clearBits(); + doGet(bytes); + count += bytes.length; + } protected short uget() { - return unsigned(get()); + return (short) (0xFF & get()); } private byte bits = 0x0; @@ -81,13 +102,11 @@ abstract class AbstractDecoder implements Decoder public short readOctet() { - clearBits(); return uget(); } public int readShort() { - clearBits(); int i = uget() << 8; i |= uget(); return i; @@ -95,7 +114,6 @@ abstract class AbstractDecoder implements Decoder public long readLong() { - clearBits(); long l = uget() << 24; l |= uget() << 16; l |= uget() << 8; @@ -105,15 +123,11 @@ abstract class AbstractDecoder implements Decoder public long readLonglong() { - clearBits(); - long l = uget() << 56; - l |= uget() << 48; - l |= uget() << 40; - l |= uget() << 32; - l |= uget() << 24; - l |= uget() << 16; - l |= uget() << 8; - l |= uget(); + long l = 0; + for (int i = 0; i < 8; i++) + { + l |= ((long) (0xFF & get())) << (56 - i*8); + } return l; } @@ -134,18 +148,11 @@ abstract class AbstractDecoder implements Decoder public String readLongstr() { long size = readLong(); - assert size <= Integer.MAX_VALUE; byte[] bytes = new byte[(int) size]; get(bytes); return new String(bytes); } - public Map<String,?> readTable() - { - //throw new Error("TODO"); - return null; - } - public RangeSet readRfc1982LongSet() { int count = readShort()/8; @@ -192,4 +199,173 @@ abstract class AbstractDecoder implements Decoder } } + public Map<String,Object> readTable() + { + long size = readLong(); + int start = count; + Map<String,Object> result = new LinkedHashMap(); + while (count < start + size) + { + String key = readShortstr(); + byte code = get(); + Type t = Type.get(code); + Object value = read(t); + result.put(key, value); + } + return result; + } + + public List<Object> readSequence() + { + long size = readLong(); + int start = count; + List<Object> result = new ArrayList(); + while (count < start + size) + { + byte code = get(); + Type t = Type.get(code); + Object value = read(t); + result.add(value); + } + return result; + } + + public List<Object> readArray() + { + long size = readLong(); + byte code = get(); + Type t = Type.get(code); + long count = readLong(); + + List<Object> result = new ArrayList<Object>(); + for (int i = 0; i < count; i++) + { + Object value = read(t); + result.add(value); + } + return result; + } + + private long readSize(Type t) + { + if (t.fixed) + { + return t.width; + } + else + { + switch (t.width) + { + case 1: + return readOctet(); + case 2: + return readShort(); + case 4: + return readLong(); + default: + throw new IllegalStateException("irregular width: " + t); + } + } + } + + private byte[] readBytes(Type t) + { + long size = readSize(t); + byte[] result = new byte[(int) size]; + get(result); + return result; + } + + private Object read(Type t) + { + switch (t) + { + case OCTET: + case UNSIGNED_BYTE: + return readOctet(); + case SIGNED_BYTE: + return get(); + case CHAR: + return (char) get(); + case BOOLEAN: + return get() > 0; + + case TWO_OCTETS: + case UNSIGNED_SHORT: + return readShort(); + + case SIGNED_SHORT: + return (short) readShort(); + + case FOUR_OCTETS: + case UNSIGNED_INT: + return readLong(); + + case UTF32_CHAR: + case SIGNED_INT: + return (int) readLong(); + + case FLOAT: + return Float.intBitsToFloat((int) readLong()); + + case EIGHT_OCTETS: + case SIGNED_LONG: + case UNSIGNED_LONG: + case DATETIME: + return readLonglong(); + + case DOUBLE: + long bits = readLonglong(); + System.out.println("double in: " + bits); + return Double.longBitsToDouble(bits); + + case SIXTEEN_OCTETS: + case THIRTY_TWO_OCTETS: + case SIXTY_FOUR_OCTETS: + case _128_OCTETS: + case SHORT_BINARY: + case BINARY: + case LONG_BINARY: + return readBytes(t); + + case UUID: + return readUuid(); + + case SHORT_STRING: + case SHORT_UTF8_STRING: + case SHORT_UTF16_STRING: + case SHORT_UTF32_STRING: + case STRING: + case UTF8_STRING: + case UTF16_STRING: + case UTF32_STRING: + case LONG_STRING: + case LONG_UTF8_STRING: + case LONG_UTF16_STRING: + case LONG_UTF32_STRING: + // XXX: need to do character conversion + return new String(readBytes(t)); + + case TABLE: + return readTable(); + case SEQUENCE: + return readSequence(); + case ARRAY: + return readArray(); + + case FIVE_OCTETS: + case DECIMAL: + case NINE_OCTETS: + case LONG_DECIMAL: + // XXX: what types are we supposed to use here? + return readBytes(t); + + case VOID: + return null; + + default: + return readBytes(t); + } + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java index 0cc5a4157a..7ad71ef520 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java @@ -22,12 +22,15 @@ package org.apache.qpidity.codec; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.qpidity.transport.Range; import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.transport.Struct; +import org.apache.qpidity.transport.Type; import static org.apache.qpidity.transport.util.Functions.*; @@ -41,18 +44,57 @@ import static org.apache.qpidity.transport.util.Functions.*; abstract class AbstractEncoder implements Encoder { + private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>(); + static + { + ENCODINGS.put(String.class, Type.LONG_STRING); + ENCODINGS.put(Long.class, Type.SIGNED_LONG); + ENCODINGS.put(Integer.class, Type.SIGNED_INT); + ENCODINGS.put(Short.class, Type.SIGNED_SHORT); + ENCODINGS.put(Byte.class, Type.SIGNED_BYTE); + ENCODINGS.put(Map.class, Type.TABLE); + ENCODINGS.put(List.class, Type.SEQUENCE); + ENCODINGS.put(Float.class, Type.FLOAT); + ENCODINGS.put(Double.class, Type.DOUBLE); + ENCODINGS.put(Character.class, Type.CHAR); + } + private final byte major; private final byte minor; + private final boolean calcsize; - protected AbstractEncoder(byte major, byte minor) + protected AbstractEncoder(byte major, byte minor, boolean calcsize) { this.major = major; this.minor = minor; + this.calcsize = calcsize; + } + + protected AbstractEncoder(byte major, byte minor) + { + this(major, minor, true); } - protected abstract void put(byte b); + protected abstract void doPut(byte b); - protected abstract void put(ByteBuffer src); + protected abstract void doPut(ByteBuffer src); + + protected void put(byte b) + { + flushBits(); + doPut(b); + } + + protected void put(ByteBuffer src) + { + flushBits(); + doPut(src); + } + + protected void put(byte[] bytes) + { + put(ByteBuffer.wrap(bytes)); + } private byte bits = 0x0; private byte nbits = 0; @@ -76,17 +118,21 @@ abstract class AbstractEncoder implements Encoder { if (nbits > 0) { - put(bits); + doPut(bits); bits = 0x0; nbits = 0; } } + public void flush() + { + flushBits(); + } + public void writeOctet(short b) { assert b < 0x100; - flushBits(); put((byte) b); } @@ -94,7 +140,6 @@ abstract class AbstractEncoder implements Encoder { assert s < 0x10000; - flushBits(); put(lsb(s >>> 8)); put(lsb(s)); } @@ -103,7 +148,6 @@ abstract class AbstractEncoder implements Encoder { assert i < 0x100000000L; - flushBits(); put(lsb(i >>> 24)); put(lsb(i >>> 16)); put(lsb(i >>> 8)); @@ -112,21 +156,15 @@ abstract class AbstractEncoder implements Encoder public void writeLonglong(long l) { - flushBits(); - put(lsb(l >>> 56)); - put(lsb(l >>> 48)); - put(lsb(l >>> 40)); - put(lsb(l >>> 32)); - put(lsb(l >>> 24)); - put(lsb(l >>> 16)); - put(lsb(l >>> 8)); - put(lsb(l)); + for (int i = 0; i < 8; i++) + { + put(lsb(l >> (56 - i*8))); + } } public void writeTimestamp(long l) { - flushBits(); writeLonglong(l); } @@ -149,11 +187,6 @@ abstract class AbstractEncoder implements Encoder } - public void writeTable(Map<String,?> table) - { - //throw new Error("TODO"); - } - public void writeRfc1982LongSet(RangeSet ranges) { if (ranges == null) @@ -197,19 +230,330 @@ abstract class AbstractEncoder implements Encoder } else { - SizeEncoder sizer = new SizeEncoder(major, minor); - sizer.writeShort(s.getEncodedType()); - s.write(sizer, major, minor); + int size = 0; + if (calcsize) + { + SizeEncoder sizer = new SizeEncoder(major, minor); + sizer.writeShort(s.getEncodedType()); + s.write(sizer, major, minor); + size = sizer.getSize(); + } - writeLong(sizer.getSize()); + writeLong(size); writeShort(s.getEncodedType()); s.write(this, major, minor); } } - public void flush() + private Type encoding(Object value) { - flushBits(); + if (value == null) + { + return Type.VOID; + } + + Class klass = value.getClass(); + Type type = resolve(klass); + + if (type == null) + { + throw new IllegalArgumentException + ("unable to resolve type: " + klass + ", " + value); + } + else + { + return type; + } + } + + private Type resolve(Class klass) + { + Type type = ENCODINGS.get(klass); + if (type != null) + { + return type; + } + + Class sup = klass.getSuperclass(); + if (sup != null) + { + type = resolve(klass.getSuperclass()); + + if (type != null) + { + return type; + } + } + + for (Class iface : klass.getInterfaces()) + { + type = resolve(iface); + if (type != null) + { + return type; + } + } + + return null; + } + + public void writeTable(Map<String,Object> table) + { + if (table == null) + { + writeLong(0); + return; + } + + int size = 0; + if (calcsize) + { + SizeEncoder sizer = new SizeEncoder(major, minor); + sizer.writeTableEntries(table); + size = sizer.getSize(); + } + + writeLong(size); + writeTableEntries(table); + } + + protected void writeTableEntries(Map<String,Object> table) + { + for (Map.Entry<String,Object> entry : table.entrySet()) + { + String key = entry.getKey(); + Object value = entry.getValue(); + Type type = encoding(value); + writeShortstr(key); + put(type.code); + write(type, value); + } + } + + public void writeSequence(List<Object> sequence) + { + int size = 0; + if (calcsize) + { + SizeEncoder sizer = new SizeEncoder(major, minor); + sizer.writeSequenceEntries(sequence); + size = sizer.getSize(); + } + + writeLong(size); + writeSequenceEntries(sequence); + } + + protected void writeSequenceEntries(List<Object> sequence) + { + for (Object value : sequence) + { + Type type = encoding(value); + put(type.code); + write(type, value); + } + } + + public void writeArray(List<Object> array) + { + int size = 0; + if (calcsize) + { + SizeEncoder sizer = new SizeEncoder(major, minor); + sizer.writeArrayEntries(array); + size = sizer.getSize(); + } + + writeLong(size); + writeArrayEntries(array); + } + + protected void writeArrayEntries(List<Object> array) + { + Type type; + + if (array.isEmpty()) + { + type = Type.VOID; + } + else + { + type = encoding(array.get(0)); + } + + put(type.code); + + for (Object value : array) + { + write(type, value); + } + } + + private void writeSize(Type t, int size) + { + if (t.fixed) + { + if (size != t.width) + { + throw new IllegalArgumentException + ("size does not match fixed width " + t.width + ": " + + size); + } + } + else + { + // XXX: should check lengths + switch (t.width) + { + case 1: + writeOctet((short) size); + break; + case 2: + writeShort(size); + break; + case 4: + writeLong(size); + break; + default: + throw new IllegalStateException("irregular width: " + t); + } + } + } + + private void writeBytes(Type t, byte[] bytes) + { + writeSize(t, bytes.length); + put(bytes); + } + + private <T> T coerce(Class<T> klass, Object value) + { + if (klass.isInstance(value)) + { + return klass.cast(value); + } + else + { + throw new IllegalArgumentException("" + value); + } + } + + private void write(Type t, Object value) + { + switch (t) + { + case OCTET: + case UNSIGNED_BYTE: + writeOctet(coerce(Short.class, value)); + break; + case SIGNED_BYTE: + put(coerce(Byte.class, value)); + break; + case CHAR: + put((byte) ((char)coerce(Character.class, value))); + break; + case BOOLEAN: + if (coerce(Boolean.class, value)) + { + put((byte) 1); + } + else + { + put((byte) 0); + } + break; + + case TWO_OCTETS: + case UNSIGNED_SHORT: + writeShort(coerce(Integer.class, value)); + break; + + case SIGNED_SHORT: + writeShort(coerce(Short.class, value)); + break; + + case FOUR_OCTETS: + case UNSIGNED_INT: + writeLong(coerce(Long.class, value)); + break; + + case UTF32_CHAR: + case SIGNED_INT: + writeLong(coerce(Integer.class, value)); + break; + + case FLOAT: + writeLong(Float.floatToIntBits(coerce(Float.class, value))); + break; + + case EIGHT_OCTETS: + case SIGNED_LONG: + case UNSIGNED_LONG: + case DATETIME: + writeLonglong(coerce(Long.class, value)); + break; + + case DOUBLE: + long bits = Double.doubleToLongBits(coerce(Double.class, value)); + System.out.println("double out: " + bits); + writeLonglong(bits); + break; + + case SIXTEEN_OCTETS: + case THIRTY_TWO_OCTETS: + case SIXTY_FOUR_OCTETS: + case _128_OCTETS: + case SHORT_BINARY: + case BINARY: + case LONG_BINARY: + writeBytes(t, coerce(byte[].class, value)); + break; + + case UUID: + writeUuid(coerce(UUID.class, value)); + break; + + case SHORT_STRING: + case SHORT_UTF8_STRING: + case SHORT_UTF16_STRING: + case SHORT_UTF32_STRING: + case STRING: + case UTF8_STRING: + case UTF16_STRING: + case UTF32_STRING: + case LONG_STRING: + case LONG_UTF8_STRING: + case LONG_UTF16_STRING: + case LONG_UTF32_STRING: + // XXX: need to do character conversion + writeBytes(t, coerce(String.class, value).getBytes()); + break; + + case TABLE: + writeTable((Map<String,Object>) coerce(Map.class, value)); + break; + case SEQUENCE: + writeSequence(coerce(List.class, value)); + break; + case ARRAY: + writeArray(coerce(List.class, value)); + break; + + case FIVE_OCTETS: + case DECIMAL: + case NINE_OCTETS: + case LONG_DECIMAL: + // XXX: what types are we supposed to use here? + writeBytes(t, coerce(byte[].class, value)); + break; + + case VOID: + break; + + default: + writeBytes(t, coerce(byte[].class, value)); + break; + } } } diff --git a/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java b/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java index b4e3389dde..111835882e 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java @@ -40,12 +40,12 @@ public class BBDecoder extends AbstractDecoder this.in = in; } - protected byte get() + protected byte doGet() { return in.get(); } - protected void get(byte[] bytes) + protected void doGet(byte[] bytes) { in.get(bytes); } diff --git a/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java index bbc6ba0ad9..7aa21c5871 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java @@ -39,12 +39,12 @@ public class BBEncoder extends AbstractEncoder this.out = out; } - @Override protected void put(byte b) + protected void doPut(byte b) { out.put(b); } - @Override protected void put(ByteBuffer src) + protected void doPut(ByteBuffer src) { out.put(src); } diff --git a/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java b/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java index 0d224056de..9ed02cdb42 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.codec; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -47,7 +48,6 @@ public interface Decoder String readShortstr(); String readLongstr(); - Map<String,?> readTable(); RangeSet readRfc1982LongSet(); UUID readUuid(); @@ -55,4 +55,8 @@ public interface Decoder Struct readLongStruct(); + Map<String,Object> readTable(); + List<Object> readSequence(); + List<Object> readArray(); + } diff --git a/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java b/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java index 9a79ec70fa..1824a41895 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.codec; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -36,6 +37,8 @@ import org.apache.qpidity.transport.Struct; public interface Encoder { + void flush(); + void writeBit(boolean b); void writeOctet(short b); void writeShort(int s); @@ -47,14 +50,15 @@ public interface Encoder void writeShortstr(String s); void writeLongstr(String s); - void writeTable(Map<String,?> table); void writeRfc1982LongSet(RangeSet ranges); void writeUuid(UUID uuid); void writeContent(String c); - void flush(); - void writeLongStruct(Struct s); + void writeTable(Map<String,Object> table); + void writeSequence(List<Object> sequence); + void writeArray(List<Object> array); + } diff --git a/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java index 53183e5007..cc4489c8c3 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java @@ -87,7 +87,7 @@ public class FragmentDecoder extends AbstractDecoder } } - @Override protected byte get() + protected byte doGet() { preRead(); byte b = current.get(); @@ -95,7 +95,7 @@ public class FragmentDecoder extends AbstractDecoder return b; } - @Override protected void get(byte[] bytes) + protected void doGet(byte[] bytes) { int remaining = bytes.length; while (remaining > 0) diff --git a/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java index a12cbef50a..5d4066f850 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java @@ -41,7 +41,7 @@ public class SizeEncoder extends AbstractEncoder } public SizeEncoder(byte major, byte minor, int size) { - super(major, minor); + super(major, minor, false); this.size = size; } @@ -53,31 +53,14 @@ public class SizeEncoder extends AbstractEncoder this.size = size; } - @Override protected void put(byte b) + protected void doPut(byte b) { size += 1; } - @Override protected void put(ByteBuffer src) + protected void doPut(ByteBuffer src) { size += src.remaining(); } - @Override public void writeShortstr(String s) - { - if (s == null) { s = ""; } - if (s.length() > 255) { - throw new IllegalArgumentException(s); - } - writeOctet((byte) s.length()); - size += s.length(); - } - - @Override public void writeLongstr(String s) - { - if (s == null) { s = ""; } - writeLong(s.length()); - size += s.length(); - } - } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java index 8ea2eaf8c4..a74ff99eb9 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -106,7 +106,7 @@ public class Channel extends Invoker public void error(Void v, ProtocolError error) { - error.delegate(session, sessionDelegate); + throw new RuntimeException(error.getMessage()); } public void closed() diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java index f3d3a70bff..5c812042af 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -117,7 +117,7 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> // need error handling } - Map<String,?> props = new HashMap<String,String>(); + Map<String,Object> props = new HashMap<String,Object>(); context.connectionStartOk(props, mechanism, response, _locale); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java index 7293193c02..b5e7a15854 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -27,6 +27,8 @@ import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.Receiver; +import static org.apache.qpidity.transport.util.Functions.*; + import static org.apache.qpidity.transport.network.InputHandler.State.*; @@ -147,12 +149,20 @@ public class InputHandler implements Receiver<ByteBuffer> type = buf.get(); return FRAME_HDR_SIZE1; case FRAME_HDR_SIZE1: - size = buf.get() << 8; + size = (0xFF & buf.get()) << 8; return FRAME_HDR_SIZE2; case FRAME_HDR_SIZE2: - size += buf.get(); + size += 0xFF & buf.get(); size -= 12; - return FRAME_HDR_RSVD1; + if (size < 0 || size > (64*1024 - 12)) + { + error("bad frame size: %d", size); + return ERROR; + } + else + { + return FRAME_HDR_RSVD1; + } case FRAME_HDR_RSVD1: return expect(buf, 0, FRAME_HDR_TRACK); case FRAME_HDR_TRACK: @@ -166,10 +176,10 @@ public class InputHandler implements Receiver<ByteBuffer> return FRAME_HDR_CH1; } case FRAME_HDR_CH1: - channel = buf.get() << 8; + channel = (0xFF & buf.get()) << 8; return FRAME_HDR_CH2; case FRAME_HDR_CH2: - channel += buf.get(); + channel += 0xFF & buf.get(); return FRAME_HDR_RSVD2; case FRAME_HDR_RSVD2: return expect(buf, 0, FRAME_HDR_RSVD3); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java index fb1d4ccddf..69025940b5 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java @@ -32,21 +32,6 @@ import java.nio.ByteBuffer; public class Functions { - public static final short unsigned(byte b) - { - return (short) ((0x100 + b) & 0xFF); - } - - public static final int unsigned(short s) - { - return (0x10000 + s) & 0xFFFF; - } - - public static final long unsigned(int i) - { - return (0x1000000000L + i) & 0xFFFFFFFFL; - } - public static final byte lsb(int i) { return (byte) (0xFF & i); @@ -65,13 +50,13 @@ public class Functions public static final String str(ByteBuffer buf, int limit) { StringBuilder str = new StringBuilder(); - for (int i = 0; i < limit; i++) + for (int i = 0; i < buf.remaining(); i++) { if (i > 0 && i % 2 == 0) { str.append(" "); } - str.append(String.format("%02x", buf.get(i))); + str.append(String.format("%02x", buf.get(buf.position() + i))); } return str.toString(); |
