summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-09-09 17:47:22 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-09-09 17:47:22 +0000
commit5270591c7831e559925e720c6bfc0c78c514b95a (patch)
tree8ce7aabb2f5a4a87c4c53b3e8f810c62392eba11 /java/client/src
parent282e16aab532d842dffad3935bfd1a952bc584be (diff)
downloadqpid-python-5270591c7831e559925e720c6bfc0c78c514b95a.tar.gz
QPID-2627 : Remove dependency on MINA
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1167311 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java124
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java760
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java103
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java169
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java71
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java216
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java129
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java161
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java46
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java674
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java370
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java37
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java13
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java5
28 files changed, 1636 insertions, 1385 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 5821fee7ff..d739903ee6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -201,8 +201,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT));
}
}
-
- messageProps.setContentLength(message.getContentLength());
+
+ ByteBuffer data = message.getData();
+ messageProps.setContentLength(data.remaining());
// send the message
try
@@ -221,8 +222,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) &&
(destination.getLink().getReliability() == Reliability.UNRELIABLE);
- org.apache.mina.common.ByteBuffer data = message.getData();
- ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
+
+ ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice();
ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(),
MessageAcceptMode.NONE,
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 27f7486890..26e9814e33 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -27,14 +27,13 @@ import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.Queue;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.AMQMessageDelegate;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
@@ -186,7 +185,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
if (frames.length == (offset + 1))
{
- frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
+ byte[] data = new byte[payload.remaining()];
+ payload.get(data);
+ frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
}
else
{
@@ -198,7 +199,10 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
payload.position((int) framePayloadMax * (i - offset));
int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
+ byte[] data = new byte[payload.remaining()];
+ payload.get(data);
+
+ frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
remaining -= length;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
index 8c3f2fd08f..e5b95f54f4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
@@ -21,11 +21,6 @@
package org.apache.qpid.client.message;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
-
public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate>
{
public static AMQMessageDelegateFactory DEFAULT_FACTORY = null;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index cec4268a7b..b9ba946a20 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -499,7 +499,6 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
}
- _contentHeaderProperties.updated();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
index 7f735e0722..be71c8c657 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
@@ -24,11 +24,11 @@ package org.apache.qpid.client.message;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.codec.BBEncoder;
@@ -81,18 +81,19 @@ public class AMQPEncodedMapMessage extends JMSMapMessage
@ Override
public ByteBuffer getData()
{
- writeMapToData();
- return _data;
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeMap(_map);
+ return encoder.segment();
}
@ Override
- protected void populateMapFromData() throws JMSException
+ protected void populateMapFromData(ByteBuffer data) throws JMSException
{
- if (_data != null)
+ if (data != null)
{
- _data.rewind();
+ data.rewind();
BBDecoder decoder = new BBDecoder();
- decoder.init(_data.buf());
+ decoder.init(data);
_map = decoder.readMap();
}
else
@@ -101,14 +102,6 @@ public class AMQPEncodedMapMessage extends JMSMapMessage
}
}
- @ Override
- protected void writeMapToData()
- {
- BBEncoder encoder = new BBEncoder(1024);
- encoder.writeMap(_map);
- _data = ByteBuffer.wrap(encoder.segment());
- }
-
// for testing
public Map<String,Object> getMap()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
index 4978d1ce85..2c38f153cb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
@@ -1,6 +1,6 @@
package org.apache.qpid.client.message;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,22 +8,23 @@ package org.apache.qpid.client.message;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.AMQException;
public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
@@ -36,7 +37,7 @@ public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
return new AMQPEncodedMapMessage(delegate,data);
}
- @Override
+
public AbstractJMSMessage createMessage(
AMQMessageDelegateFactory delegateFactory) throws JMSException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
deleted file mode 100644
index 3846ee043d..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.transport.util.Functions;
-
-/**
- * @author Apache Software Foundation
- */
-public abstract class AbstractBytesMessage extends AbstractJMSMessage
-{
-
- /**
- * The default initial size of the buffer. The buffer expands automatically.
- */
- private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
-
- AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory)
- {
- this(delegateFactory, null);
- }
-
- /**
- * Construct a bytes message with existing data.
- *
- * @param delegateFactory
- * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- */
- AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
- {
- super(delegateFactory, data); // this instanties a content header
- setContentType(getMimeType());
-
- if (_data == null)
- {
- allocateInitialBuffer();
- }
- }
-
- protected void allocateInitialBuffer()
- {
- _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
- _data.setAutoExpand(true);
- }
-
- AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
- {
- super(delegate, data);
- setContentType(getMimeType());
- }
-
-
- public void clearBodyImpl() throws JMSException
- {
- allocateInitialBuffer();
- }
-
- public String toBodyString() throws JMSException
- {
- try
- {
- if (_data != null)
- {
- return Functions.str(_data.buf(), 100,0);
- }
- else
- {
- return "";
- }
-
- }
- catch (Exception e)
- {
- JMSException jmse = new JMSException(e.toString());
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
-
- }
-
- /**
- * Check that there is at least a certain number of bytes available to read
- *
- * @param len the number of bytes
- * @throws javax.jms.MessageEOFException if there are less than len bytes available to read
- */
- protected void checkAvailable(int len) throws MessageEOFException
- {
- if (_data.remaining() < len)
- {
- throw new MessageEOFException("Unable to read " + len + " bytes");
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
index 85818dcd2b..ddeb62fbf6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
@@ -21,784 +21,96 @@
package org.apache.qpid.client.message;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.transport.util.Functions;
/**
* @author Apache Software Foundation
*/
-public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
+public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage
{
+ protected boolean _readableMessage = false;
- protected static final byte BOOLEAN_TYPE = (byte) 1;
-
- protected static final byte BYTE_TYPE = (byte) 2;
-
- protected static final byte BYTEARRAY_TYPE = (byte) 3;
-
- protected static final byte SHORT_TYPE = (byte) 4;
-
- protected static final byte CHAR_TYPE = (byte) 5;
-
- protected static final byte INT_TYPE = (byte) 6;
-
- protected static final byte LONG_TYPE = (byte) 7;
-
- protected static final byte FLOAT_TYPE = (byte) 8;
-
- protected static final byte DOUBLE_TYPE = (byte) 9;
-
- protected static final byte STRING_TYPE = (byte) 10;
-
- protected static final byte NULL_STRING_TYPE = (byte) 11;
-
- /**
- * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
- * a byte array in multiple chunks, hence this is used to track how much is left to be read
- */
- private int _byteArrayRemaining = -1;
-
- AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory)
- {
-
- this(delegateFactory, null);
- }
-
- /**
- * Construct a stream message with existing data.
- *
- * @param delegateFactory
- * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- */
- AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+ AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage)
{
- super(delegateFactory, data); // this instanties a content header
+ super(delegateFactory, fromReceivedMessage); // this instanties a content header
+ _readableMessage = fromReceivedMessage;
}
- AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ AbstractBytesTypedMessage(AMQMessageDelegate delegate, boolean fromReceivedMessage) throws AMQException
{
- super(delegate, data);
- }
+ super(delegate, fromReceivedMessage);
+ _readableMessage = fromReceivedMessage;
-
- protected byte readWireType() throws MessageFormatException, MessageEOFException,
- MessageNotReadableException
- {
- checkReadable();
- checkAvailable(1);
- return _data.get();
}
- protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
+ protected void checkReadable() throws MessageNotReadableException
{
- checkWritable();
- _data.put(type);
- _changedData = true;
- }
-
- protected boolean readBoolean() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- boolean result;
- try
+ if (!_readableMessage)
{
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
}
}
- private boolean readBooleanImpl()
+ @Override
+ protected void checkWritable() throws MessageNotWriteableException
{
- return _data.get() != 0;
- }
-
- protected byte readByte() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- byte result;
- try
+ super.checkWritable();
+ if(_readableMessage)
{
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
- }
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
}
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
}
- private byte readByteImpl()
+ public void clearBody() throws JMSException
{
- return _data.get();
+ super.clearBody();
+ _readableMessage = false;
}
- protected short readShort() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- short result;
- try
- {
- switch (wireType)
- {
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
- private short readShortImpl()
+ public String toBodyString() throws JMSException
{
- return _data.getShort();
- }
-
- /**
- * Note that this method reads a unicode character as two bytes from the stream
- *
- * @return the character read from the stream
- * @throws javax.jms.JMSException
- */
- protected char readChar() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
try
{
- if(wireType == NULL_STRING_TYPE){
- throw new NullPointerException();
+ ByteBuffer data = getData();
+ if (data != null)
+ {
+ return Functions.str(data, 100, 0);
+ }
+ else
+ {
+ return "";
}
- if (wireType != CHAR_TYPE)
- {
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
- }
- else
- {
- checkAvailable(2);
- return readCharImpl();
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private char readCharImpl()
- {
- return _data.getChar();
- }
-
- protected int readInt() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- int result;
- try
- {
- switch (wireType)
- {
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected int readIntImpl()
- {
- return _data.getInt();
- }
-
- protected long readLong() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- long result;
- try
- {
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private long readLongImpl()
- {
- return _data.getLong();
- }
-
- protected float readFloat() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- float result;
- try
- {
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private float readFloatImpl()
- {
- return _data.getFloat();
- }
-
- protected double readDouble() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- double result;
- try
- {
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private double readDoubleImpl()
- {
- return _data.getDouble();
- }
-
- protected String readString() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- String result;
- try
- {
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- throw new NullPointerException("data is null");
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected String readStringImpl() throws JMSException
- {
- try
- {
- return _data.getString(Charset.forName("UTF-8").newDecoder());
}
- catch (CharacterCodingException e)
+ catch (Exception e)
{
- JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ JMSException jmse = new JMSException(e.toString());
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;
}
- }
-
- protected int readBytes(byte[] bytes) throws JMSException
- {
- if (bytes == null)
- {
- throw new IllegalArgumentException("byte array must not be null");
- }
- checkReadable();
- // first call
- if (_byteArrayRemaining == -1)
- {
- // type discriminator checked separately so you get a MessageFormatException rather than
- // an EOF even in the case where both would be applicable
- checkAvailable(1);
- byte wireType = readWireType();
- if (wireType != BYTEARRAY_TYPE)
- {
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
- }
- checkAvailable(4);
- int size = _data.getInt();
- // length of -1 indicates null
- if (size == -1)
- {
- return -1;
- }
- else
- {
- if (size > _data.remaining())
- {
- throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " +
- _data.remaining() + " bytes");
- }
- else
- {
- _byteArrayRemaining = size;
- }
- }
- }
- else if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- return -1;
- }
-
- int returnedSize = readBytesImpl(bytes);
- if (returnedSize < bytes.length)
- {
- _byteArrayRemaining = -1;
- }
- return returnedSize;
- }
-
- private int readBytesImpl(byte[] bytes)
- {
- int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
- _byteArrayRemaining -= count;
-
- if (count == 0)
- {
- return 0;
- }
- else
- {
- _data.get(bytes, 0, count);
- return count;
- }
- }
-
- protected Object readObject() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- Object result = null;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
- result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- byte[] bytesResult = new byte[size];
- readBytesImpl(bytesResult);
- result = bytesResult;
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected void writeBoolean(boolean b) throws JMSException
- {
- writeTypeDiscriminator(BOOLEAN_TYPE);
- _data.put(b ? (byte) 1 : (byte) 0);
- }
-
- protected void writeByte(byte b) throws JMSException
- {
- writeTypeDiscriminator(BYTE_TYPE);
- _data.put(b);
- }
-
- protected void writeShort(short i) throws JMSException
- {
- writeTypeDiscriminator(SHORT_TYPE);
- _data.putShort(i);
- }
-
- protected void writeChar(char c) throws JMSException
- {
- writeTypeDiscriminator(CHAR_TYPE);
- _data.putChar(c);
- }
-
- protected void writeInt(int i) throws JMSException
- {
- writeTypeDiscriminator(INT_TYPE);
- writeIntImpl(i);
- }
-
- protected void writeIntImpl(int i)
- {
- _data.putInt(i);
- }
-
- protected void writeLong(long l) throws JMSException
- {
- writeTypeDiscriminator(LONG_TYPE);
- _data.putLong(l);
- }
- protected void writeFloat(float v) throws JMSException
- {
- writeTypeDiscriminator(FLOAT_TYPE);
- _data.putFloat(v);
}
- protected void writeDouble(double v) throws JMSException
- {
- writeTypeDiscriminator(DOUBLE_TYPE);
- _data.putDouble(v);
- }
- protected void writeString(String string) throws JMSException
- {
- if (string == null)
- {
- writeTypeDiscriminator(NULL_STRING_TYPE);
- }
- else
- {
- writeTypeDiscriminator(STRING_TYPE);
- try
- {
- writeStringImpl(string);
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Unable to encode string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- }
- }
-
- protected void writeStringImpl(String string)
- throws CharacterCodingException
- {
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte) 0);
- }
+ abstract public void reset();
- protected void writeBytes(byte[] bytes) throws JMSException
- {
- writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
- }
- protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
- {
- writeTypeDiscriminator(BYTEARRAY_TYPE);
- if (bytes == null)
- {
- _data.putInt(-1);
- }
- else
- {
- _data.putInt(length);
- _data.put(bytes, offset, length);
- }
- }
- protected void writeObject(Object object) throws JMSException
- {
- checkWritable();
- Class clazz;
- if (object == null)
- {
- // string handles the output of null values
- clazz = String.class;
- }
- else
- {
- clazz = object.getClass();
- }
-
- if (clazz == Byte.class)
- {
- writeByte((Byte) object);
- }
- else if (clazz == Boolean.class)
- {
- writeBoolean((Boolean) object);
- }
- else if (clazz == byte[].class)
- {
- writeBytes((byte[]) object);
- }
- else if (clazz == Short.class)
- {
- writeShort((Short) object);
- }
- else if (clazz == Character.class)
- {
- writeChar((Character) object);
- }
- else if (clazz == Integer.class)
- {
- writeInt((Integer) object);
- }
- else if (clazz == Long.class)
- {
- writeLong((Long) object);
- }
- else if (clazz == Float.class)
- {
- writeFloat((Float) object);
- }
- else if (clazz == Double.class)
- {
- writeDouble((Double) object);
- }
- else if (clazz == String.class)
- {
- writeString((String) object);
- }
- else
- {
- throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
- }
- }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 6ba55b207a..f713554bfb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -20,66 +20,38 @@
*/
package org.apache.qpid.client.message;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
{
-
- protected ByteBuffer _data;
- protected boolean _readableMessage = false;
- protected boolean _changedData = true;
-
/** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-
-
-
protected AMQMessageDelegate _delegate;
private boolean _redelivered;
+ private boolean _receivedFromServer;
- protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+ protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedData)
{
_delegate = delegateFactory.createDelegate();
- _data = data;
- if (_data != null)
- {
- _data.acquire();
- }
-
-
- _readableMessage = (data != null);
- _changedData = (data == null);
-
+ setContentType(getMimeType());
}
- protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ protected AbstractJMSMessage(AMQMessageDelegate delegate, boolean fromReceivedData) throws AMQException
{
_delegate = delegate;
-
- _data = data;
- if (_data != null)
- {
- _data.acquire();
- }
-
- _readableMessage = data != null;
-
+ setContentType(getMimeType());
}
public String getJMSMessageID() throws JMSException
@@ -329,12 +301,9 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
public void clearBody() throws JMSException
{
- clearBodyImpl();
- _readableMessage = false;
-
+ _receivedFromServer = false;
}
-
public void acknowledgeThis() throws JMSException
{
_delegate.acknowledgeThis();
@@ -345,14 +314,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
_delegate.acknowledge();
}
- /**
- * This forces concrete classes to implement clearBody()
- *
- * @throws JMSException
- */
- public abstract void clearBodyImpl() throws JMSException;
-
- /**
+ /*
* Get a String representation of the body of the message. Used in the toString() method which outputs this before
* message properties.
*/
@@ -413,63 +375,24 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
return _delegate;
}
- public ByteBuffer getData()
- {
- // make sure we rewind the data just in case any method has moved the
- // position beyond the start
- if (_data != null)
- {
- reset();
- }
+ abstract public ByteBuffer getData() throws JMSException;
- return _data;
- }
-
- protected void checkReadable() throws MessageNotReadableException
- {
- if (!_readableMessage)
- {
- throw new MessageNotReadableException("You need to call reset() to make the message readable");
- }
- }
protected void checkWritable() throws MessageNotWriteableException
{
- if (_readableMessage)
+ if (_receivedFromServer)
{
throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
}
}
- public void reset()
- {
- if (!_changedData)
- {
- _data.rewind();
- }
- else
- {
- _data.flip();
- _changedData = false;
- }
- }
- public int getContentLength()
+ public void setReceivedFromServer()
{
- if(_data != null)
- {
- return _data.remaining();
- }
- else
- {
- return 0;
- }
+ _receivedFromServer = true;
}
- public void receivedFromServer()
- {
- _changedData = false;
- }
+
/**
* The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 40c1df0c5d..967a1fb49f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client.message;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
@@ -38,6 +36,8 @@ import javax.jms.JMSException;
import java.util.Iterator;
import java.util.List;
+import java.nio.ByteBuffer;
+
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
@@ -57,7 +57,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
_logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")");
}
- data = ((ContentBody) bodies.get(0)).payload;
+ data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload);
}
else if (bodies != null)
{
@@ -72,7 +72,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
while (it.hasNext())
{
ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload = cb.payload;
+ final ByteBuffer payload = ByteBuffer.wrap(cb._payload);
if(payload.isDirect() || payload.isReadOnly())
{
data.put(payload);
@@ -82,7 +82,6 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
data.put(payload.array(), payload.arrayOffset(), payload.limit());
}
- payload.release();
}
data.flip();
@@ -109,7 +108,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps,
- DeliveryProperties deliveryProps,
+ DeliveryProperties deliveryProps,
java.nio.ByteBuffer body) throws AMQException
{
ByteBuffer data;
@@ -118,7 +117,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
if (body != null)
{
- data = ByteBuffer.wrap(body);
+ data = body;
}
else // body == null
{
@@ -155,7 +154,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
{
final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
- msg.receivedFromServer();
+ msg.setReceivedFromServer();
return msg;
}
@@ -166,7 +165,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
final AbstractJMSMessage msg =
create010MessageWithBody(messageNbr,msgProps,deliveryProps, body);
msg.setJMSRedelivered(redelivered);
- msg.receivedFromServer();
+ msg.setReceivedFromServer();
return msg;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index b87275a9ce..e252bdb719 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.message;
+import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
@@ -28,47 +29,56 @@ import java.nio.charset.CharsetEncoder;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
+public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage
{
public static final String MIME_TYPE = "application/octet-stream";
+ private TypedBytesContentReader _typedBytesContentReader;
+ private TypedBytesContentWriter _typedBytesContentWriter;
- public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
- {
- this(delegateFactory,null);
- }
-
- /**
- * Construct a bytes message with existing data.
- *
- * @param delegateFactory
- * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- */
- JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+ public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
{
-
- super(delegateFactory, data); // this instanties a content header
+ super(delegateFactory,false);
+ _typedBytesContentWriter = new TypedBytesContentWriter();
}
JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(delegate, data);
+ super(delegate, data!=null);
+ _typedBytesContentReader = new TypedBytesContentReader(data);
}
public void reset()
{
- super.reset();
_readableMessage = true;
+
+ if(_typedBytesContentReader != null)
+ {
+ _typedBytesContentReader.reset();
+ }
+ else if (_typedBytesContentWriter != null)
+ {
+ _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData());
+ }
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _typedBytesContentReader = null;
+ _typedBytesContentWriter = new TypedBytesContentWriter();
+
}
protected String getMimeType()
@@ -76,45 +86,57 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
return MIME_TYPE;
}
+ @Override
+ public java.nio.ByteBuffer getData() throws JMSException
+ {
+ return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData();
+ }
+
public long getBodyLength() throws JMSException
{
checkReadable();
- return _data.limit();
+ return _typedBytesContentReader.size();
}
public boolean readBoolean() throws JMSException
{
checkReadable();
checkAvailable(1);
- return _data.get() != 0;
+
+ return _typedBytesContentReader.readBooleanImpl();
+ }
+
+ private void checkAvailable(final int i) throws MessageEOFException
+ {
+ _typedBytesContentReader.checkAvailable(1);
}
public byte readByte() throws JMSException
{
checkReadable();
checkAvailable(1);
- return _data.get();
+ return _typedBytesContentReader.readByteImpl();
}
public int readUnsignedByte() throws JMSException
{
checkReadable();
checkAvailable(1);
- return _data.getUnsigned();
+ return _typedBytesContentReader.readByteImpl() & 0xFF;
}
public short readShort() throws JMSException
{
checkReadable();
checkAvailable(2);
- return _data.getShort();
+ return _typedBytesContentReader.readShortImpl();
}
public int readUnsignedShort() throws JMSException
{
checkReadable();
checkAvailable(2);
- return _data.getUnsignedShort();
+ return _typedBytesContentReader.readShortImpl() & 0xFFFF;
}
/**
@@ -127,35 +149,35 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
{
checkReadable();
checkAvailable(2);
- return _data.getChar();
+ return _typedBytesContentReader.readCharImpl();
}
public int readInt() throws JMSException
{
checkReadable();
checkAvailable(4);
- return _data.getInt();
+ return _typedBytesContentReader.readIntImpl();
}
public long readLong() throws JMSException
{
checkReadable();
checkAvailable(8);
- return _data.getLong();
+ return _typedBytesContentReader.readLongImpl();
}
public float readFloat() throws JMSException
{
checkReadable();
checkAvailable(4);
- return _data.getFloat();
+ return _typedBytesContentReader.readFloatImpl();
}
public double readDouble() throws JMSException
{
checkReadable();
checkAvailable(8);
- return _data.getDouble();
+ return _typedBytesContentReader.readDoubleImpl();
}
public String readUTF() throws JMSException
@@ -164,34 +186,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
// we check only for one byte since theoretically the string could be only a
// single byte when using UTF-8 encoding
- try
- {
- short length = readShort();
- if(length == 0)
- {
- return "";
- }
- else
- {
- CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
- ByteBuffer encodedString = _data.slice();
- encodedString.limit(length);
- _data.position(_data.position()+length);
- CharBuffer string = decoder.decode(encodedString.buf());
-
- return string.toString();
- }
-
-
-
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
+ return _typedBytesContentReader.readLengthPrefixedUTF();
}
public int readBytes(byte[] bytes) throws JMSException
@@ -201,14 +196,14 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
throw new IllegalArgumentException("byte array must not be null");
}
checkReadable();
- int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining());
+ int count = (_typedBytesContentReader.remaining() >= bytes.length ? bytes.length : _typedBytesContentReader.remaining());
if (count == 0)
{
return -1;
}
else
{
- _data.get(bytes, 0, count);
+ _typedBytesContentReader.readRawBytes(bytes, 0, count);
return count;
}
}
@@ -224,110 +219,82 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
throw new IllegalArgumentException("maxLength must be <= bytes.length");
}
checkReadable();
- int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining());
+ int count = (_typedBytesContentReader.remaining() >= maxLength ? maxLength : _typedBytesContentReader.remaining());
if (count == 0)
{
return -1;
}
else
{
- _data.get(bytes, 0, count);
+ _typedBytesContentReader.readRawBytes(bytes, 0, count);
return count;
}
}
+
public void writeBoolean(boolean b) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.put(b ? (byte) 1 : (byte) 0);
+ _typedBytesContentWriter.writeBooleanImpl(b);
}
public void writeByte(byte b) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.put(b);
+ _typedBytesContentWriter.writeByteImpl(b);
}
public void writeShort(short i) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putShort(i);
+ _typedBytesContentWriter.writeShortImpl(i);
}
public void writeChar(char c) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putChar(c);
+ _typedBytesContentWriter.writeCharImpl(c);
}
public void writeInt(int i) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putInt(i);
+ _typedBytesContentWriter.writeIntImpl(i);
}
public void writeLong(long l) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putLong(l);
+ _typedBytesContentWriter.writeLongImpl(l);
}
public void writeFloat(float v) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putFloat(v);
+ _typedBytesContentWriter.writeFloatImpl(v);
}
public void writeDouble(double v) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putDouble(v);
+ _typedBytesContentWriter.writeDoubleImpl(v);
}
public void writeUTF(String string) throws JMSException
{
checkWritable();
- try
- {
- CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
- java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
-
- _data.putShort((short)encodedString.limit());
- _data.put(encodedString);
- _changedData = true;
- //_data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must add the null terminator manually
- //_data.put((byte)0);
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Unable to encode string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
+ _typedBytesContentWriter.writeLengthPrefixedUTF(string);
}
public void writeBytes(byte[] bytes) throws JMSException
{
- checkWritable();
- _data.put(bytes);
- _changedData = true;
+ writeBytes(bytes, 0, bytes.length);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
checkWritable();
- _data.put(bytes, offset, length);
- _changedData = true;
+ _typedBytesContentWriter.writeBytesRaw(bytes, offset, length);
}
public void writeObject(Object object) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
index cb04ebee1b..89561b88eb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
@@ -22,11 +22,12 @@ package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import java.nio.ByteBuffer;
+
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index e295d4a2a0..52c0eb263b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.client.message;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Enumeration;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -282,7 +285,7 @@ public final class JMSHeaderAdapter
s = String.valueOf(o);
}
}
- }//else return s // null;
+ }//else return s // null;
}
return s;
@@ -458,9 +461,29 @@ public final class JMSHeaderAdapter
return getHeaders().isEmpty();
}
- public void writeToBuffer(ByteBuffer data)
+ public void writeToBuffer(final ByteBuffer data)
{
- getHeaders().writeToBuffer(data);
+ try
+ {
+ getHeaders().writeToBuffer(new DataOutputStream(new OutputStream()
+ {
+ @Override
+ public void write(final int b)
+ {
+ data.put((byte)b);
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len)
+ {
+ data.put(b, off, len);
+ }
+ }));
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Unexpected IO Exception - should never happen", e);
+ }
}
public Enumeration getMapNames()
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index 306ffeeadf..fad24a968e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -20,11 +20,8 @@
*/
package org.apache.qpid.client.message;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,13 +29,14 @@ import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
+import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
-public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
+public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage
{
private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
@@ -54,10 +52,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
{
- super(delegateFactory, data); // this instantiates a content header
+ super(delegateFactory, data!=null); // this instantiates a content header
if(data != null)
{
- populateMapFromData();
+ populateMapFromData(data);
}
}
@@ -65,10 +63,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(delegate, data);
+ super(delegate, data != null);
try
{
- populateMapFromData();
+ populateMapFromData(data);
}
catch (JMSException je)
{
@@ -89,18 +87,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
return MIME_TYPE;
}
- public ByteBuffer getData()
- {
- // What if _data is null?
- writeMapToData();
-
- return super.getData();
- }
-
@Override
- public void clearBodyImpl() throws JMSException
+ public void clearBody() throws JMSException
{
- super.clearBodyImpl();
+ super.clearBody();
_map.clear();
}
@@ -458,17 +448,18 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
return _map.containsKey(propName);
}
- protected void populateMapFromData() throws JMSException
+ protected void populateMapFromData(ByteBuffer data) throws JMSException
{
- if (_data != null)
+ TypedBytesContentReader reader = new TypedBytesContentReader(data);
+ if (data != null)
{
- _data.rewind();
+ data.rewind();
- final int entries = readIntImpl();
+ final int entries = reader.readIntImpl();
for (int i = 0; i < entries; i++)
{
- String propName = readStringImpl();
- Object value = readObject();
+ String propName = reader.readStringImpl();
+ Object value = reader.readObject();
_map.put(propName, value);
}
}
@@ -478,35 +469,21 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
}
- protected void writeMapToData()
+ public ByteBuffer getData()
+ throws JMSException
{
- allocateInitialBuffer();
+ TypedBytesContentWriter writer = new TypedBytesContentWriter();
+
final int size = _map.size();
- writeIntImpl(size);
+ writer.writeIntImpl(size);
for (Map.Entry<String, Object> entry : _map.entrySet())
{
- try
- {
- writeStringImpl(entry.getKey());
- }
- catch (CharacterCodingException e)
- {
- throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e);
-
- }
+ writer.writeNullTerminatedStringImpl(entry.getKey());
- try
- {
- writeObject(entry.getValue());
- }
- catch (JMSException e)
- {
- Object value = entry.getValue();
- throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value
- + " (type: " + value.getClass().getName() + ").", e);
- }
+ writer.writeObject(entry.getValue());
}
+ return writer.getData();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
index eccb90560b..89408a5c3c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
@@ -14,18 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSMapMessageFactory extends AbstractJMSMessageFactory
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 370e2d6c55..c981c951c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -20,27 +20,28 @@
*/
package org.apache.qpid.client.message;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
+import java.io.*;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.ObjectMessage;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
public static final String MIME_TYPE = "application/java-object-stream";
+ private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 256;
+
+ private Serializable _readData;
+ private ByteBuffer _data;
+ private Exception _exception;
+
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
- private static final int DEFAULT_BUFFER_SIZE = 1024;
/**
* Creates empty, writable message for use by producers
@@ -48,41 +49,57 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
*/
public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(delegateFactory, null);
- }
-
- private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
- {
- super(delegateFactory, data);
- if (data == null)
- {
- _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
- _data.setAutoExpand(true);
- }
-
- setContentType(getMimeType());
+ super(delegateFactory, false);
}
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ JMSObjectMessage(AMQMessageDelegate delegate, final ByteBuffer data) throws AMQException
{
- super(delegate, data);
+ super(delegate, data!=null);
+
+ try
+ {
+ ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
+ {
+
+
+ @Override
+ public int read() throws IOException
+ {
+ return data.get();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ len = data.remaining() < len ? data.remaining() : len;
+ data.get(b, off, len);
+ return len;
+ }
+ });
+
+ _readData = (Serializable) in.readObject();
+ }
+ catch (IOException e)
+ {
+ _exception = e;
+ }
+ catch (ClassNotFoundException e)
+ {
+ _exception = e;
+ }
}
- public void clearBodyImpl() throws JMSException
+ public void clearBody() throws JMSException
{
- if (_data != null)
- {
- _data.release();
- _data = null;
- }
-
-
-
+ super.clearBody();
+ _exception = null;
+ _readData = null;
+ _data = null;
}
public String toBodyString() throws JMSException
@@ -95,83 +112,116 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
return MIME_TYPE;
}
- public void setObject(Serializable serializable) throws JMSException
+ @Override
+ public ByteBuffer getData() throws JMSException
{
- checkWritable();
-
- if (_data == null)
+ if(_exception != null)
+ {
+ final MessageFormatException messageFormatException =
+ new MessageFormatException("Unable to deserialize message");
+ messageFormatException.setLinkedException(_exception);
+ throw messageFormatException;
+ }
+ if(_readData == null)
{
- _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
- _data.setAutoExpand(true);
+
+ return _data == null ? EMPTY_BYTE_BUFFER : _data.duplicate();
}
else
{
- _data.rewind();
+ try
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE);
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(_readData);
+ oos.flush();
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
+ catch (IOException e)
+ {
+ final JMSException jmsException = new JMSException("Unable to encode object of type: " +
+ _readData.getClass().getName() + ", value " + _readData);
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
}
+ }
+
+ public void setObject(Serializable serializable) throws JMSException
+ {
+ checkWritable();
+ clearBody();
try
{
- ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
- out.writeObject(serializable);
- out.flush();
- out.close();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE);
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(serializable);
+ oos.flush();
+ _data = ByteBuffer.wrap(baos.toByteArray());
}
catch (IOException e)
{
- MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
- mfe.setLinkedException(e);
- mfe.initCause(e);
- throw mfe;
+ final JMSException jmsException = new JMSException("Unable to encode object of type: " +
+ serializable.getClass().getName() + ", value " + serializable);
+ jmsException.setLinkedException(e);
+ throw jmsException;
}
}
public Serializable getObject() throws JMSException
{
- ObjectInputStream in = null;
- if (_data == null)
+ if(_exception != null)
{
- return null;
+ final MessageFormatException messageFormatException = new MessageFormatException("Unable to deserialize message");
+ messageFormatException.setLinkedException(_exception);
+ throw messageFormatException;
}
-
- try
+ else if(_readData != null || _data == null)
{
- _data.rewind();
- in = new ClassLoadingAwareObjectInputStream(_data.asInputStream());
-
- return (Serializable) in.readObject();
+ return _readData;
}
- catch (IOException e)
- {
- MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
- mfe.setLinkedException(e);
- mfe.initCause(e);
- throw mfe;
- }
- catch (ClassNotFoundException e)
- {
- MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
- mfe.setLinkedException(e);
- mfe.initCause(e);
- throw mfe;
- }
- finally
+ else
{
- // _data.rewind();
- close(in);
- }
- }
+ Exception exception = null;
- private static void close(InputStream in)
- {
- try
- {
- if (in != null)
+ final ByteBuffer data = _data.duplicate();
+ try
+ {
+ ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ return data.get();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ len = data.remaining() < len ? data.remaining() : len;
+ data.get(b, off, len);
+ return len;
+ }
+ });
+
+ return (Serializable) in.readObject();
+ }
+ catch (ClassNotFoundException e)
+ {
+ exception = e;
+ }
+ catch (IOException e)
{
- in.close();
+ exception = e;
}
+
+ JMSException jmsException = new JMSException("Could not deserialize object");
+ jmsException.setLinkedException(exception);
+ throw jmsException;
}
- catch (IOException ignore)
- { }
+
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
index 03851dfa01..4660c91c1f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,10 +22,8 @@ package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index ad2620852b..5c93f6b6f0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -23,7 +23,8 @@ package org.apache.qpid.client.message;
import javax.jms.JMSException;
import javax.jms.StreamMessage;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -36,65 +37,76 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
public static final String MIME_TYPE="jms/stream-message";
-
- /**
- * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
- * a byte array in multiple chunks, hence this is used to track how much is left to be read
- */
- private int _byteArrayRemaining = -1;
+ private TypedBytesContentReader _typedBytesContentReader;
+ private TypedBytesContentWriter _typedBytesContentWriter;
public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(delegateFactory,null);
+ super(delegateFactory,false);
+ _typedBytesContentWriter = new TypedBytesContentWriter();
}
- /**
- * Construct a stream message with existing data.
- *
- * @param delegateFactory
- * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- */
- JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
- {
- super(delegateFactory, data); // this instanties a content header
- }
JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
-
- super(delegate, data);
+ super(delegate, data!=null);
+ _typedBytesContentReader = new TypedBytesContentReader(data);
}
-
public void reset()
{
- super.reset();
_readableMessage = true;
+
+ if(_typedBytesContentReader != null)
+ {
+ _typedBytesContentReader.reset();
+ }
+ else if (_typedBytesContentWriter != null)
+ {
+ _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData());
+ }
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _typedBytesContentReader = null;
+ _typedBytesContentWriter = new TypedBytesContentWriter();
+
}
+
protected String getMimeType()
{
return MIME_TYPE;
}
-
+ @Override
+ public java.nio.ByteBuffer getData() throws JMSException
+ {
+ return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData();
+ }
public boolean readBoolean() throws JMSException
{
- return super.readBoolean();
+ checkReadable();
+ return _typedBytesContentReader.readBoolean();
}
public byte readByte() throws JMSException
{
- return super.readByte();
+ checkReadable();
+ return _typedBytesContentReader.readByte();
}
public short readShort() throws JMSException
{
- return super.readShort();
+ checkReadable();
+ return _typedBytesContentReader.readShort();
}
/**
@@ -105,102 +117,127 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
*/
public char readChar() throws JMSException
{
- return super.readChar();
+ checkReadable();
+ return _typedBytesContentReader.readChar();
}
public int readInt() throws JMSException
{
- return super.readInt();
+ checkReadable();
+ return _typedBytesContentReader.readInt();
}
public long readLong() throws JMSException
{
- return super.readLong();
+ checkReadable();
+ return _typedBytesContentReader.readLong();
}
public float readFloat() throws JMSException
{
- return super.readFloat();
+ checkReadable();
+ return _typedBytesContentReader.readFloat();
}
public double readDouble() throws JMSException
{
- return super.readDouble();
+ checkReadable();
+ return _typedBytesContentReader.readDouble();
}
public String readString() throws JMSException
{
- return super.readString();
+ checkReadable();
+ return _typedBytesContentReader.readString();
}
public int readBytes(byte[] bytes) throws JMSException
{
- return super.readBytes(bytes);
+ if(bytes == null)
+ {
+ throw new IllegalArgumentException("Must provide non-null array to read into");
+ }
+
+ checkReadable();
+ return _typedBytesContentReader.readBytes(bytes);
}
public Object readObject() throws JMSException
{
- return super.readObject();
+ checkReadable();
+ return _typedBytesContentReader.readObject();
}
public void writeBoolean(boolean b) throws JMSException
{
- super.writeBoolean(b);
+ checkWritable();
+ _typedBytesContentWriter.writeBoolean(b);
}
public void writeByte(byte b) throws JMSException
{
- super.writeByte(b);
+ checkWritable();
+ _typedBytesContentWriter.writeByte(b);
}
public void writeShort(short i) throws JMSException
{
- super.writeShort(i);
+ checkWritable();
+ _typedBytesContentWriter.writeShort(i);
}
public void writeChar(char c) throws JMSException
{
- super.writeChar(c);
+ checkWritable();
+ _typedBytesContentWriter.writeChar(c);
}
public void writeInt(int i) throws JMSException
{
- super.writeInt(i);
+ checkWritable();
+ _typedBytesContentWriter.writeInt(i);
}
public void writeLong(long l) throws JMSException
{
- super.writeLong(l);
+ checkWritable();
+ _typedBytesContentWriter.writeLong(l);
}
public void writeFloat(float v) throws JMSException
{
- super.writeFloat(v);
+ checkWritable();
+ _typedBytesContentWriter.writeFloat(v);
}
public void writeDouble(double v) throws JMSException
{
- super.writeDouble(v);
+ checkWritable();
+ _typedBytesContentWriter.writeDouble(v);
}
public void writeString(String string) throws JMSException
{
- super.writeString(string);
+ checkWritable();
+ _typedBytesContentWriter.writeString(string);
}
public void writeBytes(byte[] bytes) throws JMSException
{
- super.writeBytes(bytes);
+ checkWritable();
+ _typedBytesContentWriter.writeBytes(bytes);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
- super.writeBytes(bytes,offset,length);
+ checkWritable();
+ _typedBytesContentWriter.writeBytes(bytes, offset, length);
}
public void writeObject(Object object) throws JMSException
{
- super.writeObject(object);
+ checkWritable();
+ _typedBytesContentWriter.writeObject(object);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
index 5e25db9ae0..359f5157f3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
@@ -22,10 +22,9 @@ package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index fc2006a119..acf3a0ca14 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -20,15 +20,21 @@
*/
package org.apache.qpid.client.message;
+import java.io.DataInputStream;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.util.Strings;
@@ -37,6 +43,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
{
private static final String MIME_TYPE = "text/plain";
+ private Exception _exception;
private String _decodedValue;
/**
@@ -45,36 +52,41 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString();
private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
- public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
- {
- this(delegateFactory, null, null);
- }
+ private CharsetDecoder _decoder = DEFAULT_CHARSET.newDecoder();
+ private CharsetEncoder _encoder = DEFAULT_CHARSET.newEncoder();
+
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
- JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException
+ public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- super(delegateFactory, data); // this instantiates a content header
- setContentType(getMimeType());
- setEncoding(encoding);
+ super(delegateFactory, false); // this instantiates a content header
}
JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
throws AMQException
{
- super(delegate, data);
- setContentType(getMimeType());
- _data = data;
- }
+ super(delegate, data!=null);
-
- public void clearBodyImpl() throws JMSException
- {
- if (_data != null)
+ try
{
- _data.release();
- _data = null;
+ if(propertyExists(PAYLOAD_NULL_PROPERTY))
+ {
+ _decodedValue = null;
+ }
+ else
+ {
+ _decodedValue = _decoder.decode(data).toString();
+ }
+ }
+ catch (CharacterCodingException e)
+ {
+ _exception = e;
+ }
+ catch (JMSException e)
+ {
+ _exception = e;
}
- _decodedValue = null;
}
public String toBodyString() throws JMSException
@@ -87,95 +99,62 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
return MIME_TYPE;
}
- public void setText(String text) throws JMSException
+ @Override
+ public ByteBuffer getData() throws JMSException
{
- checkWritable();
-
- clearBody();
+ _encoder.reset();
try
{
- if (text != null)
+ if(_exception != null)
+ {
+ final MessageFormatException messageFormatException = new MessageFormatException("Cannot decode original message");
+ messageFormatException.setLinkedException(_exception);
+ throw messageFormatException;
+ }
+ else if(_decodedValue == null)
+ {
+ return EMPTY_BYTE_BUFFER;
+ }
+ else
{
- final String encoding = getEncoding();
- if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
- {
- _data = ByteBuffer.wrap(Strings.toUTF8(text));
- setEncoding("UTF-8");
- }
- else
- {
- _data = ByteBuffer.wrap(text.getBytes(encoding));
- }
- _data.position(_data.limit());
- _changedData=true;
+ return _encoder.encode(CharBuffer.wrap(_decodedValue));
}
- _decodedValue = text;
}
- catch (UnsupportedEncodingException e)
+ catch (CharacterCodingException e)
{
- // should never occur
- JMSException jmse = new JMSException("Unable to decode text data");
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
+ final JMSException jmsException = new JMSException("Cannot encode string in UFT-8: " + _decodedValue);
+ jmsException.setLinkedException(e);
+ throw jmsException;
}
}
- public String getText() throws JMSException
+ @Override
+ public void clearBody() throws JMSException
{
- if (_data == null && _decodedValue == null)
- {
- return null;
- }
- else if (_decodedValue != null)
- {
- return _decodedValue;
- }
- else
- {
- _data.rewind();
+ super.clearBody();
+ _decodedValue = null;
+ _exception = null;
+ }
- if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
- {
- return null;
- }
- if (getEncoding() != null)
- {
- try
- {
- _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder());
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Could not decode string data: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- }
- else
- {
- try
- {
- _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Could not decode string data: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- }
- return _decodedValue;
- }
+ public void setText(String text) throws JMSException
+ {
+ checkWritable();
+
+ clearBody();
+ _decodedValue = text;
+
+ }
+
+ public String getText() throws JMSException
+ {
+ return _decodedValue;
}
@Override
public void prepareForSending() throws JMSException
{
super.prepareForSending();
- if (_data == null)
+ if (_decodedValue == null)
{
setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
index 1f4d64c78f..d1af32c10a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
@@ -22,7 +22,7 @@ package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java
new file mode 100644
index 0000000000..26a0b41cdc
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+public interface TypedBytesCodes
+{
+ static final byte BOOLEAN_TYPE = (byte) 1;
+
+ static final byte BYTE_TYPE = (byte) 2;
+
+ static final byte BYTEARRAY_TYPE = (byte) 3;
+
+ static final byte SHORT_TYPE = (byte) 4;
+
+ static final byte CHAR_TYPE = (byte) 5;
+
+ static final byte INT_TYPE = (byte) 6;
+
+ static final byte LONG_TYPE = (byte) 7;
+
+ static final byte FLOAT_TYPE = (byte) 8;
+
+ static final byte DOUBLE_TYPE = (byte) 9;
+
+ static final byte STRING_TYPE = (byte) 10;
+
+ static final byte NULL_STRING_TYPE = (byte) 11;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
new file mode 100644
index 0000000000..1ae25eb1ed
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
@@ -0,0 +1,674 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+class TypedBytesContentReader implements TypedBytesCodes
+{
+
+ private final ByteBuffer _data;
+ private final int _position;
+ private final int _limit;
+
+
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder();
+
+ private int _byteArrayRemaining = -1;
+
+
+ public TypedBytesContentReader(final ByteBuffer data)
+ {
+ _data = data.duplicate();
+ _position = _data.position();
+ _limit = _data.limit();
+ }
+
+ /**
+ * Check that there is at least a certain number of bytes available to read
+ *
+ * @param len the number of bytes
+ * @throws javax.jms.MessageEOFException if there are less than len bytes available to read
+ */
+ protected void checkAvailable(int len) throws MessageEOFException
+ {
+ if (_data.remaining() < len)
+ {
+ throw new MessageEOFException("Unable to read " + len + " bytes");
+ }
+ }
+
+ protected byte readWireType() throws MessageFormatException, MessageEOFException,
+ MessageNotReadableException
+ {
+ checkAvailable(1);
+ return _data.get();
+ }
+
+ protected boolean readBoolean() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ boolean result;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ boolean readBooleanImpl()
+ {
+ return _data.get() != 0;
+ }
+
+ protected byte readByte() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ byte result;
+ try
+ {
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
+
+ byte readByteImpl()
+ {
+ return _data.get();
+ }
+
+ protected short readShort() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ short result;
+ try
+ {
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
+
+ short readShortImpl()
+ {
+ return _data.getShort();
+ }
+
+ /**
+ * Note that this method reads a unicode character as two bytes from the stream
+ *
+ * @return the character read from the stream
+ * @throws javax.jms.JMSException
+ */
+ protected char readChar() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ try
+ {
+ if (wireType == NULL_STRING_TYPE)
+ {
+ throw new NullPointerException();
+ }
+
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ char readCharImpl()
+ {
+ return _data.getChar();
+ }
+
+ protected int readInt() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ int result;
+ try
+ {
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected int readIntImpl()
+ {
+ return _data.getInt();
+ }
+
+ protected long readLong() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ long result;
+ try
+ {
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ long readLongImpl()
+ {
+ return _data.getLong();
+ }
+
+ protected float readFloat() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ float result;
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ float readFloatImpl()
+ {
+ return _data.getFloat();
+ }
+
+ protected double readDouble() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ double result;
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ double readDoubleImpl()
+ {
+ return _data.getDouble();
+ }
+
+ protected String readString() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ String result;
+ try
+ {
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ throw new NullPointerException("data is null");
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected String readStringImpl() throws JMSException
+ {
+ try
+ {
+ _charsetDecoder.reset();
+ ByteBuffer dup = _data.duplicate();
+ int pos = _data.position();
+ byte b;
+ while((b = _data.get()) != 0);
+ dup.limit(_data.position()-1);
+ return _charsetDecoder.decode(dup).toString();
+
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+
+ protected int readBytes(byte[] bytes) throws JMSException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("byte array must not be null");
+ }
+ // first call
+ if (_byteArrayRemaining == -1)
+ {
+ // type discriminator checked separately so you get a MessageFormatException rather than
+ // an EOF even in the case where both would be applicable
+ checkAvailable(1);
+ byte wireType = readWireType();
+ if (wireType != BYTEARRAY_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+ }
+ checkAvailable(4);
+ int size = _data.getInt();
+ // length of -1 indicates null
+ if (size == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ if (size > _data.remaining())
+ {
+ throw new MessageEOFException("Byte array has stated length "
+ + size
+ + " but message only contains "
+ +
+ _data.remaining()
+ + " bytes");
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ }
+ }
+ }
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
+
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
+ }
+
+ private int readBytesImpl(byte[] bytes)
+ {
+ int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
+ _byteArrayRemaining -= count;
+
+ if (count == 0)
+ {
+ return 0;
+ }
+ else
+ {
+ _data.get(bytes, 0, count);
+ return count;
+ }
+ }
+
+ protected Object readObject() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ Object result = null;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ byte[] bytesResult = new byte[size];
+ readBytesImpl(bytesResult);
+ result = bytesResult;
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ public void reset()
+ {
+ _byteArrayRemaining = -1;
+ _data.position(_position);
+ _data.limit(_limit);
+ }
+
+ public ByteBuffer getData()
+ {
+ ByteBuffer buf = _data.duplicate();
+ buf.position(_position);
+ buf.limit(_limit);
+ return buf;
+ }
+
+ public long size()
+ {
+ return _limit - _position;
+ }
+
+ public int remaining()
+ {
+ return _data.remaining();
+ }
+
+ public void readRawBytes(final byte[] bytes, final int offset, final int count)
+ {
+ _data.get(bytes, offset, count);
+ }
+
+ public String readLengthPrefixedUTF() throws JMSException
+ {
+ try
+ {
+ short length = readShortImpl();
+ if(length == 0)
+ {
+ return "";
+ }
+ else
+ {
+ _charsetDecoder.reset();
+ ByteBuffer encodedString = _data.slice();
+ encodedString.limit(length);
+ _data.position(_data.position()+length);
+ CharBuffer string = _charsetDecoder.decode(encodedString);
+
+ return string.toString();
+ }
+ }
+ catch(CharacterCodingException e)
+ {
+ JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
new file mode 100644
index 0000000000..7c91db3a32
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+class TypedBytesContentWriter implements TypedBytesCodes
+{
+ private final ByteArrayOutputStream _baos = new ByteArrayOutputStream();
+ private final DataOutputStream _data = new DataOutputStream(_baos);
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+
+ protected void writeTypeDiscriminator(byte type) throws JMSException
+ {
+ try
+ {
+ _data.writeByte(type);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ private JMSException handle(final IOException e)
+ {
+ JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage());
+ jmsEx.setLinkedException(e);
+ return jmsEx;
+ }
+
+
+ protected void writeBoolean(boolean b) throws JMSException
+ {
+ writeTypeDiscriminator(BOOLEAN_TYPE);
+ writeBooleanImpl(b);
+ }
+
+ public void writeBooleanImpl(final boolean b) throws JMSException
+ {
+ try
+ {
+ _data.writeByte(b ? (byte) 1 : (byte) 0);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeByte(byte b) throws JMSException
+ {
+ writeTypeDiscriminator(BYTE_TYPE);
+ writeByteImpl(b);
+ }
+
+ public void writeByteImpl(final byte b) throws JMSException
+ {
+ try
+ {
+ _data.writeByte(b);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeShort(short i) throws JMSException
+ {
+ writeTypeDiscriminator(SHORT_TYPE);
+ writeShortImpl(i);
+ }
+
+ public void writeShortImpl(final short i) throws JMSException
+ {
+ try
+ {
+ _data.writeShort(i);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeChar(char c) throws JMSException
+ {
+ writeTypeDiscriminator(CHAR_TYPE);
+ writeCharImpl(c);
+ }
+
+ public void writeCharImpl(final char c) throws JMSException
+ {
+ try
+ {
+ _data.writeChar(c);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeInt(int i) throws JMSException
+ {
+ writeTypeDiscriminator(INT_TYPE);
+ writeIntImpl(i);
+ }
+
+ protected void writeIntImpl(int i) throws JMSException
+ {
+ try
+ {
+ _data.writeInt(i);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeLong(long l) throws JMSException
+ {
+ writeTypeDiscriminator(LONG_TYPE);
+ writeLongImpl(l);
+ }
+
+ public void writeLongImpl(final long l) throws JMSException
+ {
+ try
+ {
+ _data.writeLong(l);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeFloat(float v) throws JMSException
+ {
+ writeTypeDiscriminator(FLOAT_TYPE);
+ writeFloatImpl(v);
+ }
+
+ public void writeFloatImpl(final float v) throws JMSException
+ {
+ try
+ {
+ _data.writeFloat(v);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeDouble(double v) throws JMSException
+ {
+ writeTypeDiscriminator(DOUBLE_TYPE);
+ writeDoubleImpl(v);
+ }
+
+ public void writeDoubleImpl(final double v) throws JMSException
+ {
+ try
+ {
+ _data.writeDouble(v);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeString(String string) throws JMSException
+ {
+ if (string == null)
+ {
+ writeTypeDiscriminator(NULL_STRING_TYPE);
+ }
+ else
+ {
+ writeTypeDiscriminator(STRING_TYPE);
+ writeNullTerminatedStringImpl(string);
+ }
+ }
+
+ protected void writeNullTerminatedStringImpl(String string)
+ throws JMSException
+ {
+ try
+ {
+ _data.write(string.getBytes(UTF8));
+ _data.writeByte((byte) 0);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+
+ }
+
+ protected void writeBytes(byte[] bytes) throws JMSException
+ {
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
+ }
+
+ protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ writeTypeDiscriminator(BYTEARRAY_TYPE);
+ writeBytesImpl(bytes, offset, length);
+ }
+
+ public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException
+ {
+ try
+ {
+ if (bytes == null)
+ {
+ _data.writeInt(-1);
+ }
+ else
+ {
+ _data.writeInt(length);
+ _data.write(bytes, offset, length);
+ }
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException
+ {
+ try
+ {
+ if (bytes != null)
+ {
+ _data.write(bytes, offset, length);
+ }
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+
+ protected void writeObject(Object object) throws JMSException
+ {
+ Class clazz;
+
+ if (object == null)
+ {
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
+ }
+
+ if (clazz == Byte.class)
+ {
+ writeByte((Byte) object);
+ }
+ else if (clazz == Boolean.class)
+ {
+ writeBoolean((Boolean) object);
+ }
+ else if (clazz == byte[].class)
+ {
+ writeBytes((byte[]) object);
+ }
+ else if (clazz == Short.class)
+ {
+ writeShort((Short) object);
+ }
+ else if (clazz == Character.class)
+ {
+ writeChar((Character) object);
+ }
+ else if (clazz == Integer.class)
+ {
+ writeInt((Integer) object);
+ }
+ else if (clazz == Long.class)
+ {
+ writeLong((Long) object);
+ }
+ else if (clazz == Float.class)
+ {
+ writeFloat((Float) object);
+ }
+ else if (clazz == Double.class)
+ {
+ writeDouble((Double) object);
+ }
+ else if (clazz == String.class)
+ {
+ writeString((String) object);
+ }
+ else
+ {
+ throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ }
+ }
+
+ public ByteBuffer getData()
+ {
+ return ByteBuffer.wrap(_baos.toByteArray());
+ }
+
+ public void writeLengthPrefixedUTF(final String string) throws JMSException
+ {
+ try
+ {
+ CharsetEncoder encoder = UTF8.newEncoder();
+ java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+
+ writeShortImpl((short) encodedString.limit());
+ while(encodedString.hasRemaining())
+ {
+ _data.writeByte(encodedString.get());
+ }
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException jmse = new JMSException("Unable to encode string: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
index 685e646d85..ce87a112c9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
@@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage
public void receiveBody(ContentBody body)
{
- if (body.payload != null)
+ if (body._payload != null)
{
- final long payloadSize = body.payload.remaining();
+ final long payloadSize = body._payload.length;
if (_bodies == null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 6d6cd9cae5..624cf67593 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.client.protocol;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -524,7 +526,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public synchronized void writeFrame(AMQDataBlock frame, boolean wait)
{
- final ByteBuffer buf = frame.toNioByteBuffer();
+ final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
_sender.send(buf);
_sender.flush();
@@ -547,6 +549,39 @@ public class AMQProtocolHandler implements ProtocolEngine
}
+ private ByteBuffer asByteBuffer(AMQDataBlock block)
+ {
+ final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+ try
+ {
+ block.writePayload(new DataOutputStream(new OutputStream()
+ {
+
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ buf.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ buf.put(b, off, len);
+ }
+ }));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ buf.flip();
+ return buf;
+ }
+
+
/**
* Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
* calling getProtocolSession().write() then waiting for the response.
diff --git a/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java b/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
index 93a4fb39f5..a12e4ce977 100644
--- a/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.client.util;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.ObjectOutputStream;
-import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.test.utils.QpidTestCase;
public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase
@@ -37,16 +37,15 @@ public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase
protected void setUp() throws Exception
{
//Create a viable input stream for instantiating the CLA OIS
- ByteBuffer buf = ByteBuffer.allocate(10);
- buf.setAutoExpand(true);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(buf.asOutputStream());
+ ObjectOutputStream out = new ObjectOutputStream(baos);
out.writeObject("testString");
out.flush();
out.close();
- buf.rewind();
- _in = buf.asInputStream();
+
+ _in = new ByteArrayInputStream(baos.toByteArray());
_claOIS = new ClassLoadingAwareObjectInputStream(_in);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java
index 177dccea7e..e37970e9a2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java
@@ -45,8 +45,6 @@ public class ObjectMessageUnitTest extends QpidTestCase
_om.setObject(true);
//make the message readable
- _om.reset();
-
Object object = _om.getObject();
assertTrue("Unexpected type returned", object instanceof Boolean);
@@ -61,8 +59,6 @@ public class ObjectMessageUnitTest extends QpidTestCase
_om.setObject("test string");
//make the message readable
- _om.reset();
-
Object object = _om.getObject();
assertTrue("Unexpected type returned", object instanceof String);
@@ -87,7 +83,6 @@ public class ObjectMessageUnitTest extends QpidTestCase
list.add(0);
//make the message readable
- _om.reset();
//retrieve the Object
Object object = _om.getObject();