summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-01-07 09:44:58 +0000
committerKeith Wall <kwall@apache.org>2012-01-07 09:44:58 +0000
commitad776f381e2690c58c37c33d23b2389da1b2028e (patch)
tree2f67e684701205686f879b23f8b05b5697141822 /java
parent2878a86757b081d65790210df3925e2d8c9cf845 (diff)
downloadqpid-python-ad776f381e2690c58c37c33d23b2389da1b2028e.tar.gz
QPID-3715: Fix the receiving of an empty ObjectMessage (without a body)
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1228583 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java2
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java52
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java (renamed from java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java)18
-rw-r--r--java/common/src/test/java/org/apache/qpid/util/ByteBufferInputStreamTest.java111
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java17
6 files changed, 156 insertions, 46 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
index 5992e42fb7..9bfa0bb2fb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
@@ -29,8 +29,8 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.util.ByteBufferInputStream;
import org.apache.qpid.server.util.ByteBufferOutputStream;
+import org.apache.qpid.util.ByteBufferInputStream;
import java.io.*;
import java.nio.ByteBuffer;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index aad3547789..ce6bfe87e0 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -43,7 +43,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.util.ByteBufferInputStream;
+import org.apache.qpid.util.ByteBufferInputStream;
import java.io.DataInputStream;
import java.io.IOException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index c981c951c3..7f733b9644 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -29,6 +29,7 @@ import javax.jms.ObjectMessage;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream;
+import org.apache.qpid.util.ByteBufferInputStream;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
@@ -62,26 +63,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
try
{
- ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
- {
-
-
- @Override
- public int read() throws IOException
- {
- return data.get();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- len = data.remaining() < len ? data.remaining() : len;
- data.get(b, off, len);
- return len;
- }
- });
-
- _readData = (Serializable) in.readObject();
+ _readData = read(data);
}
catch (IOException e)
{
@@ -93,7 +75,6 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
}
-
public void clearBody() throws JMSException
{
super.clearBody();
@@ -189,24 +170,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
final ByteBuffer data = _data.duplicate();
try
{
- ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
- {
- @Override
- public int read() throws IOException
- {
- return data.get();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- len = data.remaining() < len ? data.remaining() : len;
- data.get(b, off, len);
- return len;
- }
- });
-
- return (Serializable) in.readObject();
+ return read(data);
}
catch (ClassNotFoundException e)
{
@@ -224,4 +188,14 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
+ private Serializable read(final ByteBuffer data) throws IOException, ClassNotFoundException
+ {
+ Serializable result = null;
+ if (data != null && data.hasRemaining())
+ {
+ ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new ByteBufferInputStream(data));
+ result = (Serializable) in.readObject();
+ }
+ return result;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java b/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java
index 898a667736..b72b342187 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java
+++ b/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java
@@ -18,12 +18,15 @@
* under the License.
*
*/
-package org.apache.qpid.server.util;
+package org.apache.qpid.util;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+/**
+ * Wraps @link {@link ByteBuffer} into {@link InputStream}
+ */
public class ByteBufferInputStream extends InputStream
{
private final ByteBuffer _buffer;
@@ -36,13 +39,20 @@ public class ByteBufferInputStream extends InputStream
@Override
public int read() throws IOException
{
- return _buffer.get() & 0xFF;
+ if (_buffer.hasRemaining())
+ {
+ return _buffer.get() & 0xFF;
+ }
+ return -1;
}
-
@Override
public int read(byte[] b, int off, int len) throws IOException
{
+ if (!_buffer.hasRemaining())
+ {
+ return -1;
+ }
if(_buffer.remaining() < len)
{
len = _buffer.remaining();
@@ -73,9 +83,7 @@ public class ByteBufferInputStream extends InputStream
@Override
public long skip(long n) throws IOException
{
-
_buffer.position(_buffer.position()+(int)n);
-
return n;
}
diff --git a/java/common/src/test/java/org/apache/qpid/util/ByteBufferInputStreamTest.java b/java/common/src/test/java/org/apache/qpid/util/ByteBufferInputStreamTest.java
new file mode 100644
index 0000000000..0b393a489f
--- /dev/null
+++ b/java/common/src/test/java/org/apache/qpid/util/ByteBufferInputStreamTest.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+public class ByteBufferInputStreamTest extends TestCase
+{
+ private byte[] _data = {2, 1, 5, 3, 4};
+ private ByteBufferInputStream _inputStream;
+
+ public void setUp() throws Exception
+ {
+ _inputStream = new ByteBufferInputStream(ByteBuffer.wrap(_data));
+ }
+
+ public void testRead() throws IOException
+ {
+ for (int i = 0; i < _data.length; i++)
+ {
+ assertEquals("Unexpected byte at position " + i, _data[i], _inputStream.read());
+ }
+ assertEquals("EOF not reached", -1, _inputStream.read());
+ }
+
+ public void testReadByteArray() throws IOException
+ {
+ byte[] readBytes = new byte[_data.length];
+ int length = _inputStream.read(readBytes, 0, 2);
+
+ byte[] expected = new byte[_data.length];
+ System.arraycopy(_data, 0, expected, 0, 2);
+
+ assertTrue("Unexpected data", Arrays.equals(expected, readBytes));
+ assertEquals("Unexpected length", 2, length);
+
+ length = _inputStream.read(readBytes, 2, 3);
+
+ assertTrue("Unexpected data", Arrays.equals(_data, readBytes));
+ assertEquals("Unexpected length", 3, length);
+
+ length = _inputStream.read(readBytes);
+ assertEquals("EOF not reached", -1, length);
+ }
+
+ public void testSkip() throws IOException
+ {
+ _inputStream.skip(3);
+ byte[] readBytes = new byte[_data.length - 3];
+ int length = _inputStream.read(readBytes);
+
+ byte[] expected = new byte[_data.length - 3];
+ System.arraycopy(_data, 3, expected, 0, _data.length - 3);
+
+ assertTrue("Unexpected data", Arrays.equals(expected, readBytes));
+ assertEquals("Unexpected length", _data.length - 3, length);
+ }
+
+ public void testAvailable() throws IOException
+ {
+ int available = _inputStream.available();
+ assertEquals("Unexpected number of available bytes", _data.length, available);
+ byte[] readBytes = new byte[_data.length];
+ _inputStream.read(readBytes);
+ available = _inputStream.available();
+ assertEquals("Unexpected number of available bytes", 0, available);
+ }
+
+ public void testMarkReset() throws IOException
+ {
+ _inputStream.mark(0);
+ byte[] readBytes = new byte[_data.length];
+ int length = _inputStream.read(readBytes);
+ assertEquals("Unexpected length", _data.length, length);
+ assertEquals("Unexpected number of available bytes", 0, _inputStream.available());
+
+ _inputStream.reset();
+ readBytes = new byte[_data.length];
+ length = _inputStream.read(readBytes);
+ assertEquals("Unexpected length", _data.length, length);
+ assertEquals("Unexpected number of available bytes", 0, _inputStream.available());
+ }
+
+ public void testMarkSupported() throws IOException
+ {
+ assertTrue("Unexpected mark supported", _inputStream.markSupported());
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
index 147a03be0c..fa16152b69 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
@@ -138,4 +138,21 @@ public class ObjectMessageTest extends QpidBrokerTestCase
assertEquals("Second read: UUIDs were not equal", sent, result);
}
+
+
+ public void testSendEmptyObjectMessage() throws JMSException
+ {
+ ObjectMessage testMessage = _session.createObjectMessage();
+ testMessage.setStringProperty("test-property", "test-value");
+ assertNotNull("Object was null", testMessage.toString());
+
+ _producer.send(testMessage);
+
+ ObjectMessage receivedMessage = (ObjectMessage) _consumer.receive(1000);
+
+ assertNotNull("Message was not received.", receivedMessage);
+ assertNull("No object was sent", receivedMessage.getObject());
+ assertEquals("Unexpected property received", "test-value", receivedMessage.getStringProperty("test-property"));
+ }
+
}