From fe6bf2f7fb72255666af4dd4f3ea4a29b9bfb7f8 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 15 Mar 2015 15:25:15 +0000 Subject: QPID-6452 : Defer creating filters in default filter map until a consumer is added. Optimize filters which start at the tail of the queue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1666810 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/DirectExchange.java | 20 ++--- .../qpid/server/exchange/FanoutExchange.java | 46 ++++++------ .../qpid/server/exchange/HeadersBinding.java | 18 +++-- .../server/exchange/topic/TopicExchangeResult.java | 30 ++++---- .../qpid/server/filter/ArrivalTimeFilter.java | 35 ++++++++- .../server/filter/ArrivalTimeFilterFactory.java | 2 +- .../apache/qpid/server/filter/FilterManager.java | 43 +++++++++++ .../apache/qpid/server/filter/FilterSupport.java | 85 +++++----------------- .../org/apache/qpid/server/filter/Filterable.java | 52 ++++++++++++- .../qpid/server/filter/JMSSelectorFilter.java | 58 +-------------- .../apache/qpid/server/filter/MessageFilter.java | 2 + .../apache/qpid/server/queue/AbstractQueue.java | 33 +++++++-- .../qpid/server/queue/OrderedQueueEntryList.java | 6 ++ .../qpid/server/queue/PriorityQueueList.java | 8 ++ .../apache/qpid/server/queue/QueueEntryList.java | 2 + .../qpid/server/queue/SortedQueueEntryList.java | 13 ++++ .../apache/qpid/server/consumer/MockConsumer.java | 6 ++ .../protocol/v0_10/ServerSessionDelegate.java | 2 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 8 +- 19 files changed, 283 insertions(+), 186 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index fa8b52db55..727678ccc4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; @@ -58,7 +59,7 @@ public class DirectExchange extends AbstractExchange { private CopyOnWriteArraySet _bindings = new CopyOnWriteArraySet(); private List _unfilteredQueues = new ArrayList(); - private Map _filteredQueues = new HashMap(); + private Map _filteredQueues = new HashMap<>(); public synchronized void addBinding(BindingImpl binding) { @@ -80,7 +81,7 @@ public class DirectExchange extends AbstractExchange private void recalculateQueues() { List queues = new ArrayList(_bindings.size()); - Map filteredQueues = new HashMap(); + Map filteredQueues = new HashMap<>(); for(BindingImpl b : _bindings) { @@ -89,7 +90,7 @@ public class DirectExchange extends AbstractExchange { try { - MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue()); + FilterManager filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue()); filteredQueues.put(b.getAMQQueue(),filter); } catch (AMQInvalidArgumentException e) @@ -129,7 +130,7 @@ public class DirectExchange extends AbstractExchange return !_filteredQueues.isEmpty(); } - public Map getFilteredQueues() + public Map getFilteredQueues() { return _filteredQueues; } @@ -159,14 +160,15 @@ public class DirectExchange extends AbstractExchange if(bindings.hasFilteredQueues()) { Set queuesSet = new HashSet(queues); + Filterable filterable = Filterable.Factory.newInstance(payload, instanceProperties); - Map filteredQueues = bindings.getFilteredQueues(); - for(Map.Entry entry : filteredQueues.entrySet()) + Map filteredQueues = bindings.getFilteredQueues(); + for(Map.Entry entry : filteredQueues.entrySet()) { if(!queuesSet.contains(entry.getKey())) { - MessageFilter filter = entry.getValue(); - if(filter.matches(Filterable.Factory.newInstance(payload, instanceProperties))) + FilterManager filter = entry.getValue(); + if(filter.allAllow(filterable)) { queuesSet.add(entry.getKey()); } @@ -174,7 +176,7 @@ public class DirectExchange extends AbstractExchange } if(queues.size() != queuesSet.size()) { - queues = new ArrayList(queuesSet); + queues = new ArrayList<>(queuesSet); } } return queues; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index a7265526fe..8495d648f4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ManagedObject; @@ -58,10 +58,10 @@ public class FanoutExchange extends AbstractExchange private final CopyOnWriteArrayList _unfilteredQueues = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList _filteredQueues = new CopyOnWriteArrayList(); - private final AtomicReference>> _filteredBindings = - new AtomicReference>>(); + private final AtomicReference>> _filteredBindings = + new AtomicReference<>(); { - Map> emptyMap = Collections.emptyMap(); + Map> emptyMap = Collections.emptyMap(); _filteredBindings.set(emptyMap); } @@ -85,17 +85,17 @@ public class FanoutExchange extends AbstractExchange final ArrayList result = new ArrayList(_unfilteredQueues); - final Map> filteredBindings = _filteredBindings.get(); + final Map> filteredBindings = _filteredBindings.get(); if(!_filteredQueues.isEmpty()) { for(AMQQueue q : _filteredQueues) { - final Map bindingMessageFilterMap = filteredBindings.get(q); + final Map bindingMessageFilterMap = filteredBindings.get(q); if(!(bindingMessageFilterMap == null || result.contains(q))) { - for(MessageFilter filter : bindingMessageFilterMap.values()) + for(FilterManager filter : bindingMessageFilterMap.values()) { - if(filter.matches(Filterable.Factory.newInstance(payload,instanceProperties))) + if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties))) { result.add(q); break; @@ -143,12 +143,12 @@ public class FanoutExchange extends AbstractExchange } else { - HashMap> filteredBindings = - new HashMap>(_filteredBindings.get()); + HashMap> filteredBindings = + new HashMap<>(_filteredBindings.get()); - Map bindingsForQueue; + Map bindingsForQueue; - final MessageFilter messageFilter; + final FilterManager messageFilter; try { @@ -163,11 +163,11 @@ public class FanoutExchange extends AbstractExchange if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments)) { - bindingsForQueue = new HashMap(filteredBindings.remove(binding.getAMQQueue())); + bindingsForQueue = new HashMap<>(filteredBindings.remove(binding.getAMQQueue())); } else // previously unfiltered { - bindingsForQueue = new HashMap(); + bindingsForQueue = new HashMap(); Integer oldValue = _queues.remove(queue); if (ONE.equals(oldValue)) @@ -217,16 +217,16 @@ public class FanoutExchange extends AbstractExchange try { - HashMap> filteredBindings = - new HashMap>(_filteredBindings.get()); + HashMap> filteredBindings = + new HashMap<>(_filteredBindings.get()); - Map bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); - final MessageFilter messageFilter = + Map bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); + final FilterManager messageFilter = FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue()); if(bindingsForQueue != null) { - bindingsForQueue = new HashMap(bindingsForQueue); + bindingsForQueue = new HashMap<>(bindingsForQueue); bindingsForQueue.put(binding, messageFilter); } else @@ -278,13 +278,13 @@ public class FanoutExchange extends AbstractExchange } else // we are removing a binding with filters { - HashMap> filteredBindings = - new HashMap>(_filteredBindings.get()); + HashMap> filteredBindings = + new HashMap<>(_filteredBindings.get()); - Map bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); + Map bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); if(bindingsForQueue.size()>1) { - bindingsForQueue = new HashMap(bindingsForQueue); + bindingsForQueue = new HashMap<>(bindingsForQueue); bindingsForQueue.remove(binding); filteredBindings.put(binding.getAMQQueue(),bindingsForQueue); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 4329e2e003..ab2a5195cf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; @@ -48,7 +49,7 @@ class HeadersBinding private final Set required = new HashSet(); private final Map matches = new HashMap(); private boolean matchAny; - private MessageFilter _filter; + private FilterManager _filter; /** * Creates a header binding for a set of mappings. Those mappings whose value is @@ -86,7 +87,8 @@ class HeadersBinding _logger.warn("Invalid filter in binding queue '"+_binding.getAMQQueue().getName() +"' to exchange '"+_binding.getExchange().getName() +"' with arguments: " + _binding.getArguments()); - _filter = new MessageFilter() + _filter = new FilterManager(); + _filter.add("x-exclude-all",new MessageFilter() { @Override public String getName() @@ -94,12 +96,18 @@ class HeadersBinding return ""; } - @Override + @Override + public boolean startAtTail() + { + return false; + } + + @Override public boolean matches(Filterable message) { return false; } - }; + }); } } for(Map.Entry entry : _mappings.entrySet()) @@ -146,7 +154,7 @@ class HeadersBinding public boolean matches(Filterable message) { - return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message)); + return matches(message.getMessageHeader()) && (_filter == null || _filter.allAllow(message)); } private boolean and(AMQMessageHeader headers) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java index 0db3e9b378..5a0d4117a1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java @@ -31,15 +31,15 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.queue.AMQQueue; public final class TopicExchangeResult implements TopicMatcherResult { private final List _bindings = new CopyOnWriteArrayList(); private final Map _unfilteredQueues = new ConcurrentHashMap(); - private final ConcurrentMap> _filteredQueues = new ConcurrentHashMap>(); + private final ConcurrentMap> _filteredQueues = new ConcurrentHashMap<>(); private volatile ArrayList _unfilteredQueueList = new ArrayList(0); public void addUnfilteredQueue(AMQQueue queue) @@ -93,15 +93,15 @@ public final class TopicExchangeResult implements TopicMatcherResult public List getBindings() { - return new ArrayList(_bindings); + return new ArrayList<>(_bindings); } - public void addFilteredQueue(AMQQueue queue, MessageFilter filter) + public void addFilteredQueue(AMQQueue queue, FilterManager filter) { - Map filters = _filteredQueues.get(queue); + Map filters = _filteredQueues.get(queue); if(filters == null) { - filters = new ConcurrentHashMap(); + filters = new ConcurrentHashMap<>(); _filteredQueues.put(queue, filters); } Integer instances = filters.get(filter); @@ -116,9 +116,9 @@ public final class TopicExchangeResult implements TopicMatcherResult } - public void removeFilteredQueue(AMQQueue queue, MessageFilter filter) + public void removeFilteredQueue(AMQQueue queue, FilterManager filter) { - Map filters = _filteredQueues.get(queue); + Map filters = _filteredQueues.get(queue); if(filters != null) { Integer instances = filters.get(filter); @@ -143,11 +143,11 @@ public final class TopicExchangeResult implements TopicMatcherResult } public void replaceQueueFilter(AMQQueue queue, - MessageFilter oldFilter, - MessageFilter newFilter) + FilterManager oldFilter, + FilterManager newFilter) { - Map filters = _filteredQueues.get(queue); - Map newFilters = new ConcurrentHashMap(filters); + Map filters = _filteredQueues.get(queue); + Map newFilters = new ConcurrentHashMap<>(filters); Integer oldFilterInstances = filters.get(oldFilter); if(oldFilterInstances == 1) { @@ -190,13 +190,13 @@ public final class TopicExchangeResult implements TopicMatcherResult queues.addAll(_unfilteredQueues.keySet()); if(!_filteredQueues.isEmpty()) { - for(Map.Entry> entry : _filteredQueues.entrySet()) + for(Map.Entry> entry : _filteredQueues.entrySet()) { if(!queues.contains(entry.getKey())) { - for(MessageFilter filter : entry.getValue().keySet()) + for(FilterManager filter : entry.getValue().keySet()) { - if(filter.matches(msg)) + if(filter.allAllow(msg)) { queues.add(entry.getKey()); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java index dbd6a5f6f6..0cb40abc07 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java @@ -25,10 +25,12 @@ import org.apache.qpid.common.AMQPFilterTypes; public final class ArrivalTimeFilter implements MessageFilter { private final long _startingFrom; + private final boolean _startAtTail; - public ArrivalTimeFilter(final long startingFrom) + public ArrivalTimeFilter(final long startingFrom, final boolean startAtTail) { _startingFrom = startingFrom; + _startAtTail = startAtTail; } @Override @@ -37,10 +39,39 @@ public final class ArrivalTimeFilter implements MessageFilter return AMQPFilterTypes.REPLAY_PERIOD.toString(); } + @Override + public boolean startAtTail() + { + return _startAtTail; + } + @Override public boolean matches(final Filterable message) { - return message.getArrivalTime() >= _startingFrom; + return message.getArrivalTime() >= _startingFrom; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final ArrivalTimeFilter that = (ArrivalTimeFilter) o; + + return _startingFrom == that._startingFrom; + } + @Override + public int hashCode() + { + return (int) (_startingFrom ^ (_startingFrom >>> 32)); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java index 8c55c8ac76..7ed4e273a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java @@ -40,7 +40,7 @@ public final class ArrivalTimeFilterFactory implements MessageFilterFactory String arg = arguments.get(0); long startingFrom= Long.parseLong(arg); - return new ArrivalTimeFilter(startingFrom); + return new ArrivalTimeFilter(System.currentTimeMillis() + startingFrom, startingFrom==0l); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java index ad14fa423a..764fb60ed0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -50,6 +50,18 @@ public class FilterManager return true; } + public boolean startAtTail() + { + for(MessageFilter filter : _filters.values()) + { + if(filter.startAtTail()) + { + return true; + } + } + return false; + } + public Iterator filters() { return _filters.values().iterator(); @@ -65,11 +77,42 @@ public class FilterManager return _filters.containsKey(name); } + public boolean hasFilter(final MessageFilter filter) + { + return _filters.containsValue(filter); + } + @Override public String toString() { return _filters.toString(); } + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final FilterManager that = (FilterManager) o; + + if (_filters != null ? !_filters.equals(that._filters) : that._filters != null) + { + return false; + } + return true; + } + + @Override + public int hashCode() + { + return _filters != null ? _filters.hashCode() : 0; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 6b8ae2f552..a670c96ac1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -95,22 +95,25 @@ public class FilterSupport && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0; } - public static MessageFilter createMessageFilter(final Map args, AMQQueue queue) throws AMQInvalidArgumentException + public static FilterManager createMessageFilter(final Map args, AMQQueue queue) throws AMQInvalidArgumentException { + FilterManager filterManager = null; if(argumentsContainNoLocal(args)) { - MessageFilter filter = new NoLocalFilter(queue); + filterManager = new FilterManager(); + filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new NoLocalFilter(queue)); + } - if(argumentsContainJMSSelector(args)) + if(argumentsContainJMSSelector(args)) + { + if(filterManager == null) { - filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + filterManager = new FilterManager(); } - return filter; - } - else - { - return createJMSSelectorFilter(args); + filterManager.add(AMQPFilterTypes.JMS_SELECTOR.toString(),createJMSSelectorFilter(args)); } + return filterManager; + } @PluggableService @@ -143,6 +146,12 @@ public class FilterSupport return !consumers.isEmpty(); } + @Override + public boolean startAtTail() + { + return false; + } + @Override public boolean equals(Object o) { @@ -170,62 +179,4 @@ public class FilterSupport } - static final class CompoundFilter implements MessageFilter - { - private MessageFilter _noLocalFilter; - private MessageFilter _jmsSelectorFilter; - - public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) - { - _noLocalFilter = filter; - _jmsSelectorFilter = jmsSelectorFilter; - } - - @Override - public String getName() - { - return ""; - } - - public boolean matches(Filterable message) - { - return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - CompoundFilter that = (CompoundFilter) o; - - if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) - { - return false; - } - if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; - result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); - return result; - } - - - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java index 295f9ae074..0b430ca471 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.server.filter; +import org.apache.qpid.filter.FilterableMessage; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -public interface Filterable +public interface Filterable extends FilterableMessage { AMQMessageHeader getMessageHeader(); @@ -81,6 +82,55 @@ public interface Filterable { return message.getArrivalTime(); } + + @Override + public String getReplyTo() + { + return message.getMessageHeader().getReplyTo(); + } + + @Override + public String getType() + { + return message.getMessageHeader().getType(); + } + + @Override + public byte getPriority() + { + return message.getMessageHeader().getPriority(); + } + + @Override + public String getMessageId() + { + return message.getMessageHeader().getMessageId(); + } + + @Override + public long getTimestamp() + { + return message.getMessageHeader().getTimestamp(); + } + + @Override + public String getCorrelationId() + { + return message.getMessageHeader().getCorrelationId(); + } + + @Override + public long getExpiration() + { + return message.getMessageHeader().getExpiration(); + } + + @Override + public Object getHeader(String name) + { + return message.getMessageHeader().getHeader(name); + } + }; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 5a08eb3a0f..1a50fd3cf4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -60,7 +60,7 @@ public class JMSSelectorFilter implements MessageFilter public boolean matches(Filterable message) { - boolean match = _matcher.matches(wrap(message)); + boolean match = _matcher.matches(message); if(_logger.isDebugEnabled()) { _logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector); @@ -68,60 +68,10 @@ public class JMSSelectorFilter implements MessageFilter return match; } - private FilterableMessage wrap(final Filterable message) + @Override + public boolean startAtTail() { - return new FilterableMessage() - { - public boolean isPersistent() - { - return message.isPersistent(); - } - - public boolean isRedelivered() - { - return message.isRedelivered(); - } - - public Object getHeader(String name) - { - return message.getMessageHeader().getHeader(name); - } - - public String getReplyTo() - { - return message.getMessageHeader().getReplyTo(); - } - - public String getType() - { - return message.getMessageHeader().getType(); - } - - public byte getPriority() - { - return message.getMessageHeader().getPriority(); - } - - public String getMessageId() - { - return message.getMessageHeader().getMessageId(); - } - - public long getTimestamp() - { - return message.getMessageHeader().getTimestamp(); - } - - public String getCorrelationId() - { - return message.getMessageHeader().getCorrelationId(); - } - - public long getExpiration() - { - return message.getMessageHeader().getExpiration(); - } - }; + return false; } public String getSelector() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java index 226d646efd..253640b931 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -24,4 +24,6 @@ public interface MessageFilter { String getName(); boolean matches(Filterable message); + boolean startAtTail(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 486c501772..3b42014136 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -57,6 +57,7 @@ import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.EventLogger; @@ -262,7 +263,7 @@ public abstract class AbstractQueue> private final QueueRunner _queueRunner = new QueueRunner(this); private boolean _closing; - private final ConcurrentMap _defaultFiltersMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> _defaultFiltersMap = new ConcurrentHashMap<>(); protected AbstractQueue(Map attributes, VirtualHostImpl virtualHost) { @@ -462,11 +463,20 @@ public abstract class AbstractQueue> if(filterValue.size() == 1) { String filterTypeName = String.valueOf(filterValue.keySet().iterator().next()); - MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); + final MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); if(filterFactory != null) { - List filterArguments = filterValue.values().iterator().next(); - _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments)); + final List filterArguments = filterValue.values().iterator().next(); + // check the arguments are valid + filterFactory.newInstance(filterArguments); + _defaultFiltersMap.put(name, new Task() + { + @Override + public MessageFilter execute() + { + return filterFactory.newInstance(filterArguments); + } + }); } else { @@ -786,11 +796,11 @@ public abstract class AbstractQueue> { filters = new FilterManager(); } - for (Map.Entry filter : _defaultFiltersMap.entrySet()) + for (Map.Entry> filter : _defaultFiltersMap.entrySet()) { if(!filters.hasFilter(filter.getKey())) { - filters.add(filter.getKey(), filter.getValue()); + filters.add(filter.getKey(), filter.getValue().execute()); } } } @@ -823,7 +833,16 @@ public abstract class AbstractQueue> } consumer.setStateListener(this); - consumer.setQueueContext(new QueueContext(getEntries().getHead())); + QueueContext queueContext; + if(filters == null || !filters.startAtTail()) + { + queueContext = new QueueContext(getEntries().getHead()); + } + else + { + queueContext = new QueueContext(getEntries().getTail()); + } + consumer.setQueueContext(queueContext); if (!isDeleted()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java index 634aa22928..f3822bc9ca 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java @@ -160,6 +160,12 @@ public abstract class OrderedQueueEntryList implements QueueEntryList return _head; } + @Override + public QueueEntry getTail() + { + return _tail; + } + public void entryDeleted(QueueEntry queueEntry) { QueueEntry next = _head.getNextNode(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index d8d6a2ff0b..d0b5578055 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -184,6 +184,14 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList return (PriorityQueueEntry) _priorityLists[_priorities-1].getHead(); } + @Override + public PriorityQueueEntry getTail() + { + return (PriorityQueueEntry) _priorityLists[0].getTail(); + } + + + @Override public void entryDeleted(final QueueEntry queueEntry) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index d4a91f2c0b..938f170bee 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -34,6 +34,8 @@ public interface QueueEntryList QueueEntry getHead(); + QueueEntry getTail(); + QueueEntry getOldestEntry(); void entryDeleted(QueueEntry queueEntry); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index a37713c8b8..ee850455b0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -319,6 +319,19 @@ public class SortedQueueEntryList implements QueueEntryList return _head; } + + public SortedQueueEntry getTail() + { + SortedQueueEntry current = _head; + SortedQueueEntry next; + while((next = next(current))!=null) + { + current = next; + } + return current; + } + + @Override public QueueEntry getOldestEntry() { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 3723d19bfe..34afcdef66 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -112,6 +112,12 @@ public class MockConsumer implements ConsumerTarget return ""; } + @Override + public boolean startAtTail() + { + return false; + } + @Override public boolean matches(final Filterable message) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 25ea2430ce..75a67c6c2a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -294,7 +294,7 @@ public class ServerSessionDelegate extends SessionDelegate { filterManager = new FilterManager(); } - MessageFilter filter = new ArrivalTimeFilter(startingFrom); + MessageFilter filter = new ArrivalTimeFilter(startingFrom, period == 0); filterManager.add(filter.getName(), filter); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 99c9f11b33..87becd955d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -755,6 +755,12 @@ public class AMQChannel { return message.getConnectionReference() != connectionReference; } + + @Override + public boolean startAtTail() + { + return false; + } }; filterManager.add(filter.getName(), filter); } @@ -788,7 +794,7 @@ public class AMQChannel { filterManager = new FilterManager(); } - MessageFilter filter = new ArrivalTimeFilter(startingFrom); + MessageFilter filter = new ArrivalTimeFilter(startingFrom, period==0); filterManager.add(filter.getName(), filter); } -- cgit v1.2.1