diff options
Diffstat (limited to 'java')
5 files changed, 47 insertions, 102 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java index aa18b5a136..e552596058 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java @@ -31,7 +31,6 @@ import org.apache.qpid.server.configuration.ExchangeConfigType; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.exchange.ExchangeType; -import org.apache.qpid.server.exchange.topic.TopicBinding; import org.apache.qpid.server.exchange.topic.TopicExchangeResult; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.exchange.topic.TopicNormalizer; @@ -47,7 +46,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -55,6 +53,7 @@ import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; public class ManagementExchange implements Exchange, QMFService.Listener @@ -69,8 +68,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults = new ConcurrentHashMap<AMQShortString, TopicExchangeResult>(); - private final Map<TopicBinding, FieldTable> _topicBindings = new HashMap<TopicBinding, FieldTable>(); - private final Set<Binding> _bindingSet = new HashSet<Binding>(); + private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>(); private UUID _id; private static final String AGENT_BANK = "0"; @@ -254,21 +252,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener public synchronized void addBinding(final Binding b) { - _bindingSet.add(b); - - for(BindingListener listener : _listeners) - { - listener.bindingAdded(this, b); - } - - if(_bindingSet.size() > _bindingCountHigh) - { - _bindingCountHigh = _bindingSet.size(); - } - - TopicBinding binding = new TopicBinding(new AMQShortString(b.getBindingKey()), b.getQueue(), null); - - if(!_topicBindings.containsKey(binding)) + if(_bindingSet.add(b)) { AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey())); @@ -284,10 +268,20 @@ public class ManagementExchange implements Exchange, QMFService.Listener { result.addUnfilteredQueue(b.getQueue()); } - _topicBindings.put(binding, null); + result.addBinding(b); + } + + for(BindingListener listener : _listeners) + { + listener.bindingAdded(this, b); + } + if(_bindingSet.size() > _bindingCountHigh) + { + _bindingCountHigh = _bindingSet.size(); } + String bindingKey = b.getBindingKey(); if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#.")) @@ -355,6 +349,13 @@ public class ManagementExchange implements Exchange, QMFService.Listener HashSet<AMQQueue> queues = new HashSet<AMQQueue>(); for(TopicMatcherResult result : results) { + TopicExchangeResult res = (TopicExchangeResult)result; + + for(Binding b : res.getBindings()) + { + b.incrementMatches(); + } + queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues()); } for(AMQQueue queue : queues) @@ -378,14 +379,11 @@ public class ManagementExchange implements Exchange, QMFService.Listener public synchronized void removeBinding(final Binding binding) { - _bindingSet.remove(binding); - - TopicBinding topicBinding = new TopicBinding(new AMQShortString(binding.getBindingKey()), binding.getQueue(), null); - - if(_topicBindings.containsKey(topicBinding)) + if(_bindingSet.remove(binding)) { - AMQShortString bindingKey = TopicNormalizer.normalize(topicBinding.getBindingKey()); + AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey())); TopicExchangeResult result = _topicExchangeResults.get(bindingKey); + result.removeBinding(binding); result.removeUnfilteredQueue(binding.getQueue()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 41ae52e9b8..be4e8f8ec1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -110,7 +110,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr public String[] getExchangeTypes() throws IOException { ArrayList<String> exchangeTypes = new ArrayList<String>(); - for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getRegisteredTypes()) + for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getPublicCreatableTypes()) { exchangeTypes.add(ex.getName().toString()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 1c4c341c14..9be8bddd28 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -20,8 +20,12 @@ */ package org.apache.qpid.server.exchange; -import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQShortString; @@ -30,10 +34,6 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - public class DefaultExchangeFactory implements ExchangeFactory { private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class); @@ -60,6 +60,21 @@ public class DefaultExchangeFactory implements ExchangeFactory { return _exchangeClassMap.values(); } + + public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes() + { + Collection<ExchangeType<? extends Exchange>> publicTypes = + new ArrayList<ExchangeType<? extends Exchange>>(); + publicTypes.addAll(_exchangeClassMap.values()); + + //Remove the ManagementExchange type if present, as these + //are private and cannot be created by external means + publicTypes.remove(ManagementExchange.TYPE); + + return publicTypes; + } + + public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index b91bf559f1..aa4cc1ec24 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -38,6 +38,8 @@ public interface ExchangeFactory void initialise(VirtualHostConfiguration hostConfig); Collection<ExchangeType<? extends Exchange>> getRegisteredTypes(); + + Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes(); Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicBinding.java deleted file mode 100644 index c6383a886e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicBinding.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange.topic; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.exchange.TopicExchange; - -public class TopicBinding -{ - private final AMQShortString _bindingKey; - private final AMQQueue _queue; - private final FieldTable _args; - - public TopicBinding(AMQShortString bindingKey, AMQQueue queue, FieldTable args) - { - _bindingKey = bindingKey; - _queue = queue; - _args = args; - } - - public AMQShortString getBindingKey() - { - return _bindingKey; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public int hashCode() - { - return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 +_queue.hashCode(); - } - - public boolean equals(Object o) - { - if(this == o) - { - return true; - } - if(o instanceof TopicBinding) - { - TopicBinding other = (TopicBinding) o; - return (_queue == other._queue) - && ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey)); - } - return false; - } -} |
