summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java50
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicBinding.java70
5 files changed, 47 insertions, 102 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
index aa18b5a136..e552596058 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
@@ -31,7 +31,6 @@ import org.apache.qpid.server.configuration.ExchangeConfigType;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.exchange.ExchangeType;
-import org.apache.qpid.server.exchange.topic.TopicBinding;
import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
@@ -47,7 +46,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -55,6 +53,7 @@ import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
public class ManagementExchange implements Exchange, QMFService.Listener
@@ -69,8 +68,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
- private final Map<TopicBinding, FieldTable> _topicBindings = new HashMap<TopicBinding, FieldTable>();
- private final Set<Binding> _bindingSet = new HashSet<Binding>();
+ private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
private UUID _id;
private static final String AGENT_BANK = "0";
@@ -254,21 +252,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener
public synchronized void addBinding(final Binding b)
{
- _bindingSet.add(b);
-
- for(BindingListener listener : _listeners)
- {
- listener.bindingAdded(this, b);
- }
-
- if(_bindingSet.size() > _bindingCountHigh)
- {
- _bindingCountHigh = _bindingSet.size();
- }
-
- TopicBinding binding = new TopicBinding(new AMQShortString(b.getBindingKey()), b.getQueue(), null);
-
- if(!_topicBindings.containsKey(binding))
+ if(_bindingSet.add(b))
{
AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey()));
@@ -284,10 +268,20 @@ public class ManagementExchange implements Exchange, QMFService.Listener
{
result.addUnfilteredQueue(b.getQueue());
}
- _topicBindings.put(binding, null);
+ result.addBinding(b);
+ }
+
+ for(BindingListener listener : _listeners)
+ {
+ listener.bindingAdded(this, b);
+ }
+ if(_bindingSet.size() > _bindingCountHigh)
+ {
+ _bindingCountHigh = _bindingSet.size();
}
+
String bindingKey = b.getBindingKey();
if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
@@ -355,6 +349,13 @@ public class ManagementExchange implements Exchange, QMFService.Listener
HashSet<AMQQueue> queues = new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
+ TopicExchangeResult res = (TopicExchangeResult)result;
+
+ for(Binding b : res.getBindings())
+ {
+ b.incrementMatches();
+ }
+
queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues());
}
for(AMQQueue queue : queues)
@@ -378,14 +379,11 @@ public class ManagementExchange implements Exchange, QMFService.Listener
public synchronized void removeBinding(final Binding binding)
{
- _bindingSet.remove(binding);
-
- TopicBinding topicBinding = new TopicBinding(new AMQShortString(binding.getBindingKey()), binding.getQueue(), null);
-
- if(_topicBindings.containsKey(topicBinding))
+ if(_bindingSet.remove(binding))
{
- AMQShortString bindingKey = TopicNormalizer.normalize(topicBinding.getBindingKey());
+ AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+ result.removeBinding(binding);
result.removeUnfilteredQueue(binding.getQueue());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 41ae52e9b8..be4e8f8ec1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -110,7 +110,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
public String[] getExchangeTypes() throws IOException
{
ArrayList<String> exchangeTypes = new ArrayList<String>();
- for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getRegisteredTypes())
+ for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getPublicCreatableTypes())
{
exchangeTypes.add(ex.getName().toString());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 1c4c341c14..9be8bddd28 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQShortString;
@@ -30,10 +34,6 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
public class DefaultExchangeFactory implements ExchangeFactory
{
private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
@@ -60,6 +60,21 @@ public class DefaultExchangeFactory implements ExchangeFactory
{
return _exchangeClassMap.values();
}
+
+ public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes()
+ {
+ Collection<ExchangeType<? extends Exchange>> publicTypes =
+ new ArrayList<ExchangeType<? extends Exchange>>();
+ publicTypes.addAll(_exchangeClassMap.values());
+
+ //Remove the ManagementExchange type if present, as these
+ //are private and cannot be created by external means
+ publicTypes.remove(ManagementExchange.TYPE);
+
+ return publicTypes;
+ }
+
+
public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
index b91bf559f1..aa4cc1ec24 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
@@ -38,6 +38,8 @@ public interface ExchangeFactory
void initialise(VirtualHostConfiguration hostConfig);
Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
+
+ Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes();
Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicBinding.java
deleted file mode 100644
index c6383a886e..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicBinding.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange.topic;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.TopicExchange;
-
-public class TopicBinding
-{
- private final AMQShortString _bindingKey;
- private final AMQQueue _queue;
- private final FieldTable _args;
-
- public TopicBinding(AMQShortString bindingKey, AMQQueue queue, FieldTable args)
- {
- _bindingKey = bindingKey;
- _queue = queue;
- _args = args;
- }
-
- public AMQShortString getBindingKey()
- {
- return _bindingKey;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public int hashCode()
- {
- return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 +_queue.hashCode();
- }
-
- public boolean equals(Object o)
- {
- if(this == o)
- {
- return true;
- }
- if(o instanceof TopicBinding)
- {
- TopicBinding other = (TopicBinding) o;
- return (_queue == other._queue)
- && ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey));
- }
- return false;
- }
-}