summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-15 15:25:15 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-15 15:25:15 +0000
commitfe6bf2f7fb72255666af4dd4f3ea4a29b9bfb7f8 (patch)
treec1a99cd929cfa423dc789d9d4b854e47e06b7dd7 /qpid/java
parent970be2db6117c8727e7757e062c9694dda1e4542 (diff)
downloadqpid-python-fe6bf2f7fb72255666af4dd4f3ea4a29b9bfb7f8.tar.gz
QPID-6452 : Defer creating filters in default filter map until a consumer is added. Optimize filters which start at the tail of the queue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1666810 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java20
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java18
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java35
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java43
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java85
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java52
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java58
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java13
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java8
19 files changed, 283 insertions, 186 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index fa8b52db55..727678ccc4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
@@ -58,7 +59,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
{
private CopyOnWriteArraySet<BindingImpl> _bindings = new CopyOnWriteArraySet<BindingImpl>();
private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>();
- private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>();
+ private Map<BaseQueue, FilterManager> _filteredQueues = new HashMap<>();
public synchronized void addBinding(BindingImpl binding)
{
@@ -80,7 +81,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
private void recalculateQueues()
{
List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
- Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>();
+ Map<BaseQueue, FilterManager> filteredQueues = new HashMap<>();
for(BindingImpl b : _bindings)
{
@@ -89,7 +90,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
{
try
{
- MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue());
+ FilterManager filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue());
filteredQueues.put(b.getAMQQueue(),filter);
}
catch (AMQInvalidArgumentException e)
@@ -129,7 +130,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
return !_filteredQueues.isEmpty();
}
- public Map<BaseQueue,MessageFilter> getFilteredQueues()
+ public Map<BaseQueue,FilterManager> getFilteredQueues()
{
return _filteredQueues;
}
@@ -159,14 +160,15 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
if(bindings.hasFilteredQueues())
{
Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
+ Filterable filterable = Filterable.Factory.newInstance(payload, instanceProperties);
- Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues();
- for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet())
+ Map<BaseQueue, FilterManager> filteredQueues = bindings.getFilteredQueues();
+ for(Map.Entry<BaseQueue, FilterManager> entry : filteredQueues.entrySet())
{
if(!queuesSet.contains(entry.getKey()))
{
- MessageFilter filter = entry.getValue();
- if(filter.matches(Filterable.Factory.newInstance(payload, instanceProperties)))
+ FilterManager filter = entry.getValue();
+ if(filter.allAllow(filterable))
{
queuesSet.add(entry.getKey());
}
@@ -174,7 +176,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
}
if(queues.size() != queuesSet.size())
{
- queues = new ArrayList<BaseQueue>(queuesSet);
+ queues = new ArrayList<>(queuesSet);
}
}
return queues;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index a7265526fe..8495d648f4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObject;
@@ -58,10 +58,10 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>();
private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>();
- private final AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>> _filteredBindings =
- new AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>>();
+ private final AtomicReference<Map<AMQQueue,Map<BindingImpl, FilterManager>>> _filteredBindings =
+ new AtomicReference<>();
{
- Map<AMQQueue,Map<BindingImpl, MessageFilter>> emptyMap = Collections.emptyMap();
+ Map<AMQQueue,Map<BindingImpl, FilterManager>> emptyMap = Collections.emptyMap();
_filteredBindings.set(emptyMap);
}
@@ -85,17 +85,17 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
- final Map<AMQQueue, Map<BindingImpl, MessageFilter>> filteredBindings = _filteredBindings.get();
+ final Map<AMQQueue, Map<BindingImpl, FilterManager>> filteredBindings = _filteredBindings.get();
if(!_filteredQueues.isEmpty())
{
for(AMQQueue q : _filteredQueues)
{
- final Map<BindingImpl, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
+ final Map<BindingImpl, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
if(!(bindingMessageFilterMap == null || result.contains(q)))
{
- for(MessageFilter filter : bindingMessageFilterMap.values())
+ for(FilterManager filter : bindingMessageFilterMap.values())
{
- if(filter.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
+ if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
{
result.add(q);
break;
@@ -143,12 +143,12 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
}
else
{
- HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
- new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+ HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+ new HashMap<>(_filteredBindings.get());
- Map<BindingImpl,MessageFilter> bindingsForQueue;
+ Map<BindingImpl,FilterManager> bindingsForQueue;
- final MessageFilter messageFilter;
+ final FilterManager messageFilter;
try
{
@@ -163,11 +163,11 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
{
- bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(filteredBindings.remove(binding.getAMQQueue()));
+ bindingsForQueue = new HashMap<>(filteredBindings.remove(binding.getAMQQueue()));
}
else // previously unfiltered
{
- bindingsForQueue = new HashMap<BindingImpl,MessageFilter>();
+ bindingsForQueue = new HashMap<BindingImpl,FilterManager>();
Integer oldValue = _queues.remove(queue);
if (ONE.equals(oldValue))
@@ -217,16 +217,16 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
try
{
- HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
- new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+ HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+ new HashMap<>(_filteredBindings.get());
- Map<BindingImpl, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
- final MessageFilter messageFilter =
+ Map<BindingImpl, FilterManager> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
+ final FilterManager messageFilter =
FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue());
if(bindingsForQueue != null)
{
- bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue);
+ bindingsForQueue = new HashMap<>(bindingsForQueue);
bindingsForQueue.put(binding, messageFilter);
}
else
@@ -278,13 +278,13 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
}
else // we are removing a binding with filters
{
- HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
- new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+ HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+ new HashMap<>(_filteredBindings.get());
- Map<BindingImpl,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
+ Map<BindingImpl,FilterManager> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
if(bindingsForQueue.size()>1)
{
- bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue);
+ bindingsForQueue = new HashMap<>(bindingsForQueue);
bindingsForQueue.remove(binding);
filteredBindings.put(binding.getAMQQueue(),bindingsForQueue);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 4329e2e003..ab2a5195cf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
@@ -48,7 +49,7 @@ class HeadersBinding
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
- private MessageFilter _filter;
+ private FilterManager _filter;
/**
* Creates a header binding for a set of mappings. Those mappings whose value is
@@ -86,7 +87,8 @@ class HeadersBinding
_logger.warn("Invalid filter in binding queue '"+_binding.getAMQQueue().getName()
+"' to exchange '"+_binding.getExchange().getName()
+"' with arguments: " + _binding.getArguments());
- _filter = new MessageFilter()
+ _filter = new FilterManager();
+ _filter.add("x-exclude-all",new MessageFilter()
{
@Override
public String getName()
@@ -94,12 +96,18 @@ class HeadersBinding
return "";
}
- @Override
+ @Override
+ public boolean startAtTail()
+ {
+ return false;
+ }
+
+ @Override
public boolean matches(Filterable message)
{
return false;
}
- };
+ });
}
}
for(Map.Entry<String, Object> entry : _mappings.entrySet())
@@ -146,7 +154,7 @@ class HeadersBinding
public boolean matches(Filterable message)
{
- return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message));
+ return matches(message.getMessageHeader()) && (_filter == null || _filter.allAllow(message));
}
private boolean and(AMQMessageHeader headers)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
index 0db3e9b378..5a0d4117a1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
@@ -31,15 +31,15 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.queue.AMQQueue;
public final class TopicExchangeResult implements TopicMatcherResult
{
private final List<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>();
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
- private final ConcurrentMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
+ private final ConcurrentMap<AMQQueue, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0);
public void addUnfilteredQueue(AMQQueue queue)
@@ -93,15 +93,15 @@ public final class TopicExchangeResult implements TopicMatcherResult
public List<BindingImpl> getBindings()
{
- return new ArrayList<BindingImpl>(_bindings);
+ return new ArrayList<>(_bindings);
}
- public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
+ public void addFilteredQueue(AMQQueue queue, FilterManager filter)
{
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+ Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
if(filters == null)
{
- filters = new ConcurrentHashMap<MessageFilter,Integer>();
+ filters = new ConcurrentHashMap<>();
_filteredQueues.put(queue, filters);
}
Integer instances = filters.get(filter);
@@ -116,9 +116,9 @@ public final class TopicExchangeResult implements TopicMatcherResult
}
- public void removeFilteredQueue(AMQQueue queue, MessageFilter filter)
+ public void removeFilteredQueue(AMQQueue queue, FilterManager filter)
{
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+ Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
if(filters != null)
{
Integer instances = filters.get(filter);
@@ -143,11 +143,11 @@ public final class TopicExchangeResult implements TopicMatcherResult
}
public void replaceQueueFilter(AMQQueue queue,
- MessageFilter oldFilter,
- MessageFilter newFilter)
+ FilterManager oldFilter,
+ FilterManager newFilter)
{
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
- Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters);
+ Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
+ Map<FilterManager,Integer> newFilters = new ConcurrentHashMap<>(filters);
Integer oldFilterInstances = filters.get(oldFilter);
if(oldFilterInstances == 1)
{
@@ -190,13 +190,13 @@ public final class TopicExchangeResult implements TopicMatcherResult
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
- for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet())
+ for(Map.Entry<AMQQueue, Map<FilterManager, Integer>> entry : _filteredQueues.entrySet())
{
if(!queues.contains(entry.getKey()))
{
- for(MessageFilter filter : entry.getValue().keySet())
+ for(FilterManager filter : entry.getValue().keySet())
{
- if(filter.matches(msg))
+ if(filter.allAllow(msg))
{
queues.add(entry.getKey());
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
index dbd6a5f6f6..0cb40abc07 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
@@ -25,10 +25,12 @@ import org.apache.qpid.common.AMQPFilterTypes;
public final class ArrivalTimeFilter implements MessageFilter
{
private final long _startingFrom;
+ private final boolean _startAtTail;
- public ArrivalTimeFilter(final long startingFrom)
+ public ArrivalTimeFilter(final long startingFrom, final boolean startAtTail)
{
_startingFrom = startingFrom;
+ _startAtTail = startAtTail;
}
@Override
@@ -38,9 +40,38 @@ public final class ArrivalTimeFilter implements MessageFilter
}
@Override
+ public boolean startAtTail()
+ {
+ return _startAtTail;
+ }
+
+ @Override
public boolean matches(final Filterable message)
{
- return message.getArrivalTime() >= _startingFrom;
+ return message.getArrivalTime() >= _startingFrom;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final ArrivalTimeFilter that = (ArrivalTimeFilter) o;
+
+ return _startingFrom == that._startingFrom;
+
}
+ @Override
+ public int hashCode()
+ {
+ return (int) (_startingFrom ^ (_startingFrom >>> 32));
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
index 8c55c8ac76..7ed4e273a5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
@@ -40,7 +40,7 @@ public final class ArrivalTimeFilterFactory implements MessageFilterFactory
String arg = arguments.get(0);
long startingFrom= Long.parseLong(arg);
- return new ArrivalTimeFilter(startingFrom);
+ return new ArrivalTimeFilter(System.currentTimeMillis() + startingFrom, startingFrom==0l);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
index ad14fa423a..764fb60ed0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -50,6 +50,18 @@ public class FilterManager
return true;
}
+ public boolean startAtTail()
+ {
+ for(MessageFilter filter : _filters.values())
+ {
+ if(filter.startAtTail())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
public Iterator<MessageFilter> filters()
{
return _filters.values().iterator();
@@ -65,11 +77,42 @@ public class FilterManager
return _filters.containsKey(name);
}
+ public boolean hasFilter(final MessageFilter filter)
+ {
+ return _filters.containsValue(filter);
+ }
+
@Override
public String toString()
{
return _filters.toString();
}
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final FilterManager that = (FilterManager) o;
+
+ if (_filters != null ? !_filters.equals(that._filters) : that._filters != null)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _filters != null ? _filters.hashCode() : 0;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index 6b8ae2f552..a670c96ac1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -95,22 +95,25 @@ public class FilterSupport
&& ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
}
- public static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+ public static FilterManager createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
{
+ FilterManager filterManager = null;
if(argumentsContainNoLocal(args))
{
- MessageFilter filter = new NoLocalFilter(queue);
+ filterManager = new FilterManager();
+ filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new NoLocalFilter(queue));
+ }
- if(argumentsContainJMSSelector(args))
+ if(argumentsContainJMSSelector(args))
+ {
+ if(filterManager == null)
{
- filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ filterManager = new FilterManager();
}
- return filter;
- }
- else
- {
- return createJMSSelectorFilter(args);
+ filterManager.add(AMQPFilterTypes.JMS_SELECTOR.toString(),createJMSSelectorFilter(args));
}
+ return filterManager;
+
}
@PluggableService
@@ -144,6 +147,12 @@ public class FilterSupport
}
@Override
+ public boolean startAtTail()
+ {
+ return false;
+ }
+
+ @Override
public boolean equals(Object o)
{
if (this == o)
@@ -170,62 +179,4 @@ public class FilterSupport
}
- static final class CompoundFilter implements MessageFilter
- {
- private MessageFilter _noLocalFilter;
- private MessageFilter _jmsSelectorFilter;
-
- public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
- {
- _noLocalFilter = filter;
- _jmsSelectorFilter = jmsSelectorFilter;
- }
-
- @Override
- public String getName()
- {
- return "";
- }
-
- public boolean matches(Filterable message)
- {
- return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- CompoundFilter that = (CompoundFilter) o;
-
- if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
- {
- return false;
- }
- if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
- result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
- return result;
- }
-
-
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
index 295f9ae074..0b430ca471 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.filter;
+import org.apache.qpid.filter.FilterableMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-public interface Filterable
+public interface Filterable extends FilterableMessage
{
AMQMessageHeader getMessageHeader();
@@ -81,6 +82,55 @@ public interface Filterable
{
return message.getArrivalTime();
}
+
+ @Override
+ public String getReplyTo()
+ {
+ return message.getMessageHeader().getReplyTo();
+ }
+
+ @Override
+ public String getType()
+ {
+ return message.getMessageHeader().getType();
+ }
+
+ @Override
+ public byte getPriority()
+ {
+ return message.getMessageHeader().getPriority();
+ }
+
+ @Override
+ public String getMessageId()
+ {
+ return message.getMessageHeader().getMessageId();
+ }
+
+ @Override
+ public long getTimestamp()
+ {
+ return message.getMessageHeader().getTimestamp();
+ }
+
+ @Override
+ public String getCorrelationId()
+ {
+ return message.getMessageHeader().getCorrelationId();
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return message.getMessageHeader().getExpiration();
+ }
+
+ @Override
+ public Object getHeader(String name)
+ {
+ return message.getMessageHeader().getHeader(name);
+ }
+
};
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 5a08eb3a0f..1a50fd3cf4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -60,7 +60,7 @@ public class JMSSelectorFilter implements MessageFilter
public boolean matches(Filterable message)
{
- boolean match = _matcher.matches(wrap(message));
+ boolean match = _matcher.matches(message);
if(_logger.isDebugEnabled())
{
_logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
@@ -68,60 +68,10 @@ public class JMSSelectorFilter implements MessageFilter
return match;
}
- private FilterableMessage wrap(final Filterable message)
+ @Override
+ public boolean startAtTail()
{
- return new FilterableMessage()
- {
- public boolean isPersistent()
- {
- return message.isPersistent();
- }
-
- public boolean isRedelivered()
- {
- return message.isRedelivered();
- }
-
- public Object getHeader(String name)
- {
- return message.getMessageHeader().getHeader(name);
- }
-
- public String getReplyTo()
- {
- return message.getMessageHeader().getReplyTo();
- }
-
- public String getType()
- {
- return message.getMessageHeader().getType();
- }
-
- public byte getPriority()
- {
- return message.getMessageHeader().getPriority();
- }
-
- public String getMessageId()
- {
- return message.getMessageHeader().getMessageId();
- }
-
- public long getTimestamp()
- {
- return message.getMessageHeader().getTimestamp();
- }
-
- public String getCorrelationId()
- {
- return message.getMessageHeader().getCorrelationId();
- }
-
- public long getExpiration()
- {
- return message.getMessageHeader().getExpiration();
- }
- };
+ return false;
}
public String getSelector()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
index 226d646efd..253640b931 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -24,4 +24,6 @@ public interface MessageFilter
{
String getName();
boolean matches(Filterable message);
+ boolean startAtTail();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 486c501772..3b42014136 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -57,6 +57,7 @@ import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
@@ -262,7 +263,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final QueueRunner _queueRunner = new QueueRunner(this);
private boolean _closing;
- private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String,Task<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -462,11 +463,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if(filterValue.size() == 1)
{
String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
- MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
+ final MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
if(filterFactory != null)
{
- List<String> filterArguments = filterValue.values().iterator().next();
- _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
+ final List<String> filterArguments = filterValue.values().iterator().next();
+ // check the arguments are valid
+ filterFactory.newInstance(filterArguments);
+ _defaultFiltersMap.put(name, new Task<MessageFilter>()
+ {
+ @Override
+ public MessageFilter execute()
+ {
+ return filterFactory.newInstance(filterArguments);
+ }
+ });
}
else
{
@@ -786,11 +796,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
filters = new FilterManager();
}
- for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet())
+ for (Map.Entry<String,Task<MessageFilter>> filter : _defaultFiltersMap.entrySet())
{
if(!filters.hasFilter(filter.getKey()))
{
- filters.add(filter.getKey(), filter.getValue());
+ filters.add(filter.getKey(), filter.getValue().execute());
}
}
}
@@ -823,7 +833,16 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
consumer.setStateListener(this);
- consumer.setQueueContext(new QueueContext(getEntries().getHead()));
+ QueueContext queueContext;
+ if(filters == null || !filters.startAtTail())
+ {
+ queueContext = new QueueContext(getEntries().getHead());
+ }
+ else
+ {
+ queueContext = new QueueContext(getEntries().getTail());
+ }
+ consumer.setQueueContext(queueContext);
if (!isDeleted())
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
index 634aa22928..f3822bc9ca 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
@@ -160,6 +160,12 @@ public abstract class OrderedQueueEntryList implements QueueEntryList
return _head;
}
+ @Override
+ public QueueEntry getTail()
+ {
+ return _tail;
+ }
+
public void entryDeleted(QueueEntry queueEntry)
{
QueueEntry next = _head.getNextNode();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index d8d6a2ff0b..d0b5578055 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -185,6 +185,14 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
}
@Override
+ public PriorityQueueEntry getTail()
+ {
+ return (PriorityQueueEntry) _priorityLists[0].getTail();
+ }
+
+
+
+ @Override
public void entryDeleted(final QueueEntry queueEntry)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index d4a91f2c0b..938f170bee 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -34,6 +34,8 @@ public interface QueueEntryList
QueueEntry getHead();
+ QueueEntry getTail();
+
QueueEntry getOldestEntry();
void entryDeleted(QueueEntry queueEntry);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index a37713c8b8..ee850455b0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -319,6 +319,19 @@ public class SortedQueueEntryList implements QueueEntryList
return _head;
}
+
+ public SortedQueueEntry getTail()
+ {
+ SortedQueueEntry current = _head;
+ SortedQueueEntry next;
+ while((next = next(current))!=null)
+ {
+ current = next;
+ }
+ return current;
+ }
+
+
@Override
public QueueEntry getOldestEntry()
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 3723d19bfe..34afcdef66 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -113,6 +113,12 @@ public class MockConsumer implements ConsumerTarget
}
@Override
+ public boolean startAtTail()
+ {
+ return false;
+ }
+
+ @Override
public boolean matches(final Filterable message)
{
final String messageId = message.getMessageHeader().getMessageId();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 25ea2430ce..75a67c6c2a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -294,7 +294,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
filterManager = new FilterManager();
}
- MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+ MessageFilter filter = new ArrivalTimeFilter(startingFrom, period == 0);
filterManager.add(filter.getName(), filter);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 99c9f11b33..87becd955d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -755,6 +755,12 @@ public class AMQChannel
{
return message.getConnectionReference() != connectionReference;
}
+
+ @Override
+ public boolean startAtTail()
+ {
+ return false;
+ }
};
filterManager.add(filter.getName(), filter);
}
@@ -788,7 +794,7 @@ public class AMQChannel
{
filterManager = new FilterManager();
}
- MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+ MessageFilter filter = new ArrivalTimeFilter(startingFrom, period==0);
filterManager.add(filter.getName(), filter);
}