From 142d35580b326c99a306f6476ff0a0b723db920e Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Fri, 22 Dec 2006 13:06:45 +0000 Subject: git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489644 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/message/AbstractBytesMessage.java | 2 +- .../client/message/AbstractBytesTypedMessage.java | 774 +++++++++++++++++++++ .../apache/qpid/client/message/JMSMapMessage.java | 433 +++++++++--- .../qpid/client/message/JMSStreamMessage.java | 637 +---------------- .../qpid/test/unit/basic/MapMessageTest.java | 5 +- 5 files changed, 1150 insertions(+), 701 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 6935cde491..011f7c09ab 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -63,7 +63,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } } - private void allocateInitialBuffer() + protected void allocateInitialBuffer() { _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); _data.setAutoExpand(true); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java new file mode 100644 index 0000000000..b941b9aee8 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -0,0 +1,774 @@ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; + +import javax.jms.*; +import java.nio.charset.Charset; +import java.nio.charset.CharacterCodingException; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage +{ + + protected static final byte BOOLEAN_TYPE = (byte) 1; + + protected static final byte BYTE_TYPE = (byte) 2; + + protected static final byte BYTEARRAY_TYPE = (byte) 3; + + protected static final byte SHORT_TYPE = (byte) 4; + + protected static final byte CHAR_TYPE = (byte) 5; + + protected static final byte INT_TYPE = (byte) 6; + + protected static final byte LONG_TYPE = (byte) 7; + + protected static final byte FLOAT_TYPE = (byte) 8; + + protected static final byte DOUBLE_TYPE = (byte) 9; + + protected static final byte STRING_TYPE = (byte) 10; + + protected static final byte NULL_STRING_TYPE = (byte) 11; + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + AbstractBytesTypedMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesTypedMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + throws AMQException + { + super(messageNbr, contentHeader, data); + } + + + protected byte readWireType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException + { + checkReadable(); + checkAvailable(1); + return _data.get(); + } + + protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException + { + checkWritable(); + _data.put(type); + _changedData = true; + } + + protected boolean readBoolean() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private boolean readBooleanImpl() + { + return _data.get() != 0; + } + + protected byte readByte() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private byte readByteImpl() + { + return _data.get(); + } + + protected short readShort() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + protected char readChar() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if(wireType == NULL_STRING_TYPE){ + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private char readCharImpl() + { + return _data.getChar(); + } + + protected int readInt() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected int readIntImpl() + { + return _data.getInt(); + } + + protected long readLong() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private long readLongImpl() + { + return _data.getLong(); + } + + protected float readFloat() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private float readFloatImpl() + { + return _data.getFloat(); + } + + protected double readDouble() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private double readDoubleImpl() + { + return _data.getDouble(); + } + + protected String readString() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected String readStringImpl() throws JMSException + { + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + protected int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + + _data.remaining() + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + protected Object readObject() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected void writeBoolean(boolean b) throws JMSException + { + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + protected void writeByte(byte b) throws JMSException + { + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + protected void writeShort(short i) throws JMSException + { + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + protected void writeChar(char c) throws JMSException + { + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + protected void writeInt(int i) throws JMSException + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + protected void writeIntImpl(int i) + { + _data.putInt(i); + } + + protected void writeLong(long l) throws JMSException + { + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + + protected void writeFloat(float v) throws JMSException + { + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); + } + + protected void writeDouble(double v) throws JMSException + { + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } + + protected void writeString(String string) throws JMSException + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + try + { + writeStringImpl(string); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + } + + protected void writeStringImpl(String string) + throws CharacterCodingException + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte) 0); + } + + protected void writeBytes(byte[] bytes) throws JMSException + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + if (bytes == null) + { + _data.putInt(-1); + } + else + { + _data.putInt(length); + _data.put(bytes, offset, length); + } + } + + protected void writeObject(Object object) throws JMSException + { + checkWritable(); + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index a93c4d5d6d..88e78a1dad 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -22,23 +22,23 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.EncodingUtils; -import org.apache.qpid.framing.JMSPropertyFieldTable; -import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import javax.jms.JMSException; -import java.util.Enumeration; +import javax.jms.MessageFormatException; +import java.util.*; +import java.nio.charset.Charset; +import java.nio.charset.CharacterCodingException; -public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessage +public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage { private static final Logger _logger = Logger.getLogger(JMSMapMessage.class); public static final String MIME_TYPE = "jms/map-message"; - private JMSPropertyFieldTable _properties; + private Map _map = new HashMap(); JMSMapMessage() throws JMSException { @@ -48,45 +48,30 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa JMSMapMessage(ByteBuffer data) throws JMSException { super(data); // this instantiates a content header - _properties = new JMSPropertyFieldTable(); + populateMapFromData(); } + JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { super(messageNbr, contentHeader, data); - - if (data != null) + try { - - long tableSize = EncodingUtils.readInteger(_data); - try - { - _properties = new JMSPropertyFieldTable(_data, tableSize); - } - catch (JMSException e) - { - Exception error = e.getLinkedException(); - if (error instanceof AMQFrameDecodingException) - { - throw(AMQFrameDecodingException) error; - } - else - { - throw new AMQException(e.getMessage(), e); - } - } - } - else + populateMapFromData(); + } + catch (JMSException je) { - _properties = new JMSPropertyFieldTable(); + throw new AMQException("Error populating MapMessage from ByteBuffer", je); + } + } public String toBodyString() throws JMSException { - return _properties.toString(); + return _map.toString(); } public String getMimeType() @@ -98,161 +83,437 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa public ByteBuffer getData() { //What if _data is null? - _properties.writeToBuffer(_data); + writeMapToData(); return super.getData(); } + + @Override public void clearBodyImpl() throws JMSException { super.clearBodyImpl(); - _properties.clear(); + _map.clear(); } - public boolean getBoolean(String string) throws JMSException + public boolean getBoolean(String propName) throws JMSException { - return _properties.getBoolean(string); + Object value = _map.get(propName); + + if(value instanceof Boolean) + { + return ((Boolean)value).booleanValue(); + } + else if((value instanceof String) || (value == null)) + { + return Boolean.valueOf((String)value); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to boolean."); + } + } - public byte getByte(String string) throws JMSException + public byte getByte(String propName) throws JMSException { - return _properties.getByte(string); + Object value = _map.get(propName); + + if(value instanceof Byte) + { + return ((Byte)value).byteValue(); + } + else if((value instanceof String) || (value==null)) + { + return Byte.valueOf((String)value).byteValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to byte."); + } } - public short getShort(String string) throws JMSException + public short getShort(String propName) throws JMSException { - return _properties.getShort(string); + Object value = _map.get(propName); + + if(value instanceof Short) + { + return ((Short)value).shortValue(); + } + else if(value instanceof Byte) + { + return ((Byte)value).shortValue(); + } + else if((value instanceof String) || (value==null)) + { + return Short.valueOf((String)value).shortValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to short."); + } + } - public char getChar(String string) throws JMSException + + public int getInt(String propName) throws JMSException { - Character result = _properties.getCharacter(string); + Object value = _map.get(propName); - if (result == null) + if(value instanceof Integer) + { + return ((Integer)value).intValue(); + } + else if(value instanceof Short) + { + return ((Short)value).intValue(); + } + else if(value instanceof Byte) { - throw new NullPointerException("getChar couldn't find " + string + " item."); + return ((Byte)value).intValue(); + } + else if((value instanceof String) || (value==null)) + { + return Integer.valueOf((String)value).intValue(); } else { - return result; + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to int."); } + } - public int getInt(String string) throws JMSException + public long getLong(String propName) throws JMSException { - return _properties.getInteger(string); + Object value = _map.get(propName); + + if(value instanceof Long) + { + return ((Long)value).longValue(); + } + else if(value instanceof Integer) + { + return ((Integer)value).longValue(); + } + if(value instanceof Short) + { + return ((Short)value).longValue(); + } + if(value instanceof Byte) + { + return ((Byte)value).longValue(); + } + else if((value instanceof String) || (value==null)) + { + return Long.valueOf((String)value).longValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to long."); + } + } - public long getLong(String string) throws JMSException + public char getChar(String propName) throws JMSException { - return _properties.getLong(string); + Object value = _map.get(propName); + + if(!_map.containsKey(propName)) + { + throw new MessageFormatException("Property " + propName + " not present"); + } + else if(value instanceof Character) + { + return ((Character)value).charValue(); + } + else if (value == null) + { + throw new NullPointerException("Property " + propName + " has null value and therefore cannot " + + "be converted to char."); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to boolan."); + } + } - public float getFloat(String string) throws JMSException + + + public float getFloat(String propName) throws JMSException { - return _properties.getFloat(string); + Object value = _map.get(propName); + + if(value instanceof Float) + { + return ((Float)value).floatValue(); + } + else if((value instanceof String) || (value==null)) + { + return Float.valueOf((String)value).floatValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to float."); + } } - public double getDouble(String string) throws JMSException + public double getDouble(String propName) throws JMSException { - return _properties.getDouble(string); + Object value = _map.get(propName); + + if(value instanceof Double) + { + return ((Double)value).doubleValue(); + } + else if(value instanceof Float) + { + return ((Float)value).doubleValue(); + } + else if((value instanceof String) || (value==null)) + { + return Double.valueOf((String)value).doubleValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to double."); + } } - public String getString(String string) throws JMSException + public String getString(String propName) throws JMSException { - return _properties.getString(string); + Object value = _map.get(propName); + + if((value instanceof String) || (value == null)) + { + return (String) value; + } + else if(value instanceof byte[]) + { + throw new MessageFormatException("Property " + propName + " of type byte[] " + + "cannot be converted to String."); + } + else + { + return value.toString(); + } + } - public byte[] getBytes(String string) throws JMSException + public byte[] getBytes(String propName) throws JMSException { - return _properties.getBytes(string); + Object value = _map.get(propName); + + if(!_map.containsKey(propName)) + { + throw new MessageFormatException("Property " + propName + " not present"); + } + else if((value instanceof byte[]) || (value == null)) + { + return (byte[])value; + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + + value.getClass().getName() + " cannot be converted to byte[]."); + } } - public Object getObject(String string) throws JMSException + public Object getObject(String propName) throws JMSException { - return _properties.getObject(string); + return _map.get(propName); } public Enumeration getMapNames() throws JMSException { - return _properties.getMapNames(); + return Collections.enumeration(_map.keySet()); } - public void setBoolean(String string, boolean b) throws JMSException + public void setBoolean(String propName, boolean b) throws JMSException { checkWritable(); - _properties.setBoolean(string, b); + checkPropertyName(propName); + _map.put(propName, b); } - public void setByte(String string, byte b) throws JMSException + public void setByte(String propName, byte b) throws JMSException { checkWritable(); - _properties.setByte(string, b); + checkPropertyName(propName); + _map.put(propName, b); } - public void setShort(String string, short i) throws JMSException + public void setShort(String propName, short i) throws JMSException { checkWritable(); - _properties.setShort(string, i); + checkPropertyName(propName); + _map.put(propName, i); } - public void setChar(String string, char c) throws JMSException + public void setChar(String propName, char c) throws JMSException { checkWritable(); - _properties.setChar(string, c); + checkPropertyName(propName); + _map.put(propName, c); } - public void setInt(String string, int i) throws JMSException + public void setInt(String propName, int i) throws JMSException { checkWritable(); - _properties.setInteger(string, i); + checkPropertyName(propName); + _map.put(propName, i); } - public void setLong(String string, long l) throws JMSException + public void setLong(String propName, long l) throws JMSException { checkWritable(); - _properties.setLong(string, l); + checkPropertyName(propName); + _map.put(propName, l); } - public void setFloat(String string, float v) throws JMSException + public void setFloat(String propName, float v) throws JMSException { checkWritable(); - _properties.setFloat(string, v); + checkPropertyName(propName); + _map.put(propName, v); } - public void setDouble(String string, double v) throws JMSException + public void setDouble(String propName, double v) throws JMSException { checkWritable(); - _properties.setDouble(string, v); + checkPropertyName(propName); + _map.put(propName, v); } - public void setString(String string, String string1) throws JMSException + public void setString(String propName, String string1) throws JMSException { checkWritable(); - _properties.setString(string, string1); + checkPropertyName(propName); + _map.put(propName, string1); } - public void setBytes(String string, byte[] bytes) throws JMSException + public void setBytes(String propName, byte[] bytes) throws JMSException { - this.setBytes(string, bytes, 0, bytes.length); + checkWritable(); + checkPropertyName(propName); + _map.put(propName, bytes); } - public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException + public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException { + if((offset == 0) && (length == bytes.length)) + { + setBytes(propName,bytes); + } + else + { + byte[] newBytes = new byte[length]; + System.arraycopy(bytes,offset,newBytes,0,length); + setBytes(propName,newBytes); + } + } + + public void setObject(String propName, Object value) throws JMSException + { checkWritable(); - _properties.setBytes(string, bytes, i, i1); + checkPropertyName(propName); + if(value instanceof Boolean + || value instanceof Byte + || value instanceof Short + || value instanceof Integer + || value instanceof Long + || value instanceof Character + || value instanceof Float + || value instanceof Double + || value instanceof String + || value instanceof byte[] + || value == null) + { + _map.put(propName, value); + } + else + { + throw new MessageFormatException("Cannot set property " + propName + " to value " + value + + "of type " + value.getClass().getName() + "."); + } } - public void setObject(String string, Object object) throws JMSException + private void checkPropertyName(String propName) { - checkWritable(); - _properties.setObject(string, object); + if(propName == null || propName.equals("")) + { + throw new IllegalArgumentException("Property name cannot be null, or the empty String."); + } + } + + public boolean itemExists(String propName) throws JMSException + { + return _map.containsKey(propName); + } + + + private void populateMapFromData() throws JMSException + { + if(_data != null) + { + _data.rewind(); + + final int entries = readIntImpl(); + for(int i = 0; i < entries; i++) + { + String propName = readStringImpl(); + Object value = readObject(); + _map.put(propName,value); + } + } + else + { + _map.clear(); + } } - public boolean itemExists(String string) throws JMSException + private void writeMapToData() { - return _properties.itemExists(string); + allocateInitialBuffer(); + final int size = _map.size(); + writeIntImpl(size); + for(Map.Entry entry : _map.entrySet()) + { + try + { + writeStringImpl(entry.getKey()); + } + catch (CharacterCodingException e) + { + throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(),e); + + + } + try + { + writeObject(entry.getValue()); + } + catch (JMSException e) + { + Object value = entry.getValue(); + throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + + " value : " + value + " (type: " + value.getClass().getName() + ").",e); + } + } + } + + + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 6709ff802d..972a5fc8bf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -31,31 +31,10 @@ import java.nio.charset.Charset; /** * @author Apache Software Foundation */ -public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage +public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage { public static final String MIME_TYPE="jms/stream-message"; - private static final byte BOOLEAN_TYPE = (byte) 1; - - private static final byte BYTE_TYPE = (byte) 2; - - private static final byte BYTEARRAY_TYPE = (byte) 3; - - private static final byte SHORT_TYPE = (byte) 4; - - private static final byte CHAR_TYPE = (byte) 5; - - private static final byte INT_TYPE = (byte) 6; - - private static final byte LONG_TYPE = (byte) 7; - - private static final byte FLOAT_TYPE = (byte) 8; - - private static final byte DOUBLE_TYPE = (byte) 9; - - private static final byte STRING_TYPE = (byte) 10; - - private static final byte NULL_STRING_TYPE = (byte) 11; /** * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read @@ -97,128 +76,22 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess return MIME_TYPE; } - private byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException - { - checkReadable(); - checkAvailable(1); - return _data.get(); - } - private void writeTypeDiscriminator(byte type) throws MessageNotWriteableException - { - checkWritable(); - _data.put(type); - _changedData = true; - } public boolean readBoolean() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - boolean result; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } + return super.readBoolean(); } - private boolean readBooleanImpl() - { - return _data.get() != 0; - } public byte readByte() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - byte result; - try - { - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - private byte readByteImpl() - { - return _data.get(); + return super.readByte(); } public short readShort() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - short result; - try - { - switch (wireType) - { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - private short readShortImpl() - { - return _data.getShort(); + return super.readShort(); } /** @@ -229,564 +102,102 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess */ public char readChar() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - try - { - if(wireType == NULL_STRING_TYPE){ - throw new NullPointerException(); - } - - if (wireType != CHAR_TYPE) - { - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); - } - else - { - checkAvailable(2); - return readCharImpl(); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private char readCharImpl() - { - return _data.getChar(); + return super.readChar(); } public int readInt() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - int result; - try - { - switch (wireType) - { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private int readIntImpl() - { - return _data.getInt(); + return super.readInt(); } public long readLong() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - long result; - try - { - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private long readLongImpl() - { - return _data.getLong(); + return super.readLong(); } public float readFloat() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - float result; - try - { - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private float readFloatImpl() - { - return _data.getFloat(); + return super.readFloat(); } public double readDouble() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - double result; - try - { - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private double readDoubleImpl() - { - return _data.getDouble(); + return super.readDouble(); } public String readString() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - String result; - try - { - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case NULL_STRING_TYPE: - result = null; - throw new NullPointerException("data is null"); - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private String readStringImpl() throws JMSException - { - try - { - return _data.getString(Charset.forName("UTF-8").newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - je.setLinkedException(e); - throw je; - } + return super.readString(); } public int readBytes(byte[] bytes) throws JMSException { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - checkReadable(); - // first call - if (_byteArrayRemaining == -1) - { - // type discriminator checked separately so you get a MessageFormatException rather than - // an EOF even in the case where both would be applicable - checkAvailable(1); - byte wireType = readWireType(); - if (wireType != BYTEARRAY_TYPE) - { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); - } - checkAvailable(4); - int size = _data.getInt(); - // size of -1 indicates null - if (size == -1) - { - return -1; - } - else - { - if (size > _data.remaining()) - { - throw new MessageEOFException("Byte array has stated size " + size + " but message only contains " + - _data.remaining() + " bytes"); - } - else - { - _byteArrayRemaining = size; - } - } - } - else if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - return -1; - } - - int returnedSize = readBytesImpl(bytes); - if (returnedSize < bytes.length) - { - _byteArrayRemaining = -1; - } - return returnedSize; + return super.readBytes(bytes); } - private int readBytesImpl(byte[] bytes) - { - int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); - _byteArrayRemaining -= count; - - if (count == 0) - { - return 0; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } public Object readObject() throws JMSException { - int position = _data.position(); - byte wireType = readWireType(); - Object result = null; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { - result = null; - } - else - { - _byteArrayRemaining = size; - result = new byte[size]; - readBytesImpl(new byte[size]); - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case NULL_STRING_TYPE: - result = null; - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } + return super.readObject(); } public void writeBoolean(boolean b) throws JMSException { - writeTypeDiscriminator(BOOLEAN_TYPE); - _data.put(b ? (byte) 1 : (byte) 0); + super.writeBoolean(b); } public void writeByte(byte b) throws JMSException { - writeTypeDiscriminator(BYTE_TYPE); - _data.put(b); + super.writeByte(b); } public void writeShort(short i) throws JMSException { - writeTypeDiscriminator(SHORT_TYPE); - _data.putShort(i); + super.writeShort(i); } public void writeChar(char c) throws JMSException { - writeTypeDiscriminator(CHAR_TYPE); - _data.putChar(c); + super.writeChar(c); } public void writeInt(int i) throws JMSException { - writeTypeDiscriminator(INT_TYPE); - _data.putInt(i); + super.writeInt(i); } public void writeLong(long l) throws JMSException { - writeTypeDiscriminator(LONG_TYPE); - _data.putLong(l); + super.writeLong(l); } public void writeFloat(float v) throws JMSException { - writeTypeDiscriminator(FLOAT_TYPE); - _data.putFloat(v); + super.writeFloat(v); } public void writeDouble(double v) throws JMSException { - writeTypeDiscriminator(DOUBLE_TYPE); - _data.putDouble(v); + super.writeDouble(v); } public void writeString(String string) throws JMSException { - if (string == null) - { - writeTypeDiscriminator(NULL_STRING_TYPE); - } - else - { - writeTypeDiscriminator(STRING_TYPE); - try - { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte) 0); - } - catch (CharacterCodingException e) - { - JMSException ex = new JMSException("Unable to encode string: " + e); - ex.setLinkedException(e); - throw ex; - } - } + super.writeString(string); } public void writeBytes(byte[] bytes) throws JMSException { - writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + super.writeBytes(bytes); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { - writeTypeDiscriminator(BYTEARRAY_TYPE); - if (bytes == null) - { - _data.putInt(-1); - } - else - { - _data.putInt(length); - _data.put(bytes, offset, length); - } + super.writeBytes(bytes,offset,length); } public void writeObject(Object object) throws JMSException { - checkWritable(); - Class clazz = null; - if (object == null) - { - // string handles the output of null values - clazz = String.class; - } - else - { - clazz = object.getClass(); - } - - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeString((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } + super.writeObject(object); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index 02a98f67d9..bc2def1c64 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -81,7 +81,7 @@ public class MapMessageTest extends TestCase implements MessageListener { _connection = connection; _destination = destination; - _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //set up a slow consumer _session.createConsumer(destination).setMessageListener(this); @@ -1112,6 +1112,9 @@ public class MapMessageTest extends TestCase implements MessageListener } } + + + private void testMapValues(JMSMapMessage m, int count) throws JMSException { //Test get -- cgit v1.2.1