summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-03-18 16:23:56 +0000
committerRobert Gemmell <robbie@apache.org>2010-03-18 16:23:56 +0000
commit9aaa7d922df6a3b0bf4339b7481e90afb01b784b (patch)
treeb0cc66e14fac16ce5519d84f6e695121cbbc90ef /java
parentf929c25594a3375d6ff89b43c453c6b6efecc89b (diff)
downloadqpid-python-9aaa7d922df6a3b0bf4339b7481e90afb01b784b.tar.gz
QPID-2379: add Binding.msgMatched() support to the HeadersExchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@924879 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java109
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java172
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java48
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java127
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java16
6 files changed, 268 insertions, 206 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
index 0b689c16a7..0b6035f32d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
@@ -37,7 +37,7 @@ public class Binding
private final UUID _id;
private final AtomicLong _matches = new AtomicLong();
- Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+ public Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
{
_id = id;
_bindingKey = bindingKey;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 35c4a8f9b2..f58a6513a9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessageHeader;
/**
@@ -38,69 +39,35 @@ class HeadersBinding
private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
private final FieldTable _mappings;
+ private final Binding _binding;
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
- private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor
- {
- private Boolean _result = Boolean.FALSE;
-
- public boolean processElement(String propertyName, AMQTypedValue value)
- {
- if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName)))
- {
- _result = Boolean.TRUE;
- return false;
- }
- return true;
- }
-
- public Object getResult()
- {
- return _result;
- }
- }
-
- private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor
- {
- Boolean _result = Boolean.FALSE;
-
- public boolean processElement(String propertyName, AMQTypedValue value)
- {
- if(required.contains(propertyName))
- {
- _result = Boolean.TRUE;
- return false;
- }
- return true;
- }
-
- public Object getResult()
- {
- return _result;
- }
- }
-
-
-
/**
- * Creates a binding for a set of mappings. Those mappings whose value is
+ * Creates a header binding for a set of mappings. Those mappings whose value is
* null or the empty string are assumed only to be required headers, with
* no constraint on the value. Those with a non-null value are assumed to
* define a required match of value.
- * @param mappings the defined mappings this binding should use
+ *
+ * @param binding the binding to create a header binding using
*/
-
- HeadersBinding(FieldTable mappings)
+ public HeadersBinding(Binding binding)
{
- _mappings = mappings;
- initMappings();
+ _binding = binding;
+ if(_binding !=null)
+ {
+ _mappings = FieldTable.convertToFieldTable(_binding.getArguments());
+ initMappings();
+ }
+ else
+ {
+ _mappings = null;
+ }
}
-
+
private void initMappings()
{
-
_mappings.processOverElements(new FieldTable.FieldTableElementProcessor()
{
@@ -133,6 +100,11 @@ class HeadersBinding
{
return _mappings;
}
+
+ public Binding getBinding()
+ {
+ return _binding;
+ }
/**
* Checks whether the supplied headers match the requirements of this binding
@@ -250,4 +222,39 @@ class HeadersBinding
{
return key.startsWith("X-") || key.startsWith("x-");
}
-}
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null)
+ {
+ return false;
+ }
+
+ if (!(o instanceof HeadersBinding))
+ {
+ return false;
+ }
+
+ final HeadersBinding hb = (HeadersBinding) o;
+
+ if(_binding == null)
+ {
+ if(hb.getBinding() != null)
+ {
+ return false;
+ }
+ }
+ else if (!_binding.equals(hb.getBinding()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index ce0b14932f..e98a603d12 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -24,8 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -36,10 +34,11 @@ import org.apache.qpid.server.binding.Binding;
import javax.management.JMException;
import java.util.ArrayList;
-import java.util.List;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
/**
* An exchange that binds queues based on a set of required headers and header values
@@ -72,7 +71,14 @@ public class HeadersExchange extends AbstractExchange
{
private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
-
+
+ private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
+ new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
+
+ private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
+ new CopyOnWriteArrayList<HeadersBinding>();
+
+
public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>()
{
@@ -102,34 +108,12 @@ public class HeadersExchange extends AbstractExchange
}
};
-
- private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>();
- private Map<AMQShortString, Registration> _bindingByKey = new ConcurrentHashMap<AMQShortString, Registration>();
-
-
public HeadersExchange()
{
super(TYPE);
}
+
- public void registerQueue(String routingKey, AMQQueue queue, Map<String,Object> args)
- {
- registerQueue(new AMQShortString(routingKey), queue, FieldTable.convertToFieldTable(args));
- }
-
- public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args)
- {
- _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + " with " + args);
-
- Registration registration = new Registration(new HeadersBinding(args), queue, routingKey);
- _bindings.add(registration);
-
- }
-
- public void deregisterQueue(String routingKey, AMQQueue queue, Map<String,Object> args)
- {
- _bindings.remove(new Registration(args == null ? null : new HeadersBinding(FieldTable.convertToFieldTable(args)), queue, new AMQShortString(routingKey)));
- }
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
@@ -138,24 +122,27 @@ public class HeadersExchange extends AbstractExchange
{
_logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header);
}
- boolean routed = false;
- ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
- for (Registration e : _bindings)
+
+ LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
+
+ for (HeadersBinding hb : _bindingHeaderMatchers)
{
-
- if (e.binding.matches(header))
+ if (hb.matches(header))
{
+ Binding b = hb.getBinding();
+
+ b.incrementMatches();
+
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " +
- header + " to " + e.queue.getNameShortString());
+ header + " to " + b.getQueue().getNameShortString());
}
- queues.add(e.queue);
-
- routed = true;
+ queues.add(b.getQueue());
}
}
- return queues;
+
+ return new ArrayList<BaseQueue>(queues);
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
@@ -166,38 +153,49 @@ public class HeadersExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- return isBound(queue);
+ String bindingKey = (routingKey == null) ? "" : routingKey.toString();
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+
+ if(bindings != null)
+ {
+ for(Binding binding : bindings)
+ {
+ if(binding.getQueue().equals(queue))
+ {
+ return true;
+ }
+ }
+ }
+
+ return false;
}
public boolean isBound(AMQShortString routingKey)
{
- return hasBindings();
+ String bindingKey = (routingKey == null) ? "" : routingKey.toString();
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+ return bindings != null && !bindings.isEmpty();
}
public boolean isBound(AMQQueue queue)
{
- for (Registration r : _bindings)
+ for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
{
- if (r.queue.equals(queue))
+ for(Binding binding : bindings)
{
- return true;
+ if(binding.getQueue().equals(queue))
+ {
+ return true;
+ }
}
}
+
return false;
}
public boolean hasBindings()
{
- return !_bindings.isEmpty();
- }
-
-
-
- protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
- {
- //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
- //but these are not yet implemented.
- return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
+ return !getBindings().isEmpty();
}
protected AbstractExchangeMBean createMBean() throws JMException
@@ -210,59 +208,51 @@ public class HeadersExchange extends AbstractExchange
return _logger;
}
-
- static class Registration
+ protected void onBind(final Binding binding)
{
- private final HeadersBinding binding;
- private final AMQQueue queue;
- private final AMQShortString routingKey;
+ String bindingKey = binding.getBindingKey();
+ AMQQueue queue = binding.getQueue();
+ AMQShortString routingKey = AMQShortString.valueOf(bindingKey);
+ Map<String,Object> args = binding.getArguments();
- Registration(HeadersBinding binding, AMQQueue queue, AMQShortString routingKey)
- {
- this.binding = binding;
- this.queue = queue;
- this.routingKey = routingKey;
- }
+ assert queue != null;
+ assert routingKey != null;
- public int hashCode()
- {
- int queueHash = queue.hashCode();
- int routingHash = routingKey == null ? 0 : routingKey.hashCode();
- return queueHash + routingHash;
- }
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
- public boolean equals(Object o)
+ if(bindings == null)
{
- return o instanceof Registration
- && ((Registration) o).queue.equals(queue)
- && (routingKey == null ? ((Registration)o).routingKey == null
- : routingKey.equals(((Registration)o).routingKey));
- }
-
- public HeadersBinding getBinding()
- {
- return binding;
+ bindings = new CopyOnWriteArraySet<Binding>();
+ CopyOnWriteArraySet<Binding> newBindings;
+ if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
+ {
+ bindings = newBindings;
+ }
}
-
- public AMQQueue getQueue()
+
+ if(_logger.isDebugEnabled())
{
- return queue;
+ _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() +
+ " with binding key '" +bindingKey + "' and args: " + args);
}
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- }
+ _bindingHeaderMatchers.add(new HeadersBinding(binding));
+ bindings.add(binding);
- protected void onBind(final Binding binding)
- {
- registerQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments());
}
protected void onUnbind(final Binding binding)
{
- deregisterQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments());
+ assert binding != null;
+
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey());
+ if(bindings != null)
+ {
+ bindings.remove(binding);
+ }
+
+ _logger.debug("===============");
+ _logger.debug("Removing Binding: " + _bindingHeaderMatchers.remove(new HeadersBinding(binding)));
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 06d5d80ac1..2d8b157297 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -53,8 +54,10 @@ import org.apache.qpid.server.subscription.Subscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -92,31 +95,31 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected TestQueue bindDefault(String... bindings) throws AMQException
{
- return bind("Queue" + (++count), bindings);
- }
+ String queueName = "Queue" + (++count);
- protected TestQueue bind(String queueName, String... bindings) throws AMQException
- {
- return bind(queueName, getHeaders(bindings));
+ return bind(queueName, queueName, getHeadersMap(bindings));
}
-
- protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
+
+ protected void unbind(TestQueue queue, String... bindings) throws AMQException
{
- return bind(new TestQueue(new AMQShortString(queue)), bindings);
+ String queueName = queue.getName();
+ //TODO - check this
+ exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings)));
}
-
- protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
+
+ protected int getCount()
{
- return bind(queue, getHeaders(bindings));
+ return count;
}
- protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
+ private TestQueue bind(String key, String queueName, Map<String,Object> args) throws AMQException
{
+ TestQueue queue = new TestQueue(new AMQShortString(queueName));
queues.add(queue);
- exchange.registerQueue(null, queue, bindings);
+ exchange.onBind(new Binding(null,key, queue, exchange, args));
return queue;
}
-
+
protected int route(Message m) throws AMQException
{
@@ -171,6 +174,23 @@ public class AbstractHeadersExchangeTestBase extends TestCase
}
}
+
+ static Map<String,Object> getHeadersMap(String... entries)
+ {
+ if(entries == null)
+ {
+ return null;
+ }
+
+ Map<String,Object> headers = new HashMap<String,Object>();
+
+ for (String s : entries)
+ {
+ String[] parts = s.split("=", 2);
+ headers.put(parts[0], parts.length > 1 ? parts[1] : "");
+ }
+ return headers;
+ }
static FieldTable getHeaders(String... entries)
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 1e56a32383..a7c226cbd8 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -26,7 +26,9 @@ import java.util.Set;
import junit.framework.TestCase;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.queue.MockAMQQueue;
/**
*/
@@ -119,166 +121,193 @@ public class HeadersBindingTest extends TestCase
}
}
- private FieldTable bindHeaders = new FieldTable();
+ private Map<String,Object> bindHeaders = new HashMap<String,Object>();
private MockHeader matchHeaders = new MockHeader();
+ private int _count = 0;
+ private MockAMQQueue _queue;
+
+ protected void setUp()
+ {
+ _count++;
+ _queue = new MockAMQQueue(getQueueName());
+ }
+
+ protected String getQueueName()
+ {
+ return "Queue" + _count;
+ }
public void testDefault_1()
{
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Value of A");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testDefault_2()
{
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testDefault_3()
{
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Altered value of A");
- assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_1()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Value of A");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_2()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
- assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_3()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_4()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
matchHeaders.setString("C", "Value of C");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_5()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_1()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Value of A");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_2()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_3()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_4()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
matchHeaders.setString("C", "Value of C");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_5()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_6()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Altered value of A");
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
public static junit.framework.Test suite()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 580bc78b8d..f982c3976f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -106,6 +106,22 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
pb2.setMandatory(true);
routeAndTest(m1,true);
}
+
+ public void testOnUnbind() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ TestQueue q2 = bindDefault("F0000=Aardvark");
+ TestQueue q3 = bindDefault("F0001");
+
+ routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
+ routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
+ routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3);
+
+ unbind(q1,"F0000");
+ routeAndTest(new Message(_protocolSession, "Message4", "F0000"));
+ routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2);
+ }
+
public static junit.framework.Test suite()
{