summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java63
2 files changed, 35 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a37b532617..3316ae801b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -118,7 +118,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- private final boolean _delareQueues =
+ private final boolean _declareQueues =
Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true"));
private final boolean _declareExchanges =
@@ -2871,7 +2871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
declareExchange(amqd, nowait);
}
- if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
+ if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
{
declareQueue(amqd, consumer.isNoLocal(), nowait);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index cb8f81f68f..46473900c0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,6 +38,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Destination;
import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -57,29 +61,9 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.RangeSetFactory;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.*;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This is a 0.10 Session
@@ -362,19 +346,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
final AMQDestination destination, final boolean nowait)
throws AMQException
{
- if (destination.getDestSyntax() == DestSyntax.BURL)
+ if (destination == null || destination.getDestSyntax() == DestSyntax.BURL)
{
Map args = FieldTableSupport.convertToMap(arguments);
- for (AMQShortString rk: destination.getBindingKeys())
+ if(destination != null)
{
- _logger.debug("Binding queue : " + queueName.toString() +
- " exchange: " + exchangeName.toString() +
- " using binding key " + rk.asString());
- getQpidSession().exchangeBind(queueName.toString(),
- exchangeName.toString(),
- rk.toString(),
- args);
+ for (AMQShortString rk: destination.getBindingKeys())
+ {
+ doSendQueueBind(queueName, exchangeName, args, rk);
+ }
+ if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey))
+ {
+ doSendQueueBind(queueName, exchangeName, args, routingKey);
+ }
+ }
+ else
+ {
+ doSendQueueBind(queueName, exchangeName, args, routingKey);
}
}
else
@@ -420,6 +409,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ private void doSendQueueBind(final AMQShortString queueName,
+ final AMQShortString exchangeName,
+ final Map args,
+ final AMQShortString rk)
+ {
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + exchangeName.toString() +
+ " using binding key " + rk.asString());
+ getQpidSession().exchangeBind(queueName.toString(),
+ exchangeName.toString(),
+ rk.toString(),
+ args);
+ }
+
/**
* Close this session.