summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-17 17:41:28 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-17 17:41:28 +0000
commit2d3a1f587a2201eed232cdc4b4ee589ea52e3606 (patch)
treedf3ffc561d223df11e6c5d1034f855a4da97549f /qpid/java/client/src
parent2e60cf6ce254d749890c5740d0d3b15e2b1e41a3 (diff)
downloadqpid-python-2d3a1f587a2201eed232cdc4b4ee589ea52e3606.tar.gz
QPID-6395 : [Java Broker] add support for queue default filters, and filters solely on arrival time
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1660458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java1
3 files changed, 14 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index a71555480f..2eeea4c967 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -89,6 +89,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
private RejectBehaviour _rejectBehaviour;
+ private Map<String,Object> _consumerArguments;
+
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
@@ -299,6 +301,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
_bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR);
_rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase());
+ _consumerArguments = binding.getConsumerOptions();
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
@@ -718,6 +721,11 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
return result;
}
+ public Map<String, Object> getConsumerArguments()
+ {
+ return _consumerArguments;
+ }
+
public Reference getReference() throws NamingException
{
return new Reference(
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 9cef1f8dce..3c947043c6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -187,6 +187,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
final FieldTable ft = FieldTableFactory.newFieldTable();
+ if(destination.getConsumerArguments() != null)
+ {
+ ft.addAll(FieldTable.convertToFieldTable(destination.getConsumerArguments()));
+ }
// rawSelector is used by HeadersExchange and is not a JMS Selector
if (rawSelector != null)
{
@@ -203,6 +207,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal);
}
+
_arguments = ft;
_addressType = _destination.getAddressType();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 1d7bb6087a..4f2715bd7b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -69,6 +69,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+
_topicDestinationCache = session.getTopicDestinationCache();
_queueDestinationCache = session.getQueueDestinationCache();