summaryrefslogtreecommitdiff
path: root/java/common/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-12-28 13:02:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-12-28 13:02:41 +0000
commit6d4226a532443ab1fe33c7d486877dbb11e154de (patch)
tree98b300c1fa6885cdabdc18ad18c7587627d6dc32 /java/common/src/main
parent2f1ced0ba4334901984de39134c0e0b9337fa5ad (diff)
downloadqpid-python-6d4226a532443ab1fe33c7d486877dbb11e154de.tar.gz
QPID-3714 : [Java] Performance Improvements
Persistence: Store message in same transaction as enqueue if possible Memory: Remove unnecessary (un)boxing Reduce unnecessary copying of message data Cache short strings Cache queues for a given routing key on an Exchange (0-9) Use a fixed size buffer for preparing frames to write out Other: Reduce calls to System.currentTimeMillis (0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point (0-10) Special case delivery properties and message properties in headers (0-9) send commit-ok as soon as data committed to store Cache publishing access control queries (0-9) Optimised long and int typed values for FieldTables (0-9) Retain FieldTable encoded form (0-9) Cache queue and topic destinations git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1225178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main')
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java53
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBody.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java62
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java147
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQType.java102
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java225
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java204
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java174
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java65
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java108
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java128
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Header.java103
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Range.java251
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/RangeSet.java140
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java34
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java178
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java67
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java35
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java53
44 files changed, 1694 insertions, 670 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 69bf73bb49..1d196534b2 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -24,12 +24,7 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQDataBlockDecoder;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQMethodBodyFactory;
-import org.apache.qpid.framing.AMQProtocolVersionException;
-import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
@@ -193,24 +188,41 @@ public class AMQDecoder
}
}
+ private static class SimpleDataInputStream extends DataInputStream implements MarkableDataInput
+ {
+ public SimpleDataInputStream(InputStream in)
+ {
+ super(in);
+ }
+
+ public AMQShortString readAMQShortString() throws IOException
+ {
+ return EncodingUtils.readAMQShortString(this);
+ }
+
+ }
+
public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
// get prior remaining data from accumulator
ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
- DataInputStream msg;
+ MarkableDataInput msg;
- ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
+ ByteArrayInputStream bais;
+ DataInput di;
if(!_remainingBufs.isEmpty())
{
+ bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
_remainingBufs.add(bais);
- msg = new DataInputStream(new RemainingByteArrayInputStream());
+ msg = new SimpleDataInputStream(new RemainingByteArrayInputStream());
}
else
{
- msg = new DataInputStream(bais);
+ bais = null;
+ msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
}
boolean enoughData = true;
@@ -245,11 +257,24 @@ public class AMQDecoder
iterator.remove();
}
}
- if(bais.available()!=0)
+
+ if(bais == null)
+ {
+ if(msg.available()!=0)
+ {
+ byte[] remaining = new byte[msg.available()];
+ msg.read(remaining);
+ _remainingBufs.add(new ByteArrayInputStream(remaining));
+ }
+ }
+ else
{
- byte[] remaining = new byte[bais.available()];
- bais.read(remaining);
- _remainingBufs.add(new ByteArrayInputStream(remaining));
+ if(bais.available()!=0)
+ {
+ byte[] remaining = new byte[bais.available()];
+ bais.read(remaining);
+ _remainingBufs.add(new ByteArrayInputStream(remaining));
+ }
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java b/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
new file mode 100644
index 0000000000..2a243a810d
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
@@ -0,0 +1,21 @@
+package org.apache.qpid.codec;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+public interface MarkableDataInput extends DataInput
+{
+ public void mark(int pos);
+ public void reset() throws IOException;
+
+ int available() throws IOException;
+
+ long skip(long i) throws IOException;
+
+ int read(byte[] b) throws IOException;
+
+ public AMQShortString readAMQShortString() throws IOException;
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index ebdad12178..363d9f1ccc 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -36,7 +37,7 @@ public interface AMQBody
*/
public abstract int getSize();
- public void writePayload(DataOutputStream buffer) throws IOException;
+ public void writePayload(DataOutput buffer) throws IOException;
void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
index 00c1f5aae5..e77e5942e3 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
@@ -21,6 +21,7 @@
package org.apache.qpid.framing;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -42,6 +43,6 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock
* Writes the datablock to the specified buffer.
* @param buffer
*/
- public abstract void writePayload(DataOutputStream buffer) throws IOException;
+ public abstract void writePayload(DataOutput buffer) throws IOException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 2165cadd14..b6f2fb18ea 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import org.apache.qpid.codec.MarkableDataInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
+ public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException
{
final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
@@ -65,7 +66,7 @@ public class AMQDataBlockDecoder
}
- public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
final byte type = in.readByte();
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 6acf60a5b3..9b5699e8ff 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
+
+import java.io.*;
+import java.io.DataOutput;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -38,7 +39,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
_bodyFrame = bodyFrame;
}
- public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
+ public AMQFrame(final MarkableDataInput in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
{
this._channel = channel;
this._bodyFrame = bodyFactory.createBody(in,bodySize);
@@ -55,7 +56,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
buffer.writeByte(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
@@ -79,7 +80,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
- public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException
+ public static void writeFrame(DataOutput buffer, final int channel, AMQBody body) throws IOException
{
buffer.writeByte(body.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
@@ -89,7 +90,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
}
- public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
+ public static void writeFrames(DataOutput buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
{
buffer.writeByte(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
@@ -104,7 +105,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
}
- public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
+ public static void writeFrames(DataOutput buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
{
buffer.writeByte(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index a076d0e5a1..2170ebf992 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -45,12 +46,12 @@ public interface AMQMethodBody extends AMQBody
/** @return unsigned short */
public int getMethod();
- public void writeMethodPayload(DataOutputStream buffer) throws IOException;
+ public void writeMethodPayload(DataOutput buffer) throws IOException;
public int getSize();
- public void writePayload(DataOutputStream buffer) throws IOException;
+ public void writePayload(DataOutput buffer) throws IOException;
//public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
index 7fceb082ee..ec6d662726 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.framing;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
@@ -39,7 +41,7 @@ public class AMQMethodBodyFactory implements BodyFactory
_protocolSession = protocolSession;
}
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+ public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
{
return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index c73c1df701..d6f518b123 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -24,11 +24,12 @@ package org.apache.qpid.framing;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
@@ -101,7 +102,7 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return 2 + 2 + getBodySize();
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, getClazz());
EncodingUtils.writeUnsignedShort(buffer, getMethod());
@@ -109,14 +110,15 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
}
- protected byte readByte(DataInputStream buffer) throws IOException
+ protected byte readByte(DataInput buffer) throws IOException
{
return buffer.readByte();
}
- protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
+ protected AMQShortString readAMQShortString(MarkableDataInput buffer) throws IOException
{
- return EncodingUtils.readAMQShortString(buffer);
+ AMQShortString str = buffer.readAMQShortString();
+ return str == null ? null : str.intern(false);
}
protected int getSizeOf(AMQShortString string)
@@ -124,27 +126,27 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return EncodingUtils.encodedShortStringLength(string);
}
- protected void writeByte(DataOutputStream buffer, byte b) throws IOException
+ protected void writeByte(DataOutput buffer, byte b) throws IOException
{
buffer.writeByte(b);
}
- protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException
+ protected void writeAMQShortString(DataOutput buffer, AMQShortString string) throws IOException
{
EncodingUtils.writeShortStringBytes(buffer, string);
}
- protected int readInt(DataInputStream buffer) throws IOException
+ protected int readInt(DataInput buffer) throws IOException
{
return buffer.readInt();
}
- protected void writeInt(DataOutputStream buffer, int i) throws IOException
+ protected void writeInt(DataOutput buffer, int i) throws IOException
{
buffer.writeInt(i);
}
- protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
+ protected FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
{
return EncodingUtils.readFieldTable(buffer);
}
@@ -154,17 +156,17 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
}
- protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException
+ protected void writeFieldTable(DataOutput buffer, FieldTable table) throws IOException
{
EncodingUtils.writeFieldTableBytes(buffer, table);
}
- protected long readLong(DataInputStream buffer) throws IOException
+ protected long readLong(DataInput buffer) throws IOException
{
return buffer.readLong();
}
- protected void writeLong(DataOutputStream buffer, long l) throws IOException
+ protected void writeLong(DataOutput buffer, long l) throws IOException
{
buffer.writeLong(l);
}
@@ -174,27 +176,27 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return (response == null) ? 4 : response.length + 4;
}
- protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
+ protected void writeBytes(DataOutput buffer, byte[] data) throws IOException
{
EncodingUtils.writeBytes(buffer,data);
}
- protected byte[] readBytes(DataInputStream buffer) throws IOException
+ protected byte[] readBytes(DataInput buffer) throws IOException
{
return EncodingUtils.readBytes(buffer);
}
- protected short readShort(DataInputStream buffer) throws IOException
+ protected short readShort(DataInput buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
}
- protected void writeShort(DataOutputStream buffer, short s) throws IOException
+ protected void writeShort(DataOutput buffer, short s) throws IOException
{
EncodingUtils.writeShort(buffer, s);
}
- protected Content readContent(DataInputStream buffer)
+ protected Content readContent(DataInput buffer)
{
return null;
}
@@ -204,56 +206,56 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return 0;
}
- protected void writeContent(DataOutputStream buffer, Content body)
+ protected void writeContent(DataOutput buffer, Content body)
{
}
- protected byte readBitfield(DataInputStream buffer) throws IOException
+ protected byte readBitfield(DataInput buffer) throws IOException
{
return readByte(buffer);
}
- protected int readUnsignedShort(DataInputStream buffer) throws IOException
+ protected int readUnsignedShort(DataInput buffer) throws IOException
{
return buffer.readUnsignedShort();
}
- protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException
+ protected void writeBitfield(DataOutput buffer, byte bitfield0) throws IOException
{
buffer.writeByte(bitfield0);
}
- protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
+ protected void writeUnsignedShort(DataOutput buffer, int s) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, s);
}
- protected long readUnsignedInteger(DataInputStream buffer) throws IOException
+ protected long readUnsignedInteger(DataInput buffer) throws IOException
{
return EncodingUtils.readUnsignedInteger(buffer);
}
- protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException
+ protected void writeUnsignedInteger(DataOutput buffer, long i) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, i);
}
- protected short readUnsignedByte(DataInputStream buffer) throws IOException
+ protected short readUnsignedByte(DataInput buffer) throws IOException
{
return (short) buffer.readUnsignedByte();
}
- protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException
+ protected void writeUnsignedByte(DataOutput buffer, short unsignedByte) throws IOException
{
EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
}
- protected long readTimestamp(DataInputStream buffer) throws IOException
+ protected long readTimestamp(DataInput buffer) throws IOException
{
return EncodingUtils.readTimestamp(buffer);
}
- protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException
+ protected void writeTimestamp(DataOutput buffer, long t) throws IOException
{
EncodingUtils.writeTimestamp(buffer, t);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
index df4d8bdcb6..88b1ca7189 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
@@ -21,11 +21,12 @@
package org.apache.qpid.framing;
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
+
import java.io.IOException;
public abstract interface AMQMethodBodyInstanceFactory
{
- public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException;
+ public AMQMethodBody newInstance(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index cc9a33f4cf..4ff7827d7f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -24,8 +24,9 @@ package org.apache.qpid.framing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import java.lang.ref.WeakReference;
@@ -93,22 +94,44 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private AMQShortString substring(final int from, final int to)
{
- return new AMQShortString(_data, from+_offset, to+_offset);
+ return new AMQShortString(_data, from+_offset, to-from);
}
- private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
- new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
+ private static final int LOCAL_INTERN_CACHE_SIZE = 2048;
+
+ private static final ThreadLocal<Map<AMQShortString, AMQShortString>> _localInternMap =
+ new ThreadLocal<Map<AMQShortString, AMQShortString>>()
{
- protected Map<AMQShortString, WeakReference<AMQShortString>> initialValue()
+ protected Map<AMQShortString, AMQShortString> initialValue()
{
- return new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+ return new LinkedHashMap<AMQShortString, AMQShortString>()
+ {
+
+ protected boolean removeEldestEntry(Map.Entry<AMQShortString, AMQShortString> eldest)
+ {
+ return size() > LOCAL_INTERN_CACHE_SIZE;
+ }
+ };
};
};
private static final Map<AMQShortString, WeakReference<AMQShortString>> _globalInternMap =
new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+
+ private static final ThreadLocal<Map<String, WeakReference<AMQShortString>>> _localStringMap =
+ new ThreadLocal<Map<String, WeakReference<AMQShortString>>>()
+ {
+ protected Map<String, WeakReference<AMQShortString>> initialValue()
+ {
+ return new WeakHashMap<String, WeakReference<AMQShortString>>();
+ };
+ };
+
+ private static final Map<String, WeakReference<AMQShortString>> _globalStringMap =
+ new WeakHashMap<String, WeakReference<AMQShortString>>();
+
private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
private final byte[] _data;
@@ -200,32 +223,32 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
- private AMQShortString(DataInputStream data, final int length) throws IOException
+ private AMQShortString(DataInput data, final int length) throws IOException
{
if (length > MAX_LENGTH)
{
throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
}
byte[] dataBytes = new byte[length];
- data.read(dataBytes);
+ data.readFully(dataBytes);
_data = dataBytes;
_offset = 0;
_length = length;
}
- private AMQShortString(final byte[] data, final int from, final int to)
+ public AMQShortString(byte[] data, final int offset, final int length)
{
- if (data == null)
- {
- throw new NullPointerException("Cannot create AMQShortString with null data[]");
- }
- int length = to - from;
if (length > MAX_LENGTH)
{
throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
}
- _offset = from;
+ if (data == null)
+ {
+ throw new NullPointerException("Cannot create AMQShortString with null data[]");
+ }
+
+ _offset = offset;
_length = length;
_data = data;
}
@@ -234,9 +257,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
{
if(_data.length != _length)
{
- byte[] dataBytes = new byte[_length];
- System.arraycopy(_data,_offset,dataBytes,0,_length);
- return new AMQShortString(dataBytes,0,_length);
+ return copy();
}
else
{
@@ -265,7 +286,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return new CharSubSequence(start, end);
}
- public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException
+ public static AMQShortString readFromBuffer(DataInput buffer) throws IOException
{
final int length = buffer.readUnsignedByte();
if (length == 0)
@@ -293,12 +314,12 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
}
- public void writeToBuffer(DataOutputStream buffer) throws IOException
+ public void writeToBuffer(DataOutput buffer) throws IOException
{
final int size = length();
//buffer.setAutoExpand(true);
- buffer.write((byte) size);
+ buffer.writeByte(size);
buffer.write(_data, _offset, size);
}
@@ -420,7 +441,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
{
if (_asString == null)
{
- _asString = new String(asChars());
+ AMQShortString intern = intern();
+
+ if(intern == this)
+ {
+ _asString = new String(asChars());
+ }
+ else
+ {
+ _asString = intern.asString();
+ }
+
}
return _asString;
}
@@ -609,42 +640,51 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public AMQShortString intern()
{
+ return intern(true);
+ }
+
+ public AMQShortString intern(boolean keep)
+ {
hashCode();
- Map<AMQShortString, WeakReference<AMQShortString>> localMap =
+ Map<AMQShortString, AMQShortString> localMap =
_localInternMap.get();
- WeakReference<AMQShortString> ref = localMap.get(this);
- AMQShortString internString;
+ AMQShortString internString = localMap.get(this);
- if(ref != null)
+
+ if(internString != null)
{
- internString = ref.get();
- if(internString != null)
- {
- return internString;
- }
+ return internString;
}
+ WeakReference<AMQShortString> ref;
synchronized(_globalInternMap)
{
ref = _globalInternMap.get(this);
if((ref == null) || ((internString = ref.get()) == null))
{
- internString = shrink();
+ internString = keep ? shrink() : copy();
ref = new WeakReference(internString);
_globalInternMap.put(internString, ref);
}
}
- localMap.put(internString, ref);
+ localMap.put(internString, internString);
return internString;
}
+ private AMQShortString copy()
+ {
+ byte[] dataBytes = new byte[_length];
+ System.arraycopy(_data,_offset,dataBytes,0,_length);
+ return new AMQShortString(dataBytes,0,_length);
+ }
+
private int occurences(final byte delim)
{
int count = 0;
@@ -761,7 +801,46 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public static AMQShortString valueOf(Object obj)
{
- return obj == null ? null : new AMQShortString(String.valueOf(obj));
+ return obj == null ? null : AMQShortString.valueOf(String.valueOf(obj));
+ }
+
+ public static AMQShortString valueOf(String obj)
+ {
+ if(obj == null)
+ {
+ return null;
+ }
+
+ Map<String, WeakReference<AMQShortString>> localMap =
+ _localStringMap.get();
+
+ WeakReference<AMQShortString> ref = localMap.get(obj);
+ AMQShortString internString;
+
+ if(ref != null)
+ {
+ internString = ref.get();
+ if(internString != null)
+ {
+ return internString;
+ }
+ }
+
+
+ synchronized(_globalStringMap)
+ {
+
+ ref = _globalStringMap.get(obj);
+ if((ref == null) || ((internString = ref.get()) == null))
+ {
+ internString = (new AMQShortString(obj)).intern();
+ ref = new WeakReference<AMQShortString>(internString);
+ _globalStringMap.put(obj, ref);
+ }
+
+ }
+ localMap.put(obj, ref);
+ return internString;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
index f3da64e639..5c89af09c4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
@@ -61,12 +61,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -107,12 +107,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readUnsignedInteger(buffer);
}
@@ -138,7 +138,7 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
BigDecimal bd = (BigDecimal) value;
@@ -151,7 +151,7 @@ public enum AMQType
EncodingUtils.writeInteger(buffer, unscaled);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
byte places = EncodingUtils.readByte(buffer);
@@ -183,12 +183,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -247,7 +247,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
// Ensure that the value is a FieldTable.
if (!(value instanceof FieldTable))
@@ -268,7 +268,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
try
{
@@ -302,10 +302,10 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer)
+ public void writeValueImpl(Object value, DataOutput buffer)
{ }
- public Object readValueFromBuffer(DataInputStream buffer)
+ public Object readValueFromBuffer(DataInput buffer)
{
return null;
}
@@ -331,12 +331,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLongstr(buffer);
}
@@ -361,12 +361,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -392,12 +392,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -427,12 +427,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readBoolean(buffer);
}
@@ -462,12 +462,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeChar(buffer, (Character) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readChar(buffer);
}
@@ -497,12 +497,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeByte(buffer, (Byte) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readByte(buffer);
}
@@ -536,12 +536,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeShort(buffer, (Short) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
}
@@ -578,12 +578,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeInteger(buffer, (Integer) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readInteger(buffer);
}
@@ -596,6 +596,22 @@ public enum AMQType
return EncodingUtils.encodedLongLength();
}
+ public int getEncodingSize(long value)
+ {
+ return EncodingUtils.encodedLongLength();
+ }
+
+ public AMQTypedValue asTypedValue(long value)
+ {
+ return AMQTypedValue.createAMQTypedValue(value);
+ }
+
+ public void writeToBuffer(long value, DataOutput buffer) throws IOException
+ {
+ buffer.writeByte(identifier());
+ EncodingUtils.writeLong(buffer, value);
+ }
+
public Object toNativeValue(Object value)
{
if (value instanceof Long)
@@ -625,12 +641,18 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public long readLongFromBuffer(DataInput buffer) throws IOException
+ {
+ return EncodingUtils.readLong(buffer);
+ }
+
+
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -660,12 +682,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeFloat(buffer, (Float) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readFloat(buffer);
}
@@ -699,12 +721,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeDouble(buffer, (Double) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readDouble(buffer);
}
@@ -761,7 +783,7 @@ public enum AMQType
*/
public AMQTypedValue asTypedValue(Object value)
{
- return new AMQTypedValue(this, toNativeValue(value));
+ return AMQTypedValue.createAMQTypedValue(this, toNativeValue(value));
}
/**
@@ -771,7 +793,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException
+ public void writeToBuffer(Object value, DataOutput buffer) throws IOException
{
buffer.writeByte(identifier());
writeValueImpl(value, buffer);
@@ -783,7 +805,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException;
+ abstract void writeValueImpl(Object value, DataOutput buffer) throws IOException;
/**
* Reads an instance of the type from a specified byte buffer.
@@ -792,5 +814,5 @@ public enum AMQType
*
* @return An instance of the type.
*/
- abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException;
+ abstract Object readValueFromBuffer(DataInput buffer) throws IOException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index 1dbedca362..84e4056f4d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.util.Date;
import java.util.Map;
import java.math.BigDecimal;
@@ -42,81 +40,218 @@ import java.math.BigDecimal;
* <tr><td> Extract the value from a fully typed AMQP value.
* </table>
*/
-public class AMQTypedValue
+public abstract class AMQTypedValue
{
- /** The type of the value. */
- private final AMQType _type;
- /** The Java native representation of the AMQP typed value. */
- private final Object _value;
+ public abstract AMQType getType();
- public AMQTypedValue(AMQType type, Object value)
+ public abstract Object getValue();
+
+ public abstract void writeToBuffer(DataOutput buffer) throws IOException;
+
+ public abstract int getEncodingSize();
+
+
+ private static final class GenericTypedValue extends AMQTypedValue
{
- if (type == null)
+ /** The type of the value. */
+ private final AMQType _type;
+
+ /** The Java native representation of the AMQP typed value. */
+ private final Object _value;
+
+ private GenericTypedValue(AMQType type, Object value)
{
- throw new NullPointerException("Cannot create a typed value with null type");
+ if (type == null)
+ {
+ throw new NullPointerException("Cannot create a typed value with null type");
+ }
+
+ _type = type;
+ _value = type.toNativeValue(value);
}
- _type = type;
- _value = type.toNativeValue(value);
- }
+ private GenericTypedValue(AMQType type, DataInput buffer) throws IOException
+ {
+ _type = type;
+ _value = type.readValueFromBuffer(buffer);
+ }
+
+
+ public AMQType getType()
+ {
+ return _type;
+ }
+
+ public Object getValue()
+ {
+ return _value;
+ }
+
+ public void writeToBuffer(DataOutput buffer) throws IOException
+ {
+ _type.writeToBuffer(_value, buffer);
+ }
+
+ public int getEncodingSize()
+ {
+ return _type.getEncodingSize(_value);
+ }
+
+
+ public String toString()
+ {
+ return "[" + getType() + ": " + getValue() + "]";
+ }
+
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof GenericTypedValue)
+ {
+ GenericTypedValue other = (GenericTypedValue) o;
+ return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+ }
- private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException
- {
- _type = type;
- _value = type.readValueFromBuffer(buffer);
}
- public AMQType getType()
+ private static final class LongTypedValue extends AMQTypedValue
{
- return _type;
+
+ private final long _value;
+
+ private LongTypedValue(long value)
+ {
+ _value = value;
+ }
+
+ public LongTypedValue(DataInput buffer) throws IOException
+ {
+ _value = EncodingUtils.readLong(buffer);
+ }
+
+ public AMQType getType()
+ {
+ return AMQType.LONG;
+ }
+
+
+ public Object getValue()
+ {
+ return _value;
+ }
+
+ public void writeToBuffer(DataOutput buffer) throws IOException
+ {
+ EncodingUtils.writeByte(buffer,AMQType.LONG.identifier());
+ EncodingUtils.writeLong(buffer,_value);
+ }
+
+
+ public int getEncodingSize()
+ {
+ return EncodingUtils.encodedLongLength();
+ }
}
- public Object getValue()
+ private static final class IntTypedValue extends AMQTypedValue
{
- return _value;
+
+ private final int _value;
+
+ public IntTypedValue(int value)
+ {
+ _value = value;
+ }
+
+ public AMQType getType()
+ {
+ return AMQType.INT;
+ }
+
+
+ public Object getValue()
+ {
+ return _value;
+ }
+
+ public void writeToBuffer(DataOutput buffer) throws IOException
+ {
+ EncodingUtils.writeByte(buffer,AMQType.INT.identifier());
+ EncodingUtils.writeInteger(buffer, _value);
+ }
+
+
+ public int getEncodingSize()
+ {
+ return EncodingUtils.encodedIntegerLength();
+ }
}
- public void writeToBuffer(DataOutputStream buffer) throws IOException
+
+ public static AMQTypedValue readFromBuffer(DataInput buffer) throws IOException
{
- _type.writeToBuffer(_value, buffer);
+ AMQType type = AMQTypeMap.getType(buffer.readByte());
+
+ switch(type)
+ {
+ case LONG:
+ return new LongTypedValue(buffer);
+
+ case INT:
+ int value = EncodingUtils.readInteger(buffer);
+ return createAMQTypedValue(value);
+
+ default:
+ return new GenericTypedValue(type, buffer);
+ }
+
}
- public int getEncodingSize()
+ private static final AMQTypedValue[] INT_VALUES = new AMQTypedValue[16];
+ static
{
- return _type.getEncodingSize(_value);
+ for(int i = 0 ; i < 16; i ++)
+ {
+ INT_VALUES[i] = new IntTypedValue(i);
+ }
}
- public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException
+ public static AMQTypedValue createAMQTypedValue(int i)
{
- AMQType type = AMQTypeMap.getType(buffer.readByte());
-
- return new AMQTypedValue(type, buffer);
+ return (i & 0x0f) == i ? INT_VALUES[i] : new IntTypedValue(i);
}
- public String toString()
+
+ public static AMQTypedValue createAMQTypedValue(long value)
{
- return "[" + getType() + ": " + getValue() + "]";
+ return new LongTypedValue(value);
}
-
- public boolean equals(Object o)
+ public static AMQTypedValue createAMQTypedValue(AMQType type, Object value)
{
- if(o instanceof AMQTypedValue)
+ switch(type)
{
- AMQTypedValue other = (AMQTypedValue) o;
- return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
- }
- else
- {
- return false;
+ case LONG:
+ return new LongTypedValue((Long) AMQType.LONG.toNativeValue(value));
+ case INT:
+ return new IntTypedValue((Integer) AMQType.INT.toNativeValue(value));
+
+ default:
+ return new GenericTypedValue(type, value);
}
}
- public int hashCode()
- {
- return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
- }
public static AMQTypedValue toTypedValue(Object val)
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 57622b5054..2739f7d14b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import org.slf4j.Logger;
@@ -80,6 +81,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
private static final int USER_ID_MASK = 1 << 4;
private static final int APPLICATION_ID_MASK = 1 << 3;
private static final int CLUSTER_ID_MASK = 1 << 2;
+ private byte[] _encodedForm;
public BasicContentHeaderProperties()
@@ -87,6 +89,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public int getPropertyListSize()
{
+ if(_encodedForm != null && (_headers == null || _headers.isClean()))
+ {
+ return _encodedForm.length;
+ }
+ else
+ {
int size = 0;
if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0)
@@ -167,6 +175,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
}
return size;
+ }
}
public void setPropertyFlags(int propertyFlags)
@@ -179,87 +188,94 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
return _propertyFlags;
}
- public void writePropertyListPayload(DataOutputStream buffer) throws IOException
+ public void writePropertyListPayload(DataOutput buffer) throws IOException
{
- if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ if(_encodedForm != null && (_headers == null || !_headers.isClean()))
{
- EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ buffer.write(_encodedForm);
}
-
- if ((_propertyFlags & ENCODING_MASK) != 0)
+ else
{
- EncodingUtils.writeShortStringBytes(buffer, _encoding);
- }
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ }
- if ((_propertyFlags & HEADERS_MASK) != 0)
- {
- EncodingUtils.writeFieldTableBytes(buffer, _headers);
- }
+ if ((_propertyFlags & ENCODING_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _encoding);
+ }
- if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
- {
- buffer.writeByte(_deliveryMode);
- }
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, _headers);
+ }
- if ((_propertyFlags & PRIORITY_MASK) != 0)
- {
- buffer.writeByte(_priority);
- }
+ if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
+ {
+ buffer.writeByte(_deliveryMode);
+ }
- if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _correlationId);
- }
+ if ((_propertyFlags & PRIORITY_MASK) != 0)
+ {
+ buffer.writeByte(_priority);
+ }
- if ((_propertyFlags & REPLY_TO_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _replyTo);
- }
+ if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+ }
- if ((_propertyFlags & EXPIRATION_MASK) != 0)
- {
- if (_expiration == 0L)
+ if ((_propertyFlags & REPLY_TO_MASK) != 0)
{
- EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+ EncodingUtils.writeShortStringBytes(buffer, _replyTo);
}
- else
+
+ if ((_propertyFlags & EXPIRATION_MASK) != 0)
{
- EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ if (_expiration == 0L)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+ }
+ else
+ {
+ EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ }
}
- }
- if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _messageId);
- }
+ if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _messageId);
+ }
- if ((_propertyFlags & TIMESTAMP_MASK) != 0)
- {
- EncodingUtils.writeTimestamp(buffer, _timestamp);
- }
+ if ((_propertyFlags & TIMESTAMP_MASK) != 0)
+ {
+ EncodingUtils.writeTimestamp(buffer, _timestamp);
+ }
- if ((_propertyFlags & TYPE_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _type);
- }
+ if ((_propertyFlags & TYPE_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _type);
+ }
- if ((_propertyFlags & USER_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _userId);
- }
+ if ((_propertyFlags & USER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _userId);
+ }
- if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _appId);
- }
+ if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _appId);
+ }
- if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ }
}
}
- public void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
+ public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
{
_propertyFlags = propertyFlags;
@@ -268,26 +284,40 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
_logger.debug("Property flags: " + _propertyFlags);
}
- decode(buffer);
+ _encodedForm = new byte[size];
+ buffer.readFully(_encodedForm);
+
+ ByteArrayDataInput input = new ByteArrayDataInput(_encodedForm);
+
+ decode(input);
+
}
- private void decode(DataInputStream buffer) throws IOException, AMQFrameDecodingException
+ private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException
{
// ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ int headersOffset = 0;
+
if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
- _contentType = EncodingUtils.readAMQShortString(buffer);
+ _contentType = buffer.readAMQShortString();
+ headersOffset += EncodingUtils.encodedShortStringLength(_contentType);
}
if ((_propertyFlags & ENCODING_MASK) != 0)
{
- _encoding = EncodingUtils.readAMQShortString(buffer);
+ _encoding = buffer.readAMQShortString();
+ headersOffset += EncodingUtils.encodedShortStringLength(_encoding);
}
if ((_propertyFlags & HEADERS_MASK) != 0)
{
- _headers = EncodingUtils.readFieldTable(buffer);
+ long length = EncodingUtils.readUnsignedInteger(buffer);
+
+ _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length);
+
+ buffer.skipBytes((int)length);
}
if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
@@ -302,12 +332,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
{
- _correlationId = EncodingUtils.readAMQShortString(buffer);
+ _correlationId = buffer.readAMQShortString();
}
if ((_propertyFlags & REPLY_TO_MASK) != 0)
{
- _replyTo = EncodingUtils.readAMQShortString(buffer);
+ _replyTo = buffer.readAMQShortString();
}
if ((_propertyFlags & EXPIRATION_MASK) != 0)
@@ -317,7 +347,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
{
- _messageId = EncodingUtils.readAMQShortString(buffer);
+ _messageId = buffer.readAMQShortString();
}
if ((_propertyFlags & TIMESTAMP_MASK) != 0)
@@ -327,22 +357,22 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
if ((_propertyFlags & TYPE_MASK) != 0)
{
- _type = EncodingUtils.readAMQShortString(buffer);
+ _type = buffer.readAMQShortString();
}
if ((_propertyFlags & USER_ID_MASK) != 0)
{
- _userId = EncodingUtils.readAMQShortString(buffer);
+ _userId = buffer.readAMQShortString();
}
if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
{
- _appId = EncodingUtils.readAMQShortString(buffer);
+ _appId = buffer.readAMQShortString();
}
if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
{
- _clusterId = EncodingUtils.readAMQShortString(buffer);
+ _clusterId = buffer.readAMQShortString();
}
@@ -363,11 +393,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
{
_propertyFlags |= (CONTENT_TYPE_MASK);
_contentType = contentType;
+ _encodedForm = null;
}
public void setContentType(String contentType)
{
- setContentType((contentType == null) ? null : new AMQShortString(contentType));
+ setContentType((contentType == null) ? null : AMQShortString.valueOf(contentType));
}
public String getEncodingAsString()
@@ -384,13 +415,15 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setEncoding(String encoding)
{
_propertyFlags |= ENCODING_MASK;
- _encoding = (encoding == null) ? null : new AMQShortString(encoding);
+ _encoding = (encoding == null) ? null : AMQShortString.valueOf(encoding);
+ _encodedForm = null;
}
public void setEncoding(AMQShortString encoding)
{
_propertyFlags |= ENCODING_MASK;
_encoding = encoding;
+ _encodedForm = null;
}
public FieldTable getHeaders()
@@ -407,6 +440,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
{
_propertyFlags |= HEADERS_MASK;
_headers = headers;
+ _encodedForm = null;
}
public byte getDeliveryMode()
@@ -418,6 +452,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
{
_propertyFlags |= DELIVERY_MODE_MASK;
_deliveryMode = deliveryMode;
+ _encodedForm = null;
}
public byte getPriority()
@@ -429,6 +464,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
{
_propertyFlags |= PRIORITY_MASK;
_priority = priority;
+ _encodedForm = null;
}
public AMQShortString getCorrelationId()
@@ -443,13 +479,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setCorrelationId(String correlationId)
{
- setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId));
+ setCorrelationId((correlationId == null) ? null : AMQShortString.valueOf(correlationId));
}
public void setCorrelationId(AMQShortString correlationId)
{
_propertyFlags |= CORRELATION_ID_MASK;
_correlationId = correlationId;
+ _encodedForm = null;
}
public String getReplyToAsString()
@@ -464,13 +501,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setReplyTo(String replyTo)
{
- setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo));
+ setReplyTo((replyTo == null) ? null : AMQShortString.valueOf(replyTo));
}
public void setReplyTo(AMQShortString replyTo)
{
_propertyFlags |= REPLY_TO_MASK;
_replyTo = replyTo;
+ _encodedForm = null;
}
public long getExpiration()
@@ -482,6 +520,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
{
_propertyFlags |= EXPIRATION_MASK;
_expiration = expiration;
+ _encodedForm = null;
}
public AMQShortString getMessageId()
@@ -498,12 +537,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
{
_propertyFlags |= MESSAGE_ID_MASK;
_messageId = (messageId == null) ? null : new AMQShortString(messageId);
+ _encodedForm = null;
}
public void setMessageId(AMQShortString messageId)
{
_propertyFlags |= MESSAGE_ID_MASK;
_messageId = messageId;
+ _encodedForm = null;
}
public long getTimestamp()
@@ -515,6 +556,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
{
_propertyFlags |= TIMESTAMP_MASK;
_timestamp = timestamp;
+ _encodedForm = null;
}
public String getTypeAsString()
@@ -529,13 +571,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setType(String type)
{
- setType((type == null) ? null : new AMQShortString(type));
+ setType((type == null) ? null : AMQShortString.valueOf(type));
}
public void setType(AMQShortString type)
{
_propertyFlags |= TYPE_MASK;
_type = type;
+ _encodedForm = null;
}
public String getUserIdAsString()
@@ -550,13 +593,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setUserId(String userId)
{
- setUserId((userId == null) ? null : new AMQShortString(userId));
+ setUserId((userId == null) ? null : AMQShortString.valueOf(userId));
}
public void setUserId(AMQShortString userId)
{
_propertyFlags |= USER_ID_MASK;
_userId = userId;
+ _encodedForm = null;
}
public String getAppIdAsString()
@@ -571,13 +615,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setAppId(String appId)
{
- setAppId((appId == null) ? null : new AMQShortString(appId));
+ setAppId((appId == null) ? null : AMQShortString.valueOf(appId));
}
public void setAppId(AMQShortString appId)
{
_propertyFlags |= APPLICATION_ID_MASK;
_appId = appId;
+ _encodedForm = null;
}
public String getClusterIdAsString()
@@ -592,13 +637,14 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setClusterId(String clusterId)
{
- setClusterId((clusterId == null) ? null : new AMQShortString(clusterId));
+ setClusterId((clusterId == null) ? null : AMQShortString.valueOf(clusterId));
}
public void setClusterId(AMQShortString clusterId)
{
_propertyFlags |= CLUSTER_ID_MASK;
_clusterId = clusterId;
+ _encodedForm = null;
}
public String toString()
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
index f9580d82b1..554e9373d8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
+
import java.io.IOException;
/**
@@ -28,5 +29,5 @@ import java.io.IOException;
*/
public interface BodyFactory
{
- AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException;
+ AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java b/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
new file mode 100644
index 0000000000..656185629b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
@@ -0,0 +1,174 @@
+package org.apache.qpid.framing;
+
+import org.apache.qpid.codec.MarkableDataInput;
+
+import java.io.IOException;
+
+public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput
+{
+ private byte[] _data;
+ private int _offset;
+ private int _length;
+ private int _origin;
+ private int _mark;
+
+ public ByteArrayDataInput(byte[] data)
+ {
+ this(data,0, data.length);
+ }
+
+ public ByteArrayDataInput(byte[] data, int offset, int length)
+ {
+ _data = data;
+ _offset = offset;
+ _length = length;
+ _origin = offset;
+ _mark = 0;
+ }
+
+ public void readFully(byte[] b)
+ {
+ System.arraycopy(_data,_offset,b,0,b.length);
+ _offset+=b.length;
+ }
+
+ public void readFully(byte[] b, int off, int len)
+ {
+ System.arraycopy(_data,_offset,b,off,len);
+ _offset+=len;
+ }
+
+ public int skipBytes(int n)
+ {
+ return _offset+=n;
+ }
+
+ public boolean readBoolean()
+ {
+ return _data[_offset++] != 0;
+ }
+
+ public byte readByte()
+ {
+ return _data[_offset++];
+ }
+
+ public int readUnsignedByte()
+ {
+ return ((int)_data[_offset++]) & 0xFF;
+ }
+
+ public short readShort()
+ {
+ return (short) (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF));
+ }
+
+ public int readUnsignedShort()
+ {
+ return (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF));
+ }
+
+ public char readChar()
+ {
+ return (char) (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF));
+ }
+
+ public int readInt()
+ {
+ return ((((int)_data[_offset++]) << 24) & 0xFF000000)
+ | ((((int)_data[_offset++]) << 16) & 0xFF0000)
+ | ((((int)_data[_offset++]) << 8) & 0xFF00)
+ | (((int)_data[_offset++]) & 0xFF);
+ }
+
+ public long readLong()
+ {
+ return ((((long)_data[_offset++]) << 56) & 0xFF00000000000000L)
+ | ((((long)_data[_offset++]) << 48) & 0xFF000000000000L)
+ | ((((long)_data[_offset++]) << 40) & 0xFF0000000000L)
+ | ((((long)_data[_offset++]) << 32) & 0xFF00000000L)
+ | ((((long)_data[_offset++]) << 24) & 0xFF000000L)
+ | ((((long)_data[_offset++]) << 16) & 0xFF0000L)
+ | ((((long)_data[_offset++]) << 8) & 0xFF00L)
+ | (((long)_data[_offset++]) & 0xFFL);
+ }
+
+ public float readFloat()
+ {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ public double readDouble()
+ {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ public AMQShortString readAMQShortString()
+ {
+ int length = _data[_offset++] & 0xff;
+ if(length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ final AMQShortString amqShortString = new AMQShortString(_data, _offset, length);
+ _offset+=length;
+ return amqShortString;
+ }
+ }
+
+ public String readLine()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public String readUTF()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int available()
+ {
+ return (_origin+_length)-_offset;
+ }
+
+
+ public long skip(long i)
+ {
+ _offset+=i;
+ return i;
+ }
+
+ public int read(byte[] b)
+ {
+ readFully(b);
+ return b.length;
+ }
+
+ public int position()
+ {
+ return _offset - _origin;
+ }
+
+ public void position(int position)
+ {
+ _offset = position + _origin;
+ }
+
+ public int length()
+ {
+ return _length;
+ }
+
+
+ public void mark(int readAhead)
+ {
+ _mark = _offset-_origin;
+ }
+
+ public void reset()
+ {
+ _offset = _origin + _mark;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
index 15bc20c52d..098e3652ad 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
@@ -50,7 +50,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
return frameSize;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
for (int i = 0; i < _blocks.length; i++)
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index aedb35f92a..541d104dc9 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -37,10 +39,10 @@ public class ContentBody implements AMQBody
{
}
- public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
+ public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
{
_payload = new byte[(int)size];
- buffer.read(_payload);
+ buffer.readFully(_payload);
}
@@ -59,7 +61,7 @@ public class ContentBody implements AMQBody
return _payload == null ? 0 : _payload.length;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
buffer.write(_payload);
}
@@ -84,11 +86,62 @@ public class ContentBody implements AMQBody
{
}
+ private static class BufferContentBody implements AMQBody
+ {
+ private final int _length;
+ private final int _offset;
+ private final ByteBuffer _buf;
+
+ private BufferContentBody( ByteBuffer buf, int offset, int length)
+ {
+ _length = length;
+ _offset = offset;
+ _buf = buf;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+
+ public int getSize()
+ {
+ return _length;
+ }
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_buf.hasArray())
+ {
+ buffer.write(_buf.array(), _buf.arrayOffset() + _offset, _length);
+ }
+ else
+ {
+ byte[] data = new byte[_length];
+ ByteBuffer buf = _buf.duplicate();
+
+ buf.position(_offset);
+ buf.limit(_offset+_length);
+ buf.get(data);
+ buffer.write(data);
+ }
+ }
+
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new RuntimeException("Buffered Body only to be used for outgoing data");
+ }
+ }
+
+ public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length)
+ {
+ return new AMQFrame(channelId, new BufferContentBody(buf, offset, length));
+ }
public static AMQFrame createAMQFrame(int channelId, ContentBody body)
{
- final AMQFrame frame = new AMQFrame(channelId, body);
- return frame;
+ return new AMQFrame(channelId, body);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
index a0b030ab6b..de2ffe9755 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class ContentBodyFactory implements BodyFactory
_log.debug("Creating content body factory");
}
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+ public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
{
return new ContentBody(in, bodySize);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 18d0f26152..8a2ad53157 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -45,7 +46,7 @@ public class ContentHeaderBody implements AMQBody
{
}
- public ContentHeaderBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
+ public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
{
classId = buffer.readUnsignedShort();
weight = buffer.readUnsignedShort();
@@ -106,7 +107,7 @@ public class ContentHeaderBody implements AMQBody
return 2 + 2 + 8 + 2 + properties.getPropertyListSize();
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, classId);
EncodingUtils.writeUnsignedShort(buffer, weight);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
index a474e337b7..c3e4c69ec0 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class ContentHeaderBodyFactory implements BodyFactory
_log.debug("Creating content header body factory");
}
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+ public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
{
// all content headers are the same - it is only the properties that differ.
// the content header body further delegates construction of properties
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
index 237929f9a3..ea8358a538 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
@@ -35,7 +36,7 @@ public interface ContentHeaderProperties
* Writes the property list to the buffer, in a suitably encoded form.
* @param buffer The buffer to write to
*/
- void writePropertyListPayload(DataOutputStream buffer) throws IOException;
+ void writePropertyListPayload(DataOutput buffer) throws IOException;
/**
* Populates the properties from buffer.
@@ -43,7 +44,7 @@ public interface ContentHeaderProperties
* @param propertyFlags he property flags.
* @throws AMQFrameDecodingException when the buffer does not contain valid data
*/
- void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size)
+ void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size)
throws AMQFrameDecodingException, IOException;
/**
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
index 43ee8cd1f1..48bd52858d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
@@ -39,7 +40,7 @@ public class ContentHeaderPropertiesFactory
}
public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
- DataInputStream buffer, int size)
+ DataInput buffer, int size)
throws AMQFrameDecodingException, IOException
{
ContentHeaderProperties properties;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
index 2d7e27405c..e018407509 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
@@ -219,7 +219,7 @@ public class EncodingUtils
return 0;
}
- public static void writeShortStringBytes(DataOutputStream buffer, String s) throws IOException
+ public static void writeShortStringBytes(DataOutput buffer, String s) throws IOException
{
if (s != null)
{
@@ -243,7 +243,7 @@ public class EncodingUtils
}
}
- public static void writeShortStringBytes(DataOutputStream buffer, AMQShortString s) throws IOException
+ public static void writeShortStringBytes(DataOutput buffer, AMQShortString s) throws IOException
{
if (s != null)
{
@@ -257,7 +257,7 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(DataOutputStream buffer, String s) throws IOException
+ public static void writeLongStringBytes(DataOutput buffer, String s) throws IOException
{
assert (s == null) || (s.length() <= 0xFFFE);
if (s != null)
@@ -279,7 +279,7 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(DataOutputStream buffer, char[] s) throws IOException
+ public static void writeLongStringBytes(DataOutput buffer, char[] s) throws IOException
{
assert (s == null) || (s.length <= 0xFFFE);
if (s != null)
@@ -300,7 +300,7 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(DataOutputStream buffer, byte[] bytes) throws IOException
+ public static void writeLongStringBytes(DataOutput buffer, byte[] bytes) throws IOException
{
assert (bytes == null) || (bytes.length <= 0xFFFE);
if (bytes != null)
@@ -314,13 +314,13 @@ public class EncodingUtils
}
}
- public static void writeUnsignedByte(DataOutputStream buffer, short b) throws IOException
+ public static void writeUnsignedByte(DataOutput buffer, short b) throws IOException
{
byte bv = (byte) b;
buffer.write(bv);
}
- public static void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
+ public static void writeUnsignedShort(DataOutput buffer, int s) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (s < Short.MAX_VALUE)
@@ -340,7 +340,7 @@ public class EncodingUtils
return 4;
}
- public static void writeUnsignedInteger(DataOutputStream buffer, long l) throws IOException
+ public static void writeUnsignedInteger(DataOutput buffer, long l) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (l < Integer.MAX_VALUE)
@@ -360,7 +360,7 @@ public class EncodingUtils
}
}
- public static void writeFieldTableBytes(DataOutputStream buffer, FieldTable table) throws IOException
+ public static void writeFieldTableBytes(DataOutput buffer, FieldTable table) throws IOException
{
if (table != null)
{
@@ -372,12 +372,12 @@ public class EncodingUtils
}
}
- public static void writeContentBytes(DataOutputStream buffer, Content content)
+ public static void writeContentBytes(DataOutput buffer, Content content)
{
// TODO: New Content class required for AMQP 0-9.
}
- public static void writeBooleans(DataOutputStream buffer, boolean[] values) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean[] values) throws IOException
{
byte packedValue = 0;
for (int i = 0; i < values.length; i++)
@@ -391,13 +391,13 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean value) throws IOException
{
buffer.write(value ? (byte) 1 : (byte) 0);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -409,7 +409,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -426,7 +426,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -448,7 +448,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
boolean value4) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -476,7 +476,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
boolean value4, boolean value5) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -509,7 +509,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
boolean value4, boolean value5, boolean value6) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -547,7 +547,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
boolean value4, boolean value5, boolean value6, boolean value7) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -596,7 +596,7 @@ public class EncodingUtils
* @param buffer
* @param data
*/
- public static void writeLongstr(DataOutputStream buffer, byte[] data) throws IOException
+ public static void writeLongstr(DataOutput buffer, byte[] data) throws IOException
{
if (data != null)
{
@@ -609,12 +609,12 @@ public class EncodingUtils
}
}
- public static void writeTimestamp(DataOutputStream buffer, long timestamp) throws IOException
+ public static void writeTimestamp(DataOutput buffer, long timestamp) throws IOException
{
writeLong(buffer, timestamp);
}
- public static boolean[] readBooleans(DataInputStream buffer) throws IOException
+ public static boolean[] readBooleans(DataInput buffer) throws IOException
{
final byte packedValue = buffer.readByte();
if (packedValue == 0)
@@ -641,7 +641,7 @@ public class EncodingUtils
return result;
}
- public static FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
+ public static FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -654,19 +654,19 @@ public class EncodingUtils
}
}
- public static Content readContent(DataInputStream buffer) throws AMQFrameDecodingException
+ public static Content readContent(DataInput buffer) throws AMQFrameDecodingException
{
// TODO: New Content class required for AMQP 0-9.
return null;
}
- public static AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
+ public static AMQShortString readAMQShortString(DataInput buffer) throws IOException
{
return AMQShortString.readFromBuffer(buffer);
}
- public static String readShortString(DataInputStream buffer) throws IOException
+ public static String readShortString(DataInput buffer) throws IOException
{
short length = (short) (((short)buffer.readByte()) & 0xFF);
if (length == 0)
@@ -681,7 +681,7 @@ public class EncodingUtils
// this approach here is valid since we know that all the chars are
// ASCII (0-127)
byte[] stringBytes = new byte[length];
- buffer.read(stringBytes, 0, length);
+ buffer.readFully(stringBytes, 0, length);
char[] stringChars = new char[length];
for (int i = 0; i < stringChars.length; i++)
{
@@ -692,7 +692,7 @@ public class EncodingUtils
}
}
- public static String readLongString(DataInputStream buffer) throws IOException
+ public static String readLongString(DataInput buffer) throws IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -707,7 +707,7 @@ public class EncodingUtils
// this approach here is valid since we know that all the chars are
// ASCII (0-127)
byte[] stringBytes = new byte[(int) length];
- buffer.read(stringBytes, 0, (int) length);
+ buffer.readFully(stringBytes, 0, (int) length);
char[] stringChars = new char[(int) length];
for (int i = 0; i < stringChars.length; i++)
{
@@ -718,7 +718,7 @@ public class EncodingUtils
}
}
- public static byte[] readLongstr(DataInputStream buffer) throws IOException
+ public static byte[] readLongstr(DataInput buffer) throws IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -728,13 +728,13 @@ public class EncodingUtils
else
{
byte[] result = new byte[(int) length];
- buffer.read(result);
+ buffer.readFully(result);
return result;
}
}
- public static long readTimestamp(DataInputStream buffer) throws IOException
+ public static long readTimestamp(DataInput buffer) throws IOException
{
// Discard msb from AMQ timestamp
// buffer.getUnsignedInt();
@@ -818,12 +818,12 @@ public class EncodingUtils
// AMQP_BOOLEAN_PROPERTY_PREFIX
- public static void writeBoolean(DataOutputStream buffer, Boolean aBoolean) throws IOException
+ public static void writeBoolean(DataOutput buffer, boolean aBoolean) throws IOException
{
buffer.write(aBoolean ? 1 : 0);
}
- public static boolean readBoolean(DataInputStream buffer) throws IOException
+ public static boolean readBoolean(DataInput buffer) throws IOException
{
byte packedValue = buffer.readByte();
@@ -836,12 +836,12 @@ public class EncodingUtils
}
// AMQP_BYTE_PROPERTY_PREFIX
- public static void writeByte(DataOutputStream buffer, Byte aByte) throws IOException
+ public static void writeByte(DataOutput buffer, byte aByte) throws IOException
{
buffer.writeByte(aByte);
}
- public static byte readByte(DataInputStream buffer) throws IOException
+ public static byte readByte(DataInput buffer) throws IOException
{
return buffer.readByte();
}
@@ -852,12 +852,12 @@ public class EncodingUtils
}
// AMQP_SHORT_PROPERTY_PREFIX
- public static void writeShort(DataOutputStream buffer, Short aShort) throws IOException
+ public static void writeShort(DataOutput buffer, short aShort) throws IOException
{
buffer.writeShort(aShort);
}
- public static short readShort(DataInputStream buffer) throws IOException
+ public static short readShort(DataInput buffer) throws IOException
{
return buffer.readShort();
}
@@ -868,12 +868,12 @@ public class EncodingUtils
}
// INTEGER_PROPERTY_PREFIX
- public static void writeInteger(DataOutputStream buffer, Integer aInteger) throws IOException
+ public static void writeInteger(DataOutput buffer, int aInteger) throws IOException
{
buffer.writeInt(aInteger);
}
- public static int readInteger(DataInputStream buffer) throws IOException
+ public static int readInteger(DataInput buffer) throws IOException
{
return buffer.readInt();
}
@@ -884,12 +884,12 @@ public class EncodingUtils
}
// AMQP_LONG_PROPERTY_PREFIX
- public static void writeLong(DataOutputStream buffer, Long aLong) throws IOException
+ public static void writeLong(DataOutput buffer, long aLong) throws IOException
{
buffer.writeLong(aLong);
}
- public static long readLong(DataInputStream buffer) throws IOException
+ public static long readLong(DataInput buffer) throws IOException
{
return buffer.readLong();
}
@@ -900,12 +900,12 @@ public class EncodingUtils
}
// Float_PROPERTY_PREFIX
- public static void writeFloat(DataOutputStream buffer, Float aFloat) throws IOException
+ public static void writeFloat(DataOutput buffer, float aFloat) throws IOException
{
buffer.writeFloat(aFloat);
}
- public static float readFloat(DataInputStream buffer) throws IOException
+ public static float readFloat(DataInput buffer) throws IOException
{
return buffer.readFloat();
}
@@ -916,12 +916,12 @@ public class EncodingUtils
}
// Double_PROPERTY_PREFIX
- public static void writeDouble(DataOutputStream buffer, Double aDouble) throws IOException
+ public static void writeDouble(DataOutput buffer, Double aDouble) throws IOException
{
buffer.writeDouble(aDouble);
}
- public static double readDouble(DataInputStream buffer) throws IOException
+ public static double readDouble(DataInput buffer) throws IOException
{
return buffer.readDouble();
}
@@ -931,7 +931,7 @@ public class EncodingUtils
return 8;
}
- public static byte[] readBytes(DataInputStream buffer) throws IOException
+ public static byte[] readBytes(DataInput buffer) throws IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -941,13 +941,13 @@ public class EncodingUtils
else
{
byte[] dataBytes = new byte[(int)length];
- buffer.read(dataBytes, 0, (int) length);
+ buffer.readFully(dataBytes, 0, (int) length);
return dataBytes;
}
}
- public static void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
+ public static void writeBytes(DataOutput buffer, byte[] data) throws IOException
{
if (data != null)
{
@@ -969,19 +969,19 @@ public class EncodingUtils
return encodedByteLength();
}
- public static char readChar(DataInputStream buffer) throws IOException
+ public static char readChar(DataInput buffer) throws IOException
{
// This is valid as we know that the Character is ASCII 0..127
- return (char) buffer.read();
+ return (char) buffer.readByte();
}
- public static void writeChar(DataOutputStream buffer, char character) throws IOException
+ public static void writeChar(DataOutput buffer, char character) throws IOException
{
// This is valid as we know that the Character is ASCII 0..127
writeByte(buffer, (byte) character);
}
- public static long readLongAsShortString(DataInputStream buffer) throws IOException
+ public static long readLongAsShortString(DataInput buffer) throws IOException
{
short length = (short) buffer.readUnsignedByte();
short pos = 0;
@@ -1018,7 +1018,7 @@ public class EncodingUtils
return result;
}
- public static long readUnsignedInteger(DataInputStream buffer) throws IOException
+ public static long readUnsignedInteger(DataInput buffer) throws IOException
{
long l = 0xFF & buffer.readByte();
l <<= 8;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java b/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java
new file mode 100644
index 0000000000..c789d9275e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.framing;
+
+import java.io.DataInput;
+
+public interface ExtendedDataInput extends DataInput
+{
+ AMQShortString readAMQShortString();
+
+ int available();
+
+ int position();
+
+ void position(int position);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 4a126b8504..863e363b87 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -25,11 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQPInvalidClassException;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Enumeration;
@@ -44,18 +40,27 @@ import java.util.Set;
public class FieldTable
{
private static final Logger _logger = LoggerFactory.getLogger(FieldTable.class);
- private static final String STRICT_AMQP = "STRICT_AMQP";
- private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false"));
+ private static final String STRICT_AMQP_NAME = "STRICT_AMQP";
+ private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false"));
private byte[] _encodedForm;
+ private int _encodedFormOffset;
private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
private static final int INITIAL_ENCODED_FORM_SIZE = 256;
+ private final boolean _strictAMQP;
public FieldTable()
{
+ this(STRICT_AMQP);
+ }
+
+
+ public FieldTable(boolean strictAMQP)
+ {
super();
+ _strictAMQP = strictAMQP;
}
/**
@@ -64,14 +69,28 @@ public class FieldTable
* @param buffer the buffer from which to read data. The length byte must be read already
* @param length the length of the field table. Must be > 0.
*/
- public FieldTable(DataInputStream buffer, long length) throws IOException
+ public FieldTable(DataInput buffer, long length) throws IOException
{
this();
_encodedForm = new byte[(int) length];
- buffer.read(_encodedForm);
+ buffer.readFully(_encodedForm);
_encodedSize = length;
}
+ public FieldTable(byte[] encodedForm, int offset, int length) throws IOException
+ {
+ this();
+ _encodedForm = encodedForm;
+ _encodedFormOffset = offset;
+ _encodedSize = length;
+ }
+
+
+ public boolean isClean()
+ {
+ return _encodedForm != null;
+ }
+
public AMQTypedValue getProperty(AMQShortString string)
{
checkPropertyName(string);
@@ -181,7 +200,7 @@ public class FieldTable
public Boolean getBoolean(String string)
{
- return getBoolean(new AMQShortString(string));
+ return getBoolean(AMQShortString.valueOf(string));
}
public Boolean getBoolean(AMQShortString string)
@@ -199,7 +218,7 @@ public class FieldTable
public Byte getByte(String string)
{
- return getByte(new AMQShortString(string));
+ return getByte(AMQShortString.valueOf(string));
}
public Byte getByte(AMQShortString string)
@@ -217,7 +236,7 @@ public class FieldTable
public Short getShort(String string)
{
- return getShort(new AMQShortString(string));
+ return getShort(AMQShortString.valueOf(string));
}
public Short getShort(AMQShortString string)
@@ -235,7 +254,7 @@ public class FieldTable
public Integer getInteger(String string)
{
- return getInteger(new AMQShortString(string));
+ return getInteger(AMQShortString.valueOf(string));
}
public Integer getInteger(AMQShortString string)
@@ -253,7 +272,7 @@ public class FieldTable
public Long getLong(String string)
{
- return getLong(new AMQShortString(string));
+ return getLong(AMQShortString.valueOf(string));
}
public Long getLong(AMQShortString string)
@@ -271,7 +290,7 @@ public class FieldTable
public Float getFloat(String string)
{
- return getFloat(new AMQShortString(string));
+ return getFloat(AMQShortString.valueOf(string));
}
public Float getFloat(AMQShortString string)
@@ -289,7 +308,7 @@ public class FieldTable
public Double getDouble(String string)
{
- return getDouble(new AMQShortString(string));
+ return getDouble(AMQShortString.valueOf(string));
}
public Double getDouble(AMQShortString string)
@@ -307,7 +326,7 @@ public class FieldTable
public String getString(String string)
{
- return getString(new AMQShortString(string));
+ return getString(AMQShortString.valueOf(string));
}
public String getString(AMQShortString string)
@@ -330,7 +349,7 @@ public class FieldTable
public Character getCharacter(String string)
{
- return getCharacter(new AMQShortString(string));
+ return getCharacter(AMQShortString.valueOf(string));
}
public Character getCharacter(AMQShortString string)
@@ -348,7 +367,7 @@ public class FieldTable
public byte[] getBytes(String string)
{
- return getBytes(new AMQShortString(string));
+ return getBytes(AMQShortString.valueOf(string));
}
public byte[] getBytes(AMQShortString string)
@@ -374,7 +393,7 @@ public class FieldTable
*/
public FieldTable getFieldTable(String string)
{
- return getFieldTable(new AMQShortString(string));
+ return getFieldTable(AMQShortString.valueOf(string));
}
/**
@@ -401,7 +420,7 @@ public class FieldTable
public Object getObject(String string)
{
- return getObject(new AMQShortString(string));
+ return getObject(AMQShortString.valueOf(string));
}
public Object getObject(AMQShortString string)
@@ -447,7 +466,7 @@ public class FieldTable
// ************ Setters
public Object setBoolean(String string, Boolean b)
{
- return setBoolean(new AMQShortString(string), b);
+ return setBoolean(AMQShortString.valueOf(string), b);
}
public Object setBoolean(AMQShortString string, Boolean b)
@@ -457,7 +476,7 @@ public class FieldTable
public Object setByte(String string, Byte b)
{
- return setByte(new AMQShortString(string), b);
+ return setByte(AMQShortString.valueOf(string), b);
}
public Object setByte(AMQShortString string, Byte b)
@@ -467,7 +486,7 @@ public class FieldTable
public Object setShort(String string, Short i)
{
- return setShort(new AMQShortString(string), i);
+ return setShort(AMQShortString.valueOf(string), i);
}
public Object setShort(AMQShortString string, Short i)
@@ -475,29 +494,29 @@ public class FieldTable
return setProperty(string, AMQType.SHORT.asTypedValue(i));
}
- public Object setInteger(String string, Integer i)
+ public Object setInteger(String string, int i)
{
- return setInteger(new AMQShortString(string), i);
+ return setInteger(AMQShortString.valueOf(string), i);
}
- public Object setInteger(AMQShortString string, Integer i)
+ public Object setInteger(AMQShortString string, int i)
{
- return setProperty(string, AMQType.INT.asTypedValue(i));
+ return setProperty(string, AMQTypedValue.createAMQTypedValue(i));
}
- public Object setLong(String string, Long l)
+ public Object setLong(String string, long l)
{
- return setLong(new AMQShortString(string), l);
+ return setLong(AMQShortString.valueOf(string), l);
}
- public Object setLong(AMQShortString string, Long l)
+ public Object setLong(AMQShortString string, long l)
{
- return setProperty(string, AMQType.LONG.asTypedValue(l));
+ return setProperty(string, AMQTypedValue.createAMQTypedValue(l));
}
public Object setFloat(String string, Float f)
{
- return setFloat(new AMQShortString(string), f);
+ return setFloat(AMQShortString.valueOf(string), f);
}
public Object setFloat(AMQShortString string, Float v)
@@ -507,7 +526,7 @@ public class FieldTable
public Object setDouble(String string, Double d)
{
- return setDouble(new AMQShortString(string), d);
+ return setDouble(AMQShortString.valueOf(string), d);
}
public Object setDouble(AMQShortString string, Double v)
@@ -517,7 +536,7 @@ public class FieldTable
public Object setString(String string, String s)
{
- return setString(new AMQShortString(string), s);
+ return setString(AMQShortString.valueOf(string), s);
}
public Object setAsciiString(AMQShortString string, String value)
@@ -546,7 +565,7 @@ public class FieldTable
public Object setChar(String string, char c)
{
- return setChar(new AMQShortString(string), c);
+ return setChar(AMQShortString.valueOf(string), c);
}
public Object setChar(AMQShortString string, char c)
@@ -556,7 +575,7 @@ public class FieldTable
public Object setBytes(String string, byte[] b)
{
- return setBytes(new AMQShortString(string), b);
+ return setBytes(AMQShortString.valueOf(string), b);
}
public Object setBytes(AMQShortString string, byte[] bytes)
@@ -566,7 +585,7 @@ public class FieldTable
public Object setBytes(String string, byte[] bytes, int start, int length)
{
- return setBytes(new AMQShortString(string), bytes, start, length);
+ return setBytes(AMQShortString.valueOf(string), bytes, start, length);
}
public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
@@ -579,7 +598,7 @@ public class FieldTable
public Object setObject(String string, Object o)
{
- return setObject(new AMQShortString(string), o);
+ return setObject(AMQShortString.valueOf(string), o);
}
public Object setTimestamp(AMQShortString string, long datetime)
@@ -617,7 +636,7 @@ public class FieldTable
*/
public Object setFieldTable(String string, FieldTable ftValue)
{
- return setFieldTable(new AMQShortString(string), ftValue);
+ return setFieldTable(AMQShortString.valueOf(string), ftValue);
}
/**
@@ -681,7 +700,7 @@ public class FieldTable
public boolean isNullStringValue(String name)
{
- AMQTypedValue value = getProperty(new AMQShortString(name));
+ AMQTypedValue value = getProperty(AMQShortString.valueOf(name));
return (value != null) && (value.getType() == AMQType.VOID);
}
@@ -713,7 +732,7 @@ public class FieldTable
public boolean itemExists(String string)
{
- return itemExists(new AMQShortString(string));
+ return itemExists(AMQShortString.valueOf(string));
}
public String toString()
@@ -769,7 +788,7 @@ public class FieldTable
// ************************* Byte Buffer Processing
- public void writeToBuffer(DataOutputStream buffer) throws IOException
+ public void writeToBuffer(DataOutput buffer) throws IOException
{
final boolean trace = _logger.isDebugEnabled();
@@ -919,7 +938,7 @@ public class FieldTable
public boolean containsKey(String key)
{
- return containsKey(new AMQShortString(key));
+ return containsKey(AMQShortString.valueOf(key));
}
public Set<String> keys()
@@ -942,7 +961,7 @@ public class FieldTable
public Object get(String key)
{
- return get(new AMQShortString(key));
+ return get(AMQShortString.valueOf(key));
}
public Object get(AMQShortString key)
@@ -958,7 +977,7 @@ public class FieldTable
public Object remove(String key)
{
- return remove(new AMQShortString(key));
+ return remove(AMQShortString.valueOf(key));
}
@@ -1005,12 +1024,12 @@ public class FieldTable
return _properties.keySet();
}
- private void putDataInBuffer(DataOutputStream buffer) throws IOException
+ private void putDataInBuffer(DataOutput buffer) throws IOException
{
if (_encodedForm != null)
{
- buffer.write(_encodedForm);
+ buffer.write(_encodedForm,_encodedFormOffset,(int)_encodedSize);
}
else if (_properties != null)
{
@@ -1039,9 +1058,8 @@ public class FieldTable
private void setFromBuffer() throws AMQFrameDecodingException, IOException
{
- final ByteArrayInputStream in = new ByteArrayInputStream(_encodedForm);
- DataInputStream buffer = new DataInputStream(in);
- final boolean trace = _logger.isDebugEnabled();
+ ByteArrayDataInput baid = new ByteArrayDataInput(_encodedForm, _encodedFormOffset, (int)_encodedSize);
+
if (_encodedSize > 0)
{
@@ -1051,12 +1069,12 @@ public class FieldTable
do
{
- final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
- AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
+ final AMQShortString key = baid.readAMQShortString();
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(baid);
_properties.put(key, value);
}
- while (in.available() > 0);
+ while (baid.available() > 0);
}
@@ -1101,7 +1119,7 @@ public class FieldTable
FieldTable table = new FieldTable();
for(Map.Entry<String,Object> entry : map.entrySet())
{
- table.put(new AMQShortString(entry.getKey()), entry.getValue());
+ table.put(AMQShortString.valueOf(entry.getKey()), entry.getValue());
}
return table;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
index 438a46f28b..af0c5b845c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
@@ -30,7 +31,7 @@ public class FieldTableFactory
return new FieldTable();
}
- public static FieldTable newFieldTable(DataInputStream byteBuffer, long length) throws AMQFrameDecodingException, IOException
+ public static FieldTable newFieldTable(DataInput byteBuffer, long length) throws AMQFrameDecodingException, IOException
{
return new FieldTable(byteBuffer, length);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index a6ce721a50..95b6246717 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -21,7 +21,7 @@
package org.apache.qpid.framing;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -56,7 +56,7 @@ public class HeartbeatBody implements AMQBody
return 0;//heartbeats we generate have no payload
}
- public void writePayload(DataOutputStream buffer)
+ public void writePayload(DataOutput buffer)
{
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
index dfc49c6167..971caca41a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
public class HeartbeatBodyFactory implements BodyFactory
{
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException
+ public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException
{
return new HeartbeatBody();
}
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 8c018316f0..2925724dc2 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -21,13 +21,10 @@
package org.apache.qpid.framing;
import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
import java.util.Arrays;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
@@ -66,7 +63,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
}
- public ProtocolInitiation(DataInputStream in) throws IOException
+ public ProtocolInitiation(MarkableDataInput in) throws IOException
{
_protocolHeader = new byte[4];
in.read(_protocolHeader);
@@ -82,7 +79,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
return 4 + 1 + 1 + 1 + 1;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
buffer.write(_protocolHeader);
@@ -143,7 +140,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
- public boolean decodable(DataInputStream in) throws IOException
+ public boolean decodable(MarkableDataInput in) throws IOException
{
return (in.available() >= 8);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
index d2925d13a8..dd854dd498 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
@@ -21,7 +21,7 @@
package org.apache.qpid.framing;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
@@ -69,7 +69,7 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl
return frameSize;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
if (_firstFrame != null)
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
index ed9136f7c9..e770fdd7e4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
@@ -23,6 +23,7 @@ package org.apache.qpid.framing;
import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.slf4j.Logger;
@@ -145,7 +146,7 @@ public class VersionSpecificRegistry
}
- public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException
+ public AMQMethodBody get(short classID, short methodID, MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException
{
AMQMethodBodyInstanceFactory bodyFactory;
try
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index b78433052c..dee5f696b9 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -383,13 +383,19 @@ public class Connection extends ConnectionInvoker
public void received(ProtocolEvent event)
{
- log.debug("RECV: [%s] %s", this, event);
+ if(log.isDebugEnabled())
+ {
+ log.debug("RECV: [%s] %s", this, event);
+ }
event.delegate(this, delegate);
}
public void send(ProtocolEvent event)
{
- log.debug("SEND: [%s] %s", this, event);
+ if(log.isDebugEnabled())
+ {
+ log.debug("SEND: [%s] %s", this, event);
+ }
Sender s = sender;
if (s == null)
{
@@ -400,8 +406,15 @@ public class Connection extends ConnectionInvoker
public void flush()
{
- log.debug("FLUSH: [%s]", this);
- sender.flush();
+ if(log.isDebugEnabled())
+ {
+ log.debug("FLUSH: [%s]", this);
+ }
+ final Sender<ProtocolEvent> theSender = sender;
+ if(theSender != null)
+ {
+ theSender.flush();
+ }
}
protected void invoke(Method method)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Header.java b/java/common/src/main/java/org/apache/qpid/transport/Header.java
index 9439e5e0de..543856ca39 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Header.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Header.java
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.transport.network.Frame;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.LinkedHashMap;
-import java.nio.ByteBuffer;
+import java.util.*;
/**
@@ -35,45 +29,87 @@ import java.nio.ByteBuffer;
* @author Rafael H. Schloming
*/
-public class Header {
+public class Header
+{
- private final Struct[] structs;
+ private final DeliveryProperties _deliveryProps;
+ private final MessageProperties _messageProps;
+ private final List<Struct> _nonStandardProps;
- public Header(List<Struct> structs)
+ public Header(DeliveryProperties deliveryProps, MessageProperties messageProps)
{
- this(structs.toArray(new Struct[structs.size()]));
+ this(deliveryProps, messageProps, null);
}
- public Header(Struct ... structs)
+ public Header(DeliveryProperties deliveryProps, MessageProperties messageProps, List<Struct> nonStandardProps)
{
- this.structs = structs;
+ _deliveryProps = deliveryProps;
+ _messageProps = messageProps;
+ _nonStandardProps = nonStandardProps;
}
public Struct[] getStructs()
{
+ int size = 0;
+ if(_deliveryProps != null)
+ {
+ size++;
+ }
+ if(_messageProps != null)
+ {
+ size++;
+ }
+ if(_nonStandardProps != null)
+ {
+ size+=_nonStandardProps.size();
+ }
+ Struct[] structs = new Struct[size];
+ int index = 0;
+ if(_deliveryProps != null)
+ {
+ structs[index++] = _deliveryProps;
+ }
+ if(_messageProps != null)
+ {
+ structs[index++] = _messageProps;
+ }
+ if(_nonStandardProps != null)
+ {
+ for(Struct struct : _nonStandardProps)
+ {
+ structs[index++] = struct;
+ }
+ }
+
return structs;
}
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _deliveryProps;
+ }
- public <T> T get(Class<T> klass)
+ public MessageProperties getMessageProperties()
{
- for (Struct st : structs)
- {
- if (klass.isInstance(st))
- {
- return (T) st;
- }
- }
+ return _messageProps;
+ }
- return null;
+ public List<Struct> getNonStandardProperties()
+ {
+ return _nonStandardProps;
}
public String toString()
{
- StringBuffer str = new StringBuffer();
+ StringBuilder str = new StringBuilder();
str.append(" Header(");
boolean first = true;
- for (Struct s : structs)
+ if(_deliveryProps !=null)
+ {
+ first=false;
+ str.append(_deliveryProps);
+ }
+ if(_messageProps != null)
{
if (first)
{
@@ -83,9 +119,24 @@ public class Header {
{
str.append(", ");
}
- str.append(s);
+ str.append(_messageProps);
+ }
+ if(_nonStandardProps != null)
+ {
+ for (Struct s : _nonStandardProps)
+ {
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ str.append(", ");
+ }
+ str.append(s);
+ }
}
- str.append(")");
+ str.append(')');
return str.toString();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Range.java b/java/common/src/main/java/org/apache/qpid/transport/Range.java
index f4335dc8a6..f976337788 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Range.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Range.java
@@ -21,6 +21,8 @@
package org.apache.qpid.transport;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import static org.apache.qpid.util.Serial.*;
@@ -32,94 +34,259 @@ import static org.apache.qpid.util.Serial.*;
* @author Rafael H. Schloming
*/
-public final class Range
+public abstract class Range implements RangeSet
{
- private final int lower;
- private final int upper;
+ public static Range newInstance(int point)
+ {
+ return new PointImpl(point);
+ }
+
+ public static Range newInstance(int lower, int upper)
+ {
+ return lower == upper ? new PointImpl(lower) : new RangeImpl(lower, upper);
+ }
+
+ public abstract int getLower();
+
+ public abstract int getUpper();
+
+ public abstract boolean includes(int value);
+
+ public abstract boolean includes(Range range);
+
+ public abstract boolean intersects(Range range);
+
+ public abstract boolean touches(Range range);
+
+ public abstract Range span(Range range);
+
+ public abstract List<Range> subtract(Range range);
+
+
+ public Range intersect(Range range)
+ {
+ int l = max(getLower(), range.getLower());
+ int r = min(getUpper(), range.getUpper());
+ if (gt(l, r))
+ {
+ return null;
+ }
+ else
+ {
+ return newInstance(l, r);
+ }
+ }
+
+
+
+ public int size()
+ {
+ return 1;
+ }
- public Range(int lower, int upper)
+ public Iterator<Range> iterator()
{
- this.lower = lower;
- this.upper = upper;
+ return new RangeIterator();
}
- public int getLower()
+ public Range getFirst()
{
- return lower;
+ return this;
}
- public int getUpper()
+ public Range getLast()
{
- return upper;
+ return this;
}
- public boolean includes(int value)
+ public void add(Range range)
{
- return le(lower, value) && le(value, upper);
+ throw new UnsupportedOperationException();
}
- public boolean includes(Range range)
+ public void add(int lower, int upper)
{
- return includes(range.lower) && includes(range.upper);
+ throw new UnsupportedOperationException();
}
- public boolean intersects(Range range)
+ public void add(int value)
{
- return (includes(range.lower) || includes(range.upper) ||
- range.includes(lower) || range.includes(upper));
+ throw new UnsupportedOperationException();
}
- public boolean touches(Range range)
+ public void clear()
{
- return (intersects(range) ||
- includes(range.upper + 1) || includes(range.lower - 1) ||
- range.includes(upper + 1) || range.includes(lower - 1));
+ throw new UnsupportedOperationException();
}
- public Range span(Range range)
+ public RangeSet copy()
{
- return new Range(min(lower, range.lower), max(upper, range.upper));
+ RangeSet rangeSet = RangeSetFactory.createRangeSet();
+ rangeSet.add(this);
+ return rangeSet;
}
- public List<Range> subtract(Range range)
+ private static class PointImpl extends Range
{
- List<Range> result = new ArrayList<Range>();
+ private final int point;
+
+ private PointImpl(int point)
+ {
+ this.point = point;
+ }
+
+ public int getLower()
+ {
+ return point;
+ }
+
+ public int getUpper()
+ {
+ return point;
+ }
+
+ public boolean includes(int value)
+ {
+ return value == point;
+ }
+
+
+ public boolean includes(Range range)
+ {
+ return range.getLower() == point && range.getUpper() == point;
+ }
+
+ public boolean intersects(Range range)
+ {
+ return range.includes(point);
+ }
- if (includes(range.lower) && le(lower, range.lower - 1))
+ public boolean touches(Range range)
{
- result.add(new Range(lower, range.lower - 1));
+ return intersects(range) ||
+ includes(range.getUpper() + 1) || includes(range.getLower() - 1) ||
+ range.includes(point + 1) || range.includes(point - 1);
}
- if (includes(range.upper) && le(range.upper + 1, upper))
+ public Range span(Range range)
{
- result.add(new Range(range.upper + 1, upper));
+ return newInstance(min(point, range.getLower()), max(point, range.getUpper()));
}
- if (result.isEmpty() && !range.includes(this))
+ public List<Range> subtract(Range range)
{
- result.add(this);
+ if(range.includes(point))
+ {
+ return Collections.emptyList();
+ }
+ else
+ {
+ return Collections.singletonList((Range) this);
+ }
}
- return result;
}
- public Range intersect(Range range)
+ private static class RangeImpl extends Range
{
- int l = max(lower, range.lower);
- int r = min(upper, range.upper);
- if (gt(l, r))
+ private final int lower;
+ private final int upper;
+
+ private RangeImpl(int lower, int upper)
{
- return null;
+ this.lower = lower;
+ this.upper = upper;
}
- else
+
+ public int getLower()
+ {
+ return lower;
+ }
+
+ public int getUpper()
+ {
+ return upper;
+ }
+
+ public boolean includes(int value)
+ {
+ return le(lower, value) && le(value, upper);
+ }
+
+ public boolean includes(Range range)
+ {
+ return includes(range.getLower()) && includes(range.getUpper());
+ }
+
+ public boolean intersects(Range range)
+ {
+ return (includes(range.getLower()) || includes(range.getUpper()) ||
+ range.includes(lower) || range.includes(upper));
+ }
+
+ public boolean touches(Range range)
+ {
+ return (intersects(range) ||
+ includes(range.getUpper() + 1) || includes(range.getLower() - 1) ||
+ range.includes(upper + 1) || range.includes(lower - 1));
+ }
+
+ public Range span(Range range)
+ {
+ return newInstance(min(lower, range.getLower()), max(upper, range.getUpper()));
+ }
+
+ public List<Range> subtract(Range range)
+ {
+ List<Range> result = new ArrayList<Range>();
+
+ if (includes(range.getLower()) && le(lower, range.getLower() - 1))
+ {
+ result.add(newInstance(lower, range.getLower() - 1));
+ }
+
+ if (includes(range.getUpper()) && le(range.getUpper() + 1, upper))
+ {
+ result.add(newInstance(range.getUpper() + 1, upper));
+ }
+
+ if (result.isEmpty() && !range.includes(this))
+ {
+ result.add(this);
+ }
+
+ return result;
+ }
+
+
+ public String toString()
{
- return new Range(l, r);
+ return "[" + lower + ", " + upper + "]";
}
}
- public String toString()
+
+ private class RangeIterator implements Iterator<Range>
{
- return "[" + lower + ", " + upper + "]";
- }
+ private boolean atFirst = true;
+
+ public boolean hasNext()
+ {
+ return atFirst;
+ }
+
+ public Range next()
+ {
+
+ Range range = atFirst ? Range.this : null;
+ atFirst = false;
+ return range;
+ }
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
index 3850dc162b..34ebd02777 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.transport;
-import java.util.Iterator;
-import java.util.ListIterator;
-import java.util.LinkedList;
+import java.util.*;
import static org.apache.qpid.util.Serial.*;
@@ -32,121 +30,29 @@ import static org.apache.qpid.util.Serial.*;
* @author Rafael H. Schloming
*/
-public final class RangeSet implements Iterable<Range>
+public interface RangeSet extends Iterable<Range>
{
- private LinkedList<Range> ranges = new LinkedList<Range>();
-
- public int size()
- {
- return ranges.size();
- }
-
- public Iterator<Range> iterator()
- {
- return ranges.iterator();
- }
-
- public Range getFirst()
- {
- return ranges.getFirst();
- }
-
- public Range getLast()
- {
- return ranges.getLast();
- }
-
- public boolean includes(Range range)
- {
- for (Range r : this)
- {
- if (r.includes(range))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean includes(int n)
- {
- for (Range r : this)
- {
- if (r.includes(n))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public void add(Range range)
- {
- ListIterator<Range> it = ranges.listIterator();
-
- while (it.hasNext())
- {
- Range next = it.next();
- if (range.touches(next))
- {
- it.remove();
- range = range.span(next);
- }
- else if (lt(range.getUpper(), next.getLower()))
- {
- it.previous();
- it.add(range);
- return;
- }
- }
-
- it.add(range);
- }
-
- public void add(int lower, int upper)
- {
- add(new Range(lower, upper));
- }
-
- public void add(int value)
- {
- add(value, value);
- }
-
- public void clear()
- {
- ranges.clear();
- }
-
- public RangeSet copy()
- {
- RangeSet copy = new RangeSet();
- copy.ranges.addAll(ranges);
- return copy;
- }
-
- public String toString()
- {
- StringBuffer str = new StringBuffer();
- str.append("{");
- boolean first = true;
- for (Range range : ranges)
- {
- if (first)
- {
- first = false;
- }
- else
- {
- str.append(", ");
- }
- str.append(range);
- }
- str.append("}");
- return str.toString();
- }
+ int size();
+
+ Iterator<Range> iterator();
+
+ Range getFirst();
+
+ Range getLast();
+
+ boolean includes(Range range);
+
+ boolean includes(int n);
+
+ void add(Range range);
+
+ void add(int lower, int upper);
+
+ void add(int value);
+
+ void clear();
+
+ RangeSet copy();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
new file mode 100644
index 0000000000..0f19d7e2b2
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public class RangeSetFactory
+{
+ public static RangeSet createRangeSet()
+ {
+ return new RangeSetImpl();
+ }
+
+ public static RangeSet createRangeSet(int size)
+ {
+ return new RangeSetImpl(size);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
new file mode 100644
index 0000000000..f2540afb40
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import static org.apache.qpid.util.Serial.lt;
+
+public class RangeSetImpl implements RangeSet
+{
+
+ private List<Range> ranges;
+
+ public RangeSetImpl()
+ {
+ ranges = new ArrayList<Range>();
+ }
+
+ public RangeSetImpl(int size)
+ {
+ ranges = new ArrayList<Range>(size);
+ }
+
+
+ public RangeSetImpl(org.apache.qpid.transport.RangeSetImpl copy)
+ {
+ ranges = new ArrayList<Range>(copy.ranges);
+ }
+
+ public int size()
+ {
+ return ranges.size();
+ }
+
+ public Iterator<Range> iterator()
+ {
+ return ranges.iterator();
+ }
+
+ public Range getFirst()
+ {
+ return ranges.get(0);
+ }
+
+ public Range getLast()
+ {
+ return ranges.get(ranges.size() - 1);
+ }
+
+ public boolean includes(Range range)
+ {
+ for (Range r : this)
+ {
+ if (r.includes(range))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public boolean includes(int n)
+ {
+ for (Range r : this)
+ {
+ if (r.includes(n))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public void add(Range range)
+ {
+ ListIterator<Range> it = ranges.listIterator();
+
+ while (it.hasNext())
+ {
+ Range next = it.next();
+ if (range.touches(next))
+ {
+ it.remove();
+ range = range.span(next);
+ }
+ else if (lt(range.getUpper(), next.getLower()))
+ {
+ it.previous();
+ it.add(range);
+ return;
+ }
+ }
+
+ it.add(range);
+ }
+
+ public void add(int lower, int upper)
+ {
+ switch(ranges.size())
+ {
+ case 0:
+ ranges.add(Range.newInstance(lower, upper));
+ break;
+
+ case 1:
+ Range first = ranges.get(0);
+ if(first.getUpper() + 1 >= lower && upper >= first.getUpper())
+ {
+ ranges.set(0, Range.newInstance(first.getLower(), upper));
+ break;
+ }
+
+ default:
+ add(Range.newInstance(lower, upper));
+ }
+
+
+ }
+
+ public void add(int value)
+ {
+ add(value, value);
+ }
+
+ public void clear()
+ {
+ ranges.clear();
+ }
+
+ public RangeSet copy()
+ {
+ return new org.apache.qpid.transport.RangeSetImpl(this);
+ }
+
+ public String toString()
+ {
+ StringBuffer str = new StringBuffer();
+ str.append("{");
+ boolean first = true;
+ for (Range range : ranges)
+ {
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ str.append(", ");
+ }
+ str.append(range);
+ }
+ str.append("}");
+ return str.toString();
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 321e5256b2..3e823ba6fe 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -247,7 +247,7 @@ public class Session extends SessionInvoker
synchronized (processedLock)
{
incomingInit = false;
- processed = new RangeSet();
+ processed = RangeSetFactory.createRangeSet();
}
}
@@ -276,22 +276,22 @@ public class Session extends SessionInvoker
else if (m instanceof MessageTransfer)
{
MessageTransfer xfr = (MessageTransfer)m;
-
- if (xfr.getHeader() != null)
+
+ Header header = xfr.getHeader();
+
+ if (header != null)
{
- if (xfr.getHeader().get(DeliveryProperties.class) != null)
+ if (header.getDeliveryProperties() != null)
{
- xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true);
+ header.getDeliveryProperties().setRedelivered(true);
}
else
{
- Struct[] structs = xfr.getHeader().getStructs();
DeliveryProperties deliveryProps = new DeliveryProperties();
deliveryProps.setRedelivered(true);
-
- List<Struct> list = Arrays.asList(structs);
- list.add(deliveryProps);
- xfr.setHeader(new Header(list));
+
+ xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(),
+ header.getNonStandardProperties()));
}
}
@@ -299,7 +299,7 @@ public class Session extends SessionInvoker
{
DeliveryProperties deliveryProps = new DeliveryProperties();
deliveryProps.setRedelivered(true);
- xfr.setHeader(new Header(deliveryProps));
+ xfr.setHeader(new Header(deliveryProps, null, null));
}
}
sessionCommandPoint(m.getId(), 0);
@@ -394,38 +394,46 @@ public class Session extends SessionInvoker
public void processed(int command)
{
- processed(new Range(command, command));
+ processed(command, command);
}
- public void processed(int lower, int upper)
+ public void processed(Range range)
{
- processed(new Range(lower, upper));
+ processed(range.getLower(), range.getUpper());
}
- public void processed(Range range)
+ public void processed(int lower, int upper)
{
- log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed);
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed);
+ }
boolean flush;
synchronized (processedLock)
{
- log.debug("%s", processed);
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s", processed);
+ }
- if (ge(range.getUpper(), commandsIn))
+ if (ge(upper, commandsIn))
{
throw new IllegalArgumentException
- ("range exceeds max received command-id: " + range);
+ ("range exceeds max received command-id: " + Range.newInstance(lower, upper));
}
- processed.add(range);
+ processed.add(lower, upper);
+
Range first = processed.getFirst();
- int lower = first.getLower();
- int upper = first.getUpper();
+
+ int flower = first.getLower();
+ int fupper = first.getUpper();
int old = maxProcessed;
- if (le(lower, maxProcessed + 1))
+ if (le(flower, maxProcessed + 1))
{
- maxProcessed = max(maxProcessed, upper);
+ maxProcessed = max(maxProcessed, fupper);
}
boolean synced = ge(maxProcessed, syncPoint);
flush = lt(old, syncPoint) && synced;
@@ -442,7 +450,7 @@ public class Session extends SessionInvoker
void flushExpected()
{
- RangeSet rs = new RangeSet();
+ RangeSet rs = RangeSetFactory.createRangeSet();
synchronized (processedLock)
{
if (incomingInit)
@@ -478,7 +486,7 @@ public class Session extends SessionInvoker
{
synchronized (processedLock)
{
- RangeSet newProcessed = new RangeSet();
+ RangeSet newProcessed = RangeSetFactory.createRangeSet();
for (Range pr : processed)
{
for (Range kr : kc)
@@ -534,7 +542,12 @@ public class Session extends SessionInvoker
{
maxComplete = max(maxComplete, upper);
}
- log.debug("%s commands remaining: %s", this, commandsOut - maxComplete);
+
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s commands remaining: %s", this, commandsOut - maxComplete);
+ }
+
commands.notifyAll();
return gt(maxComplete, old);
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 3341149e5f..028b912ba1 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -91,21 +91,38 @@ public class SessionDelegate
{
RangeSet ranges = cmp.getCommands();
RangeSet known = null;
- if (cmp.getTimelyReply())
- {
- known = new RangeSet();
- }
if (ranges != null)
{
- for (Range range : ranges)
+ if(ranges.size() == 1)
{
+ Range range = ranges.getFirst();
boolean advanced = ssn.complete(range.getLower(), range.getUpper());
- if (advanced && known != null)
+
+ if(advanced && cmp.getTimelyReply())
{
- known.add(range);
+ known = range;
}
}
+ else
+ {
+ if (cmp.getTimelyReply())
+ {
+ known = RangeSetFactory.createRangeSet();
+ }
+ for (Range range : ranges)
+ {
+ boolean advanced = ssn.complete(range.getLower(), range.getUpper());
+ if (advanced && known != null)
+ {
+ known.add(range);
+ }
+ }
+ }
+ }
+ else if (cmp.getTimelyReply())
+ {
+ known = RangeSetFactory.createRangeSet();
}
if (known != null)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
index 09ce6a7eb1..6ff3b21400 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
@@ -29,12 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.Type;
-
-import static org.apache.qpid.transport.util.Functions.*;
+import org.apache.qpid.transport.*;
/**
@@ -194,18 +189,19 @@ abstract class AbstractDecoder implements Decoder
public RangeSet readSequenceSet()
{
int count = readUint16()/8;
- if (count == 0)
+ switch(count)
{
- return null;
- }
- else
- {
- RangeSet ranges = new RangeSet();
- for (int i = 0; i < count; i++)
- {
- ranges.add(readSequenceNo(), readSequenceNo());
- }
- return ranges;
+ case 0:
+ return null;
+ case 1:
+ return Range.newInstance(readSequenceNo(), readSequenceNo());
+ default:
+ RangeSet ranges = RangeSetFactory.createRangeSet(count);
+ for (int i = 0; i < count; i++)
+ {
+ ranges.add(readSequenceNo(), readSequenceNo());
+ }
+ return ranges;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
index 4486b03a67..d9150bed65 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
@@ -70,6 +70,16 @@ public final class BBEncoder extends AbstractEncoder
return slice;
}
+ public int position()
+ {
+ return out.position();
+ }
+
+ public ByteBuffer underlyingBuffer()
+ {
+ return out;
+ }
+
private void grow(int size)
{
ByteBuffer old = out;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 1a85ab88a5..8cd5c29f6d 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -26,13 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.ProtocolError;
-import org.apache.qpid.transport.ProtocolEvent;
-import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.*;
import org.apache.qpid.transport.codec.BBDecoder;
/**
@@ -198,12 +192,33 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
break;
case HEADER:
command = getIncompleteCommand(channel);
- List<Struct> structs = new ArrayList<Struct>(2);
+ List<Struct> structs = null;
+ DeliveryProperties deliveryProps = null;
+ MessageProperties messageProps = null;
+
while (dec.hasRemaining())
{
- structs.add(dec.readStruct32());
+ Struct struct = dec.readStruct32();
+ if(struct instanceof DeliveryProperties && deliveryProps == null)
+ {
+ deliveryProps = (DeliveryProperties) struct;
+ }
+ else if(struct instanceof MessageProperties && messageProps == null)
+ {
+ messageProps = (MessageProperties) struct;
+ }
+ else
+ {
+ if(structs == null)
+ {
+ structs = new ArrayList<Struct>(2);
+ }
+ structs.add(struct);
+ }
+
}
- command.setHeader(new Header(structs));
+ command.setHeader(new Header(deliveryProps,messageProps,structs));
+
if (frame.isLastSegment())
{
setIncompleteCommand(channel, null);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 685034d1a9..6ac9df9bc3 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -87,27 +87,35 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
}
}
+ private final ByteBuffer _frameHeader = ByteBuffer.allocate(HEADER_SIZE);
+
+ {
+ _frameHeader.order(ByteOrder.BIG_ENDIAN);
+ }
+
private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
{
synchronized (sendlock)
{
- ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE);
- data.order(ByteOrder.BIG_ENDIAN);
+ ByteBuffer data = _frameHeader;
+ _frameHeader.rewind();
+
data.put(0, flags);
data.put(1, type);
data.putShort(2, (short) (size + HEADER_SIZE));
data.put(5, track);
data.putShort(6, (short) channel);
- data.position(HEADER_SIZE);
+
int limit = buf.limit();
buf.limit(buf.position() + size);
- data.put(buf);
- buf.limit(limit);
-
+
data.rewind();
sender.send(data);
+ sender.send(buf);
+ buf.limit(limit);
+
}
}
@@ -179,7 +187,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
}
}
method.write(enc);
- ByteBuffer methodSeg = enc.segment();
+ int methodLimit = enc.position();
byte flags = FIRST_SEG;
@@ -189,29 +197,44 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
flags |= LAST_SEG;
}
- ByteBuffer headerSeg = null;
+ int headerLimit = -1;
if (payload)
{
final Header hdr = method.getHeader();
if (hdr != null)
{
- final Struct[] structs = hdr.getStructs();
-
- for (Struct st : structs)
+ if(hdr.getDeliveryProperties() != null)
+ {
+ enc.writeStruct32(hdr.getDeliveryProperties());
+ }
+ if(hdr.getMessageProperties() != null)
+ {
+ enc.writeStruct32(hdr.getMessageProperties());
+ }
+ if(hdr.getNonStandardProperties() != null)
{
- enc.writeStruct32(st);
+ for (Struct st : hdr.getNonStandardProperties())
+ {
+ enc.writeStruct32(st);
+ }
}
}
- headerSeg = enc.segment();
+ headerLimit = enc.position();
}
synchronized (sendlock)
{
- fragment(flags, type, method, methodSeg);
+ ByteBuffer buf = enc.underlyingBuffer();
+ buf.position(0);
+ buf.limit(methodLimit);
+
+ fragment(flags, type, method, buf);
if (payload)
{
ByteBuffer body = method.getBody();
- fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerSeg);
+ buf.limit(headerLimit);
+ buf.position(methodLimit);
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf);
if (body != null)
{
fragment(LAST_SEG, SegmentType.BODY, method, body);