diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-15 14:29:56 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-15 14:29:56 +0000 |
| commit | 8c9a768363eb8a5069920adc054eb89295584db1 (patch) | |
| tree | e29d435ba0517709316dda28e670889ba5afa1e7 /java/client | |
| parent | 8e3751d921d2915b4edb57beb7a4db66c02963bd (diff) | |
| download | qpid-python-8c9a768363eb8a5069920adc054eb89295584db1.tar.gz | |
Added foreign message support
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566187 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
11 files changed, 349 insertions, 43 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index e06d4d3a1e..5394a50448 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -27,7 +27,12 @@ public class ByteBufferMessage implements Message private DeliveryProperties _currentDeliveryProps; private MessageProperties _currentMessageProps; private long _transferId; - + + public ByteBufferMessage() + { + + } + public ByteBufferMessage(long transferId) { _transferId = transferId; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java index f3b6122992..18dfebeabf 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java @@ -17,7 +17,13 @@ */ package org.apache.qpidity.jms; +import org.apache.qpidity.jms.message.QpidMessage; +import org.apache.qpidity.jms.message.MessageHelper; +import org.apache.qpidity.jms.message.MessageImpl; +import org.apache.qpidity.QpidException; + import javax.jms.*; +import java.util.UUID; /** * Implements MessageProducer @@ -304,13 +310,63 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer { throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); } + // Only get current time if required + long currentTime = Long.MIN_VALUE; + if (!((timeToLive == 0) && _disableTimestamps)) + { + currentTime = System.currentTimeMillis(); + } + // the messae UID + String uid = (_disableMessageId) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString(); + MessageImpl qpidMessage = null; // check that the message is not a foreign one - // todo - // set the properties - - // todo - - // dispatch it + try + { + qpidMessage = (MessageImpl) message; + } + catch (ClassCastException cce) + { + // this is a foreign message + qpidMessage = MessageHelper.transformMessage(message); + // set message's properties in case they are queried after send. + message.setJMSDestination(destination); + message.setJMSDeliveryMode(deliveryMode); + message.setJMSPriority(priority); + message.setJMSMessageID(uid); + if (timeToLive != 0) + { + message.setJMSExpiration(timeToLive + currentTime); + _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); + } + else + { + message.setJMSExpiration(timeToLive); + } + message.setJMSTimestamp(currentTime); + } + // set the message properties + qpidMessage.setJMSDestination(destination); + qpidMessage.setJMSMessageID(uid); + qpidMessage.setJMSDeliveryMode(deliveryMode); + qpidMessage.setJMSPriority(priority); + if (timeToLive != 0) + { + qpidMessage.setJMSExpiration(timeToLive + currentTime); + } + else + { + qpidMessage.setJMSExpiration(timeToLive); + } + qpidMessage.setJMSTimestamp(currentTime); + // call beforeMessageDispatch + try + { + qpidMessage.beforeMessageDispatch(); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 71cbae29a2..90912f9c10 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -22,7 +22,6 @@ import org.slf4j.LoggerFactory; import org.apache.qpidity.jms.message.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.RangeSet; -import org.apache.qpid.client.message.*; import javax.jms.*; import javax.jms.IllegalStateException; @@ -435,7 +434,6 @@ public class SessionImpl implements Session // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); getQpidSession().messageRelease(ranges); - // TODO We can be a little bit cleverer and build a set of ranges } } @@ -1040,7 +1038,6 @@ public class SessionImpl implements Session // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); getQpidSession().messageAcknowledge(ranges); - // TODO We can be a little bit cleverer and build a set of ranges } //empty the list of unack messages _unacknowledgedMessages.clear(); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java index efc1b40d2d..ecf6c796d3 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java @@ -65,8 +65,9 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage * Constructor used by MessageFactory * * @param message The new qpid message. + * @throws QpidException In case of problem when receiving the message body. */ - protected BytesMessageImpl(org.apache.qpidity.api.Message message) + protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java index d2493c1cc9..9c4387d8b0 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java @@ -54,8 +54,9 @@ public class MapMessageImpl extends MessageImpl implements MapMessage * Constructor used by MessageFactory * * @param message The new qpid message. + * @throws QpidException In case of IO problem when reading the received message. */ - protected MapMessageImpl(org.apache.qpidity.api.Message message) + protected MapMessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java index f5a2fd0ac6..ab23daaadd 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java @@ -17,13 +17,12 @@ */ package org.apache.qpidity.jms.message; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; +import javax.jms.*; import java.math.BigDecimal; import java.math.BigInteger; +import java.util.Enumeration; /** - * * This is an helper class for performing data convertion */ public class MessageHelper @@ -106,7 +105,7 @@ public class MessageHelper return result; } - /** + /** * Convert an object into a int value * * @param obj The object that may contain int value @@ -127,12 +126,12 @@ public class MessageHelper else { throw new MessageFormatException("int property type convertion error", - "Messasge property type convertion error"); + "Messasge property type convertion error"); } return result; } - /** + /** * Convert an object into a long value * * @param obj The object that may contain long value @@ -154,7 +153,7 @@ public class MessageHelper else { throw new MessageFormatException("long property type convertion error", - "Messasge property type convertion error"); + "Messasge property type convertion error"); } return result; } @@ -180,7 +179,7 @@ public class MessageHelper else { throw new MessageFormatException("float property type convertion error", - "Messasge property type convertion error"); + "Messasge property type convertion error"); } return result; } @@ -206,8 +205,8 @@ public class MessageHelper else { throw new MessageFormatException("double property type convertion error", - "Messasge property type convertion error"); - } + "Messasge property type convertion error"); + } return result; } @@ -223,17 +222,17 @@ public class MessageHelper char result; if (obj instanceof Character) { - result = ((Character) obj).charValue(); + result = (Character) obj; } else { throw new MessageFormatException("char property type convertion error", - "Messasge property type convertion error"); + "Messasge property type convertion error"); } return result; } - /** + /** * Convert an object into a String value * * @param obj The object that may contain String value @@ -259,7 +258,7 @@ public class MessageHelper * @param value object for inspection * @return true if object represent Java primitive type; false otherwise */ - public static boolean isPrimitive(Object value) throws JMSException + public static boolean isPrimitive(Object value) { // Innocent till proven guilty boolean isPrimitive = true; @@ -269,4 +268,189 @@ public class MessageHelper } return isPrimitive; } + + /** + * Transform a foreign message into an equivalent QPID representation. + * + * @param message The foreign message to be converted. + * @return A native message. + * @throws JMSException In case of problem when converting the message. + */ + public static MessageImpl transformMessage(Message message) throws JMSException + { + MessageImpl messageImpl; + + if (message instanceof BytesMessage) + { + messageImpl = transformBytesMessage((BytesMessage) message); + } + else if (message instanceof MapMessage) + { + messageImpl = transformMapMessage((MapMessage) message); + } + else if (message instanceof ObjectMessage) + { + messageImpl = transformObjectMessage((ObjectMessage) message); + } + else if (message instanceof StreamMessage) + { + messageImpl = transformStreamMessage((StreamMessage) message); + } + else if (message instanceof TextMessage) + { + messageImpl = transformTextMessage((TextMessage) message); + } + else + { + messageImpl = new MessageImpl(); + } + transformHeaderAndProperties(message, messageImpl); + return messageImpl; + } + + //---- Private methods + /** + * Exposed JMS defined properties on converted message: + * JMSDestination - we don't set here + * JMSDeliveryMode - we don't set here + * JMSExpiration - we don't set here + * JMSPriority - we don't set here + * JMSMessageID - we don't set here + * JMSTimestamp - we don't set here + * JMSCorrelationID - set + * JMSReplyTo - set + * JMSType - set + * JMSRedlivered - we don't set here + * + * @param message The foreign message to be converted. + * @param nativeMsg A native Qpid message. + * @throws JMSException In case of problem when converting the message. + */ + private static void transformHeaderAndProperties(Message message, MessageImpl nativeMsg) throws JMSException + { + //Set the correlation ID + String correlationID = message.getJMSCorrelationID(); + if (correlationID != null) + { + nativeMsg.setJMSCorrelationID(correlationID); + } + //Set JMS ReplyTo + if (message.getJMSReplyTo() != null) + { + nativeMsg.setJMSReplyTo(message.getJMSReplyTo()); + } + //Set JMS type + String jmsType = message.getJMSType(); + if (jmsType != null) + { + nativeMsg.setJMSType(jmsType); + } + // Sets all non-JMS defined properties on converted message + Enumeration propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) + { + String propertyName = String.valueOf(propertyNames.nextElement()); + if (!propertyName.startsWith("JMSX_")) + { + Object value = message.getObjectProperty(propertyName); + nativeMsg.setObjectProperty(propertyName, value); + } + } + } + + /** + * Transform a BytesMessage. + * + * @param bytesMessage a BytesMessage to be converted. + * @return a native BytesMessage. + * @throws JMSException In case of problem when converting the message. + */ + private static BytesMessageImpl transformBytesMessage(BytesMessage bytesMessage) throws JMSException + { + //reset the BytesMessage (makes the body read-only and repositions + // the stream of bytes to the beginning + bytesMessage.reset(); + BytesMessageImpl nativeMsg = new BytesMessageImpl(); + byte[] buf = new byte[1024]; + int len; + while ((len = bytesMessage.readBytes(buf)) != -1) + { + nativeMsg.writeBytes(buf, 0, len); + } + return nativeMsg; + } + + /** + * Transform a MapMessage. + * + * @param mapMessage a MapMessage to be converted. + * @return a native MapMessage. + * @throws JMSException In case of problem when converting the message. + */ + private static MapMessageImpl transformMapMessage(MapMessage mapMessage) throws JMSException + { + MapMessageImpl nativeMsg = new MapMessageImpl(); + Enumeration mapNames = mapMessage.getMapNames(); + while (mapNames.hasMoreElements()) + { + String name = (String) mapNames.nextElement(); + nativeMsg.setObject(name, mapMessage.getObject(name)); + } + return nativeMsg; + } + + /** + * Transform an ObjectMessage. + * + * @param objectMessage a ObjectMessage to be converted. + * @return a native ObjectMessage. + * @throws JMSException In case of problem when converting the message. + */ + private static ObjectMessageImpl transformObjectMessage(ObjectMessage objectMessage) throws JMSException + { + ObjectMessageImpl nativeMsg = new ObjectMessageImpl(); + nativeMsg.setObject(objectMessage.getObject()); + return nativeMsg; + } + + /** + * Transform a StreamMessage. + * + * @param streamMessage a StreamMessage to be converted. + * @return a native StreamMessage. + * @throws JMSException In case of problem when converting the message. + */ + private static StreamMessageImpl transformStreamMessage(StreamMessage streamMessage) throws JMSException + { + StreamMessageImpl nativeMsg = new StreamMessageImpl(); + try + { + //reset the stream message + streamMessage.reset(); + while (true) + { + nativeMsg.writeObject(streamMessage.readObject()); + } + } + catch (MessageEOFException e) + { + // we're at the end so don't mind the exception + } + return nativeMsg; + } + + /** + * Transform a TextMessage. + * + * @param textMessage a TextMessage to be converted. + * @return a native TextMessage. + * @throws JMSException In case of problem when converting the message. + */ + private static TextMessageImpl transformTextMessage(TextMessage textMessage) throws JMSException + { + TextMessageImpl nativeMsg = new TextMessageImpl(); + nativeMsg.setText(textMessage.getText()); + return nativeMsg; + } + } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java index 0285e60908..7d5ae755f5 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java @@ -30,6 +30,11 @@ import java.util.Enumeration; public class MessageImpl extends QpidMessage implements Message { /** + * name used to store JMSType. + */ + private static final String JMS_MESSAGE_TYPE = "JMSType"; + + /** * The ReplyTo destination for this message */ private Destination _replyTo; @@ -72,8 +77,9 @@ public class MessageImpl extends QpidMessage implements Message * Constructor used by MessageFactory * * @param message The new qpid message. + * @throws QpidException In case of IO problem when reading the received message. */ - protected MessageImpl(org.apache.qpidity.api.Message message) + protected MessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); } @@ -411,7 +417,7 @@ public class MessageImpl extends QpidMessage implements Message */ public String getJMSType() throws JMSException { - return super.getMessageType(); + return getStringProperty(JMS_MESSAGE_TYPE); } /** @@ -422,7 +428,14 @@ public class MessageImpl extends QpidMessage implements Message */ public void setJMSType(String type) throws JMSException { - super.setMessageType(type); + if (type == null) + { + throw new JMSException("Invalid message type null"); + } + else + { + super.setProperty(JMS_MESSAGE_TYPE, type); + } } /** @@ -843,7 +856,7 @@ public class MessageImpl extends QpidMessage implements Message * Clear out the message body. Clearing a message's body does not clear * its header values or property entries. * <P>If this message body was read-only, calling this method leaves - * the message body is in the same state as an empty body in a newly + * the message body in the same state as an empty body in a newly * created message. * * @throws JMSException If clearing this message body fails to due to some error. @@ -864,14 +877,15 @@ public class MessageImpl extends QpidMessage implements Message { if (_destination == null) { - throw new QpidException("Invalid destination null",null, null); + throw new QpidException("Invalid destination null", null, null); } + super.beforeMessageDispatch(); } /** * This method is invoked after this message is received. * - * @throws QpidException + * @throws QpidException If there is an internal error when procesing this message. */ public void afterMessageReceive() throws QpidException { @@ -882,7 +896,6 @@ public class MessageImpl extends QpidMessage implements Message _proertiesReadOnly = true; _readOnly = true; - } /** diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java index d3653a0962..5878fc8c34 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java @@ -57,8 +57,9 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage * Constructor used by MessageFactory * * @param message The new qpid message. + * @throws QpidException In case of IO problem when reading the received message. */ - protected ObjectMessageImpl(org.apache.qpidity.api.Message message) + protected ObjectMessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index 1ddc567e54..7c46e191ef 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -25,10 +25,12 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import java.util.Vector; +import java.io.IOException; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; import org.apache.qpidity.ReplyTo; +import org.apache.qpidity.client.util.ByteBufferMessage; public class QpidMessage @@ -55,25 +57,36 @@ public class QpidMessage //-- Constructors - /** * Constructor used when JMS messages are created by SessionImpl. */ protected QpidMessage() { - // TODO we need an implementation class: _qpidityMessage + // We us a byteBufferMessage as default + _qpidityMessage = new ByteBufferMessage(); _messageProperties = new HashMap<String, Object>(); + // This is a newly created messsage so the data is empty + _messageData = ByteBuffer.allocate(1024); } /** * Constructor used when a Qpid message is received * - * @param message The received message + * @param message The received message. + * @throws QpidException In case of problem when receiving the message body. */ - protected QpidMessage(org.apache.qpidity.api.Message message) + protected QpidMessage(org.apache.qpidity.api.Message message) throws QpidException { - _qpidityMessage = message; - _messageProperties = (Map<String, Object>) message.getMessageProperties().getApplicationHeaders(); + try + { + _qpidityMessage = message; + _messageProperties = (Map<String, Object>) message.getMessageProperties().getApplicationHeaders(); + _messageData = _qpidityMessage.readData(); + } + catch (IOException ioe) + { + throw new QpidException("IO problem when creating message", ErrorCode.UNDEFINED, ioe); + } } //---- getters and setters. @@ -148,6 +161,16 @@ public class QpidMessage } /** + * Set the ReplyTo for this message. + * + * @param replyTo The ReplyTo for this message. + */ + protected void setReplyTo(ReplyTo replyTo) + { + _qpidityMessage.getMessageProperties().setReplyTo(replyTo); + } + + /** * Get this message Delivery mode * The delivery mode may be non-persistent (1) or persistent (2) * @@ -321,9 +344,30 @@ public class QpidMessage */ protected void clearMessageData() { - _messageData = ByteBuffer.allocate(1024); + _messageData = ByteBuffer.allocate(1024); } + /** + * This method is invoked before a message dispatch operation. + * + * @throws QpidException If the destination is not set + */ + public void beforeMessageDispatch() throws QpidException + { + try + { + // set the message data + _qpidityMessage.clearData(); + // we need to do a flip + _messageData.flip(); + _qpidityMessage.appendData(_messageData); + _qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties); + } + catch (IOException e) + { + throw new QpidException("IO exception when sending message", ErrorCode.UNDEFINED, e); + } + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java index 94075a3c6f..10060b6df8 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java @@ -17,6 +17,8 @@ */ package org.apache.qpidity.jms.message; +import org.apache.qpidity.QpidException; + import javax.jms.*; import java.io.IOException; import java.io.EOFException; @@ -80,8 +82,9 @@ public class StreamMessageImpl extends BytesMessageImpl implements StreamMessage * Constructor used by MessageFactory * * @param message The new qpid message. + * @throws QpidException In case of problem when receiving the message body. */ - protected StreamMessageImpl(org.apache.qpidity.api.Message message) + protected StreamMessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java index f5228cb6d2..e894c33682 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java @@ -55,8 +55,9 @@ public class TextMessageImpl extends MessageImpl implements TextMessage * Constructor used by MessageFactory * * @param message The new qpid message. + * @throws QpidException In case of IO problem when reading the received message. */ - protected TextMessageImpl(org.apache.qpidity.api.Message message) + protected TextMessageImpl(org.apache.qpidity.api.Message message) throws QpidException { super(message); } |
