diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-02 00:55:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-02 00:55:36 +0000 |
| commit | f83f52deb36df5277dbcde0913e37693ae7b1b9e (patch) | |
| tree | aac2b064af01a197650bc33d2ad2b41cb0327cfd /java | |
| parent | 1f92ceb67b4c717d425cad75c5ecde8e08f7874e (diff) | |
| download | qpid-python-f83f52deb36df5277dbcde0913e37693ae7b1b9e.tar.gz | |
QPID-4898 : [Java Broker] Allow setting arbitrary arguments in queue defintion within XML config file
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1488636 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
5 files changed, 112 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 06691d8659..7a54bdaa66 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.configuration; +import java.util.Collections; +import java.util.Map; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.ConfigurationException; @@ -66,7 +68,8 @@ public class QueueConfiguration extends AbstractConfiguration "lvqKey", "sortKey", "maximumDeliveryCount", - "deadLetterQueues" + "deadLetterQueues", + "argument" }; } @@ -85,7 +88,7 @@ public class QueueConfiguration extends AbstractConfiguration { return getBooleanValue("durable"); } - + public boolean getExclusive() { return getBooleanValue("exclusive"); @@ -193,4 +196,9 @@ public class QueueConfiguration extends AbstractConfiguration { return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled()); } + + public Map<String,String> getArguments() + { + return getMap("argument"); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java index 3c17faef75..b87022868e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java @@ -18,6 +18,8 @@ */ package org.apache.qpid.server.configuration.plugins; +import java.util.LinkedHashMap; +import java.util.Map; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.ConversionException; @@ -324,6 +326,19 @@ public abstract class AbstractConfiguration { _config = config; } + + protected Map<String,String> getMap(String name) + { + List elements = getListValue(name,Collections.emptyList()); + + Map<String,String> map = new LinkedHashMap(); + for(Object item : elements) + { + String[] keyValue = String.valueOf(item).split("=",2); + map.put(keyValue[0].trim(), keyValue.length > 1 ? keyValue[1].trim() : null); + } + return map; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 872a936462..37f1f8f7a5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -239,6 +239,21 @@ public class AMQQueueFactory { priorities = ((Number)prioritiesObj).intValue(); } + else if(prioritiesObj instanceof String) + { + try + { + priorities = Integer.parseInt(prioritiesObj.toString()); + } + catch (NumberFormatException e) + { + // TODO - should warn here of invalid format + } + } + else + { + // TODO - should warn here of invalid format + } } else if(arguments.containsKey(QPID_QUEUE_SORT_KEY)) { @@ -445,6 +460,11 @@ public class AMQQueueFactory { Map<String,Object> arguments = new HashMap<String,Object>(); + if(config.getArguments() != null && !config.getArguments().isEmpty()) + { + arguments.putAll(config.getArguments()); + } + if(config.isLVQ() || config.getLVQKey() != null) { arguments.put(QPID_LAST_VALUE_QUEUE, 1); diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index 7b149df191..c10b3410a5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ package org.apache.qpid.server.configuration; import static org.mockito.Mockito.when; +import java.util.Collections; import junit.framework.TestCase; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.ConfigurationException; @@ -238,6 +239,43 @@ public class QueueConfigurationTest extends TestCase assertEquals("mydescription", qConf.getDescription()); } + + public void testQueueSingleArgument() throws ConfigurationException + { + //Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertTrue(qConf.getArguments().isEmpty()); + + // Check explicit value + final VirtualHostConfiguration vhostConfig = overrideConfiguration("argument", "qpid.group_header_key=mykey"); + qConf = new QueueConfiguration("test", vhostConfig); + assertEquals(Collections.singletonMap("qpid.group_header_key","mykey"), qConf.getArguments()); + } + + + public void testQueueMultipleArguments() throws ConfigurationException + { + //Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertTrue(qConf.getArguments().isEmpty()); + + + PropertiesConfiguration queueConfig = new PropertiesConfiguration(); + queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey"); + queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1"); + + CompositeConfiguration config = new CompositeConfiguration(); + config.addConfiguration(_fullHostConf.getConfig()); + config.addConfiguration(queueConfig); + + final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", config, _broker);; + qConf = new QueueConfiguration("test", vhostConfig); + assertEquals(2, qConf.getArguments().size()); + assertEquals("mykey", qConf.getArguments().get("qpid.group_header_key")); + assertEquals("1", qConf.getArguments().get("qpid.shared_msg_group")); + } + + private VirtualHostConfiguration overrideConfiguration(String property, Object value) throws ConfigurationException { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index eee6f315d0..2a56ca2121 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import static org.mockito.Mockito.when; +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.AMQException; @@ -29,6 +31,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.Exchange; @@ -44,6 +47,7 @@ public class AMQQueueFactoryTest extends QpidTestCase { private QueueRegistry _queueRegistry; private VirtualHost _virtualHost; + private Broker _broker; @Override public void setUp() throws Exception @@ -55,14 +59,14 @@ public class AMQQueueFactoryTest extends QpidTestCase configXml.addProperty("store.class", TestableMemoryMessageStore.class.getName()); - Broker broker = BrokerTestHelper.createBrokerMock(); + _broker = BrokerTestHelper.createBrokerMock(); if (getName().equals("testDeadLetterQueueDoesNotInheritDLQorMDCSettings")) { - when(broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5); - when(broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true); + when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5); + when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true); } - _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, broker)); + _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, _broker)); _queueRegistry = _virtualHost.getQueueRegistry(); @@ -376,6 +380,21 @@ public class AMQQueueFactoryTest extends QpidTestCase } } + public void testMessageGroupFromConfig() throws Exception + { + + PropertiesConfiguration queueConfig = new PropertiesConfiguration(); + queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey"); + queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1"); + + + final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", queueConfig, _broker);; + QueueConfiguration qConf = new QueueConfiguration("test", vhostConfig); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(qConf, _virtualHost); + assertEquals("mykey", queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY)); + assertEquals("1", queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP)); + } + private String generateStringWithLength(char ch, int length) { StringBuilder sb = new StringBuilder(); @@ -385,4 +404,6 @@ public class AMQQueueFactoryTest extends QpidTestCase } return sb.toString(); } + + } |
