summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-03-17 16:44:47 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-03-17 16:44:47 +0000
commitba09630a4258cded77842e1bd5d746b8fbda0cfe (patch)
treeda5fd4e29ce839185c6759a1a141fb2d65f0250c /qpid/java/client
parent1533a95469482a820d3f883c44e7e92fa02c5eb3 (diff)
downloadqpid-python-ba09630a4258cded77842e1bd5d746b8fbda0cfe.tar.gz
QPID-4000 : [Java Broker] Add conversion of 0-x messages to 1-0 subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1457482 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java45
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java163
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java46
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java674
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java370
6 files changed, 212 insertions, 1122 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index b0320d0f4e..6ffa051ff8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -20,13 +20,16 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.AMQException;
-
+import java.io.EOFException;
+import java.nio.ByteBuffer;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
-import java.nio.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage
{
@@ -100,7 +103,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM
private void checkAvailable(final int i) throws MessageEOFException
{
- _typedBytesContentReader.checkAvailable(1);
+ try
+ {
+ _typedBytesContentReader.checkAvailable(1);
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
}
public byte readByte() throws JMSException
@@ -178,7 +188,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM
// we check only for one byte since theoretically the string could be only a
// single byte when using UTF-8 encoding
- return _typedBytesContentReader.readLengthPrefixedUTF();
+ try
+ {
+ return _typedBytesContentReader.readLengthPrefixedUTF();
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public int readBytes(byte[] bytes) throws JMSException
@@ -275,7 +292,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM
public void writeUTF(String string) throws JMSException
{
checkWritable();
- _typedBytesContentWriter.writeLengthPrefixedUTF(string);
+ try
+ {
+ _typedBytesContentWriter.writeLengthPrefixedUTF(string);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public void writeBytes(byte[] bytes) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index e18ed80f6d..0b05179215 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -20,18 +20,21 @@
*/
package org.apache.qpid.client.message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQException;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
+import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage
{
@@ -455,9 +458,22 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe
final int entries = reader.readIntImpl();
for (int i = 0; i < entries; i++)
{
- String propName = reader.readStringImpl();
- Object value = reader.readObject();
- _map.put(propName, value);
+ String propName = null;
+ try
+ {
+ propName = reader.readStringImpl();
+ Object value = reader.readObject();
+ _map.put(propName, value);
+
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
}
}
else
@@ -477,7 +493,14 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe
{
writer.writeNullTerminatedStringImpl(entry.getKey());
- writer.writeObject(entry.getValue());
+ try
+ {
+ writer.writeObject(entry.getValue());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
return writer.getData();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index b1af262580..223facbb59 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -20,11 +20,16 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.AMQException;
-
+import java.io.EOFException;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
import javax.jms.StreamMessage;
-import java.nio.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
/**
* @author Apache Software Foundation
@@ -95,20 +100,53 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
public boolean readBoolean() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readBoolean();
+ try
+ {
+ return _typedBytesContentReader.readBoolean();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public byte readByte() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readByte();
+ try
+ {
+ return _typedBytesContentReader.readByte();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public short readShort() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readShort();
+ try
+ {
+ return _typedBytesContentReader.readShort();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
/**
@@ -120,37 +158,103 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
public char readChar() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readChar();
+ try
+ {
+ return _typedBytesContentReader.readChar();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public int readInt() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readInt();
+ try
+ {
+ return _typedBytesContentReader.readInt();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public long readLong() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readLong();
+ try
+ {
+ return _typedBytesContentReader.readLong();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public float readFloat() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readFloat();
+ try
+ {
+ return _typedBytesContentReader.readFloat();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public double readDouble() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readDouble();
+ try
+ {
+ return _typedBytesContentReader.readDouble();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public String readString() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readString();
+ try
+ {
+ return _typedBytesContentReader.readString();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public int readBytes(byte[] bytes) throws JMSException
@@ -161,14 +265,36 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
}
checkReadable();
- return _typedBytesContentReader.readBytes(bytes);
+ try
+ {
+ return _typedBytesContentReader.readBytes(bytes);
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public Object readObject() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readObject();
+ try
+ {
+ return _typedBytesContentReader.readObject();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public void writeBoolean(boolean b) throws JMSException
@@ -240,6 +366,13 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
public void writeObject(Object object) throws JMSException
{
checkWritable();
- _typedBytesContentWriter.writeObject(object);
+ try
+ {
+ _typedBytesContentWriter.writeObject(object);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java
deleted file mode 100644
index 26a0b41cdc..0000000000
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-public interface TypedBytesCodes
-{
- static final byte BOOLEAN_TYPE = (byte) 1;
-
- static final byte BYTE_TYPE = (byte) 2;
-
- static final byte BYTEARRAY_TYPE = (byte) 3;
-
- static final byte SHORT_TYPE = (byte) 4;
-
- static final byte CHAR_TYPE = (byte) 5;
-
- static final byte INT_TYPE = (byte) 6;
-
- static final byte LONG_TYPE = (byte) 7;
-
- static final byte FLOAT_TYPE = (byte) 8;
-
- static final byte DOUBLE_TYPE = (byte) 9;
-
- static final byte STRING_TYPE = (byte) 10;
-
- static final byte NULL_STRING_TYPE = (byte) 11;
-}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
deleted file mode 100644
index b00ac7e34b..0000000000
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
+++ /dev/null
@@ -1,674 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotReadableException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-
-class TypedBytesContentReader implements TypedBytesCodes
-{
-
- private final ByteBuffer _data;
- private final int _position;
- private final int _limit;
-
-
- private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
-
- private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder();
-
- private int _byteArrayRemaining = -1;
-
-
- public TypedBytesContentReader(final ByteBuffer data)
- {
- _data = data.duplicate();
- _position = _data.position();
- _limit = _data.limit();
- }
-
- /**
- * Check that there is at least a certain number of bytes available to read
- *
- * @param len the number of bytes
- * @throws javax.jms.MessageEOFException if there are less than len bytes available to read
- */
- protected void checkAvailable(int len) throws MessageEOFException
- {
- if (_data.remaining() < len)
- {
- throw new MessageEOFException("Unable to read " + len + " bytes");
- }
- }
-
- protected byte readWireType() throws MessageFormatException, MessageEOFException,
- MessageNotReadableException
- {
- checkAvailable(1);
- return _data.get();
- }
-
- protected boolean readBoolean() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- boolean result;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- boolean readBooleanImpl()
- {
- return _data.get() != 0;
- }
-
- protected byte readByte() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- byte result;
- try
- {
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
-
- byte readByteImpl()
- {
- return _data.get();
- }
-
- protected short readShort() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- short result;
- try
- {
- switch (wireType)
- {
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
-
- short readShortImpl()
- {
- return _data.getShort();
- }
-
- /**
- * Note that this method reads a unicode character as two bytes from the stream
- *
- * @return the character read from the stream
- * @throws javax.jms.JMSException
- */
- protected char readChar() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- try
- {
- if (wireType == NULL_STRING_TYPE)
- {
- throw new NullPointerException();
- }
-
- if (wireType != CHAR_TYPE)
- {
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
- }
- else
- {
- checkAvailable(2);
- return readCharImpl();
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- char readCharImpl()
- {
- return _data.getChar();
- }
-
- protected int readInt() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- int result;
- try
- {
- switch (wireType)
- {
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected int readIntImpl()
- {
- return _data.getInt();
- }
-
- protected long readLong() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- long result;
- try
- {
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- long readLongImpl()
- {
- return _data.getLong();
- }
-
- protected float readFloat() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- float result;
- try
- {
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- float readFloatImpl()
- {
- return _data.getFloat();
- }
-
- protected double readDouble() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- double result;
- try
- {
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- double readDoubleImpl()
- {
- return _data.getDouble();
- }
-
- protected String readString() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- String result;
- try
- {
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- throw new NullPointerException("data is null");
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected String readStringImpl() throws JMSException
- {
- try
- {
- _charsetDecoder.reset();
- ByteBuffer dup = _data.duplicate();
- int pos = _data.position();
- byte b;
- while((b = _data.get()) != 0) {};
- dup.limit(_data.position()-1);
- return _charsetDecoder.decode(dup).toString();
-
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- }
-
- protected int readBytes(byte[] bytes) throws JMSException
- {
- if (bytes == null)
- {
- throw new IllegalArgumentException("byte array must not be null");
- }
- // first call
- if (_byteArrayRemaining == -1)
- {
- // type discriminator checked separately so you get a MessageFormatException rather than
- // an EOF even in the case where both would be applicable
- checkAvailable(1);
- byte wireType = readWireType();
- if (wireType != BYTEARRAY_TYPE)
- {
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
- }
- checkAvailable(4);
- int size = _data.getInt();
- // length of -1 indicates null
- if (size == -1)
- {
- return -1;
- }
- else
- {
- if (size > _data.remaining())
- {
- throw new MessageEOFException("Byte array has stated length "
- + size
- + " but message only contains "
- +
- _data.remaining()
- + " bytes");
- }
- else
- {
- _byteArrayRemaining = size;
- }
- }
- }
- else if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- return -1;
- }
-
- int returnedSize = readBytesImpl(bytes);
- if (returnedSize < bytes.length)
- {
- _byteArrayRemaining = -1;
- }
- return returnedSize;
- }
-
- private int readBytesImpl(byte[] bytes)
- {
- int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
- _byteArrayRemaining -= count;
-
- if (count == 0)
- {
- return 0;
- }
- else
- {
- _data.get(bytes, 0, count);
- return count;
- }
- }
-
- protected Object readObject() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- Object result = null;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
- result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- byte[] bytesResult = new byte[size];
- readBytesImpl(bytesResult);
- result = bytesResult;
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- public void reset()
- {
- _byteArrayRemaining = -1;
- _data.position(_position);
- _data.limit(_limit);
- }
-
- public ByteBuffer getData()
- {
- ByteBuffer buf = _data.duplicate();
- buf.position(_position);
- buf.limit(_limit);
- return buf;
- }
-
- public long size()
- {
- return _limit - _position;
- }
-
- public int remaining()
- {
- return _data.remaining();
- }
-
- public void readRawBytes(final byte[] bytes, final int offset, final int count)
- {
- _data.get(bytes, offset, count);
- }
-
- public String readLengthPrefixedUTF() throws JMSException
- {
- try
- {
- short length = readShortImpl();
- if(length == 0)
- {
- return "";
- }
- else
- {
- _charsetDecoder.reset();
- ByteBuffer encodedString = _data.slice();
- encodedString.limit(length);
- _data.position(_data.position()+length);
- CharBuffer string = _charsetDecoder.decode(encodedString);
-
- return string.toString();
- }
- }
- catch(CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- }
-}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
deleted file mode 100644
index 7c91db3a32..0000000000
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
-
-class TypedBytesContentWriter implements TypedBytesCodes
-{
- private final ByteArrayOutputStream _baos = new ByteArrayOutputStream();
- private final DataOutputStream _data = new DataOutputStream(_baos);
- private static final Charset UTF8 = Charset.forName("UTF-8");
-
- protected void writeTypeDiscriminator(byte type) throws JMSException
- {
- try
- {
- _data.writeByte(type);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- private JMSException handle(final IOException e)
- {
- JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage());
- jmsEx.setLinkedException(e);
- return jmsEx;
- }
-
-
- protected void writeBoolean(boolean b) throws JMSException
- {
- writeTypeDiscriminator(BOOLEAN_TYPE);
- writeBooleanImpl(b);
- }
-
- public void writeBooleanImpl(final boolean b) throws JMSException
- {
- try
- {
- _data.writeByte(b ? (byte) 1 : (byte) 0);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- protected void writeByte(byte b) throws JMSException
- {
- writeTypeDiscriminator(BYTE_TYPE);
- writeByteImpl(b);
- }
-
- public void writeByteImpl(final byte b) throws JMSException
- {
- try
- {
- _data.writeByte(b);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- protected void writeShort(short i) throws JMSException
- {
- writeTypeDiscriminator(SHORT_TYPE);
- writeShortImpl(i);
- }
-
- public void writeShortImpl(final short i) throws JMSException
- {
- try
- {
- _data.writeShort(i);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- protected void writeChar(char c) throws JMSException
- {
- writeTypeDiscriminator(CHAR_TYPE);
- writeCharImpl(c);
- }
-
- public void writeCharImpl(final char c) throws JMSException
- {
- try
- {
- _data.writeChar(c);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- protected void writeInt(int i) throws JMSException
- {
- writeTypeDiscriminator(INT_TYPE);
- writeIntImpl(i);
- }
-
- protected void writeIntImpl(int i) throws JMSException
- {
- try
- {
- _data.writeInt(i);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- protected void writeLong(long l) throws JMSException
- {
- writeTypeDiscriminator(LONG_TYPE);
- writeLongImpl(l);
- }
-
- public void writeLongImpl(final long l) throws JMSException
- {
- try
- {
- _data.writeLong(l);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- protected void writeFloat(float v) throws JMSException
- {
- writeTypeDiscriminator(FLOAT_TYPE);
- writeFloatImpl(v);
- }
-
- public void writeFloatImpl(final float v) throws JMSException
- {
- try
- {
- _data.writeFloat(v);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- protected void writeDouble(double v) throws JMSException
- {
- writeTypeDiscriminator(DOUBLE_TYPE);
- writeDoubleImpl(v);
- }
-
- public void writeDoubleImpl(final double v) throws JMSException
- {
- try
- {
- _data.writeDouble(v);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- protected void writeString(String string) throws JMSException
- {
- if (string == null)
- {
- writeTypeDiscriminator(NULL_STRING_TYPE);
- }
- else
- {
- writeTypeDiscriminator(STRING_TYPE);
- writeNullTerminatedStringImpl(string);
- }
- }
-
- protected void writeNullTerminatedStringImpl(String string)
- throws JMSException
- {
- try
- {
- _data.write(string.getBytes(UTF8));
- _data.writeByte((byte) 0);
- }
- catch (IOException e)
- {
- throw handle(e);
- }
-
- }
-
- protected void writeBytes(byte[] bytes) throws JMSException
- {
- writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
- }
-
- protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
- {
- writeTypeDiscriminator(BYTEARRAY_TYPE);
- writeBytesImpl(bytes, offset, length);
- }
-
- public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException
- {
- try
- {
- if (bytes == null)
- {
- _data.writeInt(-1);
- }
- else
- {
- _data.writeInt(length);
- _data.write(bytes, offset, length);
- }
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
- public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException
- {
- try
- {
- if (bytes != null)
- {
- _data.write(bytes, offset, length);
- }
- }
- catch (IOException e)
- {
- throw handle(e);
- }
- }
-
-
- protected void writeObject(Object object) throws JMSException
- {
- Class clazz;
-
- if (object == null)
- {
- // string handles the output of null values
- clazz = String.class;
- }
- else
- {
- clazz = object.getClass();
- }
-
- if (clazz == Byte.class)
- {
- writeByte((Byte) object);
- }
- else if (clazz == Boolean.class)
- {
- writeBoolean((Boolean) object);
- }
- else if (clazz == byte[].class)
- {
- writeBytes((byte[]) object);
- }
- else if (clazz == Short.class)
- {
- writeShort((Short) object);
- }
- else if (clazz == Character.class)
- {
- writeChar((Character) object);
- }
- else if (clazz == Integer.class)
- {
- writeInt((Integer) object);
- }
- else if (clazz == Long.class)
- {
- writeLong((Long) object);
- }
- else if (clazz == Float.class)
- {
- writeFloat((Float) object);
- }
- else if (clazz == Double.class)
- {
- writeDouble((Double) object);
- }
- else if (clazz == String.class)
- {
- writeString((String) object);
- }
- else
- {
- throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
- }
- }
-
- public ByteBuffer getData()
- {
- return ByteBuffer.wrap(_baos.toByteArray());
- }
-
- public void writeLengthPrefixedUTF(final String string) throws JMSException
- {
- try
- {
- CharsetEncoder encoder = UTF8.newEncoder();
- java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
-
- writeShortImpl((short) encodedString.limit());
- while(encodedString.hasRemaining())
- {
- _data.writeByte(encodedString.get());
- }
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Unable to encode string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- catch (IOException e)
- {
- throw handle(e);
- }
-
- }
-}