diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-03-18 16:23:56 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-03-18 16:23:56 +0000 |
| commit | 9aaa7d922df6a3b0bf4339b7481e90afb01b784b (patch) | |
| tree | b0cc66e14fac16ce5519d84f6e695121cbbc90ef /java/broker/src/main | |
| parent | f929c25594a3375d6ff89b43c453c6b6efecc89b (diff) | |
| download | qpid-python-9aaa7d922df6a3b0bf4339b7481e90afb01b784b.tar.gz | |
QPID-2379: add Binding.msgMatched() support to the HeadersExchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@924879 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
3 files changed, 140 insertions, 143 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java index 0b689c16a7..0b6035f32d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -37,7 +37,7 @@ public class Binding private final UUID _id; private final AtomicLong _matches = new AtomicLong(); - Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) + public Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) { _id = id; _bindingKey = bindingKey; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 35c4a8f9b2..f58a6513a9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQTypedValue; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.AMQMessageHeader; /** @@ -38,69 +39,35 @@ class HeadersBinding private static final Logger _logger = Logger.getLogger(HeadersBinding.class); private final FieldTable _mappings; + private final Binding _binding; private final Set<String> required = new HashSet<String>(); private final Map<String,Object> matches = new HashMap<String,Object>(); private boolean matchAny; - private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor - { - private Boolean _result = Boolean.FALSE; - - public boolean processElement(String propertyName, AMQTypedValue value) - { - if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName))) - { - _result = Boolean.TRUE; - return false; - } - return true; - } - - public Object getResult() - { - return _result; - } - } - - private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor - { - Boolean _result = Boolean.FALSE; - - public boolean processElement(String propertyName, AMQTypedValue value) - { - if(required.contains(propertyName)) - { - _result = Boolean.TRUE; - return false; - } - return true; - } - - public Object getResult() - { - return _result; - } - } - - - /** - * Creates a binding for a set of mappings. Those mappings whose value is + * Creates a header binding for a set of mappings. Those mappings whose value is * null or the empty string are assumed only to be required headers, with * no constraint on the value. Those with a non-null value are assumed to * define a required match of value. - * @param mappings the defined mappings this binding should use + * + * @param binding the binding to create a header binding using */ - - HeadersBinding(FieldTable mappings) + public HeadersBinding(Binding binding) { - _mappings = mappings; - initMappings(); + _binding = binding; + if(_binding !=null) + { + _mappings = FieldTable.convertToFieldTable(_binding.getArguments()); + initMappings(); + } + else + { + _mappings = null; + } } - + private void initMappings() { - _mappings.processOverElements(new FieldTable.FieldTableElementProcessor() { @@ -133,6 +100,11 @@ class HeadersBinding { return _mappings; } + + public Binding getBinding() + { + return _binding; + } /** * Checks whether the supplied headers match the requirements of this binding @@ -250,4 +222,39 @@ class HeadersBinding { return key.startsWith("X-") || key.startsWith("x-"); } -} + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + + if (o == null) + { + return false; + } + + if (!(o instanceof HeadersBinding)) + { + return false; + } + + final HeadersBinding hb = (HeadersBinding) o; + + if(_binding == null) + { + if(hb.getBinding() != null) + { + return false; + } + } + else if (!_binding.equals(hb.getBinding())) + { + return false; + } + + return true; + } +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index ce0b14932f..e98a603d12 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -24,8 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -36,10 +34,11 @@ import org.apache.qpid.server.binding.Binding; import javax.management.JMException; import java.util.ArrayList; -import java.util.List; +import java.util.LinkedHashSet; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; /** * An exchange that binds queues based on a set of required headers and header values @@ -72,7 +71,14 @@ public class HeadersExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(HeadersExchange.class); - + + private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey = + new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>(); + + private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers = + new CopyOnWriteArrayList<HeadersBinding>(); + + public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>() { @@ -102,34 +108,12 @@ public class HeadersExchange extends AbstractExchange } }; - - private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>(); - private Map<AMQShortString, Registration> _bindingByKey = new ConcurrentHashMap<AMQShortString, Registration>(); - - public HeadersExchange() { super(TYPE); } + - public void registerQueue(String routingKey, AMQQueue queue, Map<String,Object> args) - { - registerQueue(new AMQShortString(routingKey), queue, FieldTable.convertToFieldTable(args)); - } - - public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) - { - _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + " with " + args); - - Registration registration = new Registration(new HeadersBinding(args), queue, routingKey); - _bindings.add(registration); - - } - - public void deregisterQueue(String routingKey, AMQQueue queue, Map<String,Object> args) - { - _bindings.remove(new Registration(args == null ? null : new HeadersBinding(FieldTable.convertToFieldTable(args)), queue, new AMQShortString(routingKey))); - } public ArrayList<BaseQueue> doRoute(InboundMessage payload) { @@ -138,24 +122,27 @@ public class HeadersExchange extends AbstractExchange { _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header); } - boolean routed = false; - ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(); - for (Registration e : _bindings) + + LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>(); + + for (HeadersBinding hb : _bindingHeaderMatchers) { - - if (e.binding.matches(header)) + if (hb.matches(header)) { + Binding b = hb.getBinding(); + + b.incrementMatches(); + if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " + - header + " to " + e.queue.getNameShortString()); + header + " to " + b.getQueue().getNameShortString()); } - queues.add(e.queue); - - routed = true; + queues.add(b.getQueue()); } } - return queues; + + return new ArrayList<BaseQueue>(queues); } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) @@ -166,38 +153,49 @@ public class HeadersExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - return isBound(queue); + String bindingKey = (routingKey == null) ? "" : routingKey.toString(); + CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); + + if(bindings != null) + { + for(Binding binding : bindings) + { + if(binding.getQueue().equals(queue)) + { + return true; + } + } + } + + return false; } public boolean isBound(AMQShortString routingKey) { - return hasBindings(); + String bindingKey = (routingKey == null) ? "" : routingKey.toString(); + CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); + return bindings != null && !bindings.isEmpty(); } public boolean isBound(AMQQueue queue) { - for (Registration r : _bindings) + for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values()) { - if (r.queue.equals(queue)) + for(Binding binding : bindings) { - return true; + if(binding.getQueue().equals(queue)) + { + return true; + } } } + return false; } public boolean hasBindings() { - return !_bindings.isEmpty(); - } - - - - protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame) - { - //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers, - //but these are not yet implemented. - return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders(); + return !getBindings().isEmpty(); } protected AbstractExchangeMBean createMBean() throws JMException @@ -210,59 +208,51 @@ public class HeadersExchange extends AbstractExchange return _logger; } - - static class Registration + protected void onBind(final Binding binding) { - private final HeadersBinding binding; - private final AMQQueue queue; - private final AMQShortString routingKey; + String bindingKey = binding.getBindingKey(); + AMQQueue queue = binding.getQueue(); + AMQShortString routingKey = AMQShortString.valueOf(bindingKey); + Map<String,Object> args = binding.getArguments(); - Registration(HeadersBinding binding, AMQQueue queue, AMQShortString routingKey) - { - this.binding = binding; - this.queue = queue; - this.routingKey = routingKey; - } + assert queue != null; + assert routingKey != null; - public int hashCode() - { - int queueHash = queue.hashCode(); - int routingHash = routingKey == null ? 0 : routingKey.hashCode(); - return queueHash + routingHash; - } + CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - public boolean equals(Object o) + if(bindings == null) { - return o instanceof Registration - && ((Registration) o).queue.equals(queue) - && (routingKey == null ? ((Registration)o).routingKey == null - : routingKey.equals(((Registration)o).routingKey)); - } - - public HeadersBinding getBinding() - { - return binding; + bindings = new CopyOnWriteArraySet<Binding>(); + CopyOnWriteArraySet<Binding> newBindings; + if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null) + { + bindings = newBindings; + } } - - public AMQQueue getQueue() + + if(_logger.isDebugEnabled()) { - return queue; + _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + + " with binding key '" +bindingKey + "' and args: " + args); } - public AMQShortString getRoutingKey() - { - return routingKey; - } - } + _bindingHeaderMatchers.add(new HeadersBinding(binding)); + bindings.add(binding); - protected void onBind(final Binding binding) - { - registerQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments()); } protected void onUnbind(final Binding binding) { - deregisterQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments()); + assert binding != null; + + CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey()); + if(bindings != null) + { + bindings.remove(binding); + } + + _logger.debug("==============="); + _logger.debug("Removing Binding: " + _bindingHeaderMatchers.remove(new HeadersBinding(binding))); } } |
