summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-28 09:19:15 +0000
committerKeith Wall <kwall@apache.org>2011-11-28 09:19:15 +0000
commit34b8e275d9bf816ee35534981a3b5afbf905651a (patch)
treea784c3ae5ae0608a519f332554f1bee5253c9375 /qpid/java/broker/src/test
parent1c1293b15d9a4475c3e8dadf5d67f027bd64001b (diff)
downloadqpid-python-34b8e275d9bf816ee35534981a3b5afbf905651a.tar.gz
QPID-3642,QPID-3643: Add Dead Letter Queue functionality for 0-8/0-9/0-9-1 paths, fixes isBound methods on FanoutExchange
Applied patch from Keith Wall <keith.wall@gmail.com>, Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1207029 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java89
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java62
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java114
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java100
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java16
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java360
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java36
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java17
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java17
9 files changed, 782 insertions, 29 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index fdd533b704..7d128f2bc5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -20,18 +20,31 @@
*/
package org.apache.qpid.server;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase
+public class AMQBrokerManagerMBeanTest extends QpidTestCase
{
private QueueRegistry _queueRegistry;
private ExchangeRegistry _exchangeRegistry;
@@ -95,14 +108,86 @@ public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase
assertTrue("New queue should be bound to default exchange", defaultExchange.isBound(new AMQShortString(queueName)));
}
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument does cause the
+ * maximum delivery count to be set on the Queue.
+ */
+ public void testCreateNewQueueWithMaximumDeliveryCount() throws Exception
+ {
+ final Map<String,Object> args = new HashMap<String, Object>();
+ args.put(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+
+ final AMQShortString queueName = new AMQShortString("testCreateNewQueueWithMaximumDeliveryCount");
+
+ final QueueRegistry qReg = _vHost.getQueueRegistry();
+
+ assertNull("The queue should not yet exist", qReg.getQueue(queueName));
+
+ final ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
+ mbean.createNewQueue(queueName.asString(), "test", false, args);
+
+ final AMQQueue createdQueue = qReg.getQueue(queueName);
+ assertNotNull("The queue was not registered as expected", createdQueue);
+ assertEquals("Unexpected maximum delivery count", 5, createdQueue.getMaximumDeliveryCount());
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_PRIORITIES} argument prompts creation of
+ * a Priority Queue.
+ */
+ public void testCreatePriorityQueue() throws Exception
+ {
+ int numPriorities = 7;
+ Map<String,Object> args = new HashMap<String, Object>();
+ args.put(AMQQueueFactory.X_QPID_PRIORITIES, numPriorities);
+
+ AMQShortString queueName = new AMQShortString("testCreatePriorityQueue");
+
+ QueueRegistry qReg = _vHost.getQueueRegistry();
+
+ assertNull("The queue should not yet exist", qReg.getQueue(queueName));
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
+ mbean.createNewQueue(queueName.asString(), "test", false, args);
+
+ AMQQueue queue = qReg.getQueue(queueName);
+ assertEquals("Queue is not a priorty queue", AMQPriorityQueue.class, queue.getClass());
+ assertEquals("Number of priorities supported was not as expected", numPriorities, ((AMQPriorityQueue)queue).getPriorities());
+ }
+
@Override
public void setUp() throws Exception
{
super.setUp();
+
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+ XMLConfiguration configXml = new XMLConfiguration();
+ configXml.addProperty("virtualhosts.virtualhost(-1).name", "test");
+ configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
+
+ ServerConfiguration configuration = new ServerConfiguration(configXml);
+
+ ApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName("test");
+
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
_queueRegistry = _vHost.getQueueRegistry();
_exchangeRegistry = _vHost.getExchangeRegistry();
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
index 9941c00499..e1a5e7d338 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
@@ -24,6 +24,8 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
public class QueueConfigurationTest extends TestCase
{
@@ -43,11 +45,71 @@ public class QueueConfigurationTest extends TestCase
fullEnv.setProperty("queues.maximumMessageSize", 1);
fullEnv.setProperty("queues.maximumMessageCount", 1);
fullEnv.setProperty("queues.minimumAlertRepeatGap", 1);
+ fullEnv.setProperty("queues.deadLetterQueues", true);
+ fullEnv.setProperty("queues.maximumDeliveryCount", 5);
_fullHostConf = new VirtualHostConfiguration("test", fullEnv);
}
+ public void testMaxDeliveryCount() throws Exception
+ {
+ try
+ {
+ ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+ ApplicationRegistry.initialise(registry);
+
+ // Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertEquals("Unexpected default server configuration for max delivery count ", 0, qConf.getMaxDeliveryCount());
+
+ // Check explicit value
+ VirtualHostConfiguration vhostConfig = overrideConfiguration("maximumDeliveryCount", 7);
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertEquals("Unexpected host configuration for max delivery count", 7, qConf.getMaxDeliveryCount());
+
+ // Check inherited value
+ qConf = new QueueConfiguration("test", _fullHostConf);
+ assertEquals("Unexpected queue configuration for max delivery count", 5, qConf.getMaxDeliveryCount());
+
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ /**
+ * Tests that the default setting for DLQ configuration is disabled, and verifies that it can be overridden
+ * at a broker or virtualhost level.
+ * @throws Exception
+ */
+ public void testIsDeadLetterQueueEnabled() throws Exception
+ {
+ try
+ {
+ ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+ ApplicationRegistry.initialise(registry);
+
+ // Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertFalse("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+
+ // Check explicit value
+ VirtualHostConfiguration vhostConfig = overrideConfiguration("deadLetterQueues", true);
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+
+ // Check inherited value
+ qConf = new QueueConfiguration("test", _fullHostConf);
+ assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
public void testGetMaximumMessageAge() throws ConfigurationException
{
// Check default value
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 9ee2ed3812..7739f9976e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -25,6 +25,7 @@ import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.Writer;
import java.util.Locale;
import org.apache.commons.configuration.ConfigurationException;
@@ -1459,4 +1460,117 @@ public class ServerConfigurationTest extends QpidTestCase
ce.getMessage());
}
}
+
+ public void testMaxDeliveryCountDefault() throws Exception
+ {
+ final ServerConfiguration serverConfig = new ServerConfiguration(_config);
+ assertEquals(0, serverConfig.getMaxDeliveryCount());
+ }
+
+ public void testMaxDeliveryCount() throws Exception
+ {
+ _config.setProperty("maximumDeliveryCount", 5);
+ final ServerConfiguration serverConfig = new ServerConfiguration(_config);
+ assertEquals(5, serverConfig.getMaxDeliveryCount());
+ }
+
+ /**
+ * Test XML configuration file correctly enables dead letter queues
+ */
+ public void testDeadLetterQueueConfigurationFile() throws Exception
+ {
+ // Write config
+ File xml = File.createTempFile(getClass().getName(), "xml");
+ xml.deleteOnExit();
+ FileWriter config = new FileWriter(xml);
+ config.write("<broker>\n");
+ writeSecurity(config);
+ config.write("<deadLetterQueues>true</deadLetterQueues>\n");
+ config.write("<virtualhosts>\n");
+ config.write("<virtualhost>\n");
+ config.write("<name>test</name>\n");
+ config.write("<test>\n");
+ config.write("<queues>\n");
+ config.write("<deadLetterQueues>false</deadLetterQueues>\n");
+ config.write("<queue>\n");
+ config.write("<name>biggles</name>\n");
+ config.write("<biggles>\n");
+ config.write("<deadLetterQueues>true</deadLetterQueues>\n");
+ config.write("</biggles>\n");
+ config.write("</queue>\n");
+ config.write("<queue>\n");
+ config.write("<name>beetle</name>\n");
+ config.write("<beetle />\n");
+ config.write("</queue>\n");
+ config.write("</queues>\n");
+ config.write("</test>\n");
+ config.write("</virtualhost>\n");
+ config.write("<virtualhost>\n");
+ config.write("<name>extra</name>\n");
+ config.write("<extra>\n");
+ config.write("<queues>\n");
+ config.write("<queue>\n");
+ config.write("<name>r2d2</name>\n");
+ config.write("<r2d2>\n");
+ config.write("<deadLetterQueues>false</deadLetterQueues>\n");
+ config.write("</r2d2>\n");
+ config.write("</queue>\n");
+ config.write("<queue>\n");
+ config.write("<name>c3p0</name>\n");
+ config.write("<c3p0 />\n");
+ config.write("</queue>\n");
+ config.write("</queues>\n");
+ config.write("</extra>\n");
+ config.write("</virtualhost>\n");
+ config.write("</virtualhosts>\n");
+ config.write("</broker>\n");
+ config.close();
+
+ // Load config
+ ApplicationRegistry.remove();
+ ApplicationRegistry registry = new ConfigurationFileApplicationRegistry(xml);
+ ApplicationRegistry.initialise(registry);
+ ServerConfiguration serverConfiguration = ApplicationRegistry.getInstance().getConfiguration();
+
+ VirtualHostConfiguration test = serverConfiguration.getVirtualHostConfig("test");
+ assertNotNull("Host 'test' is not found", test);
+ VirtualHostConfiguration extra = serverConfiguration.getVirtualHostConfig("extra");
+ assertNotNull("Host 'extra' is not found", test);
+
+ QueueConfiguration biggles = test.getQueueConfiguration("biggles");
+ QueueConfiguration beetle = test.getQueueConfiguration("beetle");
+ QueueConfiguration r2d2 = extra.getQueueConfiguration("r2d2");
+ QueueConfiguration c3p0 = extra.getQueueConfiguration("c3p0");
+
+ // Validate config
+ assertTrue("Broker DLQ should be configured as enabled", serverConfiguration.isDeadLetterQueueEnabled());
+ assertFalse("Test vhost DLQ should be configured as disabled", test.isDeadLetterQueueEnabled());
+ assertTrue("Extra vhost DLQ should be enabled, using broker default", extra.isDeadLetterQueueEnabled());
+ assertTrue("Biggles queue DLQ should be configured as enabled", biggles.isDeadLetterQueueEnabled());
+ assertFalse("Beetle queue DLQ should be disabled, using test vhost default", beetle.isDeadLetterQueueEnabled());
+ assertFalse("R2D2 queue DLQ should be configured as disabled", r2d2.isDeadLetterQueueEnabled());
+ assertTrue("C3P0 queue DLQ should be enabled, using broker default", c3p0.isDeadLetterQueueEnabled());
+ }
+
+ /**
+ * Convenience method to output required security preamble for broker config
+ */
+ private void writeSecurity(Writer out) throws Exception
+ {
+ out.write("\t<management><enabled>false</enabled></management>\n");
+ out.write("\t<security>\n");
+ out.write("\t\t<pd-auth-manager>\n");
+ out.write("\t\t\t<principal-database>\n");
+ out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n");
+ out.write("\t\t\t\t<attributes>\n");
+ out.write("\t\t\t\t\t<attribute>\n");
+ out.write("\t\t\t\t\t\t<name>passwordFile</name>\n");
+ out.write("\t\t\t\t\t\t<value>/dev/null</value>\n");
+ out.write("\t\t\t\t\t</attribute>\n");
+ out.write("\t\t\t\t</attributes>\n");
+ out.write("\t\t\t</principal-database>\n");
+ out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n");
+ out.write("\t\t</pd-auth-manager>\n");
+ out.write("\t</security>\n");
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index b133d53ac5..f6cd397217 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -21,7 +21,6 @@ package org.apache.qpid.server.configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
@@ -34,19 +33,6 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
{
@Override
- public void setUp() throws Exception
- {
- super.setUp();
- // Set the default configuration items
- getConfigXml().clear();
- getConfigXml().addProperty("virtualhosts.virtualhost(-1).name", "test");
- getConfigXml().addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
-
- getConfigXml().addProperty("virtualhosts.virtualhost.name", getName());
- getConfigXml().addProperty("virtualhosts.virtualhost."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
- }
-
- @Override
public void createBroker()
{
// Prevent auto broker startup
@@ -134,6 +120,88 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
assertEquals(3, bTest.getMaximumMessageAge());
}
+ public void testMaxDeliveryCount() throws Exception
+ {
+ // Set up vhosts and queues
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.maximumDeliveryCount", 5);
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.maximumDeliveryCount", 4);
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle");
+
+ // Start the broker now.
+ super.createBroker();
+
+ // Get vhosts
+ VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName());
+
+ // Enabled specifically
+ assertEquals("Test vhost MDC was configured as enabled", 5 ,test.getConfiguration().getMaxDeliveryCount());
+
+ // Enabled by test vhost default
+ assertEquals("beetle queue DLQ was configured as enabled", test.getConfiguration().getMaxDeliveryCount(), test.getConfiguration().getQueueConfiguration("beetle").getMaxDeliveryCount());
+
+ // Disabled specifically
+ assertEquals("Biggles queue DLQ was configured as disabled", 4, test.getConfiguration().getQueueConfiguration("biggles").getMaxDeliveryCount());
+ }
+
+ /**
+ * Tests the full set of configuration options for enabling DLQs in the broker configuration.
+ */
+ public void testIsDeadLetterQueueEnabled() throws Exception
+ {
+ // Set up vhosts and queues
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.deadLetterQueues", "true");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.deadLetterQueues", "false");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle");
+
+
+ getConfigXml().addProperty("virtualhosts.virtualhost.name", getName() + "Extra");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName());
+
+ // Start the broker now.
+ super.createBroker();
+
+ // Get vhosts
+ VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName());
+ VirtualHost extra = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName() + "Extra");
+
+ // Enabled specifically
+ assertTrue("Test vhost DLQ was configured as enabled", test.getConfiguration().isDeadLetterQueueEnabled());
+ assertTrue("r2d2 queue DLQ was configured as enabled", extra.getConfiguration().getQueueConfiguration("r2d2").isDeadLetterQueueEnabled());
+
+ // Enabled by test vhost default
+ assertTrue("beetle queue DLQ was configured as enabled", test.getConfiguration().getQueueConfiguration("beetle").isDeadLetterQueueEnabled());
+
+ // Disabled specifically
+ assertFalse("Biggles queue DLQ was configured as disabled", test.getConfiguration().getQueueConfiguration("biggles").isDeadLetterQueueEnabled());
+
+ // Using broker default of disabled
+ assertFalse("Extra vhost DLQ disabled, using broker default", extra.getConfiguration().isDeadLetterQueueEnabled());
+ assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled());
+
+ // Get queues
+ AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles"));
+ AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle"));
+ AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2"));
+ AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0"));
+
+ // Disabled specifically for this queue, overriding virtualhost setting
+ assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange());
+
+ // Enabled for all queues on the virtualhost
+ assertNotNull("Beetle queue should have an alt exchange as DLQ should be enabled, using test vhost default", beetle.getAlternateExchange());
+
+ // Enabled specifically for this queue, overriding the default broker setting of disabled
+ assertNotNull("R2D2 queue should have an alt exchange as DLQ should be configured as enabled", r2d2.getAlternateExchange());
+
+ // Disabled by the default broker setting
+ assertNull("C3PO queue should not have an alt exchange as DLQ should be disabled, using broker default", c3p0.getAlternateExchange());
+ }
+
/**
* Test that the house keeping pool sizes is correctly processed
*
@@ -173,7 +241,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
vhost.getHouseKeepingTaskCount());
// Currently the two are tasks:
- // ExpiredMessageTask from VirtualHost
+ // ExpiredMessageTask from VirtualHost
// UpdateTask from the QMF ManagementExchange
}
@@ -214,7 +282,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
{
getConfigXml().addProperty("virtualhosts.virtualhost.testSecurityAuthenticationNameRejected.security.authentication.name",
"testdb");
-
+
try
{
super.createBroker();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 7b73987abf..ea2fe90da6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -490,6 +490,22 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
{
return null;
}
+
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+ }
};
if(action != null)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 27891289fb..2b7d1d7f26 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,39 +20,76 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class AMQQueueFactoryTest extends InternalBrokerBaseCase
+public class AMQQueueFactoryTest extends QpidTestCase
{
QueueRegistry _queueRegistry;
VirtualHost _virtualHost;
- int _defaultQueueCount;
@Override
public void setUp() throws Exception
{
super.setUp();
- ApplicationRegistry registry = (ApplicationRegistry) ApplicationRegistry.getInstance();
- _virtualHost = registry.getVirtualHostRegistry().getVirtualHost("test");
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+ XMLConfiguration configXml = new XMLConfiguration();
+ configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
+
+ ServerConfiguration configuration = new ServerConfiguration(configXml);
+
+ ApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName(getName());
+
+ _virtualHost = registry.getVirtualHostRegistry().getVirtualHost(getName());
_queueRegistry = _virtualHost.getQueueRegistry();
- _defaultQueueCount = _queueRegistry.getQueues().size();
}
@Override
public void tearDown() throws Exception
{
- assertEquals("Queue was not registered in virtualhost", _defaultQueueCount + 1, _queueRegistry.getQueues().size());
- super.tearDown();
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ private void verifyRegisteredQueueCount(int count)
+ {
+ assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size());
}
+ private void verifyQueueRegistered(String queueName)
+ {
+ assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName));
+ }
+
public void testPriorityQueueRegistration() throws Exception
{
FieldTable fieldTable = new FieldTable();
@@ -63,13 +100,314 @@ public class AMQQueueFactoryTest extends InternalBrokerBaseCase
false, _virtualHost, fieldTable);
assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
+ verifyQueueRegistered("testPriorityQueue");
+ verifyRegisteredQueueCount(1);
}
public void testSimpleQueueRegistration() throws Exception
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
+ AMQShortString queueName = new AMQShortString("testSimpleQueueRegistration");
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false,
false, _virtualHost, null);
assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
+ verifyQueueRegistered("testSimpleQueueRegistration");
+
+ //verify that no alternate exchange or DLQ were produced
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+
+ assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange());
+ assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName));
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does
+ * cause the alternate exchange to be set and DLQ to be produced.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueEnabled() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ Exchange altExchange = queue.getAlternateExchange();
+ assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
+ assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
+
+ assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+
+ AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ assertNotNull("The DLQ was not registered as expected", dlQueue);
+ assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
+ assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
+ assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount());
+
+ //2 queues should have been registered
+ verifyRegisteredQueueCount(2);
+ }
+
+ /**
+ * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration
+ * are not applied to the DLQ itself.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws AMQException
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("deadLetterQueues","true");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("maximumDeliveryCount","5");
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, null);
+
+ assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
+ Exchange altExchange = queue.getAlternateExchange();
+ assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
+ assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
+
+ assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+
+ AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ assertNotNull("The DLQ was not registered as expected", dlQueue);
+ assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
+ assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
+ assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount());
+
+ //2 queues should have been registered
+ verifyRegisteredQueueCount(2);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not
+ * result in the alternate exchange being set and DLQ being created.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueDisabled() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueDisabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
+ assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName));
+
+ assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName));
+
+ //only 1 queue should have been registered
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but
+ * creating an auto-delete queue, does not result in the alternate exchange
+ * being set and DLQ being created.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueNotCreatedForAutodeleteQueues");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+
+ //create an autodelete queue
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), true, false,
+ _virtualHost, fieldTable);
+ assertTrue("Queue should be autodelete", queue.isAutoDelete());
+
+ //ensure that the autodelete property overrides the request to enable DLQ
+ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
+ assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName));
+ assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName));
+
+ //only 1 queue should have been registered
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
+ * the desired effect.
+ */
+ public void testMaximumDeliveryCount() throws Exception
+ {
+ final FieldTable fieldTable = new FieldTable();
+ fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+
+ final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount");
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ assertNotNull("The queue was not registered as expected ", queue);
+ assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
+ * that queue is created with a default maximumDeliveryCount of zero (unless set in config).
+ */
+ public void testMaximumDeliveryCountDefault() throws Exception
+ {
+
+ final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount");
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, null);
+
+ assertNotNull("The queue was not registered as expected ", queue);
+ assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests queue creation with queue name set to null
+ */
+ public void testQueueNameNullValidation()
+ {
+ try
+ {
+ AMQQueueFactory.createAMQQueueImpl(null, false, new AMQShortString("owner"), true, false, _virtualHost, null);
+ fail("queue with null name can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue(e instanceof IllegalArgumentException);
+ assertEquals("Queue name must not be null", e.getMessage());
+ }
+ }
+
+ /**
+ * Tests queue creation with queue name length less 255 characters but
+ * corresponding DLQ name length greater than 255.
+ */
+ public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255()
+ {
+ String queueName = "test-" + generateStringWithLength('a', 245);
+ try
+ {
+ // change DLQ name to make its length bigger than exchange name
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", "_DLE");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", "_DLQUEUE");
+
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"),
+ false, false, _virtualHost, fieldTable);
+ fail("queue with DLQ name having more than 255 characters can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
+ assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name")
+ && e.getMessage().contains("length exceeds limit of 255"));
+ }
+ finally
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ }
+
+ /**
+ * Tests queue creation with queue name length less 255 characters but
+ * corresponding DL exchange name length greater than 255.
+ */
+ public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255()
+ {
+ String queueName = "test-" + generateStringWithLength('a', 245);
+ try
+ {
+ // change DLQ name to make its length bigger than exchange name
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", "_DLEXCHANGE");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", "_DLQ");
+
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"),
+ false, false, _virtualHost, fieldTable);
+ fail("queue with DLE name having more than 255 characters can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
+ assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name")
+ && e.getMessage().contains("length exceeds limit of 255"));
+ }
+ finally
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ }
+
+ private String generateStringWithLength(char ch, int length)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++)
+ {
+ sb.append(ch);
+ }
+ return sb.toString();
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 8f3023f269..f70250132a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -44,6 +44,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -390,6 +391,34 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertFalse(channel.getBlocking());
}
+ public void testMaximumDeliveryCount() throws IOException
+ {
+ assertEquals("Unexpected default maximum delivery count", Integer.valueOf(0), _queueMBean.getMaximumDeliveryCount());
+ }
+
+ public void testViewAllMessages() throws Exception
+ {
+ final int messageCount = 5;
+ sendPersistentMessages(messageCount);
+
+
+ final TabularData messageTable = _queueMBean.viewMessages(1L, 5L);
+ assertNotNull("Message table should not be null", messageTable);
+ assertEquals("Unexpected number of rows", messageCount, messageTable.size());
+
+
+ final Iterator rowIterator = messageTable.values().iterator();
+ // Get its message ID
+ final CompositeDataSupport row1 = (CompositeDataSupport) rowIterator.next();
+ final Long msgId = (Long) row1.get("AMQ MessageId");
+ final Long queuePosition = (Long) row1.get("Queue Position");
+ final Integer deliveryCount = (Integer) row1.get("Delivery Count");
+
+ assertNotNull("Row should have value for queue position", queuePosition);
+ assertNotNull("Row should have value for msgid", msgId);
+ assertNotNull("Row should have value for deliveryCount", deliveryCount);
+ }
+
@Override
public void setUp() throws Exception
@@ -404,6 +433,13 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
ApplicationRegistry.remove();
}
+ private void sendPersistentMessages(int messageCount) throws AMQException
+ {
+ sendMessages(messageCount, true);
+ assertEquals("Expected " + messageCount + " messages in the queue", messageCount, _queueMBean
+ .getMessageCount().intValue());
+ }
+
private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException
{
return sendMessages(messageCount, persistent, 0l, 0l);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 4c31092983..0daf79122c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -611,4 +611,21 @@ public class MockAMQQueue implements AMQQueue
{
}
+
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void setMaximumDeliveryCount(int maximumDeliveryCount)
+ {
+ }
+
+ @Override
+ public void setAlternateExchange(String exchangeName)
+ {
+ }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 864b9ad368..7ad002c248 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -244,4 +244,21 @@ public class MockQueueEntry implements QueueEntry
return null;
}
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+ }
+
+
}