diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
| commit | 3e4d1f2f56ef296ea5132511faaa8689867c499c (patch) | |
| tree | 6c990d7d04cdaf07bd6dace7c157b882f8370cbf /qpid/java/broker/src/main | |
| parent | f6b68fa2e1ca27d46e6080a3568ef5d785eed548 (diff) | |
| download | qpid-python-3e4d1f2f56ef296ea5132511faaa8689867c499c.tar.gz | |
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
11 files changed, 651 insertions, 505 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 0d05307cb4..58c2b33041 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -173,20 +174,106 @@ public abstract class AbstractExchange implements Exchange return getVirtualHost().getQueueRegistry(); } - public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue) + public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue) { - return isBound(new AMQShortString(bindingKey), queue); + return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue); } + public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue()) + { + return (b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments); + } + } + return false; + } + + public final boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return isBound(routingKey==null ? "" : routingKey.asString(), queue); + } + + public final boolean isBound(String bindingKey, AMQQueue queue) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue()) + { + return true; + } + } + return false; + } + + public final boolean isBound(AMQShortString routingKey) + { + return isBound(routingKey == null ? "" : routingKey.asString()); + } + + public final boolean isBound(String bindingKey) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey())) + { + return true; + } + } + return false; + } + + public final boolean isBound(AMQQueue queue) + { + for(Binding b : _bindings) + { + if(queue == b.getQueue()) + { + return true; + } + } + return false; + } - public boolean isBound(String bindingKey, AMQQueue queue) + @Override + public final boolean isBound(Map<String, Object> arguments, AMQQueue queue) { - return isBound(new AMQShortString(bindingKey), queue); + for(Binding b : _bindings) + { + if(queue == b.getQueue() && + ((b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments))) + { + return true; + } + } + return false; + } + + @Override + public final boolean isBound(String bindingKey, Map<String, Object> arguments) + { + for(Binding b : _bindings) + { + if(b.getBindingKey().equals(bindingKey) && + ((b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments))) + { + return true; + } + } + return false; } - public boolean isBound(String bindingKey) + public final boolean hasBindings() { - return isBound(new AMQShortString(bindingKey)); + return !_bindings.isEmpty(); } public Exchange getAlternateExchange() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 4e136965a1..ccf955ed1c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -272,6 +272,18 @@ public class DefaultExchange implements Exchange } @Override + public boolean isBound(Map<String, Object> arguments, AMQQueue queue) + { + return (arguments == null || arguments.isEmpty()) && isBound(queue); + } + + @Override + public boolean isBound(String bindingKey, Map<String, Object> arguments) + { + return (arguments == null || arguments.isEmpty()) && isBound(bindingKey); + } + + @Override public boolean isBound(String bindingKey) { return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index fc6ce15bc4..2e2a93d638 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -20,9 +20,18 @@ */ package org.apache.qpid.server.exchange; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -36,10 +45,14 @@ import java.util.concurrent.CopyOnWriteArraySet; public class DirectExchange extends AbstractExchange { + + private static final Logger _logger = Logger.getLogger(DirectExchange.class); + private static final class BindingSet { private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>(); - private List<BaseQueue> _queues = new ArrayList<BaseQueue>(); + private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>(); + private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>(); public synchronized void addBinding(Binding binding) { @@ -56,27 +69,59 @@ public class DirectExchange extends AbstractExchange private void recalculateQueues() { List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size()); + Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>(); for(Binding b : _bindings) { - if(!queues.contains(b.getQueue())) + + if(FilterSupport.argumentsContainFilter(b.getArguments())) { - queues.add(b.getQueue()); + try + { + MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getQueue()); + filteredQueues.put(b.getQueue(),filter); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getQueue().getName() + + "' to exchange '" + b.getExchange().getName() + + "' with arguments: " + b.getArguments(), e); + } + + } + else + { + + if(!queues.contains(b.getQueue())) + { + queues.add(b.getQueue()); + } } } - _queues = queues; + _unfilteredQueues = queues; + _filteredQueues = filteredQueues; } - public List<BaseQueue> getQueues() + public List<BaseQueue> getUnfilteredQueues() { - return _queues; + return _unfilteredQueues; } public CopyOnWriteArraySet<Binding> getBindings() { return _bindings; } + + public boolean hasFilteredQueues() + { + return !_filteredQueues.isEmpty(); + } + + public Map<BaseQueue,MessageFilter> getFilteredQueues() + { + return _filteredQueues; + } } private final ConcurrentHashMap<String, BindingSet> _bindingsByKey = @@ -98,7 +143,30 @@ public class DirectExchange extends AbstractExchange if(bindings != null) { - return bindings.getQueues(); + List<BaseQueue> queues = bindings.getUnfilteredQueues(); + + if(bindings.hasFilteredQueues()) + { + Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues); + + Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues(); + for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet()) + { + if(!queuesSet.contains(entry.getKey())) + { + MessageFilter filter = entry.getValue(); + if(filter.matches(payload)) + { + queuesSet.add(entry.getKey()); + } + } + } + if(queues.size() != queuesSet.size()) + { + queues = new ArrayList<BaseQueue>(queuesSet); + } + } + return queues; } else { @@ -106,50 +174,6 @@ public class DirectExchange extends AbstractExchange } - - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey,queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - BindingSet bindings = _bindingsByKey.get(bindingKey); - if(bindings != null) - { - return bindings.getQueues().contains(queue); - } - return false; - - } - - public boolean isBound(AMQShortString routingKey) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - BindingSet bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.getQueues().isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - - for (BindingSet bindings : _bindingsByKey.values()) - { - if(bindings.getQueues().contains(queue)) - { - return true; - } - - } - return false; - } - - public boolean hasBindings() - { - return !getBindings().isEmpty(); } protected void onBind(final Binding binding) @@ -189,5 +213,4 @@ public class DirectExchange extends AbstractExchange } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index a5a1d7f912..d483c3b29b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -145,12 +145,15 @@ public interface Exchange extends ExchangeReferrer Collection<Binding> getBindings(); + boolean isBound(String bindingKey); boolean isBound(String bindingKey, AMQQueue queue); - public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue); + boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue); - boolean isBound(String bindingKey); + boolean isBound(Map<String, Object> arguments, AMQQueue queue); + + boolean isBound(String bindingKey, Map<String, Object> arguments); void removeReference(ExchangeReferrer exchange); @@ -158,6 +161,8 @@ public interface Exchange extends ExchangeReferrer boolean hasReferrers(); + + public interface BindingListener { void bindingAdded(Exchange exchange, Binding binding); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 6ad5eb261e..cd830d69a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -20,11 +20,21 @@ */ package org.apache.qpid.server.exchange; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -42,7 +52,18 @@ public class FanoutExchange extends AbstractExchange /** * Maps from queue name to queue instances */ - private final ConcurrentHashMap<AMQQueue,Integer> _queues = new ConcurrentHashMap<AMQQueue,Integer>(); + private final Map<AMQQueue,Integer> _queues = new HashMap<AMQQueue,Integer>(); + private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>(); + private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>(); + + private final AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>> _filteredBindings = + new AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>(); + { + Map<AMQQueue,Map<Binding, MessageFilter>> emptyMap = Collections.emptyMap(); + _filteredBindings.set(emptyMap); + } + + public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType(); @@ -54,115 +75,150 @@ public class FanoutExchange extends AbstractExchange public ArrayList<BaseQueue> doRoute(InboundMessage payload) { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Publishing message to queue " + _queues); - } - for(Binding b : getBindings()) { b.incrementMatches(); } - return new ArrayList<BaseQueue>(_queues.keySet()); - - } + final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues); - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey, queue); - } - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return isBound(queue); - } + final Map<AMQQueue, Map<Binding, MessageFilter>> filteredBindings = _filteredBindings.get(); + if(!_filteredQueues.isEmpty()) + { + for(AMQQueue q : _filteredQueues) + { + final Map<Binding, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q); + if(!(bindingMessageFilterMap == null || result.contains(q))) + { + for(MessageFilter filter : bindingMessageFilterMap.values()) + { + if(filter.matches(payload)) + { + result.add(q); + break; + } + } + } + } - public boolean isBound(AMQShortString routingKey) - { + } - return (_queues != null) && !_queues.isEmpty(); - } - public boolean isBound(AMQQueue queue) - { - if (queue == null) + if (_logger.isDebugEnabled()) { - return false; + _logger.debug("Publishing message to queue " + result); } - return _queues.containsKey(queue); - } - public boolean hasBindings() - { - return !_queues.isEmpty(); + return result; + } - protected void onBind(final Binding binding) + + protected synchronized void onBind(final Binding binding) { AMQQueue queue = binding.getQueue(); assert queue != null; + if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments())) + { - Integer oldVal; + Integer oldVal; + if(_queues.containsKey(queue)) + { + _queues.put(queue,_queues.get(queue)+1); + } + else + { + _queues.put(queue, ONE); + _unfilteredQueues.add(queue); + // No longer any reason to check filters for this queue + _filteredQueues.remove(queue); + } - if((oldVal = _queues.putIfAbsent(queue, ONE)) != null) + } + else { - Integer newVal = oldVal+1; - while(!_queues.replace(queue, oldVal, newVal)) + try { - oldVal = _queues.get(queue); - if(oldVal == null) + + HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings = + new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get()); + + Map<Binding, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue()); + final + MessageFilter messageFilter = + FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue()); + + if(bindingsForQueue != null) { - oldVal = _queues.putIfAbsent(queue, ONE); - if(oldVal == null) + bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue); + bindingsForQueue.put(binding, messageFilter); + } + else + { + bindingsForQueue = Collections.singletonMap(binding, messageFilter); + if(!_unfilteredQueues.contains(queue)) { - break; + _filteredQueues.add(queue); } } - newVal = oldVal + 1; + + filteredBindings.put(binding.getQueue(), bindingsForQueue); + + _filteredBindings.set(filteredBindings); + + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Cannoy bind queue " + queue + " to exchange this " + this + " beacuse selector cannot be parsed.", e); + return; } } - if (_logger.isDebugEnabled()) { _logger.debug("Binding queue " + queue - + " with routing key " + new AMQShortString(binding.getBindingKey()) + " to exchange " + this); + + " with routing key " + binding.getBindingKey() + " to exchange " + this); } } - protected void onUnbind(final Binding binding) + protected synchronized void onUnbind(final Binding binding) { AMQQueue queue = binding.getQueue(); - Integer oldValue = _queues.get(queue); - - boolean done = false; - - while(!(done || oldValue == null)) + if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments())) { - while(!(done || oldValue == null) && oldValue.intValue() == 1) + Integer oldValue = _queues.remove(queue); + if(ONE.equals(oldValue)) { - if(!_queues.remove(queue, oldValue)) + // should start checking filters for this queue + if(_filteredBindings.get().containsKey(queue)) { - oldValue = _queues.get(queue); - } - else - { - done = true; + _filteredQueues.add(queue); } + _unfilteredQueues.remove(queue); } - while(!(done || oldValue == null) && oldValue.intValue() != 1) + else { - Integer newValue = oldValue - 1; - if(!_queues.replace(queue, oldValue, newValue)) - { - oldValue = _queues.get(queue); - } - else - { - done = true; - } + _queues.put(queue,oldValue-1); } } + else // we are removing a binding with filters + { + HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings = + new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get()); + + Map<Binding,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue()); + if(bindingsForQueue.size()>1) + { + bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue); + bindingsForQueue.remove(binding); + filteredBindings.put(binding.getQueue(),bindingsForQueue); + } + else + { + _filteredQueues.remove(queue); + } + _filteredBindings.set(filteredBindings); + + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java new file mode 100644 index 0000000000..880c9a2cf6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.exchange; + +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.SelectorParsingException; +import org.apache.qpid.filter.selector.ParseException; +import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.Filterable; + +public class FilterSupport +{ + private static final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = + Collections.synchronizedMap(new WeakHashMap<String, WeakReference<JMSSelectorFilter>>()); + + static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException + { + final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); + return getMessageFilter(selectorString); + } + + + static MessageFilter createJMSSelectorFilter(Map<String, Object> args) throws AMQInvalidArgumentException + { + final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.getValue()); + return getMessageFilter(selectorString); + } + + + private static MessageFilter getMessageFilter(String selectorString) throws AMQInvalidArgumentException + { + WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); + JMSSelectorFilter selector = null; + + if(selectorRef == null || (selector = selectorRef.get())==null) + { + try + { + selector = new JMSSelectorFilter(selectorString); + } + catch (ParseException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (SelectorParsingException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (TokenMgrError e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); + } + return selector; + } + + static boolean argumentsContainFilter(final FieldTable args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + + static boolean argumentsContainFilter(final Map<String, Object> args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + + static boolean argumentsContainNoLocal(final Map<String, Object> args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.toString()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString())); + } + + + static boolean argumentsContainNoLocal(final FieldTable args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); + } + + + static boolean argumentsContainJMSSelector(final Map<String,Object> args) + { + return args != null && (args.get(AMQPFilterTypes.JMS_SELECTOR.toString()) instanceof String) + && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0; + } + + + static boolean argumentsContainJMSSelector(final FieldTable args) + { + return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) + && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); + } + + + static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException + { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException + { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + static final class NoLocalFilter implements MessageFilter + { + private final AMQQueue _queue; + + public NoLocalFilter(AMQQueue queue) + { + _queue = queue; + } + + public boolean matches(Filterable message) + { + InboundMessage inbound = (InboundMessage) message; + final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); + return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); + + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (o == null || getClass() != o.getClass()) + { + return false; + } + + NoLocalFilter that = (NoLocalFilter) o; + + return _queue == null ? that._queue == null : _queue.equals(that._queue); + } + + @Override + public int hashCode() + { + return _queue != null ? _queue.hashCode() : 0; + } + } + + static final class CompoundFilter implements MessageFilter + { + private MessageFilter _noLocalFilter; + private MessageFilter _jmsSelectorFilter; + + public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) + { + _noLocalFilter = filter; + _jmsSelectorFilter = jmsSelectorFilter; + } + + public boolean matches(Filterable message) + { + return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + CompoundFilter that = (CompoundFilter) o; + + if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) + { + return false; + } + if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; + result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); + return result; + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index b6f5f973f4..eb4a84a5b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -22,15 +22,19 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.framing.AMQTypedValue; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.AMQMessageHeader; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.queue.Filterable; /** * Defines binding and matching based on a set of headers. @@ -44,13 +48,14 @@ class HeadersBinding private final Set<String> required = new HashSet<String>(); private final Map<String,Object> matches = new HashMap<String,Object>(); private boolean matchAny; + private MessageFilter _filter; /** * Creates a header binding for a set of mappings. Those mappings whose value is * null or the empty string are assumed only to be required headers, with * no constraint on the value. Those with a non-null value are assumed to * define a required match of value. - * + * * @param binding the binding to create a header binding using */ public HeadersBinding(Binding binding) @@ -66,9 +71,30 @@ class HeadersBinding _mappings = null; } } - + private void initMappings() { + if(FilterSupport.argumentsContainFilter(_mappings)) + { + try + { + _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue()); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName() + +"' to exchange '"+_binding.getExchange().getName() + +"' with arguments: " + _binding.getArguments()); + _filter = new MessageFilter() + { + @Override + public boolean matches(Filterable message) + { + return false; + } + }; + } + } for(Map.Entry<String, Object> entry : _mappings.entrySet()) { String propertyName = entry.getKey(); @@ -87,7 +113,7 @@ class HeadersBinding } } } - + public Binding getBinding() { return _binding; @@ -111,6 +137,11 @@ class HeadersBinding } } + public boolean matches(InboundMessage message) + { + return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message)); + } + private boolean and(AMQMessageHeader headers) { if(headers.containsHeaders(required)) @@ -215,7 +246,7 @@ class HeadersBinding { return key.startsWith("X-") || key.startsWith("x-"); } - + @Override public boolean equals(final Object o) { @@ -250,4 +281,4 @@ class HeadersBinding return true; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 746c8ac6bc..9fb745d553 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -69,14 +69,14 @@ public class HeadersExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(HeadersExchange.class); - + private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey = new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>(); - + private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers = new CopyOnWriteArrayList<HeadersBinding>(); - + public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType(); public HeadersExchange() @@ -87,112 +87,31 @@ public class HeadersExchange extends AbstractExchange public ArrayList<BaseQueue> doRoute(InboundMessage payload) { - AMQMessageHeader header = payload.getMessageHeader(); if (_logger.isDebugEnabled()) { - _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header); + _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader()); } - + LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>(); - + for (HeadersBinding hb : _bindingHeaderMatchers) { - if (hb.matches(header)) + if (hb.matches(payload)) { Binding b = hb.getBinding(); - + b.incrementMatches(); - + if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " + - header + " to " + b.getQueue().getNameShortString()); + payload.getMessageHeader() + " to " + b.getQueue().getNameShortString()); } queues.add(b.getQueue()); } } - - return new ArrayList<BaseQueue>(queues); - } - - - public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) - { - CopyOnWriteArraySet<Binding> bindings; - if(bindingKey == null) - { - bindings = new CopyOnWriteArraySet<Binding>(getBindings()); - } - else - { - bindings = _bindingsByKey.get(bindingKey); - } - - if(bindings != null) - { - for(Binding binding : bindings) - { - if(queue == null || binding.getQueue().equals(queue)) - { - return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments); - } - } - } - - return false; - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - //fixme isBound here should take the arguements in to consideration. - return isBound(routingKey, queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - - if(bindings != null) - { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } - } - - return false; - } - - public boolean isBound(AMQShortString routingKey) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values()) - { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } - } - - return false; - } - public boolean hasBindings() - { - return !getBindings().isEmpty(); + return new ArrayList<BaseQueue>(queues); } protected void onBind(final Binding binding) @@ -216,7 +135,7 @@ public class HeadersExchange extends AbstractExchange bindings = newBindings; } } - + if(_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 6d548be508..9d41856dc0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -20,21 +20,15 @@ */ package org.apache.qpid.server.exchange; -import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQInvalidArgumentException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.filter.SelectorParsingException; -import org.apache.qpid.filter.selector.ParseException; -import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; @@ -42,14 +36,10 @@ import org.apache.qpid.server.exchange.topic.TopicExchangeResult; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; -import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.Filterable; public class TopicExchange extends AbstractExchange { @@ -65,8 +55,6 @@ public class TopicExchange extends AbstractExchange private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>(); - private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>(); - public TopicExchange() { super(TYPE); @@ -77,7 +65,7 @@ public class TopicExchange extends AbstractExchange AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ; AMQQueue queue = binding.getQueue(); FieldTable args = FieldTable.convertToFieldTable(binding.getArguments()); - + assert queue != null; assert rKey != null; @@ -91,26 +79,26 @@ public class TopicExchange extends AbstractExchange FieldTable oldArgs = _bindings.get(binding); TopicExchangeResult result = _topicExchangeResults.get(routingKey); - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - if(argumentsContainFilter(oldArgs)) + if(FilterSupport.argumentsContainFilter(oldArgs)) { result.replaceQueueFilter(queue, - createMessageFilter(oldArgs, queue), - createMessageFilter(args, queue)); + FilterSupport.createMessageFilter(oldArgs, queue), + FilterSupport.createMessageFilter(args, queue)); } else { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); result.removeUnfilteredQueue(queue); } } else { - if(argumentsContainFilter(oldArgs)) + if(FilterSupport.argumentsContainFilter(oldArgs)) { result.addUnfilteredQueue(queue); - result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue)); + result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue)); } else { @@ -118,7 +106,7 @@ public class TopicExchange extends AbstractExchange return; } } - + result.addBinding(binding); } @@ -129,9 +117,9 @@ public class TopicExchange extends AbstractExchange if(result == null) { result = new TopicExchangeResult(); - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); } else { @@ -142,89 +130,22 @@ public class TopicExchange extends AbstractExchange } else { - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); } else { result.addUnfilteredQueue(queue); } } - + result.addBinding(binding); _bindings.put(binding, args); } } - private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException - { - if(argumentsContainNoLocal(args)) - { - MessageFilter filter = new NoLocalFilter(queue); - - if(argumentsContainJMSSelector(args)) - { - filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); - } - return filter; - } - else - { - return createJMSSelectorFilter(args); - } - - } - - - private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException - { - final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); - WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); - JMSSelectorFilter selector = null; - - if(selectorRef == null || (selector = selectorRef.get())==null) - { - try - { - selector = new JMSSelectorFilter(selectorString); - } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (TokenMgrError e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); - } - return selector; - } - - private static boolean argumentsContainFilter(final FieldTable args) - { - return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); - } - - private static boolean argumentsContainNoLocal(final FieldTable args) - { - return args != null - && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) - && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); - } - - private static boolean argumentsContainJMSSelector(final FieldTable args) - { - return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) - && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); - } - public ArrayList<BaseQueue> doRoute(InboundMessage payload) { @@ -256,87 +177,6 @@ public class TopicExchange extends AbstractExchange } - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments)); - - if (arguments == null) - { - return _bindings.containsKey(binding); - } - else - { - FieldTable o = _bindings.get(binding); - if (o != null) - { - return o.equals(arguments); - } - else - { - return false; - } - - } - } - - public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) - { - Binding binding = new Binding(null, bindingKey, queue, this, arguments); - if (arguments == null) - { - return _bindings.containsKey(binding); - } - else - { - FieldTable o = _bindings.get(binding); - if (o != null) - { - return arguments.equals(FieldTable.convertToMap(o)); - } - else - { - return false; - } - } - - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return isBound(routingKey, null, queue); - } - - public boolean isBound(AMQShortString routingKey) - { - for(Binding b : _bindings.keySet()) - { - if(b.getBindingKey().equals(routingKey.toString())) - { - return true; - } - } - - return false; - } - - public boolean isBound(AMQQueue queue) - { - for(Binding b : _bindings.keySet()) - { - if(b.getQueue().equals(queue)) - { - return true; - } - } - - return false; - } - - public boolean hasBindings() - { - return !_bindings.isEmpty(); - } - private boolean deregisterQueue(final Binding binding) { if(_bindings.containsKey(binding)) @@ -344,14 +184,15 @@ public class TopicExchange extends AbstractExchange FieldTable bindingArgs = _bindings.remove(binding); AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey())); TopicExchangeResult result = _topicExchangeResults.get(bindingKey); - + result.removeBinding(binding); - - if(argumentsContainFilter(bindingArgs)) + + if(FilterSupport.argumentsContainFilter(bindingArgs)) { try { - result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue())); + result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs, + binding.getQueue())); } catch (AMQInvalidArgumentException e) { @@ -418,96 +259,4 @@ public class TopicExchange extends AbstractExchange deregisterQueue(binding); } - private static final class NoLocalFilter implements MessageFilter - { - private final AMQQueue _queue; - - public NoLocalFilter(AMQQueue queue) - { - _queue = queue; - } - - public boolean matches(Filterable message) - { - InboundMessage inbound = (InboundMessage) message; - final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); - return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); - - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - - if (o == null || getClass() != o.getClass()) - { - return false; - } - - NoLocalFilter that = (NoLocalFilter) o; - - return _queue == null ? that._queue == null : _queue.equals(that._queue); - } - - @Override - public int hashCode() - { - return _queue != null ? _queue.hashCode() : 0; - } - } - - private static final class CompoundFilter implements MessageFilter - { - private MessageFilter _noLocalFilter; - private MessageFilter _jmsSelectorFilter; - - public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) - { - _noLocalFilter = filter; - _jmsSelectorFilter = jmsSelectorFilter; - } - - public boolean matches(Filterable message) - { - return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - CompoundFilter that = (CompoundFilter) o; - - if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) - { - return false; - } - if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; - result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); - return result; - } - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index b4eb41684d..2e6a98d81b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -159,9 +159,15 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo else { + String message = "Queue " + queueName + " not bound with routing key " + + body.getRoutingKey() + " to exchange " + exchangeName; + + if(message.length()>255) + { + message = message.substring(0,254); + } response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - new AMQShortString("Queue " + queueName + " not bound with routing key " + - body.getRoutingKey() + " to exchange " + exchangeName)); // replyText + new AMQShortString(message)); // replyText } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index d8d245e255..110c7be50a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1130,22 +1130,22 @@ public class ServerSessionDelegate extends SessionDelegate if(queueMatched) { - result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); + final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue); + result.setKeyNotMatched(!keyMatched); + if(method.hasArguments() && keyMatched) + { + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)); + } } else { result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); } - if(method.hasArguments()) - { - result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null)); - } - } else if (method.hasArguments()) { - result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null)); + result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue)); } } @@ -1166,7 +1166,7 @@ public class ServerSessionDelegate extends SessionDelegate { if(method.hasArguments()) { - result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null)); + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments())); } result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); |
