summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-02-12 13:25:36 +0000
committerRobert Greig <rgreig@apache.org>2007-02-12 13:25:36 +0000
commitb53c13e9d33aa35ed38c647bfa29fab0bbe58915 (patch)
treea3f9401cd095238ca4f4f5d6b04582f6a33adc56 /java/client/src
parentcd8ccb1a691ef5eb260b165f08fd9a07d1e5867d (diff)
downloadqpid-python-b53c13e9d33aa35ed38c647bfa29fab0bbe58915.tar.gz
(Patch submitted by Rupert Smith) Qpid-360 fixes.
Message type defaults to ByteMessage when not specified. Unknown destination type is used as default when not specified. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@506439 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java209
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java85
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java77
3 files changed, 195 insertions, 176 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index fc3450c385..a0110cc8af 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -20,17 +20,20 @@
*/
package org.apache.qpid.client;
+import java.io.UnsupportedEncodingException;
+
+import javax.jms.*;
+
import org.apache.log4j.Logger;
+
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
-import javax.jms.*;
-import java.io.UnsupportedEncodingException;
-
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
protected final Logger _logger = Logger.getLogger(getClass());
@@ -101,9 +104,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private final boolean _waitUntilSent;
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
- protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
- int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
- long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent)
+ protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+ boolean immediate, boolean mandatory, boolean waitUntilSent)
{
_connection = connection;
_destination = destination;
@@ -116,6 +119,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
declareDestination(destination);
}
+
_immediate = immediate;
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
@@ -134,18 +138,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
// TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
+ AMQFrame declare =
+ ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ _session.getTicket(), // ticket
+ destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -159,6 +162,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public boolean getDisableMessageID() throws JMSException
{
checkNotClosed();
+
// Always false for AMQP
return false;
}
@@ -172,39 +176,44 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public boolean getDisableMessageTimestamp() throws JMSException
{
checkNotClosed();
+
return _disableTimestamps;
}
public void setDeliveryMode(int i) throws JMSException
{
checkPreConditions();
- if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
+ if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
{
- throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
- " is illegal");
+ throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
+ + " is illegal");
}
+
_deliveryMode = i;
}
public int getDeliveryMode() throws JMSException
{
checkNotClosed();
+
return _deliveryMode;
}
public void setPriority(int i) throws JMSException
{
checkPreConditions();
- if (i < 0 || i > 9)
+ if ((i < 0) || (i > 9))
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
}
+
_messagePriority = i;
}
public int getPriority() throws JMSException
{
checkNotClosed();
+
return _messagePriority;
}
@@ -215,18 +224,21 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
}
+
_timeToLive = l;
}
public long getTimeToLive() throws JMSException
{
checkNotClosed();
+
return _timeToLive;
}
public Destination getDestination() throws JMSException
{
checkNotClosed();
+
return _destination;
}
@@ -241,11 +253,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
checkPreConditions();
checkInitialDestination();
-
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
+ sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
}
}
@@ -256,8 +266,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
}
}
@@ -267,20 +276,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
- _mandatory, immediate);
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
}
}
- public void send(Message message, int deliveryMode, int priority,
- long timeToLive) throws JMSException
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
- _immediate);
+ sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
}
}
@@ -291,69 +297,60 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
+ _immediate);
}
}
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive)
- throws JMSException
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
- _mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
}
}
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory)
- throws JMSException
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
- mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
}
}
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory, boolean immediate)
- throws JMSException
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
- mandatory, immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
}
}
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean waitUntilSent)
- throws JMSException
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
- mandatory, immediate, waitUntilSent);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
+ waitUntilSent);
}
}
-
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
if (message instanceof AbstractJMSMessage)
@@ -366,23 +363,23 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (message instanceof BytesMessage)
{
- newMessage = new MessageConverter((BytesMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage();
}
else if (message instanceof MapMessage)
{
- newMessage = new MessageConverter((MapMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((MapMessage) message).getConvertedMessage();
}
else if (message instanceof ObjectMessage)
{
- newMessage = new MessageConverter((ObjectMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage();
}
else if (message instanceof TextMessage)
{
- newMessage = new MessageConverter((TextMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((TextMessage) message).getConvertedMessage();
}
else if (message instanceof StreamMessage)
{
- newMessage = new MessageConverter((StreamMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage();
}
else
{
@@ -395,24 +392,25 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
else
{
- throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName());
+ throw new JMSException("Unable to send message, due to class conversion error: "
+ + message.getClass().getName());
}
}
}
-
private void validateDestination(Destination destination) throws JMSException
{
if (!(destination instanceof AMQDestination))
{
- throw new JMSException("Unsupported destination class: " +
- (destination != null ? destination.getClass() : null));
+ throw new JMSException("Unsupported destination class: "
+ + ((destination != null) ? destination.getClass() : null));
}
+
declareDestination((AMQDestination) destination);
}
- protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+ protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
@@ -429,21 +427,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @param immediate
* @throws JMSException
*/
- protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
-
AbstractJMSMessage message = convertToNativeMessage(origMessage);
int type;
- if(destination instanceof Topic)
+ if (destination instanceof Topic)
{
type = AMQDestination.TOPIC_TYPE;
}
- else if(destination instanceof Queue)
+ else if (destination instanceof Queue)
{
type = AMQDestination.QUEUE_TYPE;
}
@@ -452,22 +449,19 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
type = AMQDestination.UNKNOWN_TYPE;
}
- message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(),
- type);
+ message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- _session.getTicket()); // ticket
-
-
+ AMQFrame publishFrame =
+ BasicPublishBody.createAMQFrame(
+ _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(),
+ destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ _session.getTicket()); // ticket
message.prepareForSending();
ByteBuffer payload = message.getData();
@@ -487,6 +481,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
contentHeaderProperties.setExpiration(0);
}
}
+
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
@@ -494,12 +489,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
- if(payload != null)
+ if (payload != null)
{
createContentBodies(payload, frames, 2, _channelId);
}
- if (contentBodyFrameCount != 0 && _logger.isDebugEnabled())
+ if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
{
_logger.debug("Sending content body frames to " + destination);
}
@@ -508,12 +503,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()),
- 0,
- contentHeaderProperties,
- size);
+ ContentHeaderBody.createAMQFrame(_channelId,
+ BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion()), 0,
+ contentHeaderProperties, size);
if (_logger.isDebugEnabled())
{
_logger.debug("Sending content header frame to " + destination);
@@ -524,7 +517,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
_protocolHandler.writeFrame(compositeFrame, wait);
-
if (message != origMessage)
{
_logger.debug("Updating original message");
@@ -538,16 +530,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
- if(destination instanceof TemporaryDestination)
+ if (destination instanceof TemporaryDestination)
{
_logger.debug("destination is temporary destination");
TemporaryDestination tempDest = (TemporaryDestination) destination;
- if(tempDest.getSession().isClosed())
+ if (tempDest.getSession().isClosed())
{
_logger.debug("session is closed");
throw new JMSException("Session for temporary destination has been closed");
}
- if(tempDest.isDeleted())
+
+ if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot send to a deleted temporary destination");
@@ -567,9 +560,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
{
- if (frames.length == offset + 1)
+ if (frames.length == (offset + 1))
{
- frames[offset] = ContentBody.createAMQFrame(channelId,new ContentBody(payload));
+ frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
}
else
{
@@ -578,10 +571,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
long remaining = payload.remaining();
for (int i = offset; i < frames.length; i++)
{
- payload.position((int) framePayloadMax * (i-offset));
+ payload.position((int) framePayloadMax * (i - offset));
int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId,new ContentBody(payload.slice()));
+ frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
remaining -= length;
}
@@ -594,7 +587,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
// we substract one from the total frame maximum size to account for the end of frame marker in a body frame
// (0xCE byte).
int frameCount;
- if(payload == null || payload.remaining() == 0)
+ if ((payload == null) || (payload.remaining() == 0))
{
frameCount = 0;
}
@@ -602,9 +595,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
int dataLength = payload.remaining();
final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
}
+
return frameCount;
}
@@ -624,7 +618,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
checkNotClosed();
- if (_session == null || _session.isClosed())
+ if ((_session == null) || _session.isClosed())
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
@@ -640,9 +634,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
{
- if (_destination != null && suppliedDestination != null)
+ if ((_destination != null) && (suppliedDestination != null))
{
- throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ throw new UnsupportedOperationException(
+ "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
if (suppliedDestination == null)
@@ -650,10 +645,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throw new InvalidDestinationException("Supplied Destination was invalid");
}
-
}
-
public AMQSession getSession()
{
return _session;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 8b6f2b4ab1..e3388be9ed 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -20,27 +20,29 @@
*/
package org.apache.qpid.client.message;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+
+import javax.jms.*;
+
import org.apache.commons.collections.map.ReferenceMap;
+
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQUndefinedDestination;
import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.*;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.URLSyntaxException;
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
-
protected boolean _redelivered;
@@ -60,10 +62,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
_data.acquire();
}
+
_readableProperties = false;
_readableMessage = (data != null);
_changedData = (data == null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -71,28 +74,29 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
this(contentHeader, deliveryTag);
-
- int type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+ Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+ int contentType = (type == null) ? AMQDestination.UNKNOWN_TYPE : type.intValue();
AMQDestination dest;
- switch(type)
+ switch (contentType)
{
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
- break;
+
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ break;
}
//Destination dest = AMQDestination.createDestination(url);
setJMSDestination(dest);
-
-
_data = data;
if (_data != null)
{
@@ -107,7 +111,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
super(contentHeader, deliveryTag);
_readableProperties = (_contentHeaderProperties != null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
}
public String getJMSMessageID() throws JMSException
@@ -116,6 +120,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
getContentHeaderProperties().setMessageId("ID:" + _deliveryTag);
}
+
return getContentHeaderProperties().getMessageId();
}
@@ -178,6 +183,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_destinationCache.put(replyToEncoding, dest);
}
+
return dest;
}
}
@@ -188,11 +194,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
throw new IllegalArgumentException("Null destination not allowed");
}
+
if (!(destination instanceof AMQDestination))
{
- throw new IllegalArgumentException("ReplyTo destination may only be an AMQDestination - passed argument was type " +
- destination.getClass());
+ throw new IllegalArgumentException(
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
}
+
final AMQDestination amqd = (AMQDestination) destination;
final AMQShortString encodedDestination = amqd.getEncodedName();
@@ -278,17 +286,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_readableMessage = false;
}
-
public boolean propertyExists(AMQShortString propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().propertyExists(propertyName);
}
-
public boolean propertyExists(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().propertyExists(propertyName);
}
@@ -299,7 +307,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
return getJmsHeaders().getBoolean(propertyName);
}
-
public boolean getBooleanProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
@@ -310,48 +317,56 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte getByteProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getByte(propertyName);
}
public short getShortProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getShort(propertyName);
}
public int getIntProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getInteger(propertyName);
}
public long getLongProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getLong(propertyName);
}
public float getFloatProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getFloat(propertyName);
}
public double getDoubleProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getDouble(propertyName);
}
public String getStringProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getString(propertyName);
}
public Object getObjectProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getObject(propertyName);
}
@@ -436,7 +451,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
getJmsHeaders().remove(propertyName);
}
-
protected void removeProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
@@ -468,7 +482,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
-
/**
* This forces concrete classes to implement clearBody()
*
@@ -511,6 +524,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
buf.append('\n').append(getJmsHeaders().getHeaders());
}
+
return buf.toString();
}
catch (JMSException e)
@@ -519,7 +533,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
-
public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
{
getContentHeaderProperties().setHeaders(messageProperties);
@@ -550,6 +563,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
reset();
}
+
return _data;
}
@@ -608,6 +622,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getBytes(propertyName);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 83dcc57b80..e02771d8f5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,20 +20,40 @@
*/
package org.apache.qpid.client.message;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.JMSException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
public class MessageFactoryRegistry
{
private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
- private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>();
+ private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap =
+ new HashMap<AMQShortString, MessageFactory>();
+
+ /**
+ * Construct a new registry with the default message factories registered
+ * @return a message factory registry
+ */
+ public static MessageFactoryRegistry newDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
+ mf.registerFactory("text/plain", new JMSTextMessageFactory());
+ mf.registerFactory("text/xml", new JMSTextMessageFactory());
+ mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
+ mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+ mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
+ mf.registerFactory(null, new JMSBytesMessageFactory());
+
+ return mf;
+ }
public void registerFactory(String mimeType, MessageFactory mf)
{
@@ -41,6 +61,7 @@ public class MessageFactoryRegistry
{
throw new IllegalArgumentException("Message factory must not be null");
}
+
_mimeStringToFactoryMap.put(mimeType, mf);
_mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf);
}
@@ -48,6 +69,7 @@ public class MessageFactoryRegistry
public MessageFactory deregisterFactory(String mimeType)
{
_mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType));
+
return _mimeStringToFactoryMap.remove(mimeType);
}
@@ -62,14 +84,19 @@ public class MessageFactoryRegistry
* @throws AMQException
* @throws JMSException
*/
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
- AMQShortString exchange,
- AMQShortString routingKey,
- ContentHeaderBody contentHeader,
- List bodies) throws AMQException, JMSException
+ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
+ AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
+ throws AMQException, JMSException
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
- MessageFactory mf = _mimeShortStringToFactoryMap.get(properties.getContentTypeShortString());
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+
+ // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
+ // AMQP. When the type is null, it can only be assumed that the message is a byte message.
+ AMQShortString contentTypeShortString = properties.getContentTypeShortString();
+ contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
+ : contentTypeShortString;
+
+ MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
if (mf == null)
{
throw new AMQException("Unsupport MIME type of " + properties.getContentType());
@@ -86,6 +113,7 @@ public class MessageFactoryRegistry
{
throw new IllegalArgumentException("Mime type must not be null");
}
+
MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
if (mf == null)
{
@@ -96,21 +124,4 @@ public class MessageFactoryRegistry
return mf.createMessage();
}
}
-
- /**
- * Construct a new registry with the default message factories registered
- * @return a message factory registry
- */
- public static MessageFactoryRegistry newDefaultRegistry()
- {
- MessageFactoryRegistry mf = new MessageFactoryRegistry();
- mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
- mf.registerFactory("text/plain", new JMSTextMessageFactory());
- mf.registerFactory("text/xml", new JMSTextMessageFactory());
- mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
- mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
- mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
- mf.registerFactory(null, new JMSBytesMessageFactory());
- return mf;
- }
}