diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-28 14:31:52 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-28 14:31:52 +0000 |
| commit | 64f8ac4fd226e9201fad685efea5a3332a30e262 (patch) | |
| tree | c738e29520cad69665749ffb0e4b389dc2ac46cf /qpid/java | |
| parent | 9a5e9e59763989f2e1ceb8e7bc32376a203bc0d9 (diff) | |
| download | qpid-python-64f8ac4fd226e9201fad685efea5a3332a30e262.tar.gz | |
QPID-6052 : Use ADDR addresses for JMSDestination on incoming messages in 0-9-1 when the address mode is ADDR
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1621143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
17 files changed, 252 insertions, 132 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java index 0e1658d2b3..57f8343874 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java @@ -58,7 +58,7 @@ public class AMQAnyDestination extends AMQDestination implements Queue, Topic super(str); } - public AMQAnyDestination(Address addr) throws Exception + public AMQAnyDestination(Address addr) { super(addr); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 2714caf2a1..a71555480f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -211,7 +211,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte { } - protected AMQDestination(Address address) throws Exception + protected AMQDestination(Address address) { this._address = address; getInfoFromAddress(); @@ -749,7 +749,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte } } - public static Destination createDestination(String str) throws Exception + public static Destination createDestination(String str, final boolean useNodeTypeForDestinationType) + throws URISyntaxException { DestSyntax syntax = getDestType(str); str = stripSyntaxPrefix(str); @@ -760,7 +761,24 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte else { Address address = createAddressFromString(str); - return new AMQAnyDestination(address); + if(useNodeTypeForDestinationType) + { + AddressHelper helper = new AddressHelper(address); + switch(helper.getNodeType()) + { + case AMQDestination.QUEUE_TYPE: + return new AMQQueue(address); + case AMQDestination.TOPIC_TYPE: + return new AMQTopic(address); + default: + return new AMQAnyDestination(address); + } + + } + else + { + return new AMQAnyDestination(address); + } } } @@ -912,7 +930,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte return Address.parse(str); } - private void getInfoFromAddress() throws Exception + private void getInfoFromAddress() { _name = _address.getName(); _subject = _address.getSubject(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index f65ecece2b..9b00883dfb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -20,13 +20,15 @@ */ package org.apache.qpid.client; +import java.net.URISyntaxException; + +import javax.jms.Queue; + import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; import org.apache.qpid.url.BindingURL; -import javax.jms.Queue; -import java.net.URISyntaxException; - public class AMQQueue extends AMQDestination implements Queue { private static final long serialVersionUID = -1283142598932655606L; @@ -36,6 +38,11 @@ public class AMQQueue extends AMQDestination implements Queue super(); } + public AMQQueue(Address address) + { + super(address); + } + public AMQQueue(String address) throws URISyntaxException { super(address); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9bdcb9e83f..29460bb42d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1424,7 +1424,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); Queue dest = validateQueue(destination); C consumer = (C) createConsumer(dest); - + consumer.setAddressType(AMQDestination.QUEUE_TYPE); return new QueueReceiverAdaptor(dest, consumer); } @@ -1443,7 +1443,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); Queue dest = validateQueue(destination); C consumer = (C) createConsumer(dest, messageSelector); - + consumer.setAddressType(AMQDestination.QUEUE_TYPE); return new QueueReceiverAdaptor(dest, consumer); } @@ -1461,7 +1461,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); Queue dest = validateQueue(queue); C consumer = (C) createConsumer(dest); - + consumer.setAddressType(AMQDestination.QUEUE_TYPE); return new QueueReceiverAdaptor(dest, consumer); } @@ -1480,7 +1480,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); Queue dest = validateQueue(queue); C consumer = (C) createConsumer(dest, messageSelector); - + consumer.setAddressType(AMQDestination.QUEUE_TYPE); return new QueueReceiverAdaptor(dest, consumer); } @@ -2590,7 +2590,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ("Cannot create a durable subscription with a temporary topic: " + topic); } - if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic)) + if (!(topic instanceof AMQDestination)) { throw new javax.jms.InvalidDestinationException( "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index b0615ea99f..31df674a09 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -774,7 +774,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache); + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, + _topicDestinationCache, AMQDestination.UNKNOWN_TYPE); AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); AMQShortString reason = msg.getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 1a7b6bea80..9cdbc1e189 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -20,16 +20,16 @@ */ package org.apache.qpid.client; -import org.apache.qpid.client.AMQDestination.DestSyntax; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.messaging.Address; -import org.apache.qpid.url.BindingURL; +import java.net.URISyntaxException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Topic; -import java.net.URISyntaxException; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.url.BindingURL; public class AMQTopic extends AMQDestination implements Topic { @@ -40,7 +40,7 @@ public class AMQTopic extends AMQDestination implements Topic super(address); } - public AMQTopic(Address address) throws Exception + public AMQTopic(Address address) { super(address); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 187be8522c..3d0e972ca2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -136,6 +136,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa private List<StackTraceElement> _closedStack = null; private boolean _isDurableSubscriber = false; + private int _addressType = AMQDestination.UNKNOWN_TYPE; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -203,7 +204,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } _arguments = ft; - + _addressType = _destination.getAddressType(); } public AMQDestination getDestination() @@ -1066,4 +1067,14 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { _isDurableSubscriber = true; } + + void setAddressType(final int addressType) + { + _addressType = addressType; + } + + int getAddressType() + { + return _addressType; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cdffc73932..459030a10d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -151,7 +151,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe return getMessageFactory().createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(), messageFrame.getExchange() == null ? AMQShortString.EMPTY_STRING : messageFrame.getExchange(), messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(), - _queueDestinationCache, _topicDestinationCache); + _queueDestinationCache, _topicDestinationCache, getAddressType()); } @@ -164,4 +164,6 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { return _rejectBehaviour; } + + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index 9bdef22f96..efe91f6797 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.client; -import org.apache.qpid.AMQException; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicSubscriber; +import org.apache.qpid.AMQException; + /** * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract * @@ -43,6 +43,7 @@ class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSub _topic = topic; _consumer = consumer; _noLocal = noLocal; + consumer.setAddressType(AMQDestination.TOPIC_TYPE); } TopicSubscriberAdaptor(Topic topic, C consumer) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index bd089eb6a8..6e1ca889c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -133,7 +133,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } } dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(), - _deliveryProps.getRoutingKey(), subject); + _deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE); } setJMSDestination(dest); @@ -280,7 +280,8 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } else { - dest = convertToAddressBasedDestination(exchange,routingKey,null); + dest = convertToAddressBasedDestination(exchange,routingKey,null, false, + AMQDestination.UNKNOWN_TYPE); } _destinationCache.put(replyTo, dest); } @@ -288,49 +289,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate return dest; } } - - private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject) - { - String addr; - boolean isQueue = true; - if ("".equals(exchange)) // type Queue - { - subject = (subject == null) ? "" : "/" + subject; - addr = routingKey + subject; - } - else - { - addr = exchange + "/" + routingKey; - isQueue = false; - } - - try - { - AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr); - if (isQueue) - { - dest.setQueueName(new AMQShortString(routingKey)); - dest.setRoutingKey(new AMQShortString(routingKey)); - dest.setExchangeName(new AMQShortString("")); - } - else - { - dest.setRoutingKey(new AMQShortString(routingKey)); - dest.setExchangeName(new AMQShortString(exchange)); - } - return dest; - } - catch(Exception e) - { - // An exception is only thrown here if the address syntax is invalid. - // Logging the exception, but not throwing as this is only important to Qpid developers. - // An exception here means a bug in the code. - _logger.error("Exception when constructing an address string from the ReplyTo struct"); - - // falling back to the old way of doing it to ensure the application continues. - return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); - } - } public void setJMSReplyTo(Destination destination) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index d9d2cf85d3..0c8c853f07 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -21,6 +21,18 @@ package org.apache.qpid.client.message; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import javax.jms.Queue; + import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -32,17 +44,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotWriteableException; -import javax.jms.Queue; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; - public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { @@ -63,6 +64,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate }); public static final String JMS_TYPE = "x-jms-type"; + public static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms"); private boolean _readableProperties = false; @@ -96,7 +98,8 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate // Used when generating a received message object protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, - AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache, + int addressType) { this(contentHeader, deliveryTag); @@ -104,28 +107,53 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate AMQDestination dest = null; - // If we have a type set the attempt to use that. - if (type != null) + if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL) { - switch (type.intValue()) + // If we have a type set the attempt to use that. + if (type != null) { - case AMQDestination.QUEUE_TYPE: - dest = queueDestinationCache.getDestination(exchange, routingKey); - break; - case AMQDestination.TOPIC_TYPE: - dest = topicDestinationCache.getDestination(exchange, routingKey); - break; - default: - // Use the generateDestination method - dest = null; + switch (type.intValue()) + { + case AMQDestination.QUEUE_TYPE: + dest = queueDestinationCache.getDestination(exchange, routingKey); + break; + case AMQDestination.TOPIC_TYPE: + dest = topicDestinationCache.getDestination(exchange, routingKey); + break; + default: + // Use the generateDestination method + dest = null; + } } - } - if (dest == null) + if (dest == null) + { + dest = generateDestination(exchange, routingKey); + } + } + else { - dest = generateDestination(exchange, routingKey); + String subject = null; + if (contentHeader.getHeaders() != null + && contentHeader.getHeaders().containsKey(QpidMessageProperties.QPID_SUBJECT) + && STRICT_JMS) + { + subject = contentHeader.getHeaders().getString(QpidMessageProperties.QPID_SUBJECT); + if (subject != null) + { + contentHeader.getHeaders().remove(QpidMessageProperties.QPID_SUBJECT); + contentHeader.getHeaders().setString("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER, + subject); + } + } + if(type == null) + { + type = addressType; + } + dest = (AMQDestination) convertToAddressBasedDestination(AMQShortString.toString(exchange), + AMQShortString.toString(routingKey), subject, + true, type); } - setJMSDestination(dest); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 784c33cf02..d1ca6adb60 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -20,6 +20,16 @@ */ package org.apache.qpid.client.message; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -28,11 +38,6 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import javax.jms.JMSException; -import javax.jms.Session; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * This abstract class provides exchange lookup functionality that is shared * between all MessageDelegates. Update facilities are provided so that the 0-10 @@ -45,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { + private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate.class); private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>(); private static Map<String,ExchangeInfo> _exchangeMap = new ConcurrentHashMap<String, ExchangeInfo>(); @@ -222,6 +228,76 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { return _session; } + + protected Destination convertToAddressBasedDestination(String exchange, + String routingKey, + String subject, + boolean useNodeTypeForDestinationType, + int type) + { + String addr; + boolean isQueue = true; + if ("".equals(exchange)) // type Queue + { + subject = (subject == null) ? "" : "/" + subject; + addr = routingKey + subject; + + } + else + { + addr = exchange + "/" + routingKey; + isQueue = false; + } + + if(useNodeTypeForDestinationType) + { + if(type == AMQDestination.UNKNOWN_TYPE && "".equals(exchange)) + { + type = AMQDestination.QUEUE_TYPE; + } + + switch(type) + { + case AMQDestination.QUEUE_TYPE: + addr = addr + " ; { node: { type: queue } } "; + break; + case AMQDestination.TOPIC_TYPE: + addr = addr + " ; { node: { type: topic } } "; + break; + default: + // do nothing + } + } + + + try + { + AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr, + useNodeTypeForDestinationType); + if (isQueue) + { + dest.setQueueName(new AMQShortString(routingKey)); + dest.setRoutingKey(new AMQShortString(routingKey)); + dest.setExchangeName(new AMQShortString("")); + } + else + { + dest.setRoutingKey(new AMQShortString(routingKey)); + dest.setExchangeName(new AMQShortString(exchange)); + } + return dest; + } + catch(Exception e) + { + // An exception is only thrown here if the address syntax is invalid. + // Logging the exception, but not throwing as this is only important to Qpid developers. + // An exception here means a bug in the code. + _logger.error("Exception when constructing an address string from the ReplyTo struct"); + + // falling back to the old way of doing it to ensure the application continues. + return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); + } + } } class ExchangeInfo diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 9748038b9b..65f4478baa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -47,11 +47,14 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); - protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, + protected AbstractJMSMessage create08MessageWithBody(long messageNbr, + ContentHeaderBody contentHeader, + AMQShortString exchange, + AMQShortString routingKey, List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, - AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache, + final int addressType) throws AMQException { ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); @@ -117,7 +120,8 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr, contentHeader.getProperties(), - exchange, routingKey, queueDestinationCache, topicDestinationCache); + exchange, routingKey, queueDestinationCache, + topicDestinationCache, addressType); return createMessage(delegate, data); } @@ -162,13 +166,15 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory return message; } + @Override public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, - AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache, + int addressType) throws JMSException, AMQException { - final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache); + final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType); msg.setJMSRedelivered(redelivered); msg.setReceivedFromServer(); return msg; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 70c6aa4c75..b073275421 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.message; +import java.util.List; + +import javax.jms.JMSException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession_0_8; @@ -29,16 +33,18 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; -import javax.jms.JMSException; -import java.util.List; - public interface MessageFactory { - AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, + AbstractJMSMessage createMessage(long deliveryTag, + boolean redelivered, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, - List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) + AMQShortString exchange, + AMQShortString routingKey, + List bodies, + AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache, + final int addressType) throws JMSException, AMQException; AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 7e1ce20238..de46887895 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client.message; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.JMSException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,11 +40,6 @@ import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; -import javax.jms.JMSException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class MessageFactoryRegistry { /** @@ -95,19 +96,24 @@ public class MessageFactoryRegistry * Create a message. This looks up the MIME type from the content header and instantiates the appropriate * concrete message type. * - * - * @param deliveryTag the AMQ message id + * @param deliveryTag the AMQ message id * @param redelivered true if redelivered * @param contentHeader the content header that was received * @param bodies a list of ContentBody instances @return the message. * @param queueDestinationCache - *@param topicDestinationCache @throws AMQException + * @param topicDestinationCache @throws AMQException + * @param addressType * @throws JMSException */ - public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, - AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies, + public AbstractJMSMessage createMessage(long deliveryTag, + boolean redelivered, + AMQShortString exchange, + AMQShortString routingKey, + ContentHeaderBody contentHeader, + List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, - AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache, + final int addressType) throws AMQException, JMSException { BasicContentHeaderProperties properties = contentHeader.getProperties(); @@ -124,7 +130,7 @@ public class MessageFactoryRegistry mf = _default; } - return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache); + return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType); } public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 91e2143c47..116fd11942 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -159,7 +159,7 @@ public class AddressHelper } } - public int getNodeType() throws Exception + public int getNodeType() { if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null) { @@ -176,7 +176,7 @@ public class AddressHelper } else { - throw new Exception("unkown exchange type"); + throw new IllegalArgumentException("unknown exchange type"); } } @@ -212,7 +212,7 @@ public class AddressHelper return (result == null) ? defaultValue : result.booleanValue(); } - public Link getLink() throws Exception + public Link getLink() { Link link = new Link(); link.setSubscription(new Subscription()); @@ -235,7 +235,7 @@ public class AddressHelper } else { - throw new Exception("The reliability mode '" + + throw new IllegalArgumentException("The reliability mode '" + reliability + "' is not yet supported"); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 8c23ddad5e..a4c3d20042 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -255,7 +255,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { try { - return AMQDestination.createDestination(str); + return AMQDestination.createDestination(str, false); } catch (Exception e) { |
