diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-28 09:19:15 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-28 09:19:15 +0000 |
| commit | 34b8e275d9bf816ee35534981a3b5afbf905651a (patch) | |
| tree | a784c3ae5ae0608a519f332554f1bee5253c9375 /qpid/java/broker/src/test | |
| parent | 1c1293b15d9a4475c3e8dadf5d67f027bd64001b (diff) | |
| download | qpid-python-34b8e275d9bf816ee35534981a3b5afbf905651a.tar.gz | |
QPID-3642,QPID-3643: Add Dead Letter Queue functionality for 0-8/0-9/0-9-1 paths, fixes isBound methods on FanoutExchange
Applied patch from Keith Wall <keith.wall@gmail.com>, Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1207029 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
9 files changed, 782 insertions, 29 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index fdd533b704..7d128f2bc5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -20,18 +20,31 @@ */ package org.apache.qpid.server; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.queue.AMQPriorityQueue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; -public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase +public class AMQBrokerManagerMBeanTest extends QpidTestCase { private QueueRegistry _queueRegistry; private ExchangeRegistry _exchangeRegistry; @@ -95,14 +108,86 @@ public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase assertTrue("New queue should be bound to default exchange", defaultExchange.isBound(new AMQShortString(queueName))); } + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument does cause the + * maximum delivery count to be set on the Queue. + */ + public void testCreateNewQueueWithMaximumDeliveryCount() throws Exception + { + final Map<String,Object> args = new HashMap<String, Object>(); + args.put(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + + final AMQShortString queueName = new AMQShortString("testCreateNewQueueWithMaximumDeliveryCount"); + + final QueueRegistry qReg = _vHost.getQueueRegistry(); + + assertNull("The queue should not yet exist", qReg.getQueue(queueName)); + + final ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject()); + mbean.createNewQueue(queueName.asString(), "test", false, args); + + final AMQQueue createdQueue = qReg.getQueue(queueName); + assertNotNull("The queue was not registered as expected", createdQueue); + assertEquals("Unexpected maximum delivery count", 5, createdQueue.getMaximumDeliveryCount()); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_PRIORITIES} argument prompts creation of + * a Priority Queue. + */ + public void testCreatePriorityQueue() throws Exception + { + int numPriorities = 7; + Map<String,Object> args = new HashMap<String, Object>(); + args.put(AMQQueueFactory.X_QPID_PRIORITIES, numPriorities); + + AMQShortString queueName = new AMQShortString("testCreatePriorityQueue"); + + QueueRegistry qReg = _vHost.getQueueRegistry(); + + assertNull("The queue should not yet exist", qReg.getQueue(queueName)); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject()); + mbean.createNewQueue(queueName.asString(), "test", false, args); + + AMQQueue queue = qReg.getQueue(queueName); + assertEquals("Queue is not a priorty queue", AMQPriorityQueue.class, queue.getClass()); + assertEquals("Number of priorities supported was not as expected", numPriorities, ((AMQPriorityQueue)queue).getPriorities()); + } + @Override public void setUp() throws Exception { super.setUp(); + + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + + XMLConfiguration configXml = new XMLConfiguration(); + configXml.addProperty("virtualhosts.virtualhost(-1).name", "test"); + configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName()); + + ServerConfiguration configuration = new ServerConfiguration(configXml); + + ApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName("test"); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); _queueRegistry = _vHost.getQueueRegistry(); _exchangeRegistry = _vHost.getExchangeRegistry(); } + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + ApplicationRegistry.remove(); + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index 9941c00499..e1a5e7d338 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -24,6 +24,8 @@ import junit.framework.TestCase; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; public class QueueConfigurationTest extends TestCase { @@ -43,11 +45,71 @@ public class QueueConfigurationTest extends TestCase fullEnv.setProperty("queues.maximumMessageSize", 1); fullEnv.setProperty("queues.maximumMessageCount", 1); fullEnv.setProperty("queues.minimumAlertRepeatGap", 1); + fullEnv.setProperty("queues.deadLetterQueues", true); + fullEnv.setProperty("queues.maximumDeliveryCount", 5); _fullHostConf = new VirtualHostConfiguration("test", fullEnv); } + public void testMaxDeliveryCount() throws Exception + { + try + { + ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env)); + ApplicationRegistry.initialise(registry); + + // Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertEquals("Unexpected default server configuration for max delivery count ", 0, qConf.getMaxDeliveryCount()); + + // Check explicit value + VirtualHostConfiguration vhostConfig = overrideConfiguration("maximumDeliveryCount", 7); + qConf = new QueueConfiguration("test", vhostConfig); + assertEquals("Unexpected host configuration for max delivery count", 7, qConf.getMaxDeliveryCount()); + + // Check inherited value + qConf = new QueueConfiguration("test", _fullHostConf); + assertEquals("Unexpected queue configuration for max delivery count", 5, qConf.getMaxDeliveryCount()); + + } + finally + { + ApplicationRegistry.remove(); + } + } + + /** + * Tests that the default setting for DLQ configuration is disabled, and verifies that it can be overridden + * at a broker or virtualhost level. + * @throws Exception + */ + public void testIsDeadLetterQueueEnabled() throws Exception + { + try + { + ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env)); + ApplicationRegistry.initialise(registry); + + // Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertFalse("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + + // Check explicit value + VirtualHostConfiguration vhostConfig = overrideConfiguration("deadLetterQueues", true); + qConf = new QueueConfiguration("test", vhostConfig); + assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + + // Check inherited value + qConf = new QueueConfiguration("test", _fullHostConf); + assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + } + finally + { + ApplicationRegistry.remove(); + } + } + public void testGetMaximumMessageAge() throws ConfigurationException { // Check default value diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 9ee2ed3812..7739f9976e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -25,6 +25,7 @@ import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.Writer; import java.util.Locale; import org.apache.commons.configuration.ConfigurationException; @@ -1459,4 +1460,117 @@ public class ServerConfigurationTest extends QpidTestCase ce.getMessage()); } } + + public void testMaxDeliveryCountDefault() throws Exception + { + final ServerConfiguration serverConfig = new ServerConfiguration(_config); + assertEquals(0, serverConfig.getMaxDeliveryCount()); + } + + public void testMaxDeliveryCount() throws Exception + { + _config.setProperty("maximumDeliveryCount", 5); + final ServerConfiguration serverConfig = new ServerConfiguration(_config); + assertEquals(5, serverConfig.getMaxDeliveryCount()); + } + + /** + * Test XML configuration file correctly enables dead letter queues + */ + public void testDeadLetterQueueConfigurationFile() throws Exception + { + // Write config + File xml = File.createTempFile(getClass().getName(), "xml"); + xml.deleteOnExit(); + FileWriter config = new FileWriter(xml); + config.write("<broker>\n"); + writeSecurity(config); + config.write("<deadLetterQueues>true</deadLetterQueues>\n"); + config.write("<virtualhosts>\n"); + config.write("<virtualhost>\n"); + config.write("<name>test</name>\n"); + config.write("<test>\n"); + config.write("<queues>\n"); + config.write("<deadLetterQueues>false</deadLetterQueues>\n"); + config.write("<queue>\n"); + config.write("<name>biggles</name>\n"); + config.write("<biggles>\n"); + config.write("<deadLetterQueues>true</deadLetterQueues>\n"); + config.write("</biggles>\n"); + config.write("</queue>\n"); + config.write("<queue>\n"); + config.write("<name>beetle</name>\n"); + config.write("<beetle />\n"); + config.write("</queue>\n"); + config.write("</queues>\n"); + config.write("</test>\n"); + config.write("</virtualhost>\n"); + config.write("<virtualhost>\n"); + config.write("<name>extra</name>\n"); + config.write("<extra>\n"); + config.write("<queues>\n"); + config.write("<queue>\n"); + config.write("<name>r2d2</name>\n"); + config.write("<r2d2>\n"); + config.write("<deadLetterQueues>false</deadLetterQueues>\n"); + config.write("</r2d2>\n"); + config.write("</queue>\n"); + config.write("<queue>\n"); + config.write("<name>c3p0</name>\n"); + config.write("<c3p0 />\n"); + config.write("</queue>\n"); + config.write("</queues>\n"); + config.write("</extra>\n"); + config.write("</virtualhost>\n"); + config.write("</virtualhosts>\n"); + config.write("</broker>\n"); + config.close(); + + // Load config + ApplicationRegistry.remove(); + ApplicationRegistry registry = new ConfigurationFileApplicationRegistry(xml); + ApplicationRegistry.initialise(registry); + ServerConfiguration serverConfiguration = ApplicationRegistry.getInstance().getConfiguration(); + + VirtualHostConfiguration test = serverConfiguration.getVirtualHostConfig("test"); + assertNotNull("Host 'test' is not found", test); + VirtualHostConfiguration extra = serverConfiguration.getVirtualHostConfig("extra"); + assertNotNull("Host 'extra' is not found", test); + + QueueConfiguration biggles = test.getQueueConfiguration("biggles"); + QueueConfiguration beetle = test.getQueueConfiguration("beetle"); + QueueConfiguration r2d2 = extra.getQueueConfiguration("r2d2"); + QueueConfiguration c3p0 = extra.getQueueConfiguration("c3p0"); + + // Validate config + assertTrue("Broker DLQ should be configured as enabled", serverConfiguration.isDeadLetterQueueEnabled()); + assertFalse("Test vhost DLQ should be configured as disabled", test.isDeadLetterQueueEnabled()); + assertTrue("Extra vhost DLQ should be enabled, using broker default", extra.isDeadLetterQueueEnabled()); + assertTrue("Biggles queue DLQ should be configured as enabled", biggles.isDeadLetterQueueEnabled()); + assertFalse("Beetle queue DLQ should be disabled, using test vhost default", beetle.isDeadLetterQueueEnabled()); + assertFalse("R2D2 queue DLQ should be configured as disabled", r2d2.isDeadLetterQueueEnabled()); + assertTrue("C3P0 queue DLQ should be enabled, using broker default", c3p0.isDeadLetterQueueEnabled()); + } + + /** + * Convenience method to output required security preamble for broker config + */ + private void writeSecurity(Writer out) throws Exception + { + out.write("\t<management><enabled>false</enabled></management>\n"); + out.write("\t<security>\n"); + out.write("\t\t<pd-auth-manager>\n"); + out.write("\t\t\t<principal-database>\n"); + out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n"); + out.write("\t\t\t\t<attributes>\n"); + out.write("\t\t\t\t\t<attribute>\n"); + out.write("\t\t\t\t\t\t<name>passwordFile</name>\n"); + out.write("\t\t\t\t\t\t<value>/dev/null</value>\n"); + out.write("\t\t\t\t\t</attribute>\n"); + out.write("\t\t\t\t</attributes>\n"); + out.write("\t\t\t</principal-database>\n"); + out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n"); + out.write("\t\t</pd-auth-manager>\n"); + out.write("\t</security>\n"); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index b133d53ac5..f6cd397217 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; @@ -34,19 +33,6 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase { @Override - public void setUp() throws Exception - { - super.setUp(); - // Set the default configuration items - getConfigXml().clear(); - getConfigXml().addProperty("virtualhosts.virtualhost(-1).name", "test"); - getConfigXml().addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName()); - - getConfigXml().addProperty("virtualhosts.virtualhost.name", getName()); - getConfigXml().addProperty("virtualhosts.virtualhost."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); - } - - @Override public void createBroker() { // Prevent auto broker startup @@ -134,6 +120,88 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase assertEquals(3, bTest.getMaximumMessageAge()); } + public void testMaxDeliveryCount() throws Exception + { + // Set up vhosts and queues + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.maximumDeliveryCount", 5); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.maximumDeliveryCount", 4); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle"); + + // Start the broker now. + super.createBroker(); + + // Get vhosts + VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName()); + + // Enabled specifically + assertEquals("Test vhost MDC was configured as enabled", 5 ,test.getConfiguration().getMaxDeliveryCount()); + + // Enabled by test vhost default + assertEquals("beetle queue DLQ was configured as enabled", test.getConfiguration().getMaxDeliveryCount(), test.getConfiguration().getQueueConfiguration("beetle").getMaxDeliveryCount()); + + // Disabled specifically + assertEquals("Biggles queue DLQ was configured as disabled", 4, test.getConfiguration().getQueueConfiguration("biggles").getMaxDeliveryCount()); + } + + /** + * Tests the full set of configuration options for enabling DLQs in the broker configuration. + */ + public void testIsDeadLetterQueueEnabled() throws Exception + { + // Set up vhosts and queues + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.deadLetterQueues", "true"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.deadLetterQueues", "false"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle"); + + + getConfigXml().addProperty("virtualhosts.virtualhost.name", getName() + "Extra"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName()); + + // Start the broker now. + super.createBroker(); + + // Get vhosts + VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName()); + VirtualHost extra = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName() + "Extra"); + + // Enabled specifically + assertTrue("Test vhost DLQ was configured as enabled", test.getConfiguration().isDeadLetterQueueEnabled()); + assertTrue("r2d2 queue DLQ was configured as enabled", extra.getConfiguration().getQueueConfiguration("r2d2").isDeadLetterQueueEnabled()); + + // Enabled by test vhost default + assertTrue("beetle queue DLQ was configured as enabled", test.getConfiguration().getQueueConfiguration("beetle").isDeadLetterQueueEnabled()); + + // Disabled specifically + assertFalse("Biggles queue DLQ was configured as disabled", test.getConfiguration().getQueueConfiguration("biggles").isDeadLetterQueueEnabled()); + + // Using broker default of disabled + assertFalse("Extra vhost DLQ disabled, using broker default", extra.getConfiguration().isDeadLetterQueueEnabled()); + assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled()); + + // Get queues + AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles")); + AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle")); + AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2")); + AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0")); + + // Disabled specifically for this queue, overriding virtualhost setting + assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange()); + + // Enabled for all queues on the virtualhost + assertNotNull("Beetle queue should have an alt exchange as DLQ should be enabled, using test vhost default", beetle.getAlternateExchange()); + + // Enabled specifically for this queue, overriding the default broker setting of disabled + assertNotNull("R2D2 queue should have an alt exchange as DLQ should be configured as enabled", r2d2.getAlternateExchange()); + + // Disabled by the default broker setting + assertNull("C3PO queue should not have an alt exchange as DLQ should be disabled, using broker default", c3p0.getAlternateExchange()); + } + /** * Test that the house keeping pool sizes is correctly processed * @@ -173,7 +241,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase vhost.getHouseKeepingTaskCount()); // Currently the two are tasks: - // ExpiredMessageTask from VirtualHost + // ExpiredMessageTask from VirtualHost // UpdateTask from the QMF ManagementExchange } @@ -214,7 +282,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase { getConfigXml().addProperty("virtualhosts.virtualhost.testSecurityAuthenticationNameRejected.security.authentication.name", "testdb"); - + try { super.createBroker(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 7b73987abf..ea2fe90da6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -490,6 +490,22 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return null; } + + @Override + public int getDeliveryCount() + { + return 0; + } + + @Override + public void incrementDeliveryCount() + { + } + + @Override + public void decrementDeliveryCount() + { + } }; if(action != null) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 27891289fb..2b7d1d7f26 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,39 +20,76 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; -public class AMQQueueFactoryTest extends InternalBrokerBaseCase +public class AMQQueueFactoryTest extends QpidTestCase { QueueRegistry _queueRegistry; VirtualHost _virtualHost; - int _defaultQueueCount; @Override public void setUp() throws Exception { super.setUp(); - ApplicationRegistry registry = (ApplicationRegistry) ApplicationRegistry.getInstance(); - _virtualHost = registry.getVirtualHostRegistry().getVirtualHost("test"); + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + + XMLConfiguration configXml = new XMLConfiguration(); + configXml.addProperty("virtualhosts.virtualhost(-1).name", getName()); + configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); + + ServerConfiguration configuration = new ServerConfiguration(configXml); + + ApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName(getName()); + + _virtualHost = registry.getVirtualHostRegistry().getVirtualHost(getName()); _queueRegistry = _virtualHost.getQueueRegistry(); - _defaultQueueCount = _queueRegistry.getQueues().size(); } @Override public void tearDown() throws Exception { - assertEquals("Queue was not registered in virtualhost", _defaultQueueCount + 1, _queueRegistry.getQueues().size()); - super.tearDown(); + try + { + super.tearDown(); + } + finally + { + ApplicationRegistry.remove(); + } + } + + private void verifyRegisteredQueueCount(int count) + { + assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size()); } + private void verifyQueueRegistered(String queueName) + { + assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName)); + } + public void testPriorityQueueRegistration() throws Exception { FieldTable fieldTable = new FieldTable(); @@ -63,13 +100,314 @@ public class AMQQueueFactoryTest extends InternalBrokerBaseCase false, _virtualHost, fieldTable); assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); + verifyQueueRegistered("testPriorityQueue"); + verifyRegisteredQueueCount(1); } public void testSimpleQueueRegistration() throws Exception { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false, + AMQShortString queueName = new AMQShortString("testSimpleQueueRegistration"); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, _virtualHost, null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); + verifyQueueRegistered("testSimpleQueueRegistration"); + + //verify that no alternate exchange or DLQ were produced + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + + assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange()); + assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName)); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does + * cause the alternate exchange to be set and DLQ to be produced. + * @throws AMQException + */ + public void testDeadLetterQueueEnabled() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + Exchange altExchange = queue.getAlternateExchange(); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName()); + + assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName)); + + AMQQueue dlQueue = qReg.getQueue(dlQueueName); + assertNotNull("The DLQ was not registered as expected", dlQueue); + assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); + assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount()); + + //2 queues should have been registered + verifyRegisteredQueueCount(2); + } + + /** + * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration + * are not applied to the DLQ itself. + * @throws AMQException + */ + public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws AMQException + { + ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("deadLetterQueues","true"); + ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("maximumDeliveryCount","5"); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, null); + + assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); + Exchange altExchange = queue.getAlternateExchange(); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName()); + + assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName)); + + AMQQueue dlQueue = qReg.getQueue(dlQueueName); + assertNotNull("The DLQ was not registered as expected", dlQueue); + assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); + assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount()); + + //2 queues should have been registered + verifyRegisteredQueueCount(2); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not + * result in the alternate exchange being set and DLQ being created. + * @throws AMQException + */ + public void testDeadLetterQueueDisabled() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueDisabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); + assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName)); + + assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName)); + + //only 1 queue should have been registered + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but + * creating an auto-delete queue, does not result in the alternate exchange + * being set and DLQ being created. + * @throws AMQException + */ + public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueNotCreatedForAutodeleteQueues"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName)); + + //create an autodelete queue + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), true, false, + _virtualHost, fieldTable); + assertTrue("Queue should be autodelete", queue.isAutoDelete()); + + //ensure that the autodelete property overrides the request to enable DLQ + assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); + assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName)); + assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName)); + + //only 1 queue should have been registered + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has + * the desired effect. + */ + public void testMaximumDeliveryCount() throws Exception + { + final FieldTable fieldTable = new FieldTable(); + fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + + final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount"); + + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + assertNotNull("The queue was not registered as expected ", queue); + assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount()); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means + * that queue is created with a default maximumDeliveryCount of zero (unless set in config). + */ + public void testMaximumDeliveryCountDefault() throws Exception + { + + final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount"); + + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, null); + + assertNotNull("The queue was not registered as expected ", queue); + assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount()); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests queue creation with queue name set to null + */ + public void testQueueNameNullValidation() + { + try + { + AMQQueueFactory.createAMQQueueImpl(null, false, new AMQShortString("owner"), true, false, _virtualHost, null); + fail("queue with null name can not be created!"); + } + catch (Exception e) + { + assertTrue(e instanceof IllegalArgumentException); + assertEquals("Queue name must not be null", e.getMessage()); + } + } + + /** + * Tests queue creation with queue name length less 255 characters but + * corresponding DLQ name length greater than 255. + */ + public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255() + { + String queueName = "test-" + generateStringWithLength('a', 245); + try + { + // change DLQ name to make its length bigger than exchange name + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", "_DLE"); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", "_DLQUEUE"); + + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"), + false, false, _virtualHost, fieldTable); + fail("queue with DLQ name having more than 255 characters can not be created!"); + } + catch (Exception e) + { + assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); + assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name") + && e.getMessage().contains("length exceeds limit of 255")); + } + finally + { + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } + + /** + * Tests queue creation with queue name length less 255 characters but + * corresponding DL exchange name length greater than 255. + */ + public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255() + { + String queueName = "test-" + generateStringWithLength('a', 245); + try + { + // change DLQ name to make its length bigger than exchange name + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", "_DLEXCHANGE"); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", "_DLQ"); + + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"), + false, false, _virtualHost, fieldTable); + fail("queue with DLE name having more than 255 characters can not be created!"); + } + catch (Exception e) + { + assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); + assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name") + && e.getMessage().contains("length exceeds limit of 255")); + } + finally + { + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } + + private String generateStringWithLength(char ch, int length) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) + { + sb.append(ch); + } + return sb.toString(); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 8f3023f269..f70250132a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -44,6 +44,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularData; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -390,6 +391,34 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse(channel.getBlocking()); } + public void testMaximumDeliveryCount() throws IOException + { + assertEquals("Unexpected default maximum delivery count", Integer.valueOf(0), _queueMBean.getMaximumDeliveryCount()); + } + + public void testViewAllMessages() throws Exception + { + final int messageCount = 5; + sendPersistentMessages(messageCount); + + + final TabularData messageTable = _queueMBean.viewMessages(1L, 5L); + assertNotNull("Message table should not be null", messageTable); + assertEquals("Unexpected number of rows", messageCount, messageTable.size()); + + + final Iterator rowIterator = messageTable.values().iterator(); + // Get its message ID + final CompositeDataSupport row1 = (CompositeDataSupport) rowIterator.next(); + final Long msgId = (Long) row1.get("AMQ MessageId"); + final Long queuePosition = (Long) row1.get("Queue Position"); + final Integer deliveryCount = (Integer) row1.get("Delivery Count"); + + assertNotNull("Row should have value for queue position", queuePosition); + assertNotNull("Row should have value for msgid", msgId); + assertNotNull("Row should have value for deliveryCount", deliveryCount); + } + @Override public void setUp() throws Exception @@ -404,6 +433,13 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase ApplicationRegistry.remove(); } + private void sendPersistentMessages(int messageCount) throws AMQException + { + sendMessages(messageCount, true); + assertEquals("Expected " + messageCount + " messages in the queue", messageCount, _queueMBean + .getMessageCount().intValue()); + } + private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException { return sendMessages(messageCount, persistent, 0l, 0l); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 4c31092983..0daf79122c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -611,4 +611,21 @@ public class MockAMQQueue implements AMQQueue { } + + @Override + public int getMaximumDeliveryCount() + { + return 0; + } + + @Override + public void setMaximumDeliveryCount(int maximumDeliveryCount) + { + } + + @Override + public void setAlternateExchange(String exchangeName) + { + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 864b9ad368..7ad002c248 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -244,4 +244,21 @@ public class MockQueueEntry implements QueueEntry return null; } + @Override + public int getDeliveryCount() + { + return 0; + } + + @Override + public void incrementDeliveryCount() + { + } + + @Override + public void decrementDeliveryCount() + { + } + + } |
