diff options
Diffstat (limited to 'qpid/java')
4 files changed, 76 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 7e320956b6..295a7191e7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -146,6 +146,33 @@ public class HeadersExchange extends AbstractExchange return new ArrayList<BaseQueue>(queues); } + + public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) + { + CopyOnWriteArraySet<Binding> bindings; + if(bindingKey == null) + { + bindings = new CopyOnWriteArraySet<Binding>(getBindings()); + } + else + { + bindings = _bindingsByKey.get(bindingKey); + } + + if(bindings != null) + { + for(Binding binding : bindings) + { + if(queue == null || binding.getQueue().equals(queue)) + { + return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments); + } + } + } + + return false; + } + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) { //fixme isBound here should take the arguements in to consideration. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 6d7242a78d..e4c452a5d6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -20,13 +20,23 @@ */ package org.apache.qpid.server.exchange; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import javax.management.JMException; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.filter.SelectorParsingException; +import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -36,23 +46,11 @@ import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.JMException; -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.ConcurrentHashMap; - public class TopicExchange extends AbstractExchange { @@ -275,6 +273,28 @@ public class TopicExchange extends AbstractExchange } } + public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) + { + Binding binding = new Binding(null, bindingKey, queue, this, arguments); + if (arguments == null) + { + return _bindings.containsKey(binding); + } + else + { + FieldTable o = _bindings.get(binding); + if (o != null) + { + return arguments.equals(FieldTable.convertToMap(o)); + } + else + { + return false; + } + } + + } + public boolean isBound(AMQShortString routingKey, AMQQueue queue) { return isBound(routingKey, null, queue); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 629597f40b..ca8de53d32 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -859,12 +859,6 @@ public class ServerSessionDelegate extends SessionDelegate if(method.hasBindingKey()) { - if(method.hasArguments()) - { - FieldTable args = FieldTable.convertToFieldTable(method.getArguments()); - - result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue)); - } if(queueMatched) { result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); @@ -873,23 +867,28 @@ public class ServerSessionDelegate extends SessionDelegate { result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); } + + if(method.hasArguments()) + { + result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null)); + } + } else if (method.hasArguments()) { - // TODO - + result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null)); } - result.setQueueNotMatched(!exchange.isBound(queue)); - } else if(exchange != null && method.hasBindingKey()) { + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); + if(method.hasArguments()) { - // TODO + result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queue)); } - result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); + } @@ -898,11 +897,15 @@ public class ServerSessionDelegate extends SessionDelegate { if(method.hasArguments()) { - // TODO + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null)); } result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); } + else if(exchange != null && method.hasArguments()) + { + result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), null)); + } session.executionResult((int) method.getId(), result); diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes index 70a15fea9b..42f8c18235 100644 --- a/qpid/java/test-profiles/python_tests/Java010PythonExcludes +++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes @@ -77,9 +77,6 @@ qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_reject_no_ qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_add_alternate_to_exchange qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange -#QPID-3597 Headers exchange issues -qpid_tests.broker_0_10.query.QueryTests.test_exchange_bound_header - #QPID-3599 Tests fail due to differences in expected message Redelivered status qpid.tests.messaging.endpoints.SessionTests.testCommitAck qpid.tests.messaging.endpoints.SessionTests.testRelease |
