summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java2
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java2
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java4
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java2
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java (renamed from java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java)115
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java36
-rw-r--r--java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java2
-rw-r--r--java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java120
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java76
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java26
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java7
21 files changed, 335 insertions, 98 deletions
diff --git a/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java b/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java
index 59fbaa4a34..e6e0059902 100644
--- a/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java
+++ b/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java
@@ -60,7 +60,7 @@ public class AccessControl extends AbstractPlugin
public AccessControl newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- AccessControlConfiguration configuration = config.getConfiguration(AccessControlConfiguration.class);
+ AccessControlConfiguration configuration = config.getConfiguration(AccessControlConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
index c094c7eb6d..0949204a33 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
@@ -95,7 +95,7 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
"('messageAge','depth' or 'messageCount') must be specified.");
}
- SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class);
+ SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName());
PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
index 77819f8dbf..cac52c2fdf 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
@@ -40,7 +40,7 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
public SlowConsumerDetection newInstance(VirtualHost vhost)
{
- SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class);
+ SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName());
if (config == null)
{
@@ -74,7 +74,7 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
try
{
SlowConsumerDetectionQueueConfiguration config =
- q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class);
+ q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
if (checkQueueStatus(q, config))
{
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
index 3c67f6e6d7..c5275aa0bf 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
@@ -45,7 +45,7 @@ public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException
{
TopicDeletePolicyConfiguration config =
- configuration.getConfiguration(TopicDeletePolicyConfiguration.class);
+ configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName());
TopicDeletePolicy policy = new TopicDeletePolicy();
policy.configure(config);
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java
index f0be3d2db0..513bafa8ad 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java
@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
* Slow consumers should on a topic should expect to receive a
* 506 : Resource Error if the hit a predefined threshold.
*/
-public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
+public class GlobalQueuesTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
{
Topic _destination;
private CountDownLatch _disconnectionLatch = new CountDownLatch(1);
@@ -59,6 +59,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
private Exception _publisherError = null;
private JMSException _connectionException = null;
private static final long JOIN_WAIT = 5000;
+ protected String CONFIG_SECTION = ".queues";
@Override
public void setUp() throws IOException, ConfigurationException, NamingException
@@ -71,32 +72,26 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
+ getConnectionURL().getVirtualHost().substring(1) +
".slow-consumer-detection.timeunit", "SECONDS");
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "policy.name", "TopicDelete");
-
-
/**
* Queue Configuration
<slow-consumer-detection>
- <!-- The depth before which the policy will be applied-->
- <depth>4235264</depth>
-
- <!-- The message age before which the policy will be applied-->
- <messageAge>600000</messageAge>
-
- <!-- The number of message before which the policy will be applied-->
- <messageCount>50</messageCount>
-
- <!-- Policies configuration -->
- <policy>
- <name>TopicDelete</name>
- <topicDelete>
- <delete-persistent/>
- </topicDelete>
- </policy>
+ <!-- The depth before which the policy will be applied-->
+ <depth>4235264</depth>
+
+ <!-- The message age before which the policy will be applied-->
+ <messageAge>600000</messageAge>
+
+ <!-- The number of message before which the policy will be applied-->
+ <messageCount>50</messageCount>
+
+ <!-- Policies configuration -->
+ <policy>
+ <name>TopicDelete</name>
+ <topicDelete>
+ <delete-persistent/>
+ </topicDelete>
+ </policy>
</slow-consumer-detection>
*/
@@ -105,8 +100,8 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
* VirtualHost Plugin Configuration
<slow-consumer-detection>
- <delay>1</delay>
- <timeunit>MINUTES</timeunit>
+ <delay>1</delay>
+ <timeunit>MINUTES</timeunit>
</slow-consumer-detection>
*/
@@ -132,6 +127,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
* Clients should not have to modify their code based on the protocol in use.
*
* @param ackMode @see javax.jms.Session
+ *
* @throws Exception
*/
public void topicConsumer(int ackMode, boolean durable) throws Exception
@@ -260,6 +256,27 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
}
+ public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException
+ {
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ CONFIG_SECTION + ".slow-consumer-detection." +
+ "policy.name", "TopicDelete");
+
+ setConfigurationProperty("virtualhosts.virtualhost." +
+ getConnectionURL().getVirtualHost().substring(1) +
+ CONFIG_SECTION + ".slow-consumer-detection." +
+ property, value);
+
+ if (deleteDurable)
+ {
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ CONFIG_SECTION + ".slow-consumer-detection." +
+ "policy.topicdelete.delete-persistent", "");
+ }
+ }
+
/**
* Test that setting messageCount takes affect on topics
*
@@ -271,10 +288,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1));
+ setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false);
//Start the broker
super.setUp();
@@ -295,10 +309,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "depth", String.valueOf(MESSAGE_SIZE * 9));
+ setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false);
//Start the broker
super.setUp();
@@ -321,10 +332,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "messageAge", String.valueOf(DISCONNECTION_WAIT / 2));
+ setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false);
//Start the broker
super.setUp();
@@ -346,15 +354,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1));
-
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
+ setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true);
//Start the broker
super.setUp();
@@ -377,15 +377,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "depth", String.valueOf(MESSAGE_SIZE * 9));
-
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
+ setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true);
//Start the broker
super.setUp();
@@ -395,7 +387,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
}
- /**
+ /**
* Test that setting messageAge has an effect on topics
*
* Ensure we set the delete-persistent option
@@ -410,15 +402,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "messageAge", String.valueOf(DISCONNECTION_WAIT / 5));
-
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
+ setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true);
//Start the broker
super.setUp();
@@ -438,6 +422,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
_disconnectionLatch.countDown();
}
+
/// Connection Listener
public void bytesSent(long count)
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java
new file mode 100644
index 0000000000..1f8103fa3c
--- /dev/null
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import javax.naming.NamingException;
+import java.io.IOException;
+
+public class GlobalTopicsTest extends GlobalQueuesTest
+{
+ @Override
+ public void setUp() throws NamingException, IOException, ConfigurationException
+ {
+ CONFIG_SECTION = ".topics";
+ super.setUp();
+ }
+}
diff --git a/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java b/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java
index 6fe0d03741..ae2baa95ca 100644
--- a/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java
+++ b/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java
@@ -44,7 +44,7 @@ public class Firewall extends AbstractPlugin
{
public Firewall newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- FirewallConfiguration configuration = config.getConfiguration(FirewallConfiguration.class);
+ FirewallConfiguration configuration = config.getConfiguration(FirewallConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
index 1bf8761978..c9a476c5f2 100644
--- a/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
+++ b/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
@@ -52,7 +52,7 @@ public class SimpleXML extends AbstractPlugin
{
public SimpleXML newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- SimpleXMLConfiguration configuration = config.getConfiguration(SimpleXMLConfiguration.class);
+ SimpleXMLConfiguration configuration = config.getConfiguration(SimpleXMLConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
index 50377eaf52..b24a326ed3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
@@ -32,6 +32,8 @@ import org.apache.qpid.server.configuration.BindingConfig;
import org.apache.qpid.server.configuration.BindingConfigType;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -201,6 +203,15 @@ public class BindingFactory
exchange.addBinding(b);
getConfigStore().addConfiguredObject(b);
b.logCreation();
+
+ //Reconfigure the queue for to reflect this new binding.
+ ConfigurationPlugin config = queue.getVirtualHost().getConfiguration().getQueueConfiguration(queue);
+
+ if (config != null)
+ {
+ // Reconfigure with new config.
+ queue.configure(config);
+ }
return true;
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java
new file mode 100644
index 0000000000..0218bf7273
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class TopicConfiguration extends ConfigurationPlugin
+{
+ public static final ConfigurationPluginFactory FACTORY = new TopicConfigurationFactory();
+
+ private static final String VIRTUALHOSTS_VIRTUALHOST_TOPICS = "virtualhosts.virtualhost.topics";
+
+ public static class TopicConfigurationFactory implements ConfigurationPluginFactory
+ {
+
+ public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+ {
+ TopicConfiguration topicsConfig = new TopicConfiguration();
+ topicsConfig.setConfiguration(path, config);
+ return topicsConfig;
+ }
+
+ public List<String> getParentPaths()
+ {
+ return Arrays.asList(VIRTUALHOSTS_VIRTUALHOST_TOPICS);
+ }
+ }
+
+ Map<String, TopicConfig> _topics = new HashMap<String, TopicConfig>();
+
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"topic"};
+ }
+
+ @Override
+ public void validateConfiguration() throws ConfigurationException
+ {
+ if (_configuration.isEmpty())
+ {
+ throw new ConfigurationException("Topics section cannot be empty.");
+ }
+
+ int topics = _configuration.getList("topic.name").size();
+
+ for(int index=0; index<topics;index++)
+ {
+ TopicConfig topic = new TopicConfig();
+ topic.setConfiguration(VIRTUALHOSTS_VIRTUALHOST_TOPICS + ".topic", _configuration.subset("topic(" + index + ")"));
+
+ String topicName = _configuration.getString("topic(" + index + ").name");
+ if(_topics.containsKey(topicName))
+ {
+ throw new ConfigurationException("Topics section cannot contain two entries for the same topic.");
+ }
+ else
+ {
+ _topics.put(topicName, topic);
+ }
+ }
+ }
+
+ public String toString()
+ {
+ return getClass().getName() + ": Defined Topics:" + _topics.size();
+ }
+
+ public static class TopicConfig extends ConfigurationPlugin
+ {
+ @Override
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"name"};
+ }
+
+ public String getName()
+ {
+ // If we don't specify a topic name then match all topics
+ String configName = getStringValue("name");
+ return configName == null ? "#" : configName;
+ }
+
+
+ public void validateConfiguration() throws ConfigurationException
+ {
+ if(_configuration.isEmpty())
+ {
+ throw new ConfigurationException("Topic section cannot be empty.");
+ }
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 2be3311403..7dab02aee7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.configuration;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -29,7 +31,15 @@ import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeType;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
@@ -147,6 +157,72 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
}
}
+ public ConfigurationPlugin getQueueConfiguration(AMQQueue queue)
+ {
+ VirtualHostConfiguration hostConfig = queue.getVirtualHost().getConfiguration();
+
+ // First check if we have a named queue configuration (the easy case)
+ if (Arrays.asList(hostConfig.getQueueNames()).contains(queue.getName()))
+ {
+ return null;
+ }
+
+ // We don't have an explicit queue config we must find out what we need.
+ ArrayList<Binding> bindings = new ArrayList<Binding>(queue.getBindings());
+
+ List<AMQShortString> exchangeClasses = new ArrayList<AMQShortString>(bindings.size());
+
+ //Remove default exchange
+ for (int index = 0; index < bindings.size(); index++)
+ {
+ // Ignore the DEFAULT Exchange binding
+ if (bindings.get(index).getExchange().getNameShortString().equals(ExchangeDefaults.DEFAULT_EXCHANGE_NAME))
+ {
+ bindings.remove(index);
+ }
+ else
+ {
+ exchangeClasses.add(bindings.get(index).getExchange().getType().getName());
+
+ if (exchangeClasses.size() > 1)
+ {
+ // If we have more than 1 class of exchange then we can only use the global queue configuration.
+ // and this will be returned from the default getQueueConfiguration
+ return null;
+ }
+ }
+ }
+
+ // If we are just bound the the default exchange then use the default.
+ if (bindings.isEmpty())
+ {
+ return null;
+ }
+
+ // If we are bound to only one type of exchange then we are going
+ // to have to resolve the configuration for that exchange.
+
+ String exchangeName = bindings.get(0).getExchange().getType().getName().toString();
+
+ // Lookup a Configuration handler for this Exchange.
+
+ // Build the expected class name. <Exchangename>sConfiguration
+ // i.e. TopicConfiguration or HeadersConfiguration
+ String exchangeClass = "org.apache.qpid.server.configuration."
+ + exchangeName.substring(0, 1).toUpperCase()
+ + exchangeName.substring(1) + "Configuration";
+
+ ConfigurationPlugin configPlugin
+ = queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass);
+
+
+ // now need to perform the queue-topic-topics-queue magic.
+
+ System.err.println("*********** Reconfiguring queue with config:"+configPlugin);
+
+ return configPlugin;
+ }
+
public long getMemoryUsageMaximum()
{
return getLongValue("queues.maximumMemoryUsage");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
index 9024c6aec6..1da1459f70 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
@@ -38,8 +38,8 @@ public abstract class ConfigurationPlugin
{
protected static final Logger _logger = Logger.getLogger(ConfigurationPlugin.class);
- private Map<Class<? extends ConfigurationPlugin>, ConfigurationPlugin>
- _pluginConfiguration = new HashMap<Class<? extends ConfigurationPlugin>, ConfigurationPlugin>();
+ private Map<String, ConfigurationPlugin>
+ _pluginConfiguration = new HashMap<String, ConfigurationPlugin>();
protected Configuration _configuration;
@@ -69,7 +69,7 @@ public abstract class ConfigurationPlugin
return _configuration;
}
- public <C extends ConfigurationPlugin> C getConfiguration(Class<C> plugin)
+ public <C extends ConfigurationPlugin> C getConfiguration(String plugin)
{
return (C) _pluginConfiguration.get(plugin);
}
@@ -155,7 +155,7 @@ public abstract class ConfigurationPlugin
List<ConfigurationPlugin> handlers = configurationManager.getConfigurationPlugins(configurationElement, handled);
for (ConfigurationPlugin plugin : handlers)
{
- _pluginConfiguration.put(plugin.getClass(), plugin);
+ _pluginConfiguration.put(plugin.getClass().getName(), plugin);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
index 466bc9e228..8364635632 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
@@ -34,6 +34,8 @@ import org.apache.felix.framework.Felix;
import org.apache.felix.framework.util.StringMap;
import org.apache.log4j.Logger;
import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.configuration.TopicConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.security.SecurityManager;
@@ -79,6 +81,7 @@ public class PluginManager implements Closeable
_securityPlugins.put(pluginFactory.getPluginName(), pluginFactory);
}
for (ConfigurationPluginFactory configFactory : Arrays.asList(
+ TopicConfiguration.FACTORY,
SecurityManager.SecurityConfiguration.FACTORY,
AllowAll.AllowAllConfiguration.FACTORY,
DenyAll.DenyAllConfiguration.FACTORY,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 3b60734a2e..225fbec930 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -274,9 +275,9 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
public void doTask(AMQQueue queue) throws AMQException;
}
- void configure(QueueConfiguration config);
+ void configure(ConfigurationPlugin config);
- QueueConfiguration getConfiguration();
+ ConfigurationPlugin getConfiguration();
ManagedObject getManagedObject();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index f81a4a6911..451d59b2e9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -185,7 +186,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
- private QueueConfiguration _queueConfiguration;
+ private ConfigurationPlugin _queueConfiguration;
@@ -2065,24 +2066,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void configure(QueueConfiguration config)
+ public void configure(ConfigurationPlugin config)
{
if (config != null)
{
- setMaximumMessageAge(config.getMaximumMessageAge());
- setMaximumQueueDepth(config.getMaximumQueueDepth());
- setMaximumMessageSize(config.getMaximumMessageSize());
- setMaximumMessageCount(config.getMaximumMessageCount());
- setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
- _capacity = config.getCapacity();
- _flowResumeCapacity = config.getFlowResumeCapacity();
+ if (config instanceof QueueConfiguration)
+ {
+
+ setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge());
+ setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth());
+ setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize());
+ setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount());
+ setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap());
+ _capacity = ((QueueConfiguration)config).getCapacity();
+ _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity();
+ }
_queueConfiguration = config;
+
}
}
- public QueueConfiguration getConfiguration()
+ public ConfigurationPlugin getConfiguration()
{
return _queueConfiguration;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index 362d919a5e..ff28d76053 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -153,7 +153,7 @@ public class SecurityManager
public Map<String, SecurityPlugin> configurePlugins(ConfigurationPlugin hostConfig) throws ConfigurationException
{
Map<String, SecurityPlugin> plugins = new HashMap<String, SecurityPlugin>();
- SecurityConfiguration securityConfig = hostConfig.getConfiguration(SecurityConfiguration.class);
+ SecurityConfiguration securityConfig = hostConfig.getConfiguration(SecurityConfiguration.class.getName());
// If we have no security Configuration then there is nothing to configure.
if (securityConfig != null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
index d4777b8cb3..db18a89231 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
@@ -66,7 +66,7 @@ public class AllowAll extends BasicPlugin
{
public AllowAll newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- AllowAllConfiguration configuration = config.getConfiguration(AllowAllConfiguration.class);
+ AllowAllConfiguration configuration = config.getConfiguration(AllowAllConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
index cd68511730..6c0fb1eaa4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
@@ -66,7 +66,7 @@ public class DenyAll extends BasicPlugin
{
public DenyAll newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- DenyAllConfiguration configuration = config.getConfiguration(DenyAllConfiguration.class);
+ DenyAllConfiguration configuration = config.getConfiguration(DenyAllConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java
index 1250cdcb1b..bd99cdd1fa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java
@@ -60,7 +60,7 @@ public class LegacyAccess extends BasicPlugin
{
public LegacyAccess newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- LegacyAccessConfiguration configuration = config.getConfiguration(LegacyAccessConfiguration.class);
+ LegacyAccessConfiguration configuration = config.getConfiguration(LegacyAccessConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java
index 7a8cabf512..956bb6f8fa 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java
@@ -38,7 +38,7 @@ public class LogMessageTest extends TestCase
{
Locale usLocal = Locale.US;
Locale.setDefault(usLocal);
- ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.LogMessages",
+ ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages",
usLocal);
assertNotNull("Unable to load ResourceBundle", _messages);
@@ -55,7 +55,7 @@ public class LogMessageTest extends TestCase
Locale.setDefault(japanese);
try
{
- ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.LogMessages",
+ ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages",
japanese);
assertNotNull("Unable to load ResourceBundle", _messages);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 70d146437f..51b049787c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.subscription.Subscription;
@@ -36,7 +36,6 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
-import javax.swing.*;
import java.util.List;
import java.util.Set;
import java.util.Map;
@@ -520,12 +519,12 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
- public void configure(QueueConfiguration config)
+ public void configure(ConfigurationPlugin config)
{
}
- public QueueConfiguration getConfiguration()
+ public ConfigurationPlugin getConfiguration()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}