diff options
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java | 5 | ||||
| -rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java | 110 |
2 files changed, 72 insertions, 43 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 4b140f1ca7..32d0c4c4d1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -51,6 +51,11 @@ public class VirtualHostRegistry implements Closeable }
_registry.put(host.getName(),host);
}
+
+ public synchronized void unregisterVirtualHost(VirtualHost host)
+ {
+ _registry.remove(host.getName());
+ }
public VirtualHost getVirtualHost(String name)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 66f84270aa..5db22789c3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.store; -import junit.framework.TestCase; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.util.InternalBrokerBaseCase; @@ -55,6 +54,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.util.FileUtils; import java.io.File; import java.util.List; @@ -62,7 +62,8 @@ import java.util.List; /** * This tests the MessageStores by using the available interfaces. * - * This test validates that Exchanges, Queues, Bindings and Messages are persisted correctly. + * This test validates that Exchanges, Queues, Bindings and Messages are persisted and + * recovered correctly. */ public class MessageStoreTest extends InternalBrokerBaseCase { @@ -70,7 +71,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase private static final int DEFAULT_PRIORTY_LEVEL = 5; private static final Logger _logger = LoggerFactory.getLogger(MessageStoreTest.class); - public void testMemoryMessageStore() + public void testMemoryMessageStore() throws Exception { PropertiesConfiguration config = new PropertiesConfiguration(); @@ -80,7 +81,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase runTestWithStore(config); } - public void DISABLE_testDerbyMessageStore() + public void testDerbyMessageStore() throws Exception { PropertiesConfiguration config = new PropertiesConfiguration(); @@ -97,10 +98,13 @@ public class MessageStoreTest extends InternalBrokerBaseCase try { _virtualHost.close(); + _virtualHost.getApplicationRegistry(). + getVirtualHostRegistry().unregisterVirtualHost(_virtualHost); } catch (Exception e) { fail(e.getMessage()); + } } @@ -125,6 +129,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue"); AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue"); + AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive"); AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable"); AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable"); AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue"); @@ -134,7 +139,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase AMQShortString topicRouting = new AMQShortString("MST-topic"); - protected void runTestWithStore(Configuration configuration) + protected void runTestWithStore(Configuration configuration) throws AMQException { //Ensure Environment Path is empty cleanup(configuration); @@ -179,7 +184,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase //Ensure all the topics have two messages (one transient, one persistent) validateMessageOnTopics(2, true); - assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size()); + assertEquals("Not all queues correctly registered", 9, _virtualHost.getQueueRegistry().getQueues().size()); if (!messageStore.isPersistent()) { @@ -198,9 +203,9 @@ public class MessageStoreTest extends InternalBrokerBaseCase validateExchanges(); - //Validate Durable Queues still have the persistentn message + //Validate Durable Queues still have the persistent message validateMessageOnQueues(2, false); - //Validate Durable Queues still have the persistentn message + //Validate Durable Queues still have the persistent message validateMessageOnTopics(1, false); //Validate Properties of Binding @@ -208,6 +213,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase //Validate Properties of Queues validateQueueProperties(); + validateQueueExclusivityProperty(true); //Validate Non-Durable Queues are gone. assertNull("Non-Durable queue still registered:" + priorityQueueName, _virtualHost.getQueueRegistry().getQueue(priorityQueueName)); @@ -215,7 +221,20 @@ public class MessageStoreTest extends InternalBrokerBaseCase assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, _virtualHost.getQueueRegistry().getQueue(priorityTopicQueueName)); assertNull("Non-Durable queue still registered:" + topicQueueName, _virtualHost.getQueueRegistry().getQueue(topicQueueName)); - assertEquals("Not all queues correctly registered", 4, _virtualHost.getQueueRegistry().getQueues().size()); + assertEquals("Not all queues correctly registered", 5, _virtualHost.getQueueRegistry().getQueues().size()); + + //Try updating the queue exclusivity and verify it is persisted and recovered correctly + setQueueExclusivity(false); + + //Reload the Virtualhost to test update to queue exclusivity + _logger.info("Reloading Virtualhost"); + original = _virtualHost; + reload(configuration); + + assertTrue("Virtualhost has not been reloaded", original != _virtualHost); + + //verify the change was persisted + validateQueueExclusivityProperty(false); } private void validateExchanges() @@ -229,8 +248,8 @@ public class MessageStoreTest extends InternalBrokerBaseCase assertTrue(nonDurableExchangeName + " exchange reloaded after failover", !registry.getExchangeNames().contains(nonDurableExchangeName)); - // There are 5 required exchanges + our 2 durable queues - assertEquals("Incorrect number of exchanges available", 5 + 2, registry.getExchangeNames().size()); + // There are 5 required exchanges + qpid.management + our 2 durable exchanges + assertEquals("Incorrect number of exchanges available", 6 + 2, registry.getExchangeNames().size()); } /** Validates that the Durable queues */ @@ -259,10 +278,29 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (useSelectors) { assertTrue("Binding does not contain a Selector argument.", - binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.toString())); + binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())); } } + + private void setQueueExclusivity(boolean exclusive) throws AMQException + { + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); + + queue.setExclusive(exclusive); + } + + private void validateQueueExclusivityProperty(boolean expected) + { + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); + + assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected); + } + + private void validateQueueProperties() { QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); @@ -271,7 +309,6 @@ public class MessageStoreTest extends InternalBrokerBaseCase validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true); validateQueueProperties(queueRegistry.getQueue(durableQueueName), false); validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false); - } private void validateQueueProperties(AMQQueue queue, boolean usePriority) @@ -300,29 +337,14 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (environment != null) { File environmentPath = new File(environment); - + if (environmentPath.exists()) { - deleteDirectory(environmentPath); + FileUtils.delete(environmentPath, true); } } } - private void deleteDirectory(File path) - { - if (path.isDirectory()) - { - for (File file : path.listFiles()) - { - deleteDirectory(file); - } - } - else - { - path.delete(); - } - } - private void sendMessageOnExchange(Exchange directExchange, AMQShortString routingKey, boolean deliveryMode) { //Set MessagePersustebce @@ -360,11 +382,10 @@ public class MessageStoreTest extends InternalBrokerBaseCase MessageMetaData mmd = currentMessage.headersReceived(); currentMessage.setStoredMessage(_virtualHost.getMessageStore().addMessage(mmd)); - + currentMessage.getStoredMessage().flushToStore(); currentMessage.route(); - // check and deliver if header says body length is zero if (currentMessage.allContentReceived()) { @@ -400,31 +421,34 @@ public class MessageStoreTest extends InternalBrokerBaseCase private void createAllQueues() { //Register Durable Priority Queue - createQueue(durablePriorityQueueName, true, true); + createQueue(durablePriorityQueueName, true, true, false); //Register Durable Simple Queue - createQueue(durableQueueName, false, true); - + createQueue(durableQueueName, false, true, false); + + //Register Durable Exclusive Simple Queue + createQueue(durableExclusiveQueueName, false, true, true); + //Register NON-Durable Priority Queue - createQueue(priorityQueueName, true, false); + createQueue(priorityQueueName, true, false, false); //Register NON-Durable Simple Queue - createQueue(queueName, false, false); + createQueue(queueName, false, false, false); } private void createAllTopicQueues() { //Register Durable Priority Queue - createQueue(durablePriorityTopicQueueName, true, true); + createQueue(durablePriorityTopicQueueName, true, true, false); //Register Durable Simple Queue - createQueue(durableTopicQueueName, false, true); + createQueue(durableTopicQueueName, false, true, false); //Register NON-Durable Priority Queue - createQueue(priorityTopicQueueName, true, false); + createQueue(priorityTopicQueueName, true, false, false); //Register NON-Durable Simple Queue - createQueue(topicQueueName, false, false); + createQueue(topicQueueName, false, false, false); } private Exchange createExchange(ExchangeType type, AMQShortString name, boolean durable) @@ -455,7 +479,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase return exchange; } - private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable) + private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive) { FieldTable queueArguments = null; @@ -471,7 +495,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase //Ideally we would be able to use the QueueDeclareHandler here. try { - queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, false, + queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, exclusive, _virtualHost, queueArguments); validateQueueProperties(queue, usePriority); |
