summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-28 14:31:52 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-28 14:31:52 +0000
commit64f8ac4fd226e9201fad685efea5a3332a30e262 (patch)
treec738e29520cad69665749ffb0e4b389dc2ac46cf /qpid/java
parent9a5e9e59763989f2e1ceb8e7bc32376a203bc0d9 (diff)
downloadqpid-python-64f8ac4fd226e9201fad685efea5a3332a30e262.tar.gz
QPID-6052 : Use ADDR addresses for JMSDestination on incoming messages in 0-9-1 when the address mode is ADDR
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1621143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java48
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java84
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java86
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java30
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java2
17 files changed, 252 insertions, 132 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
index 0e1658d2b3..57f8343874 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
@@ -58,7 +58,7 @@ public class AMQAnyDestination extends AMQDestination implements Queue, Topic
super(str);
}
- public AMQAnyDestination(Address addr) throws Exception
+ public AMQAnyDestination(Address addr)
{
super(addr);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 2714caf2a1..a71555480f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -211,7 +211,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
{
}
- protected AMQDestination(Address address) throws Exception
+ protected AMQDestination(Address address)
{
this._address = address;
getInfoFromAddress();
@@ -749,7 +749,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
}
}
- public static Destination createDestination(String str) throws Exception
+ public static Destination createDestination(String str, final boolean useNodeTypeForDestinationType)
+ throws URISyntaxException
{
DestSyntax syntax = getDestType(str);
str = stripSyntaxPrefix(str);
@@ -760,7 +761,24 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
else
{
Address address = createAddressFromString(str);
- return new AMQAnyDestination(address);
+ if(useNodeTypeForDestinationType)
+ {
+ AddressHelper helper = new AddressHelper(address);
+ switch(helper.getNodeType())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ return new AMQQueue(address);
+ case AMQDestination.TOPIC_TYPE:
+ return new AMQTopic(address);
+ default:
+ return new AMQAnyDestination(address);
+ }
+
+ }
+ else
+ {
+ return new AMQAnyDestination(address);
+ }
}
}
@@ -912,7 +930,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
return Address.parse(str);
}
- private void getInfoFromAddress() throws Exception
+ private void getInfoFromAddress()
{
_name = _address.getName();
_subject = _address.getSubject();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index f65ecece2b..9b00883dfb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -20,13 +20,15 @@
*/
package org.apache.qpid.client;
+import java.net.URISyntaxException;
+
+import javax.jms.Queue;
+
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.BindingURL;
-import javax.jms.Queue;
-import java.net.URISyntaxException;
-
public class AMQQueue extends AMQDestination implements Queue
{
private static final long serialVersionUID = -1283142598932655606L;
@@ -36,6 +38,11 @@ public class AMQQueue extends AMQDestination implements Queue
super();
}
+ public AMQQueue(Address address)
+ {
+ super(address);
+ }
+
public AMQQueue(String address) throws URISyntaxException
{
super(address);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 9bdcb9e83f..29460bb42d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1424,7 +1424,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
Queue dest = validateQueue(destination);
C consumer = (C) createConsumer(dest);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1443,7 +1443,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
Queue dest = validateQueue(destination);
C consumer = (C) createConsumer(dest, messageSelector);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1461,7 +1461,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1480,7 +1480,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest, messageSelector);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -2590,7 +2590,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
("Cannot create a durable subscription with a temporary topic: " + topic);
}
- if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
+ if (!(topic instanceof AMQDestination))
{
throw new javax.jms.InvalidDestinationException(
"Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index b0615ea99f..31df674a09 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -774,7 +774,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache);
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache,
+ _topicDestinationCache, AMQDestination.UNKNOWN_TYPE);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 1a7b6bea80..9cdbc1e189 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -20,16 +20,16 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.url.BindingURL;
+import java.net.URISyntaxException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Topic;
-import java.net.URISyntaxException;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.url.BindingURL;
public class AMQTopic extends AMQDestination implements Topic
{
@@ -40,7 +40,7 @@ public class AMQTopic extends AMQDestination implements Topic
super(address);
}
- public AMQTopic(Address address) throws Exception
+ public AMQTopic(Address address)
{
super(address);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 187be8522c..3d0e972ca2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -136,6 +136,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
private List<StackTraceElement> _closedStack = null;
private boolean _isDurableSubscriber = false;
+ private int _addressType = AMQDestination.UNKNOWN_TYPE;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -203,7 +204,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
_arguments = ft;
-
+ _addressType = _destination.getAddressType();
}
public AMQDestination getDestination()
@@ -1066,4 +1067,14 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
_isDurableSubscriber = true;
}
+
+ void setAddressType(final int addressType)
+ {
+ _addressType = addressType;
+ }
+
+ int getAddressType()
+ {
+ return _addressType;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cdffc73932..459030a10d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -151,7 +151,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
messageFrame.isRedelivered(), messageFrame.getExchange() == null ? AMQShortString.EMPTY_STRING : messageFrame.getExchange(),
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
- _queueDestinationCache, _topicDestinationCache);
+ _queueDestinationCache, _topicDestinationCache, getAddressType());
}
@@ -164,4 +164,6 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
return _rejectBehaviour;
}
+
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index 9bdef22f96..efe91f6797 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQException;
-
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import org.apache.qpid.AMQException;
+
/**
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
@@ -43,6 +43,7 @@ class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSub
_topic = topic;
_consumer = consumer;
_noLocal = noLocal;
+ consumer.setAddressType(AMQDestination.TOPIC_TYPE);
}
TopicSubscriberAdaptor(Topic topic, C consumer)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index bd089eb6a8..6e1ca889c0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -133,7 +133,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
}
dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
- _deliveryProps.getRoutingKey(), subject);
+ _deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE);
}
setJMSDestination(dest);
@@ -280,7 +280,8 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
else
{
- dest = convertToAddressBasedDestination(exchange,routingKey,null);
+ dest = convertToAddressBasedDestination(exchange,routingKey,null, false,
+ AMQDestination.UNKNOWN_TYPE);
}
_destinationCache.put(replyTo, dest);
}
@@ -288,49 +289,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
return dest;
}
}
-
- private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
- {
- String addr;
- boolean isQueue = true;
- if ("".equals(exchange)) // type Queue
- {
- subject = (subject == null) ? "" : "/" + subject;
- addr = routingKey + subject;
- }
- else
- {
- addr = exchange + "/" + routingKey;
- isQueue = false;
- }
-
- try
- {
- AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr);
- if (isQueue)
- {
- dest.setQueueName(new AMQShortString(routingKey));
- dest.setRoutingKey(new AMQShortString(routingKey));
- dest.setExchangeName(new AMQShortString(""));
- }
- else
- {
- dest.setRoutingKey(new AMQShortString(routingKey));
- dest.setExchangeName(new AMQShortString(exchange));
- }
- return dest;
- }
- catch(Exception e)
- {
- // An exception is only thrown here if the address syntax is invalid.
- // Logging the exception, but not throwing as this is only important to Qpid developers.
- // An exception here means a bug in the code.
- _logger.error("Exception when constructing an address string from the ReplyTo struct");
-
- // falling back to the old way of doing it to ensure the application continues.
- return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
- }
- }
public void setJMSReplyTo(Destination destination) throws JMSException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index d9d2cf85d3..0c8c853f07 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -21,6 +21,18 @@
package org.apache.qpid.client.message;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
+
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -32,17 +44,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.Queue;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
@@ -63,6 +64,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
});
public static final String JMS_TYPE = "x-jms-type";
+ public static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms");
private boolean _readableProperties = false;
@@ -96,7 +98,8 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
// Used when generating a received message object
protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ int addressType)
{
this(contentHeader, deliveryTag);
@@ -104,28 +107,53 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
AMQDestination dest = null;
- // If we have a type set the attempt to use that.
- if (type != null)
+ if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
{
- switch (type.intValue())
+ // If we have a type set the attempt to use that.
+ if (type != null)
{
- case AMQDestination.QUEUE_TYPE:
- dest = queueDestinationCache.getDestination(exchange, routingKey);
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = topicDestinationCache.getDestination(exchange, routingKey);
- break;
- default:
- // Use the generateDestination method
- dest = null;
+ switch (type.intValue())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = queueDestinationCache.getDestination(exchange, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = topicDestinationCache.getDestination(exchange, routingKey);
+ break;
+ default:
+ // Use the generateDestination method
+ dest = null;
+ }
}
- }
- if (dest == null)
+ if (dest == null)
+ {
+ dest = generateDestination(exchange, routingKey);
+ }
+ }
+ else
{
- dest = generateDestination(exchange, routingKey);
+ String subject = null;
+ if (contentHeader.getHeaders() != null
+ && contentHeader.getHeaders().containsKey(QpidMessageProperties.QPID_SUBJECT)
+ && STRICT_JMS)
+ {
+ subject = contentHeader.getHeaders().getString(QpidMessageProperties.QPID_SUBJECT);
+ if (subject != null)
+ {
+ contentHeader.getHeaders().remove(QpidMessageProperties.QPID_SUBJECT);
+ contentHeader.getHeaders().setString("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER,
+ subject);
+ }
+ }
+ if(type == null)
+ {
+ type = addressType;
+ }
+ dest = (AMQDestination) convertToAddressBasedDestination(AMQShortString.toString(exchange),
+ AMQShortString.toString(routingKey), subject,
+ true, type);
}
-
setJMSDestination(dest);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
index 784c33cf02..d1ca6adb60 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.client.message;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -28,11 +38,6 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* This abstract class provides exchange lookup functionality that is shared
* between all MessageDelegates. Update facilities are provided so that the 0-10
@@ -45,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate.class);
private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();
private static Map<String,ExchangeInfo> _exchangeMap = new ConcurrentHashMap<String, ExchangeInfo>();
@@ -222,6 +228,76 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
return _session;
}
+
+ protected Destination convertToAddressBasedDestination(String exchange,
+ String routingKey,
+ String subject,
+ boolean useNodeTypeForDestinationType,
+ int type)
+ {
+ String addr;
+ boolean isQueue = true;
+ if ("".equals(exchange)) // type Queue
+ {
+ subject = (subject == null) ? "" : "/" + subject;
+ addr = routingKey + subject;
+
+ }
+ else
+ {
+ addr = exchange + "/" + routingKey;
+ isQueue = false;
+ }
+
+ if(useNodeTypeForDestinationType)
+ {
+ if(type == AMQDestination.UNKNOWN_TYPE && "".equals(exchange))
+ {
+ type = AMQDestination.QUEUE_TYPE;
+ }
+
+ switch(type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ addr = addr + " ; { node: { type: queue } } ";
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ addr = addr + " ; { node: { type: topic } } ";
+ break;
+ default:
+ // do nothing
+ }
+ }
+
+
+ try
+ {
+ AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr,
+ useNodeTypeForDestinationType);
+ if (isQueue)
+ {
+ dest.setQueueName(new AMQShortString(routingKey));
+ dest.setRoutingKey(new AMQShortString(routingKey));
+ dest.setExchangeName(new AMQShortString(""));
+ }
+ else
+ {
+ dest.setRoutingKey(new AMQShortString(routingKey));
+ dest.setExchangeName(new AMQShortString(exchange));
+ }
+ return dest;
+ }
+ catch(Exception e)
+ {
+ // An exception is only thrown here if the address syntax is invalid.
+ // Logging the exception, but not throwing as this is only important to Qpid developers.
+ // An exception here means a bug in the code.
+ _logger.error("Exception when constructing an address string from the ReplyTo struct");
+
+ // falling back to the old way of doing it to ensure the application continues.
+ return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ }
+ }
}
class ExchangeInfo
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 9748038b9b..65f4478baa 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -47,11 +47,14 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
- protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
+ protected AbstractJMSMessage create08MessageWithBody(long messageNbr,
+ ContentHeaderBody contentHeader,
+ AMQShortString exchange,
+ AMQShortString routingKey,
List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -117,7 +120,8 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
contentHeader.getProperties(),
- exchange, routingKey, queueDestinationCache, topicDestinationCache);
+ exchange, routingKey, queueDestinationCache,
+ topicDestinationCache, addressType);
return createMessage(delegate, data);
}
@@ -162,13 +166,15 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
return message;
}
+ @Override
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
AMQShortString exchange, AMQShortString routingKey, List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ int addressType)
throws JMSException, AMQException
{
- final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
+ final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
msg.setJMSRedelivered(redelivered);
msg.setReceivedFromServer();
return msg;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 70c6aa4c75..b073275421 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.client.message;
+import java.util.List;
+
+import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession_0_8;
@@ -29,16 +33,18 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
-import javax.jms.JMSException;
-import java.util.List;
-
public interface MessageFactory
{
- AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ AbstractJMSMessage createMessage(long deliveryTag,
+ boolean redelivered,
ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ List bodies,
+ AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType)
throws JMSException, AMQException;
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 7e1ce20238..de46887895 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.message;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +40,6 @@ import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
-import javax.jms.JMSException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class MessageFactoryRegistry
{
/**
@@ -95,19 +96,24 @@ public class MessageFactoryRegistry
* Create a message. This looks up the MIME type from the content header and instantiates the appropriate
* concrete message type.
*
- *
- * @param deliveryTag the AMQ message id
+ * @param deliveryTag the AMQ message id
* @param redelivered true if redelivered
* @param contentHeader the content header that was received
* @param bodies a list of ContentBody instances @return the message.
* @param queueDestinationCache
- *@param topicDestinationCache @throws AMQException
+ * @param topicDestinationCache @throws AMQException
+ * @param addressType
* @throws JMSException
*/
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
- AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies,
+ public AbstractJMSMessage createMessage(long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ ContentHeaderBody contentHeader,
+ List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType)
throws AMQException, JMSException
{
BasicContentHeaderProperties properties = contentHeader.getProperties();
@@ -124,7 +130,7 @@ public class MessageFactoryRegistry
mf = _default;
}
- return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
}
public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 91e2143c47..116fd11942 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -159,7 +159,7 @@ public class AddressHelper
}
}
- public int getNodeType() throws Exception
+ public int getNodeType()
{
if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null)
{
@@ -176,7 +176,7 @@ public class AddressHelper
}
else
{
- throw new Exception("unkown exchange type");
+ throw new IllegalArgumentException("unknown exchange type");
}
}
@@ -212,7 +212,7 @@ public class AddressHelper
return (result == null) ? defaultValue : result.booleanValue();
}
- public Link getLink() throws Exception
+ public Link getLink()
{
Link link = new Link();
link.setSubscription(new Subscription());
@@ -235,7 +235,7 @@ public class AddressHelper
}
else
{
- throw new Exception("The reliability mode '" +
+ throw new IllegalArgumentException("The reliability mode '" +
reliability + "' is not yet supported");
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 8c23ddad5e..a4c3d20042 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -255,7 +255,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
try
{
- return AMQDestination.createDestination(str);
+ return AMQDestination.createDestination(str, false);
}
catch (Exception e)
{