diff options
Diffstat (limited to 'qpid/java/client')
3 files changed, 14 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index a71555480f..2eeea4c967 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -89,6 +89,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte private RejectBehaviour _rejectBehaviour; + private Map<String,Object> _consumerArguments; + public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; @@ -299,6 +301,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR); _rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase()); + _consumerArguments = binding.getConsumerOptions(); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) @@ -718,6 +721,11 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte return result; } + public Map<String, Object> getConsumerArguments() + { + return _consumerArguments; + } + public Reference getReference() throws NamingException { return new Reference( diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 9cef1f8dce..3c947043c6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -187,6 +187,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } final FieldTable ft = FieldTableFactory.newFieldTable(); + if(destination.getConsumerArguments() != null) + { + ft.addAll(FieldTable.convertToFieldTable(destination.getConsumerArguments())); + } // rawSelector is used by HeadersExchange and is not a JMS Selector if (rawSelector != null) { @@ -203,6 +207,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal); } + _arguments = ft; _addressType = _destination.getAddressType(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 1d7bb6087a..4f2715bd7b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -69,6 +69,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } + _topicDestinationCache = session.getTopicDestinationCache(); _queueDestinationCache = session.getQueueDestinationCache(); |
