diff options
Diffstat (limited to 'qpid/java/client/src')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 4 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 63 |
2 files changed, 35 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a37b532617..3316ae801b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -118,7 +118,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - private final boolean _delareQueues = + private final boolean _declareQueues = Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true")); private final boolean _declareExchanges = @@ -2871,7 +2871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic declareExchange(amqd, nowait); } - if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) + if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) { declareQueue(amqd, consumer.isNoLocal(), nowait); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index cb8f81f68f..46473900c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -24,6 +24,7 @@ import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -37,6 +38,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; import javax.jms.JMSException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -57,29 +61,9 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ExchangeBoundResult; -import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.transport.ExecutionException; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.QueueQueryResult; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.RangeSetFactory; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.*; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is a 0.10 Session @@ -362,19 +346,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic final AMQDestination destination, final boolean nowait) throws AMQException { - if (destination.getDestSyntax() == DestSyntax.BURL) + if (destination == null || destination.getDestSyntax() == DestSyntax.BURL) { Map args = FieldTableSupport.convertToMap(arguments); - for (AMQShortString rk: destination.getBindingKeys()) + if(destination != null) { - _logger.debug("Binding queue : " + queueName.toString() + - " exchange: " + exchangeName.toString() + - " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), - exchangeName.toString(), - rk.toString(), - args); + for (AMQShortString rk: destination.getBindingKeys()) + { + doSendQueueBind(queueName, exchangeName, args, rk); + } + if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey)) + { + doSendQueueBind(queueName, exchangeName, args, routingKey); + } + } + else + { + doSendQueueBind(queueName, exchangeName, args, routingKey); } } else @@ -420,6 +409,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + private void doSendQueueBind(final AMQShortString queueName, + final AMQShortString exchangeName, + final Map args, + final AMQShortString rk) + { + _logger.debug("Binding queue : " + queueName.toString() + + " exchange: " + exchangeName.toString() + + " using binding key " + rk.asString()); + getQpidSession().exchangeBind(queueName.toString(), + exchangeName.toString(), + rk.toString(), + args); + } + /** * Close this session. |
