summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-06-01 19:24:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-06-01 19:24:36 +0000
commit3e4d1f2f56ef296ea5132511faaa8689867c499c (patch)
tree6c990d7d04cdaf07bd6dace7c157b882f8370cbf /qpid/java/broker/src/main
parentf6b68fa2e1ca27d46e6080a3568ef5d785eed548 (diff)
downloadqpid-python-3e4d1f2f56ef296ea5132511faaa8689867c499c.tar.gz
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java99
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java127
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java190
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java258
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java41
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java105
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java289
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java16
11 files changed, 651 insertions, 505 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 0d05307cb4..58c2b33041 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -173,20 +174,106 @@ public abstract class AbstractExchange implements Exchange
return getVirtualHost().getQueueRegistry();
}
- public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+ public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue)
{
- return isBound(new AMQShortString(bindingKey), queue);
+ return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue);
}
+ public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue())
+ {
+ return (b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments);
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQShortString routingKey, AMQQueue queue)
+ {
+ return isBound(routingKey==null ? "" : routingKey.asString(), queue);
+ }
+
+ public final boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQShortString routingKey)
+ {
+ return isBound(routingKey == null ? "" : routingKey.asString());
+ }
+
+ public final boolean isBound(String bindingKey)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(queue == b.getQueue())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
- public boolean isBound(String bindingKey, AMQQueue queue)
+ @Override
+ public final boolean isBound(Map<String, Object> arguments, AMQQueue queue)
{
- return isBound(new AMQShortString(bindingKey), queue);
+ for(Binding b : _bindings)
+ {
+ if(queue == b.getQueue() &&
+ ((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public final boolean isBound(String bindingKey, Map<String, Object> arguments)
+ {
+ for(Binding b : _bindings)
+ {
+ if(b.getBindingKey().equals(bindingKey) &&
+ ((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
+ {
+ return true;
+ }
+ }
+ return false;
}
- public boolean isBound(String bindingKey)
+ public final boolean hasBindings()
{
- return isBound(new AMQShortString(bindingKey));
+ return !_bindings.isEmpty();
}
public Exchange getAlternateExchange()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index 4e136965a1..ccf955ed1c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -272,6 +272,18 @@ public class DefaultExchange implements Exchange
}
@Override
+ public boolean isBound(Map<String, Object> arguments, AMQQueue queue)
+ {
+ return (arguments == null || arguments.isEmpty()) && isBound(queue);
+ }
+
+ @Override
+ public boolean isBound(String bindingKey, Map<String, Object> arguments)
+ {
+ return (arguments == null || arguments.isEmpty()) && isBound(bindingKey);
+ }
+
+ @Override
public boolean isBound(String bindingKey)
{
return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index fc6ce15bc4..2e2a93d638 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -20,9 +20,18 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -36,10 +45,14 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class DirectExchange extends AbstractExchange
{
+
+ private static final Logger _logger = Logger.getLogger(DirectExchange.class);
+
private static final class BindingSet
{
private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>();
- private List<BaseQueue> _queues = new ArrayList<BaseQueue>();
+ private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>();
+ private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>();
public synchronized void addBinding(Binding binding)
{
@@ -56,27 +69,59 @@ public class DirectExchange extends AbstractExchange
private void recalculateQueues()
{
List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
+ Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>();
for(Binding b : _bindings)
{
- if(!queues.contains(b.getQueue()))
+
+ if(FilterSupport.argumentsContainFilter(b.getArguments()))
{
- queues.add(b.getQueue());
+ try
+ {
+ MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getQueue());
+ filteredQueues.put(b.getQueue(),filter);
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getQueue().getName()
+ + "' to exchange '" + b.getExchange().getName()
+ + "' with arguments: " + b.getArguments(), e);
+ }
+
+ }
+ else
+ {
+
+ if(!queues.contains(b.getQueue()))
+ {
+ queues.add(b.getQueue());
+ }
}
}
- _queues = queues;
+ _unfilteredQueues = queues;
+ _filteredQueues = filteredQueues;
}
- public List<BaseQueue> getQueues()
+ public List<BaseQueue> getUnfilteredQueues()
{
- return _queues;
+ return _unfilteredQueues;
}
public CopyOnWriteArraySet<Binding> getBindings()
{
return _bindings;
}
+
+ public boolean hasFilteredQueues()
+ {
+ return !_filteredQueues.isEmpty();
+ }
+
+ public Map<BaseQueue,MessageFilter> getFilteredQueues()
+ {
+ return _filteredQueues;
+ }
}
private final ConcurrentHashMap<String, BindingSet> _bindingsByKey =
@@ -98,7 +143,30 @@ public class DirectExchange extends AbstractExchange
if(bindings != null)
{
- return bindings.getQueues();
+ List<BaseQueue> queues = bindings.getUnfilteredQueues();
+
+ if(bindings.hasFilteredQueues())
+ {
+ Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
+
+ Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues();
+ for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet())
+ {
+ if(!queuesSet.contains(entry.getKey()))
+ {
+ MessageFilter filter = entry.getValue();
+ if(filter.matches(payload))
+ {
+ queuesSet.add(entry.getKey());
+ }
+ }
+ }
+ if(queues.size() != queuesSet.size())
+ {
+ queues = new ArrayList<BaseQueue>(queuesSet);
+ }
+ }
+ return queues;
}
else
{
@@ -106,50 +174,6 @@ public class DirectExchange extends AbstractExchange
}
-
- }
-
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return isBound(routingKey,queue);
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- BindingSet bindings = _bindingsByKey.get(bindingKey);
- if(bindings != null)
- {
- return bindings.getQueues().contains(queue);
- }
- return false;
-
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- BindingSet bindings = _bindingsByKey.get(bindingKey);
- return bindings != null && !bindings.getQueues().isEmpty();
- }
-
- public boolean isBound(AMQQueue queue)
- {
-
- for (BindingSet bindings : _bindingsByKey.values())
- {
- if(bindings.getQueues().contains(queue))
- {
- return true;
- }
-
- }
- return false;
- }
-
- public boolean hasBindings()
- {
- return !getBindings().isEmpty();
}
protected void onBind(final Binding binding)
@@ -189,5 +213,4 @@ public class DirectExchange extends AbstractExchange
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index a5a1d7f912..d483c3b29b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -145,12 +145,15 @@ public interface Exchange extends ExchangeReferrer
Collection<Binding> getBindings();
+ boolean isBound(String bindingKey);
boolean isBound(String bindingKey, AMQQueue queue);
- public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
+ boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
- boolean isBound(String bindingKey);
+ boolean isBound(Map<String, Object> arguments, AMQQueue queue);
+
+ boolean isBound(String bindingKey, Map<String, Object> arguments);
void removeReference(ExchangeReferrer exchange);
@@ -158,6 +161,8 @@ public interface Exchange extends ExchangeReferrer
boolean hasReferrers();
+
+
public interface BindingListener
{
void bindingAdded(Exchange exchange, Binding binding);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 6ad5eb261e..cd830d69a9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -20,11 +20,21 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -42,7 +52,18 @@ public class FanoutExchange extends AbstractExchange
/**
* Maps from queue name to queue instances
*/
- private final ConcurrentHashMap<AMQQueue,Integer> _queues = new ConcurrentHashMap<AMQQueue,Integer>();
+ private final Map<AMQQueue,Integer> _queues = new HashMap<AMQQueue,Integer>();
+ private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>();
+ private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>();
+
+ private final AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>> _filteredBindings =
+ new AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>();
+ {
+ Map<AMQQueue,Map<Binding, MessageFilter>> emptyMap = Collections.emptyMap();
+ _filteredBindings.set(emptyMap);
+ }
+
+
public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType();
@@ -54,115 +75,150 @@ public class FanoutExchange extends AbstractExchange
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Publishing message to queue " + _queues);
- }
-
for(Binding b : getBindings())
{
b.incrementMatches();
}
- return new ArrayList<BaseQueue>(_queues.keySet());
-
- }
+ final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return isBound(routingKey, queue);
- }
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return isBound(queue);
- }
+ final Map<AMQQueue, Map<Binding, MessageFilter>> filteredBindings = _filteredBindings.get();
+ if(!_filteredQueues.isEmpty())
+ {
+ for(AMQQueue q : _filteredQueues)
+ {
+ final Map<Binding, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
+ if(!(bindingMessageFilterMap == null || result.contains(q)))
+ {
+ for(MessageFilter filter : bindingMessageFilterMap.values())
+ {
+ if(filter.matches(payload))
+ {
+ result.add(q);
+ break;
+ }
+ }
+ }
+ }
- public boolean isBound(AMQShortString routingKey)
- {
+ }
- return (_queues != null) && !_queues.isEmpty();
- }
- public boolean isBound(AMQQueue queue)
- {
- if (queue == null)
+ if (_logger.isDebugEnabled())
{
- return false;
+ _logger.debug("Publishing message to queue " + result);
}
- return _queues.containsKey(queue);
- }
- public boolean hasBindings()
- {
- return !_queues.isEmpty();
+ return result;
+
}
- protected void onBind(final Binding binding)
+
+ protected synchronized void onBind(final Binding binding)
{
AMQQueue queue = binding.getQueue();
assert queue != null;
+ if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
+ {
- Integer oldVal;
+ Integer oldVal;
+ if(_queues.containsKey(queue))
+ {
+ _queues.put(queue,_queues.get(queue)+1);
+ }
+ else
+ {
+ _queues.put(queue, ONE);
+ _unfilteredQueues.add(queue);
+ // No longer any reason to check filters for this queue
+ _filteredQueues.remove(queue);
+ }
- if((oldVal = _queues.putIfAbsent(queue, ONE)) != null)
+ }
+ else
{
- Integer newVal = oldVal+1;
- while(!_queues.replace(queue, oldVal, newVal))
+ try
{
- oldVal = _queues.get(queue);
- if(oldVal == null)
+
+ HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+
+ Map<Binding, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue());
+ final
+ MessageFilter messageFilter =
+ FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue());
+
+ if(bindingsForQueue != null)
{
- oldVal = _queues.putIfAbsent(queue, ONE);
- if(oldVal == null)
+ bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+ bindingsForQueue.put(binding, messageFilter);
+ }
+ else
+ {
+ bindingsForQueue = Collections.singletonMap(binding, messageFilter);
+ if(!_unfilteredQueues.contains(queue))
{
- break;
+ _filteredQueues.add(queue);
}
}
- newVal = oldVal + 1;
+
+ filteredBindings.put(binding.getQueue(), bindingsForQueue);
+
+ _filteredBindings.set(filteredBindings);
+
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Cannoy bind queue " + queue + " to exchange this " + this + " beacuse selector cannot be parsed.", e);
+ return;
}
}
-
if (_logger.isDebugEnabled())
{
_logger.debug("Binding queue " + queue
- + " with routing key " + new AMQShortString(binding.getBindingKey()) + " to exchange " + this);
+ + " with routing key " + binding.getBindingKey() + " to exchange " + this);
}
}
- protected void onUnbind(final Binding binding)
+ protected synchronized void onUnbind(final Binding binding)
{
AMQQueue queue = binding.getQueue();
- Integer oldValue = _queues.get(queue);
-
- boolean done = false;
-
- while(!(done || oldValue == null))
+ if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
{
- while(!(done || oldValue == null) && oldValue.intValue() == 1)
+ Integer oldValue = _queues.remove(queue);
+ if(ONE.equals(oldValue))
{
- if(!_queues.remove(queue, oldValue))
+ // should start checking filters for this queue
+ if(_filteredBindings.get().containsKey(queue))
{
- oldValue = _queues.get(queue);
- }
- else
- {
- done = true;
+ _filteredQueues.add(queue);
}
+ _unfilteredQueues.remove(queue);
}
- while(!(done || oldValue == null) && oldValue.intValue() != 1)
+ else
{
- Integer newValue = oldValue - 1;
- if(!_queues.replace(queue, oldValue, newValue))
- {
- oldValue = _queues.get(queue);
- }
- else
- {
- done = true;
- }
+ _queues.put(queue,oldValue-1);
}
}
+ else // we are removing a binding with filters
+ {
+ HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+
+ Map<Binding,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue());
+ if(bindingsForQueue.size()>1)
+ {
+ bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+ bindingsForQueue.remove(binding);
+ filteredBindings.put(binding.getQueue(),bindingsForQueue);
+ }
+ else
+ {
+ _filteredQueues.remove(queue);
+ }
+ _filteredBindings.set(filteredBindings);
+
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
new file mode 100644
index 0000000000..880c9a2cf6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.exchange;
+
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.Filterable;
+
+public class FilterSupport
+{
+ private static final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache =
+ Collections.synchronizedMap(new WeakHashMap<String, WeakReference<JMSSelectorFilter>>());
+
+ static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
+ {
+ final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ return getMessageFilter(selectorString);
+ }
+
+
+ static MessageFilter createJMSSelectorFilter(Map<String, Object> args) throws AMQInvalidArgumentException
+ {
+ final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ return getMessageFilter(selectorString);
+ }
+
+
+ private static MessageFilter getMessageFilter(String selectorString) throws AMQInvalidArgumentException
+ {
+ WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
+ JMSSelectorFilter selector = null;
+
+ if(selectorRef == null || (selector = selectorRef.get())==null)
+ {
+ try
+ {
+ selector = new JMSSelectorFilter(selectorString);
+ }
+ catch (ParseException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ catch (SelectorParsingException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ catch (TokenMgrError e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
+ }
+ return selector;
+ }
+
+ static boolean argumentsContainFilter(final FieldTable args)
+ {
+ return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+ }
+
+
+ static boolean argumentsContainFilter(final Map<String, Object> args)
+ {
+ return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+ }
+
+
+ static boolean argumentsContainNoLocal(final Map<String, Object> args)
+ {
+ return args != null
+ && args.containsKey(AMQPFilterTypes.NO_LOCAL.toString())
+ && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString()));
+ }
+
+
+ static boolean argumentsContainNoLocal(final FieldTable args)
+ {
+ return args != null
+ && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
+ && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
+ }
+
+
+ static boolean argumentsContainJMSSelector(final Map<String,Object> args)
+ {
+ return args != null && (args.get(AMQPFilterTypes.JMS_SELECTOR.toString()) instanceof String)
+ && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
+ }
+
+
+ static boolean argumentsContainJMSSelector(final FieldTable args)
+ {
+ return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
+ && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
+ }
+
+
+ static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+ {
+ if(argumentsContainNoLocal(args))
+ {
+ MessageFilter filter = new NoLocalFilter(queue);
+
+ if(argumentsContainJMSSelector(args))
+ {
+ filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ }
+ return filter;
+ }
+ else
+ {
+ return createJMSSelectorFilter(args);
+ }
+ }
+
+ static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
+ {
+ if(argumentsContainNoLocal(args))
+ {
+ MessageFilter filter = new NoLocalFilter(queue);
+
+ if(argumentsContainJMSSelector(args))
+ {
+ filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ }
+ return filter;
+ }
+ else
+ {
+ return createJMSSelectorFilter(args);
+ }
+ }
+
+ static final class NoLocalFilter implements MessageFilter
+ {
+ private final AMQQueue _queue;
+
+ public NoLocalFilter(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ InboundMessage inbound = (InboundMessage) message;
+ final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
+ return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
+
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ NoLocalFilter that = (NoLocalFilter) o;
+
+ return _queue == null ? that._queue == null : _queue.equals(that._queue);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _queue != null ? _queue.hashCode() : 0;
+ }
+ }
+
+ static final class CompoundFilter implements MessageFilter
+ {
+ private MessageFilter _noLocalFilter;
+ private MessageFilter _jmsSelectorFilter;
+
+ public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
+ {
+ _noLocalFilter = filter;
+ _jmsSelectorFilter = jmsSelectorFilter;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ CompoundFilter that = (CompoundFilter) o;
+
+ if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+ {
+ return false;
+ }
+ if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
+ result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
+ return result;
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index b6f5f973f4..eb4a84a5b9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -22,15 +22,19 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.AMQMessageHeader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* Defines binding and matching based on a set of headers.
@@ -44,13 +48,14 @@ class HeadersBinding
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
+ private MessageFilter _filter;
/**
* Creates a header binding for a set of mappings. Those mappings whose value is
* null or the empty string are assumed only to be required headers, with
* no constraint on the value. Those with a non-null value are assumed to
* define a required match of value.
- *
+ *
* @param binding the binding to create a header binding using
*/
public HeadersBinding(Binding binding)
@@ -66,9 +71,30 @@ class HeadersBinding
_mappings = null;
}
}
-
+
private void initMappings()
{
+ if(FilterSupport.argumentsContainFilter(_mappings))
+ {
+ try
+ {
+ _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue());
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName()
+ +"' to exchange '"+_binding.getExchange().getName()
+ +"' with arguments: " + _binding.getArguments());
+ _filter = new MessageFilter()
+ {
+ @Override
+ public boolean matches(Filterable message)
+ {
+ return false;
+ }
+ };
+ }
+ }
for(Map.Entry<String, Object> entry : _mappings.entrySet())
{
String propertyName = entry.getKey();
@@ -87,7 +113,7 @@ class HeadersBinding
}
}
}
-
+
public Binding getBinding()
{
return _binding;
@@ -111,6 +137,11 @@ class HeadersBinding
}
}
+ public boolean matches(InboundMessage message)
+ {
+ return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message));
+ }
+
private boolean and(AMQMessageHeader headers)
{
if(headers.containsHeaders(required))
@@ -215,7 +246,7 @@ class HeadersBinding
{
return key.startsWith("X-") || key.startsWith("x-");
}
-
+
@Override
public boolean equals(final Object o)
{
@@ -250,4 +281,4 @@ class HeadersBinding
return true;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 746c8ac6bc..9fb745d553 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -69,14 +69,14 @@ public class HeadersExchange extends AbstractExchange
{
private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
-
+
private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
-
+
private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
new CopyOnWriteArrayList<HeadersBinding>();
-
+
public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType();
public HeadersExchange()
@@ -87,112 +87,31 @@ public class HeadersExchange extends AbstractExchange
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
- AMQMessageHeader header = payload.getMessageHeader();
if (_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header);
+ _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader());
}
-
+
LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
-
+
for (HeadersBinding hb : _bindingHeaderMatchers)
{
- if (hb.matches(header))
+ if (hb.matches(payload))
{
Binding b = hb.getBinding();
-
+
b.incrementMatches();
-
+
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " +
- header + " to " + b.getQueue().getNameShortString());
+ payload.getMessageHeader() + " to " + b.getQueue().getNameShortString());
}
queues.add(b.getQueue());
}
}
-
- return new ArrayList<BaseQueue>(queues);
- }
-
-
- public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
- {
- CopyOnWriteArraySet<Binding> bindings;
- if(bindingKey == null)
- {
- bindings = new CopyOnWriteArraySet<Binding>(getBindings());
- }
- else
- {
- bindings = _bindingsByKey.get(bindingKey);
- }
-
- if(bindings != null)
- {
- for(Binding binding : bindings)
- {
- if(queue == null || binding.getQueue().equals(queue))
- {
- return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments);
- }
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- //fixme isBound here should take the arguements in to consideration.
- return isBound(routingKey, queue);
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
-
- if(bindings != null)
- {
- for(Binding binding : bindings)
- {
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
- return bindings != null && !bindings.isEmpty();
- }
-
- public boolean isBound(AMQQueue queue)
- {
- for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
- {
- for(Binding binding : bindings)
- {
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
- }
- }
-
- return false;
- }
- public boolean hasBindings()
- {
- return !getBindings().isEmpty();
+ return new ArrayList<BaseQueue>(queues);
}
protected void onBind(final Binding binding)
@@ -216,7 +135,7 @@ public class HeadersExchange extends AbstractExchange
bindings = newBindings;
}
}
-
+
if(_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 6d548be508..9d41856dc0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -20,21 +20,15 @@
*/
package org.apache.qpid.server.exchange;
-import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.filter.SelectorParsingException;
-import org.apache.qpid.filter.selector.ParseException;
-import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
@@ -42,14 +36,10 @@ import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.Filterable;
public class TopicExchange extends AbstractExchange
{
@@ -65,8 +55,6 @@ public class TopicExchange extends AbstractExchange
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
- private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
-
public TopicExchange()
{
super(TYPE);
@@ -77,7 +65,7 @@ public class TopicExchange extends AbstractExchange
AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
AMQQueue queue = binding.getQueue();
FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
-
+
assert queue != null;
assert rKey != null;
@@ -91,26 +79,26 @@ public class TopicExchange extends AbstractExchange
FieldTable oldArgs = _bindings.get(binding);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- if(argumentsContainFilter(oldArgs))
+ if(FilterSupport.argumentsContainFilter(oldArgs))
{
result.replaceQueueFilter(queue,
- createMessageFilter(oldArgs, queue),
- createMessageFilter(args, queue));
+ FilterSupport.createMessageFilter(oldArgs, queue),
+ FilterSupport.createMessageFilter(args, queue));
}
else
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
result.removeUnfilteredQueue(queue);
}
}
else
{
- if(argumentsContainFilter(oldArgs))
+ if(FilterSupport.argumentsContainFilter(oldArgs))
{
result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
+ result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue));
}
else
{
@@ -118,7 +106,7 @@ public class TopicExchange extends AbstractExchange
return;
}
}
-
+
result.addBinding(binding);
}
@@ -129,9 +117,9 @@ public class TopicExchange extends AbstractExchange
if(result == null)
{
result = new TopicExchangeResult();
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
}
else
{
@@ -142,89 +130,22 @@ public class TopicExchange extends AbstractExchange
}
else
{
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
}
else
{
result.addUnfilteredQueue(queue);
}
}
-
+
result.addBinding(binding);
_bindings.put(binding, args);
}
}
- private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
- {
- if(argumentsContainNoLocal(args))
- {
- MessageFilter filter = new NoLocalFilter(queue);
-
- if(argumentsContainJMSSelector(args))
- {
- filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
- }
- return filter;
- }
- else
- {
- return createJMSSelectorFilter(args);
- }
-
- }
-
-
- private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
- {
- final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
- WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
- JMSSelectorFilter selector = null;
-
- if(selectorRef == null || (selector = selectorRef.get())==null)
- {
- try
- {
- selector = new JMSSelectorFilter(selectorString);
- }
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (TokenMgrError e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
- }
- return selector;
- }
-
- private static boolean argumentsContainFilter(final FieldTable args)
- {
- return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
- }
-
- private static boolean argumentsContainNoLocal(final FieldTable args)
- {
- return args != null
- && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
- && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
- }
-
- private static boolean argumentsContainJMSSelector(final FieldTable args)
- {
- return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
- && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
- }
-
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
@@ -256,87 +177,6 @@ public class TopicExchange extends AbstractExchange
}
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
-
- if (arguments == null)
- {
- return _bindings.containsKey(binding);
- }
- else
- {
- FieldTable o = _bindings.get(binding);
- if (o != null)
- {
- return o.equals(arguments);
- }
- else
- {
- return false;
- }
-
- }
- }
-
- public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
- {
- Binding binding = new Binding(null, bindingKey, queue, this, arguments);
- if (arguments == null)
- {
- return _bindings.containsKey(binding);
- }
- else
- {
- FieldTable o = _bindings.get(binding);
- if (o != null)
- {
- return arguments.equals(FieldTable.convertToMap(o));
- }
- else
- {
- return false;
- }
- }
-
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return isBound(routingKey, null, queue);
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- for(Binding b : _bindings.keySet())
- {
- if(b.getBindingKey().equals(routingKey.toString()))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQQueue queue)
- {
- for(Binding b : _bindings.keySet())
- {
- if(b.getQueue().equals(queue))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean hasBindings()
- {
- return !_bindings.isEmpty();
- }
-
private boolean deregisterQueue(final Binding binding)
{
if(_bindings.containsKey(binding))
@@ -344,14 +184,15 @@ public class TopicExchange extends AbstractExchange
FieldTable bindingArgs = _bindings.remove(binding);
AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
-
+
result.removeBinding(binding);
-
- if(argumentsContainFilter(bindingArgs))
+
+ if(FilterSupport.argumentsContainFilter(bindingArgs))
{
try
{
- result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
+ result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs,
+ binding.getQueue()));
}
catch (AMQInvalidArgumentException e)
{
@@ -418,96 +259,4 @@ public class TopicExchange extends AbstractExchange
deregisterQueue(binding);
}
- private static final class NoLocalFilter implements MessageFilter
- {
- private final AMQQueue _queue;
-
- public NoLocalFilter(AMQQueue queue)
- {
- _queue = queue;
- }
-
- public boolean matches(Filterable message)
- {
- InboundMessage inbound = (InboundMessage) message;
- final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
- return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
-
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
-
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- NoLocalFilter that = (NoLocalFilter) o;
-
- return _queue == null ? that._queue == null : _queue.equals(that._queue);
- }
-
- @Override
- public int hashCode()
- {
- return _queue != null ? _queue.hashCode() : 0;
- }
- }
-
- private static final class CompoundFilter implements MessageFilter
- {
- private MessageFilter _noLocalFilter;
- private MessageFilter _jmsSelectorFilter;
-
- public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
- {
- _noLocalFilter = filter;
- _jmsSelectorFilter = jmsSelectorFilter;
- }
-
- public boolean matches(Filterable message)
- {
- return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- CompoundFilter that = (CompoundFilter) o;
-
- if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
- {
- return false;
- }
- if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
- result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
- return result;
- }
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index b4eb41684d..2e6a98d81b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -159,9 +159,15 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
else
{
+ String message = "Queue " + queueName + " not bound with routing key " +
+ body.getRoutingKey() + " to exchange " + exchangeName;
+
+ if(message.length()>255)
+ {
+ message = message.substring(0,254);
+ }
response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
- new AMQShortString("Queue " + queueName + " not bound with routing key " +
- body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
+ new AMQShortString(message)); // replyText
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index d8d245e255..110c7be50a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -1130,22 +1130,22 @@ public class ServerSessionDelegate extends SessionDelegate
if(queueMatched)
{
- result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+ final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue);
+ result.setKeyNotMatched(!keyMatched);
+ if(method.hasArguments() && keyMatched)
+ {
+ result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue));
+ }
}
else
{
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
}
- if(method.hasArguments())
- {
- result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null));
- }
-
}
else if (method.hasArguments())
{
- result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null));
+ result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
}
}
@@ -1166,7 +1166,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
if(method.hasArguments())
{
- result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null));
+ result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments()));
}
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));