From 3e4d1f2f56ef296ea5132511faaa8689867c499c Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 1 Jun 2013 19:24:36 +0000 Subject: QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1488561 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/AbstractExchange.java | 99 +++- .../qpid/server/exchange/DefaultExchange.java | 12 + .../qpid/server/exchange/DirectExchange.java | 127 +++-- .../org/apache/qpid/server/exchange/Exchange.java | 9 +- .../qpid/server/exchange/FanoutExchange.java | 190 ++++--- .../apache/qpid/server/exchange/FilterSupport.java | 258 +++++++++ .../qpid/server/exchange/HeadersBinding.java | 41 +- .../qpid/server/exchange/HeadersExchange.java | 105 +--- .../apache/qpid/server/exchange/TopicExchange.java | 289 +--------- .../qpid/server/handler/ExchangeBoundHandler.java | 10 +- .../server/transport/ServerSessionDelegate.java | 16 +- .../exchange/AbstractHeadersExchangeTestBase.java | 633 --------------------- .../qpid/server/exchange/FanoutExchangeTest.java | 97 +++- .../qpid/server/exchange/HeadersExchangeTest.java | 276 ++++++--- .../org/apache/qpid/client/AMQHeadersExchange.java | 2 +- .../java/org/apache/qpid/client/AMQSession.java | 115 ++-- .../org/apache/qpid/client/AMQSession_0_10.java | 146 ++--- .../org/apache/qpid/client/AMQSession_0_8.java | 62 +- .../main/java/org/apache/qpid/client/AMQTopic.java | 20 +- .../org/apache/qpid/common/AMQPFilterTypes.java | 7 +- .../ReturnUnroutableMandatoryMessageTest.java | 5 +- .../qpid/server/logging/BindingLoggingTest.java | 10 +- .../destination/AddressBasedDestinationTest.java | 546 +++++++++--------- .../qpid/test/client/message/SelectorTest.java | 34 +- .../qpid/test/unit/message/StreamMessageTest.java | 8 +- .../qpid/test/unit/topic/TopicSessionTest.java | 45 +- 26 files changed, 1501 insertions(+), 1661 deletions(-) create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 0d05307cb4..58c2b33041 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -173,20 +174,106 @@ public abstract class AbstractExchange implements Exchange return getVirtualHost().getQueueRegistry(); } - public boolean isBound(String bindingKey, Map arguments, AMQQueue queue) + public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue) { - return isBound(new AMQShortString(bindingKey), queue); + return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue); } + public final boolean isBound(String bindingKey, Map arguments, AMQQueue queue) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue()) + { + return (b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments); + } + } + return false; + } + + public final boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return isBound(routingKey==null ? "" : routingKey.asString(), queue); + } + + public final boolean isBound(String bindingKey, AMQQueue queue) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue()) + { + return true; + } + } + return false; + } + + public final boolean isBound(AMQShortString routingKey) + { + return isBound(routingKey == null ? "" : routingKey.asString()); + } + + public final boolean isBound(String bindingKey) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey())) + { + return true; + } + } + return false; + } + + public final boolean isBound(AMQQueue queue) + { + for(Binding b : _bindings) + { + if(queue == b.getQueue()) + { + return true; + } + } + return false; + } - public boolean isBound(String bindingKey, AMQQueue queue) + @Override + public final boolean isBound(Map arguments, AMQQueue queue) { - return isBound(new AMQShortString(bindingKey), queue); + for(Binding b : _bindings) + { + if(queue == b.getQueue() && + ((b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments))) + { + return true; + } + } + return false; + } + + @Override + public final boolean isBound(String bindingKey, Map arguments) + { + for(Binding b : _bindings) + { + if(b.getBindingKey().equals(bindingKey) && + ((b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments))) + { + return true; + } + } + return false; } - public boolean isBound(String bindingKey) + public final boolean hasBindings() { - return isBound(new AMQShortString(bindingKey)); + return !_bindings.isEmpty(); } public Exchange getAlternateExchange() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 4e136965a1..ccf955ed1c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -271,6 +271,18 @@ public class DefaultExchange implements Exchange return isBound(bindingKey, queue) && (arguments == null || arguments.isEmpty()); } + @Override + public boolean isBound(Map arguments, AMQQueue queue) + { + return (arguments == null || arguments.isEmpty()) && isBound(queue); + } + + @Override + public boolean isBound(String bindingKey, Map arguments) + { + return (arguments == null || arguments.isEmpty()) && isBound(bindingKey); + } + @Override public boolean isBound(String bindingKey) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index fc6ce15bc4..2e2a93d638 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -20,9 +20,18 @@ */ package org.apache.qpid.server.exchange; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -36,10 +45,14 @@ import java.util.concurrent.CopyOnWriteArraySet; public class DirectExchange extends AbstractExchange { + + private static final Logger _logger = Logger.getLogger(DirectExchange.class); + private static final class BindingSet { private CopyOnWriteArraySet _bindings = new CopyOnWriteArraySet(); - private List _queues = new ArrayList(); + private List _unfilteredQueues = new ArrayList(); + private Map _filteredQueues = new HashMap(); public synchronized void addBinding(Binding binding) { @@ -56,27 +69,59 @@ public class DirectExchange extends AbstractExchange private void recalculateQueues() { List queues = new ArrayList(_bindings.size()); + Map filteredQueues = new HashMap(); for(Binding b : _bindings) { - if(!queues.contains(b.getQueue())) + + if(FilterSupport.argumentsContainFilter(b.getArguments())) { - queues.add(b.getQueue()); + try + { + MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getQueue()); + filteredQueues.put(b.getQueue(),filter); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getQueue().getName() + + "' to exchange '" + b.getExchange().getName() + + "' with arguments: " + b.getArguments(), e); + } + + } + else + { + + if(!queues.contains(b.getQueue())) + { + queues.add(b.getQueue()); + } } } - _queues = queues; + _unfilteredQueues = queues; + _filteredQueues = filteredQueues; } - public List getQueues() + public List getUnfilteredQueues() { - return _queues; + return _unfilteredQueues; } public CopyOnWriteArraySet getBindings() { return _bindings; } + + public boolean hasFilteredQueues() + { + return !_filteredQueues.isEmpty(); + } + + public Map getFilteredQueues() + { + return _filteredQueues; + } } private final ConcurrentHashMap _bindingsByKey = @@ -98,7 +143,30 @@ public class DirectExchange extends AbstractExchange if(bindings != null) { - return bindings.getQueues(); + List queues = bindings.getUnfilteredQueues(); + + if(bindings.hasFilteredQueues()) + { + Set queuesSet = new HashSet(queues); + + Map filteredQueues = bindings.getFilteredQueues(); + for(Map.Entry entry : filteredQueues.entrySet()) + { + if(!queuesSet.contains(entry.getKey())) + { + MessageFilter filter = entry.getValue(); + if(filter.matches(payload)) + { + queuesSet.add(entry.getKey()); + } + } + } + if(queues.size() != queuesSet.size()) + { + queues = new ArrayList(queuesSet); + } + } + return queues; } else { @@ -106,50 +174,6 @@ public class DirectExchange extends AbstractExchange } - - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey,queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - BindingSet bindings = _bindingsByKey.get(bindingKey); - if(bindings != null) - { - return bindings.getQueues().contains(queue); - } - return false; - - } - - public boolean isBound(AMQShortString routingKey) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - BindingSet bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.getQueues().isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - - for (BindingSet bindings : _bindingsByKey.values()) - { - if(bindings.getQueues().contains(queue)) - { - return true; - } - - } - return false; - } - - public boolean hasBindings() - { - return !getBindings().isEmpty(); } protected void onBind(final Binding binding) @@ -189,5 +213,4 @@ public class DirectExchange extends AbstractExchange } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index a5a1d7f912..d483c3b29b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -145,12 +145,15 @@ public interface Exchange extends ExchangeReferrer Collection getBindings(); + boolean isBound(String bindingKey); boolean isBound(String bindingKey, AMQQueue queue); - public boolean isBound(String bindingKey, Map arguments, AMQQueue queue); + boolean isBound(String bindingKey, Map arguments, AMQQueue queue); - boolean isBound(String bindingKey); + boolean isBound(Map arguments, AMQQueue queue); + + boolean isBound(String bindingKey, Map arguments); void removeReference(ExchangeReferrer exchange); @@ -158,6 +161,8 @@ public interface Exchange extends ExchangeReferrer boolean hasReferrers(); + + public interface BindingListener { void bindingAdded(Exchange exchange, Binding binding); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 6ad5eb261e..cd830d69a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -20,11 +20,21 @@ */ package org.apache.qpid.server.exchange; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -42,7 +52,18 @@ public class FanoutExchange extends AbstractExchange /** * Maps from queue name to queue instances */ - private final ConcurrentHashMap _queues = new ConcurrentHashMap(); + private final Map _queues = new HashMap(); + private final CopyOnWriteArrayList _unfilteredQueues = new CopyOnWriteArrayList(); + private final CopyOnWriteArrayList _filteredQueues = new CopyOnWriteArrayList(); + + private final AtomicReference>> _filteredBindings = + new AtomicReference>>(); + { + Map> emptyMap = Collections.emptyMap(); + _filteredBindings.set(emptyMap); + } + + public static final ExchangeType TYPE = new FanoutExchangeType(); @@ -54,115 +75,150 @@ public class FanoutExchange extends AbstractExchange public ArrayList doRoute(InboundMessage payload) { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Publishing message to queue " + _queues); - } - for(Binding b : getBindings()) { b.incrementMatches(); } - return new ArrayList(_queues.keySet()); - - } + final ArrayList result = new ArrayList(_unfilteredQueues); - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey, queue); - } - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return isBound(queue); - } + final Map> filteredBindings = _filteredBindings.get(); + if(!_filteredQueues.isEmpty()) + { + for(AMQQueue q : _filteredQueues) + { + final Map bindingMessageFilterMap = filteredBindings.get(q); + if(!(bindingMessageFilterMap == null || result.contains(q))) + { + for(MessageFilter filter : bindingMessageFilterMap.values()) + { + if(filter.matches(payload)) + { + result.add(q); + break; + } + } + } + } - public boolean isBound(AMQShortString routingKey) - { + } - return (_queues != null) && !_queues.isEmpty(); - } - public boolean isBound(AMQQueue queue) - { - if (queue == null) + if (_logger.isDebugEnabled()) { - return false; + _logger.debug("Publishing message to queue " + result); } - return _queues.containsKey(queue); - } - public boolean hasBindings() - { - return !_queues.isEmpty(); + return result; + } - protected void onBind(final Binding binding) + + protected synchronized void onBind(final Binding binding) { AMQQueue queue = binding.getQueue(); assert queue != null; + if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments())) + { - Integer oldVal; + Integer oldVal; + if(_queues.containsKey(queue)) + { + _queues.put(queue,_queues.get(queue)+1); + } + else + { + _queues.put(queue, ONE); + _unfilteredQueues.add(queue); + // No longer any reason to check filters for this queue + _filteredQueues.remove(queue); + } - if((oldVal = _queues.putIfAbsent(queue, ONE)) != null) + } + else { - Integer newVal = oldVal+1; - while(!_queues.replace(queue, oldVal, newVal)) + try { - oldVal = _queues.get(queue); - if(oldVal == null) + + HashMap> filteredBindings = + new HashMap>(_filteredBindings.get()); + + Map bindingsForQueue = filteredBindings.remove(binding.getQueue()); + final + MessageFilter messageFilter = + FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue()); + + if(bindingsForQueue != null) { - oldVal = _queues.putIfAbsent(queue, ONE); - if(oldVal == null) + bindingsForQueue = new HashMap(bindingsForQueue); + bindingsForQueue.put(binding, messageFilter); + } + else + { + bindingsForQueue = Collections.singletonMap(binding, messageFilter); + if(!_unfilteredQueues.contains(queue)) { - break; + _filteredQueues.add(queue); } } - newVal = oldVal + 1; + + filteredBindings.put(binding.getQueue(), bindingsForQueue); + + _filteredBindings.set(filteredBindings); + + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Cannoy bind queue " + queue + " to exchange this " + this + " beacuse selector cannot be parsed.", e); + return; } } - if (_logger.isDebugEnabled()) { _logger.debug("Binding queue " + queue - + " with routing key " + new AMQShortString(binding.getBindingKey()) + " to exchange " + this); + + " with routing key " + binding.getBindingKey() + " to exchange " + this); } } - protected void onUnbind(final Binding binding) + protected synchronized void onUnbind(final Binding binding) { AMQQueue queue = binding.getQueue(); - Integer oldValue = _queues.get(queue); - - boolean done = false; - - while(!(done || oldValue == null)) + if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments())) { - while(!(done || oldValue == null) && oldValue.intValue() == 1) + Integer oldValue = _queues.remove(queue); + if(ONE.equals(oldValue)) { - if(!_queues.remove(queue, oldValue)) + // should start checking filters for this queue + if(_filteredBindings.get().containsKey(queue)) { - oldValue = _queues.get(queue); - } - else - { - done = true; + _filteredQueues.add(queue); } + _unfilteredQueues.remove(queue); } - while(!(done || oldValue == null) && oldValue.intValue() != 1) + else { - Integer newValue = oldValue - 1; - if(!_queues.replace(queue, oldValue, newValue)) - { - oldValue = _queues.get(queue); - } - else - { - done = true; - } + _queues.put(queue,oldValue-1); } } + else // we are removing a binding with filters + { + HashMap> filteredBindings = + new HashMap>(_filteredBindings.get()); + + Map bindingsForQueue = filteredBindings.remove(binding.getQueue()); + if(bindingsForQueue.size()>1) + { + bindingsForQueue = new HashMap(bindingsForQueue); + bindingsForQueue.remove(binding); + filteredBindings.put(binding.getQueue(),bindingsForQueue); + } + else + { + _filteredQueues.remove(queue); + } + _filteredBindings.set(filteredBindings); + + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java new file mode 100644 index 0000000000..880c9a2cf6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.exchange; + +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.SelectorParsingException; +import org.apache.qpid.filter.selector.ParseException; +import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.Filterable; + +public class FilterSupport +{ + private static final Map> _selectorCache = + Collections.synchronizedMap(new WeakHashMap>()); + + static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException + { + final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); + return getMessageFilter(selectorString); + } + + + static MessageFilter createJMSSelectorFilter(Map args) throws AMQInvalidArgumentException + { + final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.getValue()); + return getMessageFilter(selectorString); + } + + + private static MessageFilter getMessageFilter(String selectorString) throws AMQInvalidArgumentException + { + WeakReference selectorRef = _selectorCache.get(selectorString); + JMSSelectorFilter selector = null; + + if(selectorRef == null || (selector = selectorRef.get())==null) + { + try + { + selector = new JMSSelectorFilter(selectorString); + } + catch (ParseException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (SelectorParsingException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (TokenMgrError e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + _selectorCache.put(selectorString, new WeakReference(selector)); + } + return selector; + } + + static boolean argumentsContainFilter(final FieldTable args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + + static boolean argumentsContainFilter(final Map args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + + static boolean argumentsContainNoLocal(final Map args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.toString()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString())); + } + + + static boolean argumentsContainNoLocal(final FieldTable args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); + } + + + static boolean argumentsContainJMSSelector(final Map args) + { + return args != null && (args.get(AMQPFilterTypes.JMS_SELECTOR.toString()) instanceof String) + && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0; + } + + + static boolean argumentsContainJMSSelector(final FieldTable args) + { + return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) + && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); + } + + + static MessageFilter createMessageFilter(final Map args, AMQQueue queue) throws AMQInvalidArgumentException + { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException + { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + static final class NoLocalFilter implements MessageFilter + { + private final AMQQueue _queue; + + public NoLocalFilter(AMQQueue queue) + { + _queue = queue; + } + + public boolean matches(Filterable message) + { + InboundMessage inbound = (InboundMessage) message; + final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); + return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); + + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (o == null || getClass() != o.getClass()) + { + return false; + } + + NoLocalFilter that = (NoLocalFilter) o; + + return _queue == null ? that._queue == null : _queue.equals(that._queue); + } + + @Override + public int hashCode() + { + return _queue != null ? _queue.hashCode() : 0; + } + } + + static final class CompoundFilter implements MessageFilter + { + private MessageFilter _noLocalFilter; + private MessageFilter _jmsSelectorFilter; + + public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) + { + _noLocalFilter = filter; + _jmsSelectorFilter = jmsSelectorFilter; + } + + public boolean matches(Filterable message) + { + return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + CompoundFilter that = (CompoundFilter) o; + + if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) + { + return false; + } + if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; + result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); + return result; + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index b6f5f973f4..eb4a84a5b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -22,15 +22,19 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.framing.AMQTypedValue; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.AMQMessageHeader; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.queue.Filterable; /** * Defines binding and matching based on a set of headers. @@ -44,13 +48,14 @@ class HeadersBinding private final Set required = new HashSet(); private final Map matches = new HashMap(); private boolean matchAny; + private MessageFilter _filter; /** * Creates a header binding for a set of mappings. Those mappings whose value is * null or the empty string are assumed only to be required headers, with * no constraint on the value. Those with a non-null value are assumed to * define a required match of value. - * + * * @param binding the binding to create a header binding using */ public HeadersBinding(Binding binding) @@ -66,9 +71,30 @@ class HeadersBinding _mappings = null; } } - + private void initMappings() { + if(FilterSupport.argumentsContainFilter(_mappings)) + { + try + { + _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue()); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName() + +"' to exchange '"+_binding.getExchange().getName() + +"' with arguments: " + _binding.getArguments()); + _filter = new MessageFilter() + { + @Override + public boolean matches(Filterable message) + { + return false; + } + }; + } + } for(Map.Entry entry : _mappings.entrySet()) { String propertyName = entry.getKey(); @@ -87,7 +113,7 @@ class HeadersBinding } } } - + public Binding getBinding() { return _binding; @@ -111,6 +137,11 @@ class HeadersBinding } } + public boolean matches(InboundMessage message) + { + return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message)); + } + private boolean and(AMQMessageHeader headers) { if(headers.containsHeaders(required)) @@ -215,7 +246,7 @@ class HeadersBinding { return key.startsWith("X-") || key.startsWith("x-"); } - + @Override public boolean equals(final Object o) { @@ -250,4 +281,4 @@ class HeadersBinding return true; } -} \ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 746c8ac6bc..9fb745d553 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -69,14 +69,14 @@ public class HeadersExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(HeadersExchange.class); - + private final ConcurrentHashMap> _bindingsByKey = new ConcurrentHashMap>(); - + private final CopyOnWriteArrayList _bindingHeaderMatchers = new CopyOnWriteArrayList(); - + public static final ExchangeType TYPE = new HeadersExchangeType(); public HeadersExchange() @@ -87,112 +87,31 @@ public class HeadersExchange extends AbstractExchange public ArrayList doRoute(InboundMessage payload) { - AMQMessageHeader header = payload.getMessageHeader(); if (_logger.isDebugEnabled()) { - _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header); + _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader()); } - + LinkedHashSet queues = new LinkedHashSet(); - + for (HeadersBinding hb : _bindingHeaderMatchers) { - if (hb.matches(header)) + if (hb.matches(payload)) { Binding b = hb.getBinding(); - + b.incrementMatches(); - + if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " + - header + " to " + b.getQueue().getNameShortString()); + payload.getMessageHeader() + " to " + b.getQueue().getNameShortString()); } queues.add(b.getQueue()); } } - - return new ArrayList(queues); - } - - - public boolean isBound(String bindingKey, Map arguments, AMQQueue queue) - { - CopyOnWriteArraySet bindings; - if(bindingKey == null) - { - bindings = new CopyOnWriteArraySet(getBindings()); - } - else - { - bindings = _bindingsByKey.get(bindingKey); - } - - if(bindings != null) - { - for(Binding binding : bindings) - { - if(queue == null || binding.getQueue().equals(queue)) - { - return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments); - } - } - } - - return false; - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - //fixme isBound here should take the arguements in to consideration. - return isBound(routingKey, queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet bindings = _bindingsByKey.get(bindingKey); - - if(bindings != null) - { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } - } - - return false; - } - - public boolean isBound(AMQShortString routingKey) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - for (CopyOnWriteArraySet bindings : _bindingsByKey.values()) - { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } - } - - return false; - } - public boolean hasBindings() - { - return !getBindings().isEmpty(); + return new ArrayList(queues); } protected void onBind(final Binding binding) @@ -216,7 +135,7 @@ public class HeadersExchange extends AbstractExchange bindings = newBindings; } } - + if(_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 6d548be508..9d41856dc0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -20,21 +20,15 @@ */ package org.apache.qpid.server.exchange; -import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQInvalidArgumentException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.filter.SelectorParsingException; -import org.apache.qpid.filter.selector.ParseException; -import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; @@ -42,14 +36,10 @@ import org.apache.qpid.server.exchange.topic.TopicExchangeResult; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; -import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.Filterable; public class TopicExchange extends AbstractExchange { @@ -65,8 +55,6 @@ public class TopicExchange extends AbstractExchange private final Map _bindings = new HashMap(); - private final Map> _selectorCache = new WeakHashMap>(); - public TopicExchange() { super(TYPE); @@ -77,7 +65,7 @@ public class TopicExchange extends AbstractExchange AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ; AMQQueue queue = binding.getQueue(); FieldTable args = FieldTable.convertToFieldTable(binding.getArguments()); - + assert queue != null; assert rKey != null; @@ -91,26 +79,26 @@ public class TopicExchange extends AbstractExchange FieldTable oldArgs = _bindings.get(binding); TopicExchangeResult result = _topicExchangeResults.get(routingKey); - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - if(argumentsContainFilter(oldArgs)) + if(FilterSupport.argumentsContainFilter(oldArgs)) { result.replaceQueueFilter(queue, - createMessageFilter(oldArgs, queue), - createMessageFilter(args, queue)); + FilterSupport.createMessageFilter(oldArgs, queue), + FilterSupport.createMessageFilter(args, queue)); } else { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); result.removeUnfilteredQueue(queue); } } else { - if(argumentsContainFilter(oldArgs)) + if(FilterSupport.argumentsContainFilter(oldArgs)) { result.addUnfilteredQueue(queue); - result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue)); + result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue)); } else { @@ -118,7 +106,7 @@ public class TopicExchange extends AbstractExchange return; } } - + result.addBinding(binding); } @@ -129,9 +117,9 @@ public class TopicExchange extends AbstractExchange if(result == null) { result = new TopicExchangeResult(); - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); } else { @@ -142,89 +130,22 @@ public class TopicExchange extends AbstractExchange } else { - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); } else { result.addUnfilteredQueue(queue); } } - + result.addBinding(binding); _bindings.put(binding, args); } } - private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException - { - if(argumentsContainNoLocal(args)) - { - MessageFilter filter = new NoLocalFilter(queue); - - if(argumentsContainJMSSelector(args)) - { - filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); - } - return filter; - } - else - { - return createJMSSelectorFilter(args); - } - - } - - - private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException - { - final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); - WeakReference selectorRef = _selectorCache.get(selectorString); - JMSSelectorFilter selector = null; - - if(selectorRef == null || (selector = selectorRef.get())==null) - { - try - { - selector = new JMSSelectorFilter(selectorString); - } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (TokenMgrError e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - _selectorCache.put(selectorString, new WeakReference(selector)); - } - return selector; - } - - private static boolean argumentsContainFilter(final FieldTable args) - { - return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); - } - - private static boolean argumentsContainNoLocal(final FieldTable args) - { - return args != null - && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) - && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); - } - - private static boolean argumentsContainJMSSelector(final FieldTable args) - { - return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) - && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); - } - public ArrayList doRoute(InboundMessage payload) { @@ -256,87 +177,6 @@ public class TopicExchange extends AbstractExchange } - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments)); - - if (arguments == null) - { - return _bindings.containsKey(binding); - } - else - { - FieldTable o = _bindings.get(binding); - if (o != null) - { - return o.equals(arguments); - } - else - { - return false; - } - - } - } - - public boolean isBound(String bindingKey, Map arguments, AMQQueue queue) - { - Binding binding = new Binding(null, bindingKey, queue, this, arguments); - if (arguments == null) - { - return _bindings.containsKey(binding); - } - else - { - FieldTable o = _bindings.get(binding); - if (o != null) - { - return arguments.equals(FieldTable.convertToMap(o)); - } - else - { - return false; - } - } - - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return isBound(routingKey, null, queue); - } - - public boolean isBound(AMQShortString routingKey) - { - for(Binding b : _bindings.keySet()) - { - if(b.getBindingKey().equals(routingKey.toString())) - { - return true; - } - } - - return false; - } - - public boolean isBound(AMQQueue queue) - { - for(Binding b : _bindings.keySet()) - { - if(b.getQueue().equals(queue)) - { - return true; - } - } - - return false; - } - - public boolean hasBindings() - { - return !_bindings.isEmpty(); - } - private boolean deregisterQueue(final Binding binding) { if(_bindings.containsKey(binding)) @@ -344,14 +184,15 @@ public class TopicExchange extends AbstractExchange FieldTable bindingArgs = _bindings.remove(binding); AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey())); TopicExchangeResult result = _topicExchangeResults.get(bindingKey); - + result.removeBinding(binding); - - if(argumentsContainFilter(bindingArgs)) + + if(FilterSupport.argumentsContainFilter(bindingArgs)) { try { - result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue())); + result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs, + binding.getQueue())); } catch (AMQInvalidArgumentException e) { @@ -418,96 +259,4 @@ public class TopicExchange extends AbstractExchange deregisterQueue(binding); } - private static final class NoLocalFilter implements MessageFilter - { - private final AMQQueue _queue; - - public NoLocalFilter(AMQQueue queue) - { - _queue = queue; - } - - public boolean matches(Filterable message) - { - InboundMessage inbound = (InboundMessage) message; - final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); - return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); - - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - - if (o == null || getClass() != o.getClass()) - { - return false; - } - - NoLocalFilter that = (NoLocalFilter) o; - - return _queue == null ? that._queue == null : _queue.equals(that._queue); - } - - @Override - public int hashCode() - { - return _queue != null ? _queue.hashCode() : 0; - } - } - - private static final class CompoundFilter implements MessageFilter - { - private MessageFilter _noLocalFilter; - private MessageFilter _jmsSelectorFilter; - - public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) - { - _noLocalFilter = filter; - _jmsSelectorFilter = jmsSelectorFilter; - } - - public boolean matches(Filterable message) - { - return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - CompoundFilter that = (CompoundFilter) o; - - if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) - { - return false; - } - if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; - result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); - return result; - } - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index b4eb41684d..2e6a98d81b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -159,9 +159,15 @@ public class ExchangeBoundHandler implements StateAwareMethodListener255) + { + message = message.substring(0,254); + } response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - new AMQShortString("Queue " + queueName + " not bound with routing key " + - body.getRoutingKey() + " to exchange " + exchangeName)); // replyText + new AMQShortString(message)); // replyText } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index d8d245e255..110c7be50a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1130,22 +1130,22 @@ public class ServerSessionDelegate extends SessionDelegate if(queueMatched) { - result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); + final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue); + result.setKeyNotMatched(!keyMatched); + if(method.hasArguments() && keyMatched) + { + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)); + } } else { result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); } - if(method.hasArguments()) - { - result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null)); - } - } else if (method.hasArguments()) { - result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null)); + result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue)); } } @@ -1166,7 +1166,7 @@ public class ServerSessionDelegate extends SessionDelegate { if(method.hasArguments()) { - result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null)); + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments())); } result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java deleted file mode 100644 index f4c0fec6c9..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ /dev/null @@ -1,633 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MockStoredMessage; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AbstractHeadersExchangeTestBase extends QpidTestCase -{ - private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); - - private final HeadersExchange exchange = new HeadersExchange(); - private final Set queues = new HashSet(); - private VirtualHost _virtualHost; - private int count; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testDoNothing() - { - // this is here only to make junit under Eclipse happy - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - protected TestQueue bindDefault(String... bindings) throws AMQException - { - String queueName = "Queue" + (++count); - - return bind(queueName, queueName, getHeadersMap(bindings)); - } - - protected void unbind(TestQueue queue, String... bindings) throws AMQException - { - String queueName = queue.getName(); - exchange.onUnbind(new Binding(null, queueName, queue, exchange, getHeadersMap(bindings))); - } - - protected int getCount() - { - return count; - } - - private TestQueue bind(String key, String queueName, Map args) throws AMQException - { - TestQueue queue = new TestQueue(new AMQShortString(queueName), _virtualHost); - queues.add(queue); - exchange.onBind(new Binding(null, key, queue, exchange, args)); - return queue; - } - - - protected int route(Message m) throws AMQException - { - m.getIncomingMessage().headersReceived(System.currentTimeMillis()); - m.route(exchange); - if(m.getIncomingMessage().allContentReceived()) - { - for(BaseQueue q : m.getIncomingMessage().getDestinationQueues()) - { - q.enqueue(m); - } - } - return m.getIncomingMessage().getDestinationQueues().size(); - } - - protected void routeAndTest(Message m, TestQueue... expected) throws AMQException - { - routeAndTest(m, false, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException - { - routeAndTest(m, expectReturn, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, List expected) throws AMQException - { - routeAndTest(m, false, expected); - } - - protected void routeAndTest(Message m, boolean expectReturn, List expected) throws AMQException - { - int queueCount = route(m); - - for (TestQueue q : queues) - { - if (expected.contains(q)) - { - assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; - } - } - - if(expectReturn) - { - assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount); - } - - } - - static Map getHeadersMap(String... entries) - { - if(entries == null) - { - return null; - } - - Map headers = new HashMap(); - - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.put(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - static FieldTable getHeaders(String... entries) - { - FieldTable headers = FieldTableFactory.newFieldTable(); - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.setObject(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - - static final class MessagePublishInfoImpl implements MessagePublishInfo - { - private AMQShortString _exchange; - private boolean _immediate; - private boolean _mandatory; - private AMQShortString _routingKey; - - public MessagePublishInfoImpl(AMQShortString routingKey) - { - _routingKey = routingKey; - } - - public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - public AMQShortString getExchange() - { - return _exchange; - } - - public boolean isImmediate() - { - return _immediate; - - } - - public boolean isMandatory() - { - return _mandatory; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - - public void setExchange(AMQShortString exchange) - { - _exchange = exchange; - } - - public void setImmediate(boolean immediate) - { - _immediate = immediate; - } - - public void setMandatory(boolean mandatory) - { - _mandatory = mandatory; - } - - public void setRoutingKey(AMQShortString routingKey) - { - _routingKey = routingKey; - } - } - - static MessagePublishInfo getPublishRequest(final String id) - { - return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id)); - } - - static ContentHeaderBody getContentHeader(FieldTable headers) - { - ContentHeaderBody header = new ContentHeaderBody(); - header.setProperties(getProperties(headers)); - return header; - } - - static BasicContentHeaderProperties getProperties(FieldTable headers) - { - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setHeaders(headers); - return properties; - } - - static class TestQueue extends SimpleAMQQueue - { - private final List messages = new ArrayList(); - - public String toString() - { - return getNameShortString().toString(); - } - - public TestQueue(AMQShortString name, VirtualHost host) throws AMQException - { - super(UUIDGenerator.generateRandomUUID(), name, false, new AMQShortString("test"), true, false, host, Collections.EMPTY_MAP); - host.getQueueRegistry().registerQueue(this); - } - - - - /** - * We override this method so that the default behaviour, which attempts to use a delivery manager, is - * not invoked. It is unnecessary since for this test we only care to know whether the message was - * sent to the queue; the queue processing logic is not being tested. - * @param msg - * @throws AMQException - */ - @Override - public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException - { - messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); - final QueueEntry queueEntry = new QueueEntry() - { - - public AMQQueue getQueue() - { - return null; - } - - public AMQMessage getMessage() - { - return null; - } - - public long getSize() - { - return 0; - } - - public boolean getDeliveredToConsumer() - { - return false; - } - - public boolean expired() throws AMQException - { - return false; - } - - public boolean isAvailable() - { - return false; - } - - public boolean isAcquired() - { - return false; - } - - public boolean acquire() - { - return false; - } - - public boolean acquire(Subscription sub) - { - return false; - } - - public boolean delete() - { - return false; - } - - public boolean isDeleted() - { - return false; - } - - public boolean acquiredBySubscription() - { - return false; - } - - public boolean isAcquiredBy(Subscription subscription) - { - return false; - } - - public void release() - { - - } - - public void setRedelivered() - { - - } - - public AMQMessageHeader getMessageHeader() - { - return null; - } - - public boolean isPersistent() - { - return false; - } - - public boolean isRedelivered() - { - return false; - } - - public Subscription getDeliveredSubscription() - { - return null; - } - - public void reject() - { - - } - - public boolean isRejectedBy(long subscriptionId) - { - return false; - } - - public void dequeue() - { - - } - - public void dispose() - { - - } - - public void discard() - { - - } - - public void routeToAlternate() - { - - } - - public boolean isQueueDeleted() - { - return false; - } - - public void addStateChangeListener(StateChangeListener listener) - { - - } - - public boolean removeStateChangeListener(StateChangeListener listener) - { - return false; - } - - public int compareTo(final QueueEntry o) - { - return 0; - } - - public boolean isDequeued() - { - return false; - } - - public boolean isDispensed() - { - return false; - } - - public QueueEntry getNextNode() - { - return null; - } - - public QueueEntry getNextValidEntry() - { - return null; - } - - public int getDeliveryCount() - { - return 0; - } - - public void incrementDeliveryCount() - { - } - - public void decrementDeliveryCount() - { - } - }; - - if(action != null) - { - action.onEnqueue(queueEntry); - } - - } - - boolean isInQueue(Message msg) - { - return messages.contains(msg); - } - - } - - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends AMQMessage - { - private static AtomicLong _messageId = new AtomicLong(); - - private class TestIncomingMessage extends IncomingMessage - { - - public TestIncomingMessage(final long messageId, - final MessagePublishInfo info, - final AMQProtocolSession publisher) - { - super(info); - } - - - public ContentHeaderBody getContentHeader() - { - return Message.this.getContentHeaderBody(); - } - } - - private IncomingMessage _incoming; - - - Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException - { - this(protocolSession, id, getHeaders(headers)); - } - - Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException - { - this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST); - } - - public IncomingMessage getIncomingMessage() - { - return _incoming; - } - - private Message(AMQProtocolSession protocolsession, long messageId, - MessagePublishInfo publish, - ContentHeaderBody header, - List bodies) throws AMQException - { - super(new MockStoredMessage(messageId, publish, header)); - - StoredMessage storedMessage = getStoredMessage(); - - int pos = 0; - for(ContentBody body : bodies) - { - storedMessage.addContent(pos, ByteBuffer.wrap(body.getPayload())); - pos += body.getPayload().length; - } - - _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); - _incoming.setContentHeaderBody(header); - - - } - - - private Message(AMQMessage msg) throws AMQException - { - super(msg.getStoredMessage()); - } - - - - void route(Exchange exchange) throws AMQException - { - _incoming.enqueue(exchange.route(_incoming)); - } - - - public int hashCode() - { - return getKey().hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); - } - - private boolean equals(HeadersExchangeTest.Message m) - { - return getKey().equals(m.getKey()); - } - - public String toString() - { - return getKey().toString(); - } - - private Object getKey() - { - try - { - return getMessagePublishInfo().getRoutingKey(); - } - catch (AMQException e) - { - _log.error("Error getting routing key: " + e, e); - return null; - } - } - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 2ddb417d5d..7b7e2ec346 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -21,22 +21,32 @@ package org.apache.qpid.server.exchange; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySet; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class FanoutExchangeTest extends TestCase { @@ -51,7 +61,9 @@ public class FanoutExchangeTest extends TestCase _virtualHost = mock(VirtualHost.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); - when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); } @@ -76,14 +88,14 @@ public class FanoutExchangeTest extends TestCase { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", - _exchange.isBound((AMQShortString) null, (FieldTable) null, queue)); + _exchange.isBound(new AMQShortString("matters"), (FieldTable) null, queue)); } public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", - _exchange.isBound((AMQShortString) null, queue)); + _exchange.isBound(new AMQShortString("matters"), queue)); } public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException @@ -94,10 +106,87 @@ public class FanoutExchangeTest extends TestCase } private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException + { + AMQQueue queue = mockQueue(); + _exchange.addBinding("matters", queue, null); + return queue; + } + + private AMQQueue mockQueue() { AMQQueue queue = mock(AMQQueue.class); when(queue.getVirtualHost()).thenReturn(_virtualHost); - _exchange.addBinding("does not matter", queue, null); return queue; } + + public void testRoutingWithSelectors() throws Exception + { + AMQQueue queue1 = mockQueue(); + AMQQueue queue2 = mockQueue(); + + _exchange.addBinding("key",queue1, null); + _exchange.addBinding("key",queue2, null); + + + List result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); + + + result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + _exchange.removeBinding("key",queue2,null); + + result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + + result = _exchange.route(mockMessage(false)); + + assertEquals("Expected message to be routed to queue1 only", 1, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertFalse("Expected queue2 not to be routed to", result.contains(queue2)); + + _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); + + + result = _exchange.route(mockMessage(false)); + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + + } + + private InboundMessage mockMessage(boolean val) + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.containsHeader("select")).thenReturn(true); + when(header.getHeader("select")).thenReturn(val); + when(header.getHeaderNames()).thenReturn(Collections.singleton("select")); + when(header.containsHeaders(anySet())).then(new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + final Set names = (Set) invocation.getArguments()[0]; + return names.size() == 1 && names.contains("select"); + + } + }); + final InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getMessageHeader()).thenReturn(header); + return inboundMessage; + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index bd6a02d69b..2b965358e0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -20,106 +20,230 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.server.util.BrokerTestHelper; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import junit.framework.TestCase; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySet; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class HeadersExchangeTest extends TestCase { - private AMQProtocolSession _protocolSession; + private HeadersExchange _exchange; + private VirtualHost _virtualHost; @Override public void setUp() throws Exception { super.setUp(); - BrokerTestHelper.setUp(); - _protocolSession = new InternalTestProtocolSession(getVirtualHost(), BrokerTestHelper.createBrokerMock()); + + CurrentActor.setDefault(mock(LogActor.class)); + _exchange = new HeadersExchange(); + _virtualHost = mock(VirtualHost.class); + SecurityManager securityManager = mock(SecurityManager.class); + when(_virtualHost.getSecurityManager()).thenReturn(securityManager); + when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); + + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); + } - @Override - public void tearDown() throws Exception + protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception { - BrokerTestHelper.tearDown(); - super.tearDown(); + List results = _exchange.route(msg); + List unexpected = new ArrayList(results); + unexpected.removeAll(Arrays.asList(expected)); + assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); + List missing = new ArrayList(Arrays.asList(expected)); + missing.removeAll(results); + assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty()); + assertTrue("Duplicates " + results, results.size()==(new HashSet(results)).size()); } - public void testSimple() throws AMQException + + private AMQQueue createAndBind(final String name, String... arguments) + throws Exception { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - TestQueue q4 = bindDefault("F0001=Bear"); - TestQueue q5 = bindDefault("F0000", "F0001"); - TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); - TestQueue q7 = bindDefault("F0000", "F0001=Bear"); - TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); - routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), - q1, q2, q3, q4, q5, q6, q7, q8); - routeAndTest(new Message(_protocolSession, "Message6", "F0002")); - - Message m7 = new Message(_protocolSession, "Message7", "XXXXX"); - - MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); - pb7.setMandatory(true); - routeAndTest(m7,true); - - Message m8 = new Message(_protocolSession, "Message8", "F0000"); - MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); - pb8.setMandatory(true); - routeAndTest(m8,false,q1); + return createAndBind(name, getArgsMapFromStrings(arguments)); + } + + private Map getArgsMapFromStrings(String... arguments) + { + Map map = new HashMap(); + + for(String arg : arguments) + { + if(arg.contains("=")) + { + String[] keyValue = arg.split("=",2); + map.put(keyValue[0],keyValue[1]); + } + else + { + map.put(arg,null); + } + } + return map; + } + private AMQQueue createAndBind(final String name, Map arguments) + throws Exception + { + AMQQueue q = create(name); + bind(name, arguments, q); + return q; + } + private void bind(String bindingKey, Map arguments, AMQQueue q) + throws AMQSecurityException, AMQInternalException + { + _exchange.addBinding(bindingKey,q,arguments); } - public void testAny() throws AMQException + private AMQQueue create(String name) { - TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any"); - TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); - TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); - TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); - TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4); - routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message6", "F0002")); + AMQQueue q = mock(AMQQueue.class); + when(q.toString()).thenReturn(name); + when(q.getVirtualHost()).thenReturn(_virtualHost); + return q; } - public void testMandatory() throws AMQException + + public void testSimple() throws Exception { - bindDefault("F0000"); - Message m1 = new Message(_protocolSession, "Message1", "XXXXX"); - Message m2 = new Message(_protocolSession, "Message2", "F0000"); - MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); - pb1.setMandatory(true); - MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); - pb2.setMandatory(true); - routeAndTest(m1,true); + AMQQueue q1 = createAndBind("Q1", "F0000"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark"); + AMQQueue q3 = createAndBind("Q3", "F0001"); + AMQQueue q4 = createAndBind("Q4", "F0001=Bear"); + AMQQueue q5 = createAndBind("Q5", "F0000", "F0001"); + AMQQueue q6 = createAndBind("Q6", "F0000=Aardvark", "F0001=Bear"); + AMQQueue q7 = createAndBind("Q7", "F0000", "F0001=Bear"); + AMQQueue q8 = createAndBind("Q8", "F0000=Aardvark", "F0001"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), + q1, q2, q3, q4, q5, q6, q7, q8); + routeAndTest(mockMessage(getArgsMapFromStrings("F0002"))); + } - - public void testOnUnbind() throws AMQException + + public void testAny() throws Exception { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3); - - unbind(q1,"F0000"); - routeAndTest(new Message(_protocolSession, "Message4", "F0000")); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2); + AMQQueue q1 = createAndBind("Q1", "F0000", "F0001", "X-match=any"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark", "F0001=Bear", "X-match=any"); + AMQQueue q3 = createAndBind("Q3", "F0000", "F0001=Bear", "X-match=any"); + AMQQueue q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any"); + AMQQueue q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0002"))); + } + + public void testOnUnbind() throws Exception + { + AMQQueue q1 = createAndBind("Q1", "F0000"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark"); + AMQQueue q3 = createAndBind("Q3", "F0001"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); + routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3); + + _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000")); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000"))); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2); + } + + + public void testWithSelectors() throws Exception + { + AMQQueue q1 = create("Q1"); + AMQQueue q2 = create("Q2"); + bind("q1",getArgsMapFromStrings("F"), q1); + bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1); + bind("q2",getArgsMapFromStrings("F=1"), q2); + + routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1); + + routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2); + + + AMQQueue q3 = create("Q3"); + bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1); + bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3); + + routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3); + + } + + private InboundMessage mockMessage(final Map headerValues) + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.containsHeader(anyString())).then(new Answer() + { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + return headerValues.containsKey((String) invocation.getArguments()[0]); + } + }); + when(header.getHeader(anyString())).then(new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + return headerValues.get((String) invocation.getArguments()[0]); + } + }); + when(header.getHeaderNames()).thenReturn(headerValues.keySet()); + when(header.containsHeaders(anySet())).then(new Answer() + { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + final Set names = (Set) invocation.getArguments()[0]; + return headerValues.keySet().containsAll(names); + + } + }); + final InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getMessageHeader()).thenReturn(header); + return inboundMessage; } - public static junit.framework.Test suite() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index b9e9a33cd6..922cc1e2a7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -31,7 +31,7 @@ public class AMQHeadersExchange extends AMQDestination { public AMQHeadersExchange(BindingURL binding) { - this(binding.getExchangeName()); + super(binding); } public AMQHeadersExchange(String name) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e784e903fa..018a1ec851 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -440,7 +440,7 @@ public abstract class AMQSession subscriber = _subscriptions.get(name); - + // Not subscribed to this name in the current session if (subscriber == null) { // After the address is resolved routing key will not be null. AMQShortString topicName = dest.getRoutingKey(); - + if (_strictAMQP) { if (_strictAMQPFATAL) @@ -1046,8 +1046,8 @@ public abstract class AMQSession args = new HashMap(); - - // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all @@ -1060,16 +1060,28 @@ public abstract class AMQSession subscriber; - + _subscriberDetails.lock(); try { @@ -1896,11 +1916,11 @@ public abstract class AMQSession args) throws JMSException; /** @@ -2844,14 +2865,19 @@ public abstract class AMQSession bindings = new ArrayList(); bindings.addAll(destination.getNode().getBindings()); - + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; - + for (Binding binding: bindings) { // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. @@ -386,22 +386,22 @@ public class AMQSession_0_10 extends AMQSession0) { rk = bindingKeys[0].toString(); @@ -583,10 +581,10 @@ public class AMQSession_0_10 extends AMQSession args) { boolean res; @@ -598,21 +596,27 @@ public class AMQSession_0_10 extends AMQSession arguments = new HashMap(); if (noLocal) - { + { arguments.put(AddressHelper.NO_LOCAL, true); - } + } getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, @@ -790,7 +794,7 @@ public class AMQSession_0_10 extends AMQSession false @@ -837,7 +841,7 @@ public class AMQSession_0_10 extends AMQSession target,Map source) { boolean match = true; for (String key: source.keySet()) { - match = target.containsKey(key) && + match = target.containsKey(key) && target.get(key).equals(source.get(key)); - - if (!match) - { + + if (!match) + { StringBuffer buf = new StringBuffer(); buf.append("Property given in address did not match with the args sent by the broker."); buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); @@ -1184,22 +1188,22 @@ public class AMQSession_0_10 extends AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); } - + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); RangeSet all = RangeSetFactory.createRangeSet(delivered.size() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3097b33da3..9a9da62f2a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -184,7 +184,7 @@ public class AMQSession_0_8 extends AMQSession( + new FailoverProtectedOperation() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + return sendExchangeBound(exchangeName, routingKey, queueName); + + } + }, getAMQConnection()).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + return (responseBody.getReplyCode() == 0); + } + + private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, + AMQShortString routingKey, + AMQShortString queueName) throws AMQException, FailoverException + { + AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody + (exchangeName, routingKey, queueName).generateFrame(getChannelId()); + + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + } @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, @@ -527,7 +555,7 @@ public class AMQSession_0_8 extends AMQSession args = new HashMap(); args.put("x-match","any"); args.put("dep","sales"); args.put("loc","CA"); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, args)); - + MessageProducer prod = jmsSession.createProducer(dest); prod.send(jmsSession.createTextMessage("test")); - + MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue")); Message m = cons2.receive(1000); assertNotNull("Should receive message sent to my-queue",m); assertEquals("The subject set in the message is incorrect","hello",m.getStringProperty(QpidMessageProperties.QPID_SUBJECT)); } - + public void testCreateExchange() throws Exception { createExchangeImpl(false, false); @@ -283,21 +283,21 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - String addr = "ADDR:my-exchange/hello; " + - "{ " + - "create: always, " + - "node: " + + String addr = "ADDR:my-exchange/hello; " + + "{ " + + "create: always, " + + "node: " + "{" + "type: topic, " + "x-declare: " + - "{ " + - "type:direct, " + + "{ " + + "type:direct, " + "auto-delete: true" + createExchangeArgsString(withExchangeArgs, useNonsenseArguments) + "}" + "}" + "}"; - + AMQDestination dest = new AMQAnyDestination(addr); MessageConsumer cons; @@ -322,20 +322,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("Unexpected exception whilst creating consumer: " + e); } } - + assertTrue("Exchange not created as expected",( (AMQSession_0_10)jmsSession).isExchangeExist(dest,true)); - + // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", - dest.getQueueName(),"hello", Collections.emptyMap())); - + (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", + dest.getQueueName(),"hello", null)); + // The client should be able to query and verify the existence of my-exchange (QPID-2774) dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}"); - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } - + private String createExchangeArgsString(final boolean withExchangeArgs, final boolean useNonsenseArguments) { @@ -366,60 +366,60 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", - dest.getAddressName(),"test", null)); - + (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + dest.getAddressName(),"test", null)); + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); - + Address a = Address.parse(headersBinding); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, a.getOptions())); } - + /** * Test goal: Verifies that a producer and consumer creation triggers the correct * behavior for x-bindings specified in node props. */ public void testBindQueueWithArgs() throws Exception { - + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}"; - - String addr = "node: " + - "{" + + + String addr = "node: " + + "{" + "durable: true ," + - "x-declare: " + - "{ " + + "x-declare: " + + "{ " + "auto-delete: true," + "arguments: {'qpid.max_count': 100}" + "}, " + "x-bindings: [{exchange : 'amq.direct', key : test}, " + - "{exchange : 'amq.topic', key : 'a.#'}," + - headersBinding + + "{exchange : 'amq.topic', key : 'a.#'}," + + headersBinding + "]" + "}" + "}"; - + AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr); - MessageConsumer cons = jmsSession.createConsumer(dest1); - checkQueueForBindings(jmsSession,dest1,headersBinding); - + MessageConsumer cons = jmsSession.createConsumer(dest1); + checkQueueForBindings(jmsSession,dest1,headersBinding); + AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr); - MessageProducer prod = jmsSession.createProducer(dest2); - checkQueueForBindings(jmsSession,dest2,headersBinding); + MessageProducer prod = jmsSession.createProducer(dest2); + checkQueueForBindings(jmsSession,dest2,headersBinding); } - + /** * Test goal: Verifies the capacity property in address string is handled properly. * Test strategy: @@ -427,22 +427,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase * Creates consumer with client ack. * Sends 15 messages to the queue, tries to receive 10. * Tries to receive the 11th message and checks if its null. - * - * Since capacity is 10 and we haven't acked any messages, + * + * Since capacity is 10 and we haven't acked any messages, * we should not have received the 11th. - * + * * Acks the 10th message and verifies we receive the rest of the msgs. */ public void testCapacity() throws Exception { verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}"); } - + public void testSourceAndTargetCapacity() throws Exception { verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}"); } - + private void verifyCapacity(String address) throws Exception { if (!isCppBroker()) @@ -450,13 +450,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase _logger.info("Not C++ broker, exiting test"); return; } - + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - + AMQDestination dest = new AMQAnyDestination(address); - MessageConsumer cons = jmsSession.createConsumer(dest); + MessageConsumer cons = jmsSession.createConsumer(dest); MessageProducer prod = jmsSession.createProducer(dest); - + for (int i=0; i< 15; i++) { prod.send(jmsSession.createTextMessage("msg" + i) ); @@ -475,48 +475,48 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT)); } } - + /** * Test goal: Verifies if the new address format based destinations * can be specified and loaded correctly from the properties file. - * + * */ public void testLoadingFromPropertiesFile() throws Exception { - Hashtable map = new Hashtable(); - map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + + Hashtable map = new Hashtable(); + map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + "{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}"); - + map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }"); map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'"); - + PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory(); Context ctx = props.getInitialContext(map); - - AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); + + AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2"); AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3"); - + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons1 = jmsSession.createConsumer(dest1); + MessageConsumer cons1 = jmsSession.createConsumer(dest1); MessageConsumer cons2 = jmsSession.createConsumer(dest2); MessageConsumer cons3 = jmsSession.createConsumer(dest3); - + assertTrue("Destination1 was not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest1, true)); - + assertTrue("Destination1 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); - + assertTrue("Destination2 was not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest2,true)); - + assertTrue("Destination2 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest2.getAddressName(),dest2.getAddressName(), null)); - + MessageProducer producer = jmsSession.createProducer(dest3); producer.send(jmsSession.createTextMessage("Hello")); TextMessage msg = (TextMessage)cons3.receive(1000); @@ -527,31 +527,31 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase * Test goal: Verifies the subject can be overridden using "qpid.subject" message property. * Test strategy: Creates and address with a default subject "topic1" * Creates a message with "qpid.subject"="topic2" and sends it. - * Verifies that the message goes to "topic2" instead of "topic1". + * Verifies that the message goes to "topic2" instead of "topic1". */ public void testOverridingSubject() throws Exception { Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - + AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}"); - + MessageProducer prod = jmsSession.createProducer(topic1); - + Message m = jmsSession.createTextMessage("Hello"); m.setStringProperty("qpid.subject", "topic2"); - + MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1); MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}")); - + prod.send(m); Message msg = consForTopic1.receive(1000); assertNull("message shouldn't have been sent to topic1",msg); - + msg = consForTopic2.receive(1000); - assertNotNull("message should have been sent to topic2",msg); - + assertNotNull("message should have been sent to topic2",msg); + } - + /** * Test goal: Verifies that session.createQueue method * works as expected both with the new and old addressing scheme. @@ -559,19 +559,19 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testSessionCreateQueue() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + // Using the BURL method Destination queue = ssn.createQueue("my-queue"); - MessageProducer prod = ssn.createProducer(queue); + MessageProducer prod = ssn.createProducer(queue); MessageConsumer cons = ssn.createConsumer(queue); assertTrue("my-queue was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession_0_10)ssn).isQueueBound("amq.direct", "my-queue","my-queue", null)); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method // default case queue = ssn.createQueue("ADDR:my-queue2"); @@ -586,34 +586,34 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "doesn't resolve to an exchange or a queue"; assertEquals(s,e.getCause().getCause().getMessage()); } - + // explicit create case queue = ssn.createQueue("ADDR:my-queue2; {create: sender}"); - prod = ssn.createProducer(queue); + prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("my-queue2 was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("", + (AMQSession_0_10)ssn).isQueueBound("", "my-queue2","my-queue2", null)); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method to create a more complicated queue String addr = "ADDR:amq.direct/x512; {" + - "link : {name : 'MY.RESP.QUEUE', " + + "link : {name : 'MY.RESP.QUEUE', " + "x-declare : { auto-delete: true, exclusive: true, " + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }"; queue = ssn.createQueue(addr); - + cons = ssn.createConsumer(queue); prod = ssn.createProducer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession_0_10)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); cons.close(); } - + /** * Test goal: Verifies that session.creatTopic method works as expected * both with the new and old addressing scheme. @@ -635,68 +635,68 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + // Using the BURL method Topic topic = ssn.createTopic("ACME"); - MessageProducer prod = ssn.createProducer(topic); + MessageProducer prod = ssn.createProducer(topic); MessageConsumer cons = ssn.createConsumer(topic); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method topic = ssn.createTopic("ADDR:ACME"); - prod = ssn.createProducer(topic); + prod = ssn.createProducer(topic); cons = ssn.createConsumer(topic); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - String addr = "ADDR:vehicles/bus; " + - "{ " + - "create: always, " + - "node: " + + String addr = "ADDR:vehicles/bus; " + + "{ " + + "create: always, " + + "node: " + "{" + "type: topic, " + "x-declare: " + - "{ " + - "type:direct, " + + "{ " + + "type:direct, " + "auto-delete: true" + createExchangeArgsString(withExchangeArgs, false) + "}" + "}, " + "link: {name : my-topic, " + "x-bindings: [{exchange : 'vehicles', key : car}, " + - "{exchange : 'vehicles', key : van}]" + - "}" + + "{exchange : 'vehicles', key : van}]" + + "}" + "}"; - + // Using the ADDR method to create a more complicated topic topic = ssn.createTopic(addr); cons = ssn.createConsumer(topic); prod = ssn.createProducer(topic); - + assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","bus", null)); - + assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","car", null)); - + assertTrue("The queue was not bound to vehicle exchange using van as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","van", null)); - + Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "van"); prod.send(msg); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + /** * Test Goal : Verify the default subjects used for each exchange type. * The default for amq.topic is "#" and for the rest it's "" @@ -704,92 +704,92 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testDefaultSubjects() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct")); MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic")); - + MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct")); MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather")); MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales")); - + queueProducer.send(ssn.createBytesMessage()); assertNotNull("The consumer subscribed to amq.direct " + "with empty binding key should have received the message ",queueCons.receive(1000)); - + topicProducer1.send(ssn.createTextMessage("25c")); assertEquals("The consumer subscribed to amq.topic " + "with '#' binding key should have received the message ", ((TextMessage)topicCons.receive(1000)).getText(),"25c"); - + topicProducer2.send(ssn.createTextMessage("1000")); assertEquals("The consumer subscribed to amq.topic " + "with '#' binding key should have received the message ", ((TextMessage)topicCons.receive(1000)).getText(),"1000"); } - + /** * Test Goal : Verify that 'mode : browse' works as expected using a regular consumer. * This indirectly tests ring queues as well. */ public void testBrowseMode() throws Exception { - + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " + "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " + "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}"; - + Destination dest = ssn.createQueue(addr); MessageConsumer browseCons = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test")); - + prod.send(ssn.createTextMessage("Test1")); prod.send(ssn.createTextMessage("Test2")); - + TextMessage msg = (TextMessage)browseCons.receive(1000); assertEquals("Didn't receive the first message",msg.getText(),"Test1"); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Didn't receive the first message",msg.getText(),"Test2"); - - browseCons.close(); + + browseCons.close(); prod.send(ssn.createTextMessage("Test3")); browseCons = ssn.createConsumer(dest); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Should receive the second message again",msg.getText(),"Test2"); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Should receive the third message since it's a ring queue",msg.getText(),"Test3"); - + assertNull("Should not receive anymore messages",browseCons.receive(500)); } - + /** * Test Goal : When the same destination is used when creating two consumers, - * If the type == topic, verify that unique subscription queues are created, + * If the type == topic, verify that unique subscription queues are created, * unless subscription queue has a name. - * + * * If the type == queue, same queue should be shared. */ public void testSubscriptionForSameDestination() throws Exception { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination dest = ssn.createTopic("ADDR:amq.topic/foo"); MessageConsumer consumer1 = ssn.createConsumer(dest); MessageConsumer consumer2 = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(dest); - + prod.send(ssn.createTextMessage("A")); TextMessage m = (TextMessage)consumer1.receive(1000); assertEquals("Consumer1 should recieve message A",m.getText(),"A"); m = (TextMessage)consumer2.receive(1000); assertEquals("Consumer2 should recieve message A",m.getText(),"A"); - + consumer1.close(); consumer2.close(); - + dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}"); consumer1 = ssn.createConsumer(dest); try @@ -798,61 +798,61 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); } catch(Exception e) - { + { } _connection.close(); - + _connection = getConnection() ; _connection.start(); - ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); dest = ssn.createTopic("ADDR:my_queue; {create: always}"); consumer1 = ssn.createConsumer(dest); consumer2 = ssn.createConsumer(dest); prod = ssn.createProducer(dest); - + prod.send(ssn.createTextMessage("A")); - Message m1 = consumer1.receive(1000); + Message m1 = consumer1.receive(1000); Message m2 = consumer2.receive(1000); - + if (m1 != null) { - assertNull("Only one consumer should receive the message",m2); + assertNull("Only one consumer should receive the message",m2); } else { - assertNotNull("Only one consumer should receive the message",m2); + assertNotNull("Only one consumer should receive the message",m2); } } - + public void testXBindingsWithoutExchangeName() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String addr = "ADDR:MRKT; " + "{" + - "create: receiver," + + "create: receiver," + "node : {type: topic, x-declare: {type: topic} }," + "link:{" + "name: my-topic," + "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" + "}" + "}"; - + // Using the ADDR method to create a more complicated topic Topic topic = ssn.createTopic(addr); MessageConsumer cons = ssn.createConsumer(topic); - + assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","NYSE.#", null)); - + assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","NASDAQ.#", null)); - + assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); - + MessageProducer prod = ssn.createProducer(topic); Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "NASDAQ.ABCD"); @@ -860,7 +860,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + public void testXSubscribeOverrides() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -873,41 +873,41 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); } catch(Exception e) - { + { } } - + public void testQueueReceiversAndTopicSubscriber() throws Exception { Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); - + QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueReceiver receiver = qSession.createReceiver(queue); - + TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber sub = tSession.createSubscriber(topic); - + Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue")); prod1.send(ssn.createTextMessage("test1")); - + MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test")); prod2.send(ssn.createTextMessage("test2")); - + Message msg1 = receiver.receive(); assertNotNull(msg1); assertEquals("test1",((TextMessage)msg1).getText()); - + Message msg2 = sub.receive(); assertNotNull(msg2); - assertEquals("test2",((TextMessage)msg2).getText()); + assertEquals("test2",((TextMessage)msg2).getText()); } - + public void testDurableSubscriber() throws Exception { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; Properties props = new Properties(); @@ -916,19 +916,19 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; - props.setProperty("destination.address5", addrStr); - - Context ctx = new InitialContext(props); + props.setProperty("destination.address5", addrStr); + + Context ctx = new InitialContext(props); for (int i=1; i < 4; i++) { Topic topic = (Topic) ctx.lookup("address"+i); createDurableSubscriber(ctx,ssn,"address"+i,topic,"ADDR:amq.topic/test"); } - + Topic topic = ssn.createTopic("ADDR:news.us"); createDurableSubscriber(ctx,ssn,"my-dest",topic,"ADDR:news.us"); - + Topic namedQueue = (Topic) ctx.lookup("address5"); try { @@ -1001,10 +1001,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception - { + { MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr)); - + Message m = ssn.createTextMessage(destName); prod.send(m); Message msg = cons.receive(1000); @@ -1012,12 +1012,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertEquals(destName,((TextMessage)msg).getText()); ssn.unsubscribe(destName); } - + public void testDeleteOptions() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer cons; - + // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; @@ -1031,11 +1031,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - - + + String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; dest = new AMQAnyDestination(addr2); try @@ -1047,11 +1047,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - + String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; dest = new AMQAnyDestination(addr3); try @@ -1064,43 +1064,43 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); } - + /** * Test Goals : 1. Test if the client sets the correct accept mode for unreliable * and at-least-once. * 2. Test default reliability modes for Queues and Topics. * 3. Test if an exception is thrown if exactly-once is used. * 4. Test if an exception is thrown if at-least-once is used with topics. - * + * * Test Strategy: For goal #1 & #2 * For unreliable and at-least-once the test tries to receives messages * in client_ack mode but does not ack the messages. * It will then close the session, recreate a new session * and will then try to verify the queue depth. * For unreliable the messages should have been taken off the queue. - * For at-least-once the messages should be put back onto the queue. - * + * For at-least-once the messages should be put back onto the queue. + * */ - + public void testReliabilityOptions() throws Exception { String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}"; acceptModeTest(addr1,0); - + String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}"; acceptModeTest(addr2,2); - + // Default accept-mode for topics - acceptModeTest("ADDR:amq.topic/test",0); - + acceptModeTest("ADDR:amq.topic/test",0); + // Default accept-mode for queues acceptModeTest("ADDR:testQueue1;{create: always}",2); - - String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; + + String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; try { AMQAnyDestination dest = new AMQAnyDestination(addr3); @@ -1111,83 +1111,83 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); } } - + private void acceptModeTest(String address, int expectedQueueDepth) throws Exception { Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); MessageConsumer cons; MessageProducer prod; - + AMQDestination dest = new AMQAnyDestination(address); cons = ssn.createConsumer(dest); prod = ssn.createProducer(dest); - + for (int i=0; i < expectedQueueDepth; i++) { prod.send(ssn.createTextMessage("Msg" + i)); } - + for (int i=0; i < expectedQueueDepth; i++) { Message msg = cons.receive(1000); assertNotNull(msg); assertEquals("Msg" + i,((TextMessage)msg).getText()); } - + ssn.close(); ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); - assertEquals(expectedQueueDepth,queueDepth); + long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); + assertEquals(expectedQueueDepth,queueDepth); cons.close(); - prod.close(); + prod.close(); } - + public void testDestinationOnSend() throws Exception { Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); MessageProducer prod = ssn.createProducer(null); - + Topic queue = ssn.createTopic("ADDR:amq.topic/test"); prod.send(queue,ssn.createTextMessage("A")); - + Message msg = cons.receive(1000); assertNotNull(msg); assertEquals("A",((TextMessage)msg).getText()); prod.close(); cons.close(); } - + public void testReplyToWithNamelessExchange() throws Exception { System.setProperty("qpid.declare_exchanges","false"); replyToTest("ADDR:my-queue;{create: always}"); System.setProperty("qpid.declare_exchanges","true"); } - + public void testReplyToWithCustomExchange() throws Exception { replyToTest("ADDR:hello;{create:always,node:{type:topic}}"); } - + private void replyToTest(String replyTo) throws Exception { - Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination replyToDest = AMQDestination.createDestination(replyTo); MessageConsumer replyToCons = session.createConsumer(replyToDest); - + Destination dest = session.createQueue("ADDR:amq.direct/test"); - + MessageConsumer cons = session.createConsumer(dest); MessageProducer prod = session.createProducer(dest); Message m = session.createTextMessage("test"); m.setJMSReplyTo(replyToDest); prod.send(m); - + Message msg = cons.receive(); MessageProducer prodR = session.createProducer(msg.getJMSReplyTo()); prodR.send(session.createTextMessage("x")); - + Message m1 = replyToCons.receive(); assertNotNull("The reply to consumer should have received the messsage",m1); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index 626592dc10..5dcf678510 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -207,21 +207,21 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener } assertTrue("No exception thrown!", caught); caught = false; - + } - + public void testRuntimeSelectorError() throws JMSException { Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1"); MessageProducer producer = session.createProducer(_destination); Message sentMsg = session.createTextMessage(); - + sentMsg.setIntProperty("testproperty", 1); // 1 % 5 producer.send(sentMsg); Message recvd = consumer.receive(RECIEVE_TIMEOUT); assertNotNull(recvd); - + sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense producer.send(sentMsg); try @@ -231,47 +231,47 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener } catch (Exception e) { - + } assertFalse("Connection should not be closed", _connection.isClosed()); } - + public void testSelectorWithJMSMessageID() throws Exception { Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); - + MessageProducer prod = session.createProducer(_destination); MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL"); - + for (int i=0; i<2; i++) { Message msg = session.createTextMessage("Msg" + String.valueOf(i)); prod.send(msg); } session.commit(); - + Message msg1 = consumer.receive(1000); Message msg2 = consumer.receive(1000); - + Assert.assertNotNull("Msg1 should not be null", msg1); Assert.assertNotNull("Msg2 should not be null", msg2); - + session.commit(); - + prod.setDisableMessageID(true); - - for (int i=0; i<2; i++) + + for (int i=2; i<4; i++) { Message msg = session.createTextMessage("Msg" + String.valueOf(i)); prod.send(msg); } - + session.commit(); - Message msg3 = consumer.receive(1000); + Message msg3 = consumer.receive(1000); Assert.assertNull("Msg3 should be null", msg3); session.commit(); consumer = session.createConsumer(_destination,"JMSMessageID IS NULL"); - + Message msg4 = consumer.receive(1000); Message msg5 = consumer.receive(1000); session.commit(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index e861b4f4ee..f8ab593c88 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -56,19 +56,19 @@ public class StreamMessageTest extends QpidBrokerTestCase public void testStreamMessageEOF() throws Exception { - Connection con = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL( ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); + FieldTable ft = new FieldTable(); ft.setString("x-match", "any"); ft.setString("F1000", "1"); - MessageConsumer consumer = - consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft); - + consumerSession.declareAndBind(queue, ft); + MessageConsumer consumer = consumerSession.createConsumer(queue); // force synch to ensure the consumer has resulted in a bound queue // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); // This is the default now diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 5dae98fe21..6bf20d7708 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.test.unit.topic; +import javax.jms.JMSException; +import javax.naming.NamingException; +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -37,6 +40,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import org.apache.qpid.url.URLSyntaxException; /** @author Apache Software Foundation */ @@ -225,6 +229,44 @@ public class TopicSessionTest extends QpidBrokerTestCase AMQTopic topic = new AMQTopic(con, "testNoLocal"); + noLocalTest(con, topic); + + + con.close(); + } + + + public void testNoLocalDirectExchange() throws Exception + { + + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + + AMQTopic topic = new AMQTopic("direct://amq.direct/testNoLocal/testNoLocal?routingkey='testNoLocal',exclusive='true',autodelete='true'"); + + noLocalTest(con, topic); + + + con.close(); + } + + + + public void testNoLocalFanoutExchange() throws Exception + { + + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + + AMQTopic topic = new AMQTopic("fanout://amq.fanout/testNoLocal/testNoLocal?routingkey='testNoLocal',exclusive='true',autodelete='true'"); + + noLocalTest(con, topic); + + con.close(); + } + + + private void noLocalTest(AMQConnection con, AMQTopic topic) + throws JMSException, URLSyntaxException, AMQException, NamingException + { TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber noLocal = session1.createSubscriber(topic, "", true); @@ -304,9 +346,6 @@ public class TopicSessionTest extends QpidBrokerTestCase //test nolocal subscriber does message m = (TextMessage) noLocal.receive(1000); assertNotNull(m); - - - con.close(); con2.close(); } -- cgit v1.2.1