diff options
5 files changed, 97 insertions, 58 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java index 0b6035f32d..60c9a86b76 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -89,29 +89,30 @@ public class Binding @Override public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + { + return true; + } + + if (o == null || !(o instanceof Binding)) + { + return false; + } final Binding binding = (Binding) o; - if (!_bindingKey.equals(binding._bindingKey)) return false; - if (!_exchange.equals(binding._exchange)) return false; - if (!_queue.equals(binding._queue)) return false; - - return true; + return (_bindingKey == null ? binding.getBindingKey() == null : _bindingKey.equals(binding.getBindingKey())) + && (_exchange == null ? binding.getExchange() == null : _exchange.equals(binding.getExchange())) + && (_queue == null ? binding.getQueue() == null : _queue.equals(binding.getQueue())); } @Override public int hashCode() { - int result = _bindingKey.hashCode(); - result = 31 * result + _queue.hashCode(); - result = 31 * result + _exchange.hashCode(); + int result = _bindingKey == null ? 1 : _bindingKey.hashCode(); + result = 31 * result + (_queue == null ? 3 : _queue.hashCode()); + result = 31 * result + (_exchange == null ? 5 : _exchange.hashCode()); return result; } - - - - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 14f15dd92c..6e60d95700 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -31,6 +31,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.topic.*; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.message.InboundMessage; @@ -83,7 +84,7 @@ public class TopicExchange extends AbstractExchange private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults = new ConcurrentHashMap<AMQShortString, TopicExchangeResult>(); - private final Map<TopicBinding, FieldTable> _bindings = new HashMap<TopicBinding, FieldTable>(); + private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>(); private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>(); @@ -92,20 +93,12 @@ public class TopicExchange extends AbstractExchange super(TYPE); } - public synchronized void registerQueue(String rKey, AMQQueue queue, Map<String,Object> args) - { - try - { - registerQueue(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args)); - } - catch (AMQInvalidArgumentException e) - { - throw new RuntimeException(e); - } - } - - public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQInvalidArgumentException + protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException { + AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ; + AMQQueue queue = binding.getQueue(); + FieldTable args = FieldTable.convertToFieldTable(binding.getArguments()); + assert queue != null; assert rKey != null; @@ -114,8 +107,6 @@ public class TopicExchange extends AbstractExchange AMQShortString routingKey = TopicNormalizer.normalize(rKey); - TopicBinding binding = new TopicBinding(rKey, queue, args); - if(_bindings.containsKey(binding)) { FieldTable oldArgs = _bindings.get(binding); @@ -146,6 +137,8 @@ public class TopicExchange extends AbstractExchange return; } } + + result.addBinding(binding); } else @@ -177,6 +170,8 @@ public class TopicExchange extends AbstractExchange result.addUnfilteredQueue(queue); } } + + result.addBinding(binding); _bindings.put(binding, args); } @@ -210,11 +205,19 @@ public class TopicExchange extends AbstractExchange ? AMQShortString.EMPTY_STRING : new AMQShortString(payload.getRoutingKey()); + _logger.info("Message routing key: " + routingKey ); + // The copy here is unfortunate, but not too bad relevant to the amount of // things created and copied in getMatchedQueues ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(); queues.addAll(getMatchedQueues(payload, routingKey)); + for(BaseQueue q : queues) + { + _logger.info("Matched Queue: " + q.getNameShortString() ); + } + + if(queues == null || queues.isEmpty()) { _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); @@ -226,7 +229,8 @@ public class TopicExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) { - TopicBinding binding = new TopicBinding(routingKey, queue, arguments); + Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments)); + if (arguments == null) { return _bindings.containsKey(binding); @@ -253,7 +257,7 @@ public class TopicExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey) { - for(TopicBinding b : _bindings.keySet()) + for(Binding b : _bindings.keySet()) { if(b.getBindingKey().equals(routingKey)) { @@ -266,7 +270,7 @@ public class TopicExchange extends AbstractExchange public boolean isBound(AMQQueue queue) { - for(TopicBinding b : _bindings.keySet()) + for(Binding b : _bindings.keySet()) { if(b.getQueue().equals(queue)) { @@ -282,19 +286,16 @@ public class TopicExchange extends AbstractExchange return !_bindings.isEmpty(); } - - public void deregisterQueue(String rKey, AMQQueue queue, Map<String, Object> args) - { - removeBinding(new TopicBinding(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args))); - } - - private boolean removeBinding(final TopicBinding binding) + private boolean deregisterQueue(final Binding binding) { if(_bindings.containsKey(binding)) { FieldTable bindingArgs = _bindings.remove(binding); - AMQShortString bindingKey = TopicNormalizer.normalize(binding.getBindingKey()); + AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey())); TopicExchangeResult result = _topicExchangeResults.get(bindingKey); + + result.removeBinding(binding); + if(argumentsContainSelector(bindingArgs)) { try @@ -341,8 +342,14 @@ public class TopicExchange extends AbstractExchange Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>(); for(TopicMatcherResult result : results) { + TopicExchangeResult res = (TopicExchangeResult)result; - queues = ((TopicExchangeResult)result).processMessage(message, queues); + for(Binding b : res.getBindings()) + { + b.incrementMatches(); + } + + queues = res.processMessage(message, queues); } return queues; } @@ -350,14 +357,21 @@ public class TopicExchange extends AbstractExchange } - protected void onBind(final org.apache.qpid.server.binding.Binding binding) + protected void onBind(final Binding binding) { - registerQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments()); + try + { + registerQueue(binding); + } + catch (AMQInvalidArgumentException e) + { + throw new RuntimeException(e); + } } - protected void onUnbind(final org.apache.qpid.server.binding.Binding binding) + protected void onUnbind(final Binding binding) { - deregisterQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments()); + deregisterQueue(binding); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java index d9a779802f..41dc0d749a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java @@ -21,14 +21,22 @@ package org.apache.qpid.server.exchange.topic; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; public final class TopicExchangeResult implements TopicMatcherResult { + private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>(); @@ -64,6 +72,20 @@ public final class TopicExchangeResult implements TopicMatcherResult return _unfilteredQueues.keySet(); } + public void addBinding(Binding binding) + { + _bindings.add(binding); + } + + public void removeBinding(Binding binding) + { + _bindings.remove(binding); + } + + public List<Binding> getBindings() + { + return new ArrayList<Binding>(_bindings); + } public void addFilteredQueue(AMQQueue queue, MessageFilter filter) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index daa0377e0a..4fa47d039e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.AMQException; @@ -64,7 +65,7 @@ public class TopicExchangeTest extends TestCase public void testNoRoute() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null); + _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null)); IncomingMessage message = createMessage("a.b"); @@ -76,7 +77,7 @@ public class TopicExchangeTest extends TestCase public void testDirectMatch() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("ab"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); IncomingMessage message = createMessage("a.b"); @@ -103,7 +104,7 @@ public class TopicExchangeTest extends TestCase public void testStarMatch() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.*"), queue, null); + _exchange.registerQueue(new Binding(null,"a.*", queue,_exchange, null)); IncomingMessage message = createMessage("a.b"); @@ -142,7 +143,7 @@ public class TopicExchangeTest extends TestCase public void testHashMatch() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.#"), queue, null); + _exchange.registerQueue(new Binding(null,"a.#", queue,_exchange, null)); IncomingMessage message = createMessage("a.b.c"); @@ -205,7 +206,7 @@ public class TopicExchangeTest extends TestCase public void testMidHash() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null); + _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null)); IncomingMessage message = createMessage("a.c.d.b"); @@ -235,7 +236,7 @@ public class TopicExchangeTest extends TestCase public void testMatchafterHash() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null); + _exchange.registerQueue(new Binding(null,"a.*.#.b.c", queue,_exchange, null)); IncomingMessage message = createMessage("a.c.b.b"); @@ -281,7 +282,7 @@ public class TopicExchangeTest extends TestCase public void testHashAfterHash() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null); + _exchange.registerQueue(new Binding(null,"a.*.#.b.c.#.d", queue,_exchange, null)); IncomingMessage message = createMessage("a.c.b.b.c"); @@ -308,7 +309,7 @@ public class TopicExchangeTest extends TestCase public void testHashHash() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null); + _exchange.registerQueue(new Binding(null,"a.#.*.#.d", queue,_exchange, null)); IncomingMessage message = createMessage("a.c.b.b.c"); @@ -334,7 +335,7 @@ public class TopicExchangeTest extends TestCase public void testSubMatchFails() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null); + _exchange.registerQueue(new Binding(null,"a.b.c.d", queue,_exchange, null)); IncomingMessage message = createMessage("a.b.c"); @@ -364,7 +365,7 @@ public class TopicExchangeTest extends TestCase public void testMoreRouting() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); IncomingMessage message = createMessage("a.b.c"); @@ -379,7 +380,7 @@ public class TopicExchangeTest extends TestCase public void testMoreQueue() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null); - _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); IncomingMessage message = createMessage("a"); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 91bb5d2529..cafd212dd3 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -334,6 +334,7 @@ public class DurableSubscriptionTest extends QpidTestCase { _logger.info("Receive message on consumer 3 :expecting B"); msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull(msg); assertEquals("B", ((TextMessage) msg).getText()); } |
