diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-03-17 16:44:47 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-03-17 16:44:47 +0000 |
| commit | ba09630a4258cded77842e1bd5d746b8fbda0cfe (patch) | |
| tree | da5fd4e29ce839185c6759a1a141fb2d65f0250c /qpid/java/client | |
| parent | 1533a95469482a820d3f883c44e7e92fa02c5eb3 (diff) | |
| download | qpid-python-ba09630a4258cded77842e1bd5d746b8fbda0cfe.tar.gz | |
QPID-4000 : [Java Broker] Add conversion of 0-x messages to 1-0 subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1457482 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
6 files changed, 212 insertions, 1122 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index b0320d0f4e..6ffa051ff8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -20,13 +20,16 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQException; - +import java.io.EOFException; +import java.nio.ByteBuffer; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; -import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage { @@ -100,7 +103,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM private void checkAvailable(final int i) throws MessageEOFException { - _typedBytesContentReader.checkAvailable(1); + try + { + _typedBytesContentReader.checkAvailable(1); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } } public byte readByte() throws JMSException @@ -178,7 +188,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM // we check only for one byte since theoretically the string could be only a // single byte when using UTF-8 encoding - return _typedBytesContentReader.readLengthPrefixedUTF(); + try + { + return _typedBytesContentReader.readLengthPrefixedUTF(); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readBytes(byte[] bytes) throws JMSException @@ -275,7 +292,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM public void writeUTF(String string) throws JMSException { checkWritable(); - _typedBytesContentWriter.writeLengthPrefixedUTF(string); + try + { + _typedBytesContentWriter.writeLengthPrefixedUTF(string); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public void writeBytes(byte[] bytes) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index e18ed80f6d..0b05179215 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -20,18 +20,21 @@ */ package org.apache.qpid.client.message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.AMQException; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; +import java.io.EOFException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage { @@ -455,9 +458,22 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe final int entries = reader.readIntImpl(); for (int i = 0; i < entries; i++) { - String propName = reader.readStringImpl(); - Object value = reader.readObject(); - _map.put(propName, value); + String propName = null; + try + { + propName = reader.readStringImpl(); + Object value = reader.readObject(); + _map.put(propName, value); + + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } } } else @@ -477,7 +493,14 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe { writer.writeNullTerminatedStringImpl(entry.getKey()); - writer.writeObject(entry.getValue()); + try + { + writer.writeObject(entry.getValue()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } return writer.getData(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index b1af262580..223facbb59 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -20,11 +20,16 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQException; - +import java.io.EOFException; +import java.nio.ByteBuffer; import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; import javax.jms.StreamMessage; -import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; /** * @author Apache Software Foundation @@ -95,20 +100,53 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public boolean readBoolean() throws JMSException { checkReadable(); - return _typedBytesContentReader.readBoolean(); + try + { + return _typedBytesContentReader.readBoolean(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public byte readByte() throws JMSException { checkReadable(); - return _typedBytesContentReader.readByte(); + try + { + return _typedBytesContentReader.readByte(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public short readShort() throws JMSException { checkReadable(); - return _typedBytesContentReader.readShort(); + try + { + return _typedBytesContentReader.readShort(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } /** @@ -120,37 +158,103 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public char readChar() throws JMSException { checkReadable(); - return _typedBytesContentReader.readChar(); + try + { + return _typedBytesContentReader.readChar(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readInt() throws JMSException { checkReadable(); - return _typedBytesContentReader.readInt(); + try + { + return _typedBytesContentReader.readInt(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public long readLong() throws JMSException { checkReadable(); - return _typedBytesContentReader.readLong(); + try + { + return _typedBytesContentReader.readLong(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public float readFloat() throws JMSException { checkReadable(); - return _typedBytesContentReader.readFloat(); + try + { + return _typedBytesContentReader.readFloat(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public double readDouble() throws JMSException { checkReadable(); - return _typedBytesContentReader.readDouble(); + try + { + return _typedBytesContentReader.readDouble(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public String readString() throws JMSException { checkReadable(); - return _typedBytesContentReader.readString(); + try + { + return _typedBytesContentReader.readString(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readBytes(byte[] bytes) throws JMSException @@ -161,14 +265,36 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } checkReadable(); - return _typedBytesContentReader.readBytes(bytes); + try + { + return _typedBytesContentReader.readBytes(bytes); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public Object readObject() throws JMSException { checkReadable(); - return _typedBytesContentReader.readObject(); + try + { + return _typedBytesContentReader.readObject(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public void writeBoolean(boolean b) throws JMSException @@ -240,6 +366,13 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public void writeObject(Object object) throws JMSException { checkWritable(); - _typedBytesContentWriter.writeObject(object); + try + { + _typedBytesContentWriter.writeObject(object); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java deleted file mode 100644 index 26a0b41cdc..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -public interface TypedBytesCodes -{ - static final byte BOOLEAN_TYPE = (byte) 1; - - static final byte BYTE_TYPE = (byte) 2; - - static final byte BYTEARRAY_TYPE = (byte) 3; - - static final byte SHORT_TYPE = (byte) 4; - - static final byte CHAR_TYPE = (byte) 5; - - static final byte INT_TYPE = (byte) 6; - - static final byte LONG_TYPE = (byte) 7; - - static final byte FLOAT_TYPE = (byte) 8; - - static final byte DOUBLE_TYPE = (byte) 9; - - static final byte STRING_TYPE = (byte) 10; - - static final byte NULL_STRING_TYPE = (byte) 11; -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java deleted file mode 100644 index b00ac7e34b..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java +++ /dev/null @@ -1,674 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; - -class TypedBytesContentReader implements TypedBytesCodes -{ - - private final ByteBuffer _data; - private final int _position; - private final int _limit; - - - private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); - - private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder(); - - private int _byteArrayRemaining = -1; - - - public TypedBytesContentReader(final ByteBuffer data) - { - _data = data.duplicate(); - _position = _data.position(); - _limit = _data.limit(); - } - - /** - * Check that there is at least a certain number of bytes available to read - * - * @param len the number of bytes - * @throws javax.jms.MessageEOFException if there are less than len bytes available to read - */ - protected void checkAvailable(int len) throws MessageEOFException - { - if (_data.remaining() < len) - { - throw new MessageEOFException("Unable to read " + len + " bytes"); - } - } - - protected byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException - { - checkAvailable(1); - return _data.get(); - } - - protected boolean readBoolean() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - boolean result; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - boolean readBooleanImpl() - { - return _data.get() != 0; - } - - protected byte readByte() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - byte result; - try - { - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - byte readByteImpl() - { - return _data.get(); - } - - protected short readShort() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - short result; - try - { - switch (wireType) - { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - short readShortImpl() - { - return _data.getShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws javax.jms.JMSException - */ - protected char readChar() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - try - { - if (wireType == NULL_STRING_TYPE) - { - throw new NullPointerException(); - } - - if (wireType != CHAR_TYPE) - { - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); - } - else - { - checkAvailable(2); - return readCharImpl(); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - char readCharImpl() - { - return _data.getChar(); - } - - protected int readInt() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - int result; - try - { - switch (wireType) - { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected int readIntImpl() - { - return _data.getInt(); - } - - protected long readLong() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - long result; - try - { - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - long readLongImpl() - { - return _data.getLong(); - } - - protected float readFloat() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - float result; - try - { - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - float readFloatImpl() - { - return _data.getFloat(); - } - - protected double readDouble() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - double result; - try - { - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - double readDoubleImpl() - { - return _data.getDouble(); - } - - protected String readString() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - String result; - try - { - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case NULL_STRING_TYPE: - result = null; - throw new NullPointerException("data is null"); - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected String readStringImpl() throws JMSException - { - try - { - _charsetDecoder.reset(); - ByteBuffer dup = _data.duplicate(); - int pos = _data.position(); - byte b; - while((b = _data.get()) != 0) {}; - dup.limit(_data.position()-1); - return _charsetDecoder.decode(dup).toString(); - - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - - protected int readBytes(byte[] bytes) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - // first call - if (_byteArrayRemaining == -1) - { - // type discriminator checked separately so you get a MessageFormatException rather than - // an EOF even in the case where both would be applicable - checkAvailable(1); - byte wireType = readWireType(); - if (wireType != BYTEARRAY_TYPE) - { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); - } - checkAvailable(4); - int size = _data.getInt(); - // length of -1 indicates null - if (size == -1) - { - return -1; - } - else - { - if (size > _data.remaining()) - { - throw new MessageEOFException("Byte array has stated length " - + size - + " but message only contains " - + - _data.remaining() - + " bytes"); - } - else - { - _byteArrayRemaining = size; - } - } - } - else if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - return -1; - } - - int returnedSize = readBytesImpl(bytes); - if (returnedSize < bytes.length) - { - _byteArrayRemaining = -1; - } - return returnedSize; - } - - private int readBytesImpl(byte[] bytes) - { - int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); - _byteArrayRemaining -= count; - - if (count == 0) - { - return 0; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - protected Object readObject() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - Object result = null; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { - result = null; - } - else - { - _byteArrayRemaining = size; - byte[] bytesResult = new byte[size]; - readBytesImpl(bytesResult); - result = bytesResult; - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case NULL_STRING_TYPE: - result = null; - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - public void reset() - { - _byteArrayRemaining = -1; - _data.position(_position); - _data.limit(_limit); - } - - public ByteBuffer getData() - { - ByteBuffer buf = _data.duplicate(); - buf.position(_position); - buf.limit(_limit); - return buf; - } - - public long size() - { - return _limit - _position; - } - - public int remaining() - { - return _data.remaining(); - } - - public void readRawBytes(final byte[] bytes, final int offset, final int count) - { - _data.get(bytes, offset, count); - } - - public String readLengthPrefixedUTF() throws JMSException - { - try - { - short length = readShortImpl(); - if(length == 0) - { - return ""; - } - else - { - _charsetDecoder.reset(); - ByteBuffer encodedString = _data.slice(); - encodedString.limit(length); - _data.position(_data.position()+length); - CharBuffer string = _charsetDecoder.decode(encodedString); - - return string.toString(); - } - } - catch(CharacterCodingException e) - { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java deleted file mode 100644 index 7c91db3a32..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetEncoder; - -class TypedBytesContentWriter implements TypedBytesCodes -{ - private final ByteArrayOutputStream _baos = new ByteArrayOutputStream(); - private final DataOutputStream _data = new DataOutputStream(_baos); - private static final Charset UTF8 = Charset.forName("UTF-8"); - - protected void writeTypeDiscriminator(byte type) throws JMSException - { - try - { - _data.writeByte(type); - } - catch (IOException e) - { - throw handle(e); - } - } - - private JMSException handle(final IOException e) - { - JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage()); - jmsEx.setLinkedException(e); - return jmsEx; - } - - - protected void writeBoolean(boolean b) throws JMSException - { - writeTypeDiscriminator(BOOLEAN_TYPE); - writeBooleanImpl(b); - } - - public void writeBooleanImpl(final boolean b) throws JMSException - { - try - { - _data.writeByte(b ? (byte) 1 : (byte) 0); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeByte(byte b) throws JMSException - { - writeTypeDiscriminator(BYTE_TYPE); - writeByteImpl(b); - } - - public void writeByteImpl(final byte b) throws JMSException - { - try - { - _data.writeByte(b); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeShort(short i) throws JMSException - { - writeTypeDiscriminator(SHORT_TYPE); - writeShortImpl(i); - } - - public void writeShortImpl(final short i) throws JMSException - { - try - { - _data.writeShort(i); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeChar(char c) throws JMSException - { - writeTypeDiscriminator(CHAR_TYPE); - writeCharImpl(c); - } - - public void writeCharImpl(final char c) throws JMSException - { - try - { - _data.writeChar(c); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeInt(int i) throws JMSException - { - writeTypeDiscriminator(INT_TYPE); - writeIntImpl(i); - } - - protected void writeIntImpl(int i) throws JMSException - { - try - { - _data.writeInt(i); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeLong(long l) throws JMSException - { - writeTypeDiscriminator(LONG_TYPE); - writeLongImpl(l); - } - - public void writeLongImpl(final long l) throws JMSException - { - try - { - _data.writeLong(l); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeFloat(float v) throws JMSException - { - writeTypeDiscriminator(FLOAT_TYPE); - writeFloatImpl(v); - } - - public void writeFloatImpl(final float v) throws JMSException - { - try - { - _data.writeFloat(v); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeDouble(double v) throws JMSException - { - writeTypeDiscriminator(DOUBLE_TYPE); - writeDoubleImpl(v); - } - - public void writeDoubleImpl(final double v) throws JMSException - { - try - { - _data.writeDouble(v); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeString(String string) throws JMSException - { - if (string == null) - { - writeTypeDiscriminator(NULL_STRING_TYPE); - } - else - { - writeTypeDiscriminator(STRING_TYPE); - writeNullTerminatedStringImpl(string); - } - } - - protected void writeNullTerminatedStringImpl(String string) - throws JMSException - { - try - { - _data.write(string.getBytes(UTF8)); - _data.writeByte((byte) 0); - } - catch (IOException e) - { - throw handle(e); - } - - } - - protected void writeBytes(byte[] bytes) throws JMSException - { - writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); - } - - protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - writeTypeDiscriminator(BYTEARRAY_TYPE); - writeBytesImpl(bytes, offset, length); - } - - public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException - { - try - { - if (bytes == null) - { - _data.writeInt(-1); - } - else - { - _data.writeInt(length); - _data.write(bytes, offset, length); - } - } - catch (IOException e) - { - throw handle(e); - } - } - - public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException - { - try - { - if (bytes != null) - { - _data.write(bytes, offset, length); - } - } - catch (IOException e) - { - throw handle(e); - } - } - - - protected void writeObject(Object object) throws JMSException - { - Class clazz; - - if (object == null) - { - // string handles the output of null values - clazz = String.class; - } - else - { - clazz = object.getClass(); - } - - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeString((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } - } - - public ByteBuffer getData() - { - return ByteBuffer.wrap(_baos.toByteArray()); - } - - public void writeLengthPrefixedUTF(final String string) throws JMSException - { - try - { - CharsetEncoder encoder = UTF8.newEncoder(); - java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); - - writeShortImpl((short) encodedString.limit()); - while(encodedString.hasRemaining()) - { - _data.writeByte(encodedString.get()); - } - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Unable to encode string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - catch (IOException e) - { - throw handle(e); - } - - } -} |
