diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-15 15:25:15 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-15 15:25:15 +0000 |
| commit | fe6bf2f7fb72255666af4dd4f3ea4a29b9bfb7f8 (patch) | |
| tree | c1a99cd929cfa423dc789d9d4b854e47e06b7dd7 /qpid/java | |
| parent | 970be2db6117c8727e7757e062c9694dda1e4542 (diff) | |
| download | qpid-python-fe6bf2f7fb72255666af4dd4f3ea4a29b9bfb7f8.tar.gz | |
QPID-6452 : Defer creating filters in default filter map until a consumer is added. Optimize filters which start at the tail of the queue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1666810 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
19 files changed, 283 insertions, 186 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index fa8b52db55..727678ccc4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; @@ -58,7 +59,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange> { private CopyOnWriteArraySet<BindingImpl> _bindings = new CopyOnWriteArraySet<BindingImpl>(); private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>(); - private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>(); + private Map<BaseQueue, FilterManager> _filteredQueues = new HashMap<>(); public synchronized void addBinding(BindingImpl binding) { @@ -80,7 +81,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange> private void recalculateQueues() { List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size()); - Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>(); + Map<BaseQueue, FilterManager> filteredQueues = new HashMap<>(); for(BindingImpl b : _bindings) { @@ -89,7 +90,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange> { try { - MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue()); + FilterManager filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue()); filteredQueues.put(b.getAMQQueue(),filter); } catch (AMQInvalidArgumentException e) @@ -129,7 +130,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange> return !_filteredQueues.isEmpty(); } - public Map<BaseQueue,MessageFilter> getFilteredQueues() + public Map<BaseQueue,FilterManager> getFilteredQueues() { return _filteredQueues; } @@ -159,14 +160,15 @@ public class DirectExchange extends AbstractExchange<DirectExchange> if(bindings.hasFilteredQueues()) { Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues); + Filterable filterable = Filterable.Factory.newInstance(payload, instanceProperties); - Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues(); - for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet()) + Map<BaseQueue, FilterManager> filteredQueues = bindings.getFilteredQueues(); + for(Map.Entry<BaseQueue, FilterManager> entry : filteredQueues.entrySet()) { if(!queuesSet.contains(entry.getKey())) { - MessageFilter filter = entry.getValue(); - if(filter.matches(Filterable.Factory.newInstance(payload, instanceProperties))) + FilterManager filter = entry.getValue(); + if(filter.allAllow(filterable)) { queuesSet.add(entry.getKey()); } @@ -174,7 +176,7 @@ public class DirectExchange extends AbstractExchange<DirectExchange> } if(queues.size() != queuesSet.size()) { - queues = new ArrayList<BaseQueue>(queuesSet); + queues = new ArrayList<>(queuesSet); } } return queues; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index a7265526fe..8495d648f4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ManagedObject; @@ -58,10 +58,10 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>(); private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>(); - private final AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>> _filteredBindings = - new AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>>(); + private final AtomicReference<Map<AMQQueue,Map<BindingImpl, FilterManager>>> _filteredBindings = + new AtomicReference<>(); { - Map<AMQQueue,Map<BindingImpl, MessageFilter>> emptyMap = Collections.emptyMap(); + Map<AMQQueue,Map<BindingImpl, FilterManager>> emptyMap = Collections.emptyMap(); _filteredBindings.set(emptyMap); } @@ -85,17 +85,17 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues); - final Map<AMQQueue, Map<BindingImpl, MessageFilter>> filteredBindings = _filteredBindings.get(); + final Map<AMQQueue, Map<BindingImpl, FilterManager>> filteredBindings = _filteredBindings.get(); if(!_filteredQueues.isEmpty()) { for(AMQQueue q : _filteredQueues) { - final Map<BindingImpl, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q); + final Map<BindingImpl, FilterManager> bindingMessageFilterMap = filteredBindings.get(q); if(!(bindingMessageFilterMap == null || result.contains(q))) { - for(MessageFilter filter : bindingMessageFilterMap.values()) + for(FilterManager filter : bindingMessageFilterMap.values()) { - if(filter.matches(Filterable.Factory.newInstance(payload,instanceProperties))) + if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties))) { result.add(q); break; @@ -143,12 +143,12 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> } else { - HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings = - new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get()); + HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings = + new HashMap<>(_filteredBindings.get()); - Map<BindingImpl,MessageFilter> bindingsForQueue; + Map<BindingImpl,FilterManager> bindingsForQueue; - final MessageFilter messageFilter; + final FilterManager messageFilter; try { @@ -163,11 +163,11 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments)) { - bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(filteredBindings.remove(binding.getAMQQueue())); + bindingsForQueue = new HashMap<>(filteredBindings.remove(binding.getAMQQueue())); } else // previously unfiltered { - bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(); + bindingsForQueue = new HashMap<BindingImpl,FilterManager>(); Integer oldValue = _queues.remove(queue); if (ONE.equals(oldValue)) @@ -217,16 +217,16 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> try { - HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings = - new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get()); + HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings = + new HashMap<>(_filteredBindings.get()); - Map<BindingImpl, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); - final MessageFilter messageFilter = + Map<BindingImpl, FilterManager> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); + final FilterManager messageFilter = FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue()); if(bindingsForQueue != null) { - bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue); + bindingsForQueue = new HashMap<>(bindingsForQueue); bindingsForQueue.put(binding, messageFilter); } else @@ -278,13 +278,13 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> } else // we are removing a binding with filters { - HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings = - new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get()); + HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings = + new HashMap<>(_filteredBindings.get()); - Map<BindingImpl,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); + Map<BindingImpl,FilterManager> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); if(bindingsForQueue.size()>1) { - bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue); + bindingsForQueue = new HashMap<>(bindingsForQueue); bindingsForQueue.remove(binding); filteredBindings.put(binding.getAMQQueue(),bindingsForQueue); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 4329e2e003..ab2a5195cf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; @@ -48,7 +49,7 @@ class HeadersBinding private final Set<String> required = new HashSet<String>(); private final Map<String,Object> matches = new HashMap<String,Object>(); private boolean matchAny; - private MessageFilter _filter; + private FilterManager _filter; /** * Creates a header binding for a set of mappings. Those mappings whose value is @@ -86,7 +87,8 @@ class HeadersBinding _logger.warn("Invalid filter in binding queue '"+_binding.getAMQQueue().getName() +"' to exchange '"+_binding.getExchange().getName() +"' with arguments: " + _binding.getArguments()); - _filter = new MessageFilter() + _filter = new FilterManager(); + _filter.add("x-exclude-all",new MessageFilter() { @Override public String getName() @@ -94,12 +96,18 @@ class HeadersBinding return ""; } - @Override + @Override + public boolean startAtTail() + { + return false; + } + + @Override public boolean matches(Filterable message) { return false; } - }; + }); } } for(Map.Entry<String, Object> entry : _mappings.entrySet()) @@ -146,7 +154,7 @@ class HeadersBinding public boolean matches(Filterable message) { - return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message)); + return matches(message.getMessageHeader()) && (_filter == null || _filter.allAllow(message)); } private boolean and(AMQMessageHeader headers) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java index 0db3e9b378..5a0d4117a1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java @@ -31,15 +31,15 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.queue.AMQQueue; public final class TopicExchangeResult implements TopicMatcherResult { private final List<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>(); private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); - private final ConcurrentMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>(); + private final ConcurrentMap<AMQQueue, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>(); private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0); public void addUnfilteredQueue(AMQQueue queue) @@ -93,15 +93,15 @@ public final class TopicExchangeResult implements TopicMatcherResult public List<BindingImpl> getBindings() { - return new ArrayList<BindingImpl>(_bindings); + return new ArrayList<>(_bindings); } - public void addFilteredQueue(AMQQueue queue, MessageFilter filter) + public void addFilteredQueue(AMQQueue queue, FilterManager filter) { - Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); + Map<FilterManager,Integer> filters = _filteredQueues.get(queue); if(filters == null) { - filters = new ConcurrentHashMap<MessageFilter,Integer>(); + filters = new ConcurrentHashMap<>(); _filteredQueues.put(queue, filters); } Integer instances = filters.get(filter); @@ -116,9 +116,9 @@ public final class TopicExchangeResult implements TopicMatcherResult } - public void removeFilteredQueue(AMQQueue queue, MessageFilter filter) + public void removeFilteredQueue(AMQQueue queue, FilterManager filter) { - Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); + Map<FilterManager,Integer> filters = _filteredQueues.get(queue); if(filters != null) { Integer instances = filters.get(filter); @@ -143,11 +143,11 @@ public final class TopicExchangeResult implements TopicMatcherResult } public void replaceQueueFilter(AMQQueue queue, - MessageFilter oldFilter, - MessageFilter newFilter) + FilterManager oldFilter, + FilterManager newFilter) { - Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); - Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters); + Map<FilterManager,Integer> filters = _filteredQueues.get(queue); + Map<FilterManager,Integer> newFilters = new ConcurrentHashMap<>(filters); Integer oldFilterInstances = filters.get(oldFilter); if(oldFilterInstances == 1) { @@ -190,13 +190,13 @@ public final class TopicExchangeResult implements TopicMatcherResult queues.addAll(_unfilteredQueues.keySet()); if(!_filteredQueues.isEmpty()) { - for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet()) + for(Map.Entry<AMQQueue, Map<FilterManager, Integer>> entry : _filteredQueues.entrySet()) { if(!queues.contains(entry.getKey())) { - for(MessageFilter filter : entry.getValue().keySet()) + for(FilterManager filter : entry.getValue().keySet()) { - if(filter.matches(msg)) + if(filter.allAllow(msg)) { queues.add(entry.getKey()); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java index dbd6a5f6f6..0cb40abc07 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java @@ -25,10 +25,12 @@ import org.apache.qpid.common.AMQPFilterTypes; public final class ArrivalTimeFilter implements MessageFilter { private final long _startingFrom; + private final boolean _startAtTail; - public ArrivalTimeFilter(final long startingFrom) + public ArrivalTimeFilter(final long startingFrom, final boolean startAtTail) { _startingFrom = startingFrom; + _startAtTail = startAtTail; } @Override @@ -38,9 +40,38 @@ public final class ArrivalTimeFilter implements MessageFilter } @Override + public boolean startAtTail() + { + return _startAtTail; + } + + @Override public boolean matches(final Filterable message) { - return message.getArrivalTime() >= _startingFrom; + return message.getArrivalTime() >= _startingFrom; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final ArrivalTimeFilter that = (ArrivalTimeFilter) o; + + return _startingFrom == that._startingFrom; + } + @Override + public int hashCode() + { + return (int) (_startingFrom ^ (_startingFrom >>> 32)); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java index 8c55c8ac76..7ed4e273a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java @@ -40,7 +40,7 @@ public final class ArrivalTimeFilterFactory implements MessageFilterFactory String arg = arguments.get(0); long startingFrom= Long.parseLong(arg); - return new ArrivalTimeFilter(startingFrom); + return new ArrivalTimeFilter(System.currentTimeMillis() + startingFrom, startingFrom==0l); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java index ad14fa423a..764fb60ed0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -50,6 +50,18 @@ public class FilterManager return true; } + public boolean startAtTail() + { + for(MessageFilter filter : _filters.values()) + { + if(filter.startAtTail()) + { + return true; + } + } + return false; + } + public Iterator<MessageFilter> filters() { return _filters.values().iterator(); @@ -65,11 +77,42 @@ public class FilterManager return _filters.containsKey(name); } + public boolean hasFilter(final MessageFilter filter) + { + return _filters.containsValue(filter); + } + @Override public String toString() { return _filters.toString(); } + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final FilterManager that = (FilterManager) o; + + if (_filters != null ? !_filters.equals(that._filters) : that._filters != null) + { + return false; + } + return true; + } + + @Override + public int hashCode() + { + return _filters != null ? _filters.hashCode() : 0; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 6b8ae2f552..a670c96ac1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -95,22 +95,25 @@ public class FilterSupport && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0; } - public static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException + public static FilterManager createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException { + FilterManager filterManager = null; if(argumentsContainNoLocal(args)) { - MessageFilter filter = new NoLocalFilter(queue); + filterManager = new FilterManager(); + filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new NoLocalFilter(queue)); + } - if(argumentsContainJMSSelector(args)) + if(argumentsContainJMSSelector(args)) + { + if(filterManager == null) { - filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + filterManager = new FilterManager(); } - return filter; - } - else - { - return createJMSSelectorFilter(args); + filterManager.add(AMQPFilterTypes.JMS_SELECTOR.toString(),createJMSSelectorFilter(args)); } + return filterManager; + } @PluggableService @@ -144,6 +147,12 @@ public class FilterSupport } @Override + public boolean startAtTail() + { + return false; + } + + @Override public boolean equals(Object o) { if (this == o) @@ -170,62 +179,4 @@ public class FilterSupport } - static final class CompoundFilter implements MessageFilter - { - private MessageFilter _noLocalFilter; - private MessageFilter _jmsSelectorFilter; - - public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) - { - _noLocalFilter = filter; - _jmsSelectorFilter = jmsSelectorFilter; - } - - @Override - public String getName() - { - return ""; - } - - public boolean matches(Filterable message) - { - return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - CompoundFilter that = (CompoundFilter) o; - - if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) - { - return false; - } - if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; - result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); - return result; - } - - - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java index 295f9ae074..0b430ca471 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.server.filter; +import org.apache.qpid.filter.FilterableMessage; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -public interface Filterable +public interface Filterable extends FilterableMessage { AMQMessageHeader getMessageHeader(); @@ -81,6 +82,55 @@ public interface Filterable { return message.getArrivalTime(); } + + @Override + public String getReplyTo() + { + return message.getMessageHeader().getReplyTo(); + } + + @Override + public String getType() + { + return message.getMessageHeader().getType(); + } + + @Override + public byte getPriority() + { + return message.getMessageHeader().getPriority(); + } + + @Override + public String getMessageId() + { + return message.getMessageHeader().getMessageId(); + } + + @Override + public long getTimestamp() + { + return message.getMessageHeader().getTimestamp(); + } + + @Override + public String getCorrelationId() + { + return message.getMessageHeader().getCorrelationId(); + } + + @Override + public long getExpiration() + { + return message.getMessageHeader().getExpiration(); + } + + @Override + public Object getHeader(String name) + { + return message.getMessageHeader().getHeader(name); + } + }; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 5a08eb3a0f..1a50fd3cf4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -60,7 +60,7 @@ public class JMSSelectorFilter implements MessageFilter public boolean matches(Filterable message) { - boolean match = _matcher.matches(wrap(message)); + boolean match = _matcher.matches(message); if(_logger.isDebugEnabled()) { _logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector); @@ -68,60 +68,10 @@ public class JMSSelectorFilter implements MessageFilter return match; } - private FilterableMessage wrap(final Filterable message) + @Override + public boolean startAtTail() { - return new FilterableMessage() - { - public boolean isPersistent() - { - return message.isPersistent(); - } - - public boolean isRedelivered() - { - return message.isRedelivered(); - } - - public Object getHeader(String name) - { - return message.getMessageHeader().getHeader(name); - } - - public String getReplyTo() - { - return message.getMessageHeader().getReplyTo(); - } - - public String getType() - { - return message.getMessageHeader().getType(); - } - - public byte getPriority() - { - return message.getMessageHeader().getPriority(); - } - - public String getMessageId() - { - return message.getMessageHeader().getMessageId(); - } - - public long getTimestamp() - { - return message.getMessageHeader().getTimestamp(); - } - - public String getCorrelationId() - { - return message.getMessageHeader().getCorrelationId(); - } - - public long getExpiration() - { - return message.getMessageHeader().getExpiration(); - } - }; + return false; } public String getSelector() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java index 226d646efd..253640b931 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -24,4 +24,6 @@ public interface MessageFilter { String getName(); boolean matches(Filterable message); + boolean startAtTail(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 486c501772..3b42014136 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -57,6 +57,7 @@ import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.EventLogger; @@ -262,7 +263,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final QueueRunner _queueRunner = new QueueRunner(this); private boolean _closing; - private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<String,Task<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>(); protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { @@ -462,11 +463,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if(filterValue.size() == 1) { String filterTypeName = String.valueOf(filterValue.keySet().iterator().next()); - MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); + final MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); if(filterFactory != null) { - List<String> filterArguments = filterValue.values().iterator().next(); - _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments)); + final List<String> filterArguments = filterValue.values().iterator().next(); + // check the arguments are valid + filterFactory.newInstance(filterArguments); + _defaultFiltersMap.put(name, new Task<MessageFilter>() + { + @Override + public MessageFilter execute() + { + return filterFactory.newInstance(filterArguments); + } + }); } else { @@ -786,11 +796,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { filters = new FilterManager(); } - for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet()) + for (Map.Entry<String,Task<MessageFilter>> filter : _defaultFiltersMap.entrySet()) { if(!filters.hasFilter(filter.getKey())) { - filters.add(filter.getKey(), filter.getValue()); + filters.add(filter.getKey(), filter.getValue().execute()); } } } @@ -823,7 +833,16 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } consumer.setStateListener(this); - consumer.setQueueContext(new QueueContext(getEntries().getHead())); + QueueContext queueContext; + if(filters == null || !filters.startAtTail()) + { + queueContext = new QueueContext(getEntries().getHead()); + } + else + { + queueContext = new QueueContext(getEntries().getTail()); + } + consumer.setQueueContext(queueContext); if (!isDeleted()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java index 634aa22928..f3822bc9ca 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java @@ -160,6 +160,12 @@ public abstract class OrderedQueueEntryList implements QueueEntryList return _head; } + @Override + public QueueEntry getTail() + { + return _tail; + } + public void entryDeleted(QueueEntry queueEntry) { QueueEntry next = _head.getNextNode(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index d8d6a2ff0b..d0b5578055 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -185,6 +185,14 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList } @Override + public PriorityQueueEntry getTail() + { + return (PriorityQueueEntry) _priorityLists[0].getTail(); + } + + + + @Override public void entryDeleted(final QueueEntry queueEntry) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index d4a91f2c0b..938f170bee 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -34,6 +34,8 @@ public interface QueueEntryList QueueEntry getHead(); + QueueEntry getTail(); + QueueEntry getOldestEntry(); void entryDeleted(QueueEntry queueEntry); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index a37713c8b8..ee850455b0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -319,6 +319,19 @@ public class SortedQueueEntryList implements QueueEntryList return _head; } + + public SortedQueueEntry getTail() + { + SortedQueueEntry current = _head; + SortedQueueEntry next; + while((next = next(current))!=null) + { + current = next; + } + return current; + } + + @Override public QueueEntry getOldestEntry() { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 3723d19bfe..34afcdef66 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -113,6 +113,12 @@ public class MockConsumer implements ConsumerTarget } @Override + public boolean startAtTail() + { + return false; + } + + @Override public boolean matches(final Filterable message) { final String messageId = message.getMessageHeader().getMessageId(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 25ea2430ce..75a67c6c2a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -294,7 +294,7 @@ public class ServerSessionDelegate extends SessionDelegate { filterManager = new FilterManager(); } - MessageFilter filter = new ArrivalTimeFilter(startingFrom); + MessageFilter filter = new ArrivalTimeFilter(startingFrom, period == 0); filterManager.add(filter.getName(), filter); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 99c9f11b33..87becd955d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -755,6 +755,12 @@ public class AMQChannel { return message.getConnectionReference() != connectionReference; } + + @Override + public boolean startAtTail() + { + return false; + } }; filterManager.add(filter.getName(), filter); } @@ -788,7 +794,7 @@ public class AMQChannel { filterManager = new FilterManager(); } - MessageFilter filter = new ArrivalTimeFilter(startingFrom); + MessageFilter filter = new ArrivalTimeFilter(startingFrom, period==0); filterManager.add(filter.getName(), filter); } |
