diff options
author | Robert Greig <rgreig@apache.org> | 2007-02-12 13:25:36 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-02-12 13:25:36 +0000 |
commit | b53c13e9d33aa35ed38c647bfa29fab0bbe58915 (patch) | |
tree | a3f9401cd095238ca4f4f5d6b04582f6a33adc56 /java/client/src | |
parent | cd8ccb1a691ef5eb260b165f08fd9a07d1e5867d (diff) | |
download | qpid-python-b53c13e9d33aa35ed38c647bfa29fab0bbe58915.tar.gz |
(Patch submitted by Rupert Smith) Qpid-360 fixes.
Message type defaults to ByteMessage when not specified.
Unknown destination type is used as default when not specified.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@506439 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
3 files changed, 195 insertions, 176 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index fc3450c385..a0110cc8af 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,17 +20,20 @@ */ package org.apache.qpid.client; +import java.io.UnsupportedEncodingException; + +import javax.jms.*; + import org.apache.log4j.Logger; + import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import javax.jms.*; -import java.io.UnsupportedEncodingException; - public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { protected final Logger _logger = Logger.getLogger(getClass()); @@ -101,9 +104,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _waitUntilSent; private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; - protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, - int channelId, AMQSession session, AMQProtocolHandler protocolHandler, - long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent) + protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, + boolean immediate, boolean mandatory, boolean waitUntilSent) { _connection = connection; _destination = destination; @@ -116,6 +119,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { declareDestination(destination); } + _immediate = immediate; _mandatory = mandatory; _waitUntilSent = waitUntilSent; @@ -134,18 +138,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - destination.getExchangeName(), // exchange - false, // internal - true, // nowait - false, // passive - _session.getTicket(), // ticket - destination.getExchangeClass()); // type + AMQFrame declare = + ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + destination.getExchangeName(), // exchange + false, // internal + true, // nowait + false, // passive + _session.getTicket(), // ticket + destination.getExchangeClass()); // type _protocolHandler.writeFrame(declare); } @@ -159,6 +162,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public boolean getDisableMessageID() throws JMSException { checkNotClosed(); + // Always false for AMQP return false; } @@ -172,39 +176,44 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public boolean getDisableMessageTimestamp() throws JMSException { checkNotClosed(); + return _disableTimestamps; } public void setDeliveryMode(int i) throws JMSException { checkPreConditions(); - if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT) + if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT)) { - throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + - " is illegal"); + throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + + " is illegal"); } + _deliveryMode = i; } public int getDeliveryMode() throws JMSException { checkNotClosed(); + return _deliveryMode; } public void setPriority(int i) throws JMSException { checkPreConditions(); - if (i < 0 || i > 9) + if ((i < 0) || (i > 9)) { throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9"); } + _messagePriority = i; } public int getPriority() throws JMSException { checkNotClosed(); + return _messagePriority; } @@ -215,18 +224,21 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l); } + _timeToLive = l; } public long getTimeToLive() throws JMSException { checkNotClosed(); + return _timeToLive; } public Destination getDestination() throws JMSException { checkNotClosed(); + return _destination; } @@ -241,11 +253,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkPreConditions(); checkInitialDestination(); - synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, - _mandatory, _immediate); + sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } @@ -256,8 +266,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, - _mandatory, _immediate); + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } @@ -267,20 +276,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, - _mandatory, immediate); + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate); } } - public void send(Message message, int deliveryMode, int priority, - long timeToLive) throws JMSException + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { checkPreConditions(); checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, - _immediate); + sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); } } @@ -291,69 +297,60 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, - _mandatory, _immediate); + sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, + _immediate); } } - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive) - throws JMSException + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) + throws JMSException { checkPreConditions(); checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, - _mandatory, _immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); } } - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive, boolean mandatory) - throws JMSException + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory) throws JMSException { checkPreConditions(); checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, - mandatory, _immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate); } } - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive, boolean mandatory, boolean immediate) - throws JMSException + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate) throws JMSException { checkPreConditions(); checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, - mandatory, immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate); } } - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive, boolean mandatory, - boolean immediate, boolean waitUntilSent) - throws JMSException + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException { checkPreConditions(); checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, - mandatory, immediate, waitUntilSent); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, + waitUntilSent); } } - private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException { if (message instanceof AbstractJMSMessage) @@ -366,23 +363,23 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (message instanceof BytesMessage) { - newMessage = new MessageConverter((BytesMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage(); } else if (message instanceof MapMessage) { - newMessage = new MessageConverter((MapMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((MapMessage) message).getConvertedMessage(); } else if (message instanceof ObjectMessage) { - newMessage = new MessageConverter((ObjectMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage(); } else if (message instanceof TextMessage) { - newMessage = new MessageConverter((TextMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((TextMessage) message).getConvertedMessage(); } else if (message instanceof StreamMessage) { - newMessage = new MessageConverter((StreamMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage(); } else { @@ -395,24 +392,25 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } else { - throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName()); + throw new JMSException("Unable to send message, due to class conversion error: " + + message.getClass().getName()); } } } - private void validateDestination(Destination destination) throws JMSException { if (!(destination instanceof AMQDestination)) { - throw new JMSException("Unsupported destination class: " + - (destination != null ? destination.getClass() : null)); + throw new JMSException("Unsupported destination class: " + + ((destination != null) ? destination.getClass() : null)); } + declareDestination((AMQDestination) destination); } - protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate) throws JMSException + protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); } @@ -429,21 +427,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @param immediate * @throws JMSException */ - protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate, boolean wait) throws JMSException { checkTemporaryDestination(destination); origMessage.setJMSDestination(destination); - AbstractJMSMessage message = convertToNativeMessage(origMessage); int type; - if(destination instanceof Topic) + if (destination instanceof Topic) { type = AMQDestination.TOPIC_TYPE; } - else if(destination instanceof Queue) + else if (destination instanceof Queue) { type = AMQDestination.QUEUE_TYPE; } @@ -452,22 +449,19 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j type = AMQDestination.UNKNOWN_TYPE; } - message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), - type); + message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - destination.getExchangeName(), // exchange - immediate, // immediate - mandatory, // mandatory - destination.getRoutingKey(), // routingKey - _session.getTicket()); // ticket - - + AMQFrame publishFrame = + BasicPublishBody.createAMQFrame( + _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(), + destination.getExchangeName(), // exchange + immediate, // immediate + mandatory, // mandatory + destination.getRoutingKey(), // routingKey + _session.getTicket()); // ticket message.prepareForSending(); ByteBuffer payload = message.getData(); @@ -487,6 +481,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j contentHeaderProperties.setExpiration(0); } } + contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); @@ -494,12 +489,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; - if(payload != null) + if (payload != null) { createContentBodies(payload, frames, 2, _channelId); } - if (contentBodyFrameCount != 0 && _logger.isDebugEnabled()) + if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) { _logger.debug("Sending content body frames to " + destination); } @@ -508,12 +503,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, - BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), - 0, - contentHeaderProperties, - size); + ContentHeaderBody.createAMQFrame(_channelId, + BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion()), 0, + contentHeaderProperties, size); if (_logger.isDebugEnabled()) { _logger.debug("Sending content header frame to " + destination); @@ -524,7 +517,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); _protocolHandler.writeFrame(compositeFrame, wait); - if (message != origMessage) { _logger.debug("Updating original message"); @@ -538,16 +530,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void checkTemporaryDestination(AMQDestination destination) throws JMSException { - if(destination instanceof TemporaryDestination) + if (destination instanceof TemporaryDestination) { _logger.debug("destination is temporary destination"); TemporaryDestination tempDest = (TemporaryDestination) destination; - if(tempDest.getSession().isClosed()) + if (tempDest.getSession().isClosed()) { _logger.debug("session is closed"); throw new JMSException("Session for temporary destination has been closed"); } - if(tempDest.isDeleted()) + + if (tempDest.isDeleted()) { _logger.debug("destination is deleted"); throw new JMSException("Cannot send to a deleted temporary destination"); @@ -567,9 +560,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) { - if (frames.length == offset + 1) + if (frames.length == (offset + 1)) { - frames[offset] = ContentBody.createAMQFrame(channelId,new ContentBody(payload)); + frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); } else { @@ -578,10 +571,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j long remaining = payload.remaining(); for (int i = offset; i < frames.length; i++) { - payload.position((int) framePayloadMax * (i-offset)); + payload.position((int) framePayloadMax * (i - offset)); int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId,new ContentBody(payload.slice())); + frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); remaining -= length; } @@ -594,7 +587,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // we substract one from the total frame maximum size to account for the end of frame marker in a body frame // (0xCE byte). int frameCount; - if(payload == null || payload.remaining() == 0) + if ((payload == null) || (payload.remaining() == 0)) { frameCount = 0; } @@ -602,9 +595,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { int dataLength = payload.remaining(); final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; + int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; frameCount = (int) (dataLength / framePayloadMax) + lastFrame; } + return frameCount; } @@ -624,7 +618,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { checkNotClosed(); - if (_session == null || _session.isClosed()) + if ((_session == null) || _session.isClosed()) { throw new javax.jms.IllegalStateException("Invalid Session"); } @@ -640,9 +634,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException { - if (_destination != null && suppliedDestination != null) + if ((_destination != null) && (suppliedDestination != null)) { - throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); + throw new UnsupportedOperationException( + "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } if (suppliedDestination == null) @@ -650,10 +645,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throw new InvalidDestinationException("Supplied Destination was invalid"); } - } - public AMQSession getSession() { return _session; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 8b6f2b4ab1..e3388be9ed 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,27 +20,29 @@ */ package org.apache.qpid.client.message; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; + +import javax.jms.*; + import org.apache.commons.collections.map.ReferenceMap; + import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; -import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.AMQUndefinedDestination; import org.apache.qpid.client.*; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.AMQShortString; - -import javax.jms.*; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLSyntaxException; public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); - protected boolean _redelivered; @@ -60,10 +62,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { _data.acquire(); } + _readableProperties = false; _readableMessage = (data != null); _changedData = (data == null); - _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders()); + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, @@ -71,28 +74,29 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { this(contentHeader, deliveryTag); - - int type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); + Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); + int contentType = (type == null) ? AMQDestination.UNKNOWN_TYPE : type.intValue(); AMQDestination dest; - switch(type) + switch (contentType) { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); - break; + + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; + + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; + + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + break; } //Destination dest = AMQDestination.createDestination(url); setJMSDestination(dest); - - _data = data; if (_data != null) { @@ -107,7 +111,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { super(contentHeader, deliveryTag); _readableProperties = (_contentHeaderProperties != null); - _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders()); + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); } public String getJMSMessageID() throws JMSException @@ -116,6 +120,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { getContentHeaderProperties().setMessageId("ID:" + _deliveryTag); } + return getContentHeaderProperties().getMessageId(); } @@ -178,6 +183,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _destinationCache.put(replyToEncoding, dest); } + return dest; } } @@ -188,11 +194,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { throw new IllegalArgumentException("Null destination not allowed"); } + if (!(destination instanceof AMQDestination)) { - throw new IllegalArgumentException("ReplyTo destination may only be an AMQDestination - passed argument was type " + - destination.getClass()); + throw new IllegalArgumentException( + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); } + final AMQDestination amqd = (AMQDestination) destination; final AMQShortString encodedDestination = amqd.getEncodedName(); @@ -278,17 +286,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _readableMessage = false; } - public boolean propertyExists(AMQShortString propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().propertyExists(propertyName); } - public boolean propertyExists(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().propertyExists(propertyName); } @@ -299,7 +307,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach return getJmsHeaders().getBoolean(propertyName); } - public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); @@ -310,48 +317,56 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte getByteProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getByte(propertyName); } public short getShortProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getShort(propertyName); } public int getIntProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getInteger(propertyName); } public long getLongProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getLong(propertyName); } public float getFloatProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getFloat(propertyName); } public double getDoubleProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getDouble(propertyName); } public String getStringProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getString(propertyName); } public Object getObjectProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getObject(propertyName); } @@ -436,7 +451,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach getJmsHeaders().remove(propertyName); } - protected void removeProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); @@ -468,7 +482,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - /** * This forces concrete classes to implement clearBody() * @@ -511,6 +524,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { buf.append('\n').append(getJmsHeaders().getHeaders()); } + return buf.toString(); } catch (JMSException e) @@ -519,7 +533,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties) { getContentHeaderProperties().setHeaders(messageProperties); @@ -550,6 +563,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { reset(); } + return _data; } @@ -608,6 +622,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getBytes(propertyName); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 83dcc57b80..e02771d8f5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,20 +20,40 @@ */ package org.apache.qpid.client.message; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.JMSException; + import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQShortString; - -import javax.jms.JMSException; -import java.util.HashMap; -import java.util.Map; -import java.util.List; public class MessageFactoryRegistry { private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>(); - private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>(); + private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = + new HashMap<AMQShortString, MessageFactory>(); + + /** + * Construct a new registry with the default message factories registered + * @return a message factory registry + */ + public static MessageFactoryRegistry newDefaultRegistry() + { + MessageFactoryRegistry mf = new MessageFactoryRegistry(); + mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); + mf.registerFactory("text/plain", new JMSTextMessageFactory()); + mf.registerFactory("text/xml", new JMSTextMessageFactory()); + mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); + mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); + mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); + mf.registerFactory(null, new JMSBytesMessageFactory()); + + return mf; + } public void registerFactory(String mimeType, MessageFactory mf) { @@ -41,6 +61,7 @@ public class MessageFactoryRegistry { throw new IllegalArgumentException("Message factory must not be null"); } + _mimeStringToFactoryMap.put(mimeType, mf); _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf); } @@ -48,6 +69,7 @@ public class MessageFactoryRegistry public MessageFactory deregisterFactory(String mimeType) { _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType)); + return _mimeStringToFactoryMap.remove(mimeType); } @@ -62,14 +84,19 @@ public class MessageFactoryRegistry * @throws AMQException * @throws JMSException */ - public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, - AMQShortString exchange, - AMQShortString routingKey, - ContentHeaderBody contentHeader, - List bodies) throws AMQException, JMSException + public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, + AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) + throws AMQException, JMSException { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; - MessageFactory mf = _mimeShortStringToFactoryMap.get(properties.getContentTypeShortString()); + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; + + // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over + // AMQP. When the type is null, it can only be assumed that the message is a byte message. + AMQShortString contentTypeShortString = properties.getContentTypeShortString(); + contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) + : contentTypeShortString; + + MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); if (mf == null) { throw new AMQException("Unsupport MIME type of " + properties.getContentType()); @@ -86,6 +113,7 @@ public class MessageFactoryRegistry { throw new IllegalArgumentException("Mime type must not be null"); } + MessageFactory mf = _mimeStringToFactoryMap.get(mimeType); if (mf == null) { @@ -96,21 +124,4 @@ public class MessageFactoryRegistry return mf.createMessage(); } } - - /** - * Construct a new registry with the default message factories registered - * @return a message factory registry - */ - public static MessageFactoryRegistry newDefaultRegistry() - { - MessageFactoryRegistry mf = new MessageFactoryRegistry(); - mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); - mf.registerFactory("text/plain", new JMSTextMessageFactory()); - mf.registerFactory("text/xml", new JMSTextMessageFactory()); - mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); - mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); - mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); - mf.registerFactory(null, new JMSBytesMessageFactory()); - return mf; - } } |