diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-25 14:24:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-25 14:24:36 +0000 |
| commit | 42bfb186da9e911c208f22dd5f6c794b9bddd859 (patch) | |
| tree | 5c0d345cd36b6ef2d17f0abf39f247fc7fdc21c9 /qpid/java/client/src | |
| parent | 9a08d3ffc0a21d501a33a2b318fca72f85d0c096 (diff) | |
| download | qpid-python-42bfb186da9e911c208f22dd5f6c794b9bddd859.tar.gz | |
QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613440 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 4 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 63 |
2 files changed, 35 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a37b532617..3316ae801b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -118,7 +118,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - private final boolean _delareQueues = + private final boolean _declareQueues = Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true")); private final boolean _declareExchanges = @@ -2871,7 +2871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic declareExchange(amqd, nowait); } - if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) + if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) { declareQueue(amqd, consumer.isNoLocal(), nowait); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index cb8f81f68f..46473900c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -24,6 +24,7 @@ import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -37,6 +38,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; import javax.jms.JMSException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -57,29 +61,9 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ExchangeBoundResult; -import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.transport.ExecutionException; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.QueueQueryResult; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.RangeSetFactory; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.*; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is a 0.10 Session @@ -362,19 +346,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic final AMQDestination destination, final boolean nowait) throws AMQException { - if (destination.getDestSyntax() == DestSyntax.BURL) + if (destination == null || destination.getDestSyntax() == DestSyntax.BURL) { Map args = FieldTableSupport.convertToMap(arguments); - for (AMQShortString rk: destination.getBindingKeys()) + if(destination != null) { - _logger.debug("Binding queue : " + queueName.toString() + - " exchange: " + exchangeName.toString() + - " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), - exchangeName.toString(), - rk.toString(), - args); + for (AMQShortString rk: destination.getBindingKeys()) + { + doSendQueueBind(queueName, exchangeName, args, rk); + } + if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey)) + { + doSendQueueBind(queueName, exchangeName, args, routingKey); + } + } + else + { + doSendQueueBind(queueName, exchangeName, args, routingKey); } } else @@ -420,6 +409,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + private void doSendQueueBind(final AMQShortString queueName, + final AMQShortString exchangeName, + final Map args, + final AMQShortString rk) + { + _logger.debug("Binding queue : " + queueName.toString() + + " exchange: " + exchangeName.toString() + + " using binding key " + rk.asString()); + getQpidSession().exchangeBind(queueName.toString(), + exchangeName.toString(), + rk.toString(), + args); + } + /** * Close this session. |
