diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-08-09 10:05:54 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-08-09 10:05:54 +0000 |
| commit | 6c02b1702d59edf8fa3f4250f77af1523a0af650 (patch) | |
| tree | f1b8c5590dea04622e24d22863a4a621e3854e7b /java | |
| parent | 884c2e97c140103edf4bb5a0695d8c0678379f83 (diff) | |
| download | qpid-python-6c02b1702d59edf8fa3f4250f77af1523a0af650.tar.gz | |
QPID-2787: Add test for persistence of Conflation/LastValue queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@983571 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 68 insertions, 38 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index a547205d27..6bfd7470ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -33,10 +33,9 @@ import java.util.HashMap; public class AMQQueueFactory { public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); - - private static final String QPID_LVQ_KEY = "qpid.LVQ_key"; - private static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; - private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; + public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; + public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; private abstract static class QueueProperty { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java index 400d9867ae..b5293f51be 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java @@ -39,5 +39,9 @@ public class ConflationQueue extends SimpleAMQQueue super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); } + public String getConflationKey() + { + return ((ConflationQueueList) _entries).getConflationKey(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 9a6e5c884a..2c1883e763 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -21,19 +21,13 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.message.ServerMessage; - import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; + public class ConflationQueueList extends SimpleQueueEntryList { @@ -47,6 +41,11 @@ public class ConflationQueueList extends SimpleQueueEntryList _conflationKey = conflationKey; } + public String getConflationKey() + { + return _conflationKey; + } + @Override protected ConflationQueueEntry createQueueEntry(ServerMessage message) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 673928e619..9cc9148c55 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -47,6 +47,7 @@ import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.ConflationQueue; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -67,6 +68,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase { public static final int DEFAULT_PRIORTY_LEVEL = 5; public static final String SELECTOR_VALUE = "Test = 'MST'"; + public static final String LVQ_KEY = "MST-LVQ-KEY"; AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange"); AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange"); @@ -79,6 +81,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive"); AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable"); + AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable"); AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable"); AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue"); AMQShortString queueName = new AMQShortString("MST-Queue"); @@ -180,7 +183,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase validateMessageOnTopics(2, true); assertEquals("Not all queues correctly registered", - 9, _virtualHost.getQueueRegistry().getQueues().size()); + 10, _virtualHost.getQueueRegistry().getQueues().size()); } /** @@ -212,7 +215,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); assertEquals("Incorrect number of queues registered after recovery", - 5, queueRegistry.getQueues().size()); + 6, queueRegistry.getQueues().size()); //clear the queue queueRegistry.getQueue(durableQueueName).clearQueue(); @@ -246,7 +249,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); assertEquals("Incorrect number of queues registered after recovery", - 5, queueRegistry.getQueues().size()); + 6, queueRegistry.getQueues().size()); //Validate the non-Durable Queues were not recovered. assertNull("Non-Durable queue still registered:" + priorityQueueName, @@ -280,7 +283,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase public void testDurableQueueRemoval() throws Exception { //Register Durable Queue - createQueue(durableQueueName, false, true, false); + createQueue(durableQueueName, false, true, false, false); QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); assertEquals("Incorrect number of queues registered before recovery", @@ -401,7 +404,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase //create durable queue and exchange, bind them Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true); - createQueue(durableQueueName, false, true, false); + createQueue(durableQueueName, false, true, false, false); bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); assertEquals("Incorrect number of bindings registered before recovery", @@ -460,7 +463,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase { QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); - assertEquals("There should be 5 (durable) queues following recovery", 5, queueRegistry.getQueues().size()); + assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size()); validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false); validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true); @@ -515,24 +518,35 @@ public class MessageStoreTest extends InternalBrokerBaseCase { QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); - validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false); - validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false); - validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false); - validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false); - validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true); + validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false); + validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false); + validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false); + validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false); + validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false); + validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true); } - private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive) + private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) { + if(usePriority || lastValueQueue) + { + assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); + } + if (usePriority) { assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass()); assertEquals("Priority Queue does not have set priorities", DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities()); } + else if (lastValueQueue) + { + assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass()); + assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey()); + } else { - assertEquals("Queue is no longer a Priority Queue", SimpleAMQQueue.class, queue.getClass()); + assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass()); } assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner()); @@ -628,46 +642,60 @@ public class MessageStoreTest extends InternalBrokerBaseCase private void createAllQueues() { //Register Durable Priority Queue - createQueue(durablePriorityQueueName, true, true, false); + createQueue(durablePriorityQueueName, true, true, false, false); //Register Durable Simple Queue - createQueue(durableQueueName, false, true, false); + createQueue(durableQueueName, false, true, false, false); //Register Durable Exclusive Simple Queue - createQueue(durableExclusiveQueueName, false, true, true); + createQueue(durableExclusiveQueueName, false, true, true, false); + + //Register Durable LastValue Queue + createQueue(durableLastValueQueueName, false, true, true, true); //Register NON-Durable Priority Queue - createQueue(priorityQueueName, true, false, false); + createQueue(priorityQueueName, true, false, false, false); //Register NON-Durable Simple Queue - createQueue(queueName, false, false, false); + createQueue(queueName, false, false, false, false); } private void createAllTopicQueues() { //Register Durable Priority Queue - createQueue(durablePriorityTopicQueueName, true, true, false); + createQueue(durablePriorityTopicQueueName, true, true, false, false); //Register Durable Simple Queue - createQueue(durableTopicQueueName, false, true, false); + createQueue(durableTopicQueueName, false, true, false, false); //Register NON-Durable Priority Queue - createQueue(priorityTopicQueueName, true, false, false); + createQueue(priorityTopicQueueName, true, false, false, false); //Register NON-Durable Simple Queue - createQueue(topicQueueName, false, false, false); + createQueue(topicQueueName, false, false, false, false); } - private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive) + private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) { FieldTable queueArguments = null; + + if(usePriority || lastValueQueue) + { + assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); + } if (usePriority) { queueArguments = new FieldTable(); queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); } + + if (lastValueQueue) + { + queueArguments = new FieldTable(); + queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY); + } AMQQueue queue = null; @@ -677,7 +705,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, exclusive, _virtualHost, queueArguments); - validateQueueProperties(queue, usePriority, durable, exclusive); + validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue); if (queue.isDurable() && !queue.isAutoDelete()) { |
