summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java39
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java19
3 files changed, 58 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 9b196e4e3d..bfbcb9e22f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -587,7 +587,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
AMQMessage msg = entry.getMessage();
- if (isDurable() && msg.isPersistent())
+ if (msg.isPersistent())
{
_virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index e2a438e199..0b7f6afd8f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,13 +21,16 @@ package org.apache.qpid.server.queue;
*/
+import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.DirectExchange;
@@ -39,6 +42,7 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SimpleAMQQueueTest extends TestCase
@@ -46,7 +50,7 @@ public class SimpleAMQQueueTest extends TestCase
protected SimpleAMQQueue _queue;
protected VirtualHost _virtualHost;
- protected MessageStore _store = new TestableMemoryMessageStore();
+ protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -328,6 +332,39 @@ public class SimpleAMQQueueTest extends TestCase
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
+
+ public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
+ IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ msg.setContentHeaderBody(contentHeaderBody);
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
+ // Send persistent message
+ qs.add(_queue);
+ msg.enqueue(qs);
+ msg.routingComplete(_store, new MessageHandleFactory());
+
+ // Check that it is enqueued
+ AMQQueue data = _store.getMessages().get(1L);
+ _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
+ assertNotNull(data);
+
+ // Dequeue message
+ MockQueueEntry entry = new MockQueueEntry();
+ AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
+
+ entry.setMessage(amqmsg);
+ _queue.dequeue(null, entry);
+
+ // Check that it is dequeued
+ data = _store.getMessages().get(1L);
+ assertNull(data);
+ }
// FIXME: move this to somewhere useful
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 48d808142c..9146fe88ae 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.HashMap;
import java.util.List;
/**
@@ -35,6 +38,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
{
MemoryMessageStore _mms = null;
+ private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
@@ -70,4 +74,19 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
return _contentBodyMap;
}
}
+
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+ getMessages().put(messageId, queue);
+ }
+
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+ getMessages().remove(messageId);
+ }
+
+ public HashMap<Long, AMQQueue> getMessages()
+ {
+ return _messages;
+ }
}