diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-03-18 16:23:56 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-03-18 16:23:56 +0000 |
| commit | 9aaa7d922df6a3b0bf4339b7481e90afb01b784b (patch) | |
| tree | b0cc66e14fac16ce5519d84f6e695121cbbc90ef /java | |
| parent | f929c25594a3375d6ff89b43c453c6b6efecc89b (diff) | |
| download | qpid-python-9aaa7d922df6a3b0bf4339b7481e90afb01b784b.tar.gz | |
QPID-2379: add Binding.msgMatched() support to the HeadersExchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@924879 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 268 insertions, 206 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java index 0b689c16a7..0b6035f32d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -37,7 +37,7 @@ public class Binding private final UUID _id; private final AtomicLong _matches = new AtomicLong(); - Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) + public Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) { _id = id; _bindingKey = bindingKey; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 35c4a8f9b2..f58a6513a9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQTypedValue; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.AMQMessageHeader; /** @@ -38,69 +39,35 @@ class HeadersBinding private static final Logger _logger = Logger.getLogger(HeadersBinding.class); private final FieldTable _mappings; + private final Binding _binding; private final Set<String> required = new HashSet<String>(); private final Map<String,Object> matches = new HashMap<String,Object>(); private boolean matchAny; - private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor - { - private Boolean _result = Boolean.FALSE; - - public boolean processElement(String propertyName, AMQTypedValue value) - { - if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName))) - { - _result = Boolean.TRUE; - return false; - } - return true; - } - - public Object getResult() - { - return _result; - } - } - - private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor - { - Boolean _result = Boolean.FALSE; - - public boolean processElement(String propertyName, AMQTypedValue value) - { - if(required.contains(propertyName)) - { - _result = Boolean.TRUE; - return false; - } - return true; - } - - public Object getResult() - { - return _result; - } - } - - - /** - * Creates a binding for a set of mappings. Those mappings whose value is + * Creates a header binding for a set of mappings. Those mappings whose value is * null or the empty string are assumed only to be required headers, with * no constraint on the value. Those with a non-null value are assumed to * define a required match of value. - * @param mappings the defined mappings this binding should use + * + * @param binding the binding to create a header binding using */ - - HeadersBinding(FieldTable mappings) + public HeadersBinding(Binding binding) { - _mappings = mappings; - initMappings(); + _binding = binding; + if(_binding !=null) + { + _mappings = FieldTable.convertToFieldTable(_binding.getArguments()); + initMappings(); + } + else + { + _mappings = null; + } } - + private void initMappings() { - _mappings.processOverElements(new FieldTable.FieldTableElementProcessor() { @@ -133,6 +100,11 @@ class HeadersBinding { return _mappings; } + + public Binding getBinding() + { + return _binding; + } /** * Checks whether the supplied headers match the requirements of this binding @@ -250,4 +222,39 @@ class HeadersBinding { return key.startsWith("X-") || key.startsWith("x-"); } -} + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + + if (o == null) + { + return false; + } + + if (!(o instanceof HeadersBinding)) + { + return false; + } + + final HeadersBinding hb = (HeadersBinding) o; + + if(_binding == null) + { + if(hb.getBinding() != null) + { + return false; + } + } + else if (!_binding.equals(hb.getBinding())) + { + return false; + } + + return true; + } +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index ce0b14932f..e98a603d12 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -24,8 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -36,10 +34,11 @@ import org.apache.qpid.server.binding.Binding; import javax.management.JMException; import java.util.ArrayList; -import java.util.List; +import java.util.LinkedHashSet; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; /** * An exchange that binds queues based on a set of required headers and header values @@ -72,7 +71,14 @@ public class HeadersExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(HeadersExchange.class); - + + private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey = + new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>(); + + private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers = + new CopyOnWriteArrayList<HeadersBinding>(); + + public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>() { @@ -102,34 +108,12 @@ public class HeadersExchange extends AbstractExchange } }; - - private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>(); - private Map<AMQShortString, Registration> _bindingByKey = new ConcurrentHashMap<AMQShortString, Registration>(); - - public HeadersExchange() { super(TYPE); } + - public void registerQueue(String routingKey, AMQQueue queue, Map<String,Object> args) - { - registerQueue(new AMQShortString(routingKey), queue, FieldTable.convertToFieldTable(args)); - } - - public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) - { - _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + " with " + args); - - Registration registration = new Registration(new HeadersBinding(args), queue, routingKey); - _bindings.add(registration); - - } - - public void deregisterQueue(String routingKey, AMQQueue queue, Map<String,Object> args) - { - _bindings.remove(new Registration(args == null ? null : new HeadersBinding(FieldTable.convertToFieldTable(args)), queue, new AMQShortString(routingKey))); - } public ArrayList<BaseQueue> doRoute(InboundMessage payload) { @@ -138,24 +122,27 @@ public class HeadersExchange extends AbstractExchange { _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header); } - boolean routed = false; - ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(); - for (Registration e : _bindings) + + LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>(); + + for (HeadersBinding hb : _bindingHeaderMatchers) { - - if (e.binding.matches(header)) + if (hb.matches(header)) { + Binding b = hb.getBinding(); + + b.incrementMatches(); + if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " + - header + " to " + e.queue.getNameShortString()); + header + " to " + b.getQueue().getNameShortString()); } - queues.add(e.queue); - - routed = true; + queues.add(b.getQueue()); } } - return queues; + + return new ArrayList<BaseQueue>(queues); } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) @@ -166,38 +153,49 @@ public class HeadersExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - return isBound(queue); + String bindingKey = (routingKey == null) ? "" : routingKey.toString(); + CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); + + if(bindings != null) + { + for(Binding binding : bindings) + { + if(binding.getQueue().equals(queue)) + { + return true; + } + } + } + + return false; } public boolean isBound(AMQShortString routingKey) { - return hasBindings(); + String bindingKey = (routingKey == null) ? "" : routingKey.toString(); + CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); + return bindings != null && !bindings.isEmpty(); } public boolean isBound(AMQQueue queue) { - for (Registration r : _bindings) + for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values()) { - if (r.queue.equals(queue)) + for(Binding binding : bindings) { - return true; + if(binding.getQueue().equals(queue)) + { + return true; + } } } + return false; } public boolean hasBindings() { - return !_bindings.isEmpty(); - } - - - - protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame) - { - //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers, - //but these are not yet implemented. - return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders(); + return !getBindings().isEmpty(); } protected AbstractExchangeMBean createMBean() throws JMException @@ -210,59 +208,51 @@ public class HeadersExchange extends AbstractExchange return _logger; } - - static class Registration + protected void onBind(final Binding binding) { - private final HeadersBinding binding; - private final AMQQueue queue; - private final AMQShortString routingKey; + String bindingKey = binding.getBindingKey(); + AMQQueue queue = binding.getQueue(); + AMQShortString routingKey = AMQShortString.valueOf(bindingKey); + Map<String,Object> args = binding.getArguments(); - Registration(HeadersBinding binding, AMQQueue queue, AMQShortString routingKey) - { - this.binding = binding; - this.queue = queue; - this.routingKey = routingKey; - } + assert queue != null; + assert routingKey != null; - public int hashCode() - { - int queueHash = queue.hashCode(); - int routingHash = routingKey == null ? 0 : routingKey.hashCode(); - return queueHash + routingHash; - } + CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - public boolean equals(Object o) + if(bindings == null) { - return o instanceof Registration - && ((Registration) o).queue.equals(queue) - && (routingKey == null ? ((Registration)o).routingKey == null - : routingKey.equals(((Registration)o).routingKey)); - } - - public HeadersBinding getBinding() - { - return binding; + bindings = new CopyOnWriteArraySet<Binding>(); + CopyOnWriteArraySet<Binding> newBindings; + if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null) + { + bindings = newBindings; + } } - - public AMQQueue getQueue() + + if(_logger.isDebugEnabled()) { - return queue; + _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + + " with binding key '" +bindingKey + "' and args: " + args); } - public AMQShortString getRoutingKey() - { - return routingKey; - } - } + _bindingHeaderMatchers.add(new HeadersBinding(binding)); + bindings.add(binding); - protected void onBind(final Binding binding) - { - registerQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments()); } protected void onUnbind(final Binding binding) { - deregisterQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments()); + assert binding != null; + + CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey()); + if(bindings != null) + { + bindings.remove(binding); + } + + _logger.debug("==============="); + _logger.debug("Removing Binding: " + _bindingHeaderMatchers.remove(new HeadersBinding(binding))); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 06d5d80ac1..2d8b157297 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -31,6 +31,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AMQMessageHeader; @@ -53,8 +54,10 @@ import org.apache.qpid.server.subscription.Subscription; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -92,31 +95,31 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected TestQueue bindDefault(String... bindings) throws AMQException { - return bind("Queue" + (++count), bindings); - } + String queueName = "Queue" + (++count); - protected TestQueue bind(String queueName, String... bindings) throws AMQException - { - return bind(queueName, getHeaders(bindings)); + return bind(queueName, queueName, getHeadersMap(bindings)); } - - protected TestQueue bind(String queue, FieldTable bindings) throws AMQException + + protected void unbind(TestQueue queue, String... bindings) throws AMQException { - return bind(new TestQueue(new AMQShortString(queue)), bindings); + String queueName = queue.getName(); + //TODO - check this + exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings))); } - - protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException + + protected int getCount() { - return bind(queue, getHeaders(bindings)); + return count; } - protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException + private TestQueue bind(String key, String queueName, Map<String,Object> args) throws AMQException { + TestQueue queue = new TestQueue(new AMQShortString(queueName)); queues.add(queue); - exchange.registerQueue(null, queue, bindings); + exchange.onBind(new Binding(null,key, queue, exchange, args)); return queue; } - + protected int route(Message m) throws AMQException { @@ -171,6 +174,23 @@ public class AbstractHeadersExchangeTestBase extends TestCase } } + + static Map<String,Object> getHeadersMap(String... entries) + { + if(entries == null) + { + return null; + } + + Map<String,Object> headers = new HashMap<String,Object>(); + + for (String s : entries) + { + String[] parts = s.split("=", 2); + headers.put(parts[0], parts.length > 1 ? parts[1] : ""); + } + return headers; + } static FieldTable getHeaders(String... entries) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 1e56a32383..a7c226cbd8 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -26,7 +26,9 @@ import java.util.Set; import junit.framework.TestCase; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.queue.MockAMQQueue; /** */ @@ -119,166 +121,193 @@ public class HeadersBindingTest extends TestCase } } - private FieldTable bindHeaders = new FieldTable(); + private Map<String,Object> bindHeaders = new HashMap<String,Object>(); private MockHeader matchHeaders = new MockHeader(); + private int _count = 0; + private MockAMQQueue _queue; + + protected void setUp() + { + _count++; + _queue = new MockAMQQueue(getQueueName()); + } + + protected String getQueueName() + { + return "Queue" + _count; + } public void testDefault_1() { - bindHeaders.setString("A", "Value of A"); + bindHeaders.put("A", "Value of A"); matchHeaders.setString("A", "Value of A"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testDefault_2() { - bindHeaders.setString("A", "Value of A"); + bindHeaders.put("A", "Value of A"); matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Value of B"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testDefault_3() { - bindHeaders.setString("A", "Value of A"); + bindHeaders.put("A", "Value of A"); matchHeaders.setString("A", "Altered value of A"); - assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertFalse(new HeadersBinding(b).matches(matchHeaders)); } public void testAll_1() { - bindHeaders.setString("X-match", "all"); - bindHeaders.setString("A", "Value of A"); + bindHeaders.put("X-match", "all"); + bindHeaders.put("A", "Value of A"); matchHeaders.setString("A", "Value of A"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testAll_2() { - bindHeaders.setString("X-match", "all"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "all"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Value of A"); - assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertFalse(new HeadersBinding(b).matches(matchHeaders)); } public void testAll_3() { - bindHeaders.setString("X-match", "all"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "all"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Value of B"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testAll_4() { - bindHeaders.setString("X-match", "all"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "all"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Value of B"); matchHeaders.setString("C", "Value of C"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testAll_5() { - bindHeaders.setString("X-match", "all"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "all"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Altered value of B"); matchHeaders.setString("C", "Value of C"); - assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertFalse(new HeadersBinding(b).matches(matchHeaders)); } public void testAny_1() { - bindHeaders.setString("X-match", "any"); - bindHeaders.setString("A", "Value of A"); + bindHeaders.put("X-match", "any"); + bindHeaders.put("A", "Value of A"); matchHeaders.setString("A", "Value of A"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testAny_2() { - bindHeaders.setString("X-match", "any"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "any"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Value of A"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testAny_3() { - bindHeaders.setString("X-match", "any"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "any"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Value of B"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testAny_4() { - bindHeaders.setString("X-match", "any"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "any"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Value of B"); matchHeaders.setString("C", "Value of C"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testAny_5() { - bindHeaders.setString("X-match", "any"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "any"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Altered value of B"); matchHeaders.setString("C", "Value of C"); - assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertTrue(new HeadersBinding(b).matches(matchHeaders)); } public void testAny_6() { - bindHeaders.setString("X-match", "any"); - bindHeaders.setString("A", "Value of A"); - bindHeaders.setString("B", "Value of B"); + bindHeaders.put("X-match", "any"); + bindHeaders.put("A", "Value of A"); + bindHeaders.put("B", "Value of B"); matchHeaders.setString("A", "Altered value of A"); matchHeaders.setString("B", "Altered value of B"); matchHeaders.setString("C", "Value of C"); - assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders)); + Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + assertFalse(new HeadersBinding(b).matches(matchHeaders)); } public static junit.framework.Test suite() diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 580bc78b8d..f982c3976f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -106,6 +106,22 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase pb2.setMandatory(true); routeAndTest(m1,true); } + + public void testOnUnbind() throws AMQException + { + TestQueue q1 = bindDefault("F0000"); + TestQueue q2 = bindDefault("F0000=Aardvark"); + TestQueue q3 = bindDefault("F0001"); + + routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); + routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); + routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3); + + unbind(q1,"F0000"); + routeAndTest(new Message(_protocolSession, "Message4", "F0000")); + routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2); + } + public static junit.framework.Test suite() { |
