diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-09-09 17:47:22 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-09-09 17:47:22 +0000 |
| commit | 5270591c7831e559925e720c6bfc0c78c514b95a (patch) | |
| tree | 8ce7aabb2f5a4a87c4c53b3e8f810c62392eba11 /java/client/src | |
| parent | 282e16aab532d842dffad3935bfd1a952bc584be (diff) | |
| download | qpid-python-5270591c7831e559925e720c6bfc0c78c514b95a.tar.gz | |
QPID-2627 : Remove dependency on MINA
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1167311 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
28 files changed, 1636 insertions, 1385 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 5821fee7ff..d739903ee6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -201,8 +201,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT)); } } - - messageProps.setContentLength(message.getContentLength()); + + ByteBuffer data = message.getData(); + messageProps.setContentLength(data.remaining()); // send the message try @@ -221,8 +222,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) && (destination.getLink().getReliability() == Reliability.UNRELIABLE); - org.apache.mina.common.ByteBuffer data = message.getData(); - ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); + + ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice(); ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(), MessageAcceptMode.NONE, diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 27f7486890..26e9814e33 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -27,14 +27,13 @@ import javax.jms.Message; import javax.jms.Topic; import javax.jms.Queue; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.AMQMessageDelegate; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.CompositeAMQDataBlock; @@ -186,7 +185,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (frames.length == (offset + 1)) { - frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); + byte[] data = new byte[payload.remaining()]; + payload.get(data); + frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(data)); } else { @@ -198,7 +199,10 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer payload.position((int) framePayloadMax * (i - offset)); int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); + byte[] data = new byte[payload.remaining()]; + payload.get(data); + + frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(data)); remaining -= length; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java index 8c3f2fd08f..e5b95f54f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java @@ -21,11 +21,6 @@ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.AMQException; - public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate> { public static AMQMessageDelegateFactory DEFAULT_FACTORY = null; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index cec4268a7b..b9ba946a20 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -499,7 +499,6 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); } - _contentHeaderProperties.updated(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java index 7f735e0722..be71c8c657 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java @@ -24,11 +24,11 @@ package org.apache.qpid.client.message; import java.util.List; import java.util.Map; import java.util.UUID; +import java.nio.ByteBuffer; import javax.jms.JMSException; import javax.jms.MessageFormatException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.codec.BBEncoder; @@ -81,18 +81,19 @@ public class AMQPEncodedMapMessage extends JMSMapMessage @ Override public ByteBuffer getData() { - writeMapToData(); - return _data; + BBEncoder encoder = new BBEncoder(1024); + encoder.writeMap(_map); + return encoder.segment(); } @ Override - protected void populateMapFromData() throws JMSException + protected void populateMapFromData(ByteBuffer data) throws JMSException { - if (_data != null) + if (data != null) { - _data.rewind(); + data.rewind(); BBDecoder decoder = new BBDecoder(); - decoder.init(_data.buf()); + decoder.init(data); _map = decoder.readMap(); } else @@ -101,14 +102,6 @@ public class AMQPEncodedMapMessage extends JMSMapMessage } } - @ Override - protected void writeMapToData() - { - BBEncoder encoder = new BBEncoder(1024); - encoder.writeMap(_map); - _data = ByteBuffer.wrap(encoder.segment()); - } - // for testing public Map<String,Object> getMap() { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java index 4978d1ce85..2c38f153cb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java @@ -1,6 +1,6 @@ package org.apache.qpid.client.message; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,22 +8,23 @@ package org.apache.qpid.client.message; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory @@ -36,7 +37,7 @@ public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory return new AMQPEncodedMapMessage(delegate,data); } - @Override + public AbstractJMSMessage createMessage( AMQMessageDelegateFactory delegateFactory) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java deleted file mode 100644 index 3846ee043d..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import java.io.IOException; -import java.nio.charset.Charset; - -import javax.jms.JMSException; -import javax.jms.MessageEOFException; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.transport.util.Functions; - -/** - * @author Apache Software Foundation - */ -public abstract class AbstractBytesMessage extends AbstractJMSMessage -{ - - /** - * The default initial size of the buffer. The buffer expands automatically. - */ - private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; - - AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory) - { - this(delegateFactory, null); - } - - /** - * Construct a bytes message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) - { - super(delegateFactory, data); // this instanties a content header - setContentType(getMimeType()); - - if (_data == null) - { - allocateInitialBuffer(); - } - } - - protected void allocateInitialBuffer() - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); - _data.setAutoExpand(true); - } - - AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException - { - super(delegate, data); - setContentType(getMimeType()); - } - - - public void clearBodyImpl() throws JMSException - { - allocateInitialBuffer(); - } - - public String toBodyString() throws JMSException - { - try - { - if (_data != null) - { - return Functions.str(_data.buf(), 100,0); - } - else - { - return ""; - } - - } - catch (Exception e) - { - JMSException jmse = new JMSException(e.toString()); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - - } - - /** - * Check that there is at least a certain number of bytes available to read - * - * @param len the number of bytes - * @throws javax.jms.MessageEOFException if there are less than len bytes available to read - */ - protected void checkAvailable(int len) throws MessageEOFException - { - if (_data.remaining() < len) - { - throw new MessageEOFException("Unable to read " + len + " bytes"); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index 85818dcd2b..ddeb62fbf6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -21,784 +21,96 @@ package org.apache.qpid.client.message; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.ByteBuffer; import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.transport.util.Functions; /** * @author Apache Software Foundation */ -public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage +public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage { + protected boolean _readableMessage = false; - protected static final byte BOOLEAN_TYPE = (byte) 1; - - protected static final byte BYTE_TYPE = (byte) 2; - - protected static final byte BYTEARRAY_TYPE = (byte) 3; - - protected static final byte SHORT_TYPE = (byte) 4; - - protected static final byte CHAR_TYPE = (byte) 5; - - protected static final byte INT_TYPE = (byte) 6; - - protected static final byte LONG_TYPE = (byte) 7; - - protected static final byte FLOAT_TYPE = (byte) 8; - - protected static final byte DOUBLE_TYPE = (byte) 9; - - protected static final byte STRING_TYPE = (byte) 10; - - protected static final byte NULL_STRING_TYPE = (byte) 11; - - /** - * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read - * a byte array in multiple chunks, hence this is used to track how much is left to be read - */ - private int _byteArrayRemaining = -1; - - AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory) - { - - this(delegateFactory, null); - } - - /** - * Construct a stream message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) + AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage) { - super(delegateFactory, data); // this instanties a content header + super(delegateFactory, fromReceivedMessage); // this instanties a content header + _readableMessage = fromReceivedMessage; } - AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + AbstractBytesTypedMessage(AMQMessageDelegate delegate, boolean fromReceivedMessage) throws AMQException { - super(delegate, data); - } + super(delegate, fromReceivedMessage); + _readableMessage = fromReceivedMessage; - - protected byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException - { - checkReadable(); - checkAvailable(1); - return _data.get(); } - protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException + protected void checkReadable() throws MessageNotReadableException { - checkWritable(); - _data.put(type); - _changedData = true; - } - - protected boolean readBoolean() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - boolean result; - try + if (!_readableMessage) { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; + throw new MessageNotReadableException("You need to call reset() to make the message readable"); } } - private boolean readBooleanImpl() + @Override + protected void checkWritable() throws MessageNotWriteableException { - return _data.get() != 0; - } - - protected byte readByte() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - byte result; - try + super.checkWritable(); + if(_readableMessage) { - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); - } + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; } - private byte readByteImpl() + public void clearBody() throws JMSException { - return _data.get(); + super.clearBody(); + _readableMessage = false; } - protected short readShort() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - short result; - try - { - switch (wireType) - { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - private short readShortImpl() + public String toBodyString() throws JMSException { - return _data.getShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws javax.jms.JMSException - */ - protected char readChar() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); try { - if(wireType == NULL_STRING_TYPE){ - throw new NullPointerException(); + ByteBuffer data = getData(); + if (data != null) + { + return Functions.str(data, 100, 0); + } + else + { + return ""; } - if (wireType != CHAR_TYPE) - { - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); - } - else - { - checkAvailable(2); - return readCharImpl(); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private char readCharImpl() - { - return _data.getChar(); - } - - protected int readInt() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - int result; - try - { - switch (wireType) - { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected int readIntImpl() - { - return _data.getInt(); - } - - protected long readLong() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - long result; - try - { - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private long readLongImpl() - { - return _data.getLong(); - } - - protected float readFloat() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - float result; - try - { - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private float readFloatImpl() - { - return _data.getFloat(); - } - - protected double readDouble() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - double result; - try - { - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private double readDoubleImpl() - { - return _data.getDouble(); - } - - protected String readString() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - String result; - try - { - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case NULL_STRING_TYPE: - result = null; - throw new NullPointerException("data is null"); - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected String readStringImpl() throws JMSException - { - try - { - return _data.getString(Charset.forName("UTF-8").newDecoder()); } - catch (CharacterCodingException e) + catch (Exception e) { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + JMSException jmse = new JMSException(e.toString()); jmse.setLinkedException(e); jmse.initCause(e); throw jmse; } - } - - protected int readBytes(byte[] bytes) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - checkReadable(); - // first call - if (_byteArrayRemaining == -1) - { - // type discriminator checked separately so you get a MessageFormatException rather than - // an EOF even in the case where both would be applicable - checkAvailable(1); - byte wireType = readWireType(); - if (wireType != BYTEARRAY_TYPE) - { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); - } - checkAvailable(4); - int size = _data.getInt(); - // length of -1 indicates null - if (size == -1) - { - return -1; - } - else - { - if (size > _data.remaining()) - { - throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + - _data.remaining() + " bytes"); - } - else - { - _byteArrayRemaining = size; - } - } - } - else if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - return -1; - } - - int returnedSize = readBytesImpl(bytes); - if (returnedSize < bytes.length) - { - _byteArrayRemaining = -1; - } - return returnedSize; - } - - private int readBytesImpl(byte[] bytes) - { - int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); - _byteArrayRemaining -= count; - - if (count == 0) - { - return 0; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - protected Object readObject() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - Object result = null; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { - result = null; - } - else - { - _byteArrayRemaining = size; - byte[] bytesResult = new byte[size]; - readBytesImpl(bytesResult); - result = bytesResult; - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case NULL_STRING_TYPE: - result = null; - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected void writeBoolean(boolean b) throws JMSException - { - writeTypeDiscriminator(BOOLEAN_TYPE); - _data.put(b ? (byte) 1 : (byte) 0); - } - - protected void writeByte(byte b) throws JMSException - { - writeTypeDiscriminator(BYTE_TYPE); - _data.put(b); - } - - protected void writeShort(short i) throws JMSException - { - writeTypeDiscriminator(SHORT_TYPE); - _data.putShort(i); - } - - protected void writeChar(char c) throws JMSException - { - writeTypeDiscriminator(CHAR_TYPE); - _data.putChar(c); - } - - protected void writeInt(int i) throws JMSException - { - writeTypeDiscriminator(INT_TYPE); - writeIntImpl(i); - } - - protected void writeIntImpl(int i) - { - _data.putInt(i); - } - - protected void writeLong(long l) throws JMSException - { - writeTypeDiscriminator(LONG_TYPE); - _data.putLong(l); - } - protected void writeFloat(float v) throws JMSException - { - writeTypeDiscriminator(FLOAT_TYPE); - _data.putFloat(v); } - protected void writeDouble(double v) throws JMSException - { - writeTypeDiscriminator(DOUBLE_TYPE); - _data.putDouble(v); - } - protected void writeString(String string) throws JMSException - { - if (string == null) - { - writeTypeDiscriminator(NULL_STRING_TYPE); - } - else - { - writeTypeDiscriminator(STRING_TYPE); - try - { - writeStringImpl(string); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Unable to encode string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - } - - protected void writeStringImpl(String string) - throws CharacterCodingException - { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte) 0); - } + abstract public void reset(); - protected void writeBytes(byte[] bytes) throws JMSException - { - writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); - } - protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - writeTypeDiscriminator(BYTEARRAY_TYPE); - if (bytes == null) - { - _data.putInt(-1); - } - else - { - _data.putInt(length); - _data.put(bytes, offset, length); - } - } - protected void writeObject(Object object) throws JMSException - { - checkWritable(); - Class clazz; - if (object == null) - { - // string handles the output of null values - clazz = String.class; - } - else - { - clazz = object.getClass(); - } - - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeString((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } - } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 6ba55b207a..f713554bfb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,66 +20,38 @@ */ package org.apache.qpid.client.message; -import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Enumeration; import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message { - - protected ByteBuffer _data; - protected boolean _readableMessage = false; - protected boolean _changedData = true; - /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - - - protected AMQMessageDelegate _delegate; private boolean _redelivered; + private boolean _receivedFromServer; - protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) + protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedData) { _delegate = delegateFactory.createDelegate(); - _data = data; - if (_data != null) - { - _data.acquire(); - } - - - _readableMessage = (data != null); - _changedData = (data == null); - + setContentType(getMimeType()); } - protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + protected AbstractJMSMessage(AMQMessageDelegate delegate, boolean fromReceivedData) throws AMQException { _delegate = delegate; - - _data = data; - if (_data != null) - { - _data.acquire(); - } - - _readableMessage = data != null; - + setContentType(getMimeType()); } public String getJMSMessageID() throws JMSException @@ -329,12 +301,9 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message public void clearBody() throws JMSException { - clearBodyImpl(); - _readableMessage = false; - + _receivedFromServer = false; } - public void acknowledgeThis() throws JMSException { _delegate.acknowledgeThis(); @@ -345,14 +314,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message _delegate.acknowledge(); } - /** - * This forces concrete classes to implement clearBody() - * - * @throws JMSException - */ - public abstract void clearBodyImpl() throws JMSException; - - /** + /* * Get a String representation of the body of the message. Used in the toString() method which outputs this before * message properties. */ @@ -413,63 +375,24 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message return _delegate; } - public ByteBuffer getData() - { - // make sure we rewind the data just in case any method has moved the - // position beyond the start - if (_data != null) - { - reset(); - } + abstract public ByteBuffer getData() throws JMSException; - return _data; - } - - protected void checkReadable() throws MessageNotReadableException - { - if (!_readableMessage) - { - throw new MessageNotReadableException("You need to call reset() to make the message readable"); - } - } protected void checkWritable() throws MessageNotWriteableException { - if (_readableMessage) + if (_receivedFromServer) { throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); } } - public void reset() - { - if (!_changedData) - { - _data.rewind(); - } - else - { - _data.flip(); - _changedData = false; - } - } - public int getContentLength() + public void setReceivedFromServer() { - if(_data != null) - { - return _data.remaining(); - } - else - { - return 0; - } + _receivedFromServer = true; } - public void receivedFromServer() - { - _changedData = false; - } + /** * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 40c1df0c5d..967a1fb49f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; @@ -38,6 +36,8 @@ import javax.jms.JMSException; import java.util.Iterator; import java.util.List; +import java.nio.ByteBuffer; + public abstract class AbstractJMSMessageFactory implements MessageFactory { private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); @@ -57,7 +57,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")"); } - data = ((ContentBody) bodies.get(0)).payload; + data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload); } else if (bodies != null) { @@ -72,7 +72,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory while (it.hasNext()) { ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = cb.payload; + final ByteBuffer payload = ByteBuffer.wrap(cb._payload); if(payload.isDirect() || payload.isReadOnly()) { data.put(payload); @@ -82,7 +82,6 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory data.put(payload.array(), payload.arrayOffset(), payload.limit()); } - payload.release(); } data.flip(); @@ -109,7 +108,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps, - DeliveryProperties deliveryProps, + DeliveryProperties deliveryProps, java.nio.ByteBuffer body) throws AMQException { ByteBuffer data; @@ -118,7 +117,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory if (body != null) { - data = ByteBuffer.wrap(body); + data = body; } else // body == null { @@ -155,7 +154,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); msg.setJMSRedelivered(redelivered); - msg.receivedFromServer(); + msg.setReceivedFromServer(); return msg; } @@ -166,7 +165,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory final AbstractJMSMessage msg = create010MessageWithBody(messageNbr,msgProps,deliveryProps, body); msg.setJMSRedelivered(redelivered); - msg.receivedFromServer(); + msg.setReceivedFromServer(); return msg; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index b87275a9ce..e252bdb719 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.message; +import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; @@ -28,47 +29,56 @@ import java.nio.charset.CharsetEncoder; import javax.jms.BytesMessage; import javax.jms.JMSException; +import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage +public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage { public static final String MIME_TYPE = "application/octet-stream"; + private TypedBytesContentReader _typedBytesContentReader; + private TypedBytesContentWriter _typedBytesContentWriter; - public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory) - { - this(delegateFactory,null); - } - - /** - * Construct a bytes message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) + public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory) { - - super(delegateFactory, data); // this instanties a content header + super(delegateFactory,false); + _typedBytesContentWriter = new TypedBytesContentWriter(); } JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(delegate, data); + super(delegate, data!=null); + _typedBytesContentReader = new TypedBytesContentReader(data); } public void reset() { - super.reset(); _readableMessage = true; + + if(_typedBytesContentReader != null) + { + _typedBytesContentReader.reset(); + } + else if (_typedBytesContentWriter != null) + { + _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData()); + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + _typedBytesContentReader = null; + _typedBytesContentWriter = new TypedBytesContentWriter(); + } protected String getMimeType() @@ -76,45 +86,57 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag return MIME_TYPE; } + @Override + public java.nio.ByteBuffer getData() throws JMSException + { + return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData(); + } + public long getBodyLength() throws JMSException { checkReadable(); - return _data.limit(); + return _typedBytesContentReader.size(); } public boolean readBoolean() throws JMSException { checkReadable(); checkAvailable(1); - return _data.get() != 0; + + return _typedBytesContentReader.readBooleanImpl(); + } + + private void checkAvailable(final int i) throws MessageEOFException + { + _typedBytesContentReader.checkAvailable(1); } public byte readByte() throws JMSException { checkReadable(); checkAvailable(1); - return _data.get(); + return _typedBytesContentReader.readByteImpl(); } public int readUnsignedByte() throws JMSException { checkReadable(); checkAvailable(1); - return _data.getUnsigned(); + return _typedBytesContentReader.readByteImpl() & 0xFF; } public short readShort() throws JMSException { checkReadable(); checkAvailable(2); - return _data.getShort(); + return _typedBytesContentReader.readShortImpl(); } public int readUnsignedShort() throws JMSException { checkReadable(); checkAvailable(2); - return _data.getUnsignedShort(); + return _typedBytesContentReader.readShortImpl() & 0xFFFF; } /** @@ -127,35 +149,35 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag { checkReadable(); checkAvailable(2); - return _data.getChar(); + return _typedBytesContentReader.readCharImpl(); } public int readInt() throws JMSException { checkReadable(); checkAvailable(4); - return _data.getInt(); + return _typedBytesContentReader.readIntImpl(); } public long readLong() throws JMSException { checkReadable(); checkAvailable(8); - return _data.getLong(); + return _typedBytesContentReader.readLongImpl(); } public float readFloat() throws JMSException { checkReadable(); checkAvailable(4); - return _data.getFloat(); + return _typedBytesContentReader.readFloatImpl(); } public double readDouble() throws JMSException { checkReadable(); checkAvailable(8); - return _data.getDouble(); + return _typedBytesContentReader.readDoubleImpl(); } public String readUTF() throws JMSException @@ -164,34 +186,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag // we check only for one byte since theoretically the string could be only a // single byte when using UTF-8 encoding - try - { - short length = readShort(); - if(length == 0) - { - return ""; - } - else - { - CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); - ByteBuffer encodedString = _data.slice(); - encodedString.limit(length); - _data.position(_data.position()+length); - CharBuffer string = decoder.decode(encodedString.buf()); - - return string.toString(); - } - - - - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } + return _typedBytesContentReader.readLengthPrefixedUTF(); } public int readBytes(byte[] bytes) throws JMSException @@ -201,14 +196,14 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag throw new IllegalArgumentException("byte array must not be null"); } checkReadable(); - int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining()); + int count = (_typedBytesContentReader.remaining() >= bytes.length ? bytes.length : _typedBytesContentReader.remaining()); if (count == 0) { return -1; } else { - _data.get(bytes, 0, count); + _typedBytesContentReader.readRawBytes(bytes, 0, count); return count; } } @@ -224,110 +219,82 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag throw new IllegalArgumentException("maxLength must be <= bytes.length"); } checkReadable(); - int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining()); + int count = (_typedBytesContentReader.remaining() >= maxLength ? maxLength : _typedBytesContentReader.remaining()); if (count == 0) { return -1; } else { - _data.get(bytes, 0, count); + _typedBytesContentReader.readRawBytes(bytes, 0, count); return count; } } + public void writeBoolean(boolean b) throws JMSException { checkWritable(); - _changedData = true; - _data.put(b ? (byte) 1 : (byte) 0); + _typedBytesContentWriter.writeBooleanImpl(b); } public void writeByte(byte b) throws JMSException { checkWritable(); - _changedData = true; - _data.put(b); + _typedBytesContentWriter.writeByteImpl(b); } public void writeShort(short i) throws JMSException { checkWritable(); - _changedData = true; - _data.putShort(i); + _typedBytesContentWriter.writeShortImpl(i); } public void writeChar(char c) throws JMSException { checkWritable(); - _changedData = true; - _data.putChar(c); + _typedBytesContentWriter.writeCharImpl(c); } public void writeInt(int i) throws JMSException { checkWritable(); - _changedData = true; - _data.putInt(i); + _typedBytesContentWriter.writeIntImpl(i); } public void writeLong(long l) throws JMSException { checkWritable(); - _changedData = true; - _data.putLong(l); + _typedBytesContentWriter.writeLongImpl(l); } public void writeFloat(float v) throws JMSException { checkWritable(); - _changedData = true; - _data.putFloat(v); + _typedBytesContentWriter.writeFloatImpl(v); } public void writeDouble(double v) throws JMSException { checkWritable(); - _changedData = true; - _data.putDouble(v); + _typedBytesContentWriter.writeDoubleImpl(v); } public void writeUTF(String string) throws JMSException { checkWritable(); - try - { - CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder(); - java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); - - _data.putShort((short)encodedString.limit()); - _data.put(encodedString); - _changedData = true; - //_data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must add the null terminator manually - //_data.put((byte)0); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Unable to encode string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } + _typedBytesContentWriter.writeLengthPrefixedUTF(string); } public void writeBytes(byte[] bytes) throws JMSException { - checkWritable(); - _data.put(bytes); - _changedData = true; + writeBytes(bytes, 0, bytes.length); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { checkWritable(); - _data.put(bytes, offset, length); - _changedData = true; + _typedBytesContentWriter.writeBytesRaw(bytes, offset, length); } public void writeObject(Object object) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java index cb04ebee1b..89561b88eb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java @@ -22,11 +22,12 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; +import java.nio.ByteBuffer; + public class JMSBytesMessageFactory extends AbstractJMSMessageFactory { protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index e295d4a2a0..52c0eb263b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.client.message; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.Enumeration; import javax.jms.JMSException; import javax.jms.MessageFormatException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -282,7 +285,7 @@ public final class JMSHeaderAdapter s = String.valueOf(o); } } - }//else return s // null; + }//else return s // null; } return s; @@ -458,9 +461,29 @@ public final class JMSHeaderAdapter return getHeaders().isEmpty(); } - public void writeToBuffer(ByteBuffer data) + public void writeToBuffer(final ByteBuffer data) { - getHeaders().writeToBuffer(data); + try + { + getHeaders().writeToBuffer(new DataOutputStream(new OutputStream() + { + @Override + public void write(final int b) + { + data.put((byte)b); + } + + @Override + public void write(final byte[] b, final int off, final int len) + { + data.put(b, off, len); + } + })); + } + catch (IOException e) + { + throw new IllegalArgumentException("Unexpected IO Exception - should never happen", e); + } } public Enumeration getMapNames() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 306ffeeadf..fad24a968e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -20,11 +20,8 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,13 +29,14 @@ import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.MessageFormatException; +import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; -public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage +public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage { private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class); @@ -54,10 +52,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException { - super(delegateFactory, data); // this instantiates a content header + super(delegateFactory, data!=null); // this instantiates a content header if(data != null) { - populateMapFromData(); + populateMapFromData(data); } } @@ -65,10 +63,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(delegate, data); + super(delegate, data != null); try { - populateMapFromData(); + populateMapFromData(data); } catch (JMSException je) { @@ -89,18 +87,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm return MIME_TYPE; } - public ByteBuffer getData() - { - // What if _data is null? - writeMapToData(); - - return super.getData(); - } - @Override - public void clearBodyImpl() throws JMSException + public void clearBody() throws JMSException { - super.clearBodyImpl(); + super.clearBody(); _map.clear(); } @@ -458,17 +448,18 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm return _map.containsKey(propName); } - protected void populateMapFromData() throws JMSException + protected void populateMapFromData(ByteBuffer data) throws JMSException { - if (_data != null) + TypedBytesContentReader reader = new TypedBytesContentReader(data); + if (data != null) { - _data.rewind(); + data.rewind(); - final int entries = readIntImpl(); + final int entries = reader.readIntImpl(); for (int i = 0; i < entries; i++) { - String propName = readStringImpl(); - Object value = readObject(); + String propName = reader.readStringImpl(); + Object value = reader.readObject(); _map.put(propName, value); } } @@ -478,35 +469,21 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } } - protected void writeMapToData() + public ByteBuffer getData() + throws JMSException { - allocateInitialBuffer(); + TypedBytesContentWriter writer = new TypedBytesContentWriter(); + final int size = _map.size(); - writeIntImpl(size); + writer.writeIntImpl(size); for (Map.Entry<String, Object> entry : _map.entrySet()) { - try - { - writeStringImpl(entry.getKey()); - } - catch (CharacterCodingException e) - { - throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e); - - } + writer.writeNullTerminatedStringImpl(entry.getKey()); - try - { - writeObject(entry.getValue()); - } - catch (JMSException e) - { - Object value = entry.getValue(); - throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value - + " (type: " + value.getClass().getName() + ").", e); - } + writer.writeObject(entry.getValue()); } + return writer.getData(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java index eccb90560b..89408a5c3c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java @@ -14,18 +14,16 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSMapMessageFactory extends AbstractJMSMessageFactory { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 370e2d6c55..c981c951c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -20,27 +20,28 @@ */ package org.apache.qpid.client.message; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; +import java.io.*; +import java.nio.ByteBuffer; import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQException; import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { public static final String MIME_TYPE = "application/java-object-stream"; + private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 256; + + private Serializable _readData; + private ByteBuffer _data; + private Exception _exception; + + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - private static final int DEFAULT_BUFFER_SIZE = 1024; /** * Creates empty, writable message for use by producers @@ -48,41 +49,57 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag */ public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory) { - this(delegateFactory, null); - } - - private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) - { - super(delegateFactory, data); - if (data == null) - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - _data.setAutoExpand(true); - } - - setContentType(getMimeType()); + super(delegateFactory, false); } /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + JMSObjectMessage(AMQMessageDelegate delegate, final ByteBuffer data) throws AMQException { - super(delegate, data); + super(delegate, data!=null); + + try + { + ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream() + { + + + @Override + public int read() throws IOException + { + return data.get(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + len = data.remaining() < len ? data.remaining() : len; + data.get(b, off, len); + return len; + } + }); + + _readData = (Serializable) in.readObject(); + } + catch (IOException e) + { + _exception = e; + } + catch (ClassNotFoundException e) + { + _exception = e; + } } - public void clearBodyImpl() throws JMSException + public void clearBody() throws JMSException { - if (_data != null) - { - _data.release(); - _data = null; - } - - - + super.clearBody(); + _exception = null; + _readData = null; + _data = null; } public String toBodyString() throws JMSException @@ -95,83 +112,116 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag return MIME_TYPE; } - public void setObject(Serializable serializable) throws JMSException + @Override + public ByteBuffer getData() throws JMSException { - checkWritable(); - - if (_data == null) + if(_exception != null) + { + final MessageFormatException messageFormatException = + new MessageFormatException("Unable to deserialize message"); + messageFormatException.setLinkedException(_exception); + throw messageFormatException; + } + if(_readData == null) { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - _data.setAutoExpand(true); + + return _data == null ? EMPTY_BYTE_BUFFER : _data.duplicate(); } else { - _data.rewind(); + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(_readData); + oos.flush(); + return ByteBuffer.wrap(baos.toByteArray()); + } + catch (IOException e) + { + final JMSException jmsException = new JMSException("Unable to encode object of type: " + + _readData.getClass().getName() + ", value " + _readData); + jmsException.setLinkedException(e); + throw jmsException; + } } + } + + public void setObject(Serializable serializable) throws JMSException + { + checkWritable(); + clearBody(); try { - ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream()); - out.writeObject(serializable); - out.flush(); - out.close(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(serializable); + oos.flush(); + _data = ByteBuffer.wrap(baos.toByteArray()); } catch (IOException e) { - MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); - mfe.setLinkedException(e); - mfe.initCause(e); - throw mfe; + final JMSException jmsException = new JMSException("Unable to encode object of type: " + + serializable.getClass().getName() + ", value " + serializable); + jmsException.setLinkedException(e); + throw jmsException; } } public Serializable getObject() throws JMSException { - ObjectInputStream in = null; - if (_data == null) + if(_exception != null) { - return null; + final MessageFormatException messageFormatException = new MessageFormatException("Unable to deserialize message"); + messageFormatException.setLinkedException(_exception); + throw messageFormatException; } - - try + else if(_readData != null || _data == null) { - _data.rewind(); - in = new ClassLoadingAwareObjectInputStream(_data.asInputStream()); - - return (Serializable) in.readObject(); + return _readData; } - catch (IOException e) - { - MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); - mfe.setLinkedException(e); - mfe.initCause(e); - throw mfe; - } - catch (ClassNotFoundException e) - { - MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); - mfe.setLinkedException(e); - mfe.initCause(e); - throw mfe; - } - finally + else { - // _data.rewind(); - close(in); - } - } + Exception exception = null; - private static void close(InputStream in) - { - try - { - if (in != null) + final ByteBuffer data = _data.duplicate(); + try + { + ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream() + { + @Override + public int read() throws IOException + { + return data.get(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + len = data.remaining() < len ? data.remaining() : len; + data.get(b, off, len); + return len; + } + }); + + return (Serializable) in.readObject(); + } + catch (ClassNotFoundException e) + { + exception = e; + } + catch (IOException e) { - in.close(); + exception = e; } + + JMSException jmsException = new JMSException("Could not deserialize object"); + jmsException.setLinkedException(exception); + throw jmsException; } - catch (IOException ignore) - { } + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java index 03851dfa01..4660c91c1f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,10 +22,8 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSObjectMessageFactory extends AbstractJMSMessageFactory { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index ad2620852b..5c93f6b6f0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -23,7 +23,8 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; import javax.jms.StreamMessage; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -36,65 +37,76 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public static final String MIME_TYPE="jms/stream-message"; - - /** - * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read - * a byte array in multiple chunks, hence this is used to track how much is left to be read - */ - private int _byteArrayRemaining = -1; + private TypedBytesContentReader _typedBytesContentReader; + private TypedBytesContentWriter _typedBytesContentWriter; public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory) { - this(delegateFactory,null); + super(delegateFactory,false); + _typedBytesContentWriter = new TypedBytesContentWriter(); } - /** - * Construct a stream message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) - { - super(delegateFactory, data); // this instanties a content header - } JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - - super(delegate, data); + super(delegate, data!=null); + _typedBytesContentReader = new TypedBytesContentReader(data); } - public void reset() { - super.reset(); _readableMessage = true; + + if(_typedBytesContentReader != null) + { + _typedBytesContentReader.reset(); + } + else if (_typedBytesContentWriter != null) + { + _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData()); + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + _typedBytesContentReader = null; + _typedBytesContentWriter = new TypedBytesContentWriter(); + } + protected String getMimeType() { return MIME_TYPE; } - + @Override + public java.nio.ByteBuffer getData() throws JMSException + { + return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData(); + } public boolean readBoolean() throws JMSException { - return super.readBoolean(); + checkReadable(); + return _typedBytesContentReader.readBoolean(); } public byte readByte() throws JMSException { - return super.readByte(); + checkReadable(); + return _typedBytesContentReader.readByte(); } public short readShort() throws JMSException { - return super.readShort(); + checkReadable(); + return _typedBytesContentReader.readShort(); } /** @@ -105,102 +117,127 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea */ public char readChar() throws JMSException { - return super.readChar(); + checkReadable(); + return _typedBytesContentReader.readChar(); } public int readInt() throws JMSException { - return super.readInt(); + checkReadable(); + return _typedBytesContentReader.readInt(); } public long readLong() throws JMSException { - return super.readLong(); + checkReadable(); + return _typedBytesContentReader.readLong(); } public float readFloat() throws JMSException { - return super.readFloat(); + checkReadable(); + return _typedBytesContentReader.readFloat(); } public double readDouble() throws JMSException { - return super.readDouble(); + checkReadable(); + return _typedBytesContentReader.readDouble(); } public String readString() throws JMSException { - return super.readString(); + checkReadable(); + return _typedBytesContentReader.readString(); } public int readBytes(byte[] bytes) throws JMSException { - return super.readBytes(bytes); + if(bytes == null) + { + throw new IllegalArgumentException("Must provide non-null array to read into"); + } + + checkReadable(); + return _typedBytesContentReader.readBytes(bytes); } public Object readObject() throws JMSException { - return super.readObject(); + checkReadable(); + return _typedBytesContentReader.readObject(); } public void writeBoolean(boolean b) throws JMSException { - super.writeBoolean(b); + checkWritable(); + _typedBytesContentWriter.writeBoolean(b); } public void writeByte(byte b) throws JMSException { - super.writeByte(b); + checkWritable(); + _typedBytesContentWriter.writeByte(b); } public void writeShort(short i) throws JMSException { - super.writeShort(i); + checkWritable(); + _typedBytesContentWriter.writeShort(i); } public void writeChar(char c) throws JMSException { - super.writeChar(c); + checkWritable(); + _typedBytesContentWriter.writeChar(c); } public void writeInt(int i) throws JMSException { - super.writeInt(i); + checkWritable(); + _typedBytesContentWriter.writeInt(i); } public void writeLong(long l) throws JMSException { - super.writeLong(l); + checkWritable(); + _typedBytesContentWriter.writeLong(l); } public void writeFloat(float v) throws JMSException { - super.writeFloat(v); + checkWritable(); + _typedBytesContentWriter.writeFloat(v); } public void writeDouble(double v) throws JMSException { - super.writeDouble(v); + checkWritable(); + _typedBytesContentWriter.writeDouble(v); } public void writeString(String string) throws JMSException { - super.writeString(string); + checkWritable(); + _typedBytesContentWriter.writeString(string); } public void writeBytes(byte[] bytes) throws JMSException { - super.writeBytes(bytes); + checkWritable(); + _typedBytesContentWriter.writeBytes(bytes); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { - super.writeBytes(bytes,offset,length); + checkWritable(); + _typedBytesContentWriter.writeBytes(bytes, offset, length); } public void writeObject(Object object) throws JMSException { - super.writeObject(object); + checkWritable(); + _typedBytesContentWriter.writeObject(object); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java index 5e25db9ae0..359f5157f3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -22,10 +22,9 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSStreamMessageFactory extends AbstractJMSMessageFactory { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index fc2006a119..acf3a0ca14 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -20,15 +20,21 @@ */ package org.apache.qpid.client.message; +import java.io.DataInputStream; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; import javax.jms.JMSException; +import javax.jms.MessageFormatException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.util.Strings; @@ -37,6 +43,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { private static final String MIME_TYPE = "text/plain"; + private Exception _exception; private String _decodedValue; /** @@ -45,36 +52,41 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString(); private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); - public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException - { - this(delegateFactory, null, null); - } + private CharsetDecoder _decoder = DEFAULT_CHARSET.newDecoder(); + private CharsetEncoder _encoder = DEFAULT_CHARSET.newEncoder(); + + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException + public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - super(delegateFactory, data); // this instantiates a content header - setContentType(getMimeType()); - setEncoding(encoding); + super(delegateFactory, false); // this instantiates a content header } JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(delegate, data); - setContentType(getMimeType()); - _data = data; - } + super(delegate, data!=null); - - public void clearBodyImpl() throws JMSException - { - if (_data != null) + try { - _data.release(); - _data = null; + if(propertyExists(PAYLOAD_NULL_PROPERTY)) + { + _decodedValue = null; + } + else + { + _decodedValue = _decoder.decode(data).toString(); + } + } + catch (CharacterCodingException e) + { + _exception = e; + } + catch (JMSException e) + { + _exception = e; } - _decodedValue = null; } public String toBodyString() throws JMSException @@ -87,95 +99,62 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text return MIME_TYPE; } - public void setText(String text) throws JMSException + @Override + public ByteBuffer getData() throws JMSException { - checkWritable(); - - clearBody(); + _encoder.reset(); try { - if (text != null) + if(_exception != null) + { + final MessageFormatException messageFormatException = new MessageFormatException("Cannot decode original message"); + messageFormatException.setLinkedException(_exception); + throw messageFormatException; + } + else if(_decodedValue == null) + { + return EMPTY_BYTE_BUFFER; + } + else { - final String encoding = getEncoding(); - if (encoding == null || encoding.equalsIgnoreCase("UTF-8")) - { - _data = ByteBuffer.wrap(Strings.toUTF8(text)); - setEncoding("UTF-8"); - } - else - { - _data = ByteBuffer.wrap(text.getBytes(encoding)); - } - _data.position(_data.limit()); - _changedData=true; + return _encoder.encode(CharBuffer.wrap(_decodedValue)); } - _decodedValue = text; } - catch (UnsupportedEncodingException e) + catch (CharacterCodingException e) { - // should never occur - JMSException jmse = new JMSException("Unable to decode text data"); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + final JMSException jmsException = new JMSException("Cannot encode string in UFT-8: " + _decodedValue); + jmsException.setLinkedException(e); + throw jmsException; } } - public String getText() throws JMSException + @Override + public void clearBody() throws JMSException { - if (_data == null && _decodedValue == null) - { - return null; - } - else if (_decodedValue != null) - { - return _decodedValue; - } - else - { - _data.rewind(); + super.clearBody(); + _decodedValue = null; + _exception = null; + } - if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY)) - { - return null; - } - if (getEncoding() != null) - { - try - { - _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Could not decode string data: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - else - { - try - { - _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Could not decode string data: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - return _decodedValue; - } + public void setText(String text) throws JMSException + { + checkWritable(); + + clearBody(); + _decodedValue = text; + + } + + public String getText() throws JMSException + { + return _decodedValue; } @Override public void prepareForSending() throws JMSException { super.prepareForSending(); - if (_data == null) + if (_decodedValue == null) { setBooleanProperty(PAYLOAD_NULL_PROPERTY, true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java index 1f4d64c78f..d1af32c10a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java @@ -22,7 +22,7 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java new file mode 100644 index 0000000000..26a0b41cdc --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +public interface TypedBytesCodes +{ + static final byte BOOLEAN_TYPE = (byte) 1; + + static final byte BYTE_TYPE = (byte) 2; + + static final byte BYTEARRAY_TYPE = (byte) 3; + + static final byte SHORT_TYPE = (byte) 4; + + static final byte CHAR_TYPE = (byte) 5; + + static final byte INT_TYPE = (byte) 6; + + static final byte LONG_TYPE = (byte) 7; + + static final byte FLOAT_TYPE = (byte) 8; + + static final byte DOUBLE_TYPE = (byte) 9; + + static final byte STRING_TYPE = (byte) 10; + + static final byte NULL_STRING_TYPE = (byte) 11; +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java new file mode 100644 index 0000000000..1ae25eb1ed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java @@ -0,0 +1,674 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +class TypedBytesContentReader implements TypedBytesCodes +{ + + private final ByteBuffer _data; + private final int _position; + private final int _limit; + + + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder(); + + private int _byteArrayRemaining = -1; + + + public TypedBytesContentReader(final ByteBuffer data) + { + _data = data.duplicate(); + _position = _data.position(); + _limit = _data.limit(); + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + protected void checkAvailable(int len) throws MessageEOFException + { + if (_data.remaining() < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } + + protected byte readWireType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException + { + checkAvailable(1); + return _data.get(); + } + + protected boolean readBoolean() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + boolean readBooleanImpl() + { + return _data.get() != 0; + } + + protected byte readByte() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + byte readByteImpl() + { + return _data.get(); + } + + protected short readShort() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + protected char readChar() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if (wireType == NULL_STRING_TYPE) + { + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + char readCharImpl() + { + return _data.getChar(); + } + + protected int readInt() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected int readIntImpl() + { + return _data.getInt(); + } + + protected long readLong() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + long readLongImpl() + { + return _data.getLong(); + } + + protected float readFloat() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + float readFloatImpl() + { + return _data.getFloat(); + } + + protected double readDouble() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + double readDoubleImpl() + { + return _data.getDouble(); + } + + protected String readString() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected String readStringImpl() throws JMSException + { + try + { + _charsetDecoder.reset(); + ByteBuffer dup = _data.duplicate(); + int pos = _data.position(); + byte b; + while((b = _data.get()) != 0); + dup.limit(_data.position()-1); + return _charsetDecoder.decode(dup).toString(); + + } + catch (CharacterCodingException e) + { + JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + } + + protected int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated length " + + size + + " but message only contains " + + + _data.remaining() + + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + protected Object readObject() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public void reset() + { + _byteArrayRemaining = -1; + _data.position(_position); + _data.limit(_limit); + } + + public ByteBuffer getData() + { + ByteBuffer buf = _data.duplicate(); + buf.position(_position); + buf.limit(_limit); + return buf; + } + + public long size() + { + return _limit - _position; + } + + public int remaining() + { + return _data.remaining(); + } + + public void readRawBytes(final byte[] bytes, final int offset, final int count) + { + _data.get(bytes, offset, count); + } + + public String readLengthPrefixedUTF() throws JMSException + { + try + { + short length = readShortImpl(); + if(length == 0) + { + return ""; + } + else + { + _charsetDecoder.reset(); + ByteBuffer encodedString = _data.slice(); + encodedString.limit(length); + _data.position(_data.position()+length); + CharBuffer string = _charsetDecoder.decode(encodedString); + + return string.toString(); + } + } + catch(CharacterCodingException e) + { + JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java new file mode 100644 index 0000000000..7c91db3a32 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java @@ -0,0 +1,370 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; + +class TypedBytesContentWriter implements TypedBytesCodes +{ + private final ByteArrayOutputStream _baos = new ByteArrayOutputStream(); + private final DataOutputStream _data = new DataOutputStream(_baos); + private static final Charset UTF8 = Charset.forName("UTF-8"); + + protected void writeTypeDiscriminator(byte type) throws JMSException + { + try + { + _data.writeByte(type); + } + catch (IOException e) + { + throw handle(e); + } + } + + private JMSException handle(final IOException e) + { + JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage()); + jmsEx.setLinkedException(e); + return jmsEx; + } + + + protected void writeBoolean(boolean b) throws JMSException + { + writeTypeDiscriminator(BOOLEAN_TYPE); + writeBooleanImpl(b); + } + + public void writeBooleanImpl(final boolean b) throws JMSException + { + try + { + _data.writeByte(b ? (byte) 1 : (byte) 0); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeByte(byte b) throws JMSException + { + writeTypeDiscriminator(BYTE_TYPE); + writeByteImpl(b); + } + + public void writeByteImpl(final byte b) throws JMSException + { + try + { + _data.writeByte(b); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeShort(short i) throws JMSException + { + writeTypeDiscriminator(SHORT_TYPE); + writeShortImpl(i); + } + + public void writeShortImpl(final short i) throws JMSException + { + try + { + _data.writeShort(i); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeChar(char c) throws JMSException + { + writeTypeDiscriminator(CHAR_TYPE); + writeCharImpl(c); + } + + public void writeCharImpl(final char c) throws JMSException + { + try + { + _data.writeChar(c); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeInt(int i) throws JMSException + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + protected void writeIntImpl(int i) throws JMSException + { + try + { + _data.writeInt(i); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeLong(long l) throws JMSException + { + writeTypeDiscriminator(LONG_TYPE); + writeLongImpl(l); + } + + public void writeLongImpl(final long l) throws JMSException + { + try + { + _data.writeLong(l); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeFloat(float v) throws JMSException + { + writeTypeDiscriminator(FLOAT_TYPE); + writeFloatImpl(v); + } + + public void writeFloatImpl(final float v) throws JMSException + { + try + { + _data.writeFloat(v); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeDouble(double v) throws JMSException + { + writeTypeDiscriminator(DOUBLE_TYPE); + writeDoubleImpl(v); + } + + public void writeDoubleImpl(final double v) throws JMSException + { + try + { + _data.writeDouble(v); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeString(String string) throws JMSException + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + writeNullTerminatedStringImpl(string); + } + } + + protected void writeNullTerminatedStringImpl(String string) + throws JMSException + { + try + { + _data.write(string.getBytes(UTF8)); + _data.writeByte((byte) 0); + } + catch (IOException e) + { + throw handle(e); + } + + } + + protected void writeBytes(byte[] bytes) throws JMSException + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + writeBytesImpl(bytes, offset, length); + } + + public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException + { + try + { + if (bytes == null) + { + _data.writeInt(-1); + } + else + { + _data.writeInt(length); + _data.write(bytes, offset, length); + } + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException + { + try + { + if (bytes != null) + { + _data.write(bytes, offset, length); + } + } + catch (IOException e) + { + throw handle(e); + } + } + + + protected void writeObject(Object object) throws JMSException + { + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } + + public ByteBuffer getData() + { + return ByteBuffer.wrap(_baos.toByteArray()); + } + + public void writeLengthPrefixedUTF(final String string) throws JMSException + { + try + { + CharsetEncoder encoder = UTF8.newEncoder(); + java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); + + writeShortImpl((short) encodedString.limit()); + while(encodedString.hasRemaining()) + { + _data.writeByte(encodedString.get()); + } + } + catch (CharacterCodingException e) + { + JMSException jmse = new JMSException("Unable to encode string: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + catch (IOException e) + { + throw handle(e); + } + + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java index 685e646d85..ce87a112c9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java @@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage public void receiveBody(ContentBody body) { - if (body.payload != null) + if (body._payload != null) { - final long payloadSize = body.payload.remaining(); + final long payloadSize = body._payload.length; if (_bodies == null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 6d6cd9cae5..624cf67593 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.client.protocol; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -524,7 +526,7 @@ public class AMQProtocolHandler implements ProtocolEngine public synchronized void writeFrame(AMQDataBlock frame, boolean wait) { - final ByteBuffer buf = frame.toNioByteBuffer(); + final ByteBuffer buf = asByteBuffer(frame); _writtenBytes += buf.remaining(); _sender.send(buf); _sender.flush(); @@ -547,6 +549,39 @@ public class AMQProtocolHandler implements ProtocolEngine } + private ByteBuffer asByteBuffer(AMQDataBlock block) + { + final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + + try + { + block.writePayload(new DataOutputStream(new OutputStream() + { + + + @Override + public void write(int b) throws IOException + { + buf.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + buf.put(b, off, len); + } + })); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + buf.flip(); + return buf; + } + + /** * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to * calling getProtocolSession().write() then waiting for the response. diff --git a/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java b/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java index 93a4fb39f5..a12e4ce977 100644 --- a/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.util; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.ObjectOutputStream; -import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.List; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.test.utils.QpidTestCase; public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase @@ -37,16 +37,15 @@ public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase protected void setUp() throws Exception { //Create a viable input stream for instantiating the CLA OIS - ByteBuffer buf = ByteBuffer.allocate(10); - buf.setAutoExpand(true); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(buf.asOutputStream()); + ObjectOutputStream out = new ObjectOutputStream(baos); out.writeObject("testString"); out.flush(); out.close(); - buf.rewind(); - _in = buf.asInputStream(); + + _in = new ByteArrayInputStream(baos.toByteArray()); _claOIS = new ClassLoadingAwareObjectInputStream(_in); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java index 177dccea7e..e37970e9a2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java @@ -45,8 +45,6 @@ public class ObjectMessageUnitTest extends QpidTestCase _om.setObject(true); //make the message readable - _om.reset(); - Object object = _om.getObject(); assertTrue("Unexpected type returned", object instanceof Boolean); @@ -61,8 +59,6 @@ public class ObjectMessageUnitTest extends QpidTestCase _om.setObject("test string"); //make the message readable - _om.reset(); - Object object = _om.getObject(); assertTrue("Unexpected type returned", object instanceof String); @@ -87,7 +83,6 @@ public class ObjectMessageUnitTest extends QpidTestCase list.add(0); //make the message readable - _om.reset(); //retrieve the Object Object object = _om.getObject(); |
