diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-10-10 14:41:08 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-10-10 14:41:08 +0000 |
| commit | 77e140a090d6452334effe6ebd9908b39a14a6db (patch) | |
| tree | e2a6de68ced835d502d84247f9e4336e23afa46c /java | |
| parent | 4dbc8f73b2a337b081645011476cc565d47fd35f (diff) | |
| download | qpid-python-77e140a090d6452334effe6ebd9908b39a14a6db.tar.gz | |
QPID-1314: Make sure all messags that are enqueued are dequeued.
SimpleAMQQueue - dequeue messages if they are persistent, regardless of queue durability.
SimpleAMQQueueTest - make sure that all messages which are stored are removed properly.
TestableMemoryMessageStore - override enqueue/dequeue so it's possible to determine what is in the queue at any given point in time.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703485 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 58 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 9b196e4e3d..bfbcb9e22f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -587,7 +587,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { AMQMessage msg = entry.getMessage(); - if (isDurable() && msg.isPersistent()) + if (msg.isPersistent()) { _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index e2a438e199..0b7f6afd8f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -21,13 +21,16 @@ package org.apache.qpid.server.queue; */ +import java.util.ArrayList; import java.util.List; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.DirectExchange; @@ -39,6 +42,7 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionImpl; +import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; public class SimpleAMQQueueTest extends TestCase @@ -46,7 +50,7 @@ public class SimpleAMQQueueTest extends TestCase protected SimpleAMQQueue _queue; protected VirtualHost _virtualHost; - protected MessageStore _store = new TestableMemoryMessageStore(); + protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore(); protected AMQShortString _qname = new AMQShortString("qname"); protected AMQShortString _owner = new AMQShortString("owner"); protected AMQShortString _routingKey = new AMQShortString("routing key"); @@ -328,6 +332,39 @@ public class SimpleAMQQueueTest extends TestCase assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } + + public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException + { + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); + IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + msg.setContentHeaderBody(contentHeaderBody); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + + // Send persistent message + qs.add(_queue); + msg.enqueue(qs); + msg.routingComplete(_store, new MessageHandleFactory()); + + // Check that it is enqueued + AMQQueue data = _store.getMessages().get(1L); + _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); + assertNotNull(data); + + // Dequeue message + MockQueueEntry entry = new MockQueueEntry(); + AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); + + entry.setMessage(amqmsg); + _queue.dequeue(null, entry); + + // Check that it is dequeued + data = _store.getMessages().get(1L); + assertNull(data); + } // FIXME: move this to somewhere useful diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 48d808142c..9146fe88ae 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.server.store; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.ContentChunk; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.HashMap; import java.util.List; /** @@ -35,6 +38,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore { MemoryMessageStore _mms = null; + private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>(); public TestableMemoryMessageStore(MemoryMessageStore mms) { @@ -70,4 +74,19 @@ public class TestableMemoryMessageStore extends MemoryMessageStore return _contentBodyMap; } } + + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + getMessages().put(messageId, queue); + } + + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + getMessages().remove(messageId); + } + + public HashMap<Long, AMQQueue> getMessages() + { + return _messages; + } } |
