diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-06-07 11:18:41 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-06-07 11:18:41 +0000 |
| commit | 67788db66c38665e454ecbf977e1d376bf7eea97 (patch) | |
| tree | fb6ee01c54b5350abc109315ae03b6e254cc22b1 /qpid/java | |
| parent | 174789ad53876088c38ea365a65ae54ad2c48867 (diff) | |
| download | qpid-python-67788db66c38665e454ecbf977e1d376bf7eea97.tar.gz | |
QPID-3219: update handling of QueueEntries to exclude use of entries in the intermediate 'dequeued' state, simplify logic in general.
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1132959 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
10 files changed, 872 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index b6e97e08fb..371ae0de50 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -60,7 +60,7 @@ public class AMQPriorityQueue extends SimpleAMQQueue { // check that all subscriptions are not in advance of the entry SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && !entry.isAcquired()) + while(subIter.advance() && entry.isAvailable()) { final Subscription subscription = subIter.getNode().getSubscription(); if(!subscription.isClosed()) @@ -70,7 +70,7 @@ public class AMQPriorityQueue extends SimpleAMQQueue { QueueEntry subnode = context._lastSeenEntry; QueueEntry released = context._releasedEntry; - while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0)) + while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0)) { if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 79ede2694e..88349586c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -52,6 +52,17 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable } public abstract State getState(); + + /** + * Returns true if state is either DEQUEUED or DELETED. + * + * @return true if state is either DEQUEUED or DELETED. + */ + public boolean isDispensed() + { + State currentState = getState(); + return currentState == State.DEQUEUED || currentState == State.DELETED; + } } @@ -207,4 +218,18 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable void addStateChangeListener(StateChangeListener listener); boolean removeStateChangeListener(StateChangeListener listener); + + /** + * Returns true if entry is in DEQUEUED state, otherwise returns false. + * + * @return true if entry is in DEQUEUED state, otherwise returns false + */ + boolean isDequeued(); + + /** + * Returns true if entry is either DEQUED or DELETED state. + * + * @return true if entry is either DEQUED or DELETED state + */ + boolean isDispensed(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 809ba3277e..bc452d2d72 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -499,7 +499,7 @@ public class QueueEntryImpl implements QueueEntry { QueueEntryImpl next = nextNode(); - while(next != null && next.isDeleted()) + while(next != null && next.isDispensed() ) { final QueueEntryImpl newNext = next.nextNode(); @@ -547,4 +547,16 @@ public class QueueEntryImpl implements QueueEntry return _queueEntryList; } + @Override + public boolean isDequeued() + { + return _state == DEQUEUED_STATE; + } + + @Override + public boolean isDispensed() + { + return _state.isDispensed(); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b02d03a1ad..274cb6714a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -629,7 +629,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // this catches the case where we *just* miss an update int loops = 2; - while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) + while (entry.isAvailable() && loops != 0) { if (nextNode == null) { @@ -648,7 +648,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - if (!(entry.isAcquired() || entry.isDeleted())) + if (entry.isAvailable()) { checkSubscriptionsNotAheadOfDelivery(entry); @@ -942,7 +942,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDeleted()) + if (node != null && !node.isDispensed()) { entryList.add(node); } @@ -1046,7 +1046,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance() && !filter.filterComplete()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && filter.accept(node)) + if (!node.isDispensed() && filter.accept(node)) { entryList.add(node); } @@ -1240,7 +1240,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if ((messageId >= fromMessageId) && (messageId <= toMessageId) - && !node.isDeleted() && node.acquire()) { dequeueEntry(node); @@ -1270,7 +1269,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (noDeletes && queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) + if (node.acquire()) { dequeueEntry(node); noDeletes = false; @@ -1300,7 +1299,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) + if (node.acquire()) { dequeueEntry(node, txn); if(++count == request) @@ -1654,7 +1653,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = getNextAvailableEntry(sub); - if (node != null && !(node.isAcquired() || node.isDeleted())) + if (node != null && node.isAvailable()) { if (sub.hasInterest(node)) { @@ -1715,7 +1714,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; - while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node))) + while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node))) { if (expired) { @@ -1884,8 +1883,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - // Only process nodes that are not currently deleted - if (!node.isDeleted()) + // Only process nodes that are not currently deleted and not dequeued + if (!node.isDispensed()) { // If the node has exired then aquire it if (node.expired() && node.acquire()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index b97c2c55c5..46baab8c85 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -156,7 +155,7 @@ public class SimpleQueueEntryList implements QueueEntryList if(!atTail()) { QueueEntryImpl nextNode = _lastNode.nextNode(); - while(nextNode.isDeleted() && nextNode.nextNode() != null) + while(nextNode.isDispensed() && nextNode.nextNode() != null) { nextNode = nextNode.nextNode(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 9e831b2a8e..ae58a52046 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -482,6 +482,18 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return 0; //To change body of implemented methods use File | Settings | File Templates. } + + @Override + public boolean isDequeued() + { + return false; + } + + @Override + public boolean isDispensed() + { + return false; + } }; if(action != null) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 5bdbe2c68e..0ba8eb4792 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -231,4 +231,16 @@ public class MockQueueEntry implements QueueEntry _message = msg; } + @Override + public boolean isDequeued() + { + return false; + } + + @Override + public boolean isDispensed() + { + return false; + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java new file mode 100644 index 0000000000..0899f25cc5 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.queue; + +import java.lang.reflect.Field; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry.EntryState; +import org.apache.qpid.server.subscription.MockSubscription; + +/** + * Tests for {@link QueueEntryImpl} + * + */ +public class QueueEntryImplTest extends TestCase +{ + // tested entry + private QueueEntryImpl _queueEntry; + + public void setUp() throws Exception + { + AMQMessage message = new MockAMQMessage(1); + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + _queueEntry = new QueueEntryImpl(queueEntryList, message, 1); + } + + public void testAquire() + { + assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method", + _queueEntry.isAvailable()); + acquire(); + } + + public void testDequeue() + { + dequeue(); + } + + public void testDelete() + { + delete(); + } + + /** + * Tests release method for entry in acquired state. + * <p> + * Entry in state ACQUIRED should be released and its status should be + * changed to AVAILABLE. + */ + public void testReleaseAquired() + { + acquire(); + _queueEntry.release(); + assertTrue("Queue entry should be in AVAILABLE state after invoking of release method", + _queueEntry.isAvailable()); + } + + /** + * Tests release method for entry in dequeued state. + * <p> + * Invoking release on dequeued entry should not have any effect on its + * state. + */ + public void testReleaseDequeued() + { + dequeue(); + _queueEntry.release(); + EntryState state = getState(); + assertEquals("Invoking of release on entry in DEQUEUED state should not have any effect", + QueueEntry.DEQUEUED_STATE, state); + } + + /** + * Tests release method for entry in deleted state. + * <p> + * Invoking release on deleted entry should not have any effect on its + * state. + */ + public void testReleaseDeleted() + { + delete(); + _queueEntry.release(); + assertTrue("Invoking of release on entry in DELETED state should not have any effect", + _queueEntry.isDeleted()); + } + + /** + * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method. + */ + public void testGetNext() + { + int numberOfEntries = 5; + QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + + // create test entries + for(int i = 0; i < numberOfEntries ; i++) + { + AMQMessage message = null;; + try + { + message = new MockAMQMessage(i); + } + catch (AMQException e) + { + fail("Failure to create a mock message:" + e.getMessage()); + } + QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message); + entries[i] = entry; + } + + // test getNext for not acquired entries + for(int i = 0; i < numberOfEntries ; i++) + { + QueueEntryImpl queueEntry = entries[i]; + QueueEntryImpl next = queueEntry.getNext(); + if (i < numberOfEntries - 1) + { + assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); + } + else + { + assertNull("The next entry after the last should be null", next); + } + } + + // delete second + entries[1].acquire(); + entries[1].delete(); + + // dequeue third + entries[2].acquire(); + entries[2].dequeue(); + + QueueEntryImpl next = entries[0].getNext(); + assertEquals("expected forth entry",entries[3], next); + next = next.getNext(); + assertEquals("expected fifth entry", entries[4], next); + next = next.getNext(); + assertNull("The next entry after the last should be null", next); + } + /** + * A helper method to put tested object into deleted state and assert the state + */ + private void delete() + { + _queueEntry.delete(); + assertTrue("Queue entry should be in DELETED state after invoking of delete method", + _queueEntry.isDeleted()); + } + + /** + * A helper method to put tested entry into dequeue state and assert the sate + */ + private void dequeue() + { + acquire(); + _queueEntry.dequeue(); + EntryState state = getState(); + assertEquals("Queue entry should be in DEQUEUED state after invoking of dequeue method", + QueueEntry.DEQUEUED_STATE, state); + } + + /** + * A helper method to put tested entry into acquired state and assert the sate + */ + private void acquire() + { + _queueEntry.acquire(new MockSubscription()); + assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", + _queueEntry.isAcquired()); + } + + /** + * A helper method to get entry state + * + * @return entry state + */ + private EntryState getState() + { + EntryState state = null; + try + { + Field f = QueueEntryImpl.class.getDeclaredField("_state"); + f.setAccessible(true); + state = (EntryState) f.get(_queueEntry); + } + catch (Exception e) + { + fail("Failure to get a state field: " + e.getMessage()); + } + return state; + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index abe2d1728f..4c21f38363 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -36,13 +36,16 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; +import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -51,6 +54,8 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class SimpleAMQQueueTest extends InternalBrokerBaseCase { @@ -735,6 +740,533 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase verifyRecievedMessages(msgListSub3, sub3.getMessages()); } + /** + * Tests that dequeued message is not present in the list returned form + * {@link SimpleAMQQueue#getMessagesOnTheQueue()} + */ + public void testGetMessagesOnTheQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // send test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // get messages on the queue + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not be returned by method getMessagesOnTheQueue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not present in the list returned form + * {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)} + */ + public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // send test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // get messages on the queue with filter accepting all available messages + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter() + { + + @Override + public boolean accept(QueueEntry entry) + { + return true; + } + + @Override + public boolean filterComplete() + { + return false; + } + }); + + // assert entries on the queue + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not be returned by method getMessagesOnTheQueue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not copied as part of invocation of + * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)} + */ + public void testCopyMessagesWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + String anotherQueueName = "testQueue2"; + + // put test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // create another queue + SimpleAMQQueue queue = createQueue(anotherQueueName); + + // create transaction + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + + // copy messages into another queue + _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); + + // commit transaction + txn.commit(); + + // get messages on another queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + + // assert another queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not been copied into another queue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not moved as part of invocation of + * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)} + */ + public void testMovedMessagesWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + String anotherQueueName = "testQueue2"; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // create another queue + SimpleAMQQueue queue = createQueue(anotherQueueName); + + // create transaction + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + + // move messages into another queue + _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); + + // commit transaction + txn.commit(); + + // get messages on another queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + + // assert another queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not been copied into another queue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that messages in given range including dequeued one are deleted + * from the queue on invocation of + * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)} + */ + public void testRemoveMessagesFromQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // remove messages + _queue.removeMessagesFromQueue(0, messageNumber); + + // get queue entries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull("Null is returned from getMessagesOnTheQueue", entries); + assertEquals("Queue should be empty", 0, entries.size()); + } + + /** + * Tests that dequeued message on the top is not accounted and next message + * is deleted from the queue on invocation of + * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)} + */ + public void testDeleteMessageFromTopWithDequeuedEntryOnTop() + { + int messageNumber = 4; + int dequeueMessageIndex = 0; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message on top + dequeueMessage(_queue, dequeueMessageIndex); + + //delete message from top + _queue.deleteMessageFromTop(); + + //get queue netries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull("Null is returned from getMessagesOnTheQueue", entries); + assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(), + messageNumber - 2, entries.size()); + assertEquals("Expected first entry with id 2", new Long(2), + ((AMQMessage) entries.get(0).getMessage()).getMessageId()); + } + + /** + * Tests that all messages including dequeued one are deleted from the queue + * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)} + */ + public void testClearQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message on a test queue + dequeueMessage(_queue, dequeueMessageIndex); + + // clean queue + try + { + _queue.clearQueue(); + } + catch (AMQException e) + { + fail("Failure to clear queue:" + e.getMessage()); + } + + // get queue entries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull(entries); + assertEquals(0, entries.size()); + } + + /** + * Tests whether dequeued entry is sent to subscriber in result of + * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)} + */ + public void testProcessQueueWithDequeuedEntry() + { + // total number of messages to send + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // create queue with overridden method deliverAsync + SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false, + new AMQShortString("testOwner"), false, false, _virtualHost, null) + { + @Override + public void deliverAsync(Subscription sub) + { + // do nothing + } + }; + + // put messages + List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber); + + // dequeue message + dequeueMessage(testQueue, dequeueMessageIndex); + + // latch to wait for message receipt + final CountDownLatch latch = new CountDownLatch(messageNumber -1); + + // create a subscription + MockSubscription subscription = new MockSubscription() + { + /** + * Send a message and decrement latch + */ + public void send(QueueEntry msg) throws AMQException + { + super.send(msg); + latch.countDown(); + } + }; + + try + { + // subscribe + testQueue.registerSubscription(subscription, false); + + // process queue + testQueue.processQueue(new QueueRunner(testQueue, 1) + { + public void run() + { + // do nothing + } + }); + } + catch (AMQException e) + { + fail("Failure to process queue:" + e.getMessage()); + } + // wait up to 1 minute for message receipt + try + { + latch.await(1, TimeUnit.MINUTES); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + } + List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3)); + verifyRecievedMessages(expected, subscription.getMessages()); + } + + /** + * Tests that entry in dequeued state are not enqueued and not delivered to subscription + */ + public void testEqueueDequeuedEntry() + { + // create a queue where each even entry is considered a dequeued + SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"), + false, false, _virtualHost, new QueueEntryListFactory() + { + public QueueEntryList createQueueEntryList(AMQQueue queue) + { + /** + * Override SimpleQueueEntryList to create a dequeued + * entries for messages with even id + */ + return new SimpleQueueEntryList(queue) + { + /** + * Entries with even message id are considered + * dequeued! + */ + protected QueueEntryImpl createQueueEntry(final ServerMessage message) + { + return new QueueEntryImpl(this, message) + { + public boolean isDequeued() + { + return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + + public boolean isDispensed() + { + return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + + public boolean isAvailable() + { + return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + }; + } + }; + } + }, null); + // create a subscription + MockSubscription subscription = new MockSubscription(); + + // register subscription + try + { + queue.registerSubscription(subscription, false); + } + catch (AMQException e) + { + fail("Failure to register subscription:" + e.getMessage()); + } + + // put test messages into a queue + putGivenNumberOfMessages(queue, 4); + + // assert received messages + List<QueueEntry> messages = subscription.getMessages(); + assertEquals("Only 2 messages should be returned", 2, messages.size()); + assertEquals("ID of first message should be 1", new Long(1), + ((AMQMessage) messages.get(0).getMessage()).getMessageId()); + assertEquals("ID of second message should be 3", new Long(3), + ((AMQMessage) messages.get(1).getMessage()).getMessageId()); + } + + /** + * A helper method to create a queue with given name + * + * @param name + * queue name + * @return queue + */ + private SimpleAMQQueue createQueue(String name) + { + SimpleAMQQueue queue = null; + try + { + AMQShortString queueName = new AMQShortString(name); + AMQShortString ownerName = new AMQShortString(name + "Owner"); + queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false, + _virtualHost, _arguments); + } + catch (AMQException e) + { + fail("Failure to create a queue:" + e.getMessage()); + } + assertNotNull("Queue was not created", queue); + return queue; + } + + /** + * A helper method to put given number of messages into queue + * <p> + * All messages are asserted that they are present on queue + * + * @param queue + * queue to put messages into + * @param messageNumber + * number of messages to put into queue + */ + private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber) + { + putGivenNumberOfMessages(queue, messageNumber); + + // make sure that all enqueued messages are on the queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + assertEquals(messageNumber, entries.size()); + for (int i = 0; i < messageNumber; i++) + { + assertEquals(new Long(i), ((AMQMessage)entries.get(i).getMessage()).getMessageId()); + } + return entries; + } + + /** + * A helper method to put given number of messages into queue + * <p> + * Queue is not checked if messages are added into queue + * + * @param queue + * queue to put messages into + * @param messageNumber + * number of messages to put into queue + * @param queue + * @param messageNumber + */ + private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber) + { + for (int i = 0; i < messageNumber; i++) + { + // Create message + Long messageId = new Long(i); + AMQMessage message = null; + try + { + message = createMessage(messageId); + } + catch (AMQException e) + { + fail("Failure to create a test message:" + e.getMessage()); + } + // Put message on queue + try + { + queue.enqueue(message); + } + catch (AMQException e) + { + fail("Failure to put message on queue:" + e.getMessage()); + } + } + } + + /** + * A helper method to dequeue an entry on queue with given index + * + * @param queue + * queue to dequeue message on + * @param dequeueMessageIndex + * entry index to dequeue. + */ + private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) + { + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + QueueEntry entry = entries.get(dequeueMessageIndex); + entry.acquire(); + entry.dequeue(); + assertTrue(entry.isDequeued()); + return entry; + } + private List<QueueEntry> createEntriesList(QueueEntry... entries) { ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index 320a75045a..7136f07ca5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; import junit.framework.TestCase; @@ -155,5 +156,55 @@ public class SimpleQueueEntryListTest extends TestCase assertEquals("Count should have been equal",count,remainingMessages.size()); } - + + public void testDequedMessagedNotPresentInIterator() + { + int numberOfMessages = 10; + SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + QueueEntry[] entries = new QueueEntry[numberOfMessages]; + + for(int i = 0; i < numberOfMessages ; i++) + { + AMQMessage message = null;; + try + { + message = new MockAMQMessage(i); + } + catch (AMQException e) + { + fail("Failure to create a mock message:" + e.getMessage()); + } + QueueEntry entry = entryList.add(message); + assertNotNull("QE should not be null", entry); + entries[i]= entry; + } + + // dequeue all even messages + for (QueueEntry queueEntry : entries) + { + long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue(); + if (i%2 == 0) + { + queueEntry.acquire(); + queueEntry.dequeue(); + } + } + + // iterate and check that dequeued messages are not returned by iterator + QueueEntryIterator it = entryList.iterator(); + int counter = 0; + int i = 1; + while (it.advance()) + { + QueueEntry entry = it.getNode(); + Long id = ((AMQMessage)entry.getMessage()).getMessageId(); + assertEquals("Expected message with id " + i + " but got message with id " + + id, new Long(i), id); + counter++; + i += 2; + } + int expectedNumber = numberOfMessages / 2; + assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter, + expectedNumber, counter); + } } |
