diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-20 23:37:48 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-20 23:37:48 +0000 |
| commit | a0fccd9dbfabd912a906225f817cd00072d7fc8d (patch) | |
| tree | 71d457039d047e12ee5bec2f6cc98f457f050bec /java/broker/src/test | |
| parent | 33db3cab8c1ec8d82045e557b190a98a2418c565 (diff) | |
| download | qpid-python-a0fccd9dbfabd912a906225f817cd00072d7fc8d.tar.gz | |
QPID-3622: Add Sorted Queue funtionality
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1204295 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
16 files changed, 1019 insertions, 271 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index d2f2ae5eea..9941c00499 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -129,7 +129,19 @@ public class QueueConfigurationTest extends TestCase assertEquals(1, qConf.getMinimumAlertRepeatGap()); } - private VirtualHostConfiguration overrideConfiguration(String property, int value) + public void testSortQueueConfiguration() throws ConfigurationException + { + //Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertNull(qConf.getQueueSortKey()); + + // Check explicit value + final VirtualHostConfiguration vhostConfig = overrideConfiguration("sortKey", "test-sort-key"); + qConf = new QueueConfiguration("test", vhostConfig); + assertEquals("test-sort-key", qConf.getQueueSortKey()); + } + + private VirtualHostConfiguration overrideConfiguration(String property, Object value) throws ConfigurationException { PropertiesConfiguration queueConfig = new PropertiesConfiguration(); @@ -141,5 +153,4 @@ public class QueueConfigurationTest extends TestCase return new VirtualHostConfiguration("test", config); } - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 3b7f5f3a51..7b73987abf 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -20,9 +20,17 @@ */ package org.apache.qpid.server.exchange; -import junit.framework.TestCase; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -52,17 +60,6 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.InternalBrokerBaseCase; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); @@ -483,6 +480,16 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return false; } + + public QueueEntry getNextNode() + { + return null; + } + + public QueueEntry getNextValidEntry() + { + return null; + } }; if(action != null) diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 3961b3b355..2ce43052d9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -21,12 +21,12 @@ package org.apache.qpid.server.queue; */ import java.util.ArrayList; - +import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; -import junit.framework.AssertionFailedError; +import org.apache.qpid.server.message.AMQMessage; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { @@ -35,7 +35,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest public void setUp() throws Exception { _arguments = new FieldTable(); - _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3); + _arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3); super.setUp(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index 7000df157e..12369bd7d4 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -20,24 +20,23 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; public class MockAMQMessage extends AMQMessage { public MockAMQMessage(long messageId) - throws AMQException { super(new MockStoredMessage(messageId)); } - - + public MockAMQMessage(long messageId, String headerName, Object headerValue) + { + super(new MockStoredMessage(messageId, headerName, headerValue)); + } @Override public long getSize() { return 0l; } - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index ab8850c18c..864b9ad368 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -234,4 +234,14 @@ public class MockQueueEntry implements QueueEntry return false; } + public QueueEntry getNextNode() + { + return null; + } + + public QueueEntry getNextValidEntry() + { + return null; + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java index 7dc491de4d..78ed3e9f34 100755 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.framing.FieldTable; + import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.store.StoredMessage; @@ -36,18 +38,32 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> private MessageMetaData _metaData; private final ByteBuffer _content; - public MockStoredMessage(long messageId) { - this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60)); + this(messageId, (String)null, null); + } + + public MockStoredMessage(long messageId, String headerName, Object headerValue) + { + this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue); } public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb) { + this(messageId, info, chb, null, null); + } + + public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb, String headerName, Object headerValue) + { _messageId = messageId; + if (headerName != null) + { + FieldTable headers = new FieldTable(); + headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue)); + ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers); + } _metaData = new MessageMetaData(info, chb, 0); _content = ByteBuffer.allocate(_metaData.getContentSize()); - } public MessageMetaData getMetaData() diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index d8afd8d829..d336132316 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -19,9 +19,7 @@ package org.apache.qpid.server.queue; import java.lang.reflect.Field; - import junit.framework.TestCase; - import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.queue.QueueEntry.EntryState; @@ -30,18 +28,27 @@ import org.apache.qpid.server.subscription.Subscription; /** * Tests for {@link QueueEntryImpl} - * */ -public class QueueEntryImplTest extends TestCase +public abstract class QueueEntryImplTestBase extends TestCase { // tested entry - private QueueEntryImpl _queueEntry; + protected QueueEntryImpl _queueEntry; + protected QueueEntryImpl _queueEntry2; + protected QueueEntryImpl _queueEntry3; + + public abstract QueueEntryImpl getQueueEntryImpl(int msgid) throws AMQException; + + public abstract void testCompareTo(); + + public abstract void testTraverseWithNoDeletedEntries(); + + public abstract void testTraverseWithDeletedEntries(); public void setUp() throws Exception { - AMQMessage message = new MockAMQMessage(1); - SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - _queueEntry = new QueueEntryImpl(queueEntryList, message, 1); + _queueEntry = getQueueEntryImpl(1); + _queueEntry2 = getQueueEntryImpl(2); + _queueEntry3 = getQueueEntryImpl(3); } public void testAquire() @@ -105,61 +112,6 @@ public class QueueEntryImplTest extends TestCase } /** - * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method. - */ - public void testGetNext() - { - int numberOfEntries = 5; - QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; - SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - - // create test entries - for(int i = 0; i < numberOfEntries ; i++) - { - AMQMessage message = null;; - try - { - message = new MockAMQMessage(i); - } - catch (AMQException e) - { - fail("Failure to create a mock message:" + e.getMessage()); - } - QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message); - entries[i] = entry; - } - - // test getNext for not acquired entries - for(int i = 0; i < numberOfEntries ; i++) - { - QueueEntryImpl queueEntry = entries[i]; - QueueEntryImpl next = queueEntry.getNext(); - if (i < numberOfEntries - 1) - { - assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); - } - else - { - assertNull("The next entry after the last should be null", next); - } - } - - // delete second - entries[1].acquire(); - entries[1].delete(); - - // dequeue third - entries[2].acquire(); - entries[2].dequeue(); - - QueueEntryImpl next = entries[0].getNext(); - assertEquals("expected forth entry",entries[3], next); - next = next.getNext(); - assertEquals("expected fifth entry", entries[4], next); - next = next.getNext(); - assertNull("The next entry after the last should be null", next); - } - /** * A helper method to put tested object into deleted state and assert the state */ private void delete() @@ -244,4 +196,52 @@ public class QueueEntryImplTest extends TestCase assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); } + + /** + * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method. + */ + public void testGetNext() + { + int numberOfEntries = 5; + QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + + // create test entries + for(int i = 0; i < numberOfEntries ; i++) + { + AMQMessage message = new MockAMQMessage(i); + QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message); + entries[i] = entry; + } + + // test getNext for not acquired entries + for(int i = 0; i < numberOfEntries ; i++) + { + QueueEntryImpl queueEntry = entries[i]; + QueueEntry next = queueEntry.getNextValidEntry(); + if (i < numberOfEntries - 1) + { + assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); + } + else + { + assertNull("The next entry after the last should be null", next); + } + } + + // delete second + entries[1].acquire(); + entries[1].delete(); + + // dequeue third + entries[2].acquire(); + entries[2].dequeue(); + + QueueEntry next = entries[0].getNextValidEntry(); + assertEquals("expected forth entry",entries[3], next); + next = next.getNextValidEntry(); + assertEquals("expected fifth entry", entries[4], next); + next = next.getNextValidEntry(); + assertNull("The next entry after the last should be null", next); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java new file mode 100644 index 0000000000..7a3f6f701c --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; + +/** + * Abstract test class for QueueEntryList implementations. + */ +public abstract class QueueEntryListTestBase extends TestCase +{ + protected static final AMQQueue _testQueue = new MockAMQQueue("test"); + public abstract QueueEntryList<QueueEntry> getTestList(); + public abstract long getExpectedFirstMsgId(); + public abstract int getExpectedListLength(); + public abstract ServerMessage getTestMessageToAdd() throws AMQException; + + public void testGetQueue() + { + assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), _testQueue); + } + + /** + * Test to add a message with properties specific to the queue type. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getTestMessageToAdd() + * @throws AMQException + */ + public void testAddSpecificMessage() throws AMQException + { + final QueueEntryList<QueueEntry> list = getTestList(); + list.add(getTestMessageToAdd()); + + final QueueEntryIterator<?> iter = list.iterator(); + int count = 0; + while(iter.advance()) + { + iter.getNode(); + count++; + } + assertEquals("List did not grow by one entry after an add", getExpectedListLength() + 1, count); + } + + /** + * Test to add a generic mock message. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedListLength() + * @see MockAMQMessage + * @throws AMQException + */ + public void testAddGenericMessage() throws AMQException + { + final QueueEntryList<QueueEntry> list = getTestList(); + list.add(new MockAMQMessage(666)); + + final QueueEntryIterator<?> iter = list.iterator(); + int count = 0; + while(iter.advance()) + { + iter.getNode(); + count++; + } + assertEquals("List did not grow by one entry after a generic message added", getExpectedListLength() + 1, count); + + } + + /** + * Test for getting the next element in a queue list. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedListLength() + */ + public void testListNext() + { + final QueueEntryList<QueueEntry> entryList = getTestList(); + QueueEntry entry = entryList.getHead(); + int count = 0; + while(entryList.next(entry) != null) + { + entry = entryList.next(entry); + count++; + } + assertEquals("Get next didnt get all the list entries", getExpectedListLength(), count); + } + + /** + * Basic test for the associated QueueEntryIterator implementation. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedListLength() + */ + public void testIterator() + { + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count = 0; + while(iter.advance()) + { + iter.getNode(); + count++; + } + assertEquals("Iterator invalid", getExpectedListLength(), count); + } + + /** + * Test for associated QueueEntryIterator implementation that checks it handles "removed" messages. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedListLength() + */ + public void testDequedMessagedNotPresentInIterator() throws Exception + { + final int numberOfMessages = getExpectedListLength(); + final QueueEntryList<QueueEntry> entryList = getTestList(); + + // dequeue all even messages + final QueueEntryIterator<?> it1 = entryList.iterator(); + int counter = 0; + while (it1.advance()) + { + final QueueEntry queueEntry = it1.getNode(); + if(counter++ % 2 == 0) + { + queueEntry.acquire(); + queueEntry.dequeue(); + } + } + + // iterate and check that dequeued messages are not returned by iterator + final QueueEntryIterator<?> it2 = entryList.iterator(); + int counter2 = 0; + while(it2.advance()) + { + it2.getNode(); + counter2++; + } + final int expectedNumber = numberOfMessages / 2; + assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter2, + expectedNumber, counter2); + } + + /** + * Test to verify the head of the queue list is returned as expected. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedFirstMsgId() + */ + public void testGetHead() + { + final QueueEntry head = getTestList().getHead(); + assertNull("Head entry should not contain an actual message", head.getMessage()); + assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head) + .getMessage().getMessageNumber().longValue()); + } + + /** + * Test to verify the entry deletion handled correctly. + * @see QueueEntryListTestBase#getTestList() + */ + public void testEntryDeleted() + { + final QueueEntry head = getTestList().getHead(); + + final QueueEntry first = getTestList().next(head); + first.delete(); + + final QueueEntry second = getTestList().next(head); + assertNotSame("After deletion the next entry should be different", first.getMessage().getMessageNumber(), second + .getMessage().getMessageNumber()); + + final QueueEntry third = getTestList().next(first); + assertEquals("After deletion the deleted nodes next node should be the same as the next from head", second + .getMessage().getMessageNumber(), third.getMessage().getMessageNumber()); + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java deleted file mode 100644 index b67723dd25..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.test.utils.QpidTestCase; - -/** - * - * Tests QueueEntry - * - */ -public class QueueEntryTest extends QpidTestCase -{ - private QueueEntryImpl _queueEntry1 = null; - private QueueEntryImpl _queueEntry2 = null; - private QueueEntryImpl _queueEntry3 = null; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - - int i = 0; - - SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(null); - _queueEntry1 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); - _queueEntry2 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); - _queueEntry3 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); - } - - public void testCompareTo() - { - assertTrue(_queueEntry1.compareTo(_queueEntry2) < 0); - assertTrue(_queueEntry2.compareTo(_queueEntry1) > 0); - assertTrue(_queueEntry1.compareTo(_queueEntry1) == 0); - } - - /** - * Tests that the getNext() can be used to traverse the list. - */ - public void testTraverseWithNoDeletedEntries() - { - QueueEntryImpl current = _queueEntry1; - - current = current.getNext(); - assertSame("Unexpected current entry",_queueEntry2, current); - - current = current.getNext(); - assertSame("Unexpected current entry",_queueEntry3, current); - - current = current.getNext(); - assertNull(current); - - } - - /** - * Tests that the getNext() can be used to traverse the list but deleted - * entries are skipped and de-linked from the chain of entries. - */ - public void testTraverseWithDeletedEntries() - { - // Delete 2nd queue entry - _queueEntry2.delete(); - assertTrue(_queueEntry2.isDeleted()); - - - QueueEntryImpl current = _queueEntry1; - - current = current.getNext(); - assertSame("Unexpected current entry",_queueEntry3, current); - - current = current.getNext(); - assertNull(current); - - // Assert the side effects of getNext() - assertSame("Next node of entry 1 should now be entry 3", - _queueEntry3, _queueEntry1.nextNode()); - } -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java new file mode 100644 index 0000000000..7ff693e4c4 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.Assert; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; + +/** + * Test extension of SortedQueueEntryList that provides data structure validation tests. + * @see SortedQueueEntryList + */ +public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList +{ + public SelfValidatingSortedQueueEntryList(AMQQueue queue, String propertyName) + { + super(queue, propertyName); + } + + @Override /** Overridden to automatically check queue properties before and after. */ + public SortedQueueEntryImpl add(final ServerMessage message) + { + assertQueueProperties(); //before add + final SortedQueueEntryImpl result = super.add(message); + assertQueueProperties(); //after add + return result; + } + + @Override /** Overridden to automatically check queue properties before and after. */ + public void entryDeleted(SortedQueueEntryImpl entry) + { + assertQueueProperties(); //before delete + super.entryDeleted(entry); + assertQueueProperties(); //after delete + } + + public void assertQueueProperties() + { + assertRootIsBlack(); + assertTreeIntegrity(); + assertChildrenOfRedAreBlack(); + assertLeavesSameBlackPath(); + } + + public void assertRootIsBlack() + { + if(!isNodeColour(getRoot(), Colour.BLACK)) + { + Assert.fail("Root Not Black"); + } + } + + public void assertTreeIntegrity() + { + assertTreeIntegrity(getRoot()); + } + + public void assertTreeIntegrity(final SortedQueueEntryImpl node) + { + if(node == null) + { + return; + } + if(node.getLeft() != null) + { + if(node.getLeft().getParent() == node) + { + assertTreeIntegrity(node.getLeft()); + } + else + { + Assert.fail("Tree integrity compromised"); + } + } + if(node.getRight() != null) + { + if(node.getRight().getParent() == node) + { + assertTreeIntegrity(node.getRight()); + } + else + { + Assert.fail("Tree integrity compromised"); + } + + } + } + + public void assertLeavesSameBlackPath() + { + assertLeavesSameBlackPath(getRoot()); + } + + public int assertLeavesSameBlackPath(final SortedQueueEntryImpl node) + { + if(node == null) + { + return 1; + } + final int left = assertLeavesSameBlackPath(node.getLeft()); + final int right = assertLeavesSameBlackPath(node.getLeft()); + if(left == right) + { + return isNodeColour(node, Colour.BLACK) ? 1 + left : left; + } + else + { + Assert.fail("Unequal paths to leaves"); + return 1; //compiler + } + } + + public void assertChildrenOfRedAreBlack() + { + assertChildrenOfRedAreBlack(getRoot()); + } + + public void assertChildrenOfRedAreBlack(final SortedQueueEntryImpl node) + { + if(node == null) + { + return; + } + else if(node.getColour() == Colour.BLACK) + { + assertChildrenOfRedAreBlack(node.getLeft()); + assertChildrenOfRedAreBlack(node.getRight()); + } + else + { + if(isNodeColour(node.getLeft(), Colour.BLACK) + && isNodeColour(node.getRight(), Colour.BLACK)) + { + assertChildrenOfRedAreBlack(node.getLeft()); + assertChildrenOfRedAreBlack(node.getRight()); + } + else + { + Assert.fail("Children of Red are not both black"); + } + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index f4cdbbe02c..e4ed232f13 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -21,8 +21,12 @@ package org.apache.qpid.server.queue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.PropertiesConfiguration; - import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; @@ -40,6 +44,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; @@ -51,12 +56,6 @@ import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - public class SimpleAMQQueueTest extends InternalBrokerBaseCase { @@ -1110,9 +1109,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase * Entries with even message id are considered * dequeued! */ - protected QueueEntryImpl createQueueEntry(final ServerMessage message) + protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message) { - return new QueueEntryImpl(this, message) + return new SimpleQueueEntryImpl(this, message) { public boolean isDequeued() { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java new file mode 100644 index 0000000000..d8d78bbb84 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -0,0 +1,59 @@ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; + +public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase { + + private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + + public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException { + ServerMessage message = new MockAMQMessage(msgId); + return queueEntryList.add(message); + } + + public void testCompareTo() + { + assertTrue(_queueEntry.compareTo(_queueEntry2) < 0); + assertTrue(_queueEntry2.compareTo(_queueEntry3) < 0); + assertTrue(_queueEntry.compareTo(_queueEntry3) < 0); + + assertTrue(_queueEntry2.compareTo(_queueEntry) > 0); + assertTrue(_queueEntry3.compareTo(_queueEntry2) > 0); + assertTrue(_queueEntry3.compareTo(_queueEntry) > 0); + + assertTrue(_queueEntry.compareTo(_queueEntry) == 0); + assertTrue(_queueEntry2.compareTo(_queueEntry2) == 0); + assertTrue(_queueEntry3.compareTo(_queueEntry3) == 0); + } + + public void testTraverseWithNoDeletedEntries() + { + QueueEntry current = _queueEntry; + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry2, current); + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry3, current); + + current = current.getNextValidEntry(); + assertNull(current); + + } + + public void testTraverseWithDeletedEntries() + { + // Delete 2nd queue entry + _queueEntry2.delete(); + assertTrue(_queueEntry2.isDeleted()); + + QueueEntry current = _queueEntry; + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry3, current); + + current = current.getNextValidEntry(); + assertNull(current); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index 7136f07ca5..f3ba6a5495 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -22,21 +22,28 @@ package org.apache.qpid.server.queue; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; -import junit.framework.TestCase; - -public class SimpleQueueEntryListTest extends TestCase +public class SimpleQueueEntryListTest extends QueueEntryListTestBase { + private SimpleQueueEntryList _sqel; + private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count"; String oldScavengeValue = null; - + @Override protected void setUp() { oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9"); + _sqel = new SimpleQueueEntryList(_testQueue); + for(int i = 1; i <= 100; i++) + { + final ServerMessage msg = new MockAMQMessage(i); + final QueueEntry bleh = _sqel.add(msg); + assertNotNull("QE should not have been null", bleh); + } } @Override @@ -52,19 +59,28 @@ public class SimpleQueueEntryListTest extends TestCase } } - /** - * Tests the behavior of the next(QueuyEntry) method. - */ - public void testNext() throws Exception + @Override + public QueueEntryList getTestList() { - SimpleQueueEntryList sqel = new SimpleQueueEntryList(null); - int i = 0; + return _sqel; + } + + @Override + public long getExpectedFirstMsgId() + { + return 1; + } - QueueEntry queueEntry1 = sqel.add(new MockAMQMessage(i++)); - QueueEntry queueEntry2 = sqel.add(new MockAMQMessage(i++)); + @Override + public int getExpectedListLength() + { + return 100; + } - assertSame(queueEntry2, sqel.next(queueEntry1)); - assertNull(sqel.next(queueEntry2)); + @Override + public AMQMessage getTestMessageToAdd() throws AMQException + { + return new MockAMQMessage(1l); } public void testScavenge() throws Exception @@ -82,7 +98,7 @@ public class SimpleQueueEntryListTest extends TestCase entriesMap.put(i,bleh); } - QueueEntryImpl head = ((QueueEntryImpl) sqel.getHead()); + SimpleQueueEntryImpl head = sqel.getHead(); //We shall now delete some specific messages mid-queue that will lead to //requiring a scavenge once the requested threshold of 9 deletes is passed @@ -99,11 +115,10 @@ public class SimpleQueueEntryListTest extends TestCase assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete()); verifyDeletedButPresentBeforeScavenge(head, 14); - //Delete message 20 only assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete()); verifyDeletedButPresentBeforeScavenge(head, 20); - + //Delete messages 81 to 84 assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete()); verifyDeletedButPresentBeforeScavenge(head, 81); @@ -113,35 +128,35 @@ public class SimpleQueueEntryListTest extends TestCase verifyDeletedButPresentBeforeScavenge(head, 83); assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete()); verifyDeletedButPresentBeforeScavenge(head, 84); - + //Delete message 99 - this is the 10th message deleted that is after the queue head //and so will invoke the scavenge() which is set to go after 9 previous deletions assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete()); verifyAllDeletedMessagedNotPresent(head, entriesMap); } - - private void verifyDeletedButPresentBeforeScavenge(QueueEntryImpl head, long messageId) + + private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId) { //Use the head to get the initial entry in the queue - QueueEntryImpl entry = head._next; - + SimpleQueueEntryImpl entry = head.getNextNode(); + for(long i = 1; i < messageId ; i++) { assertEquals("Expected QueueEntry was not found in the list", i, (long) entry.getMessage().getMessageNumber()); - entry = entry._next; + entry = entry.getNextNode(); } - + assertTrue("Entry should have been deleted", entry.isDeleted()); } - - private void verifyAllDeletedMessagedNotPresent(QueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages) + + private void verifyAllDeletedMessagedNotPresent(SimpleQueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages) { //Use the head to get the initial entry in the queue - QueueEntryImpl entry = head._next; - + SimpleQueueEntryImpl entry = head.getNextNode(); + assertNotNull("Initial entry should not have been null", entry); - + int count = 0; while (entry != null) @@ -149,62 +164,56 @@ public class SimpleQueueEntryListTest extends TestCase assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted()); assertNotNull("QueueEntry was not found in the list of remaining entries", remainingMessages.get(entry.getMessage().getMessageNumber().intValue())); - + count++; - entry = entry._next; + entry = entry.getNextNode(); } - + assertEquals("Count should have been equal",count,remainingMessages.size()); } - public void testDequedMessagedNotPresentInIterator() + public void testGettingNextElement() { - int numberOfMessages = 10; - SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - QueueEntry[] entries = new QueueEntry[numberOfMessages]; + final int numberOfEntries = 5; + final SimpleQueueEntryImpl[] entries = new SimpleQueueEntryImpl[numberOfEntries]; + final SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - for(int i = 0; i < numberOfMessages ; i++) + // create test entries + for(int i = 0; i < numberOfEntries; i++) { - AMQMessage message = null;; - try - { - message = new MockAMQMessage(i); - } - catch (AMQException e) - { - fail("Failure to create a mock message:" + e.getMessage()); - } - QueueEntry entry = entryList.add(message); - assertNotNull("QE should not be null", entry); - entries[i]= entry; + AMQMessage message = new MockAMQMessage(i); + entries[i] = queueEntryList.add(message); } - // dequeue all even messages - for (QueueEntry queueEntry : entries) + // test getNext for not acquired entries + for(int i = 0; i < numberOfEntries; i++) { - long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue(); - if (i%2 == 0) + final SimpleQueueEntryImpl next = entries[i].getNextValidEntry(); + + if(i < numberOfEntries - 1) + { + assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); + } + else { - queueEntry.acquire(); - queueEntry.dequeue(); + assertNull("The next entry after the last should be null", next); } } - // iterate and check that dequeued messages are not returned by iterator - QueueEntryIterator it = entryList.iterator(); - int counter = 0; - int i = 1; - while (it.advance()) - { - QueueEntry entry = it.getNode(); - Long id = ((AMQMessage)entry.getMessage()).getMessageId(); - assertEquals("Expected message with id " + i + " but got message with id " - + id, new Long(i), id); - counter++; - i += 2; - } - int expectedNumber = numberOfMessages / 2; - assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter, - expectedNumber, counter); + // delete second + entries[1].acquire(); + entries[1].delete(); + + // dequeue third + entries[2].acquire(); + entries[2].dequeue(); + + SimpleQueueEntryImpl next = entries[2].getNextValidEntry(); + assertEquals("expected forth entry", entries[3], next); + next = next.getNextValidEntry(); + assertEquals("expected fifth entry", entries[4], next); + next = next.getNextValidEntry(); + assertNull("The next entry after the last should be null", next); } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java new file mode 100644 index 0000000000..43fb5b4cb3 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java @@ -0,0 +1,62 @@ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; + +public class SortedQueueEntryImplTest extends QueueEntryImplTestBase { + + public final static String keys[] = { "CCC", "AAA", "BBB" }; + + private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY"); + + public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException { + final ServerMessage message = new MockAMQMessage(msgId, "KEY", keys[msgId-1]); + return queueEntryList.add(message); + } + + public void testCompareTo() + { + assertTrue(_queueEntry.compareTo(_queueEntry2) > 0); + assertTrue(_queueEntry.compareTo(_queueEntry3) > 0); + + assertTrue(_queueEntry2.compareTo(_queueEntry3) < 0); + assertTrue(_queueEntry2.compareTo(_queueEntry) < 0); + + assertTrue(_queueEntry3.compareTo(_queueEntry2) > 0); + assertTrue(_queueEntry3.compareTo(_queueEntry) < 0); + + assertTrue(_queueEntry.compareTo(_queueEntry) == 0); + assertTrue(_queueEntry2.compareTo(_queueEntry2) == 0); + assertTrue(_queueEntry3.compareTo(_queueEntry3) == 0); + } + + public void testTraverseWithNoDeletedEntries() + { + QueueEntry current = _queueEntry2; + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry3, current); + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry, current); + + current = current.getNextValidEntry(); + assertNull(current); + + } + + public void testTraverseWithDeletedEntries() + { + // Delete 2nd queue entry + _queueEntry3.delete(); + assertTrue(_queueEntry3.isDeleted()); + + QueueEntry current = _queueEntry2; + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry, current); + + current = current.getNextValidEntry(); + assertNull(current); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java new file mode 100644 index 0000000000..eca845644e --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -0,0 +1,323 @@ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.AMQMessage; + +import java.util.Arrays; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; + +public class SortedQueueEntryListTest extends QueueEntryListTestBase +{ + private static SelfValidatingSortedQueueEntryList _sqel; + + public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144", + " 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81", + "195", " 96", " 2", " 68", "101", "141", "159", "187", "149", " 45", + " 64", "100", " 83", " 51", " 79", " 82", "180", " 26", " 61", " 62", + " 78", " 46", "147", " 91", "120", "164", " 92", "172", "188", " 50", + "111", " 89", " 4", " 8", " 16", "151", "122", "178", " 33", "124", + "171", "165", "116", "113", "155", "148", " 29", " 0", " 37", "131", + "146", " 57", "112", " 97", " 23", "108", "123", "117", "167", " 52", + " 98", " 6", "160", " 25", " 49", " 34", "182", "185", " 30", " 66", + "152", " 58", " 86", "118", "189", " 84", " 36", "104", " 7", " 76", + " 87", " 1", " 80", " 10", "142", " 59", "137", " 12", " 67", " 22", + " 9", "106", " 75", "109", " 93", " 42", "177", "134", " 77", " 88", + "114", " 43", "143", "135", " 55", "181", " 32", "174", "175", "184", + "133", "107", " 28", "126", "103", " 85", " 38", "158", " 39", "162", + "129", "194", " 15", " 24", " 19", " 35", "186", " 31", " 65", " 99", + "192", " 74", "156", " 27", " 95", " 54", " 70", " 13", "110", " 41", + " 90", "173", "125", "196", "130", "183", "102", "190", "132", "105", + " 21", " 53", "139", " 94", "115", " 48", " 44", "179", "128", " 14", + " 72", "119", "153", "168", "197", " 40", "150", "138", " 5", "154", + "169", " 71", "199", "198", "170", " 3", "121", " 20", " 63", "193" }; + + public final static String textkeys[] = { "AAA", "BBB", "CCC", "DDD", "EEE", "FFF", "GGG", "HHH", "III", "JJJ", + "KKK", "LLL", "MMM", "NNN", "OOO", "PPP", "QQQ", "RRR", "SSS", "TTT", + "UUU", "VVV", "XXX", "YYY", "ZZZ"}; + + private final static String keysSorted[] = keys.clone(); + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + // Create result array + Arrays.sort(keysSorted); + + // Create test list + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + for(final String key : keys) + { + final ServerMessage msg = generateTestMessage(messageId++, key); + _sqel.add(msg); + } + + } + + public QueueEntryList getTestList() + { + return _sqel; + } + + public int getExpectedListLength() + { + return keys.length; + } + + public long getExpectedFirstMsgId() + { + return 67L; + } + + public ServerMessage getTestMessageToAdd() throws AMQException + { + return generateTestMessage(1, "test value"); + } + + private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException + { + return new AMQMessage(new MockStoredMessage(id, "KEY", keyValue)); + } + + public void testIterator() + { + super.testIterator(); + + // Test sorted order of list + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count = 0; + while(iter.advance()) + { + assertEquals("Sorted queue entry value does not match sorted key array", + keysSorted[count++], getSortedKeyValue(iter)); + } + } + + private Object getSortedKeyValue(QueueEntryIterator<?> iter) + { + return ((SortedQueueEntryImpl) iter.getNode()).getMessage().getMessageHeader().getHeader("KEY"); + } + + private Long getMessageId(QueueEntryIterator<?> iter) + { + return ((SortedQueueEntryImpl) iter.getNode()).getMessage().getMessageNumber(); + } + + public void testNonUniqueSortKeys() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + while(messageId < 200) + { + final ServerMessage msg = generateTestMessage(messageId++, "samekey"); + _sqel.add(msg); + } + + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count=0; + while(iter.advance()) + { + assertEquals("Sorted queue entry value is not as expected", "samekey", getSortedKeyValue(iter)); + assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); + } + } + + public void testNullSortKeys() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + while(messageId < 200) + { + final ServerMessage msg = generateTestMessage(messageId++, null); + _sqel.add(msg); + } + + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count=0; + while(iter.advance()) + { + assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter)); + assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); } + } + + public void testAscendingSortKeys() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + for(String textKey : textkeys) + { + final ServerMessage msg = generateTestMessage(messageId, textKey); + messageId++; + _sqel.add(msg); + } + + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count=0; + while(iter.advance()) + { + assertEquals("Sorted queue entry value is not as expected", textkeys[count], getSortedKeyValue(iter)); + assertEquals("Message id not as expected", Long.valueOf(count), getMessageId(iter)); + count++; + } + } + + public void testDescendingSortKeys() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + for(int i=textkeys.length-1; i >=0; i--) + { + final ServerMessage msg = generateTestMessage(messageId, textkeys[i]); + messageId++; + _sqel.add(msg); + } + + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count=0; + while(iter.advance()) + { + assertEquals("Sorted queue entry value is not as expected", textkeys[count], getSortedKeyValue(iter)); + assertEquals("Message id not as expected", Long.valueOf(textkeys.length-count-1), getMessageId(iter)); + count++; + } + } + + public void testInsertAfter() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + ServerMessage msg = generateTestMessage(1, "A"); + _sqel.add(msg); + + SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + msg = generateTestMessage(2, "B"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "B", 2); + } + + public void testInsertBefore() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + ServerMessage msg = generateTestMessage(1, "B"); + _sqel.add(msg); + + SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "B", 1); + + msg = generateTestMessage(2, "A"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 2); + + entry = _sqel.next(entry); + validateEntry(entry, "B", 1); + } + + public void testInsertInbetween() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + ServerMessage msg = generateTestMessage(1, "A"); + _sqel.add(msg); + SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + msg = generateTestMessage(2, "C"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "C", 2); + + msg = generateTestMessage(3, "B"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "B", 3); + + entry = _sqel.next(entry); + validateEntry(entry, "C", 2); + } + + public void testInsertAtHead() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + ServerMessage msg = generateTestMessage(1, "B"); + _sqel.add(msg); + + SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "B", 1); + + msg = generateTestMessage(2, "D"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "B", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "D", 2); + + msg = generateTestMessage(3, "C"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "B", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "C", 3); + + entry = _sqel.next(entry); + validateEntry(entry, "D", 2); + + msg = generateTestMessage(4, "A"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 4); + + entry = _sqel.next(entry); + validateEntry(entry, "B", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "C", 3); + + entry = _sqel.next(entry); + validateEntry(entry, "D", 2); + } + + private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId) + { + assertEquals("Sorted queue entry value is not as expected", + expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY")); + assertEquals("Sorted queue entry id is not as expected", + Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber()); + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 3acd064fd7..1d0a9d6316 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -689,7 +689,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (usePriority) { queueArguments = new FieldTable(); - queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); } if (lastValueQueue) @@ -767,7 +767,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey) { FieldTable queueArguments = new FieldTable(); - queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); @@ -781,7 +781,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey) { FieldTable queueArguments = new FieldTable(); - queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); |
