summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-01-27 23:44:44 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-01-27 23:44:44 +0000
commit0694a2fef88073eba1614d4f4b018bf9ca56f958 (patch)
treeec20540fc0828bb11d6a0a83932da6a170bdfeb5 /java
parent3fbcf5545919883c00e6d3c284ce5f16b1d7e79b (diff)
downloadqpid-python-0694a2fef88073eba1614d4f4b018bf9ca56f958.tar.gz
This is related to QPID-2363
I am comitting the patch as it is. I will make the agreed changes in a subsequent commit shortly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903911 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java56
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java73
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java138
10 files changed, 328 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 2346ab5626..899d63af4e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -61,6 +61,11 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+
/**
* This is a 0.10 Session
*/
@@ -122,6 +127,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private TimerTask flushTask = null;
private RangeSet unacked = new RangeSet();
private int unackedCount = 0;
+ private boolean useAMQPEncodedMapMessage = !Boolean.getBoolean("qpid.use_legacy_map_message");
/**
* USed to store the range of in tx messages
@@ -933,4 +939,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
+ @ Override
+ public MapMessage createMapMessage() throws JMSException
+ {
+ checkNotClosed();
+ if (useAMQPEncodedMapMessage)
+ {
+ return new AMQPEncodedMapMessage(AMQMessageDelegateFactory.FACTORY_0_10);
+ }
+ else
+ {
+ return new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_10);
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
index 314508805d..fe919ddd9e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
@@ -25,7 +25,10 @@ import org.apache.qpid.client.AMQSession;
import javax.jms.Destination;
import javax.jms.JMSException;
+
+import java.nio.ByteBuffer;
import java.util.Enumeration;
+import java.util.Map;
import java.util.UUID;
public interface AMQMessageDelegate
@@ -134,4 +137,8 @@ public interface AMQMessageDelegate
long getDeliveryTag();
void setJMSMessageID(final UUID messageId) throws JMSException;
+
+ ByteBuffer encodeMap(Map<String,Object> map);
+
+ Map<String,Object> decodeMap(ByteBuffer buf);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 1479f06632..053bfe095d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -21,26 +21,36 @@
package org.apache.qpid.client.message;
-import org.apache.commons.collections.map.ReferenceMap;
-import org.apache.qpid.client.*;
-import org.apache.qpid.framing.ContentHeaderProperties;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.jms.Message;
-import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.transport.*;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
import javax.jms.MessageFormatException;
-import javax.jms.DeliveryMode;
-import java.util.*;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Message;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.BBEncoder;
/**
* This extends AbstractAMQMessageDelegate which contains common code between
@@ -912,4 +922,18 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
return _deliveryProps;
}
+
+ public java.nio.ByteBuffer encodeMap(Map<String,Object> map)
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeMap(map);
+ return encoder.segment();
+ }
+
+ public Map<String,Object> decodeMap(java.nio.ByteBuffer buf)
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(buf);
+ return decoder.readMap();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index 5157632280..2a3bd20f11 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -573,5 +573,25 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
return _deliveryTag;
}
+
+ public java.nio.ByteBuffer encodeMap(Map<String,Object> map)
+ {
+ String errorMsg = "There is no support for encoding maps";
+ if (_session != null)
+ {
+ errorMsg = errorMsg + " in AMQP " + _session.getAMQConnection().getProtocolVersion();
+ }
+ throw new UnsupportedOperationException(errorMsg);
+ }
+
+ public Map<String,Object> decodeMap(java.nio.ByteBuffer buf)
+ {
+ String errorMsg = "There is no support for encoding maps";
+ if (_session != null)
+ {
+ errorMsg = errorMsg + " in AMQP " + _session.getAMQConnection().getProtocolVersion();
+ }
+ throw new UnsupportedOperationException(errorMsg);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
new file mode 100644
index 0000000000..4412476b45
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
@@ -0,0 +1,73 @@
+package org.apache.qpid.client.message;
+
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
+public class AMQPEncodedMapMessage extends JMSMapMessage
+{
+ public static final String MIME_TYPE = "amqp/map";
+
+ public AMQPEncodedMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
+ {
+ this(delegateFactory, null);
+ }
+
+ AMQPEncodedMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
+ {
+ super(delegateFactory, data);
+ }
+
+ AMQPEncodedMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ {
+ super(delegate, data);
+ }
+
+ @ Override
+ protected String getMimeType()
+ {
+ return MIME_TYPE;
+ }
+
+ // The super clas methods resets the buffer
+ @ Override
+ public ByteBuffer getData()
+ {
+ writeMapToData();
+ return _data;
+ }
+
+ @ Override
+ protected void populateMapFromData() throws JMSException
+ {
+ if (_data != null)
+ {
+ _data.rewind();
+ _map = _delegate.decodeMap(_data.buf());
+ }
+ else
+ {
+ _map.clear();
+ }
+ }
+
+ @ Override
+ protected void writeMapToData()
+ {
+ _data = ByteBuffer.wrap(_delegate.encodeMap(_map));
+ }
+
+ // for testing
+ Map<String,Object> getMap()
+ {
+ return _map;
+ }
+
+ void setMap(Map<String,Object> map)
+ {
+ _map = map;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
new file mode 100644
index 0000000000..37ade9cf80
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
@@ -0,0 +1,25 @@
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
+public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
+{
+
+ @Override
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate,
+ ByteBuffer data) throws AMQException
+ {
+ return new AMQPEncodedMapMessage(delegate,data);
+ }
+
+ @Override
+ public AbstractJMSMessage createMessage(
+ AMQMessageDelegateFactory delegateFactory) throws JMSException
+ {
+ return new AMQPEncodedMapMessage(delegateFactory);
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 288a4ea85c..6ba55b207a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -49,7 +49,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
- private AMQMessageDelegate _delegate;
+ protected AMQMessageDelegate _delegate;
private boolean _redelivered;
protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
@@ -379,6 +379,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
buf.append("\nJMS Destination: ").append(getJMSDestination());
buf.append("\nJMS Type: ").append(getJMSType());
buf.append("\nJMS MessageID: ").append(getJMSMessageID());
+ buf.append("\nJMS Content-Type: ").append(getContentType());
buf.append("\nAMQ message number: ").append(getDeliveryTag());
buf.append("\nProperties:");
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index b6e013ac8f..73f3afab03 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -44,8 +44,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
public static final String MIME_TYPE = "jms/map-message";
-
- private Map<String, Object> _map = new HashMap<String, Object>();
+ protected Map<String, Object> _map = new HashMap<String, Object>();
public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
@@ -459,7 +458,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
return _map.containsKey(propName);
}
- private void populateMapFromData() throws JMSException
+ protected void populateMapFromData() throws JMSException
{
if (_data != null)
{
@@ -479,7 +478,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
}
- private void writeMapToData()
+ protected void writeMapToData()
{
allocateInitialBuffer();
final int size = _map.size();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index a7d41e2cde..4e4061cf4d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -64,6 +64,7 @@ public class MessageFactoryRegistry
mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
+ mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory());
mf.registerFactory(null, mf._default);
return mf;
diff --git a/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java b/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
new file mode 100644
index 0000000000..8852ff82cd
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
@@ -0,0 +1,138 @@
+package org.apache.qpid.client.message;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+
+public class AMQPEncodedMapMessageTest extends QpidTestCase
+{
+ private Connection _connection;
+ private Session _session;
+ MessageConsumer _consumer;
+ MessageProducer _producer;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ //Create Connection
+ _connection = getConnection();
+
+ //Create Session
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Create Queue
+ String queueName = getTestQueueName();
+ Queue queue = _session.createQueue(queueName);
+
+ //Create Consumer
+ _consumer = _session.createConsumer(queue);
+
+ //Create Producer
+ _producer = _session.createProducer(queue);
+
+ _connection.start();
+ }
+
+ public void testEmptyMessage() throws JMSException
+ {
+ if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+ {
+ MapMessage m = _session.createMapMessage();
+ _producer.send(m);
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals("Message content should be an empty map",
+ Collections.EMPTY_MAP,
+ ((AMQPEncodedMapMessage)msg).getMap());
+
+ }
+ }
+
+ public void testNullMessage() throws JMSException
+ {
+ if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+ {
+ MapMessage m = _session.createMapMessage();
+ ((AMQPEncodedMapMessage)m).setMap(null);
+ _producer.send(m);
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals("Message content should be null",
+ null,
+ ((AMQPEncodedMapMessage)msg).getMap());
+
+ }
+ }
+
+ public void testMessageWithContent() throws JMSException
+ {
+ if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+ {
+ MapMessage m = _session.createMapMessage();
+ m.setBoolean("Boolean", true);
+ m.setByte("Byte", (byte)5);
+ byte[] bytes = new byte[]{(byte)5,(byte)8};
+ m.setBytes("Bytes", bytes);
+ m.setChar("Char", 'X');
+ m.setDouble("Double", 56.84);
+ m.setFloat("Float", Integer.MAX_VALUE + 5000);
+ m.setInt("Int", Integer.MAX_VALUE - 5000);
+ m.setShort("Short", (short)58);
+ m.setString("String", "Hello");
+ _producer.send(m);
+
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals(true,m.getBoolean("Boolean"));
+ assertEquals((byte)5,m.getByte("Byte"));
+ byte[] bytesRcv = m.getBytes("Bytes");
+ assertNotNull("Byte array is null",bytesRcv);
+ assertEquals((byte)5,bytesRcv[0]);
+ assertEquals((byte)8,bytesRcv[1]);
+ assertEquals('X',m.getChar("Char"));
+ assertEquals(56.84,m.getDouble("Double"));
+ //assertEquals(Integer.MAX_VALUE + 5000,m.getFloat("Float"));
+ assertEquals(Integer.MAX_VALUE - 5000,m.getInt("Int"));
+ assertEquals((short)58,m.getShort("Short"));
+ assertEquals("Hello",m.getString("String"));
+ }
+ }
+
+ public void tearDown() throws Exception
+ {
+ //clean up
+ _connection.close();
+
+ super.tearDown();
+ }
+}