From 77e140a090d6452334effe6ebd9908b39a14a6db Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 10 Oct 2008 14:41:08 +0000 Subject: QPID-1314: Make sure all messags that are enqueued are dequeued. SimpleAMQQueue - dequeue messages if they are persistent, regardless of queue durability. SimpleAMQQueueTest - make sure that all messages which are stored are removed properly. TestableMemoryMessageStore - override enqueue/dequeue so it's possible to determine what is in the queue at any given point in time. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703485 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/queue/SimpleAMQQueue.java | 2 +- .../qpid/server/queue/SimpleAMQQueueTest.java | 39 +++++++++++++++++++++- .../server/store/TestableMemoryMessageStore.java | 19 +++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 9b196e4e3d..bfbcb9e22f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -587,7 +587,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { AMQMessage msg = entry.getMessage(); - if (isDurable() && msg.isPersistent()) + if (msg.isPersistent()) { _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index e2a438e199..0b7f6afd8f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -21,13 +21,16 @@ package org.apache.qpid.server.queue; */ +import java.util.ArrayList; import java.util.List; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.DirectExchange; @@ -39,6 +42,7 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionImpl; +import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; public class SimpleAMQQueueTest extends TestCase @@ -46,7 +50,7 @@ public class SimpleAMQQueueTest extends TestCase protected SimpleAMQQueue _queue; protected VirtualHost _virtualHost; - protected MessageStore _store = new TestableMemoryMessageStore(); + protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore(); protected AMQShortString _qname = new AMQShortString("qname"); protected AMQShortString _owner = new AMQShortString("owner"); protected AMQShortString _routingKey = new AMQShortString("routing key"); @@ -328,6 +332,39 @@ public class SimpleAMQQueueTest extends TestCase assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } + + public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException + { + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); + IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + msg.setContentHeaderBody(contentHeaderBody); + ArrayList qs = new ArrayList(); + + // Send persistent message + qs.add(_queue); + msg.enqueue(qs); + msg.routingComplete(_store, new MessageHandleFactory()); + + // Check that it is enqueued + AMQQueue data = _store.getMessages().get(1L); + _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); + assertNotNull(data); + + // Dequeue message + MockQueueEntry entry = new MockQueueEntry(); + AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); + + entry.setMessage(amqmsg); + _queue.dequeue(null, entry); + + // Check that it is dequeued + data = _store.getMessages().get(1L); + assertNull(data); + } // FIXME: move this to somewhere useful diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 48d808142c..9146fe88ae 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.server.store; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.ContentChunk; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.HashMap; import java.util.List; /** @@ -35,6 +38,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore { MemoryMessageStore _mms = null; + private HashMap _messages = new HashMap(); public TestableMemoryMessageStore(MemoryMessageStore mms) { @@ -70,4 +74,19 @@ public class TestableMemoryMessageStore extends MemoryMessageStore return _contentBodyMap; } } + + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + getMessages().put(messageId, queue); + } + + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + getMessages().remove(messageId); + } + + public HashMap getMessages() + { + return _messages; + } } -- cgit v1.2.1