diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-13 13:02:48 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-13 13:02:48 +0000 |
| commit | 34d325ebcb1fedcf0ef6c658bd7b742965d55e56 (patch) | |
| tree | 61fe4bae8ddab149175d286f59de40e7d03e1f04 /java | |
| parent | 01652ef4ec4d7a92bfdcf677c27fb28968398bc6 (diff) | |
| download | qpid-python-34d325ebcb1fedcf0ef6c658bd7b742965d55e56.tar.gz | |
updated JMS messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@565336 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
28 files changed, 3510 insertions, 4564 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 5f2e7cbda7..88def8a807 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpidity.jms.message.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Range; +import org.apache.qpid.client.message.*; import javax.jms.*; import javax.jms.IllegalStateException; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesMessage.java deleted file mode 100644 index 10cfadf903..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesMessage.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import java.io.IOException; -import java.nio.charset.Charset; - -import javax.jms.JMSException; -import javax.jms.MessageEOFException; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; - -/** - * @author Apache Software Foundation - */ -public abstract class AbstractBytesMessage extends AbstractJMSMessage -{ - - /** - * The default initial size of the buffer. The buffer expands automatically. - */ - private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; - - AbstractBytesMessage() - { - this(null); - } - - /** - * Construct a bytes message with existing data. - * - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand - */ - AbstractBytesMessage(ByteBuffer data) - { - super(data); // this instanties a content header - getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); - - if (_data == null) - { - allocateInitialBuffer(); - } - } - - protected void allocateInitialBuffer() - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); - _data.setAutoExpand(true); - } - - AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException - { - // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); - getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); - } - - public void clearBodyImpl() throws JMSException - { - allocateInitialBuffer(); - } - - public String toBodyString() throws JMSException - { - checkReadable(); - try - { - return getText(); - } - catch (IOException e) - { - JMSException jmse = new JMSException(e.toString()); - jmse.setLinkedException(e); - throw jmse; - } - } - - /** - * We reset the stream before and after reading the data. This means that toString() will always output - * the entire message and also that the caller can then immediately start reading as if toString() had - * never been called. - * - * @return - * @throws IOException - */ - private String getText() throws IOException - { - // this will use the default platform encoding - if (_data == null) - { - return null; - } - - int pos = _data.position(); - _data.rewind(); - // one byte left is for the end of frame marker - if (_data.remaining() == 0) - { - // this is really redundant since pos must be zero - _data.position(pos); - - return null; - } - else - { - String data = _data.getString(Charset.forName("UTF8").newDecoder()); - _data.position(pos); - - return data; - } - } - - /** - * Check that there is at least a certain number of bytes available to read - * - * @param len the number of bytes - * @throws javax.jms.MessageEOFException if there are less than len bytes available to read - */ - protected void checkAvailable(int len) throws MessageEOFException - { - if (_data.remaining() < len) - { - throw new MessageEOFException("Unable to read " + len + " bytes"); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesTypedMessage.java deleted file mode 100644 index 9e205eb679..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesTypedMessage.java +++ /dev/null @@ -1,801 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpidity.jms.message; - -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; - -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - -/** - * @author Apache Software Foundation - */ -public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage -{ - - protected static final byte BOOLEAN_TYPE = (byte) 1; - - protected static final byte BYTE_TYPE = (byte) 2; - - protected static final byte BYTEARRAY_TYPE = (byte) 3; - - protected static final byte SHORT_TYPE = (byte) 4; - - protected static final byte CHAR_TYPE = (byte) 5; - - protected static final byte INT_TYPE = (byte) 6; - - protected static final byte LONG_TYPE = (byte) 7; - - protected static final byte FLOAT_TYPE = (byte) 8; - - protected static final byte DOUBLE_TYPE = (byte) 9; - - protected static final byte STRING_TYPE = (byte) 10; - - protected static final byte NULL_STRING_TYPE = (byte) 11; - - /** - * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read - * a byte array in multiple chunks, hence this is used to track how much is left to be read - */ - private int _byteArrayRemaining = -1; - - AbstractBytesTypedMessage() - { - this(null); - } - - /** - * Construct a stream message with existing data. - * - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand - */ - AbstractBytesTypedMessage(ByteBuffer data) - { - super(data); // this instanties a content header - } - - - AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException - { - super(messageNbr, contentHeader, exchange, routingKey, data); - } - - - protected byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException - { - checkReadable(); - checkAvailable(1); - return _data.get(); - } - - protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException - { - checkWritable(); - _data.put(type); - _changedData = true; - } - - protected boolean readBoolean() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - boolean result; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private boolean readBooleanImpl() - { - return _data.get() != 0; - } - - protected byte readByte() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - byte result; - try - { - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - private byte readByteImpl() - { - return _data.get(); - } - - protected short readShort() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - short result; - try - { - switch (wireType) - { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - private short readShortImpl() - { - return _data.getShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws javax.jms.JMSException - */ - protected char readChar() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - try - { - if(wireType == NULL_STRING_TYPE){ - throw new NullPointerException(); - } - - if (wireType != CHAR_TYPE) - { - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); - } - else - { - checkAvailable(2); - return readCharImpl(); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private char readCharImpl() - { - return _data.getChar(); - } - - protected int readInt() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - int result; - try - { - switch (wireType) - { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected int readIntImpl() - { - return _data.getInt(); - } - - protected long readLong() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - long result; - try - { - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private long readLongImpl() - { - return _data.getLong(); - } - - protected float readFloat() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - float result; - try - { - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private float readFloatImpl() - { - return _data.getFloat(); - } - - protected double readDouble() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - double result; - try - { - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private double readDoubleImpl() - { - return _data.getDouble(); - } - - protected String readString() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - String result; - try - { - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case NULL_STRING_TYPE: - result = null; - throw new NullPointerException("data is null"); - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected String readStringImpl() throws JMSException - { - try - { - return _data.getString(Charset.forName("UTF-8").newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - je.setLinkedException(e); - throw je; - } - } - - protected int readBytes(byte[] bytes) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - checkReadable(); - // first call - if (_byteArrayRemaining == -1) - { - // type discriminator checked separately so you get a MessageFormatException rather than - // an EOF even in the case where both would be applicable - checkAvailable(1); - byte wireType = readWireType(); - if (wireType != BYTEARRAY_TYPE) - { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); - } - checkAvailable(4); - int size = _data.getInt(); - // length of -1 indicates null - if (size == -1) - { - return -1; - } - else - { - if (size > _data.remaining()) - { - throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + - _data.remaining() + " bytes"); - } - else - { - _byteArrayRemaining = size; - } - } - } - else if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - return -1; - } - - int returnedSize = readBytesImpl(bytes); - if (returnedSize < bytes.length) - { - _byteArrayRemaining = -1; - } - return returnedSize; - } - - private int readBytesImpl(byte[] bytes) - { - int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); - _byteArrayRemaining -= count; - - if (count == 0) - { - return 0; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - protected Object readObject() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - Object result = null; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { - result = null; - } - else - { - _byteArrayRemaining = size; - byte[] bytesResult = new byte[size]; - readBytesImpl(bytesResult); - result = bytesResult; - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case NULL_STRING_TYPE: - result = null; - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected void writeBoolean(boolean b) throws JMSException - { - writeTypeDiscriminator(BOOLEAN_TYPE); - _data.put(b ? (byte) 1 : (byte) 0); - } - - protected void writeByte(byte b) throws JMSException - { - writeTypeDiscriminator(BYTE_TYPE); - _data.put(b); - } - - protected void writeShort(short i) throws JMSException - { - writeTypeDiscriminator(SHORT_TYPE); - _data.putShort(i); - } - - protected void writeChar(char c) throws JMSException - { - writeTypeDiscriminator(CHAR_TYPE); - _data.putChar(c); - } - - protected void writeInt(int i) throws JMSException - { - writeTypeDiscriminator(INT_TYPE); - writeIntImpl(i); - } - - protected void writeIntImpl(int i) - { - _data.putInt(i); - } - - protected void writeLong(long l) throws JMSException - { - writeTypeDiscriminator(LONG_TYPE); - _data.putLong(l); - } - - protected void writeFloat(float v) throws JMSException - { - writeTypeDiscriminator(FLOAT_TYPE); - _data.putFloat(v); - } - - protected void writeDouble(double v) throws JMSException - { - writeTypeDiscriminator(DOUBLE_TYPE); - _data.putDouble(v); - } - - protected void writeString(String string) throws JMSException - { - if (string == null) - { - writeTypeDiscriminator(NULL_STRING_TYPE); - } - else - { - writeTypeDiscriminator(STRING_TYPE); - try - { - writeStringImpl(string); - } - catch (CharacterCodingException e) - { - JMSException ex = new JMSException("Unable to encode string: " + e); - ex.setLinkedException(e); - throw ex; - } - } - } - - protected void writeStringImpl(String string) - throws CharacterCodingException - { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte) 0); - } - - protected void writeBytes(byte[] bytes) throws JMSException - { - writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); - } - - protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - writeTypeDiscriminator(BYTEARRAY_TYPE); - if (bytes == null) - { - _data.putInt(-1); - } - else - { - _data.putInt(length); - _data.put(bytes, offset, length); - } - } - - protected void writeObject(Object object) throws JMSException - { - checkWritable(); - Class clazz; - - if (object == null) - { - // string handles the output of null values - clazz = String.class; - } - else - { - clazz = object.getClass(); - } - - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeString((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessage.java deleted file mode 100644 index 795556295e..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessage.java +++ /dev/null @@ -1,669 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import org.apache.commons.collections.map.ReferenceMap; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.*; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.*; - -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; -import java.util.UUID; - -public abstract class AbstractJMSMessage extends QpidMessage implements Message -{ - private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); - - protected boolean _redelivered; - - protected ByteBuffer _data; - private boolean _readableProperties = false; - protected boolean _readableMessage = false; - protected boolean _changedData; - private Destination _destination; - private JMSHeaderAdapter _headerAdapter; - private BasicMessageConsumer _consumer; - private boolean _strictAMQP; - - protected AbstractJMSMessage(ByteBuffer data) - { - //super(new BasicContentHeaderProperties()); - _data = data; - if (_data != null) - { - _data.acquire(); - } - - _readableProperties = false; - _readableMessage = (data != null); - _changedData = (data == null); - // _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); - - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); - } - - protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException - { - this(contentHeader, deliveryTag); - - Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); - - AMQDestination dest; - - if (AMQDestination.QUEUE_TYPE.equals(type)) - { - dest = new AMQQueue(exchange, routingKey, routingKey); - } - else if (AMQDestination.TOPIC_TYPE.equals(type)) - { - dest = new AMQTopic(exchange, routingKey, null); - } - else - { - dest = new AMQUndefinedDestination(exchange, routingKey, null); - } - // Destination dest = AMQDestination.createDestination(url); - setJMSDestination(dest); - - _data = data; - if (_data != null) - { - _data.acquire(); - } - - _readableMessage = data != null; - - } - - protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) - { - // super(contentHeader, deliveryTag); - // _readableProperties = (_contentHeaderProperties != null); - // _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); - } - - public String getJMSMessageID() throws JMSException - { - if (getContentHeaderProperties().getMessageIdAsString() == null) - { - getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID()); - } - - return getContentHeaderProperties().getMessageIdAsString(); - } - - public void setJMSMessageID(String messageId) throws JMSException - { - getContentHeaderProperties().setMessageId(messageId); - } - - public long getJMSTimestamp() throws JMSException - { - return getContentHeaderProperties().getTimestamp(); - } - - public void setJMSTimestamp(long timestamp) throws JMSException - { - getContentHeaderProperties().setTimestamp(timestamp); - } - - public byte[] getJMSCorrelationIDAsBytes() throws JMSException - { - return getContentHeaderProperties().getCorrelationIdAsString().getBytes(); - } - - public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException - { - getContentHeaderProperties().setCorrelationId(new String(bytes)); - } - - public void setJMSCorrelationID(String correlationId) throws JMSException - { - getContentHeaderProperties().setCorrelationId(correlationId); - } - - public String getJMSCorrelationID() throws JMSException - { - return getContentHeaderProperties().getCorrelationIdAsString(); - } - - public Destination getJMSReplyTo() throws JMSException - { - String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); - if (replyToEncoding == null) - { - return null; - } - else - { - Destination dest = (Destination) _destinationCache.get(replyToEncoding); - if (dest == null) - { - try - { - BindingURL binding = new AMQBindingURL(replyToEncoding); - dest = AMQDestination.createDestination(binding); - } - catch (URLSyntaxException e) - { - throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); - } - - _destinationCache.put(replyToEncoding, dest); - } - - return dest; - } - } - - public void setJMSReplyTo(Destination destination) throws JMSException - { - if (destination == null) - { - throw new IllegalArgumentException("Null destination not allowed"); - } - - if (!(destination instanceof AMQDestination)) - { - throw new IllegalArgumentException( - "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); - } - - final AMQDestination amqd = (AMQDestination) destination; - - final AMQShortString encodedDestination = amqd.getEncodedName(); - _destinationCache.put(encodedDestination, destination); - getContentHeaderProperties().setReplyTo(encodedDestination); - } - - public Destination getJMSDestination() throws JMSException - { - return _destination; - } - - public void setJMSDestination(Destination destination) - { - _destination = destination; - } - - public int getJMSDeliveryMode() throws JMSException - { - return getContentHeaderProperties().getDeliveryMode(); - } - - public void setJMSDeliveryMode(int i) throws JMSException - { - getContentHeaderProperties().setDeliveryMode((byte) i); - } - - public BasicContentHeaderProperties getContentHeaderProperties() - { - return null; - } - - public boolean getJMSRedelivered() throws JMSException - { - return _redelivered; - } - - public void setJMSRedelivered(boolean b) throws JMSException - { - _redelivered = b; - } - - public String getJMSType() throws JMSException - { - return getContentHeaderProperties().getTypeAsString(); - } - - public void setJMSType(String string) throws JMSException - { - getContentHeaderProperties().setType(string); - } - - public long getJMSExpiration() throws JMSException - { - return getContentHeaderProperties().getExpiration(); - } - - public void setJMSExpiration(long l) throws JMSException - { - getContentHeaderProperties().setExpiration(l); - } - - public int getJMSPriority() throws JMSException - { - return getContentHeaderProperties().getPriority(); - } - - public void setJMSPriority(int i) throws JMSException - { - getContentHeaderProperties().setPriority((byte) i); - } - - public void clearProperties() throws JMSException - { - getJmsHeaders().clear(); - - _readableProperties = false; - } - - public void clearBody() throws JMSException - { - clearBodyImpl(); - _readableMessage = false; - } - - public boolean propertyExists(AMQShortString propertyName) throws JMSException - { - return getJmsHeaders().propertyExists(propertyName); - } - - public boolean propertyExists(String propertyName) throws JMSException - { - return getJmsHeaders().propertyExists(propertyName); - } - - public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getBoolean(propertyName); - } - - public boolean getBooleanProperty(String propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getBoolean(propertyName); - } - - public byte getByteProperty(String propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getByte(propertyName); - } - - public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getBytes(propertyName); - } - - public short getShortProperty(String propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getShort(propertyName); - } - - public int getIntProperty(String propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getInteger(propertyName); - } - - public long getLongProperty(String propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getLong(propertyName); - } - - public float getFloatProperty(String propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getFloat(propertyName); - } - - public double getDoubleProperty(String propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getDouble(propertyName); - } - - public String getStringProperty(String propertyName) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getString(propertyName); - } - - public Object getObjectProperty(String propertyName) throws JMSException - { - return getJmsHeaders().getObject(propertyName); - } - - public Enumeration getPropertyNames() throws JMSException - { - return getJmsHeaders().getPropertyNames(); - } - - public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setBoolean(propertyName, b); - } - - public void setBooleanProperty(String propertyName, boolean b) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setBoolean(propertyName, b); - } - - public void setByteProperty(String propertyName, byte b) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setByte(propertyName, new Byte(b)); - } - - public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setBytes(propertyName, bytes); - } - - public void setShortProperty(String propertyName, short i) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setShort(propertyName, new Short(i)); - } - - public void setIntProperty(String propertyName, int i) throws JMSException - { - checkWritableProperties(); - JMSHeaderAdapter.checkPropertyName(propertyName); - // super.setIntProperty(new AMQShortString(propertyName), new Integer(i)); - } - - public void setLongProperty(String propertyName, long l) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setLong(propertyName, new Long(l)); - } - - public void setFloatProperty(String propertyName, float f) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setFloat(propertyName, new Float(f)); - } - - public void setDoubleProperty(String propertyName, double v) throws JMSException - { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setDouble(propertyName, new Double(v)); - } - - public void setStringProperty(String propertyName, String value) throws JMSException - { - checkWritableProperties(); - JMSHeaderAdapter.checkPropertyName(propertyName); - // super.setLongStringProperty(new AMQShortString(propertyName), value); - } - - public void setObjectProperty(String propertyName, Object object) throws JMSException - { - checkWritableProperties(); - getJmsHeaders().setObject(propertyName, object); - } - - protected void removeProperty(AMQShortString propertyName) throws JMSException - { - getJmsHeaders().remove(propertyName); - } - - protected void removeProperty(String propertyName) throws JMSException - { - getJmsHeaders().remove(propertyName); - } - - public void acknowledgeThis() throws JMSException - { - // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge - // is not specified. In our case, we only set the session field where client acknowledge mode is specified. - - } - - public void acknowledge() throws JMSException - { - - } - - /** - * This forces concrete classes to implement clearBody() - * - * @throws JMSException - */ - public abstract void clearBodyImpl() throws JMSException; - - /** - * Get a String representation of the body of the message. Used in the toString() method which outputs this before - * message properties. - */ - public abstract String toBodyString() throws JMSException; - - public String getMimeType() - { - return getMimeTypeAsShortString().toString(); - } - - public abstract AMQShortString getMimeTypeAsShortString(); - - public String toString() - { - try - { - StringBuffer buf = new StringBuffer("Body:\n"); - buf.append(toBodyString()); - buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID()); - buf.append("\nJMS timestamp: ").append(getJMSTimestamp()); - buf.append("\nJMS expiration: ").append(getJMSExpiration()); - buf.append("\nJMS priority: ").append(getJMSPriority()); - buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); - buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); - buf.append("\nJMS Redelivered: ").append(_redelivered); - buf.append("\nJMS Destination: ").append(getJMSDestination()); - buf.append("\nJMS Type: ").append(getJMSType()); - buf.append("\nJMS MessageID: ").append(getJMSMessageID()); - // buf.append("\nAMQ message number: ").append(_deliveryTag); - - buf.append("\nProperties:"); - if (getJmsHeaders().isEmpty()) - { - buf.append("<NONE>"); - } - else - { - buf.append('\n').append(getJmsHeaders().getHeaders()); - } - - return buf.toString(); - } - catch (JMSException e) - { - return e.toString(); - } - } - - public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties) - { - getContentHeaderProperties().setHeaders(messageProperties); - } - - public JMSHeaderAdapter getJmsHeaders() - { - return _headerAdapter; - } - - public ByteBuffer getData() - { - // make sure we rewind the data just in case any method has moved the - // position beyond the start - if (_data != null) - { - reset(); - } - - return _data; - } - - protected void checkReadable() throws MessageNotReadableException - { - if (!_readableMessage) - { - throw new MessageNotReadableException("You need to call reset() to make the message readable"); - } - } - - protected void checkWritable() throws MessageNotWriteableException - { - if (_readableMessage) - { - throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); - } - } - - protected void checkWritableProperties() throws MessageNotWriteableException - { - if (_readableProperties) - { - throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); - } - } - - public boolean isReadable() - { - return _readableMessage; - } - - public boolean isWritable() - { - return !_readableMessage; - } - - public void reset() - { - if (!_changedData) - { - _data.rewind(); - } - else - { - _data.flip(); - _changedData = false; - } - } - - public void setConsumer(BasicMessageConsumer basicMessageConsumer) - { - _consumer = basicMessageConsumer; - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessageFactory.java deleted file mode 100644 index 020fb299fd..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessageFactory.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; - -import java.util.Iterator; -import java.util.List; - -public abstract class AbstractJMSMessageFactory implements MessageFactory -{ - private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); - - protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange, - AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException; - - protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException - { - ByteBuffer data; - final boolean debug = _logger.isDebugEnabled(); - - // we optimise the non-fragmented case to avoid copying - if ((bodies != null) && (bodies.size() == 1)) - { - if (debug) - { - _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")"); - } - - data = ((ContentBody) bodies.get(0)).payload; - } - else if (bodies != null) - { - if (debug) - { - _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize - + ")"); - } - - data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem? - final Iterator it = bodies.iterator(); - while (it.hasNext()) - { - ContentBody cb = (ContentBody) it.next(); - data.put(cb.payload); - cb.payload.release(); - } - - data.flip(); - } - else // bodies == null - { - data = ByteBuffer.allocate(0); - } - - if (debug) - { - _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" - + data.remaining()); - } - - return createMessage(messageNbr, data, exchange, routingKey, contentHeader); - } - - public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException - { - final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); - msg.setJMSRedelivered(redelivered); - - return msg; - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java new file mode 100644 index 0000000000..9323a03803 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java @@ -0,0 +1,834 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.QpidException; + +import javax.jms.*; +import java.io.*; +import java.nio.ByteBuffer; + +/** + * Implements javax.jms.BytesMessage + */ +public class BytesMessageImpl extends MessageImpl implements BytesMessage +{ + /** + * this BytesMessageImpl's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(BytesMessageImpl.class); + + /** + * An input stream for reading this message data + * This stream wrappes the received byteBuffer. + */ + protected DataInputStream _dataIn = null; + + /** + * Used to store written data. + */ + protected ByteArrayOutputStream _storedData = new ByteArrayOutputStream(); + + /** + * DataOutputStream used to write the data + */ + protected DataOutputStream _dataOut = new DataOutputStream(_storedData); + + + /** + * Gets the number of bytes of the message body when the message + * is in read-only mode. + * <p> The value returned is the entire length of the message + * body, regardless of where the pointer for reading the message + * is currently located. + * + * @return Number of bytes in the message + * @throws JMSException If reading the message body length fails due to some error. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public long getBodyLength() throws JMSException + { + isReadable(); + return getMessageData().capacity(); + } + + /** + * Reads a boolean. + * + * @return The boolean value read + * @throws JMSException If reading fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public boolean readBoolean() throws JMSException + { + isReadable(); + try + { + return _dataIn.readBoolean(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads a signed 8-bit. + * + * @return The signed 8-bit read + * @throws JMSException If reading a signed 8-bit fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public byte readByte() throws JMSException + { + isReadable(); + try + { + return _dataIn.readByte(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads an unsigned 8-bit. + * + * @return The signed 8-bit read + * @throws JMSException If reading an unsigned 8-bit fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public int readUnsignedByte() throws JMSException + { + isReadable(); + try + { + return _dataIn.readUnsignedByte(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads a short. + * + * @return The short read + * @throws JMSException If reading a short fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public short readShort() throws JMSException + { + isReadable(); + try + { + return _dataIn.readShort(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads an unsigned short. + * + * @return The unsigned short read + * @throws JMSException If reading an unsigned short fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public int readUnsignedShort() throws JMSException + { + isReadable(); + try + { + return _dataIn.readUnsignedShort(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads a char. + * + * @return The char read + * @throws JMSException If reading a char fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public char readChar() throws JMSException + { + isReadable(); + try + { + return _dataIn.readChar(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads an int. + * + * @return The int read + * @throws JMSException If reading an int fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public int readInt() throws JMSException + { + isReadable(); + try + { + return _dataIn.readInt(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads a long. + * + * @return The long read + * @throws JMSException If reading a long fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public long readLong() throws JMSException + { + isReadable(); + try + { + return _dataIn.readLong(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Read a float. + * + * @return The float read + * @throws JMSException If reading a float fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public float readFloat() throws JMSException + { + isReadable(); + try + { + return _dataIn.readFloat(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Read a double. + * + * @return The double read + * @throws JMSException If reading a double fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public double readDouble() throws JMSException + { + isReadable(); + try + { + return _dataIn.readDouble(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads a string that has been encoded using a modified UTF-8 format. + * + * @return The String read + * @throws JMSException If reading a String fails due to some error. + * @throws MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public String readUTF() throws JMSException + { + isReadable(); + try + { + return _dataIn.readUTF(); + } + catch (EOFException e) + { + throw new MessageEOFException("Reach end of data when reading message data"); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads a byte array from the bytes message data. + * <p> JMS sepc says: + * <P>If the length of array <code>bytes</code> is less than the number of + * bytes remaining to be read from the stream, the array should + * be filled. A subsequent call reads the next increment, and so on. + * <P>If the number of bytes remaining in the stream is less than the + * length of + * array <code>bytes</code>, the bytes should be read into the array. + * The return value of the total number of bytes read will be less than + * the length of the array, indicating that there are no more bytes left + * to be read from the stream. The next read of the stream returns -1. + * + * @param b The array into which the data is read. + * @return The total number of bytes read into the buffer, or -1 if + * there is no more data because the end of the stream has been reached + * @throws JMSException If reading a byte array fails due to some error. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public int readBytes(byte[] b) throws JMSException + { + isReadable(); + try + { + return _dataIn.read(b); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Reads a portion of the bytes message data. + * <p> The JMS spec says + * <P>If the length of array <code>b</code> is less than the number of + * bytes remaining to be read from the stream, the array should + * be filled. A subsequent call reads the next increment, and so on. + * <P>If the number of bytes remaining in the stream is less than the + * length of array <code>b</code>, the bytes should be read into the array. + * The return value of the total number of bytes read will be less than + * the length of the array, indicating that there are no more bytes left + * to be read from the stream. The next read of the stream returns -1. + * <p> If <code>length</code> is negative, or + * <code>length</code> is greater than the length of the array + * <code>b</code>, then an <code>IndexOutOfBoundsException</code> is + * thrown. No bytes will be read from the stream for this exception case. + * + * @param b The buffer into which the data is read + * @param length The number of bytes to read; must be less than or equal to length. + * @return The total number of bytes read into the buffer, or -1 if + * there is no more data because the end of the data has been reached + * @throws JMSException If reading a byte array fails due to some error. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + */ + public int readBytes(byte[] b, int length) throws JMSException + { + isReadable(); + try + { + return _dataIn.read(b, 0, length); + } + catch (IOException ioe) + { + throw new JMSException("Problem when reading data", ioe.getLocalizedMessage()); + } + } + + /** + * Writes a boolean to the bytes message. + * + * @param val The boolean value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeBoolean(boolean val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeBoolean(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a byte to the bytes message. + * + * @param val The byte value to be written + * @throws JMSException If writting a byte fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeByte(byte val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeByte(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a short to the bytes message. + * + * @param val The short value to be written + * @throws JMSException If writting a short fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeShort(short val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + + } + + /** + * Writes a char to the bytes message. + * + * @param c The char value to be written + * @throws JMSException If writting a char fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeChar(char c) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeChar(c); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes an int to the bytes message. + * + * @param val The int value to be written + * @throws JMSException If writting an int fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeInt(int val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeInt(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + + } + + /** + * Writes a long to the bytes message. + * + * @param val The long value to be written + * @throws JMSException If writting a long fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeLong(long val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeLong(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a float to the bytes message. + * + * @param val The float value to be written + * @throws JMSException If writting a float fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeFloat(float val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeFloat(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a double to the bytes message. + * + * @param val The double value to be written + * @throws JMSException If writting a double fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeDouble(double val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeDouble(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a string to the bytes message. + * + * @param val The string value to be written + * @throws JMSException If writting a string fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeUTF(String val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeUTF(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + + } + + /** + * Writes a byte array to the bytes message. + * + * @param bytes The byte array value to be written + * @throws JMSException If writting a byte array fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeBytes(byte[] bytes) throws JMSException + { + isWriteable(); + try + { + _dataOut.write(bytes); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a portion of byte array to the bytes message. + * + * @param val The byte array value to be written + * @throws JMSException If writting a byte array fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeBytes(byte[] val, int offset, int length) throws JMSException + { + isWriteable(); + try + { + _dataOut.write(val, offset, length); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes an Object to the bytes message. + * JMS spec says: + * <p>This method works only for the objectified primitive + * object types Integer, Double, Long, String and byte + * arrays. + * + * @param val The short value to be written + * @throws JMSException If writting a short fails due to some error. + * @throws NullPointerException if the parameter val is null. + * @throws MessageFormatException If the object is of an invalid type. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeObject(Object val) throws JMSException + { + if (val == null) + { + throw new NullPointerException("Cannot write null value to message"); + } + if (val instanceof byte[]) + { + writeBytes((byte[]) val); + } + else if (val instanceof String) + { + writeUTF((String) val); + } + else if (val instanceof Boolean) + { + writeBoolean(((Boolean) val).booleanValue()); + } + else if (val instanceof Number) + { + if (val instanceof Byte) + { + writeByte(((Byte) val).byteValue()); + } + else if (val instanceof Short) + { + writeShort(((Short) val).shortValue()); + } + else if (val instanceof Integer) + { + writeInt(((Integer) val).intValue()); + } + else if (val instanceof Long) + { + writeLong(((Long) val).longValue()); + } + else if (val instanceof Float) + { + writeFloat(((Float) val).floatValue()); + } + else if (val instanceof Double) + { + writeDouble(((Double) val).doubleValue()); + } + else + { + throw new MessageFormatException("Trying to write an invalid obejct type: " + val); + } + } + else if (val instanceof Character) + { + writeChar(((Character) val).charValue()); + } + else + { + throw new MessageFormatException("Trying to write an invalid obejct type: " + val); + } + } + + /** + * Puts the message body in read-only mode and repositions the stream of + * bytes to the beginning. + * + * @throws JMSException If resetting the message fails due to some internal error. + * @throws MessageFormatException If the message has an invalid format. + */ + public void reset() throws JMSException + { + _readOnly = true; + if (_dataIn == null) + { + // We were writting on this messsage so now read it + _dataIn = new DataInputStream(new ByteArrayInputStream(_storedData.toByteArray())); + } + else + { + // We were reading so reset it + try + { + _dataIn.reset(); + } + catch (IOException e) + { + if (_logger.isDebugEnabled()) + { + // we log this exception as this should not happen + _logger.debug("Problem when resetting message: ", e); + } + throw new JMSException("Problem when resetting message: " + e.getLocalizedMessage()); + } + } + } + + //-- overwritten methods + /** + * Clear out the message body. Clearing a message's body does not clear + * its header values or property entries. + * <p>If this message body was read-only, calling this method leaves + * the message body is in the same state as an empty body in a newly + * created message. + * + * @throws JMSException If clearing this message body fails to due to some error. + */ + public void clearBody() throws JMSException + { + super.clearBody(); + _dataIn = null; + _storedData = new ByteArrayOutputStream(); + _dataOut = new DataOutputStream(_storedData); + } + + + /** + * This method is invoked before a message dispatch operation. + * + * @throws org.apache.qpidity.QpidException + * If the destination is not set + */ + public void beforeMessageDispatch() throws QpidException + { + if (_dataOut.size() > 0) + { + setMessageData(ByteBuffer.wrap(_storedData.toByteArray())); + } + super.beforeMessageDispatch(); + } + + /** + * This method is invoked after this message is received. + * + * @throws QpidException + */ + public void afterMessageReceive() throws QpidException + { + super.afterMessageReceive(); + ByteBuffer messageData = getMessageData(); + if (messageData != null) + { + _dataIn = new DataInputStream(new ByteArrayInputStream(messageData.array())); + } + } + + //-- helper mehtods + /** + * Test whether this message is readable by throwing a MessageNotReadableException if this + * message cannot be read. + * + * @throws MessageNotReadableException If this message cannot be read. + */ + protected void isReadable() throws MessageNotReadableException + { + if (_dataIn == null) + { + throw new MessageNotReadableException("Cannot read this message"); + } + } +} + diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessage.java deleted file mode 100644 index 06d8e4dd4d..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessage.java +++ /dev/null @@ -1,388 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - -public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage -{ - public static final String MIME_TYPE = "application/octet-stream"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); - - - public JMSBytesMessage() - { - this(null); - } - - /** - * Construct a bytes message with existing data. - * - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand - */ - JMSBytesMessage(ByteBuffer data) - { - super(data); // this instanties a content header - } - - JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException - { - super(messageNbr, contentHeader, exchange, routingKey, data); - } - - public void reset() - { - super.reset(); - _readableMessage = true; - } - - public AMQShortString getMimeTypeAsShortString() - { - return MIME_TYPE_SHORT_STRING; - } - - public long getBodyLength() throws JMSException - { - checkReadable(); - return _data.limit(); - } - - public boolean readBoolean() throws JMSException - { - checkReadable(); - checkAvailable(1); - return _data.get() != 0; - } - - public byte readByte() throws JMSException - { - checkReadable(); - checkAvailable(1); - return _data.get(); - } - - public int readUnsignedByte() throws JMSException - { - checkReadable(); - checkAvailable(1); - return _data.getUnsigned(); - } - - public short readShort() throws JMSException - { - checkReadable(); - checkAvailable(2); - return _data.getShort(); - } - - public int readUnsignedShort() throws JMSException - { - checkReadable(); - checkAvailable(2); - return _data.getUnsignedShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws JMSException - */ - public char readChar() throws JMSException - { - checkReadable(); - checkAvailable(2); - return _data.getChar(); - } - - public int readInt() throws JMSException - { - checkReadable(); - checkAvailable(4); - return _data.getInt(); - } - - public long readLong() throws JMSException - { - checkReadable(); - checkAvailable(8); - return _data.getLong(); - } - - public float readFloat() throws JMSException - { - checkReadable(); - checkAvailable(4); - return _data.getFloat(); - } - - public double readDouble() throws JMSException - { - checkReadable(); - checkAvailable(8); - return _data.getDouble(); - } - - public String readUTF() throws JMSException - { - checkReadable(); - // we check only for one byte since theoretically the string could be only a - // single byte when using UTF-8 encoding - - try - { - short length = readShort(); - if(length == 0) - { - return ""; - } - else - { - CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); - ByteBuffer encodedString = _data.slice(); - encodedString.limit(length); - _data.position(_data.position()+length); - CharBuffer string = decoder.decode(encodedString.buf()); - - return string.toString(); - } - - - - } - catch (CharacterCodingException e) - { - JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - je.setLinkedException(e); - throw je; - } - } - - public int readBytes(byte[] bytes) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - checkReadable(); - int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining()); - if (count == 0) - { - return -1; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - public int readBytes(byte[] bytes, int maxLength) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - if (maxLength > bytes.length) - { - throw new IllegalArgumentException("maxLength must be <= bytes.length"); - } - checkReadable(); - int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining()); - if (count == 0) - { - return -1; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - public void writeBoolean(boolean b) throws JMSException - { - checkWritable(); - _changedData = true; - _data.put(b ? (byte) 1 : (byte) 0); - } - - public void writeByte(byte b) throws JMSException - { - checkWritable(); - _changedData = true; - _data.put(b); - } - - public void writeShort(short i) throws JMSException - { - checkWritable(); - _changedData = true; - _data.putShort(i); - } - - public void writeChar(char c) throws JMSException - { - checkWritable(); - _changedData = true; - _data.putChar(c); - } - - public void writeInt(int i) throws JMSException - { - checkWritable(); - _changedData = true; - _data.putInt(i); - } - - public void writeLong(long l) throws JMSException - { - checkWritable(); - _changedData = true; - _data.putLong(l); - } - - public void writeFloat(float v) throws JMSException - { - checkWritable(); - _changedData = true; - _data.putFloat(v); - } - - public void writeDouble(double v) throws JMSException - { - checkWritable(); - _changedData = true; - _data.putDouble(v); - } - - public void writeUTF(String string) throws JMSException - { - checkWritable(); - try - { - CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder(); - java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); - - _data.putShort((short)encodedString.limit()); - _data.put(encodedString); - _changedData = true; - //_data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must add the null terminator manually - //_data.put((byte)0); - } - catch (CharacterCodingException e) - { - JMSException ex = new JMSException("Unable to encode string: " + e); - ex.setLinkedException(e); - throw ex; - } - } - - public void writeBytes(byte[] bytes) throws JMSException - { - checkWritable(); - _data.put(bytes); - _changedData = true; - } - - public void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - checkWritable(); - _data.put(bytes, offset, length); - _changedData = true; - } - - public void writeObject(Object object) throws JMSException - { - checkWritable(); - if (object == null) - { - throw new NullPointerException("Argument must not be null"); - } - Class clazz = object.getClass(); - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeUTF((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } - } - - public String toString() - { - return String.valueOf(System.identityHashCode(this)); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessageFactory.java deleted file mode 100644 index 006ebb2c83..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessageFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import javax.jms.JMSException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - -public class JMSBytesMessageFactory extends AbstractJMSMessageFactory -{ - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException - { - return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data); - } - - public AbstractJMSMessage createMessage() throws JMSException - { - return new JMSBytesMessage(); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSHeaderAdapter.java deleted file mode 100644 index e688b5e2be..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSHeaderAdapter.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import java.util.Enumeration; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - - -public final class JMSHeaderAdapter -{ - private final FieldTable _headers; - - public JMSHeaderAdapter(FieldTable headers) - { - _headers = headers; - } - - - public FieldTable getHeaders() - { - return _headers; - } - - public boolean getBoolean(String string) throws JMSException - { - checkPropertyName(string); - Boolean b = getHeaders().getBoolean(string); - - if (b == null) - { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf((String) str); - } - } - else - { - b = Boolean.valueOf(null); - } - } - - return b; - } - - public boolean getBoolean(AMQShortString string) throws JMSException - { - checkPropertyName(string); - Boolean b = getHeaders().getBoolean(string); - - if (b == null) - { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf((String) str); - } - } - else - { - b = Boolean.valueOf(null); - } - } - - return b; - } - - public char getCharacter(String string) throws JMSException - { - checkPropertyName(string); - Character c = getHeaders().getCharacter(string); - - if (c == null) - { - if (getHeaders().isNullStringValue(string)) - { - throw new NullPointerException("Cannot convert null char"); - } - else - { - throw new MessageFormatException("getChar can't use " + string + " item."); - } - } - else - { - return (char) c; - } - } - - public byte[] getBytes(String string) throws JMSException - { - return getBytes(new AMQShortString(string)); - } - - public byte[] getBytes(AMQShortString string) throws JMSException - { - checkPropertyName(string); - - byte[] bs = getHeaders().getBytes(string); - - if (bs == null) - { - throw new MessageFormatException("getBytes can't use " + string + " item."); - } - else - { - return bs; - } - } - - public byte getByte(String string) throws JMSException - { - checkPropertyName(string); - Byte b = getHeaders().getByte(string); - if (b == null) - { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getByte can't use " + string + " item."); - } - else - { - return Byte.valueOf((String) str); - } - } - else - { - b = Byte.valueOf(null); - } - } - - return b; - } - - public short getShort(String string) throws JMSException - { - checkPropertyName(string); - Short s = getHeaders().getShort(string); - - if (s == null) - { - s = Short.valueOf(getByte(string)); - } - - return s; - } - - public int getInteger(String string) throws JMSException - { - checkPropertyName(string); - Integer i = getHeaders().getInteger(string); - - if (i == null) - { - i = Integer.valueOf(getShort(string)); - } - - return i; - } - - public long getLong(String string) throws JMSException - { - checkPropertyName(string); - Long l = getHeaders().getLong(string); - - if (l == null) - { - l = Long.valueOf(getInteger(string)); - } - - return l; - } - - public float getFloat(String string) throws JMSException - { - checkPropertyName(string); - Float f = getHeaders().getFloat(string); - - if (f == null) - { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getFloat can't use " + string + " item."); - } - else - { - return Float.valueOf((String) str); - } - } - else - { - f = Float.valueOf(null); - } - - } - - return f; - } - - public double getDouble(String string) throws JMSException - { - checkPropertyName(string); - Double d = getHeaders().getDouble(string); - - if (d == null) - { - d = Double.valueOf(getFloat(string)); - } - - return d; - } - - public String getString(String string) throws JMSException - { - checkPropertyName(string); - String s = getHeaders().getString(string); - - if (s == null) - { - if (getHeaders().containsKey(string)) - { - Object o = getHeaders().getObject(string); - if (o instanceof byte[]) - { - throw new MessageFormatException("getObject couldn't find " + string + " item."); - } - else - { - if (o == null) - { - return null; - } - else - { - s = String.valueOf(o); - } - } - }//else return s // null; - } - - return s; - } - - public Object getObject(String string) throws JMSException - { - checkPropertyName(string); - return getHeaders().getObject(string); - } - - public void setBoolean(AMQShortString string, boolean b) throws JMSException - { - checkPropertyName(string); - getHeaders().setBoolean(string, b); - } - - public void setBoolean(String string, boolean b) throws JMSException - { - checkPropertyName(string); - getHeaders().setBoolean(string, b); - } - - public void setChar(String string, char c) throws JMSException - { - checkPropertyName(string); - getHeaders().setChar(string, c); - } - - public Object setBytes(AMQShortString string, byte[] bytes) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes); - } - - public Object setBytes(String string, byte[] bytes) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes); - } - - public Object setBytes(String string, byte[] bytes, int start, int length) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes, start, length); - } - - public void setByte(String string, byte b) throws JMSException - { - checkPropertyName(string); - getHeaders().setByte(string, b); - } - - public void setByte(AMQShortString string, byte b) throws JMSException - { - checkPropertyName(string); - getHeaders().setByte(string, b); - } - - - public void setShort(String string, short i) throws JMSException - { - checkPropertyName(string); - getHeaders().setShort(string, i); - } - - public void setInteger(String string, int i) throws JMSException - { - checkPropertyName(string); - getHeaders().setInteger(string, i); - } - - public void setInteger(AMQShortString string, int i) throws JMSException - { - checkPropertyName(string); - getHeaders().setInteger(string, i); - } - - public void setLong(String string, long l) throws JMSException - { - checkPropertyName(string); - getHeaders().setLong(string, l); - } - - public void setFloat(String string, float v) throws JMSException - { - checkPropertyName(string); - getHeaders().setFloat(string, v); - } - - public void setDouble(String string, double v) throws JMSException - { - checkPropertyName(string); - getHeaders().setDouble(string, v); - } - - public void setString(String string, String string1) throws JMSException - { - checkPropertyName(string); - getHeaders().setString(string, string1); - } - - public void setString(AMQShortString string, String string1) throws JMSException - { - checkPropertyName(string); - getHeaders().setString(string, string1); - } - - public void setObject(String string, Object object) throws JMSException - { - checkPropertyName(string); - try - { - getHeaders().setObject(string, object); - } - catch (AMQPInvalidClassException aice) - { - MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); - mfe.setLinkedException(aice); - throw mfe; - } - } - - public boolean itemExists(String string) throws JMSException - { - checkPropertyName(string); - return getHeaders().containsKey(string); - } - - public Enumeration getPropertyNames() - { - return getHeaders().getPropertyNames(); - } - - public void clear() - { - getHeaders().clear(); - } - - public boolean propertyExists(AMQShortString propertyName) - { - checkPropertyName(propertyName); - return getHeaders().propertyExists(propertyName); - } - - public boolean propertyExists(String propertyName) - { - checkPropertyName(propertyName); - return getHeaders().propertyExists(propertyName); - } - - public Object put(Object key, Object value) - { - checkPropertyName(key.toString()); - return getHeaders().setObject(key.toString(), value); - } - - public Object remove(AMQShortString propertyName) - { - checkPropertyName(propertyName); - return getHeaders().remove(propertyName); - } - - public Object remove(String propertyName) - { - checkPropertyName(propertyName); - return getHeaders().remove(propertyName); - } - - public boolean isEmpty() - { - return getHeaders().isEmpty(); - } - - public void writeToBuffer(ByteBuffer data) - { - getHeaders().writeToBuffer(data); - } - - public Enumeration getMapNames() - { - return getPropertyNames(); - } - - protected static void checkPropertyName(CharSequence propertyName) - { - if (propertyName == null) - { - throw new IllegalArgumentException("Property name must not be null"); - } - else if (propertyName.length() == 0) - { - throw new IllegalArgumentException("Property name must not be the empty string"); - } - - checkIdentiferFormat(propertyName); - } - - protected static void checkIdentiferFormat(CharSequence propertyName) - { -// JMS requirements 3.5.1 Property Names -// Identifiers: -// - An identifier is an unlimited-length character sequence that must begin -// with a Java identifier start character; all following characters must be Java -// identifier part characters. An identifier start character is any character for -// which the method Character.isJavaIdentifierStart returns true. This includes -// '_' and '$'. An identifier part character is any character for which the -// method Character.isJavaIdentifierPart returns true. -// - Identifiers cannot be the names NULL, TRUE, or FALSE. -// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or -// ESCAPE. -// � Identifiers are either header field references or property references. The -// type of a property value in a message selector corresponds to the type -// used to set the property. If a property that does not exist in a message is -// referenced, its value is NULL. The semantics of evaluating NULL values -// in a selector are described in Section 3.8.1.2, �Null Values.� -// � The conversions that apply to the get methods for properties do not -// apply when a property is used in a message selector expression. For -// example, suppose you set a property as a string value, as in the -// following: -// myMessage.setStringProperty("NumberOfOrders", "2"); -// The following expression in a message selector would evaluate to false, -// because a string cannot be used in an arithmetic expression: -// "NumberOfOrders > 1" -// � Identifiers are case sensitive. -// � Message header field references are restricted to JMSDeliveryMode, -// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and -// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be -// null and if so are treated as a NULL value. - - if (Boolean.getBoolean("strict-jms")) - { - // JMS start character - if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); - } - - // JMS part character - int length = propertyName.length(); - for (int c = 1; c < length; c++) - { - if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); - } - } - - // JMS invalid names - if ((propertyName.equals("NULL") - || propertyName.equals("TRUE") - || propertyName.equals("FALSE") - || propertyName.equals("NOT") - || propertyName.equals("AND") - || propertyName.equals("OR") - || propertyName.equals("BETWEEN") - || propertyName.equals("LIKE") - || propertyName.equals("IN") - || propertyName.equals("IS") - || propertyName.equals("ESCAPE"))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); - } - } - - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessage.java deleted file mode 100644 index 67c79b096f..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessage.java +++ /dev/null @@ -1,507 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpidity.jms.message; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; - -import java.nio.charset.CharacterCodingException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; - -public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage -{ - private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class); - - public static final String MIME_TYPE = "jms/map-message"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); - - private Map<String, Object> _map = new HashMap<String, Object>(); - - public JMSMapMessage() throws JMSException - { - this(null); - } - - JMSMapMessage(ByteBuffer data) throws JMSException - { - super(data); // this instantiates a content header - populateMapFromData(); - } - - JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, - ByteBuffer data) throws AMQException - { - super(messageNbr, contentHeader, exchange, routingKey, data); - try - { - populateMapFromData(); - } - catch (JMSException je) - { - throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je); - - } - - } - - public String toBodyString() throws JMSException - { - return _map.toString(); - } - - public AMQShortString getMimeTypeAsShortString() - { - return MIME_TYPE_SHORT_STRING; - } - - public ByteBuffer getData() - { - // What if _data is null? - writeMapToData(); - - return super.getData(); - } - - @Override - public void clearBodyImpl() throws JMSException - { - super.clearBodyImpl(); - _map.clear(); - } - - public boolean getBoolean(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (value instanceof Boolean) - { - return ((Boolean) value).booleanValue(); - } - else if ((value instanceof String) || (value == null)) - { - return Boolean.valueOf((String) value); - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to boolean."); - } - - } - - public byte getByte(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (value instanceof Byte) - { - return ((Byte) value).byteValue(); - } - else if ((value instanceof String) || (value == null)) - { - return Byte.valueOf((String) value).byteValue(); - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to byte."); - } - } - - public short getShort(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (value instanceof Short) - { - return ((Short) value).shortValue(); - } - else if (value instanceof Byte) - { - return ((Byte) value).shortValue(); - } - else if ((value instanceof String) || (value == null)) - { - return Short.valueOf((String) value).shortValue(); - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to short."); - } - - } - - public int getInt(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (value instanceof Integer) - { - return ((Integer) value).intValue(); - } - else if (value instanceof Short) - { - return ((Short) value).intValue(); - } - else if (value instanceof Byte) - { - return ((Byte) value).intValue(); - } - else if ((value instanceof String) || (value == null)) - { - return Integer.valueOf((String) value).intValue(); - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to int."); - } - - } - - public long getLong(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (value instanceof Long) - { - return ((Long) value).longValue(); - } - else if (value instanceof Integer) - { - return ((Integer) value).longValue(); - } - - if (value instanceof Short) - { - return ((Short) value).longValue(); - } - - if (value instanceof Byte) - { - return ((Byte) value).longValue(); - } - else if ((value instanceof String) || (value == null)) - { - return Long.valueOf((String) value).longValue(); - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to long."); - } - - } - - public char getChar(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (!_map.containsKey(propName)) - { - throw new MessageFormatException("Property " + propName + " not present"); - } - else if (value instanceof Character) - { - return ((Character) value).charValue(); - } - else if (value == null) - { - throw new NullPointerException("Property " + propName + " has null value and therefore cannot " - + "be converted to char."); - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to boolan."); - } - - } - - public float getFloat(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (value instanceof Float) - { - return ((Float) value).floatValue(); - } - else if ((value instanceof String) || (value == null)) - { - return Float.valueOf((String) value).floatValue(); - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to float."); - } - } - - public double getDouble(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (value instanceof Double) - { - return ((Double) value).doubleValue(); - } - else if (value instanceof Float) - { - return ((Float) value).doubleValue(); - } - else if ((value instanceof String) || (value == null)) - { - return Double.valueOf((String) value).doubleValue(); - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to double."); - } - } - - public String getString(String propName) throws JMSException - { - Object value = _map.get(propName); - - if ((value instanceof String) || (value == null)) - { - return (String) value; - } - else if (value instanceof byte[]) - { - throw new MessageFormatException("Property " + propName + " of type byte[] " + "cannot be converted to String."); - } - else - { - return value.toString(); - } - - } - - public byte[] getBytes(String propName) throws JMSException - { - Object value = _map.get(propName); - - if (!_map.containsKey(propName)) - { - throw new MessageFormatException("Property " + propName + " not present"); - } - else if ((value instanceof byte[]) || (value == null)) - { - return (byte[]) value; - } - else - { - throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() - + " cannot be converted to byte[]."); - } - } - - public Object getObject(String propName) throws JMSException - { - return _map.get(propName); - } - - public Enumeration getMapNames() throws JMSException - { - return Collections.enumeration(_map.keySet()); - } - - public void setBoolean(String propName, boolean b) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, b); - } - - public void setByte(String propName, byte b) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, b); - } - - public void setShort(String propName, short i) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, i); - } - - public void setChar(String propName, char c) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, c); - } - - public void setInt(String propName, int i) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, i); - } - - public void setLong(String propName, long l) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, l); - } - - public void setFloat(String propName, float v) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, v); - } - - public void setDouble(String propName, double v) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, v); - } - - public void setString(String propName, String string1) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, string1); - } - - public void setBytes(String propName, byte[] bytes) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - _map.put(propName, bytes); - } - - public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException - { - if ((offset == 0) && (length == bytes.length)) - { - setBytes(propName, bytes); - } - else - { - byte[] newBytes = new byte[length]; - System.arraycopy(bytes, offset, newBytes, 0, length); - setBytes(propName, newBytes); - } - } - - public void setObject(String propName, Object value) throws JMSException - { - checkWritable(); - checkPropertyName(propName); - if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer) - || (value instanceof Long) || (value instanceof Character) || (value instanceof Float) - || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null)) - { - _map.put(propName, value); - } - else - { - throw new MessageFormatException("Cannot set property " + propName + " to value " + value + "of type " - + value.getClass().getName() + "."); - } - } - - private void checkPropertyName(String propName) - { - if ((propName == null) || propName.equals("")) - { - throw new IllegalArgumentException("Property name cannot be null, or the empty String."); - } - } - - public boolean itemExists(String propName) throws JMSException - { - return _map.containsKey(propName); - } - - private void populateMapFromData() throws JMSException - { - if (_data != null) - { - _data.rewind(); - - final int entries = readIntImpl(); - for (int i = 0; i < entries; i++) - { - String propName = readStringImpl(); - Object value = readObject(); - _map.put(propName, value); - } - } - else - { - _map.clear(); - } - } - - private void writeMapToData() - { - allocateInitialBuffer(); - final int size = _map.size(); - writeIntImpl(size); - for (Map.Entry<String, Object> entry : _map.entrySet()) - { - try - { - writeStringImpl(entry.getKey()); - } - catch (CharacterCodingException e) - { - throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e); - - } - - try - { - writeObject(entry.getValue()); - } - catch (JMSException e) - { - Object value = entry.getValue(); - throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value - + " (type: " + value.getClass().getName() + ").", e); - } - } - - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessageFactory.java deleted file mode 100644 index 12addc3279..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessageFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpidity.jms.message; - -import javax.jms.JMSException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - -public class JMSMapMessageFactory extends AbstractJMSMessageFactory -{ - public AbstractJMSMessage createMessage() throws JMSException - { - return new JMSMapMessage(); - } - - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException - { - return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessage.java deleted file mode 100644 index 5798ab42b5..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessage.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import javax.jms.ObjectMessage; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; - -public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage -{ - public static final String MIME_TYPE = "application/java-object-stream"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); - - private static final int DEFAULT_BUFFER_SIZE = 1024; - - /** - * Creates empty, writable message for use by producers - */ - public JMSObjectMessage() - { - this(null); - } - - private JMSObjectMessage(ByteBuffer data) - { - super(data); - if (data == null) - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - _data.setAutoExpand(true); - } - - getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); - } - - /** - * Creates read only message for delivery to consumers - */ - JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, - ByteBuffer data) throws AMQException - { - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); - } - - public void clearBodyImpl() throws JMSException - { - if (_data != null) - { - _data.release(); - } - - _data = null; - - } - - public String toBodyString() throws JMSException - { - return toString(_data); - } - - public AMQShortString getMimeTypeAsShortString() - { - return MIME_TYPE_SHORT_STRING; - } - - public void setObject(Serializable serializable) throws JMSException - { - checkWritable(); - - if (_data == null) - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - _data.setAutoExpand(true); - } - else - { - _data.rewind(); - } - - try - { - ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream()); - out.writeObject(serializable); - out.flush(); - out.close(); - } - catch (IOException e) - { - MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); - mfe.setLinkedException(e); - throw mfe; - } - - } - - public Serializable getObject() throws JMSException - { - ObjectInputStream in = null; - if (_data == null) - { - return null; - } - - try - { - _data.rewind(); - in = new ObjectInputStream(_data.asInputStream()); - - return (Serializable) in.readObject(); - } - catch (IOException e) - { - MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); - mfe.setLinkedException(e); - throw mfe; - } - catch (ClassNotFoundException e) - { - MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); - mfe.setLinkedException(e); - throw mfe; - } - finally - { - _data.rewind(); - close(in); - } - } - - private static void close(InputStream in) - { - try - { - if (in != null) - { - in.close(); - } - } - catch (IOException ignore) - { } - } - - private static String toString(ByteBuffer data) - { - if (data == null) - { - return null; - } - - int pos = data.position(); - try - { - return data.getString(Charset.forName("UTF8").newDecoder()); - } - catch (CharacterCodingException e) - { - return null; - } - finally - { - data.position(pos); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessageFactory.java deleted file mode 100644 index 75431c2312..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessageFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import javax.jms.JMSException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - -public class JMSObjectMessageFactory extends AbstractJMSMessageFactory -{ - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException - { - return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data); - } - - public AbstractJMSMessage createMessage() throws JMSException - { - return new JMSObjectMessage(); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessage.java deleted file mode 100644 index d3b53a28cf..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessage.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import javax.jms.JMSException; -import javax.jms.StreamMessage; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - -/** - * @author Apache Software Foundation - */ -public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage -{ - public static final String MIME_TYPE="jms/stream-message"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); - - - /** - * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read - * a byte array in multiple chunks, hence this is used to track how much is left to be read - */ - private int _byteArrayRemaining = -1; - - public JMSStreamMessage() - { - this(null); - } - - /** - * Construct a stream message with existing data. - * - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand - */ - JMSStreamMessage(ByteBuffer data) - { - super(data); // this instanties a content header - } - - - JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException - { - super(messageNbr, contentHeader, exchange, routingKey, data); - } - - public void reset() - { - super.reset(); - _readableMessage = true; - } - - public AMQShortString getMimeTypeAsShortString() - { - return MIME_TYPE_SHORT_STRING; - } - - - - public boolean readBoolean() throws JMSException - { - return super.readBoolean(); - } - - - public byte readByte() throws JMSException - { - return super.readByte(); - } - - public short readShort() throws JMSException - { - return super.readShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws JMSException - */ - public char readChar() throws JMSException - { - return super.readChar(); - } - - public int readInt() throws JMSException - { - return super.readInt(); - } - - public long readLong() throws JMSException - { - return super.readLong(); - } - - public float readFloat() throws JMSException - { - return super.readFloat(); - } - - public double readDouble() throws JMSException - { - return super.readDouble(); - } - - public String readString() throws JMSException - { - return super.readString(); - } - - public int readBytes(byte[] bytes) throws JMSException - { - return super.readBytes(bytes); - } - - - public Object readObject() throws JMSException - { - return super.readObject(); - } - - public void writeBoolean(boolean b) throws JMSException - { - super.writeBoolean(b); - } - - public void writeByte(byte b) throws JMSException - { - super.writeByte(b); - } - - public void writeShort(short i) throws JMSException - { - super.writeShort(i); - } - - public void writeChar(char c) throws JMSException - { - super.writeChar(c); - } - - public void writeInt(int i) throws JMSException - { - super.writeInt(i); - } - - public void writeLong(long l) throws JMSException - { - super.writeLong(l); - } - - public void writeFloat(float v) throws JMSException - { - super.writeFloat(v); - } - - public void writeDouble(double v) throws JMSException - { - super.writeDouble(v); - } - - public void writeString(String string) throws JMSException - { - super.writeString(string); - } - - public void writeBytes(byte[] bytes) throws JMSException - { - super.writeBytes(bytes); - } - - public void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - super.writeBytes(bytes,offset,length); - } - - public void writeObject(Object object) throws JMSException - { - super.writeObject(object); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessageFactory.java deleted file mode 100644 index 769e47482b..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessageFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import javax.jms.JMSException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - -public class JMSStreamMessageFactory extends AbstractJMSMessageFactory -{ - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException - { - return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data); - } - - public AbstractJMSMessage createMessage() throws JMSException - { - return new JMSStreamMessage(); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessage.java deleted file mode 100644 index aac7e583ae..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessage.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import java.io.UnsupportedEncodingException; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; - -import javax.jms.JMSException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.CustomJMSXProperty; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; - -public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage -{ - private static final String MIME_TYPE = "text/plain"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); - - - private String _decodedValue; - - /** - * This constant represents the name of a property that is set when the message payload is null. - */ - private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName(); - private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); - - public JMSTextMessage() throws JMSException - { - this(null, null); - } - - JMSTextMessage(ByteBuffer data, String encoding) throws JMSException - { - super(data); // this instantiates a content header - getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); - getContentHeaderProperties().setEncoding(encoding); - } - - JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) - throws AMQException - { - super(deliveryTag, contentHeader, exchange, routingKey, data); - contentHeader.setContentType(MIME_TYPE_SHORT_STRING); - _data = data; - } - - JMSTextMessage(ByteBuffer data) throws JMSException - { - this(data, null); - } - - JMSTextMessage(String text) throws JMSException - { - super((ByteBuffer) null); - setText(text); - } - - public void clearBodyImpl() throws JMSException - { - if (_data != null) - { - _data.release(); - } - _data = null; - _decodedValue = null; - } - - public String toBodyString() throws JMSException - { - return getText(); - } - - public void setData(ByteBuffer data) - { - _data = data; - } - - public AMQShortString getMimeTypeAsShortString() - { - return MIME_TYPE_SHORT_STRING; - } - - public void setText(String text) throws JMSException - { - checkWritable(); - - clearBody(); - try - { - if (text != null) - { - _data = ByteBuffer.allocate(text.length()); - _data.limit(text.length()) ; - //_data.sweep(); - _data.setAutoExpand(true); - final String encoding = getContentHeaderProperties().getEncodingAsString(); - if (encoding == null) - { - _data.put(text.getBytes(DEFAULT_CHARSET.name())); - } - else - { - _data.put(text.getBytes(encoding)); - } - _changedData=true; - } - _decodedValue = text; - } - catch (UnsupportedEncodingException e) - { - // should never occur - JMSException jmse = new JMSException("Unable to decode text data"); - jmse.setLinkedException(e); - } - } - - public String getText() throws JMSException - { - if (_data == null && _decodedValue == null) - { - return null; - } - else if (_decodedValue != null) - { - return _decodedValue; - } - else - { - _data.rewind(); - - if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY)) - { - return null; - } - if (getContentHeaderProperties().getEncodingAsString() != null) - { - try - { - _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException je = new JMSException("Could not decode string data: " + e); - je.setLinkedException(e); - throw je; - } - } - else - { - try - { - _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException je = new JMSException("Could not decode string data: " + e); - je.setLinkedException(e); - throw je; - } - } - return _decodedValue; - } - } - - // @Override - public void prepareForSending() throws JMSException - { - // super.prepareForSending(); - if (_data == null) - { - setBooleanProperty(PAYLOAD_NULL_PROPERTY, true); - } - else - { - removeProperty(PAYLOAD_NULL_PROPERTY); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessageFactory.java deleted file mode 100644 index 4a078eb141..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessageFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import javax.jms.JMSException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; - -public class JMSTextMessageFactory extends AbstractJMSMessageFactory -{ - - public AbstractJMSMessage createMessage() throws JMSException - { - return new JMSTextMessage(); - } - - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException - { - return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, - exchange, routingKey, data); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java new file mode 100644 index 0000000000..24cd8a5a9a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java @@ -0,0 +1,606 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms.message; + +import org.apache.qpidity.QpidException; + +import javax.jms.MapMessage; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import java.util.Enumeration; +import java.util.Map; +import java.util.HashMap; +import java.util.Vector; +import java.io.*; +import java.nio.ByteBuffer; + +/** + * Implements javax.jms.MapMessage + */ +public class MapMessageImpl extends MessageImpl implements MapMessage +{ + + /** + * The MapMessage's payload. + */ + private Map<String, Object> _map = new HashMap<String, Object>(); + + /** + * Indicates whether an key exists in this MapMessage. + * + * @param key the name of the key to test + * @return true if the key exists + * @throws JMSException If determinein if the key exists fails due to some internal error + */ + public boolean itemExists(String key) throws JMSException + { + return _map.containsKey(key); + } + + /** + * Returns the booleanvalue with the specified key. + * + * @param key The key name. + * @return The boolean value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public boolean getBoolean(String key) throws JMSException + { + boolean result = false; + if (_map.containsKey(key)) + { + try + { + Object objValue = _map.get(key); + if (objValue != null) + { + result = MessageHelper.convertToBoolean(_map.get(key)); + } + } + catch (ClassCastException e) + { + throw new MessageFormatException("Wrong type for key: " + key); + } + } + return result; + } + + /** + * Returns the byte value with the specified name. + * + * @param key The key name. + * @return The byte value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public byte getByte(String key) throws JMSException + { + Object objValue = _map.get(key); + if (objValue == null) + { + throw new NumberFormatException("Wrong type for key: " + key); + } + return MessageHelper.convertToByte(objValue); + } + + /** + * Returns the <CODE>short</CODE> value with the specified name. + * + * @param key The key name. + * @return The <CODE>short</CODE> value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public short getShort(String key) throws JMSException + { + Object objValue = _map.get(key); + if (objValue == null) + { + throw new NumberFormatException("Wrong type for key: " + key); + } + return MessageHelper.convertToShort(objValue); + } + + /** + * Returns the Unicode character value with the specified name. + * + * @param key The key name. + * @return The Unicode charactervalue with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public char getChar(String key) throws JMSException + { + Object objValue = _map.get(key); + if (objValue == null) + { + throw new java.lang.NullPointerException(); + } + return MessageHelper.convertToChar(objValue); + } + + /** + * Returns the intvalue with the specified name. + * + * @param key The key name. + * @return The int value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public int getInt(String key) throws JMSException + { + Object objValue = _map.get(key); + if (objValue == null) + { + throw new NumberFormatException("Wrong type for key: " + key); + } + return MessageHelper.convertToInt(objValue); + } + + /** + * Returns the longvalue with the specified name. + * + * @param key The key name. + * @return The long value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public long getLong(String key) throws JMSException + { + Object objValue = _map.get(key); + if (objValue == null) + { + throw new NumberFormatException("Wrong type for key: " + key); + } + return MessageHelper.convertToLong(objValue); + } + + /** + * Returns the float value with the specified name. + * + * @param key The key name. + * @return The float value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public float getFloat(String key) throws JMSException + { + Object objValue = _map.get(key); + if (objValue == null) + { + throw new NumberFormatException("Wrong type for key: " + key); + } + return MessageHelper.convertToFloat(objValue); + } + + /** + * Returns the double value with the specified name. + * + * @param key The key name. + * @return The double value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public double getDouble(String key) throws JMSException + { + Object objValue = _map.get(key); + if (objValue == null) + { + throw new NumberFormatException("Wrong type for key: " + key); + } + return MessageHelper.convertToDouble(objValue); + } + + /** + * Returns the String value with the specified name. + * + * @param key The key name. + * @return The String value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public String getString(String key) throws JMSException + { + String result = null; + Object objValue = _map.get(key); + if (objValue != null) + { + if (objValue instanceof byte[]) + { + throw new NumberFormatException("Wrong type for key: " + key); + } + else + { + result = objValue.toString(); + } + } + return result; + } + + /** + * Returns the byte array value with the specified name. + * + * @param key The key name. + * @return The byte value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If this type conversion is invalid. + */ + public byte[] getBytes(String key) throws JMSException + { + Object objValue = _map.get(key); + if (objValue == null) + { + return null; + } + if (objValue instanceof byte[]) + { + byte[] value = (byte[]) objValue; + byte[] toReturn = new byte[value.length]; + System.arraycopy(value, 0, toReturn, 0, value.length); + return toReturn; + } + throw new MessageFormatException("Wrong type for key: " + key); + } + + /** + * Returns the value of the object with the specified name. + * + * @param key The key name. + * @return The byte value with the specified key. + * @throws JMSException If reading the message fails due to some internal error. + */ + public Object getObject(String key) throws JMSException + { + try + { + Object objValue = _map.get(key); + if (objValue == null) + { + return null; + } + else if (objValue instanceof byte[]) + { + byte[] value = (byte[]) objValue; + byte[] toReturn = new byte[value.length]; + System.arraycopy(value, 0, toReturn, 0, value.length); + return toReturn; + } + else + { + return objValue; + } + } + catch (java.lang.ClassCastException cce) + { + throw new MessageFormatException("Wrong type for key: " + key); + } + } + + /** + * Returns an Enumeration of all the keys + * + * @return an enumeration of all the keys in this MapMessage + * @throws JMSException If reading the message fails due to some internal error. + */ + public Enumeration getMapNames() throws JMSException + { + Vector<String> propVector = new Vector<String>(_map.keySet()); + return propVector.elements(); + } + + /** + * Sets a boolean value with the specified key into the Map. + * + * @param key The key name. + * @param value The boolean value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setBoolean(String key, boolean value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets a byte value with the specified name into the Map. + * + * @param key The key name. + * @param value The byte value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setByte(String key, byte value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets a shortvalue with the specified name into the Map. + * + * @param key The key name. + * @param value The short value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setShort(String key, short value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets a Unicode character value with the specified name into the Map. + * + * @param key The key name. + * @param value The character value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setChar(String key, char value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets an intvalue with the specified name into the Map. + * + * @param key The key name. + * @param value The int value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setInt(String key, int value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets a long value with the specified name into the Map. + * + * @param key The key name. + * @param value The long value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setLong(String key, long value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets a float value with the specified name into the Map. + * + * @param key The key name. + * @param value The float value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setFloat(String key, float value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets a double value with the specified name into the Map. + * + * @param key The key name. + * @param value The double value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setDouble(String key, double value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets a String value with the specified name into the Map. + * + * @param key The key name. + * @param value The String value to set in the Map. + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setString(String key, String value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + _map.put(key, value); + } + + /** + * Sets a byte array value with the specified name into the Map. + * + * @param key the name of the byte array + * @param value the byte array value to set in the Map; the array + * is copied so that the value for <CODE>name</CODE> will + * not be altered by future modifications + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setBytes(String key, byte[] value) throws JMSException, NullPointerException + { + isWriteable(); + checkNotNullKey(key); + byte[] newBytes = new byte[value.length]; + System.arraycopy(value, 0, newBytes, 0, value.length); + _map.put(key, value); + } + + /** + * Sets a portion of the byte array value with the specified name into the + * Map. + * + * @param key the name of the byte array + * @param value the byte array value to set in the Map; the array + * is copied so that the value for <CODE>name</CODE> will + * not be altered by future modifications + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setBytes(String key, byte[] value, int offset, int length) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + byte[] newBytes = new byte[length]; + System.arraycopy(value, offset, newBytes, 0, length); + _map.put(key, newBytes); + } + + /** + * Sets an object value with the specified name into the Map. + * + * @param key the name of the byte array + * @param value the byte array value to set in the Map; the array + * is copied so that the value for <CODE>name</CODE> will + * not be altered by future modifications + * @throws JMSException If writting the message fails due to some internal error. + * @throws IllegalArgumentException If the key is nul or an empty string. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setObject(String key, Object value) throws JMSException, IllegalArgumentException + { + isWriteable(); + checkNotNullKey(key); + if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer) || (value instanceof Long) || (value instanceof Character) || (value instanceof Float) || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null)) + { + _map.put(key, value); + } + else + { + throw new MessageFormatException("Cannot set property " + key + " to value " + value + "of type " + value + .getClass().getName() + "."); + } + } + + //-- Overwritten methods + /** + * This method is invoked before this message is dispatched. + * <p>This class uses it to convert its text payload into a ByteBuffer + */ + public void beforeMessageDispatch() throws QpidException + { + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(_map); + byte[] bytes = baos.toByteArray(); + setMessageData(ByteBuffer.wrap(bytes)); + } + catch (java.io.IOException ioe) + { + throw new QpidException("problem when dispatching message", null, ioe); + } + super.beforeMessageDispatch(); + } + + + /** + * This method is invoked after this message has been received. + */ + public void afterMessageReceive() throws QpidException + { + super.afterMessageReceive(); + ByteBuffer messageData = getMessageData(); + if (messageData != null) + { + try + { + ByteArrayInputStream bais = new ByteArrayInputStream(messageData.array()); + ObjectInputStream ois = new ObjectInputStream(bais); + _map = (Map<String, Object>) ois.readObject(); + } + catch (IOException ioe) + { + throw new QpidException( + "Unexpected error during rebuild of message in afterReceive(): " + "- IO Exception", null, ioe); + } + catch (ClassNotFoundException e) + { + throw new QpidException( + "Unexpected error during rebuild of message in afterReceive(): " + "- Could not find the required class in classpath.", + null, e); + } + } + } + + //-- protected methods + /** + * This method throws an <CODE>IllegalArgumentException</CODE> if the supplied parameter is null. + * + * @param key The key to check. + * @throws IllegalArgumentException If the key is null. + */ + private void checkNotNullKey(String key) throws IllegalArgumentException + { + if (key == null || key.equals("")) + { + throw new IllegalArgumentException("Key cannot be null"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageConverter.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageConverter.java deleted file mode 100644 index 5554edb107..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageConverter.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageEOFException; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import java.util.Enumeration; - -public class MessageConverter -{ - - /** - * Log4J logger - */ - protected final Logger _logger = LoggerFactory.getLogger(getClass()); - - /** - * AbstractJMSMessage which will hold the converted message - */ - private AbstractJMSMessage _newMessage; - - public MessageConverter(AbstractJMSMessage message) throws JMSException - { - _newMessage = message; - } - - public MessageConverter(BytesMessage message) throws JMSException - { - BytesMessage bytesMessage = (BytesMessage) message; - bytesMessage.reset(); - - JMSBytesMessage nativeMsg = new JMSBytesMessage(); - - byte[] buf = new byte[1024]; - - int len; - - while ((len = bytesMessage.readBytes(buf)) != -1) - { - nativeMsg.writeBytes(buf, 0, len); - } - - _newMessage = nativeMsg; - setMessageProperties(message); - } - - public MessageConverter(MapMessage message) throws JMSException - { - MapMessage nativeMessage = new JMSMapMessage(); - - Enumeration mapNames = message.getMapNames(); - while (mapNames.hasMoreElements()) - { - String name = (String) mapNames.nextElement(); - nativeMessage.setObject(name, message.getObject(name)); - } - - _newMessage = (AbstractJMSMessage) nativeMessage; - setMessageProperties(message); - } - - public MessageConverter(ObjectMessage message) throws JMSException - { - ObjectMessage origMessage = (ObjectMessage) message; - ObjectMessage nativeMessage = new JMSObjectMessage(); - - nativeMessage.setObject(origMessage.getObject()); - - _newMessage = (AbstractJMSMessage) nativeMessage; - setMessageProperties(message); - - } - - public MessageConverter(TextMessage message) throws JMSException - { - TextMessage nativeMessage = new JMSTextMessage(); - - nativeMessage.setText(message.getText()); - - _newMessage = (AbstractJMSMessage) nativeMessage; - setMessageProperties(message); - } - - public MessageConverter(StreamMessage message) throws JMSException - { - StreamMessage nativeMessage = new JMSStreamMessage(); - - try - { - message.reset(); - while (true) - { - nativeMessage.writeObject(message.readObject()); - } - } - catch (MessageEOFException e) - { - // we're at the end so don't mind the exception - } - - _newMessage = (AbstractJMSMessage) nativeMessage; - setMessageProperties(message); - } - - public MessageConverter(Message message) throws JMSException - { - // Send a message with just properties. - // Throwing away content - BytesMessage nativeMessage = new JMSBytesMessage(); - - _newMessage = (AbstractJMSMessage) nativeMessage; - setMessageProperties(message); - } - - public AbstractJMSMessage getConvertedMessage() - { - return _newMessage; - } - - /** - * Sets all message properties - */ - protected void setMessageProperties(Message message) throws JMSException - { - setNonJMSProperties(message); - setJMSProperties(message); - } - - /** - * Sets all non-JMS defined properties on converted message - */ - protected void setNonJMSProperties(Message message) throws JMSException - { - Enumeration propertyNames = message.getPropertyNames(); - while (propertyNames.hasMoreElements()) - { - String propertyName = String.valueOf(propertyNames.nextElement()); - // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them - if (!propertyName.startsWith("JMSX_")) - { - Object value = message.getObjectProperty(propertyName); - _newMessage.setObjectProperty(propertyName, value); - } - } - } - - /** - * Exposed JMS defined properties on converted message: - * JMSDestination - we don't set here - * JMSDeliveryMode - set - * JMSExpiration - we don't set here - * JMSPriority - we don't set here - * JMSMessageID - we don't set here - * JMSTimestamp - we don't set here - * JMSCorrelationID - set - * JMSReplyTo - set - * JMSType - set - * JMSRedlivered - we don't set here - */ - protected void setJMSProperties(Message message) throws JMSException - { - _newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode()); - - if (message.getJMSReplyTo() != null) - { - _newMessage.setJMSReplyTo(message.getJMSReplyTo()); - } - - _newMessage.setJMSType(message.getJMSType()); - - _newMessage.setJMSCorrelationID(message.getJMSCorrelationID()); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java deleted file mode 100644 index 33de600ab9..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import java.util.List; - -import javax.jms.JMSException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; - - -public interface MessageFactory -{ - AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, - ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, - List bodies) - throws JMSException, AMQException; - - AbstractJMSMessage createMessage() throws JMSException; -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactoryRegistry.java deleted file mode 100644 index 7d84edf29b..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactoryRegistry.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.jms.JMSException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; - -public class MessageFactoryRegistry -{ - private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>(); - private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = - new HashMap<AMQShortString, MessageFactory>(); - - /** - * Construct a new registry with the default message factories registered - * @return a message factory registry - */ - public static MessageFactoryRegistry newDefaultRegistry() - { - MessageFactoryRegistry mf = new MessageFactoryRegistry(); - mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); - mf.registerFactory("text/plain", new JMSTextMessageFactory()); - mf.registerFactory("text/xml", new JMSTextMessageFactory()); - mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); - mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); - mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); - mf.registerFactory(null, new JMSBytesMessageFactory()); - - return mf; - } - - public void registerFactory(String mimeType, MessageFactory mf) - { - if (mf == null) - { - throw new IllegalArgumentException("Message factory must not be null"); - } - - _mimeStringToFactoryMap.put(mimeType, mf); - _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf); - } - - public MessageFactory deregisterFactory(String mimeType) - { - _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType)); - - return _mimeStringToFactoryMap.remove(mimeType); - } - - /** - * Create a message. This looks up the MIME type from the content header and instantiates the appropriate - * concrete message type. - * @param deliveryTag the AMQ message id - * @param redelivered true if redelivered - * @param contentHeader the content header that was received - * @param bodies a list of ContentBody instances - * @return the message. - * @throws AMQException - * @throws JMSException - */ - public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, - AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) - throws AMQException, JMSException - { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; - - // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over - // AMQP. When the type is null, it can only be assumed that the message is a byte message. - AMQShortString contentTypeShortString = properties.getContentType(); - contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) - : contentTypeShortString; - - MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); - if (mf == null) - { - throw new AMQException(null, "Unsupport MIME type of " + properties.getContentTypeAsString(), null); - } - else - { - return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies); - } - } - - public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException - { - if (mimeType == null) - { - throw new IllegalArgumentException("Mime type must not be null"); - } - - MessageFactory mf = _mimeStringToFactoryMap.get(mimeType); - if (mf == null) - { - throw new AMQException(null, "Unsupport MIME type of " + mimeType, null); - } - else - { - return mf.createMessage(); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java new file mode 100644 index 0000000000..f5a2fd0ac6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java @@ -0,0 +1,272 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms.message; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import java.math.BigDecimal; +import java.math.BigInteger; + +/** + * + * This is an helper class for performing data convertion + */ +public class MessageHelper +{ + /** + * Convert an object into a boolean value + * + * @param obj object that may contain boolean value + * @return A boolean value. + * @throws MessageFormatException If this type conversion is invalid. + */ + public static boolean convertToBoolean(Object obj) throws JMSException + { + boolean result; + if (obj instanceof Boolean) + { + result = (Boolean) obj; + } + else if (obj instanceof String) + { + result = ((String) obj).equalsIgnoreCase("true"); + } + else + { + throw new MessageFormatException("boolean property type convertion error", + "Messasge property type convertion error"); + } + return result; + } + + /** + * Convert an object into a byte value + * + * @param obj The object that may contain byte value + * @return The convertToed byte value. + * @throws MessageFormatException If this type conversion is invalid. + */ + public static byte convertToByte(Object obj) throws JMSException + { + byte result; + if (obj instanceof Byte) + { + result = ((Number) obj).byteValue(); + } + else if (obj instanceof String) + { + result = Byte.parseByte((String) obj); + } + else + { + throw new MessageFormatException("byte property type convertion error", + "Messasge property type convertion error"); + } + return result; + } + + /** + * Convert an object into a short value + * + * @param obj The object that may contain short value + * @return The convertToed short value. + * @throws MessageFormatException If this type conversion is invalid. + */ + public static short convertToShort(Object obj) throws JMSException + { + short result; + if ((obj instanceof Short) || (obj instanceof Byte)) + { + result = ((Number) obj).shortValue(); + } + else if (obj instanceof String) + { + result = Short.parseShort((String) obj); + } + else + { + throw new MessageFormatException("short property type convertion error", + "Messasge property type convertion error"); + } + return result; + } + + /** + * Convert an object into a int value + * + * @param obj The object that may contain int value + * @return The convertToed int value. + * @throws MessageFormatException If this type conversion is invalid. + */ + public static int convertToInt(Object obj) throws JMSException + { + int result; + if ((obj instanceof Integer) || (obj instanceof Byte) || (obj instanceof Short)) + { + result = ((Number) obj).intValue(); + } + else if (obj instanceof String) + { + result = Integer.parseInt((String) obj); + } + else + { + throw new MessageFormatException("int property type convertion error", + "Messasge property type convertion error"); + } + return result; + } + + /** + * Convert an object into a long value + * + * @param obj The object that may contain long value + * @return The convertToed long value. + * @throws MessageFormatException If this type conversion is invalid. + */ + public static long convertToLong(Object obj) throws JMSException + { + long result; + if ((obj instanceof Number) && !((obj instanceof Float) || (obj instanceof Double))) + { + result = ((Number) obj).longValue(); + } + else if (obj instanceof String) + { + + result = Long.parseLong((String) obj); + } + else + { + throw new MessageFormatException("long property type convertion error", + "Messasge property type convertion error"); + } + return result; + } + + /** + * Convert an object into a float value + * + * @param obj The object that may contain float value + * @return The convertToed float value. + * @throws MessageFormatException If this type conversion is invalid. + */ + public static float convertToFloat(Object obj) throws JMSException + { + float result; + if (obj instanceof Float) + { + result = ((Number) obj).floatValue(); + } + else if (obj instanceof String) + { + result = Float.parseFloat((String) obj); + } + else + { + throw new MessageFormatException("float property type convertion error", + "Messasge property type convertion error"); + } + return result; + } + + /** + * Convert an object into a double value + * + * @param obj The object that may contain double value + * @return The convertToed double value. + * @throws MessageFormatException If this type conversion is invalid. + */ + public static double convertToDouble(Object obj) throws JMSException + { + double result; + if ((obj instanceof Double) || (obj instanceof Float)) + { + result = ((Number) obj).doubleValue(); + } + else if (obj instanceof String) + { + result = Double.parseDouble((String) obj); + } + else + { + throw new MessageFormatException("double property type convertion error", + "Messasge property type convertion error"); + } + return result; + } + + /** + * Convert an object into a char value + * + * @param obj The object that may contain char value + * @return The convertToed char value. + * @throws MessageFormatException If this type conversion is invalid. + */ + public static char convertToChar(Object obj) throws JMSException + { + char result; + if (obj instanceof Character) + { + result = ((Character) obj).charValue(); + } + else + { + throw new MessageFormatException("char property type convertion error", + "Messasge property type convertion error"); + } + return result; + } + + /** + * Convert an object into a String value + * + * @param obj The object that may contain String value + * @return The convertToed String value. + */ + public static String convertToString(Object obj) + { + String stringValue; + if (obj instanceof String) + { + stringValue = (String) obj; + } + else + { + stringValue = obj.toString(); + } + return stringValue; + } + + /** + * Check if the passed object represents Java primitive type + * + * @param value object for inspection + * @return true if object represent Java primitive type; false otherwise + */ + public static boolean isPrimitive(Object value) throws JMSException + { + // Innocent till proven guilty + boolean isPrimitive = true; + if (!((value instanceof String) || (value instanceof Boolean) || (value instanceof Character) || ((value instanceof Number) && !((value instanceof BigDecimal) || (value instanceof BigInteger))))) + { + isPrimitive = false; + } + return isPrimitive; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java index 468d8d39f9..18b2fa571b 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java @@ -20,10 +20,7 @@ package org.apache.qpidity.jms.message; import org.apache.qpidity.jms.ExceptionHelper; import org.apache.qpidity.QpidException; -import javax.jms.Message; -import javax.jms.JMSException; -import javax.jms.Destination; -import javax.jms.DeliveryMode; +import javax.jms.*; import java.util.Enumeration; /** @@ -33,7 +30,6 @@ public class MessageImpl extends QpidMessage implements Message { /** * The ReplyTo destination for this message - * TODO set it when the message is received */ private Destination _replyTo; @@ -42,14 +38,19 @@ public class MessageImpl extends QpidMessage implements Message * <p>When a message is sent this value is ignored. After completion * of the send method it holds the destination specified by the send. * <p>When a message is received, its destination value must be - * equivalent to the value assigned when it was sent. --> TODO + * equivalent to the value assigned when it was sent. */ private Destination _destination; /** * Indicates whether the message properties are in writeable status. */ - private boolean _readOnly = false; + protected boolean _readOnly = false; + + /** + * Indicate whether the message properties are in writeable status. + */ + protected boolean _proertiesReadOnly = false; //---- javax.jms.Message interface /** @@ -455,142 +456,411 @@ public class MessageImpl extends QpidMessage implements Message super.setMessagePriority((short) priority); } - + /** + * Clear the message's properties. + * <p/> + * The message header fields and body are not cleared. + * + * @throws JMSException if clearing JMS message properties fails due to some internal error. + */ public void clearProperties() throws JMSException { - // TODO - + // The properties can now be written + // Properties are read only when the message is received. + _proertiesReadOnly = false; + super.clearMessageProperties(); } - public boolean propertyExists(String string) throws JMSException + /** + * Indicates whether a property value exists. + * + * @param name The name of the property to test the existence + * @return True if the property exists, false otherwise. + * @throws JMSException if checking if the property exists fails due to some internal error. + */ + public boolean propertyExists(String name) throws JMSException { - // TODO - return false; + // Access the property; if the result is null, + // then the property value does not exist + return (super.getProperty(name) != null); } - public boolean getBooleanProperty(String string) throws JMSException + /** + * Access a boolean property value with the given name. + * + * @param name The name of the boolean property. + * @return The boolean property value with the given name. + * @throws JMSException if getting the boolean property fails due to some internal error. + * @throws MessageFormatException If this type conversion is invalid. + */ + public boolean getBooleanProperty(String name) throws JMSException { - // TODO - return false; + Object booleanProperty = getObjectProperty(name); + return booleanProperty != null && MessageHelper.convertToBoolean(booleanProperty); } - public byte getByteProperty(String string) throws JMSException + /** + * Access a byte property value with the given name. + * + * @param name The name of the byte property. + * @return The byte property value with the given name. + * @throws JMSException if getting the byte property fails due to some internal error. + * @throws MessageFormatException If this type conversion is invalid. + */ + public byte getByteProperty(String name) throws JMSException { - // TODO - return 0; + Object byteProperty = getObjectProperty(name); + if (byteProperty == null) + { + throw new NumberFormatException("Proerty " + name + " is null"); + } + else + { + return MessageHelper.convertToByte(byteProperty); + } } - public short getShortProperty(String string) throws JMSException + /** + * Access a short property value with the given name. + * + * @param name The name of the short property. + * @return The short property value with the given name. + * @throws JMSException if getting the short property fails due to some internal error. + * @throws MessageFormatException If this type conversion is invalid. + */ + public short getShortProperty(String name) throws JMSException { - // TODO - return 0; + Object shortProperty = getObjectProperty(name); + if (shortProperty == null) + { + throw new NumberFormatException("Proerty " + name + " is null"); + } + else + { + return MessageHelper.convertToShort(shortProperty); + } } - public int getIntProperty(String string) throws JMSException + /** + * Access a int property value with the given name. + * + * @param name The name of the int property. + * @return The int property value with the given name. + * @throws JMSException if getting the int property fails due to some internal error. + * @throws MessageFormatException If this type conversion is invalid. + */ + public int getIntProperty(String name) throws JMSException { - // TODO - return 0; + Object intProperty = getObjectProperty(name); + if (intProperty == null) + { + throw new NumberFormatException("Proerty " + name + " is null"); + } + else + { + return MessageHelper.convertToInt(intProperty); + } } - public long getLongProperty(String string) throws JMSException + /** + * Access a long property value with the given name. + * + * @param name The name of the long property. + * @return The long property value with the given name. + * @throws JMSException if getting the long property fails due to some internal error. + * @throws MessageFormatException If this type conversion is invalid. + */ + public long getLongProperty(String name) throws JMSException { - // TODO - return 0; + Object longProperty = getObjectProperty(name); + if (longProperty == null) + { + throw new NumberFormatException("Proerty " + name + " is null"); + } + else + { + return MessageHelper.convertToLong(longProperty); + } } - public float getFloatProperty(String string) throws JMSException + /** + * Access a long property value with the given name. + * + * @param name The name of the long property. + * @return The long property value with the given name. + * @throws JMSException if getting the long property fails due to some internal error. + * @throws MessageFormatException If this type conversion is invalid. + */ + public float getFloatProperty(String name) throws JMSException { - // TODO - return 0; + Object floatProperty = getObjectProperty(name); + if (floatProperty == null) + { + throw new NumberFormatException("Proerty " + name + " is null"); + } + else + { + return MessageHelper.convertToFloat(floatProperty); + } } - public double getDoubleProperty(String string) throws JMSException + /** + * Access a double property value with the given name. + * + * @param name The name of the double property. + * @return The double property value with the given name. + * @throws JMSException if getting the double property fails due to some internal error. + * @throws MessageFormatException If this type conversion is invalid. + */ + public double getDoubleProperty(String name) throws JMSException { - // TODO - return 0; + Object doubleProperty = getObjectProperty(name); + if (doubleProperty == null) + { + throw new NumberFormatException("Proerty " + name + " is null"); + } + else + { + return MessageHelper.convertToDouble(doubleProperty); + } } - public String getStringProperty(String string) throws JMSException + /** + * Access a String property value with the given name. + * + * @param name The name of the String property. + * @return The String property value with the given name. + * @throws JMSException if getting the String property fails due to some internal error. + * @throws MessageFormatException If this type conversion is invalid. + */ + public String getStringProperty(String name) throws JMSException { - // TODO - return null; + Object stringProperty = getObjectProperty(name); + String result = null; + if (stringProperty != null) + { + result = MessageHelper.convertToString(stringProperty); + } + return result; } - public Object getObjectProperty(String string) throws JMSException + /** + * Return the object property value with the given name. + * + * @param name the name of the Java object property + * @return the Java object property value with the given name, in + * objectified format (ie. if it set as an int, then a Integer is + * returned). If there is no property by this name, a null value + * is returned. + * @throws JMSException If getting the object property fails due to some internal error. + */ + public Object getObjectProperty(String name) throws JMSException { - // TODO - return null; + return super.getProperty(name); } + /** + * Get an Enumeration of all the property names. + * + * @return An enumeration of all the names of property values. + * @throws JMSException If getting the property names fails due to some internal JMS error. + */ public Enumeration getPropertyNames() throws JMSException { - // TODO - return null; + return super.getAllPropertyNames(); } - public void setBooleanProperty(String string, boolean b) throws JMSException + /** + * Set a boolean property value with the given name. + * + * @param name The name of the boolean property + * @param value The boolean property value to set. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setBooleanProperty(String name, boolean value) throws JMSException { - // TODO - + setObjectProperty(name, value); } - public void setByteProperty(String string, byte b) throws JMSException + /** + * Set a byte property value with the given name. + * + * @param name The name of the byte property + * @param value The byte property value to set. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setByteProperty(String name, byte value) throws JMSException { - // TODO - + setObjectProperty(name, value); } - public void setShortProperty(String string, short i) throws JMSException + /** + * Set a short property value with the given name. + * + * @param name The name of the short property + * @param value The short property value to set. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setShortProperty(String name, short value) throws JMSException { - // TODO - + setObjectProperty(name, value); } - public void setIntProperty(String string, int i) throws JMSException + /** + * Set an int property value with the given name. + * + * @param name The name of the int property + * @param value The int property value to set. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setIntProperty(String name, int value) throws JMSException { - // TODO - + setObjectProperty(name, value); } - public void setLongProperty(String string, long l) throws JMSException + /** + * Set a long property value with the given name. + * + * @param name The name of the long property + * @param value The long property value to set. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setLongProperty(String name, long value) throws JMSException { - // TODO - + setObjectProperty(name, value); } - public void setFloatProperty(String string, float v) throws JMSException + /** + * Set a float property value with the given name. + * + * @param name The name of the float property + * @param value The float property value to set. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setFloatProperty(String name, float value) throws JMSException { - // TODO + setObjectProperty(name, value); + } + /** + * Set a double property value with the given name. + * + * @param name The name of the double property + * @param value The double property value to set. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setDoubleProperty(String name, double value) throws JMSException + { + setObjectProperty(name, value); } - public void setDoubleProperty(String string, double v) throws JMSException + /** + * Set a string property value with the given name. + * + * @param name The name of the string property + * @param value The string property value to set. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setStringProperty(String name, String value) throws JMSException { - // TODO + setObjectProperty(name, value); + } + /** + * Set a Java object property value with the given name. + * <p> The JMS spec says: + * <p> The setObjectProperty method accepts values of class Boolean, Byte, Short, Integer, + * Long, Float, Double, and String. An attempt to use any other class must throw a JMSException. + * + * @param name the name of the Java object property. + * @param value the Java object property value to set in the Message. + * @throws JMSException If setting the property fails due to some internal JMS error. + * @throws MessageFormatException If the object is invalid + * @throws MessageNotWriteableException If the message properties are read-only. + */ + public void setObjectProperty(String name, Object value) throws JMSException + { + if (_proertiesReadOnly) + { + throw new MessageNotWriteableException("Error the message properties are read only"); + } + if (!(value instanceof String || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long || value instanceof Float || value instanceof Double || value instanceof Boolean || value == null)) + { + throw new MessageFormatException("Format of object " + value + " is not supported"); + } + super.setProperty(name, value); } - public void setStringProperty(String string, String string1) throws JMSException + public void acknowledge() throws JMSException { // TODO - } - public void setObjectProperty(String string, Object object) throws JMSException + /** + * Clear out the message body. Clearing a message's body does not clear + * its header values or property entries. + * <P>If this message body was read-only, calling this method leaves + * the message body is in the same state as an empty body in a newly + * created message. + * + * @throws JMSException If clearing this message body fails to due to some error. + */ + public void clearBody() throws JMSException { - // TODO + super.clearMessageData(); + _readOnly = false; + } + //--- Additional public methods + /** + * This method is invoked before a message dispatch operation. + * + * @throws QpidException If the destination is not set + */ + public void beforeMessageDispatch() throws QpidException + { + if (_destination == null) + { + throw new QpidException("Invalid destination null", null, null); + } } - public void acknowledge() throws JMSException + /** + * This method is invoked after this message is received. + * + * @throws QpidException + */ + public void afterMessageReceive() throws QpidException { - // TODO + // recreate a destination object for the encoded destination + // _destination = // todo + // recreate a destination object for the encoded ReplyTo destination (if it exists) + // _replyTo = // todo + + _proertiesReadOnly = true; + _readOnly = true; } - public void clearBody() throws JMSException + /** + * Test whether this message is readonly by throwing a MessageNotWriteableException if this + * message is readonly + * + * @throws MessageNotWriteableException If this message is readonly + */ + protected void isWriteable() throws MessageNotWriteableException { - // TODO - + if (_readOnly) + { + throw new MessageNotWriteableException("Cannot update message"); + } } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java new file mode 100644 index 0000000000..ab5ac21d78 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java @@ -0,0 +1,155 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.QpidException; + +import javax.jms.ObjectMessage; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import java.io.*; +import java.nio.ByteBuffer; + +/** + * Implemetns javax.jms.ObjectMessage + */ +public class ObjectMessageImpl extends MessageImpl implements ObjectMessage +{ + + /** + * this ObjectMessageImpl's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(ObjectMessageImpl.class); + + /** + * The ObjectMessage's payload. + */ + private Serializable _object = null; + + //--- Interface ObjctMessage + /** + * Sets the serializable object containing this message's data. + * <p> JE JMS spec says: + * <p> It is important to note that an <CODE>ObjectMessage</CODE> + * contains a snapshot of the object at the time <CODE>setObject()</CODE> + * is called; subsequent modifications of the object will have no + * effect on the <CODE>ObjectMessage</CODE> body. + * + * @param object The message's data + * @throws JMSException If setting the object fails due to some error. + * @throws javax.jms.MessageFormatException + * If object serialization fails. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void setObject(Serializable object) throws JMSException + { + isWriteable(); + try + { + // Serialize the passed in object, then de-serialize it + // so that changes to it do not affect m_data (JAVA's way to perform a deep clone) + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + ObjectOutputStream objOut = new ObjectOutputStream(bOut); + objOut.writeObject(object); + byte[] bArray = bOut.toByteArray(); + ByteArrayInputStream bIn = new ByteArrayInputStream(bArray); + ObjectInputStream objIn = new ObjectInputStream(bIn); + _object = (Serializable) objIn.readObject(); + objOut.close(); + objIn.close(); + } + catch (Exception e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Unexpected exeption when performing object deep clone", e); + } + throw new MessageNotWriteableException("Unexpected exeption when performing object deep clone", + e.getMessage()); + } + } + + /** + * Gets the serializable object containing this message's data. The + * default value is null. + * + * @return The serializable object containing this message's data + * @throws JMSException If getting the object fails due to some internal error. + */ + public Serializable getObject() throws JMSException + { + return _object; + } + + //--- Overwritten methods + /** + * This method is invoked before a message dispatch operation. + * + * @throws org.apache.qpidity.QpidException + * If the destination is not set + */ + public void beforeMessageDispatch() throws QpidException + { + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(_object); + byte[] bytes = baos.toByteArray(); + setMessageData(ByteBuffer.wrap(bytes)); + } + catch (IOException e) + { + throw new QpidException("Problem when setting object of object message", null, e); + } + super.beforeMessageDispatch(); + } + + /** + * This method is invoked after this message is received. + * + * @throws QpidException + */ + public void afterMessageReceive() throws QpidException + { + super.afterMessageReceive(); + try + { + ByteArrayInputStream bais = new ByteArrayInputStream(getMessageData().array()); + ObjectInputStream ois = new ObjectInputStream(bais); + _object = (Serializable) ois.readObject(); + } + catch (IOException ioe) + { + throw new QpidException( + "Unexpected error during rebuild of message in afterReceive() - " + + "The Object stored in the message was not a Serializable object.", + null, ioe); + } + catch (ClassNotFoundException clnfe) + { + throw new QpidException( + "Unexpected error during rebuild of message in afterReceive() - " + + "Could not find the required class in classpath.", + null, clnfe); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index 17b7140691..3fe838d3bb 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -24,6 +24,10 @@ import org.apache.qpidity.ReplyTo; import org.apache.qpidity.QpidException; import javax.jms.Message; +import java.util.Map; +import java.util.Enumeration; +import java.util.Vector; +import java.nio.ByteBuffer; public class QpidMessage @@ -33,6 +37,17 @@ public class QpidMessage */ private org.apache.qpidity.api.Message _qpidityMessage; + /** + * This message specific properties. + */ + private Map<String, Object> _messageProperties; + + /** + * This message data + */ + private ByteBuffer _messageData; + + //--- This is required as AMQP delivery modes are different from the JMS ones public static final short DELIVERY_MODE_PERSISTENT = 2; public static final short DELIVERY_MODE_NON_PERSISTENT = 1; @@ -42,7 +57,7 @@ public class QpidMessage /** * The message properties */ - + /** * Get the message ID. * @@ -222,7 +237,75 @@ public class QpidMessage _qpidityMessage.getDeliveryProperties().setPriority(priority); } + /** + * Clear this messasge specific properties. + */ + protected void clearMessageProperties() + { + _messageProperties.clear(); + } + + /** + * Access to a message specific property. + * + * @param name The property to access. + * @return The value associated with this property, mull if the value is null or the property does not exist. + */ + protected Object getProperty(String name) + { + return _messageProperties.get(name); + } + + /** + * Set a property for this message + * + * @param name The name of the property to set. + * @param value The value of the rpoperty. + */ + protected void setProperty(String name, Object value) + { + _messageProperties.put(name, value); + } + /** + * Get an Enumeration of all the property names + * + * @return An Enumeration of all the property names. + */ + protected Enumeration<String> getAllPropertyNames() + { + Vector<String> vec = new Vector<String>(_messageProperties.keySet()); + return vec.elements(); + } + + /** + * Set this message body + * + * @param messageBody The buffer containing this message data + */ + protected void setMessageData(ByteBuffer messageBody) + { + _messageData = messageBody; + } + + /** + * Access this messaage data. + * + * @return This message data. + */ + protected ByteBuffer getMessageData() + { + return _messageData; + } + + + /** + * Clear this message data + */ + protected void clearMessageData() + { + _messageData = ByteBuffer.allocate(1024); + } public Message getJMSMessage() { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessaeImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessaeImpl.java new file mode 100644 index 0000000000..4fc64a9a23 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessaeImpl.java @@ -0,0 +1,1091 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms.message; + +import javax.jms.*; +import java.io.IOException; +import java.io.EOFException; + +/** + * The JMS spec says: + * StreamMessage objects support the following conversion table. + * The marked cases must be supported. The unmarked cases must throw a JMSException. + * The String-to-primitive conversions may throw a runtime exception if the + * primitive's valueOf() method does not accept it as a valid String representation of the primitive. + * <p> A value written as the row type can be read as the column type. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |boolean | X X + * |byte | X X X X X + * |short | X X X X + * |char | X X + * |int | X X X + * |long | X X + * |float | X X X + * |double | X X + * |String | X X X X X X X X + * |byte[] | X + * |---------------------------------------------------------------------- + */ +public class StreamMessaeImpl extends BytesMessageImpl implements StreamMessage +{ + /** + * Those statics represent incoming field types. The type of a field is + * written first in the stream + */ + private static final byte BOOLEAN = 1; + private static final byte BYTE = 2; + private static final byte CHAR = 3; + private static final byte DOUBLE = 4; + private static final byte FLOAT = 5; + private static final byte INT = 6; + private static final byte LONG = 7; + private static final byte SHORT = 8; + private static final byte STRING = 9; + private static final byte BYTEARRAY = 10; + private static final byte NULL = 11; + + /** + * The size of the byteArray written in this stream + */ + private int _sizeOfByteArray = 0; + + /** + * Reads a boolean. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |boolean | X X + * + * @return The boolean value read + * @throws JMSException If reading a boolean fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public boolean readBoolean() throws JMSException + { + isReadableAndNotReadingByteArray(); + boolean result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case BOOLEAN: + result = super.readBoolean(); + break; + case STRING: + int len = _dataIn.readInt(); + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = Boolean.valueOf(new String(bArray)); + break; + case NULL: + result = false; + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads a byte. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |byte | X X + * + * @return The byte value read + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public byte readByte() throws JMSException + { + isReadableAndNotReadingByteArray(); + byte result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case BYTE: + result = super.readByte(); + break; + case STRING: + int len = _dataIn.readInt(); + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = new Byte(new String(bArray)); + break; + case NULL: + result = Byte.valueOf(null); + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads a short. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |short | X X X + * + * @return The short value read + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public short readShort() throws JMSException + { + isReadableAndNotReadingByteArray(); + short result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case SHORT: + result = super.readShort(); + break; + case BYTE: + result = super.readByte(); + break; + case STRING: + int len = _dataIn.readInt(); + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = new Short(new String(bArray)); + break; + case NULL: + result = Short.valueOf(null); + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads a char. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |char | X + * + * @return The char value read + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public char readChar() throws JMSException + { + isReadableAndNotReadingByteArray(); + char result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case CHAR: + result = super.readChar(); + break; + case NULL: + _dataIn.reset(); + throw new NullPointerException(); + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads an Int. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |int | X X X X + * + * @return The int value read + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public int readInt() throws JMSException + { + isReadableAndNotReadingByteArray(); + int result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case INT: + result = super.readInt(); + break; + case SHORT: + result = super.readShort(); + break; + case BYTE: + result = super.readByte(); + break; + case STRING: + int len = _dataIn.readInt(); + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = new Integer(new String(bArray)); + break; + case NULL: + result = Integer.valueOf(null); + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads an Long. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |long | X X X X X + * + * @return The long value read + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public long readLong() throws JMSException + { + isReadableAndNotReadingByteArray(); + long result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case LONG: + result = super.readLong(); + break; + case INT: + result = super.readInt(); + break; + case SHORT: + result = super.readShort(); + break; + case BYTE: + result = super.readByte(); + break; + case STRING: + int len = _dataIn.readInt(); + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = (new Long(new String(bArray))); + break; + case NULL: + result = Long.valueOf(null); + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads an Float. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |float | X X + * + * @return The float value read + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public float readFloat() throws JMSException + { + isReadableAndNotReadingByteArray(); + float result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case FLOAT: + result = super.readFloat(); + break; + case STRING: + int len = _dataIn.readInt(); + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = new Float(new String(bArray)); + break; + case NULL: + result = Float.valueOf(null); + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads an double. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |double | X X X + * + * @return The double value read + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public double readDouble() throws JMSException + { + isReadableAndNotReadingByteArray(); + double result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case DOUBLE: + result = super.readDouble(); + break; + case FLOAT: + result = super.readFloat(); + break; + case STRING: + int len = _dataIn.readInt(); + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = new Double(new String(bArray)); + break; + case NULL: + result = Double.valueOf(null); + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads an string. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |double | X X X X X X X X X + * + * @return The string value read + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public String readString() throws JMSException + { + isReadableAndNotReadingByteArray(); + String result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case BOOLEAN: + result = Boolean.valueOf(super.readBoolean()).toString(); + break; + case BYTE: + result = Byte.valueOf(super.readByte()).toString(); + break; + case SHORT: + result = Short.valueOf(super.readShort()).toString(); + break; + case CHAR: + result = Character.valueOf(super.readChar()).toString(); + break; + case INT: + result = Integer.valueOf(super.readInt()).toString(); + break; + case LONG: + result = Long.valueOf(super.readLong()).toString(); + break; + case FLOAT: + result = Float.valueOf(super.readFloat()).toString(); + break; + case DOUBLE: + result = Double.valueOf(super.readDouble()).toString(); + break; + case STRING: + int len = _dataIn.readInt(); + if (len == 0) + { + throw new NullPointerException("trying to read a null String"); + } + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = new String(bArray); + break; + case NULL: + result = null; + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads an byte[]. + * <p/> + * | | boolean byte short char int long float double String byte[] + * |---------------------------------------------------------------------- + * |byte[] | X + * <p> The JMS spec says: + * To read the field value, readBytes should be successively called until + * it returns a value less than the length + * of the read buffer. The value of the bytes in the buffer following the last byte read is undefined. + * + * @param value The byte array into which the data is read. + * @return the total number of bytes read into the array, or -1 if + * there is no more data because the end of the byte field has been + * reached. + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public int readBytes(byte[] value) throws JMSException + { + isReadable(); + int result = -1; + try + { + byte type = BYTEARRAY; + if (_sizeOfByteArray == 0) + { + // we are not in the middle of reading this byte array + _dataIn.mark(10); + type = _dataIn.readByte(); + } + switch (type) + { + case BYTEARRAY: + if (_sizeOfByteArray == 0) + { + // we need to read the size of this byte array + _sizeOfByteArray = _dataIn.readInt(); + } + result = _dataIn.read(value, 0, value.length); + if (result != -1) + { + _sizeOfByteArray = _sizeOfByteArray - result; + } + else + { + _sizeOfByteArray = 0; + } + case NULL: + // result = -1; + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Reads an object from the stream message. + * <p> The JMS spec says: + * <P>This method can be used to return, in objectified format, + * an object in the Java programming language ("Java object") that has + * been written to the stream with the equivalent + * <CODE>writeObject</CODE> method call, or its equivalent primitive + * <CODE>write<I>type</I></CODE> method. + * <P>An attempt to call <CODE>readObject</CODE> to read a byte field + * value into a new <CODE>byte[]</CODE> object before the full value of the + * byte field has been read will throw a + * <CODE>MessageFormatException</CODE>. + * + * @return A Java object from the stream message, in objectified + * format + * @throws JMSException If reading fails due to some error. + * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached. + * @throws javax.jms.MessageNotReadableException + * If the message is in write-only mode. + * @throws MessageFormatException If this type conversion is invalid. + */ + public Object readObject() throws JMSException + { + isReadableAndNotReadingByteArray(); + Object result; + try + { + _dataIn.mark(10); + byte type = _dataIn.readByte(); + switch (type) + { + case BOOLEAN: + result = super.readBoolean(); + break; + case BYTE: + result = super.readByte(); + break; + case SHORT: + result = super.readShort(); + break; + case CHAR: + result = super.readChar(); + break; + case INT: + result = super.readInt(); + break; + case LONG: + result = super.readLong(); + break; + case FLOAT: + result = super.readFloat(); + break; + case DOUBLE: + result = super.readDouble(); + break; + case STRING: + int len = _dataIn.readInt(); + if (len == 0) + { + result = null; + } + else + { + byte[] bArray = new byte[len]; + _dataIn.readFully(bArray); + result = new String(bArray); + } + break; + case BYTEARRAY: + int totalBytes = _dataIn.readInt(); + byte[] bArray = new byte[totalBytes]; + _dataIn.read(bArray, 0, totalBytes); + result = bArray; + break; + case NULL: + result = null; + break; + default: + _dataIn.reset(); + throw new MessageFormatException("Invalid Object Type"); + } + } + catch (EOFException eof) + { + throw new MessageEOFException("End of file Reached when reading message"); + } + catch (IOException io) + { + throw new JMSException("IO exception when reading message"); + } + return result; + } + + /** + * Writes a boolean to the stream message. + * + * @param val The boolean value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeBoolean(boolean val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(BOOLEAN); + super.writeBoolean(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a byte to the stream message. + * + * @param val The byte value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeByte(byte val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(BYTE); + super.writeByte(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a short to the stream message. + * + * @param val The short value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeShort(short val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(SHORT); + super.writeShort(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a char to the stream message. + * + * @param val The char value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeChar(char val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(CHAR); + super.writeChar(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a int to the stream message. + * + * @param val The int value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeInt(int val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(INT); + super.writeInt(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a long to the stream message. + * + * @param val The long value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeLong(long val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(LONG); + super.writeLong(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a float to the stream message. + * + * @param val The float value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeFloat(float val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(FLOAT); + super.writeFloat(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a double to the stream message. + * + * @param val The double value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeDouble(double val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(DOUBLE); + super.writeDouble(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a string to the stream message. + * + * @param val The string value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeString(String val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(STRING); + if (val == null) + { + _dataOut.writeInt(0); + } + else + { + byte[] bArray = val.getBytes(); + int len = bArray.length; + _dataOut.writeInt(len); + _dataOut.write(bArray); + } + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a byte array to the stream message. + * + * @param val The byte array value to be written + * @throws JMSException If writting a boolean fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeBytes(byte[] val) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(BYTEARRAY); + _dataOut.writeInt(val.length); + super.writeBytes(val); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes a portion of byte array to the bytes message. + * + * @param val The byte array value to be written + * @throws JMSException If writting a byte array fails due to some error. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeBytes(byte[] val, int offset, int length) throws JMSException + { + isWriteable(); + try + { + _dataOut.writeShort(BYTEARRAY); + _dataOut.writeInt(length); + super.writeBytes(val, offset, length); + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + /** + * Writes an Object to the bytes message. + * JMS spec says: + * <p>This method works only for the objectified primitive + * object types Integer, Double, Long, String and byte + * arrays. + * + * @param val The short value to be written + * @throws JMSException If writting a short fails due to some error. + * @throws NullPointerException if the parameter val is null. + * @throws MessageFormatException If the object is of an invalid type. + * @throws javax.jms.MessageNotWriteableException + * If the message is in read-only mode. + */ + public void writeObject(Object val) throws JMSException + { + isWriteable(); + try + { + if (val == null) + { + _dataOut.writeShort(NULL); + } + else if (val instanceof Byte) + { + writeByte((Byte) val); + } + else if (val instanceof Boolean) + { + writeBoolean((Boolean) val); + } + else if (val instanceof Short) + { + writeShort((Short) val); + } + else if (val instanceof Integer) + { + writeInt((Integer) val); + } + else if (val instanceof Long) + { + writeLong((Long) val); + } + else if (val instanceof Double) + { + writeDouble((Double) val); + } + else if (val instanceof Float) + { + writeFloat((Float) val); + } + else if (val instanceof Character) + { + writeChar((Character) val); + } + else if (val instanceof String) + { + writeString((String) val); + } + else if (val instanceof byte[]) + { + writeBytes((byte[]) val); + } + else + { + throw new MessageFormatException( + "The data type of the object specified as the value to writeObject " + "was of an invalid type."); + } + } + catch (IOException e) + { + throw new JMSException("IO problem when writting " + e.getLocalizedMessage()); + } + } + + //-- overwritten methods + /** + * Test whether this message is readable by throwing a MessageNotReadableException if this + * message cannot be read. + * + * @throws javax.jms.MessageNotReadableException + * If this message cannot be read. + * @throws javax.jms.MessageFormatException + * If reading a byte array. + */ + protected void isReadableAndNotReadingByteArray() throws MessageNotReadableException, MessageFormatException + { + if (_dataIn == null) + { + throw new MessageNotReadableException("Cannot read this message"); + } + if (_sizeOfByteArray > 0) + { + throw new MessageFormatException( + "Read of object attempted while incomplete byteArray stored in message " + "- finish reading byte array first."); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java new file mode 100644 index 0000000000..df04673fe1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java @@ -0,0 +1,127 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms.message; + +import org.apache.qpidity.QpidException; + +import javax.jms.TextMessage; +import javax.jms.JMSException; +import java.nio.ByteBuffer; +import java.io.UnsupportedEncodingException; + +/** + * Implements the interface javax.jms.TextMessage + */ +public class TextMessageImpl extends MessageImpl implements TextMessage +{ + /** + * The character encoding for converting non ASCII characters + * Default UTF-16 + */ + private static final String CHARACTER_ENCODING = "UTF-16"; + + /** + * This message text. The byte form is set when this message is sent + * the text is set when the message is received. + */ + private String _messageText; + + //-- constructor + // todo + + //--- interface TextMessage + + public String getText() throws JMSException + { + return _messageText; + } + + /** + * Set the text (String) of this TextMessage. + * + * @param text The String containing the text. + * @throws JMSException If setting the text fails due some error. + * @throws javax.jms.MessageNotWriteableException + * If message in read-only mode. + */ + public void setText(String text) throws JMSException + { + isWriteable(); + _messageText = text; + } + + //-- Overwritten methods + + /** + * This method is invoked before this message is dispatched. + * <p>This class uses it to convert its text payload into a ByteBuffer + */ + public void beforeMessageDispatch() throws QpidException + { + if (_messageText != null) + { + // set this message data + try + { + setMessageData(ByteBuffer.wrap(_messageText.getBytes(CHARACTER_ENCODING))); + } + catch (UnsupportedEncodingException e) + { + throw new QpidException("Problem when encoding text " + _messageText, null, e); + } + } + super.beforeMessageDispatch(); + } + + + /** + * This method is invoked after this message has been received. + */ + public void afterMessageReceive() throws QpidException + { + super.afterMessageReceive(); + ByteBuffer data = getMessageData(); + if (data != null) + { + try + { + _messageText = new String(data.array(), CHARACTER_ENCODING); + } + catch (UnsupportedEncodingException e) + { + throw new QpidException("Problem when decoding text", null, e); + } + } + } + + /** + * Clear out the message body. Clearing a message's body does not clear + * its header values or property entries. + * <P>If this message body was read-only, calling this method leaves + * the message body is in the same state as an empty body in a newly + * created message. + * + * @throws JMSException If clearing this message body fails to due to some error. + */ + public void clearBody() throws JMSException + { + super.clearBody(); + _messageText = null; + } +} + diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/UnprocessedMessage.java deleted file mode 100644 index 605760bc96..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/UnprocessedMessage.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.jms.message; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; - -/** - * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and - * the content body/ies. - * - * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher - * thread in order to minimise the amount of work done in the MINA dispatcher thread. - */ -public class UnprocessedMessage -{ - private long _bytesReceived = 0; - - private final BasicDeliverBody _deliverBody; - private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) - private final int _channelId; - private ContentHeaderBody _contentHeader; - - /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ - private List<ContentBody> _bodies; - - public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) - { - _deliverBody = deliverBody; - _channelId = channelId; - _bounceBody = null; - } - - - public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) - { - _deliverBody = null; - _channelId = channelId; - _bounceBody = bounceBody; - } - - public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException - { - - if (body.payload != null) - { - final long payloadSize = body.payload.remaining(); - - if (_bodies == null) - { - if (payloadSize == getContentHeader().bodySize) - { - _bodies = Collections.singletonList(body); - } - else - { - _bodies = new ArrayList<ContentBody>(); - _bodies.add(body); - } - - } - else - { - _bodies.add(body); - } - _bytesReceived += payloadSize; - } - } - - public boolean isAllBodyDataReceived() - { - return _bytesReceived == getContentHeader().bodySize; - } - - public BasicDeliverBody getDeliverBody() - { - return _deliverBody; - } - - public BasicReturnBody getBounceBody() - { - return _bounceBody; - } - - - public int getChannelId() - { - return _channelId; - } - - - public ContentHeaderBody getContentHeader() - { - return _contentHeader; - } - - public void setContentHeader(ContentHeaderBody contentHeader) - { - this._contentHeader = contentHeader; - } - - public List<ContentBody> getBodies() - { - return _bodies; - } - -} |
