diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-12-28 13:02:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-12-28 13:02:41 +0000 |
| commit | 6d4226a532443ab1fe33c7d486877dbb11e154de (patch) | |
| tree | 98b300c1fa6885cdabdc18ad18c7587627d6dc32 /java/common/src | |
| parent | 2f1ced0ba4334901984de39134c0e0b9337fa5ad (diff) | |
| download | qpid-python-6d4226a532443ab1fe33c7d486877dbb11e154de.tar.gz | |
QPID-3714 : [Java] Performance Improvements
Persistence:
Store message in same transaction as enqueue if possible
Memory:
Remove unnecessary (un)boxing
Reduce unnecessary copying of message data
Cache short strings
Cache queues for a given routing key on an Exchange
(0-9) Use a fixed size buffer for preparing frames to write out
Other:
Reduce calls to System.currentTimeMillis
(0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point
(0-10) Special case delivery properties and message properties in headers
(0-9) send commit-ok as soon as data committed to store
Cache publishing access control queries
(0-9) Optimised long and int typed values for FieldTables
(0-9) Retain FieldTable encoded form
(0-9) Cache queue and topic destinations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1225178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
46 files changed, 1719 insertions, 725 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 69bf73bb49..1d196534b2 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -24,12 +24,7 @@ import java.io.*; import java.nio.ByteBuffer; import java.util.*; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQDataBlockDecoder; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQMethodBodyFactory; -import org.apache.qpid.framing.AMQProtocolVersionException; -import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** @@ -193,24 +188,41 @@ public class AMQDecoder } } + private static class SimpleDataInputStream extends DataInputStream implements MarkableDataInput + { + public SimpleDataInputStream(InputStream in) + { + super(in); + } + + public AMQShortString readAMQShortString() throws IOException + { + return EncodingUtils.readAMQShortString(this); + } + + } + public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { // get prior remaining data from accumulator ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); - DataInputStream msg; + MarkableDataInput msg; - ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); + ByteArrayInputStream bais; + DataInput di; if(!_remainingBufs.isEmpty()) { + bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); _remainingBufs.add(bais); - msg = new DataInputStream(new RemainingByteArrayInputStream()); + msg = new SimpleDataInputStream(new RemainingByteArrayInputStream()); } else { - msg = new DataInputStream(bais); + bais = null; + msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); } boolean enoughData = true; @@ -245,11 +257,24 @@ public class AMQDecoder iterator.remove(); } } - if(bais.available()!=0) + + if(bais == null) + { + if(msg.available()!=0) + { + byte[] remaining = new byte[msg.available()]; + msg.read(remaining); + _remainingBufs.add(new ByteArrayInputStream(remaining)); + } + } + else { - byte[] remaining = new byte[bais.available()]; - bais.read(remaining); - _remainingBufs.add(new ByteArrayInputStream(remaining)); + if(bais.available()!=0) + { + byte[] remaining = new byte[bais.available()]; + bais.read(remaining); + _remainingBufs.add(new ByteArrayInputStream(remaining)); + } } } } diff --git a/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java b/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java new file mode 100644 index 0000000000..2a243a810d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java @@ -0,0 +1,21 @@ +package org.apache.qpid.codec; + +import org.apache.qpid.framing.AMQShortString; + +import java.io.DataInput; +import java.io.IOException; + +public interface MarkableDataInput extends DataInput +{ + public void mark(int pos); + public void reset() throws IOException; + + int available() throws IOException; + + long skip(long i) throws IOException; + + int read(byte[] b) throws IOException; + + public AMQShortString readAMQShortString() throws IOException; + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index ebdad12178..363d9f1ccc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.framing; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; @@ -36,7 +37,7 @@ public interface AMQBody */ public abstract int getSize(); - public void writePayload(DataOutputStream buffer) throws IOException; + public void writePayload(DataOutput buffer) throws IOException; void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index 00c1f5aae5..e77e5942e3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -21,6 +21,7 @@ package org.apache.qpid.framing; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; @@ -42,6 +43,6 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock * Writes the datablock to the specified buffer. * @param buffer */ - public abstract void writePayload(DataOutputStream buffer) throws IOException; + public abstract void writePayload(DataOutput buffer) throws IOException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 2165cadd14..b6f2fb18ea 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,7 @@ public class AMQDataBlockDecoder public AMQDataBlockDecoder() { } - public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException + public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException { final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); // type, channel, body length and end byte @@ -65,7 +66,7 @@ public class AMQDataBlockDecoder } - public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in) + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { final byte type = in.readByte(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 6acf60a5b3..9b5699e8ff 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -20,9 +20,10 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import org.apache.qpid.codec.MarkableDataInput; + +import java.io.*; +import java.io.DataOutput; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { @@ -38,7 +39,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock _bodyFrame = bodyFrame; } - public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException + public AMQFrame(final MarkableDataInput in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException { this._channel = channel; this._bodyFrame = bodyFactory.createBody(in,bodySize); @@ -55,7 +56,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(DataOutput buffer) throws IOException { buffer.writeByte(_bodyFrame.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, _channel); @@ -79,7 +80,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } - public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException + public static void writeFrame(DataOutput buffer, final int channel, AMQBody body) throws IOException { buffer.writeByte(body.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); @@ -89,7 +90,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock } - public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException + public static void writeFrames(DataOutput buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException { buffer.writeByte(body1.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); @@ -104,7 +105,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock } - public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException + public static void writeFrames(DataOutput buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException { buffer.writeByte(body1.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index a076d0e5a1..2170ebf992 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; @@ -45,12 +46,12 @@ public interface AMQMethodBody extends AMQBody /** @return unsigned short */ public int getMethod(); - public void writeMethodPayload(DataOutputStream buffer) throws IOException; + public void writeMethodPayload(DataOutput buffer) throws IOException; public int getSize(); - public void writePayload(DataOutputStream buffer) throws IOException; + public void writePayload(DataOutput buffer) throws IOException; //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 7fceb082ee..ec6d662726 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; @@ -39,7 +41,7 @@ public class AMQMethodBodyFactory implements BodyFactory _protocolSession = protocolSession; } - public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException + public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException { return _protocolSession.getMethodRegistry().convertToBody(in, bodySize); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index c73c1df701..d6f518b123 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -24,11 +24,12 @@ package org.apache.qpid.framing; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; public abstract class AMQMethodBodyImpl implements AMQMethodBody @@ -101,7 +102,7 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return 2 + 2 + getBodySize(); } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(DataOutput buffer) throws IOException { EncodingUtils.writeUnsignedShort(buffer, getClazz()); EncodingUtils.writeUnsignedShort(buffer, getMethod()); @@ -109,14 +110,15 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody } - protected byte readByte(DataInputStream buffer) throws IOException + protected byte readByte(DataInput buffer) throws IOException { return buffer.readByte(); } - protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException + protected AMQShortString readAMQShortString(MarkableDataInput buffer) throws IOException { - return EncodingUtils.readAMQShortString(buffer); + AMQShortString str = buffer.readAMQShortString(); + return str == null ? null : str.intern(false); } protected int getSizeOf(AMQShortString string) @@ -124,27 +126,27 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return EncodingUtils.encodedShortStringLength(string); } - protected void writeByte(DataOutputStream buffer, byte b) throws IOException + protected void writeByte(DataOutput buffer, byte b) throws IOException { buffer.writeByte(b); } - protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException + protected void writeAMQShortString(DataOutput buffer, AMQShortString string) throws IOException { EncodingUtils.writeShortStringBytes(buffer, string); } - protected int readInt(DataInputStream buffer) throws IOException + protected int readInt(DataInput buffer) throws IOException { return buffer.readInt(); } - protected void writeInt(DataOutputStream buffer, int i) throws IOException + protected void writeInt(DataOutput buffer, int i) throws IOException { buffer.writeInt(i); } - protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException + protected FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException { return EncodingUtils.readFieldTable(buffer); } @@ -154,17 +156,17 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. } - protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException + protected void writeFieldTable(DataOutput buffer, FieldTable table) throws IOException { EncodingUtils.writeFieldTableBytes(buffer, table); } - protected long readLong(DataInputStream buffer) throws IOException + protected long readLong(DataInput buffer) throws IOException { return buffer.readLong(); } - protected void writeLong(DataOutputStream buffer, long l) throws IOException + protected void writeLong(DataOutput buffer, long l) throws IOException { buffer.writeLong(l); } @@ -174,27 +176,27 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return (response == null) ? 4 : response.length + 4; } - protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException + protected void writeBytes(DataOutput buffer, byte[] data) throws IOException { EncodingUtils.writeBytes(buffer,data); } - protected byte[] readBytes(DataInputStream buffer) throws IOException + protected byte[] readBytes(DataInput buffer) throws IOException { return EncodingUtils.readBytes(buffer); } - protected short readShort(DataInputStream buffer) throws IOException + protected short readShort(DataInput buffer) throws IOException { return EncodingUtils.readShort(buffer); } - protected void writeShort(DataOutputStream buffer, short s) throws IOException + protected void writeShort(DataOutput buffer, short s) throws IOException { EncodingUtils.writeShort(buffer, s); } - protected Content readContent(DataInputStream buffer) + protected Content readContent(DataInput buffer) { return null; } @@ -204,56 +206,56 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return 0; } - protected void writeContent(DataOutputStream buffer, Content body) + protected void writeContent(DataOutput buffer, Content body) { } - protected byte readBitfield(DataInputStream buffer) throws IOException + protected byte readBitfield(DataInput buffer) throws IOException { return readByte(buffer); } - protected int readUnsignedShort(DataInputStream buffer) throws IOException + protected int readUnsignedShort(DataInput buffer) throws IOException { return buffer.readUnsignedShort(); } - protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException + protected void writeBitfield(DataOutput buffer, byte bitfield0) throws IOException { buffer.writeByte(bitfield0); } - protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException + protected void writeUnsignedShort(DataOutput buffer, int s) throws IOException { EncodingUtils.writeUnsignedShort(buffer, s); } - protected long readUnsignedInteger(DataInputStream buffer) throws IOException + protected long readUnsignedInteger(DataInput buffer) throws IOException { return EncodingUtils.readUnsignedInteger(buffer); } - protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException + protected void writeUnsignedInteger(DataOutput buffer, long i) throws IOException { EncodingUtils.writeUnsignedInteger(buffer, i); } - protected short readUnsignedByte(DataInputStream buffer) throws IOException + protected short readUnsignedByte(DataInput buffer) throws IOException { return (short) buffer.readUnsignedByte(); } - protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException + protected void writeUnsignedByte(DataOutput buffer, short unsignedByte) throws IOException { EncodingUtils.writeUnsignedByte(buffer, unsignedByte); } - protected long readTimestamp(DataInputStream buffer) throws IOException + protected long readTimestamp(DataInput buffer) throws IOException { return EncodingUtils.readTimestamp(buffer); } - protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException + protected void writeTimestamp(DataOutput buffer, long t) throws IOException { EncodingUtils.writeTimestamp(buffer, t); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index df4d8bdcb6..88b1ca7189 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -21,11 +21,12 @@ package org.apache.qpid.framing; -import java.io.DataInputStream; +import org.apache.qpid.codec.MarkableDataInput; + import java.io.IOException; public abstract interface AMQMethodBodyInstanceFactory { - public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException; + public AMQMethodBody newInstance(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index cc9a33f4cf..4ff7827d7f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -24,8 +24,9 @@ package org.apache.qpid.framing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInput; import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; import java.util.*; import java.lang.ref.WeakReference; @@ -93,22 +94,44 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private AMQShortString substring(final int from, final int to) { - return new AMQShortString(_data, from+_offset, to+_offset); + return new AMQShortString(_data, from+_offset, to-from); } - private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap = - new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>() + private static final int LOCAL_INTERN_CACHE_SIZE = 2048; + + private static final ThreadLocal<Map<AMQShortString, AMQShortString>> _localInternMap = + new ThreadLocal<Map<AMQShortString, AMQShortString>>() { - protected Map<AMQShortString, WeakReference<AMQShortString>> initialValue() + protected Map<AMQShortString, AMQShortString> initialValue() { - return new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>(); + return new LinkedHashMap<AMQShortString, AMQShortString>() + { + + protected boolean removeEldestEntry(Map.Entry<AMQShortString, AMQShortString> eldest) + { + return size() > LOCAL_INTERN_CACHE_SIZE; + } + }; }; }; private static final Map<AMQShortString, WeakReference<AMQShortString>> _globalInternMap = new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>(); + + private static final ThreadLocal<Map<String, WeakReference<AMQShortString>>> _localStringMap = + new ThreadLocal<Map<String, WeakReference<AMQShortString>>>() + { + protected Map<String, WeakReference<AMQShortString>> initialValue() + { + return new WeakHashMap<String, WeakReference<AMQShortString>>(); + }; + }; + + private static final Map<String, WeakReference<AMQShortString>> _globalStringMap = + new WeakHashMap<String, WeakReference<AMQShortString>>(); + private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class); private final byte[] _data; @@ -200,32 +223,32 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt } - private AMQShortString(DataInputStream data, final int length) throws IOException + private AMQShortString(DataInput data, final int length) throws IOException { if (length > MAX_LENGTH) { throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); } byte[] dataBytes = new byte[length]; - data.read(dataBytes); + data.readFully(dataBytes); _data = dataBytes; _offset = 0; _length = length; } - private AMQShortString(final byte[] data, final int from, final int to) + public AMQShortString(byte[] data, final int offset, final int length) { - if (data == null) - { - throw new NullPointerException("Cannot create AMQShortString with null data[]"); - } - int length = to - from; if (length > MAX_LENGTH) { throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); } - _offset = from; + if (data == null) + { + throw new NullPointerException("Cannot create AMQShortString with null data[]"); + } + + _offset = offset; _length = length; _data = data; } @@ -234,9 +257,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt { if(_data.length != _length) { - byte[] dataBytes = new byte[_length]; - System.arraycopy(_data,_offset,dataBytes,0,_length); - return new AMQShortString(dataBytes,0,_length); + return copy(); } else { @@ -265,7 +286,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return new CharSubSequence(start, end); } - public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException + public static AMQShortString readFromBuffer(DataInput buffer) throws IOException { final int length = buffer.readUnsignedByte(); if (length == 0) @@ -293,12 +314,12 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt } } - public void writeToBuffer(DataOutputStream buffer) throws IOException + public void writeToBuffer(DataOutput buffer) throws IOException { final int size = length(); //buffer.setAutoExpand(true); - buffer.write((byte) size); + buffer.writeByte(size); buffer.write(_data, _offset, size); } @@ -420,7 +441,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt { if (_asString == null) { - _asString = new String(asChars()); + AMQShortString intern = intern(); + + if(intern == this) + { + _asString = new String(asChars()); + } + else + { + _asString = intern.asString(); + } + } return _asString; } @@ -609,42 +640,51 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public AMQShortString intern() { + return intern(true); + } + + public AMQShortString intern(boolean keep) + { hashCode(); - Map<AMQShortString, WeakReference<AMQShortString>> localMap = + Map<AMQShortString, AMQShortString> localMap = _localInternMap.get(); - WeakReference<AMQShortString> ref = localMap.get(this); - AMQShortString internString; + AMQShortString internString = localMap.get(this); - if(ref != null) + + if(internString != null) { - internString = ref.get(); - if(internString != null) - { - return internString; - } + return internString; } + WeakReference<AMQShortString> ref; synchronized(_globalInternMap) { ref = _globalInternMap.get(this); if((ref == null) || ((internString = ref.get()) == null)) { - internString = shrink(); + internString = keep ? shrink() : copy(); ref = new WeakReference(internString); _globalInternMap.put(internString, ref); } } - localMap.put(internString, ref); + localMap.put(internString, internString); return internString; } + private AMQShortString copy() + { + byte[] dataBytes = new byte[_length]; + System.arraycopy(_data,_offset,dataBytes,0,_length); + return new AMQShortString(dataBytes,0,_length); + } + private int occurences(final byte delim) { int count = 0; @@ -761,7 +801,46 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public static AMQShortString valueOf(Object obj) { - return obj == null ? null : new AMQShortString(String.valueOf(obj)); + return obj == null ? null : AMQShortString.valueOf(String.valueOf(obj)); + } + + public static AMQShortString valueOf(String obj) + { + if(obj == null) + { + return null; + } + + Map<String, WeakReference<AMQShortString>> localMap = + _localStringMap.get(); + + WeakReference<AMQShortString> ref = localMap.get(obj); + AMQShortString internString; + + if(ref != null) + { + internString = ref.get(); + if(internString != null) + { + return internString; + } + } + + + synchronized(_globalStringMap) + { + + ref = _globalStringMap.get(obj); + if((ref == null) || ((internString = ref.get()) == null)) + { + internString = (new AMQShortString(obj)).intern(); + ref = new WeakReference<AMQShortString>(internString); + _globalStringMap.put(obj, ref); + } + + } + localMap.put(obj, ref); + return internString; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index f3da64e639..5c89af09c4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.math.BigDecimal; @@ -61,12 +61,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -107,12 +107,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeUnsignedInteger(buffer, (Long) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readUnsignedInteger(buffer); } @@ -138,7 +138,7 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { BigDecimal bd = (BigDecimal) value; @@ -151,7 +151,7 @@ public enum AMQType EncodingUtils.writeInteger(buffer, unscaled); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { byte places = EncodingUtils.readByte(buffer); @@ -183,12 +183,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeLong(buffer, (Long) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readLong(buffer); } @@ -247,7 +247,7 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { // Ensure that the value is a FieldTable. if (!(value instanceof FieldTable)) @@ -268,7 +268,7 @@ public enum AMQType * * @return An instance of the type. */ - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { try { @@ -302,10 +302,10 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) + public void writeValueImpl(Object value, DataOutput buffer) { } - public Object readValueFromBuffer(DataInputStream buffer) + public Object readValueFromBuffer(DataInput buffer) { return null; } @@ -331,12 +331,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeLongstr(buffer, (byte[]) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readLongstr(buffer); } @@ -361,12 +361,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -392,12 +392,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -427,12 +427,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeBoolean(buffer, (Boolean) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readBoolean(buffer); } @@ -462,12 +462,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeChar(buffer, (Character) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readChar(buffer); } @@ -497,12 +497,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeByte(buffer, (Byte) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readByte(buffer); } @@ -536,12 +536,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeShort(buffer, (Short) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readShort(buffer); } @@ -578,12 +578,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeInteger(buffer, (Integer) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readInteger(buffer); } @@ -596,6 +596,22 @@ public enum AMQType return EncodingUtils.encodedLongLength(); } + public int getEncodingSize(long value) + { + return EncodingUtils.encodedLongLength(); + } + + public AMQTypedValue asTypedValue(long value) + { + return AMQTypedValue.createAMQTypedValue(value); + } + + public void writeToBuffer(long value, DataOutput buffer) throws IOException + { + buffer.writeByte(identifier()); + EncodingUtils.writeLong(buffer, value); + } + public Object toNativeValue(Object value) { if (value instanceof Long) @@ -625,12 +641,18 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeLong(buffer, (Long) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public long readLongFromBuffer(DataInput buffer) throws IOException + { + return EncodingUtils.readLong(buffer); + } + + + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readLong(buffer); } @@ -660,12 +682,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeFloat(buffer, (Float) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readFloat(buffer); } @@ -699,12 +721,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException + public void writeValueImpl(Object value, DataOutput buffer) throws IOException { EncodingUtils.writeDouble(buffer, (Double) value); } - public Object readValueFromBuffer(DataInputStream buffer) throws IOException + public Object readValueFromBuffer(DataInput buffer) throws IOException { return EncodingUtils.readDouble(buffer); } @@ -761,7 +783,7 @@ public enum AMQType */ public AMQTypedValue asTypedValue(Object value) { - return new AMQTypedValue(this, toNativeValue(value)); + return AMQTypedValue.createAMQTypedValue(this, toNativeValue(value)); } /** @@ -771,7 +793,7 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException + public void writeToBuffer(Object value, DataOutput buffer) throws IOException { buffer.writeByte(identifier()); writeValueImpl(value, buffer); @@ -783,7 +805,7 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException; + abstract void writeValueImpl(Object value, DataOutput buffer) throws IOException; /** * Reads an instance of the type from a specified byte buffer. @@ -792,5 +814,5 @@ public enum AMQType * * @return An instance of the type. */ - abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException; + abstract Object readValueFromBuffer(DataInput buffer) throws IOException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index 1dbedca362..84e4056f4d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.io.*; import java.util.Date; import java.util.Map; import java.math.BigDecimal; @@ -42,81 +40,218 @@ import java.math.BigDecimal; * <tr><td> Extract the value from a fully typed AMQP value. * </table> */ -public class AMQTypedValue +public abstract class AMQTypedValue { - /** The type of the value. */ - private final AMQType _type; - /** The Java native representation of the AMQP typed value. */ - private final Object _value; + public abstract AMQType getType(); - public AMQTypedValue(AMQType type, Object value) + public abstract Object getValue(); + + public abstract void writeToBuffer(DataOutput buffer) throws IOException; + + public abstract int getEncodingSize(); + + + private static final class GenericTypedValue extends AMQTypedValue { - if (type == null) + /** The type of the value. */ + private final AMQType _type; + + /** The Java native representation of the AMQP typed value. */ + private final Object _value; + + private GenericTypedValue(AMQType type, Object value) { - throw new NullPointerException("Cannot create a typed value with null type"); + if (type == null) + { + throw new NullPointerException("Cannot create a typed value with null type"); + } + + _type = type; + _value = type.toNativeValue(value); } - _type = type; - _value = type.toNativeValue(value); - } + private GenericTypedValue(AMQType type, DataInput buffer) throws IOException + { + _type = type; + _value = type.readValueFromBuffer(buffer); + } + + + public AMQType getType() + { + return _type; + } + + public Object getValue() + { + return _value; + } + + public void writeToBuffer(DataOutput buffer) throws IOException + { + _type.writeToBuffer(_value, buffer); + } + + public int getEncodingSize() + { + return _type.getEncodingSize(_value); + } + + + public String toString() + { + return "[" + getType() + ": " + getValue() + "]"; + } + + + public boolean equals(Object o) + { + if(o instanceof GenericTypedValue) + { + GenericTypedValue other = (GenericTypedValue) o; + return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value)); + } + else + { + return false; + } + } + + public int hashCode() + { + return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode()); + } - private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException - { - _type = type; - _value = type.readValueFromBuffer(buffer); } - public AMQType getType() + private static final class LongTypedValue extends AMQTypedValue { - return _type; + + private final long _value; + + private LongTypedValue(long value) + { + _value = value; + } + + public LongTypedValue(DataInput buffer) throws IOException + { + _value = EncodingUtils.readLong(buffer); + } + + public AMQType getType() + { + return AMQType.LONG; + } + + + public Object getValue() + { + return _value; + } + + public void writeToBuffer(DataOutput buffer) throws IOException + { + EncodingUtils.writeByte(buffer,AMQType.LONG.identifier()); + EncodingUtils.writeLong(buffer,_value); + } + + + public int getEncodingSize() + { + return EncodingUtils.encodedLongLength(); + } } - public Object getValue() + private static final class IntTypedValue extends AMQTypedValue { - return _value; + + private final int _value; + + public IntTypedValue(int value) + { + _value = value; + } + + public AMQType getType() + { + return AMQType.INT; + } + + + public Object getValue() + { + return _value; + } + + public void writeToBuffer(DataOutput buffer) throws IOException + { + EncodingUtils.writeByte(buffer,AMQType.INT.identifier()); + EncodingUtils.writeInteger(buffer, _value); + } + + + public int getEncodingSize() + { + return EncodingUtils.encodedIntegerLength(); + } } - public void writeToBuffer(DataOutputStream buffer) throws IOException + + public static AMQTypedValue readFromBuffer(DataInput buffer) throws IOException { - _type.writeToBuffer(_value, buffer); + AMQType type = AMQTypeMap.getType(buffer.readByte()); + + switch(type) + { + case LONG: + return new LongTypedValue(buffer); + + case INT: + int value = EncodingUtils.readInteger(buffer); + return createAMQTypedValue(value); + + default: + return new GenericTypedValue(type, buffer); + } + } - public int getEncodingSize() + private static final AMQTypedValue[] INT_VALUES = new AMQTypedValue[16]; + static { - return _type.getEncodingSize(_value); + for(int i = 0 ; i < 16; i ++) + { + INT_VALUES[i] = new IntTypedValue(i); + } } - public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException + public static AMQTypedValue createAMQTypedValue(int i) { - AMQType type = AMQTypeMap.getType(buffer.readByte()); - - return new AMQTypedValue(type, buffer); + return (i & 0x0f) == i ? INT_VALUES[i] : new IntTypedValue(i); } - public String toString() + + public static AMQTypedValue createAMQTypedValue(long value) { - return "[" + getType() + ": " + getValue() + "]"; + return new LongTypedValue(value); } - - public boolean equals(Object o) + public static AMQTypedValue createAMQTypedValue(AMQType type, Object value) { - if(o instanceof AMQTypedValue) + switch(type) { - AMQTypedValue other = (AMQTypedValue) o; - return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value)); - } - else - { - return false; + case LONG: + return new LongTypedValue((Long) AMQType.LONG.toNativeValue(value)); + case INT: + return new IntTypedValue((Integer) AMQType.INT.toNativeValue(value)); + + default: + return new GenericTypedValue(type, value); } } - public int hashCode() - { - return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode()); - } public static AMQTypedValue toTypedValue(Object val) diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 57622b5054..2739f7d14b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -20,8 +20,9 @@ */ package org.apache.qpid.framing; +import java.io.DataInput; import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; import org.slf4j.Logger; @@ -80,6 +81,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private static final int USER_ID_MASK = 1 << 4; private static final int APPLICATION_ID_MASK = 1 << 3; private static final int CLUSTER_ID_MASK = 1 << 2; + private byte[] _encodedForm; public BasicContentHeaderProperties() @@ -87,6 +89,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public int getPropertyListSize() { + if(_encodedForm != null && (_headers == null || _headers.isClean())) + { + return _encodedForm.length; + } + else + { int size = 0; if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0) @@ -167,6 +175,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti } return size; + } } public void setPropertyFlags(int propertyFlags) @@ -179,87 +188,94 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti return _propertyFlags; } - public void writePropertyListPayload(DataOutputStream buffer) throws IOException + public void writePropertyListPayload(DataOutput buffer) throws IOException { - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + if(_encodedForm != null && (_headers == null || !_headers.isClean())) { - EncodingUtils.writeShortStringBytes(buffer, _contentType); + buffer.write(_encodedForm); } - - if ((_propertyFlags & ENCODING_MASK) != 0) + else { - EncodingUtils.writeShortStringBytes(buffer, _encoding); - } + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _contentType); + } - if ((_propertyFlags & HEADERS_MASK) != 0) - { - EncodingUtils.writeFieldTableBytes(buffer, _headers); - } + if ((_propertyFlags & ENCODING_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _encoding); + } - if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) - { - buffer.writeByte(_deliveryMode); - } + if ((_propertyFlags & HEADERS_MASK) != 0) + { + EncodingUtils.writeFieldTableBytes(buffer, _headers); + } - if ((_propertyFlags & PRIORITY_MASK) != 0) - { - buffer.writeByte(_priority); - } + if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) + { + buffer.writeByte(_deliveryMode); + } - if ((_propertyFlags & CORRELATION_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _correlationId); - } + if ((_propertyFlags & PRIORITY_MASK) != 0) + { + buffer.writeByte(_priority); + } - if ((_propertyFlags & REPLY_TO_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _replyTo); - } + if ((_propertyFlags & CORRELATION_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _correlationId); + } - if ((_propertyFlags & EXPIRATION_MASK) != 0) - { - if (_expiration == 0L) + if ((_propertyFlags & REPLY_TO_MASK) != 0) { - EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING); + EncodingUtils.writeShortStringBytes(buffer, _replyTo); } - else + + if ((_propertyFlags & EXPIRATION_MASK) != 0) { - EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); + if (_expiration == 0L) + { + EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING); + } + else + { + EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); + } } - } - if ((_propertyFlags & MESSAGE_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _messageId); - } + if ((_propertyFlags & MESSAGE_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _messageId); + } - if ((_propertyFlags & TIMESTAMP_MASK) != 0) - { - EncodingUtils.writeTimestamp(buffer, _timestamp); - } + if ((_propertyFlags & TIMESTAMP_MASK) != 0) + { + EncodingUtils.writeTimestamp(buffer, _timestamp); + } - if ((_propertyFlags & TYPE_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _type); - } + if ((_propertyFlags & TYPE_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _type); + } - if ((_propertyFlags & USER_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _userId); - } + if ((_propertyFlags & USER_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _userId); + } - if ((_propertyFlags & APPLICATION_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _appId); - } + if ((_propertyFlags & APPLICATION_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _appId); + } - if ((_propertyFlags & CLUSTER_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _clusterId); + if ((_propertyFlags & CLUSTER_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _clusterId); + } } } - public void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException + public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException { _propertyFlags = propertyFlags; @@ -268,26 +284,40 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti _logger.debug("Property flags: " + _propertyFlags); } - decode(buffer); + _encodedForm = new byte[size]; + buffer.readFully(_encodedForm); + + ByteArrayDataInput input = new ByteArrayDataInput(_encodedForm); + + decode(input); + } - private void decode(DataInputStream buffer) throws IOException, AMQFrameDecodingException + private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException { // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + int headersOffset = 0; + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) { - _contentType = EncodingUtils.readAMQShortString(buffer); + _contentType = buffer.readAMQShortString(); + headersOffset += EncodingUtils.encodedShortStringLength(_contentType); } if ((_propertyFlags & ENCODING_MASK) != 0) { - _encoding = EncodingUtils.readAMQShortString(buffer); + _encoding = buffer.readAMQShortString(); + headersOffset += EncodingUtils.encodedShortStringLength(_encoding); } if ((_propertyFlags & HEADERS_MASK) != 0) { - _headers = EncodingUtils.readFieldTable(buffer); + long length = EncodingUtils.readUnsignedInteger(buffer); + + _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length); + + buffer.skipBytes((int)length); } if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) @@ -302,12 +332,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti if ((_propertyFlags & CORRELATION_ID_MASK) != 0) { - _correlationId = EncodingUtils.readAMQShortString(buffer); + _correlationId = buffer.readAMQShortString(); } if ((_propertyFlags & REPLY_TO_MASK) != 0) { - _replyTo = EncodingUtils.readAMQShortString(buffer); + _replyTo = buffer.readAMQShortString(); } if ((_propertyFlags & EXPIRATION_MASK) != 0) @@ -317,7 +347,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti if ((_propertyFlags & MESSAGE_ID_MASK) != 0) { - _messageId = EncodingUtils.readAMQShortString(buffer); + _messageId = buffer.readAMQShortString(); } if ((_propertyFlags & TIMESTAMP_MASK) != 0) @@ -327,22 +357,22 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti if ((_propertyFlags & TYPE_MASK) != 0) { - _type = EncodingUtils.readAMQShortString(buffer); + _type = buffer.readAMQShortString(); } if ((_propertyFlags & USER_ID_MASK) != 0) { - _userId = EncodingUtils.readAMQShortString(buffer); + _userId = buffer.readAMQShortString(); } if ((_propertyFlags & APPLICATION_ID_MASK) != 0) { - _appId = EncodingUtils.readAMQShortString(buffer); + _appId = buffer.readAMQShortString(); } if ((_propertyFlags & CLUSTER_ID_MASK) != 0) { - _clusterId = EncodingUtils.readAMQShortString(buffer); + _clusterId = buffer.readAMQShortString(); } @@ -363,11 +393,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _propertyFlags |= (CONTENT_TYPE_MASK); _contentType = contentType; + _encodedForm = null; } public void setContentType(String contentType) { - setContentType((contentType == null) ? null : new AMQShortString(contentType)); + setContentType((contentType == null) ? null : AMQShortString.valueOf(contentType)); } public String getEncodingAsString() @@ -384,13 +415,15 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setEncoding(String encoding) { _propertyFlags |= ENCODING_MASK; - _encoding = (encoding == null) ? null : new AMQShortString(encoding); + _encoding = (encoding == null) ? null : AMQShortString.valueOf(encoding); + _encodedForm = null; } public void setEncoding(AMQShortString encoding) { _propertyFlags |= ENCODING_MASK; _encoding = encoding; + _encodedForm = null; } public FieldTable getHeaders() @@ -407,6 +440,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _propertyFlags |= HEADERS_MASK; _headers = headers; + _encodedForm = null; } public byte getDeliveryMode() @@ -418,6 +452,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _propertyFlags |= DELIVERY_MODE_MASK; _deliveryMode = deliveryMode; + _encodedForm = null; } public byte getPriority() @@ -429,6 +464,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _propertyFlags |= PRIORITY_MASK; _priority = priority; + _encodedForm = null; } public AMQShortString getCorrelationId() @@ -443,13 +479,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setCorrelationId(String correlationId) { - setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId)); + setCorrelationId((correlationId == null) ? null : AMQShortString.valueOf(correlationId)); } public void setCorrelationId(AMQShortString correlationId) { _propertyFlags |= CORRELATION_ID_MASK; _correlationId = correlationId; + _encodedForm = null; } public String getReplyToAsString() @@ -464,13 +501,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setReplyTo(String replyTo) { - setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo)); + setReplyTo((replyTo == null) ? null : AMQShortString.valueOf(replyTo)); } public void setReplyTo(AMQShortString replyTo) { _propertyFlags |= REPLY_TO_MASK; _replyTo = replyTo; + _encodedForm = null; } public long getExpiration() @@ -482,6 +520,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _propertyFlags |= EXPIRATION_MASK; _expiration = expiration; + _encodedForm = null; } public AMQShortString getMessageId() @@ -498,12 +537,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _propertyFlags |= MESSAGE_ID_MASK; _messageId = (messageId == null) ? null : new AMQShortString(messageId); + _encodedForm = null; } public void setMessageId(AMQShortString messageId) { _propertyFlags |= MESSAGE_ID_MASK; _messageId = messageId; + _encodedForm = null; } public long getTimestamp() @@ -515,6 +556,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _propertyFlags |= TIMESTAMP_MASK; _timestamp = timestamp; + _encodedForm = null; } public String getTypeAsString() @@ -529,13 +571,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setType(String type) { - setType((type == null) ? null : new AMQShortString(type)); + setType((type == null) ? null : AMQShortString.valueOf(type)); } public void setType(AMQShortString type) { _propertyFlags |= TYPE_MASK; _type = type; + _encodedForm = null; } public String getUserIdAsString() @@ -550,13 +593,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setUserId(String userId) { - setUserId((userId == null) ? null : new AMQShortString(userId)); + setUserId((userId == null) ? null : AMQShortString.valueOf(userId)); } public void setUserId(AMQShortString userId) { _propertyFlags |= USER_ID_MASK; _userId = userId; + _encodedForm = null; } public String getAppIdAsString() @@ -571,13 +615,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setAppId(String appId) { - setAppId((appId == null) ? null : new AMQShortString(appId)); + setAppId((appId == null) ? null : AMQShortString.valueOf(appId)); } public void setAppId(AMQShortString appId) { _propertyFlags |= APPLICATION_ID_MASK; _appId = appId; + _encodedForm = null; } public String getClusterIdAsString() @@ -592,13 +637,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setClusterId(String clusterId) { - setClusterId((clusterId == null) ? null : new AMQShortString(clusterId)); + setClusterId((clusterId == null) ? null : AMQShortString.valueOf(clusterId)); } public void setClusterId(AMQShortString clusterId) { _propertyFlags |= CLUSTER_ID_MASK; _clusterId = clusterId; + _encodedForm = null; } public String toString() diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java index f9580d82b1..554e9373d8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; +import org.apache.qpid.codec.MarkableDataInput; + import java.io.IOException; /** @@ -28,5 +29,5 @@ import java.io.IOException; */ public interface BodyFactory { - AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException; + AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java b/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java new file mode 100644 index 0000000000..656185629b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java @@ -0,0 +1,174 @@ +package org.apache.qpid.framing; + +import org.apache.qpid.codec.MarkableDataInput; + +import java.io.IOException; + +public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput +{ + private byte[] _data; + private int _offset; + private int _length; + private int _origin; + private int _mark; + + public ByteArrayDataInput(byte[] data) + { + this(data,0, data.length); + } + + public ByteArrayDataInput(byte[] data, int offset, int length) + { + _data = data; + _offset = offset; + _length = length; + _origin = offset; + _mark = 0; + } + + public void readFully(byte[] b) + { + System.arraycopy(_data,_offset,b,0,b.length); + _offset+=b.length; + } + + public void readFully(byte[] b, int off, int len) + { + System.arraycopy(_data,_offset,b,off,len); + _offset+=len; + } + + public int skipBytes(int n) + { + return _offset+=n; + } + + public boolean readBoolean() + { + return _data[_offset++] != 0; + } + + public byte readByte() + { + return _data[_offset++]; + } + + public int readUnsignedByte() + { + return ((int)_data[_offset++]) & 0xFF; + } + + public short readShort() + { + return (short) (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF)); + } + + public int readUnsignedShort() + { + return (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF)); + } + + public char readChar() + { + return (char) (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF)); + } + + public int readInt() + { + return ((((int)_data[_offset++]) << 24) & 0xFF000000) + | ((((int)_data[_offset++]) << 16) & 0xFF0000) + | ((((int)_data[_offset++]) << 8) & 0xFF00) + | (((int)_data[_offset++]) & 0xFF); + } + + public long readLong() + { + return ((((long)_data[_offset++]) << 56) & 0xFF00000000000000L) + | ((((long)_data[_offset++]) << 48) & 0xFF000000000000L) + | ((((long)_data[_offset++]) << 40) & 0xFF0000000000L) + | ((((long)_data[_offset++]) << 32) & 0xFF00000000L) + | ((((long)_data[_offset++]) << 24) & 0xFF000000L) + | ((((long)_data[_offset++]) << 16) & 0xFF0000L) + | ((((long)_data[_offset++]) << 8) & 0xFF00L) + | (((long)_data[_offset++]) & 0xFFL); + } + + public float readFloat() + { + return Float.intBitsToFloat(readInt()); + } + + public double readDouble() + { + return Double.longBitsToDouble(readLong()); + } + + public AMQShortString readAMQShortString() + { + int length = _data[_offset++] & 0xff; + if(length == 0) + { + return null; + } + else + { + final AMQShortString amqShortString = new AMQShortString(_data, _offset, length); + _offset+=length; + return amqShortString; + } + } + + public String readLine() + { + throw new UnsupportedOperationException(); + } + + public String readUTF() + { + throw new UnsupportedOperationException(); + } + + public int available() + { + return (_origin+_length)-_offset; + } + + + public long skip(long i) + { + _offset+=i; + return i; + } + + public int read(byte[] b) + { + readFully(b); + return b.length; + } + + public int position() + { + return _offset - _origin; + } + + public void position(int position) + { + _offset = position + _origin; + } + + public int length() + { + return _length; + } + + + public void mark(int readAhead) + { + _mark = _offset-_origin; + } + + public void reset() + { + _offset = _origin + _mark; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 15bc20c52d..098e3652ad 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.framing; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock @@ -50,7 +50,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD return frameSize; } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(DataOutput buffer) throws IOException { for (int i = 0; i < _blocks.length; i++) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index aedb35f92a..541d104dc9 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,9 +20,11 @@ */ package org.apache.qpid.framing; +import java.io.DataInput; import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -37,10 +39,10 @@ public class ContentBody implements AMQBody { } - public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException + public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException { _payload = new byte[(int)size]; - buffer.read(_payload); + buffer.readFully(_payload); } @@ -59,7 +61,7 @@ public class ContentBody implements AMQBody return _payload == null ? 0 : _payload.length; } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(DataOutput buffer) throws IOException { buffer.write(_payload); } @@ -84,11 +86,62 @@ public class ContentBody implements AMQBody { } + private static class BufferContentBody implements AMQBody + { + private final int _length; + private final int _offset; + private final ByteBuffer _buf; + + private BufferContentBody( ByteBuffer buf, int offset, int length) + { + _length = length; + _offset = offset; + _buf = buf; + } + + public byte getFrameType() + { + return TYPE; + } + + + public int getSize() + { + return _length; + } + public void writePayload(DataOutput buffer) throws IOException + { + if(_buf.hasArray()) + { + buffer.write(_buf.array(), _buf.arrayOffset() + _offset, _length); + } + else + { + byte[] data = new byte[_length]; + ByteBuffer buf = _buf.duplicate(); + + buf.position(_offset); + buf.limit(_offset+_length); + buf.get(data); + buffer.write(data); + } + } + + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new RuntimeException("Buffered Body only to be used for outgoing data"); + } + } + + public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length) + { + return new AMQFrame(channelId, new BufferContentBody(buf, offset, length)); + } public static AMQFrame createAMQFrame(int channelId, ContentBody body) { - final AMQFrame frame = new AMQFrame(channelId, body); - return frame; + return new AMQFrame(channelId, body); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index a0b030ab6b..de2ffe9755 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; import java.io.IOException; +import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +42,7 @@ public class ContentBodyFactory implements BodyFactory _log.debug("Creating content body factory"); } - public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException + public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException { return new ContentBody(in, bodySize); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 18d0f26152..8a2ad53157 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -20,8 +20,9 @@ */ package org.apache.qpid.framing; +import java.io.DataInput; import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; @@ -45,7 +46,7 @@ public class ContentHeaderBody implements AMQBody { } - public ContentHeaderBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException + public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException { classId = buffer.readUnsignedShort(); weight = buffer.readUnsignedShort(); @@ -106,7 +107,7 @@ public class ContentHeaderBody implements AMQBody return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(DataOutput buffer) throws IOException { EncodingUtils.writeUnsignedShort(buffer, classId); EncodingUtils.writeUnsignedShort(buffer, weight); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index a474e337b7..c3e4c69ec0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; import java.io.IOException; +import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +42,7 @@ public class ContentHeaderBodyFactory implements BodyFactory _log.debug("Creating content header body factory"); } - public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException + public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException { // all content headers are the same - it is only the properties that differ. // the content header body further delegates construction of properties diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java index 237929f9a3..ea8358a538 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -20,8 +20,9 @@ */ package org.apache.qpid.framing; +import java.io.DataInput; import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; @@ -35,7 +36,7 @@ public interface ContentHeaderProperties * Writes the property list to the buffer, in a suitably encoded form. * @param buffer The buffer to write to */ - void writePropertyListPayload(DataOutputStream buffer) throws IOException; + void writePropertyListPayload(DataOutput buffer) throws IOException; /** * Populates the properties from buffer. @@ -43,7 +44,7 @@ public interface ContentHeaderProperties * @param propertyFlags he property flags. * @throws AMQFrameDecodingException when the buffer does not contain valid data */ - void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) + void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException; /** diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 43ee8cd1f1..48bd52858d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.framing; +import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; @@ -39,7 +40,7 @@ public class ContentHeaderPropertiesFactory } public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, - DataInputStream buffer, int size) + DataInput buffer, int size) throws AMQFrameDecodingException, IOException { ContentHeaderProperties properties; diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 2d7e27405c..e018407509 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -219,7 +219,7 @@ public class EncodingUtils return 0; } - public static void writeShortStringBytes(DataOutputStream buffer, String s) throws IOException + public static void writeShortStringBytes(DataOutput buffer, String s) throws IOException { if (s != null) { @@ -243,7 +243,7 @@ public class EncodingUtils } } - public static void writeShortStringBytes(DataOutputStream buffer, AMQShortString s) throws IOException + public static void writeShortStringBytes(DataOutput buffer, AMQShortString s) throws IOException { if (s != null) { @@ -257,7 +257,7 @@ public class EncodingUtils } } - public static void writeLongStringBytes(DataOutputStream buffer, String s) throws IOException + public static void writeLongStringBytes(DataOutput buffer, String s) throws IOException { assert (s == null) || (s.length() <= 0xFFFE); if (s != null) @@ -279,7 +279,7 @@ public class EncodingUtils } } - public static void writeLongStringBytes(DataOutputStream buffer, char[] s) throws IOException + public static void writeLongStringBytes(DataOutput buffer, char[] s) throws IOException { assert (s == null) || (s.length <= 0xFFFE); if (s != null) @@ -300,7 +300,7 @@ public class EncodingUtils } } - public static void writeLongStringBytes(DataOutputStream buffer, byte[] bytes) throws IOException + public static void writeLongStringBytes(DataOutput buffer, byte[] bytes) throws IOException { assert (bytes == null) || (bytes.length <= 0xFFFE); if (bytes != null) @@ -314,13 +314,13 @@ public class EncodingUtils } } - public static void writeUnsignedByte(DataOutputStream buffer, short b) throws IOException + public static void writeUnsignedByte(DataOutput buffer, short b) throws IOException { byte bv = (byte) b; buffer.write(bv); } - public static void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException + public static void writeUnsignedShort(DataOutput buffer, int s) throws IOException { // TODO: Is this comparison safe? Do I need to cast RHS to long? if (s < Short.MAX_VALUE) @@ -340,7 +340,7 @@ public class EncodingUtils return 4; } - public static void writeUnsignedInteger(DataOutputStream buffer, long l) throws IOException + public static void writeUnsignedInteger(DataOutput buffer, long l) throws IOException { // TODO: Is this comparison safe? Do I need to cast RHS to long? if (l < Integer.MAX_VALUE) @@ -360,7 +360,7 @@ public class EncodingUtils } } - public static void writeFieldTableBytes(DataOutputStream buffer, FieldTable table) throws IOException + public static void writeFieldTableBytes(DataOutput buffer, FieldTable table) throws IOException { if (table != null) { @@ -372,12 +372,12 @@ public class EncodingUtils } } - public static void writeContentBytes(DataOutputStream buffer, Content content) + public static void writeContentBytes(DataOutput buffer, Content content) { // TODO: New Content class required for AMQP 0-9. } - public static void writeBooleans(DataOutputStream buffer, boolean[] values) throws IOException + public static void writeBooleans(DataOutput buffer, boolean[] values) throws IOException { byte packedValue = 0; for (int i = 0; i < values.length; i++) @@ -391,13 +391,13 @@ public class EncodingUtils buffer.write(packedValue); } - public static void writeBooleans(DataOutputStream buffer, boolean value) throws IOException + public static void writeBooleans(DataOutput buffer, boolean value) throws IOException { buffer.write(value ? (byte) 1 : (byte) 0); } - public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1) throws IOException + public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -409,7 +409,7 @@ public class EncodingUtils buffer.write(packedValue); } - public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2) throws IOException + public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -426,7 +426,7 @@ public class EncodingUtils buffer.write(packedValue); } - public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException + public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -448,7 +448,7 @@ public class EncodingUtils buffer.write(packedValue); } - public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3, boolean value4) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -476,7 +476,7 @@ public class EncodingUtils buffer.write(packedValue); } - public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3, boolean value4, boolean value5) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -509,7 +509,7 @@ public class EncodingUtils buffer.write(packedValue); } - public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3, boolean value4, boolean value5, boolean value6) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -547,7 +547,7 @@ public class EncodingUtils buffer.write(packedValue); } - public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3, boolean value4, boolean value5, boolean value6, boolean value7) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -596,7 +596,7 @@ public class EncodingUtils * @param buffer * @param data */ - public static void writeLongstr(DataOutputStream buffer, byte[] data) throws IOException + public static void writeLongstr(DataOutput buffer, byte[] data) throws IOException { if (data != null) { @@ -609,12 +609,12 @@ public class EncodingUtils } } - public static void writeTimestamp(DataOutputStream buffer, long timestamp) throws IOException + public static void writeTimestamp(DataOutput buffer, long timestamp) throws IOException { writeLong(buffer, timestamp); } - public static boolean[] readBooleans(DataInputStream buffer) throws IOException + public static boolean[] readBooleans(DataInput buffer) throws IOException { final byte packedValue = buffer.readByte(); if (packedValue == 0) @@ -641,7 +641,7 @@ public class EncodingUtils return result; } - public static FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException + public static FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException { long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) @@ -654,19 +654,19 @@ public class EncodingUtils } } - public static Content readContent(DataInputStream buffer) throws AMQFrameDecodingException + public static Content readContent(DataInput buffer) throws AMQFrameDecodingException { // TODO: New Content class required for AMQP 0-9. return null; } - public static AMQShortString readAMQShortString(DataInputStream buffer) throws IOException + public static AMQShortString readAMQShortString(DataInput buffer) throws IOException { return AMQShortString.readFromBuffer(buffer); } - public static String readShortString(DataInputStream buffer) throws IOException + public static String readShortString(DataInput buffer) throws IOException { short length = (short) (((short)buffer.readByte()) & 0xFF); if (length == 0) @@ -681,7 +681,7 @@ public class EncodingUtils // this approach here is valid since we know that all the chars are // ASCII (0-127) byte[] stringBytes = new byte[length]; - buffer.read(stringBytes, 0, length); + buffer.readFully(stringBytes, 0, length); char[] stringChars = new char[length]; for (int i = 0; i < stringChars.length; i++) { @@ -692,7 +692,7 @@ public class EncodingUtils } } - public static String readLongString(DataInputStream buffer) throws IOException + public static String readLongString(DataInput buffer) throws IOException { long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) @@ -707,7 +707,7 @@ public class EncodingUtils // this approach here is valid since we know that all the chars are // ASCII (0-127) byte[] stringBytes = new byte[(int) length]; - buffer.read(stringBytes, 0, (int) length); + buffer.readFully(stringBytes, 0, (int) length); char[] stringChars = new char[(int) length]; for (int i = 0; i < stringChars.length; i++) { @@ -718,7 +718,7 @@ public class EncodingUtils } } - public static byte[] readLongstr(DataInputStream buffer) throws IOException + public static byte[] readLongstr(DataInput buffer) throws IOException { long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) @@ -728,13 +728,13 @@ public class EncodingUtils else { byte[] result = new byte[(int) length]; - buffer.read(result); + buffer.readFully(result); return result; } } - public static long readTimestamp(DataInputStream buffer) throws IOException + public static long readTimestamp(DataInput buffer) throws IOException { // Discard msb from AMQ timestamp // buffer.getUnsignedInt(); @@ -818,12 +818,12 @@ public class EncodingUtils // AMQP_BOOLEAN_PROPERTY_PREFIX - public static void writeBoolean(DataOutputStream buffer, Boolean aBoolean) throws IOException + public static void writeBoolean(DataOutput buffer, boolean aBoolean) throws IOException { buffer.write(aBoolean ? 1 : 0); } - public static boolean readBoolean(DataInputStream buffer) throws IOException + public static boolean readBoolean(DataInput buffer) throws IOException { byte packedValue = buffer.readByte(); @@ -836,12 +836,12 @@ public class EncodingUtils } // AMQP_BYTE_PROPERTY_PREFIX - public static void writeByte(DataOutputStream buffer, Byte aByte) throws IOException + public static void writeByte(DataOutput buffer, byte aByte) throws IOException { buffer.writeByte(aByte); } - public static byte readByte(DataInputStream buffer) throws IOException + public static byte readByte(DataInput buffer) throws IOException { return buffer.readByte(); } @@ -852,12 +852,12 @@ public class EncodingUtils } // AMQP_SHORT_PROPERTY_PREFIX - public static void writeShort(DataOutputStream buffer, Short aShort) throws IOException + public static void writeShort(DataOutput buffer, short aShort) throws IOException { buffer.writeShort(aShort); } - public static short readShort(DataInputStream buffer) throws IOException + public static short readShort(DataInput buffer) throws IOException { return buffer.readShort(); } @@ -868,12 +868,12 @@ public class EncodingUtils } // INTEGER_PROPERTY_PREFIX - public static void writeInteger(DataOutputStream buffer, Integer aInteger) throws IOException + public static void writeInteger(DataOutput buffer, int aInteger) throws IOException { buffer.writeInt(aInteger); } - public static int readInteger(DataInputStream buffer) throws IOException + public static int readInteger(DataInput buffer) throws IOException { return buffer.readInt(); } @@ -884,12 +884,12 @@ public class EncodingUtils } // AMQP_LONG_PROPERTY_PREFIX - public static void writeLong(DataOutputStream buffer, Long aLong) throws IOException + public static void writeLong(DataOutput buffer, long aLong) throws IOException { buffer.writeLong(aLong); } - public static long readLong(DataInputStream buffer) throws IOException + public static long readLong(DataInput buffer) throws IOException { return buffer.readLong(); } @@ -900,12 +900,12 @@ public class EncodingUtils } // Float_PROPERTY_PREFIX - public static void writeFloat(DataOutputStream buffer, Float aFloat) throws IOException + public static void writeFloat(DataOutput buffer, float aFloat) throws IOException { buffer.writeFloat(aFloat); } - public static float readFloat(DataInputStream buffer) throws IOException + public static float readFloat(DataInput buffer) throws IOException { return buffer.readFloat(); } @@ -916,12 +916,12 @@ public class EncodingUtils } // Double_PROPERTY_PREFIX - public static void writeDouble(DataOutputStream buffer, Double aDouble) throws IOException + public static void writeDouble(DataOutput buffer, Double aDouble) throws IOException { buffer.writeDouble(aDouble); } - public static double readDouble(DataInputStream buffer) throws IOException + public static double readDouble(DataInput buffer) throws IOException { return buffer.readDouble(); } @@ -931,7 +931,7 @@ public class EncodingUtils return 8; } - public static byte[] readBytes(DataInputStream buffer) throws IOException + public static byte[] readBytes(DataInput buffer) throws IOException { long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) @@ -941,13 +941,13 @@ public class EncodingUtils else { byte[] dataBytes = new byte[(int)length]; - buffer.read(dataBytes, 0, (int) length); + buffer.readFully(dataBytes, 0, (int) length); return dataBytes; } } - public static void writeBytes(DataOutputStream buffer, byte[] data) throws IOException + public static void writeBytes(DataOutput buffer, byte[] data) throws IOException { if (data != null) { @@ -969,19 +969,19 @@ public class EncodingUtils return encodedByteLength(); } - public static char readChar(DataInputStream buffer) throws IOException + public static char readChar(DataInput buffer) throws IOException { // This is valid as we know that the Character is ASCII 0..127 - return (char) buffer.read(); + return (char) buffer.readByte(); } - public static void writeChar(DataOutputStream buffer, char character) throws IOException + public static void writeChar(DataOutput buffer, char character) throws IOException { // This is valid as we know that the Character is ASCII 0..127 writeByte(buffer, (byte) character); } - public static long readLongAsShortString(DataInputStream buffer) throws IOException + public static long readLongAsShortString(DataInput buffer) throws IOException { short length = (short) buffer.readUnsignedByte(); short pos = 0; @@ -1018,7 +1018,7 @@ public class EncodingUtils return result; } - public static long readUnsignedInteger(DataInputStream buffer) throws IOException + public static long readUnsignedInteger(DataInput buffer) throws IOException { long l = 0xFF & buffer.readByte(); l <<= 8; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java b/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java new file mode 100644 index 0000000000..c789d9275e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java @@ -0,0 +1,14 @@ +package org.apache.qpid.framing; + +import java.io.DataInput; + +public interface ExtendedDataInput extends DataInput +{ + AMQShortString readAMQShortString(); + + int available(); + + int position(); + + void position(int position); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 4a126b8504..863e363b87 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -25,11 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQPInvalidClassException; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.io.*; import java.math.BigDecimal; import java.util.Collections; import java.util.Enumeration; @@ -44,18 +40,27 @@ import java.util.Set; public class FieldTable { private static final Logger _logger = LoggerFactory.getLogger(FieldTable.class); - private static final String STRICT_AMQP = "STRICT_AMQP"; - private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false")); + private static final String STRICT_AMQP_NAME = "STRICT_AMQP"; + private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false")); private byte[] _encodedForm; + private int _encodedFormOffset; private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null; private long _encodedSize; private static final int INITIAL_HASHMAP_CAPACITY = 16; private static final int INITIAL_ENCODED_FORM_SIZE = 256; + private final boolean _strictAMQP; public FieldTable() { + this(STRICT_AMQP); + } + + + public FieldTable(boolean strictAMQP) + { super(); + _strictAMQP = strictAMQP; } /** @@ -64,14 +69,28 @@ public class FieldTable * @param buffer the buffer from which to read data. The length byte must be read already * @param length the length of the field table. Must be > 0. */ - public FieldTable(DataInputStream buffer, long length) throws IOException + public FieldTable(DataInput buffer, long length) throws IOException { this(); _encodedForm = new byte[(int) length]; - buffer.read(_encodedForm); + buffer.readFully(_encodedForm); _encodedSize = length; } + public FieldTable(byte[] encodedForm, int offset, int length) throws IOException + { + this(); + _encodedForm = encodedForm; + _encodedFormOffset = offset; + _encodedSize = length; + } + + + public boolean isClean() + { + return _encodedForm != null; + } + public AMQTypedValue getProperty(AMQShortString string) { checkPropertyName(string); @@ -181,7 +200,7 @@ public class FieldTable public Boolean getBoolean(String string) { - return getBoolean(new AMQShortString(string)); + return getBoolean(AMQShortString.valueOf(string)); } public Boolean getBoolean(AMQShortString string) @@ -199,7 +218,7 @@ public class FieldTable public Byte getByte(String string) { - return getByte(new AMQShortString(string)); + return getByte(AMQShortString.valueOf(string)); } public Byte getByte(AMQShortString string) @@ -217,7 +236,7 @@ public class FieldTable public Short getShort(String string) { - return getShort(new AMQShortString(string)); + return getShort(AMQShortString.valueOf(string)); } public Short getShort(AMQShortString string) @@ -235,7 +254,7 @@ public class FieldTable public Integer getInteger(String string) { - return getInteger(new AMQShortString(string)); + return getInteger(AMQShortString.valueOf(string)); } public Integer getInteger(AMQShortString string) @@ -253,7 +272,7 @@ public class FieldTable public Long getLong(String string) { - return getLong(new AMQShortString(string)); + return getLong(AMQShortString.valueOf(string)); } public Long getLong(AMQShortString string) @@ -271,7 +290,7 @@ public class FieldTable public Float getFloat(String string) { - return getFloat(new AMQShortString(string)); + return getFloat(AMQShortString.valueOf(string)); } public Float getFloat(AMQShortString string) @@ -289,7 +308,7 @@ public class FieldTable public Double getDouble(String string) { - return getDouble(new AMQShortString(string)); + return getDouble(AMQShortString.valueOf(string)); } public Double getDouble(AMQShortString string) @@ -307,7 +326,7 @@ public class FieldTable public String getString(String string) { - return getString(new AMQShortString(string)); + return getString(AMQShortString.valueOf(string)); } public String getString(AMQShortString string) @@ -330,7 +349,7 @@ public class FieldTable public Character getCharacter(String string) { - return getCharacter(new AMQShortString(string)); + return getCharacter(AMQShortString.valueOf(string)); } public Character getCharacter(AMQShortString string) @@ -348,7 +367,7 @@ public class FieldTable public byte[] getBytes(String string) { - return getBytes(new AMQShortString(string)); + return getBytes(AMQShortString.valueOf(string)); } public byte[] getBytes(AMQShortString string) @@ -374,7 +393,7 @@ public class FieldTable */ public FieldTable getFieldTable(String string) { - return getFieldTable(new AMQShortString(string)); + return getFieldTable(AMQShortString.valueOf(string)); } /** @@ -401,7 +420,7 @@ public class FieldTable public Object getObject(String string) { - return getObject(new AMQShortString(string)); + return getObject(AMQShortString.valueOf(string)); } public Object getObject(AMQShortString string) @@ -447,7 +466,7 @@ public class FieldTable // ************ Setters public Object setBoolean(String string, Boolean b) { - return setBoolean(new AMQShortString(string), b); + return setBoolean(AMQShortString.valueOf(string), b); } public Object setBoolean(AMQShortString string, Boolean b) @@ -457,7 +476,7 @@ public class FieldTable public Object setByte(String string, Byte b) { - return setByte(new AMQShortString(string), b); + return setByte(AMQShortString.valueOf(string), b); } public Object setByte(AMQShortString string, Byte b) @@ -467,7 +486,7 @@ public class FieldTable public Object setShort(String string, Short i) { - return setShort(new AMQShortString(string), i); + return setShort(AMQShortString.valueOf(string), i); } public Object setShort(AMQShortString string, Short i) @@ -475,29 +494,29 @@ public class FieldTable return setProperty(string, AMQType.SHORT.asTypedValue(i)); } - public Object setInteger(String string, Integer i) + public Object setInteger(String string, int i) { - return setInteger(new AMQShortString(string), i); + return setInteger(AMQShortString.valueOf(string), i); } - public Object setInteger(AMQShortString string, Integer i) + public Object setInteger(AMQShortString string, int i) { - return setProperty(string, AMQType.INT.asTypedValue(i)); + return setProperty(string, AMQTypedValue.createAMQTypedValue(i)); } - public Object setLong(String string, Long l) + public Object setLong(String string, long l) { - return setLong(new AMQShortString(string), l); + return setLong(AMQShortString.valueOf(string), l); } - public Object setLong(AMQShortString string, Long l) + public Object setLong(AMQShortString string, long l) { - return setProperty(string, AMQType.LONG.asTypedValue(l)); + return setProperty(string, AMQTypedValue.createAMQTypedValue(l)); } public Object setFloat(String string, Float f) { - return setFloat(new AMQShortString(string), f); + return setFloat(AMQShortString.valueOf(string), f); } public Object setFloat(AMQShortString string, Float v) @@ -507,7 +526,7 @@ public class FieldTable public Object setDouble(String string, Double d) { - return setDouble(new AMQShortString(string), d); + return setDouble(AMQShortString.valueOf(string), d); } public Object setDouble(AMQShortString string, Double v) @@ -517,7 +536,7 @@ public class FieldTable public Object setString(String string, String s) { - return setString(new AMQShortString(string), s); + return setString(AMQShortString.valueOf(string), s); } public Object setAsciiString(AMQShortString string, String value) @@ -546,7 +565,7 @@ public class FieldTable public Object setChar(String string, char c) { - return setChar(new AMQShortString(string), c); + return setChar(AMQShortString.valueOf(string), c); } public Object setChar(AMQShortString string, char c) @@ -556,7 +575,7 @@ public class FieldTable public Object setBytes(String string, byte[] b) { - return setBytes(new AMQShortString(string), b); + return setBytes(AMQShortString.valueOf(string), b); } public Object setBytes(AMQShortString string, byte[] bytes) @@ -566,7 +585,7 @@ public class FieldTable public Object setBytes(String string, byte[] bytes, int start, int length) { - return setBytes(new AMQShortString(string), bytes, start, length); + return setBytes(AMQShortString.valueOf(string), bytes, start, length); } public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) @@ -579,7 +598,7 @@ public class FieldTable public Object setObject(String string, Object o) { - return setObject(new AMQShortString(string), o); + return setObject(AMQShortString.valueOf(string), o); } public Object setTimestamp(AMQShortString string, long datetime) @@ -617,7 +636,7 @@ public class FieldTable */ public Object setFieldTable(String string, FieldTable ftValue) { - return setFieldTable(new AMQShortString(string), ftValue); + return setFieldTable(AMQShortString.valueOf(string), ftValue); } /** @@ -681,7 +700,7 @@ public class FieldTable public boolean isNullStringValue(String name) { - AMQTypedValue value = getProperty(new AMQShortString(name)); + AMQTypedValue value = getProperty(AMQShortString.valueOf(name)); return (value != null) && (value.getType() == AMQType.VOID); } @@ -713,7 +732,7 @@ public class FieldTable public boolean itemExists(String string) { - return itemExists(new AMQShortString(string)); + return itemExists(AMQShortString.valueOf(string)); } public String toString() @@ -769,7 +788,7 @@ public class FieldTable // ************************* Byte Buffer Processing - public void writeToBuffer(DataOutputStream buffer) throws IOException + public void writeToBuffer(DataOutput buffer) throws IOException { final boolean trace = _logger.isDebugEnabled(); @@ -919,7 +938,7 @@ public class FieldTable public boolean containsKey(String key) { - return containsKey(new AMQShortString(key)); + return containsKey(AMQShortString.valueOf(key)); } public Set<String> keys() @@ -942,7 +961,7 @@ public class FieldTable public Object get(String key) { - return get(new AMQShortString(key)); + return get(AMQShortString.valueOf(key)); } public Object get(AMQShortString key) @@ -958,7 +977,7 @@ public class FieldTable public Object remove(String key) { - return remove(new AMQShortString(key)); + return remove(AMQShortString.valueOf(key)); } @@ -1005,12 +1024,12 @@ public class FieldTable return _properties.keySet(); } - private void putDataInBuffer(DataOutputStream buffer) throws IOException + private void putDataInBuffer(DataOutput buffer) throws IOException { if (_encodedForm != null) { - buffer.write(_encodedForm); + buffer.write(_encodedForm,_encodedFormOffset,(int)_encodedSize); } else if (_properties != null) { @@ -1039,9 +1058,8 @@ public class FieldTable private void setFromBuffer() throws AMQFrameDecodingException, IOException { - final ByteArrayInputStream in = new ByteArrayInputStream(_encodedForm); - DataInputStream buffer = new DataInputStream(in); - final boolean trace = _logger.isDebugEnabled(); + ByteArrayDataInput baid = new ByteArrayDataInput(_encodedForm, _encodedFormOffset, (int)_encodedSize); + if (_encodedSize > 0) { @@ -1051,12 +1069,12 @@ public class FieldTable do { - final AMQShortString key = EncodingUtils.readAMQShortString(buffer); - AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); + final AMQShortString key = baid.readAMQShortString(); + AMQTypedValue value = AMQTypedValue.readFromBuffer(baid); _properties.put(key, value); } - while (in.available() > 0); + while (baid.available() > 0); } @@ -1101,7 +1119,7 @@ public class FieldTable FieldTable table = new FieldTable(); for(Map.Entry<String,Object> entry : map.entrySet()) { - table.put(new AMQShortString(entry.getKey()), entry.getValue()); + table.put(AMQShortString.valueOf(entry.getKey()), entry.getValue()); } return table; diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java index 438a46f28b..af0c5b845c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.framing; +import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; @@ -30,7 +31,7 @@ public class FieldTableFactory return new FieldTable(); } - public static FieldTable newFieldTable(DataInputStream byteBuffer, long length) throws AMQFrameDecodingException, IOException + public static FieldTable newFieldTable(DataInput byteBuffer, long length) throws AMQFrameDecodingException, IOException { return new FieldTable(byteBuffer, length); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index a6ce721a50..95b6246717 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -21,7 +21,7 @@ package org.apache.qpid.framing; import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; @@ -56,7 +56,7 @@ public class HeartbeatBody implements AMQBody return 0;//heartbeats we generate have no payload } - public void writePayload(DataOutputStream buffer) + public void writePayload(DataOutput buffer) { } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index dfc49c6167..971caca41a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; +import org.apache.qpid.codec.MarkableDataInput; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 8c018316f0..2925724dc2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -21,13 +21,10 @@ package org.apache.qpid.framing; import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.io.*; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; import java.util.Arrays; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock @@ -66,7 +63,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion()); } - public ProtocolInitiation(DataInputStream in) throws IOException + public ProtocolInitiation(MarkableDataInput in) throws IOException { _protocolHeader = new byte[4]; in.read(_protocolHeader); @@ -82,7 +79,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData return 4 + 1 + 1 + 1 + 1; } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(DataOutput buffer) throws IOException { buffer.write(_protocolHeader); @@ -143,7 +140,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(DataInputStream in) throws IOException + public boolean decodable(MarkableDataInput in) throws IOException { return (in.available() >= 8); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java index d2925d13a8..dd854dd498 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -21,7 +21,7 @@ package org.apache.qpid.framing; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock @@ -69,7 +69,7 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl return frameSize; } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(DataOutput buffer) throws IOException { if (_firstFrame != null) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index ed9136f7c9..e770fdd7e4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -23,6 +23,7 @@ package org.apache.qpid.framing; import java.io.DataInputStream; import java.io.IOException; +import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.slf4j.Logger; @@ -145,7 +146,7 @@ public class VersionSpecificRegistry } - public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException + public AMQMethodBody get(short classID, short methodID, MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException { AMQMethodBodyInstanceFactory bodyFactory; try diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index b78433052c..dee5f696b9 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -383,13 +383,19 @@ public class Connection extends ConnectionInvoker public void received(ProtocolEvent event) { - log.debug("RECV: [%s] %s", this, event); + if(log.isDebugEnabled()) + { + log.debug("RECV: [%s] %s", this, event); + } event.delegate(this, delegate); } public void send(ProtocolEvent event) { - log.debug("SEND: [%s] %s", this, event); + if(log.isDebugEnabled()) + { + log.debug("SEND: [%s] %s", this, event); + } Sender s = sender; if (s == null) { @@ -400,8 +406,15 @@ public class Connection extends ConnectionInvoker public void flush() { - log.debug("FLUSH: [%s]", this); - sender.flush(); + if(log.isDebugEnabled()) + { + log.debug("FLUSH: [%s]", this); + } + final Sender<ProtocolEvent> theSender = sender; + if(theSender != null) + { + theSender.flush(); + } } protected void invoke(Method method) diff --git a/java/common/src/main/java/org/apache/qpid/transport/Header.java b/java/common/src/main/java/org/apache/qpid/transport/Header.java index 9439e5e0de..543856ca39 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Header.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Header.java @@ -20,13 +20,7 @@ */ package org.apache.qpid.transport; -import org.apache.qpid.transport.network.Frame; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.LinkedHashMap; -import java.nio.ByteBuffer; +import java.util.*; /** @@ -35,45 +29,87 @@ import java.nio.ByteBuffer; * @author Rafael H. Schloming */ -public class Header { +public class Header +{ - private final Struct[] structs; + private final DeliveryProperties _deliveryProps; + private final MessageProperties _messageProps; + private final List<Struct> _nonStandardProps; - public Header(List<Struct> structs) + public Header(DeliveryProperties deliveryProps, MessageProperties messageProps) { - this(structs.toArray(new Struct[structs.size()])); + this(deliveryProps, messageProps, null); } - public Header(Struct ... structs) + public Header(DeliveryProperties deliveryProps, MessageProperties messageProps, List<Struct> nonStandardProps) { - this.structs = structs; + _deliveryProps = deliveryProps; + _messageProps = messageProps; + _nonStandardProps = nonStandardProps; } public Struct[] getStructs() { + int size = 0; + if(_deliveryProps != null) + { + size++; + } + if(_messageProps != null) + { + size++; + } + if(_nonStandardProps != null) + { + size+=_nonStandardProps.size(); + } + Struct[] structs = new Struct[size]; + int index = 0; + if(_deliveryProps != null) + { + structs[index++] = _deliveryProps; + } + if(_messageProps != null) + { + structs[index++] = _messageProps; + } + if(_nonStandardProps != null) + { + for(Struct struct : _nonStandardProps) + { + structs[index++] = struct; + } + } + return structs; } + public DeliveryProperties getDeliveryProperties() + { + return _deliveryProps; + } - public <T> T get(Class<T> klass) + public MessageProperties getMessageProperties() { - for (Struct st : structs) - { - if (klass.isInstance(st)) - { - return (T) st; - } - } + return _messageProps; + } - return null; + public List<Struct> getNonStandardProperties() + { + return _nonStandardProps; } public String toString() { - StringBuffer str = new StringBuffer(); + StringBuilder str = new StringBuilder(); str.append(" Header("); boolean first = true; - for (Struct s : structs) + if(_deliveryProps !=null) + { + first=false; + str.append(_deliveryProps); + } + if(_messageProps != null) { if (first) { @@ -83,9 +119,24 @@ public class Header { { str.append(", "); } - str.append(s); + str.append(_messageProps); + } + if(_nonStandardProps != null) + { + for (Struct s : _nonStandardProps) + { + if (first) + { + first = false; + } + else + { + str.append(", "); + } + str.append(s); + } } - str.append(")"); + str.append(')'); return str.toString(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Range.java b/java/common/src/main/java/org/apache/qpid/transport/Range.java index f4335dc8a6..f976337788 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Range.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Range.java @@ -21,6 +21,8 @@ package org.apache.qpid.transport; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import static org.apache.qpid.util.Serial.*; @@ -32,94 +34,259 @@ import static org.apache.qpid.util.Serial.*; * @author Rafael H. Schloming */ -public final class Range +public abstract class Range implements RangeSet { - private final int lower; - private final int upper; + public static Range newInstance(int point) + { + return new PointImpl(point); + } + + public static Range newInstance(int lower, int upper) + { + return lower == upper ? new PointImpl(lower) : new RangeImpl(lower, upper); + } + + public abstract int getLower(); + + public abstract int getUpper(); + + public abstract boolean includes(int value); + + public abstract boolean includes(Range range); + + public abstract boolean intersects(Range range); + + public abstract boolean touches(Range range); + + public abstract Range span(Range range); + + public abstract List<Range> subtract(Range range); + + + public Range intersect(Range range) + { + int l = max(getLower(), range.getLower()); + int r = min(getUpper(), range.getUpper()); + if (gt(l, r)) + { + return null; + } + else + { + return newInstance(l, r); + } + } + + + + public int size() + { + return 1; + } - public Range(int lower, int upper) + public Iterator<Range> iterator() { - this.lower = lower; - this.upper = upper; + return new RangeIterator(); } - public int getLower() + public Range getFirst() { - return lower; + return this; } - public int getUpper() + public Range getLast() { - return upper; + return this; } - public boolean includes(int value) + public void add(Range range) { - return le(lower, value) && le(value, upper); + throw new UnsupportedOperationException(); } - public boolean includes(Range range) + public void add(int lower, int upper) { - return includes(range.lower) && includes(range.upper); + throw new UnsupportedOperationException(); } - public boolean intersects(Range range) + public void add(int value) { - return (includes(range.lower) || includes(range.upper) || - range.includes(lower) || range.includes(upper)); + throw new UnsupportedOperationException(); } - public boolean touches(Range range) + public void clear() { - return (intersects(range) || - includes(range.upper + 1) || includes(range.lower - 1) || - range.includes(upper + 1) || range.includes(lower - 1)); + throw new UnsupportedOperationException(); } - public Range span(Range range) + public RangeSet copy() { - return new Range(min(lower, range.lower), max(upper, range.upper)); + RangeSet rangeSet = RangeSetFactory.createRangeSet(); + rangeSet.add(this); + return rangeSet; } - public List<Range> subtract(Range range) + private static class PointImpl extends Range { - List<Range> result = new ArrayList<Range>(); + private final int point; + + private PointImpl(int point) + { + this.point = point; + } + + public int getLower() + { + return point; + } + + public int getUpper() + { + return point; + } + + public boolean includes(int value) + { + return value == point; + } + + + public boolean includes(Range range) + { + return range.getLower() == point && range.getUpper() == point; + } + + public boolean intersects(Range range) + { + return range.includes(point); + } - if (includes(range.lower) && le(lower, range.lower - 1)) + public boolean touches(Range range) { - result.add(new Range(lower, range.lower - 1)); + return intersects(range) || + includes(range.getUpper() + 1) || includes(range.getLower() - 1) || + range.includes(point + 1) || range.includes(point - 1); } - if (includes(range.upper) && le(range.upper + 1, upper)) + public Range span(Range range) { - result.add(new Range(range.upper + 1, upper)); + return newInstance(min(point, range.getLower()), max(point, range.getUpper())); } - if (result.isEmpty() && !range.includes(this)) + public List<Range> subtract(Range range) { - result.add(this); + if(range.includes(point)) + { + return Collections.emptyList(); + } + else + { + return Collections.singletonList((Range) this); + } } - return result; } - public Range intersect(Range range) + private static class RangeImpl extends Range { - int l = max(lower, range.lower); - int r = min(upper, range.upper); - if (gt(l, r)) + private final int lower; + private final int upper; + + private RangeImpl(int lower, int upper) { - return null; + this.lower = lower; + this.upper = upper; } - else + + public int getLower() + { + return lower; + } + + public int getUpper() + { + return upper; + } + + public boolean includes(int value) + { + return le(lower, value) && le(value, upper); + } + + public boolean includes(Range range) + { + return includes(range.getLower()) && includes(range.getUpper()); + } + + public boolean intersects(Range range) + { + return (includes(range.getLower()) || includes(range.getUpper()) || + range.includes(lower) || range.includes(upper)); + } + + public boolean touches(Range range) + { + return (intersects(range) || + includes(range.getUpper() + 1) || includes(range.getLower() - 1) || + range.includes(upper + 1) || range.includes(lower - 1)); + } + + public Range span(Range range) + { + return newInstance(min(lower, range.getLower()), max(upper, range.getUpper())); + } + + public List<Range> subtract(Range range) + { + List<Range> result = new ArrayList<Range>(); + + if (includes(range.getLower()) && le(lower, range.getLower() - 1)) + { + result.add(newInstance(lower, range.getLower() - 1)); + } + + if (includes(range.getUpper()) && le(range.getUpper() + 1, upper)) + { + result.add(newInstance(range.getUpper() + 1, upper)); + } + + if (result.isEmpty() && !range.includes(this)) + { + result.add(this); + } + + return result; + } + + + public String toString() { - return new Range(l, r); + return "[" + lower + ", " + upper + "]"; } } - public String toString() + + private class RangeIterator implements Iterator<Range> { - return "[" + lower + ", " + upper + "]"; - } + private boolean atFirst = true; + + public boolean hasNext() + { + return atFirst; + } + + public Range next() + { + + Range range = atFirst ? Range.this : null; + atFirst = false; + return range; + } + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java index 3850dc162b..34ebd02777 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java +++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.transport; -import java.util.Iterator; -import java.util.ListIterator; -import java.util.LinkedList; +import java.util.*; import static org.apache.qpid.util.Serial.*; @@ -32,121 +30,29 @@ import static org.apache.qpid.util.Serial.*; * @author Rafael H. Schloming */ -public final class RangeSet implements Iterable<Range> +public interface RangeSet extends Iterable<Range> { - private LinkedList<Range> ranges = new LinkedList<Range>(); - - public int size() - { - return ranges.size(); - } - - public Iterator<Range> iterator() - { - return ranges.iterator(); - } - - public Range getFirst() - { - return ranges.getFirst(); - } - - public Range getLast() - { - return ranges.getLast(); - } - - public boolean includes(Range range) - { - for (Range r : this) - { - if (r.includes(range)) - { - return true; - } - } - - return false; - } - - public boolean includes(int n) - { - for (Range r : this) - { - if (r.includes(n)) - { - return true; - } - } - - return false; - } - - public void add(Range range) - { - ListIterator<Range> it = ranges.listIterator(); - - while (it.hasNext()) - { - Range next = it.next(); - if (range.touches(next)) - { - it.remove(); - range = range.span(next); - } - else if (lt(range.getUpper(), next.getLower())) - { - it.previous(); - it.add(range); - return; - } - } - - it.add(range); - } - - public void add(int lower, int upper) - { - add(new Range(lower, upper)); - } - - public void add(int value) - { - add(value, value); - } - - public void clear() - { - ranges.clear(); - } - - public RangeSet copy() - { - RangeSet copy = new RangeSet(); - copy.ranges.addAll(ranges); - return copy; - } - - public String toString() - { - StringBuffer str = new StringBuffer(); - str.append("{"); - boolean first = true; - for (Range range : ranges) - { - if (first) - { - first = false; - } - else - { - str.append(", "); - } - str.append(range); - } - str.append("}"); - return str.toString(); - } + int size(); + + Iterator<Range> iterator(); + + Range getFirst(); + + Range getLast(); + + boolean includes(Range range); + + boolean includes(int n); + + void add(Range range); + + void add(int lower, int upper); + + void add(int value); + + void clear(); + + RangeSet copy(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java new file mode 100644 index 0000000000..0f19d7e2b2 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +public class RangeSetFactory +{ + public static RangeSet createRangeSet() + { + return new RangeSetImpl(); + } + + public static RangeSet createRangeSet(int size) + { + return new RangeSetImpl(size); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java new file mode 100644 index 0000000000..f2540afb40 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; + +import static org.apache.qpid.util.Serial.lt; + +public class RangeSetImpl implements RangeSet +{ + + private List<Range> ranges; + + public RangeSetImpl() + { + ranges = new ArrayList<Range>(); + } + + public RangeSetImpl(int size) + { + ranges = new ArrayList<Range>(size); + } + + + public RangeSetImpl(org.apache.qpid.transport.RangeSetImpl copy) + { + ranges = new ArrayList<Range>(copy.ranges); + } + + public int size() + { + return ranges.size(); + } + + public Iterator<Range> iterator() + { + return ranges.iterator(); + } + + public Range getFirst() + { + return ranges.get(0); + } + + public Range getLast() + { + return ranges.get(ranges.size() - 1); + } + + public boolean includes(Range range) + { + for (Range r : this) + { + if (r.includes(range)) + { + return true; + } + } + + return false; + } + + public boolean includes(int n) + { + for (Range r : this) + { + if (r.includes(n)) + { + return true; + } + } + + return false; + } + + public void add(Range range) + { + ListIterator<Range> it = ranges.listIterator(); + + while (it.hasNext()) + { + Range next = it.next(); + if (range.touches(next)) + { + it.remove(); + range = range.span(next); + } + else if (lt(range.getUpper(), next.getLower())) + { + it.previous(); + it.add(range); + return; + } + } + + it.add(range); + } + + public void add(int lower, int upper) + { + switch(ranges.size()) + { + case 0: + ranges.add(Range.newInstance(lower, upper)); + break; + + case 1: + Range first = ranges.get(0); + if(first.getUpper() + 1 >= lower && upper >= first.getUpper()) + { + ranges.set(0, Range.newInstance(first.getLower(), upper)); + break; + } + + default: + add(Range.newInstance(lower, upper)); + } + + + } + + public void add(int value) + { + add(value, value); + } + + public void clear() + { + ranges.clear(); + } + + public RangeSet copy() + { + return new org.apache.qpid.transport.RangeSetImpl(this); + } + + public String toString() + { + StringBuffer str = new StringBuffer(); + str.append("{"); + boolean first = true; + for (Range range : ranges) + { + if (first) + { + first = false; + } + else + { + str.append(", "); + } + str.append(range); + } + str.append("}"); + return str.toString(); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 321e5256b2..3e823ba6fe 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -247,7 +247,7 @@ public class Session extends SessionInvoker synchronized (processedLock) { incomingInit = false; - processed = new RangeSet(); + processed = RangeSetFactory.createRangeSet(); } } @@ -276,22 +276,22 @@ public class Session extends SessionInvoker else if (m instanceof MessageTransfer) { MessageTransfer xfr = (MessageTransfer)m; - - if (xfr.getHeader() != null) + + Header header = xfr.getHeader(); + + if (header != null) { - if (xfr.getHeader().get(DeliveryProperties.class) != null) + if (header.getDeliveryProperties() != null) { - xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true); + header.getDeliveryProperties().setRedelivered(true); } else { - Struct[] structs = xfr.getHeader().getStructs(); DeliveryProperties deliveryProps = new DeliveryProperties(); deliveryProps.setRedelivered(true); - - List<Struct> list = Arrays.asList(structs); - list.add(deliveryProps); - xfr.setHeader(new Header(list)); + + xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(), + header.getNonStandardProperties())); } } @@ -299,7 +299,7 @@ public class Session extends SessionInvoker { DeliveryProperties deliveryProps = new DeliveryProperties(); deliveryProps.setRedelivered(true); - xfr.setHeader(new Header(deliveryProps)); + xfr.setHeader(new Header(deliveryProps, null, null)); } } sessionCommandPoint(m.getId(), 0); @@ -394,38 +394,46 @@ public class Session extends SessionInvoker public void processed(int command) { - processed(new Range(command, command)); + processed(command, command); } - public void processed(int lower, int upper) + public void processed(Range range) { - processed(new Range(lower, upper)); + processed(range.getLower(), range.getUpper()); } - public void processed(Range range) + public void processed(int lower, int upper) { - log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed); + if(log.isDebugEnabled()) + { + log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed); + } boolean flush; synchronized (processedLock) { - log.debug("%s", processed); + if(log.isDebugEnabled()) + { + log.debug("%s", processed); + } - if (ge(range.getUpper(), commandsIn)) + if (ge(upper, commandsIn)) { throw new IllegalArgumentException - ("range exceeds max received command-id: " + range); + ("range exceeds max received command-id: " + Range.newInstance(lower, upper)); } - processed.add(range); + processed.add(lower, upper); + Range first = processed.getFirst(); - int lower = first.getLower(); - int upper = first.getUpper(); + + int flower = first.getLower(); + int fupper = first.getUpper(); int old = maxProcessed; - if (le(lower, maxProcessed + 1)) + if (le(flower, maxProcessed + 1)) { - maxProcessed = max(maxProcessed, upper); + maxProcessed = max(maxProcessed, fupper); } boolean synced = ge(maxProcessed, syncPoint); flush = lt(old, syncPoint) && synced; @@ -442,7 +450,7 @@ public class Session extends SessionInvoker void flushExpected() { - RangeSet rs = new RangeSet(); + RangeSet rs = RangeSetFactory.createRangeSet(); synchronized (processedLock) { if (incomingInit) @@ -478,7 +486,7 @@ public class Session extends SessionInvoker { synchronized (processedLock) { - RangeSet newProcessed = new RangeSet(); + RangeSet newProcessed = RangeSetFactory.createRangeSet(); for (Range pr : processed) { for (Range kr : kc) @@ -534,7 +542,12 @@ public class Session extends SessionInvoker { maxComplete = max(maxComplete, upper); } - log.debug("%s commands remaining: %s", this, commandsOut - maxComplete); + + if(log.isDebugEnabled()) + { + log.debug("%s commands remaining: %s", this, commandsOut - maxComplete); + } + commands.notifyAll(); return gt(maxComplete, old); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 3341149e5f..028b912ba1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -91,21 +91,38 @@ public class SessionDelegate { RangeSet ranges = cmp.getCommands(); RangeSet known = null; - if (cmp.getTimelyReply()) - { - known = new RangeSet(); - } if (ranges != null) { - for (Range range : ranges) + if(ranges.size() == 1) { + Range range = ranges.getFirst(); boolean advanced = ssn.complete(range.getLower(), range.getUpper()); - if (advanced && known != null) + + if(advanced && cmp.getTimelyReply()) { - known.add(range); + known = range; } } + else + { + if (cmp.getTimelyReply()) + { + known = RangeSetFactory.createRangeSet(); + } + for (Range range : ranges) + { + boolean advanced = ssn.complete(range.getLower(), range.getUpper()); + if (advanced && known != null) + { + known.add(range); + } + } + } + } + else if (cmp.getTimelyReply()) + { + known = RangeSetFactory.createRangeSet(); } if (known != null) diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java index 09ce6a7eb1..6ff3b21400 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java @@ -29,12 +29,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; -import org.apache.qpid.transport.Type; - -import static org.apache.qpid.transport.util.Functions.*; +import org.apache.qpid.transport.*; /** @@ -194,18 +189,19 @@ abstract class AbstractDecoder implements Decoder public RangeSet readSequenceSet() { int count = readUint16()/8; - if (count == 0) + switch(count) { - return null; - } - else - { - RangeSet ranges = new RangeSet(); - for (int i = 0; i < count; i++) - { - ranges.add(readSequenceNo(), readSequenceNo()); - } - return ranges; + case 0: + return null; + case 1: + return Range.newInstance(readSequenceNo(), readSequenceNo()); + default: + RangeSet ranges = RangeSetFactory.createRangeSet(count); + for (int i = 0; i < count; i++) + { + ranges.add(readSequenceNo(), readSequenceNo()); + } + return ranges; } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java index 4486b03a67..d9150bed65 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java @@ -70,6 +70,16 @@ public final class BBEncoder extends AbstractEncoder return slice; } + public int position() + { + return out.position(); + } + + public ByteBuffer underlyingBuffer() + { + return out; + } + private void grow(int size) { ByteBuffer old = out; diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 1a85ab88a5..8cd5c29f6d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -26,13 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.ProtocolError; -import org.apache.qpid.transport.ProtocolEvent; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.*; import org.apache.qpid.transport.codec.BBDecoder; /** @@ -198,12 +192,33 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate break; case HEADER: command = getIncompleteCommand(channel); - List<Struct> structs = new ArrayList<Struct>(2); + List<Struct> structs = null; + DeliveryProperties deliveryProps = null; + MessageProperties messageProps = null; + while (dec.hasRemaining()) { - structs.add(dec.readStruct32()); + Struct struct = dec.readStruct32(); + if(struct instanceof DeliveryProperties && deliveryProps == null) + { + deliveryProps = (DeliveryProperties) struct; + } + else if(struct instanceof MessageProperties && messageProps == null) + { + messageProps = (MessageProperties) struct; + } + else + { + if(structs == null) + { + structs = new ArrayList<Struct>(2); + } + structs.add(struct); + } + } - command.setHeader(new Header(structs)); + command.setHeader(new Header(deliveryProps,messageProps,structs)); + if (frame.isLastSegment()) { setIncompleteCommand(channel, null); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 685034d1a9..6ac9df9bc3 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -87,27 +87,35 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega } } + private final ByteBuffer _frameHeader = ByteBuffer.allocate(HEADER_SIZE); + + { + _frameHeader.order(ByteOrder.BIG_ENDIAN); + } + private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) { synchronized (sendlock) { - ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE); - data.order(ByteOrder.BIG_ENDIAN); + ByteBuffer data = _frameHeader; + _frameHeader.rewind(); + data.put(0, flags); data.put(1, type); data.putShort(2, (short) (size + HEADER_SIZE)); data.put(5, track); data.putShort(6, (short) channel); - data.position(HEADER_SIZE); + int limit = buf.limit(); buf.limit(buf.position() + size); - data.put(buf); - buf.limit(limit); - + data.rewind(); sender.send(data); + sender.send(buf); + buf.limit(limit); + } } @@ -179,7 +187,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega } } method.write(enc); - ByteBuffer methodSeg = enc.segment(); + int methodLimit = enc.position(); byte flags = FIRST_SEG; @@ -189,29 +197,44 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega flags |= LAST_SEG; } - ByteBuffer headerSeg = null; + int headerLimit = -1; if (payload) { final Header hdr = method.getHeader(); if (hdr != null) { - final Struct[] structs = hdr.getStructs(); - - for (Struct st : structs) + if(hdr.getDeliveryProperties() != null) + { + enc.writeStruct32(hdr.getDeliveryProperties()); + } + if(hdr.getMessageProperties() != null) + { + enc.writeStruct32(hdr.getMessageProperties()); + } + if(hdr.getNonStandardProperties() != null) { - enc.writeStruct32(st); + for (Struct st : hdr.getNonStandardProperties()) + { + enc.writeStruct32(st); + } } } - headerSeg = enc.segment(); + headerLimit = enc.position(); } synchronized (sendlock) { - fragment(flags, type, method, methodSeg); + ByteBuffer buf = enc.underlyingBuffer(); + buf.position(0); + buf.limit(methodLimit); + + fragment(flags, type, method, buf); if (payload) { ByteBuffer body = method.getBody(); - fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerSeg); + buf.limit(headerLimit); + buf.position(methodLimit); + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf); if (body != null) { fragment(LAST_SEG, SegmentType.BODY, method, body); diff --git a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index bd189feb1c..cb9a9468bb 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -581,10 +581,10 @@ public class PropertyFieldTableTest extends TestCase table.setBytes("bytes", bytes); table.setChar("char", 'c'); - table.setDouble("double", Double.MAX_VALUE); - table.setFloat("float", Float.MAX_VALUE); table.setInteger("int", Integer.MAX_VALUE); table.setLong("long", Long.MAX_VALUE); + table.setDouble("double", Double.MAX_VALUE); + table.setFloat("float", Float.MAX_VALUE); table.setShort("short", Short.MAX_VALUE); table.setString("string", "hello"); table.setString("null-string", null); @@ -823,9 +823,7 @@ public class PropertyFieldTableTest extends TestCase */ public void testCheckPropertyNamehasMaxLength() { - String oldVal = System.getProperty("STRICT_AMQP"); - System.setProperty("STRICT_AMQP", "true"); - FieldTable table = new FieldTable(); + FieldTable table = new FieldTable(true); StringBuffer longPropertyName = new StringBuffer(129); @@ -845,14 +843,6 @@ public class PropertyFieldTableTest extends TestCase } // so length should be zero Assert.assertEquals(0, table.getEncodedSize()); - if (oldVal != null) - { - System.setProperty("STRICT_AMQP", oldVal); - } - else - { - System.clearProperty("STRICT_AMQP"); - } } /** @@ -860,9 +850,7 @@ public class PropertyFieldTableTest extends TestCase */ public void testCheckPropertyNameStartCharacterIsLetter() { - String oldVal = System.getProperty("STRICT_AMQP"); - System.setProperty("STRICT_AMQP", "true"); - FieldTable table = new FieldTable(); + FieldTable table = new FieldTable(true); // Try a name that starts with a number try @@ -876,14 +864,6 @@ public class PropertyFieldTableTest extends TestCase } // so length should be zero Assert.assertEquals(0, table.getEncodedSize()); - if (oldVal != null) - { - System.setProperty("STRICT_AMQP", oldVal); - } - else - { - System.clearProperty("STRICT_AMQP"); - } } /** @@ -891,9 +871,7 @@ public class PropertyFieldTableTest extends TestCase */ public void testCheckPropertyNameStartCharacterIsHashorDollar() { - String oldVal = System.getProperty("STRICT_AMQP"); - System.setProperty("STRICT_AMQP", "true"); - FieldTable table = new FieldTable(); + FieldTable table = new FieldTable(true); // Try a name that starts with a number try @@ -906,14 +884,6 @@ public class PropertyFieldTableTest extends TestCase fail("property name are allowed to start with # and $s"); } - if (oldVal != null) - { - System.setProperty("STRICT_AMQP", oldVal); - } - else - { - System.clearProperty("STRICT_AMQP"); - } } /** diff --git a/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java b/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java index ad45d00e46..889250e004 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java @@ -60,7 +60,7 @@ public class RangeSetTest extends TestCase public void test1() { - RangeSet ranges = new RangeSet(); + RangeSet ranges = RangeSetFactory.createRangeSet(); ranges.add(5, 10); check(ranges); ranges.add(15, 20); @@ -77,7 +77,7 @@ public class RangeSetTest extends TestCase public void test2() { - RangeSet rs = new RangeSet(); + RangeSet rs = RangeSetFactory.createRangeSet(); check(rs); rs.add(1); @@ -128,7 +128,7 @@ public class RangeSetTest extends TestCase public void testAddSelf() { - RangeSet a = new RangeSet(); + RangeSet a = RangeSetFactory.createRangeSet(); a.add(0, 8); check(a); a.add(0, 8); @@ -141,8 +141,8 @@ public class RangeSetTest extends TestCase public void testIntersect1() { - Range a = new Range(0, 10); - Range b = new Range(9, 20); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(9, 20); Range i1 = a.intersect(b); Range i2 = b.intersect(a); assertEquals(i1.getUpper(), 10); @@ -153,16 +153,16 @@ public class RangeSetTest extends TestCase public void testIntersect2() { - Range a = new Range(0, 10); - Range b = new Range(11, 20); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(11, 20); assertNull(a.intersect(b)); assertNull(b.intersect(a)); } public void testIntersect3() { - Range a = new Range(0, 10); - Range b = new Range(3, 5); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(3, 5); Range i1 = a.intersect(b); Range i2 = b.intersect(a); assertEquals(i1.getUpper(), 5); @@ -173,14 +173,14 @@ public class RangeSetTest extends TestCase public void testSubtract1() { - Range a = new Range(0, 10); + Range a = Range.newInstance(0, 10); assertTrue(a.subtract(a).isEmpty()); } public void testSubtract2() { - Range a = new Range(0, 10); - Range b = new Range(20, 30); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(20, 30); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 1); Range d = ranges.get(0); @@ -190,8 +190,8 @@ public class RangeSetTest extends TestCase public void testSubtract3() { - Range a = new Range(20, 30); - Range b = new Range(0, 10); + Range a = Range.newInstance(20, 30); + Range b = Range.newInstance(0, 10); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 1); Range d = ranges.get(0); @@ -201,8 +201,8 @@ public class RangeSetTest extends TestCase public void testSubtract4() { - Range a = new Range(0, 10); - Range b = new Range(3, 5); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(3, 5); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 2); Range low = ranges.get(0); @@ -215,8 +215,8 @@ public class RangeSetTest extends TestCase public void testSubtract5() { - Range a = new Range(0, 10); - Range b = new Range(3, 20); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(3, 20); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 1); Range d = ranges.get(0); @@ -226,8 +226,8 @@ public class RangeSetTest extends TestCase public void testSubtract6() { - Range a = new Range(0, 10); - Range b = new Range(-10, 5); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(-10, 5); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 1); Range d = ranges.get(0); |
