summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java110
2 files changed, 72 insertions, 43 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
index 4b140f1ca7..32d0c4c4d1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -51,6 +51,11 @@ public class VirtualHostRegistry implements Closeable
}
_registry.put(host.getName(),host);
}
+
+ public synchronized void unregisterVirtualHost(VirtualHost host)
+ {
+ _registry.remove(host.getName());
+ }
public VirtualHost getVirtualHost(String name)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 66f84270aa..5db22789c3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store;
-import junit.framework.TestCase;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.util.FileUtils;
import java.io.File;
import java.util.List;
@@ -62,7 +62,8 @@ import java.util.List;
/**
* This tests the MessageStores by using the available interfaces.
*
- * This test validates that Exchanges, Queues, Bindings and Messages are persisted correctly.
+ * This test validates that Exchanges, Queues, Bindings and Messages are persisted and
+ * recovered correctly.
*/
public class MessageStoreTest extends InternalBrokerBaseCase
{
@@ -70,7 +71,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
private static final int DEFAULT_PRIORTY_LEVEL = 5;
private static final Logger _logger = LoggerFactory.getLogger(MessageStoreTest.class);
- public void testMemoryMessageStore()
+ public void testMemoryMessageStore() throws Exception
{
PropertiesConfiguration config = new PropertiesConfiguration();
@@ -80,7 +81,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
runTestWithStore(config);
}
- public void DISABLE_testDerbyMessageStore()
+ public void testDerbyMessageStore() throws Exception
{
PropertiesConfiguration config = new PropertiesConfiguration();
@@ -97,10 +98,13 @@ public class MessageStoreTest extends InternalBrokerBaseCase
try
{
_virtualHost.close();
+ _virtualHost.getApplicationRegistry().
+ getVirtualHostRegistry().unregisterVirtualHost(_virtualHost);
}
catch (Exception e)
{
fail(e.getMessage());
+
}
}
@@ -125,6 +129,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
+ AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
@@ -134,7 +139,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
AMQShortString topicRouting = new AMQShortString("MST-topic");
- protected void runTestWithStore(Configuration configuration)
+ protected void runTestWithStore(Configuration configuration) throws AMQException
{
//Ensure Environment Path is empty
cleanup(configuration);
@@ -179,7 +184,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
//Ensure all the topics have two messages (one transient, one persistent)
validateMessageOnTopics(2, true);
- assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size());
+ assertEquals("Not all queues correctly registered", 9, _virtualHost.getQueueRegistry().getQueues().size());
if (!messageStore.isPersistent())
{
@@ -198,9 +203,9 @@ public class MessageStoreTest extends InternalBrokerBaseCase
validateExchanges();
- //Validate Durable Queues still have the persistentn message
+ //Validate Durable Queues still have the persistent message
validateMessageOnQueues(2, false);
- //Validate Durable Queues still have the persistentn message
+ //Validate Durable Queues still have the persistent message
validateMessageOnTopics(1, false);
//Validate Properties of Binding
@@ -208,6 +213,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
//Validate Properties of Queues
validateQueueProperties();
+ validateQueueExclusivityProperty(true);
//Validate Non-Durable Queues are gone.
assertNull("Non-Durable queue still registered:" + priorityQueueName, _virtualHost.getQueueRegistry().getQueue(priorityQueueName));
@@ -215,7 +221,20 @@ public class MessageStoreTest extends InternalBrokerBaseCase
assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, _virtualHost.getQueueRegistry().getQueue(priorityTopicQueueName));
assertNull("Non-Durable queue still registered:" + topicQueueName, _virtualHost.getQueueRegistry().getQueue(topicQueueName));
- assertEquals("Not all queues correctly registered", 4, _virtualHost.getQueueRegistry().getQueues().size());
+ assertEquals("Not all queues correctly registered", 5, _virtualHost.getQueueRegistry().getQueues().size());
+
+ //Try updating the queue exclusivity and verify it is persisted and recovered correctly
+ setQueueExclusivity(false);
+
+ //Reload the Virtualhost to test update to queue exclusivity
+ _logger.info("Reloading Virtualhost");
+ original = _virtualHost;
+ reload(configuration);
+
+ assertTrue("Virtualhost has not been reloaded", original != _virtualHost);
+
+ //verify the change was persisted
+ validateQueueExclusivityProperty(false);
}
private void validateExchanges()
@@ -229,8 +248,8 @@ public class MessageStoreTest extends InternalBrokerBaseCase
assertTrue(nonDurableExchangeName + " exchange reloaded after failover",
!registry.getExchangeNames().contains(nonDurableExchangeName));
- // There are 5 required exchanges + our 2 durable queues
- assertEquals("Incorrect number of exchanges available", 5 + 2, registry.getExchangeNames().size());
+ // There are 5 required exchanges + qpid.management + our 2 durable exchanges
+ assertEquals("Incorrect number of exchanges available", 6 + 2, registry.getExchangeNames().size());
}
/** Validates that the Durable queues */
@@ -259,10 +278,29 @@ public class MessageStoreTest extends InternalBrokerBaseCase
if (useSelectors)
{
assertTrue("Binding does not contain a Selector argument.",
- binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.toString()));
+ binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
}
}
+
+ private void setQueueExclusivity(boolean exclusive) throws AMQException
+ {
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+ AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
+
+ queue.setExclusive(exclusive);
+ }
+
+ private void validateQueueExclusivityProperty(boolean expected)
+ {
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
+
+ assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected);
+ }
+
+
private void validateQueueProperties()
{
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
@@ -271,7 +309,6 @@ public class MessageStoreTest extends InternalBrokerBaseCase
validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true);
validateQueueProperties(queueRegistry.getQueue(durableQueueName), false);
validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false);
-
}
private void validateQueueProperties(AMQQueue queue, boolean usePriority)
@@ -300,29 +337,14 @@ public class MessageStoreTest extends InternalBrokerBaseCase
if (environment != null)
{
File environmentPath = new File(environment);
-
+
if (environmentPath.exists())
{
- deleteDirectory(environmentPath);
+ FileUtils.delete(environmentPath, true);
}
}
}
- private void deleteDirectory(File path)
- {
- if (path.isDirectory())
- {
- for (File file : path.listFiles())
- {
- deleteDirectory(file);
- }
- }
- else
- {
- path.delete();
- }
- }
-
private void sendMessageOnExchange(Exchange directExchange, AMQShortString routingKey, boolean deliveryMode)
{
//Set MessagePersustebce
@@ -360,11 +382,10 @@ public class MessageStoreTest extends InternalBrokerBaseCase
MessageMetaData mmd = currentMessage.headersReceived();
currentMessage.setStoredMessage(_virtualHost.getMessageStore().addMessage(mmd));
-
+ currentMessage.getStoredMessage().flushToStore();
currentMessage.route();
-
// check and deliver if header says body length is zero
if (currentMessage.allContentReceived())
{
@@ -400,31 +421,34 @@ public class MessageStoreTest extends InternalBrokerBaseCase
private void createAllQueues()
{
//Register Durable Priority Queue
- createQueue(durablePriorityQueueName, true, true);
+ createQueue(durablePriorityQueueName, true, true, false);
//Register Durable Simple Queue
- createQueue(durableQueueName, false, true);
-
+ createQueue(durableQueueName, false, true, false);
+
+ //Register Durable Exclusive Simple Queue
+ createQueue(durableExclusiveQueueName, false, true, true);
+
//Register NON-Durable Priority Queue
- createQueue(priorityQueueName, true, false);
+ createQueue(priorityQueueName, true, false, false);
//Register NON-Durable Simple Queue
- createQueue(queueName, false, false);
+ createQueue(queueName, false, false, false);
}
private void createAllTopicQueues()
{
//Register Durable Priority Queue
- createQueue(durablePriorityTopicQueueName, true, true);
+ createQueue(durablePriorityTopicQueueName, true, true, false);
//Register Durable Simple Queue
- createQueue(durableTopicQueueName, false, true);
+ createQueue(durableTopicQueueName, false, true, false);
//Register NON-Durable Priority Queue
- createQueue(priorityTopicQueueName, true, false);
+ createQueue(priorityTopicQueueName, true, false, false);
//Register NON-Durable Simple Queue
- createQueue(topicQueueName, false, false);
+ createQueue(topicQueueName, false, false, false);
}
private Exchange createExchange(ExchangeType type, AMQShortString name, boolean durable)
@@ -455,7 +479,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
return exchange;
}
- private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable)
+ private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive)
{
FieldTable queueArguments = null;
@@ -471,7 +495,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
//Ideally we would be able to use the QueueDeclareHandler here.
try
{
- queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, false,
+ queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, exclusive,
_virtualHost, queueArguments);
validateQueueProperties(queue, usePriority);