diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-01-27 23:44:44 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-01-27 23:44:44 +0000 |
| commit | 0694a2fef88073eba1614d4f4b018bf9ca56f958 (patch) | |
| tree | ec20540fc0828bb11d6a0a83932da6a170bdfeb5 /java/client | |
| parent | 3fbcf5545919883c00e6d3c284ce5f16b1d7e79b (diff) | |
| download | qpid-python-0694a2fef88073eba1614d4f4b018bf9ca56f958.tar.gz | |
This is related to QPID-2363
I am comitting the patch as it is.
I will make the agreed changes in a subsequent commit shortly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903911 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
9 files changed, 190 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 2346ab5626..899d63af4e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -61,6 +61,11 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.client.message.JMSMapMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; + /** * This is a 0.10 Session */ @@ -122,6 +127,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private TimerTask flushTask = null; private RangeSet unacked = new RangeSet(); private int unackedCount = 0; + private boolean useAMQPEncodedMapMessage = !Boolean.getBoolean("qpid.use_legacy_map_message"); /** * USed to store the range of in tx messages @@ -933,4 +939,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } + @ Override + public MapMessage createMapMessage() throws JMSException + { + checkNotClosed(); + if (useAMQPEncodedMapMessage) + { + return new AMQPEncodedMapMessage(AMQMessageDelegateFactory.FACTORY_0_10); + } + else + { + return new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_10); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java index 314508805d..fe919ddd9e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java @@ -25,7 +25,10 @@ import org.apache.qpid.client.AMQSession; import javax.jms.Destination; import javax.jms.JMSException; + +import java.nio.ByteBuffer; import java.util.Enumeration; +import java.util.Map; import java.util.UUID; public interface AMQMessageDelegate @@ -134,4 +137,8 @@ public interface AMQMessageDelegate long getDeliveryTag(); void setJMSMessageID(final UUID messageId) throws JMSException; + + ByteBuffer encodeMap(Map<String,Object> map); + + Map<String,Object> decodeMap(ByteBuffer buf); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 1479f06632..053bfe095d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -21,26 +21,36 @@ package org.apache.qpid.client.message; -import org.apache.commons.collections.map.ReferenceMap; -import org.apache.qpid.client.*; -import org.apache.qpid.framing.ContentHeaderProperties; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.jms.Message; -import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.transport.*; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MessageNotWriteableException; import javax.jms.MessageFormatException; -import javax.jms.DeliveryMode; -import java.util.*; +import javax.jms.MessageNotWriteableException; + +import org.apache.commons.collections.map.ReferenceMap; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.Message; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; /** * This extends AbstractAMQMessageDelegate which contains common code between @@ -912,4 +922,18 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate return _deliveryProps; } + + public java.nio.ByteBuffer encodeMap(Map<String,Object> map) + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeMap(map); + return encoder.segment(); + } + + public Map<String,Object> decodeMap(java.nio.ByteBuffer buf) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(buf); + return decoder.readMap(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index 5157632280..2a3bd20f11 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -573,5 +573,25 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate return _deliveryTag; } + + public java.nio.ByteBuffer encodeMap(Map<String,Object> map) + { + String errorMsg = "There is no support for encoding maps"; + if (_session != null) + { + errorMsg = errorMsg + " in AMQP " + _session.getAMQConnection().getProtocolVersion(); + } + throw new UnsupportedOperationException(errorMsg); + } + + public Map<String,Object> decodeMap(java.nio.ByteBuffer buf) + { + String errorMsg = "There is no support for encoding maps"; + if (_session != null) + { + errorMsg = errorMsg + " in AMQP " + _session.getAMQConnection().getProtocolVersion(); + } + throw new UnsupportedOperationException(errorMsg); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java new file mode 100644 index 0000000000..4412476b45 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java @@ -0,0 +1,73 @@ +package org.apache.qpid.client.message; + +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; + +public class AMQPEncodedMapMessage extends JMSMapMessage +{ + public static final String MIME_TYPE = "amqp/map"; + + public AMQPEncodedMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException + { + this(delegateFactory, null); + } + + AMQPEncodedMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException + { + super(delegateFactory, data); + } + + AMQPEncodedMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + { + super(delegate, data); + } + + @ Override + protected String getMimeType() + { + return MIME_TYPE; + } + + // The super clas methods resets the buffer + @ Override + public ByteBuffer getData() + { + writeMapToData(); + return _data; + } + + @ Override + protected void populateMapFromData() throws JMSException + { + if (_data != null) + { + _data.rewind(); + _map = _delegate.decodeMap(_data.buf()); + } + else + { + _map.clear(); + } + } + + @ Override + protected void writeMapToData() + { + _data = ByteBuffer.wrap(_delegate.encodeMap(_map)); + } + + // for testing + Map<String,Object> getMap() + { + return _map; + } + + void setMap(Map<String,Object> map) + { + _map = map; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java new file mode 100644 index 0000000000..37ade9cf80 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java @@ -0,0 +1,25 @@ +package org.apache.qpid.client.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; + +public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory +{ + + @Override + protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, + ByteBuffer data) throws AMQException + { + return new AMQPEncodedMapMessage(delegate,data); + } + + @Override + public AbstractJMSMessage createMessage( + AMQMessageDelegateFactory delegateFactory) throws JMSException + { + return new AMQPEncodedMapMessage(delegateFactory); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 288a4ea85c..6ba55b207a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -49,7 +49,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message - private AMQMessageDelegate _delegate; + protected AMQMessageDelegate _delegate; private boolean _redelivered; protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) @@ -379,6 +379,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message buf.append("\nJMS Destination: ").append(getJMSDestination()); buf.append("\nJMS Type: ").append(getJMSType()); buf.append("\nJMS MessageID: ").append(getJMSMessageID()); + buf.append("\nJMS Content-Type: ").append(getContentType()); buf.append("\nAMQ message number: ").append(getDeliveryTag()); buf.append("\nProperties:"); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index b6e013ac8f..73f3afab03 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -44,8 +44,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm public static final String MIME_TYPE = "jms/map-message"; - - private Map<String, Object> _map = new HashMap<String, Object>(); + protected Map<String, Object> _map = new HashMap<String, Object>(); public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { @@ -459,7 +458,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm return _map.containsKey(propName); } - private void populateMapFromData() throws JMSException + protected void populateMapFromData() throws JMSException { if (_data != null) { @@ -479,7 +478,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } } - private void writeMapToData() + protected void writeMapToData() { allocateInitialBuffer(); final int size = _map.size(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index a7d41e2cde..4e4061cf4d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -64,6 +64,7 @@ public class MessageFactoryRegistry mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); + mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory()); mf.registerFactory(null, mf._default); return mf; |
