summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-20 23:37:48 +0000
committerKeith Wall <kwall@apache.org>2011-11-20 23:37:48 +0000
commita0fccd9dbfabd912a906225f817cd00072d7fc8d (patch)
tree71d457039d047e12ee5bec2f6cc98f457f050bec /java/broker/src/test
parent33db3cab8c1ec8d82045e557b190a98a2418c565 (diff)
downloadqpid-python-a0fccd9dbfabd912a906225f817cd00072d7fc8d.tar.gz
QPID-3622: Add Sorted Queue funtionality
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1204295 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java15
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java33
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java10
-rwxr-xr-xjava/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java22
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (renamed from java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java)126
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java190
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java97
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java160
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java59
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java153
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java62
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java323
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java6
16 files changed, 1019 insertions, 271 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
index d2f2ae5eea..9941c00499 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
@@ -129,7 +129,19 @@ public class QueueConfigurationTest extends TestCase
assertEquals(1, qConf.getMinimumAlertRepeatGap());
}
- private VirtualHostConfiguration overrideConfiguration(String property, int value)
+ public void testSortQueueConfiguration() throws ConfigurationException
+ {
+ //Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertNull(qConf.getQueueSortKey());
+
+ // Check explicit value
+ final VirtualHostConfiguration vhostConfig = overrideConfiguration("sortKey", "test-sort-key");
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertEquals("test-sort-key", qConf.getQueueSortKey());
+ }
+
+ private VirtualHostConfiguration overrideConfiguration(String property, Object value)
throws ConfigurationException
{
PropertiesConfiguration queueConfig = new PropertiesConfiguration();
@@ -141,5 +153,4 @@ public class QueueConfigurationTest extends TestCase
return new VirtualHostConfiguration("test", config);
}
-
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 3b7f5f3a51..7b73987abf 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -20,9 +20,17 @@
*/
package org.apache.qpid.server.exchange;
-import junit.framework.TestCase;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -52,17 +60,6 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
{
private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
@@ -483,6 +480,16 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
{
return false;
}
+
+ public QueueEntry getNextNode()
+ {
+ return null;
+ }
+
+ public QueueEntry getNextValidEntry()
+ {
+ return null;
+ }
};
if(action != null)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 3961b3b355..2ce43052d9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -21,12 +21,12 @@ package org.apache.qpid.server.queue;
*/
import java.util.ArrayList;
-
+import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
-import junit.framework.AssertionFailedError;
+import org.apache.qpid.server.message.AMQMessage;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
@@ -35,7 +35,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
public void setUp() throws Exception
{
_arguments = new FieldTable();
- _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+ _arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3);
super.setUp();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index 7000df157e..12369bd7d4 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -20,24 +20,23 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
public class MockAMQMessage extends AMQMessage
{
public MockAMQMessage(long messageId)
- throws AMQException
{
super(new MockStoredMessage(messageId));
}
-
-
+ public MockAMQMessage(long messageId, String headerName, Object headerValue)
+ {
+ super(new MockStoredMessage(messageId, headerName, headerValue));
+ }
@Override
public long getSize()
{
return 0l;
}
-
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index ab8850c18c..864b9ad368 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -234,4 +234,14 @@ public class MockQueueEntry implements QueueEntry
return false;
}
+ public QueueEntry getNextNode()
+ {
+ return null;
+ }
+
+ public QueueEntry getNextValidEntry()
+ {
+ return null;
+ }
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
index 7dc491de4d..78ed3e9f34 100755
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.framing.FieldTable;
+
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.StoredMessage;
@@ -36,18 +38,32 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
private MessageMetaData _metaData;
private final ByteBuffer _content;
-
public MockStoredMessage(long messageId)
{
- this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60));
+ this(messageId, (String)null, null);
+ }
+
+ public MockStoredMessage(long messageId, String headerName, Object headerValue)
+ {
+ this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue);
}
public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb)
{
+ this(messageId, info, chb, null, null);
+ }
+
+ public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb, String headerName, Object headerValue)
+ {
_messageId = messageId;
+ if (headerName != null)
+ {
+ FieldTable headers = new FieldTable();
+ headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue));
+ ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers);
+ }
_metaData = new MessageMetaData(info, chb, 0);
_content = ByteBuffer.allocate(_metaData.getContentSize());
-
}
public MessageMetaData getMetaData()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index d8afd8d829..d336132316 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -19,9 +19,7 @@
package org.apache.qpid.server.queue;
import java.lang.reflect.Field;
-
import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.EntryState;
@@ -30,18 +28,27 @@ import org.apache.qpid.server.subscription.Subscription;
/**
* Tests for {@link QueueEntryImpl}
- *
*/
-public class QueueEntryImplTest extends TestCase
+public abstract class QueueEntryImplTestBase extends TestCase
{
// tested entry
- private QueueEntryImpl _queueEntry;
+ protected QueueEntryImpl _queueEntry;
+ protected QueueEntryImpl _queueEntry2;
+ protected QueueEntryImpl _queueEntry3;
+
+ public abstract QueueEntryImpl getQueueEntryImpl(int msgid) throws AMQException;
+
+ public abstract void testCompareTo();
+
+ public abstract void testTraverseWithNoDeletedEntries();
+
+ public abstract void testTraverseWithDeletedEntries();
public void setUp() throws Exception
{
- AMQMessage message = new MockAMQMessage(1);
- SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
- _queueEntry = new QueueEntryImpl(queueEntryList, message, 1);
+ _queueEntry = getQueueEntryImpl(1);
+ _queueEntry2 = getQueueEntryImpl(2);
+ _queueEntry3 = getQueueEntryImpl(3);
}
public void testAquire()
@@ -105,61 +112,6 @@ public class QueueEntryImplTest extends TestCase
}
/**
- * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method.
- */
- public void testGetNext()
- {
- int numberOfEntries = 5;
- QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
- SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
-
- // create test entries
- for(int i = 0; i < numberOfEntries ; i++)
- {
- AMQMessage message = null;;
- try
- {
- message = new MockAMQMessage(i);
- }
- catch (AMQException e)
- {
- fail("Failure to create a mock message:" + e.getMessage());
- }
- QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message);
- entries[i] = entry;
- }
-
- // test getNext for not acquired entries
- for(int i = 0; i < numberOfEntries ; i++)
- {
- QueueEntryImpl queueEntry = entries[i];
- QueueEntryImpl next = queueEntry.getNext();
- if (i < numberOfEntries - 1)
- {
- assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
- }
- else
- {
- assertNull("The next entry after the last should be null", next);
- }
- }
-
- // delete second
- entries[1].acquire();
- entries[1].delete();
-
- // dequeue third
- entries[2].acquire();
- entries[2].dequeue();
-
- QueueEntryImpl next = entries[0].getNext();
- assertEquals("expected forth entry",entries[3], next);
- next = next.getNext();
- assertEquals("expected fifth entry", entries[4], next);
- next = next.getNext();
- assertNull("The next entry after the last should be null", next);
- }
- /**
* A helper method to put tested object into deleted state and assert the state
*/
private void delete()
@@ -244,4 +196,52 @@ public class QueueEntryImplTest extends TestCase
assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
}
+
+ /**
+ * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method.
+ */
+ public void testGetNext()
+ {
+ int numberOfEntries = 5;
+ QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
+ SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+
+ // create test entries
+ for(int i = 0; i < numberOfEntries ; i++)
+ {
+ AMQMessage message = new MockAMQMessage(i);
+ QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message);
+ entries[i] = entry;
+ }
+
+ // test getNext for not acquired entries
+ for(int i = 0; i < numberOfEntries ; i++)
+ {
+ QueueEntryImpl queueEntry = entries[i];
+ QueueEntry next = queueEntry.getNextValidEntry();
+ if (i < numberOfEntries - 1)
+ {
+ assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
+ }
+ else
+ {
+ assertNull("The next entry after the last should be null", next);
+ }
+ }
+
+ // delete second
+ entries[1].acquire();
+ entries[1].delete();
+
+ // dequeue third
+ entries[2].acquire();
+ entries[2].dequeue();
+
+ QueueEntry next = entries[0].getNextValidEntry();
+ assertEquals("expected forth entry",entries[3], next);
+ next = next.getNextValidEntry();
+ assertEquals("expected fifth entry", entries[4], next);
+ next = next.getNextValidEntry();
+ assertNull("The next entry after the last should be null", next);
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
new file mode 100644
index 0000000000..7a3f6f701c
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+
+/**
+ * Abstract test class for QueueEntryList implementations.
+ */
+public abstract class QueueEntryListTestBase extends TestCase
+{
+ protected static final AMQQueue _testQueue = new MockAMQQueue("test");
+ public abstract QueueEntryList<QueueEntry> getTestList();
+ public abstract long getExpectedFirstMsgId();
+ public abstract int getExpectedListLength();
+ public abstract ServerMessage getTestMessageToAdd() throws AMQException;
+
+ public void testGetQueue()
+ {
+ assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), _testQueue);
+ }
+
+ /**
+ * Test to add a message with properties specific to the queue type.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getTestMessageToAdd()
+ * @throws AMQException
+ */
+ public void testAddSpecificMessage() throws AMQException
+ {
+ final QueueEntryList<QueueEntry> list = getTestList();
+ list.add(getTestMessageToAdd());
+
+ final QueueEntryIterator<?> iter = list.iterator();
+ int count = 0;
+ while(iter.advance())
+ {
+ iter.getNode();
+ count++;
+ }
+ assertEquals("List did not grow by one entry after an add", getExpectedListLength() + 1, count);
+ }
+
+ /**
+ * Test to add a generic mock message.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ * @see MockAMQMessage
+ * @throws AMQException
+ */
+ public void testAddGenericMessage() throws AMQException
+ {
+ final QueueEntryList<QueueEntry> list = getTestList();
+ list.add(new MockAMQMessage(666));
+
+ final QueueEntryIterator<?> iter = list.iterator();
+ int count = 0;
+ while(iter.advance())
+ {
+ iter.getNode();
+ count++;
+ }
+ assertEquals("List did not grow by one entry after a generic message added", getExpectedListLength() + 1, count);
+
+ }
+
+ /**
+ * Test for getting the next element in a queue list.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ */
+ public void testListNext()
+ {
+ final QueueEntryList<QueueEntry> entryList = getTestList();
+ QueueEntry entry = entryList.getHead();
+ int count = 0;
+ while(entryList.next(entry) != null)
+ {
+ entry = entryList.next(entry);
+ count++;
+ }
+ assertEquals("Get next didnt get all the list entries", getExpectedListLength(), count);
+ }
+
+ /**
+ * Basic test for the associated QueueEntryIterator implementation.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ */
+ public void testIterator()
+ {
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count = 0;
+ while(iter.advance())
+ {
+ iter.getNode();
+ count++;
+ }
+ assertEquals("Iterator invalid", getExpectedListLength(), count);
+ }
+
+ /**
+ * Test for associated QueueEntryIterator implementation that checks it handles "removed" messages.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ */
+ public void testDequedMessagedNotPresentInIterator() throws Exception
+ {
+ final int numberOfMessages = getExpectedListLength();
+ final QueueEntryList<QueueEntry> entryList = getTestList();
+
+ // dequeue all even messages
+ final QueueEntryIterator<?> it1 = entryList.iterator();
+ int counter = 0;
+ while (it1.advance())
+ {
+ final QueueEntry queueEntry = it1.getNode();
+ if(counter++ % 2 == 0)
+ {
+ queueEntry.acquire();
+ queueEntry.dequeue();
+ }
+ }
+
+ // iterate and check that dequeued messages are not returned by iterator
+ final QueueEntryIterator<?> it2 = entryList.iterator();
+ int counter2 = 0;
+ while(it2.advance())
+ {
+ it2.getNode();
+ counter2++;
+ }
+ final int expectedNumber = numberOfMessages / 2;
+ assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter2,
+ expectedNumber, counter2);
+ }
+
+ /**
+ * Test to verify the head of the queue list is returned as expected.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedFirstMsgId()
+ */
+ public void testGetHead()
+ {
+ final QueueEntry head = getTestList().getHead();
+ assertNull("Head entry should not contain an actual message", head.getMessage());
+ assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head)
+ .getMessage().getMessageNumber().longValue());
+ }
+
+ /**
+ * Test to verify the entry deletion handled correctly.
+ * @see QueueEntryListTestBase#getTestList()
+ */
+ public void testEntryDeleted()
+ {
+ final QueueEntry head = getTestList().getHead();
+
+ final QueueEntry first = getTestList().next(head);
+ first.delete();
+
+ final QueueEntry second = getTestList().next(head);
+ assertNotSame("After deletion the next entry should be different", first.getMessage().getMessageNumber(), second
+ .getMessage().getMessageNumber());
+
+ final QueueEntry third = getTestList().next(first);
+ assertEquals("After deletion the deleted nodes next node should be the same as the next from head", second
+ .getMessage().getMessageNumber(), third.getMessage().getMessageNumber());
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java
deleted file mode 100644
index b67723dd25..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-
-/**
- *
- * Tests QueueEntry
- *
- */
-public class QueueEntryTest extends QpidTestCase
-{
- private QueueEntryImpl _queueEntry1 = null;
- private QueueEntryImpl _queueEntry2 = null;
- private QueueEntryImpl _queueEntry3 = null;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- int i = 0;
-
- SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(null);
- _queueEntry1 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
- _queueEntry2 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
- _queueEntry3 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
- }
-
- public void testCompareTo()
- {
- assertTrue(_queueEntry1.compareTo(_queueEntry2) < 0);
- assertTrue(_queueEntry2.compareTo(_queueEntry1) > 0);
- assertTrue(_queueEntry1.compareTo(_queueEntry1) == 0);
- }
-
- /**
- * Tests that the getNext() can be used to traverse the list.
- */
- public void testTraverseWithNoDeletedEntries()
- {
- QueueEntryImpl current = _queueEntry1;
-
- current = current.getNext();
- assertSame("Unexpected current entry",_queueEntry2, current);
-
- current = current.getNext();
- assertSame("Unexpected current entry",_queueEntry3, current);
-
- current = current.getNext();
- assertNull(current);
-
- }
-
- /**
- * Tests that the getNext() can be used to traverse the list but deleted
- * entries are skipped and de-linked from the chain of entries.
- */
- public void testTraverseWithDeletedEntries()
- {
- // Delete 2nd queue entry
- _queueEntry2.delete();
- assertTrue(_queueEntry2.isDeleted());
-
-
- QueueEntryImpl current = _queueEntry1;
-
- current = current.getNext();
- assertSame("Unexpected current entry",_queueEntry3, current);
-
- current = current.getNext();
- assertNull(current);
-
- // Assert the side effects of getNext()
- assertSame("Next node of entry 1 should now be entry 3",
- _queueEntry3, _queueEntry1.nextNode());
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
new file mode 100644
index 0000000000..7ff693e4c4
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.Assert;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+
+/**
+ * Test extension of SortedQueueEntryList that provides data structure validation tests.
+ * @see SortedQueueEntryList
+ */
+public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
+{
+ public SelfValidatingSortedQueueEntryList(AMQQueue queue, String propertyName)
+ {
+ super(queue, propertyName);
+ }
+
+ @Override /** Overridden to automatically check queue properties before and after. */
+ public SortedQueueEntryImpl add(final ServerMessage message)
+ {
+ assertQueueProperties(); //before add
+ final SortedQueueEntryImpl result = super.add(message);
+ assertQueueProperties(); //after add
+ return result;
+ }
+
+ @Override /** Overridden to automatically check queue properties before and after. */
+ public void entryDeleted(SortedQueueEntryImpl entry)
+ {
+ assertQueueProperties(); //before delete
+ super.entryDeleted(entry);
+ assertQueueProperties(); //after delete
+ }
+
+ public void assertQueueProperties()
+ {
+ assertRootIsBlack();
+ assertTreeIntegrity();
+ assertChildrenOfRedAreBlack();
+ assertLeavesSameBlackPath();
+ }
+
+ public void assertRootIsBlack()
+ {
+ if(!isNodeColour(getRoot(), Colour.BLACK))
+ {
+ Assert.fail("Root Not Black");
+ }
+ }
+
+ public void assertTreeIntegrity()
+ {
+ assertTreeIntegrity(getRoot());
+ }
+
+ public void assertTreeIntegrity(final SortedQueueEntryImpl node)
+ {
+ if(node == null)
+ {
+ return;
+ }
+ if(node.getLeft() != null)
+ {
+ if(node.getLeft().getParent() == node)
+ {
+ assertTreeIntegrity(node.getLeft());
+ }
+ else
+ {
+ Assert.fail("Tree integrity compromised");
+ }
+ }
+ if(node.getRight() != null)
+ {
+ if(node.getRight().getParent() == node)
+ {
+ assertTreeIntegrity(node.getRight());
+ }
+ else
+ {
+ Assert.fail("Tree integrity compromised");
+ }
+
+ }
+ }
+
+ public void assertLeavesSameBlackPath()
+ {
+ assertLeavesSameBlackPath(getRoot());
+ }
+
+ public int assertLeavesSameBlackPath(final SortedQueueEntryImpl node)
+ {
+ if(node == null)
+ {
+ return 1;
+ }
+ final int left = assertLeavesSameBlackPath(node.getLeft());
+ final int right = assertLeavesSameBlackPath(node.getLeft());
+ if(left == right)
+ {
+ return isNodeColour(node, Colour.BLACK) ? 1 + left : left;
+ }
+ else
+ {
+ Assert.fail("Unequal paths to leaves");
+ return 1; //compiler
+ }
+ }
+
+ public void assertChildrenOfRedAreBlack()
+ {
+ assertChildrenOfRedAreBlack(getRoot());
+ }
+
+ public void assertChildrenOfRedAreBlack(final SortedQueueEntryImpl node)
+ {
+ if(node == null)
+ {
+ return;
+ }
+ else if(node.getColour() == Colour.BLACK)
+ {
+ assertChildrenOfRedAreBlack(node.getLeft());
+ assertChildrenOfRedAreBlack(node.getRight());
+ }
+ else
+ {
+ if(isNodeColour(node.getLeft(), Colour.BLACK)
+ && isNodeColour(node.getRight(), Colour.BLACK))
+ {
+ assertChildrenOfRedAreBlack(node.getLeft());
+ assertChildrenOfRedAreBlack(node.getRight());
+ }
+ else
+ {
+ Assert.fail("Children of Red are not both black");
+ }
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index f4cdbbe02c..e4ed232f13 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,8 +21,12 @@
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.PropertiesConfiguration;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
@@ -40,6 +44,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
@@ -51,12 +56,6 @@ import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
@@ -1110,9 +1109,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
* Entries with even message id are considered
* dequeued!
*/
- protected QueueEntryImpl createQueueEntry(final ServerMessage message)
+ protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message)
{
- return new QueueEntryImpl(this, message)
+ return new SimpleQueueEntryImpl(this, message)
{
public boolean isDequeued()
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
new file mode 100644
index 0000000000..d8d78bbb84
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -0,0 +1,59 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase {
+
+ private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+
+ public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException {
+ ServerMessage message = new MockAMQMessage(msgId);
+ return queueEntryList.add(message);
+ }
+
+ public void testCompareTo()
+ {
+ assertTrue(_queueEntry.compareTo(_queueEntry2) < 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry3) < 0);
+ assertTrue(_queueEntry.compareTo(_queueEntry3) < 0);
+
+ assertTrue(_queueEntry2.compareTo(_queueEntry) > 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry2) > 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry) > 0);
+
+ assertTrue(_queueEntry.compareTo(_queueEntry) == 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry2) == 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry3) == 0);
+ }
+
+ public void testTraverseWithNoDeletedEntries()
+ {
+ QueueEntry current = _queueEntry;
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry2, current);
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry3, current);
+
+ current = current.getNextValidEntry();
+ assertNull(current);
+
+ }
+
+ public void testTraverseWithDeletedEntries()
+ {
+ // Delete 2nd queue entry
+ _queueEntry2.delete();
+ assertTrue(_queueEntry2.isDeleted());
+
+ QueueEntry current = _queueEntry;
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry3, current);
+
+ current = current.getNextValidEntry();
+ assertNull(current);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
index 7136f07ca5..f3ba6a5495 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
@@ -22,21 +22,28 @@ package org.apache.qpid.server.queue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
-import junit.framework.TestCase;
-
-public class SimpleQueueEntryListTest extends TestCase
+public class SimpleQueueEntryListTest extends QueueEntryListTestBase
{
+ private SimpleQueueEntryList _sqel;
+
private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
String oldScavengeValue = null;
-
+
@Override
protected void setUp()
{
oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
+ _sqel = new SimpleQueueEntryList(_testQueue);
+ for(int i = 1; i <= 100; i++)
+ {
+ final ServerMessage msg = new MockAMQMessage(i);
+ final QueueEntry bleh = _sqel.add(msg);
+ assertNotNull("QE should not have been null", bleh);
+ }
}
@Override
@@ -52,19 +59,28 @@ public class SimpleQueueEntryListTest extends TestCase
}
}
- /**
- * Tests the behavior of the next(QueuyEntry) method.
- */
- public void testNext() throws Exception
+ @Override
+ public QueueEntryList getTestList()
{
- SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);
- int i = 0;
+ return _sqel;
+ }
+
+ @Override
+ public long getExpectedFirstMsgId()
+ {
+ return 1;
+ }
- QueueEntry queueEntry1 = sqel.add(new MockAMQMessage(i++));
- QueueEntry queueEntry2 = sqel.add(new MockAMQMessage(i++));
+ @Override
+ public int getExpectedListLength()
+ {
+ return 100;
+ }
- assertSame(queueEntry2, sqel.next(queueEntry1));
- assertNull(sqel.next(queueEntry2));
+ @Override
+ public AMQMessage getTestMessageToAdd() throws AMQException
+ {
+ return new MockAMQMessage(1l);
}
public void testScavenge() throws Exception
@@ -82,7 +98,7 @@ public class SimpleQueueEntryListTest extends TestCase
entriesMap.put(i,bleh);
}
- QueueEntryImpl head = ((QueueEntryImpl) sqel.getHead());
+ SimpleQueueEntryImpl head = sqel.getHead();
//We shall now delete some specific messages mid-queue that will lead to
//requiring a scavenge once the requested threshold of 9 deletes is passed
@@ -99,11 +115,10 @@ public class SimpleQueueEntryListTest extends TestCase
assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete());
verifyDeletedButPresentBeforeScavenge(head, 14);
-
//Delete message 20 only
assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete());
verifyDeletedButPresentBeforeScavenge(head, 20);
-
+
//Delete messages 81 to 84
assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete());
verifyDeletedButPresentBeforeScavenge(head, 81);
@@ -113,35 +128,35 @@ public class SimpleQueueEntryListTest extends TestCase
verifyDeletedButPresentBeforeScavenge(head, 83);
assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete());
verifyDeletedButPresentBeforeScavenge(head, 84);
-
+
//Delete message 99 - this is the 10th message deleted that is after the queue head
//and so will invoke the scavenge() which is set to go after 9 previous deletions
assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete());
verifyAllDeletedMessagedNotPresent(head, entriesMap);
}
-
- private void verifyDeletedButPresentBeforeScavenge(QueueEntryImpl head, long messageId)
+
+ private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId)
{
//Use the head to get the initial entry in the queue
- QueueEntryImpl entry = head._next;
-
+ SimpleQueueEntryImpl entry = head.getNextNode();
+
for(long i = 1; i < messageId ; i++)
{
assertEquals("Expected QueueEntry was not found in the list", i, (long) entry.getMessage().getMessageNumber());
- entry = entry._next;
+ entry = entry.getNextNode();
}
-
+
assertTrue("Entry should have been deleted", entry.isDeleted());
}
-
- private void verifyAllDeletedMessagedNotPresent(QueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages)
+
+ private void verifyAllDeletedMessagedNotPresent(SimpleQueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages)
{
//Use the head to get the initial entry in the queue
- QueueEntryImpl entry = head._next;
-
+ SimpleQueueEntryImpl entry = head.getNextNode();
+
assertNotNull("Initial entry should not have been null", entry);
-
+
int count = 0;
while (entry != null)
@@ -149,62 +164,56 @@ public class SimpleQueueEntryListTest extends TestCase
assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted());
assertNotNull("QueueEntry was not found in the list of remaining entries",
remainingMessages.get(entry.getMessage().getMessageNumber().intValue()));
-
+
count++;
- entry = entry._next;
+ entry = entry.getNextNode();
}
-
+
assertEquals("Count should have been equal",count,remainingMessages.size());
}
- public void testDequedMessagedNotPresentInIterator()
+ public void testGettingNextElement()
{
- int numberOfMessages = 10;
- SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
- QueueEntry[] entries = new QueueEntry[numberOfMessages];
+ final int numberOfEntries = 5;
+ final SimpleQueueEntryImpl[] entries = new SimpleQueueEntryImpl[numberOfEntries];
+ final SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
- for(int i = 0; i < numberOfMessages ; i++)
+ // create test entries
+ for(int i = 0; i < numberOfEntries; i++)
{
- AMQMessage message = null;;
- try
- {
- message = new MockAMQMessage(i);
- }
- catch (AMQException e)
- {
- fail("Failure to create a mock message:" + e.getMessage());
- }
- QueueEntry entry = entryList.add(message);
- assertNotNull("QE should not be null", entry);
- entries[i]= entry;
+ AMQMessage message = new MockAMQMessage(i);
+ entries[i] = queueEntryList.add(message);
}
- // dequeue all even messages
- for (QueueEntry queueEntry : entries)
+ // test getNext for not acquired entries
+ for(int i = 0; i < numberOfEntries; i++)
{
- long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue();
- if (i%2 == 0)
+ final SimpleQueueEntryImpl next = entries[i].getNextValidEntry();
+
+ if(i < numberOfEntries - 1)
+ {
+ assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
+ }
+ else
{
- queueEntry.acquire();
- queueEntry.dequeue();
+ assertNull("The next entry after the last should be null", next);
}
}
- // iterate and check that dequeued messages are not returned by iterator
- QueueEntryIterator it = entryList.iterator();
- int counter = 0;
- int i = 1;
- while (it.advance())
- {
- QueueEntry entry = it.getNode();
- Long id = ((AMQMessage)entry.getMessage()).getMessageId();
- assertEquals("Expected message with id " + i + " but got message with id "
- + id, new Long(i), id);
- counter++;
- i += 2;
- }
- int expectedNumber = numberOfMessages / 2;
- assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter,
- expectedNumber, counter);
+ // delete second
+ entries[1].acquire();
+ entries[1].delete();
+
+ // dequeue third
+ entries[2].acquire();
+ entries[2].dequeue();
+
+ SimpleQueueEntryImpl next = entries[2].getNextValidEntry();
+ assertEquals("expected forth entry", entries[3], next);
+ next = next.getNextValidEntry();
+ assertEquals("expected fifth entry", entries[4], next);
+ next = next.getNextValidEntry();
+ assertNull("The next entry after the last should be null", next);
}
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
new file mode 100644
index 0000000000..43fb5b4cb3
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
@@ -0,0 +1,62 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SortedQueueEntryImplTest extends QueueEntryImplTestBase {
+
+ public final static String keys[] = { "CCC", "AAA", "BBB" };
+
+ private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY");
+
+ public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException {
+ final ServerMessage message = new MockAMQMessage(msgId, "KEY", keys[msgId-1]);
+ return queueEntryList.add(message);
+ }
+
+ public void testCompareTo()
+ {
+ assertTrue(_queueEntry.compareTo(_queueEntry2) > 0);
+ assertTrue(_queueEntry.compareTo(_queueEntry3) > 0);
+
+ assertTrue(_queueEntry2.compareTo(_queueEntry3) < 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry) < 0);
+
+ assertTrue(_queueEntry3.compareTo(_queueEntry2) > 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry) < 0);
+
+ assertTrue(_queueEntry.compareTo(_queueEntry) == 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry2) == 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry3) == 0);
+ }
+
+ public void testTraverseWithNoDeletedEntries()
+ {
+ QueueEntry current = _queueEntry2;
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry3, current);
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry, current);
+
+ current = current.getNextValidEntry();
+ assertNull(current);
+
+ }
+
+ public void testTraverseWithDeletedEntries()
+ {
+ // Delete 2nd queue entry
+ _queueEntry3.delete();
+ assertTrue(_queueEntry3.isDeleted());
+
+ QueueEntry current = _queueEntry2;
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry, current);
+
+ current = current.getNextValidEntry();
+ assertNull(current);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
new file mode 100644
index 0000000000..eca845644e
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -0,0 +1,323 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.AMQMessage;
+
+import java.util.Arrays;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SortedQueueEntryListTest extends QueueEntryListTestBase
+{
+ private static SelfValidatingSortedQueueEntryList _sqel;
+
+ public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144",
+ " 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81",
+ "195", " 96", " 2", " 68", "101", "141", "159", "187", "149", " 45",
+ " 64", "100", " 83", " 51", " 79", " 82", "180", " 26", " 61", " 62",
+ " 78", " 46", "147", " 91", "120", "164", " 92", "172", "188", " 50",
+ "111", " 89", " 4", " 8", " 16", "151", "122", "178", " 33", "124",
+ "171", "165", "116", "113", "155", "148", " 29", " 0", " 37", "131",
+ "146", " 57", "112", " 97", " 23", "108", "123", "117", "167", " 52",
+ " 98", " 6", "160", " 25", " 49", " 34", "182", "185", " 30", " 66",
+ "152", " 58", " 86", "118", "189", " 84", " 36", "104", " 7", " 76",
+ " 87", " 1", " 80", " 10", "142", " 59", "137", " 12", " 67", " 22",
+ " 9", "106", " 75", "109", " 93", " 42", "177", "134", " 77", " 88",
+ "114", " 43", "143", "135", " 55", "181", " 32", "174", "175", "184",
+ "133", "107", " 28", "126", "103", " 85", " 38", "158", " 39", "162",
+ "129", "194", " 15", " 24", " 19", " 35", "186", " 31", " 65", " 99",
+ "192", " 74", "156", " 27", " 95", " 54", " 70", " 13", "110", " 41",
+ " 90", "173", "125", "196", "130", "183", "102", "190", "132", "105",
+ " 21", " 53", "139", " 94", "115", " 48", " 44", "179", "128", " 14",
+ " 72", "119", "153", "168", "197", " 40", "150", "138", " 5", "154",
+ "169", " 71", "199", "198", "170", " 3", "121", " 20", " 63", "193" };
+
+ public final static String textkeys[] = { "AAA", "BBB", "CCC", "DDD", "EEE", "FFF", "GGG", "HHH", "III", "JJJ",
+ "KKK", "LLL", "MMM", "NNN", "OOO", "PPP", "QQQ", "RRR", "SSS", "TTT",
+ "UUU", "VVV", "XXX", "YYY", "ZZZ"};
+
+ private final static String keysSorted[] = keys.clone();
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create result array
+ Arrays.sort(keysSorted);
+
+ // Create test list
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ for(final String key : keys)
+ {
+ final ServerMessage msg = generateTestMessage(messageId++, key);
+ _sqel.add(msg);
+ }
+
+ }
+
+ public QueueEntryList getTestList()
+ {
+ return _sqel;
+ }
+
+ public int getExpectedListLength()
+ {
+ return keys.length;
+ }
+
+ public long getExpectedFirstMsgId()
+ {
+ return 67L;
+ }
+
+ public ServerMessage getTestMessageToAdd() throws AMQException
+ {
+ return generateTestMessage(1, "test value");
+ }
+
+ private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException
+ {
+ return new AMQMessage(new MockStoredMessage(id, "KEY", keyValue));
+ }
+
+ public void testIterator()
+ {
+ super.testIterator();
+
+ // Test sorted order of list
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count = 0;
+ while(iter.advance())
+ {
+ assertEquals("Sorted queue entry value does not match sorted key array",
+ keysSorted[count++], getSortedKeyValue(iter));
+ }
+ }
+
+ private Object getSortedKeyValue(QueueEntryIterator<?> iter)
+ {
+ return ((SortedQueueEntryImpl) iter.getNode()).getMessage().getMessageHeader().getHeader("KEY");
+ }
+
+ private Long getMessageId(QueueEntryIterator<?> iter)
+ {
+ return ((SortedQueueEntryImpl) iter.getNode()).getMessage().getMessageNumber();
+ }
+
+ public void testNonUniqueSortKeys() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ while(messageId < 200)
+ {
+ final ServerMessage msg = generateTestMessage(messageId++, "samekey");
+ _sqel.add(msg);
+ }
+
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count=0;
+ while(iter.advance())
+ {
+ assertEquals("Sorted queue entry value is not as expected", "samekey", getSortedKeyValue(iter));
+ assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter));
+ }
+ }
+
+ public void testNullSortKeys() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ while(messageId < 200)
+ {
+ final ServerMessage msg = generateTestMessage(messageId++, null);
+ _sqel.add(msg);
+ }
+
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count=0;
+ while(iter.advance())
+ {
+ assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter));
+ assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); }
+ }
+
+ public void testAscendingSortKeys() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ for(String textKey : textkeys)
+ {
+ final ServerMessage msg = generateTestMessage(messageId, textKey);
+ messageId++;
+ _sqel.add(msg);
+ }
+
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count=0;
+ while(iter.advance())
+ {
+ assertEquals("Sorted queue entry value is not as expected", textkeys[count], getSortedKeyValue(iter));
+ assertEquals("Message id not as expected", Long.valueOf(count), getMessageId(iter));
+ count++;
+ }
+ }
+
+ public void testDescendingSortKeys() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ for(int i=textkeys.length-1; i >=0; i--)
+ {
+ final ServerMessage msg = generateTestMessage(messageId, textkeys[i]);
+ messageId++;
+ _sqel.add(msg);
+ }
+
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count=0;
+ while(iter.advance())
+ {
+ assertEquals("Sorted queue entry value is not as expected", textkeys[count], getSortedKeyValue(iter));
+ assertEquals("Message id not as expected", Long.valueOf(textkeys.length-count-1), getMessageId(iter));
+ count++;
+ }
+ }
+
+ public void testInsertAfter() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ ServerMessage msg = generateTestMessage(1, "A");
+ _sqel.add(msg);
+
+ SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ msg = generateTestMessage(2, "B");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "B", 2);
+ }
+
+ public void testInsertBefore() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ ServerMessage msg = generateTestMessage(1, "B");
+ _sqel.add(msg);
+
+ SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "B", 1);
+
+ msg = generateTestMessage(2, "A");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 2);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "B", 1);
+ }
+
+ public void testInsertInbetween() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ ServerMessage msg = generateTestMessage(1, "A");
+ _sqel.add(msg);
+ SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ msg = generateTestMessage(2, "C");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "C", 2);
+
+ msg = generateTestMessage(3, "B");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "B", 3);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "C", 2);
+ }
+
+ public void testInsertAtHead() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ ServerMessage msg = generateTestMessage(1, "B");
+ _sqel.add(msg);
+
+ SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "B", 1);
+
+ msg = generateTestMessage(2, "D");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "B", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "D", 2);
+
+ msg = generateTestMessage(3, "C");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "B", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "C", 3);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "D", 2);
+
+ msg = generateTestMessage(4, "A");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 4);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "B", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "C", 3);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "D", 2);
+ }
+
+ private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId)
+ {
+ assertEquals("Sorted queue entry value is not as expected",
+ expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));
+ assertEquals("Sorted queue entry id is not as expected",
+ Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber());
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 3acd064fd7..1d0a9d6316 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -689,7 +689,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
if (usePriority)
{
queueArguments = new FieldTable();
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+ queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
}
if (lastValueQueue)
@@ -767,7 +767,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+ queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
@@ -781,7 +781,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+ queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();