summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-25 14:24:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-25 14:24:36 +0000
commit42bfb186da9e911c208f22dd5f6c794b9bddd859 (patch)
tree5c0d345cd36b6ef2d17f0abf39f247fc7fdc21c9 /qpid/java
parent9a08d3ffc0a21d501a33a2b318fca72f85d0c096 (diff)
downloadqpid-python-42bfb186da9e911c208f22dd5f6c794b9bddd859.tar.gz
QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613440 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java43
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java5
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java12
-rw-r--r--qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java39
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java88
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java35
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java42
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java6
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java4
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java38
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java36
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java49
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java80
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java11
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java6
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java66
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java6
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java5
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html11
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js2
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java63
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java216
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java5
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes4
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes2
49 files changed, 672 insertions, 349 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index cf187fe1e9..be592a0d42 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long newMessageId = getNextMessageId();
- if (metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
- }
- else
- {
- return new StoredMemoryMessage<T>(newMessageId, metaData);
- }
+ return new StoredBDBMessage<T>(newMessageId, metaData);
}
public long getNextMessageId()
@@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
protected abstract Logger getLogger();
- private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
{
private final long _messageId;
@@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- @Override
- public synchronized StoreFuture flushToStore()
+ synchronized StoreFuture flushToStore()
{
if(!stored())
{
@@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
private Transaction _txn;
private int _storeSizeIncrease;
+ private final List<Runnable> _onCommitActions = new ArrayList<>();
private BDBTransaction() throws StoreException
{
@@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ _onCommitActions.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+ });
+
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void commitTran() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
}
+ private void doPreCommitActions()
+ {
+ for(Runnable action : _onCommitActions)
+ {
+ action.run();
+ }
+ _onCommitActions.clear();
+ }
+
@Override
public StoreFuture commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
}
@@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void abortTran() throws StoreException
{
checkMessageStoreOpen();
-
+ _onCommitActions.clear();
AbstractBDBMessageStore.this.abortTran(_txn);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index 8fb011152c..1f4cf45ce1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return true;
+ return MessageDurability.DEFAULT;
}
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index a96dc8b142..bace585e56 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -51,9 +54,6 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.util.FileUtils;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
storedMessage_0_8.addContent(0, firstContentBytes_0_8);
storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
/*
* Create and insert a 0-10 message (metadata and content)
@@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
long messageid_0_10 = storedMessage_0_10.getMessageNumber();
storedMessage_0_10.addContent(0, completeContentBody_0_10);
- storedMessage_0_10.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore();
/*
* reload the store only (read-only)
@@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
storedMessage_0_8.addContent(0, chunk1);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
return storedMessage_0_8;
}
diff --git a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
index 59fe72e377..c9c3afccba 100644
--- a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
+++ b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
@@ -24,7 +24,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-@Retention(RetentionPolicy.SOURCE)
+@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface PluggableService
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index fdc2fa90a5..4f2327adee 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -235,12 +235,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
@Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 21112b6309..db53f840de 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.store.MessageDurability;
@ManagedObject( defaultType = "standard" )
public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@@ -35,6 +36,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages";
String ALTERNATE_EXCHANGE = "alternateExchange";
String EXCLUSIVE = "exclusive";
+ String MESSAGE_DURABILITY = "messageDurability";
String MESSAGE_GROUP_KEY = "messageGroupKey";
String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
@@ -130,6 +132,10 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute( defaultValue = "${queue.alertRepeatGap}")
long getAlertRepeatGap();
+ @ManagedAttribute( defaultValue = "DEFAULT" )
+ MessageDurability getMessageDurability();
+
+
//children
Collection<? extends Binding> getBindings();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 479029093f..2aeca6f45f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -77,6 +77,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
@@ -175,6 +176,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@ManagedAttributeField
private ExclusivityPolicy _exclusive;
+ @ManagedAttributeField
+ private MessageDurability _messageDurability;
+
private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
private final Set<NotificationCheck> _notificationChecks =
@@ -245,12 +249,38 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
super.onCreate();
+ if(isDurable() && (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+ new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setAttribute(AbstractConfiguredObject.DURABLE, true, false);
+ return null;
+ }
+ });
+ }
- if (isDurable() && !(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+ if (isDurable())
{
_virtualHost.getDurableConfigurationStore().create(asObjectRecord());
}
+ else if(getMessageDurability() != MessageDurability.NEVER)
+ {
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+ new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setAttribute(Queue.MESSAGE_DURABILITY, getMessageDurability(), MessageDurability.NEVER);
+ return null;
+ }
+ });
+ }
_recovering.set(false);
}
@@ -510,6 +540,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
+ @Override
+ public final MessageDurability getMessageDurability()
+ {
+ return _messageDurability;
+ }
@Override
public Collection<String> getAvailableAttributes()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index 15df952e61..37e82b0771 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -52,6 +52,9 @@ public class QueueArgumentsConverter
public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
+
+ public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+
public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude";
public static final String QPID_TRACE_ID = "qpid.trace.id";
@@ -91,6 +94,7 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
+ ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
}
@@ -138,7 +142,12 @@ public class QueueArgumentsConverter
{
if(modelArguments.containsKey(entry.getValue()))
{
- wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue()));
+ Object value = modelArguments.get(entry.getValue());
+ if(value instanceof Enum)
+ {
+ value = ((Enum) value).name();
+ }
+ wireArguments.put(entry.getKey(), value);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 24866e4e2e..f2f85e1387 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -447,14 +447,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
checkMessageStoreOpen();
- if(metaData.isPersistent())
- {
- return new StoredJDBCMessage(getNextMessageId(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(getNextMessageId(), metaData);
- }
+ return new StoredJDBCMessage(getNextMessageId(), metaData);
+
}
@Override
@@ -970,9 +964,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return true;
+ return MessageDurability.DEFAULT;
}
}
@@ -1122,7 +1116,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
private final ConnectionWrapper _connWrapper;
private int _storeSizeIncrease;
-
+ private final List<Runnable> _onCommitActions = new ArrayList<>();
protected JDBCTransaction()
{
@@ -1144,16 +1138,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
- try
- {
- ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
- }
- catch (SQLException e)
+ _onCommitActions.add(new Runnable()
{
- throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
- }
+ @Override
+ public void run()
+ {
+ try
+ {
+ ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+ }
+ }
+ });
}
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1170,7 +1171,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
public void commitTran()
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
}
@@ -1179,17 +1180,26 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
public StoreFuture commitTranAsync()
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
return storeFuture;
}
+ private void doPreCommitActions()
+ {
+ for(Runnable action : _onCommitActions)
+ {
+ action.run();
+ }
+ _onCommitActions.clear();
+ }
+
@Override
public void abortTran()
{
checkMessageStoreOpen();
-
+ _onCommitActions.clear();
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@@ -1215,7 +1225,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private final long _messageId;
private final boolean _isRecovered;
-
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
@@ -1320,39 +1329,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public synchronized StoreFuture flushToStore()
- {
- checkMessageStoreOpen();
-
- Connection conn = null;
- try
- {
- if(!stored())
- {
- conn = newConnection();
-
- store(conn);
-
- conn.commit();
- storedSizeChange(getMetaData().getContentSize());
- }
- }
- catch (SQLException e)
- {
- if(getLogger().isDebugEnabled())
- {
- getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
- }
- throw new StoreException(e);
- }
- finally
- {
- JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger());
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
checkMessageStoreOpen();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index f4551aae05..f3b2cac52e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -64,6 +64,12 @@ public class MemoryMessageStore implements MessageStore
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+
+ if(message.getStoredMessage() instanceof StoredMemoryMessage)
+ {
+ _messages.putIfAbsent(message.getMessageNumber(), (StoredMemoryMessage) message.getStoredMessage());
+ }
+
Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
if (messageIds == null)
{
@@ -196,31 +202,20 @@ public class MemoryMessageStore implements MessageStore
{
long id = getNextMessageId();
- if(metaData.isPersistent())
+ StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData)
{
- return new StoredMemoryMessage<T>(id, metaData)
+
+ @Override
+ public void remove()
{
+ _messages.remove(getMessageNumber());
+ super.remove();
+ }
- @Override
- public StoreFuture flushToStore()
- {
- _messages.putIfAbsent(getMessageNumber(), this) ;
- return super.flushToStore();
- }
+ };
- @Override
- public void remove()
- {
- _messages.remove(getMessageNumber());
- super.remove();
- }
+ return storedMemoryMessage;
- };
- }
- else
- {
- return new StoredMemoryMessage<T>(id, metaData);
- }
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java
new file mode 100644
index 0000000000..4dd621bda0
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public enum MessageDurability
+{
+ DEFAULT(false,true),
+ ALWAYS(true,true),
+ NEVER(false,false);
+
+ private final boolean _nonPersistent;
+ private final boolean _persistent;
+
+ MessageDurability(final boolean nonPersistent, final boolean persistent)
+ {
+ _nonPersistent = nonPersistent;
+ _persistent = persistent;
+ }
+
+ public boolean persist(final boolean persistent)
+ {
+ return persistent ? _persistent : _nonPersistent;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index f4e1376980..e1043e8807 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -122,12 +122,6 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
return buf;
}
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
-
public T getMetaData()
{
return _metaData;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 7909003855..6beb74f4ae 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -34,7 +34,5 @@ public interface StoredMessage<M extends StorableMessageMetaData>
ByteBuffer getContent(int offsetInMessage, int size);
- StoreFuture flushToStore();
-
void remove();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
index 18b3125641..f5b1aa6ce7 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
@@ -24,7 +24,9 @@ import java.util.UUID;
public interface TransactionLogResource
{
+
String getName();
public UUID getId();
- boolean isDurable();
+ //boolean isDurable();
+ MessageDurability getMessageDurability();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 013e9f32ed..65064b015c 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.txn;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -31,9 +34,6 @@ import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
-import java.util.Collection;
-import java.util.List;
-
/**
* An implementation of ServerTransaction where each enqueue/dequeue
* operation takes place within it own transaction.
@@ -93,7 +93,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
try
{
StoreFuture future;
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -162,7 +162,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -205,7 +205,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
try
{
StoreFuture future;
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -237,28 +237,24 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
try
{
- if(message.isPersistent())
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if (queue.getMessageDurability().persist(message.isPersistent()))
{
- if (queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
- }
- if (txn == null)
- {
- txn = _messageStore.newTransaction();
- }
-
- txn.enqueueMessage(queue, message);
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+ txn.enqueueMessage(queue, message);
- }
}
-
}
+
StoreFuture future;
if (txn != null)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index ec3b6f69fb..2ecd847719 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.txn;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -30,9 +33,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
-import java.util.Collection;
-import java.util.List;
-
/**
* An implementation of ServerTransaction where each enqueue/dequeue
* operation takes place within it own transaction.
@@ -77,7 +77,7 @@ public class AutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -109,7 +109,7 @@ public class AutoCommitTransaction implements ServerTransaction
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -146,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -175,25 +175,21 @@ public class AutoCommitTransaction implements ServerTransaction
try
{
- if(message.isPersistent())
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if (queue.getMessageDurability().persist(message.isPersistent()))
{
- if (queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
- }
- if (txn == null)
- {
- txn = _messageStore.newTransaction();
- }
-
- txn.enqueueMessage(queue, message);
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+ txn.enqueueMessage(queue, message);
- }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
index 535ad77ea4..4195c72a28 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
@@ -381,7 +381,7 @@ public class DtxBranch
public boolean isDurable()
{
- return _message.isPersistent() && _resource.isDurable();
+ return _resource.getMessageDurability().persist(_message.isPersistent());
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index b3d013c99f..e371bcdb02 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.txn;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.TransactionLogResource;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -97,7 +97,7 @@ public class LocalTransaction implements ServerTransaction
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
try
{
@@ -129,7 +129,7 @@ public class LocalTransaction implements ServerTransaction
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -186,7 +186,7 @@ public class LocalTransaction implements ServerTransaction
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
try
{
@@ -211,29 +211,26 @@ public class LocalTransaction implements ServerTransaction
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent())
+ try
{
- try
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
- if(queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
- }
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
+ }
- beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message);
+ beginTranIfNecessary();
+ _transaction.enqueueMessage(queue, message);
- }
}
}
- catch(RuntimeException e)
- {
- tidyUpOnError(e);
- }
+ }
+ catch(RuntimeException e)
+ {
+ tidyUpOnError(e);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
index 9fc71e23d7..becf4a073c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -220,9 +221,9 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return false;
+ return MessageDurability.DEFAULT;
}
};
txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 6b6de5b66a..4ddf421901 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -34,8 +34,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
-import org.apache.qpid.server.model.VirtualHostNode;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
@@ -49,6 +47,7 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -56,6 +55,7 @@ import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 51b33276ec..be4505bc80 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.server.store;
-import static org.mockito.Mockito.mock;
-
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
@@ -148,12 +146,6 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
return _transactionResource;
}
- @Override
- public boolean isDurable()
- {
- return true;
- }
-
private static class TestMessage implements EnqueueableMessage
{
private final StoredMessage<?> _handle;
@@ -180,4 +172,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
return _handle;
}
}
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index d4b990da07..758799b81f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -28,13 +28,14 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import org.hamcrest.Description;
+import org.mockito.ArgumentMatcher;
+
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -45,9 +46,6 @@ import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.hamcrest.Description;
-import org.mockito.ArgumentMatcher;
-
public abstract class MessageStoreTestCase extends QpidTestCase
{
private MessageStore _store;
@@ -117,8 +115,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
MessageHandler handler = mock(MessageHandler.class);
_store.visitMessages(handler);
@@ -127,14 +124,60 @@ public abstract class MessageStoreTestCase extends QpidTestCase
}
+ public void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName)
+ {
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(new TransactionLogResource()
+ {
+ private final UUID _id = UUID.nameUUIDFromBytes(queueName.getBytes());
+
+ @Override
+ public String getName()
+ {
+ return queueName;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
+ }, new EnqueueableMessage()
+ {
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public StoredMessage getStoredMessage()
+ {
+ return message;
+ }
+ });
+ txn.commitTran();
+ }
+
public void testVisitMessagesAborted() throws Exception
{
int contentSize = 0;
for (int i = 0; i < 3; i++)
{
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
}
MessageHandler handler = mock(MessageHandler.class);
@@ -151,16 +194,16 @@ public abstract class MessageStoreTestCase extends QpidTestCase
for (int i = 0; i < 3; i++)
{
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
+
}
reopenStore();
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
+
assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4);
}
@@ -170,8 +213,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
@@ -305,8 +346,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
MessageHandler handler = mock(MessageHandler.class);
_store.visitMessages(handler);
@@ -319,8 +358,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
_store.visitMessages(new MessageHandler()
@@ -360,7 +398,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
TransactionLogResource queue = mock(TransactionLogResource.class);
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("testQueue");
- when(queue.isDurable()).thenReturn(true);
+ when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
return queue;
}
@@ -391,8 +429,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase
private EnqueueableMessage createEnqueueableMessage(long messageId1)
{
final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
- StoreFuture flushFuture = message1.flushToStore();
- flushFuture.waitForCompletion();
EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
return enqueueableMessage1;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
index 8285bdba4c..ec0908efba 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
@@ -53,6 +54,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
when(_messageStore.newTransaction()).thenReturn(_storeTransaction);
when(_storeTransaction.commitTranAsync()).thenReturn(_future);
when(_queue.isDurable()).thenReturn(true);
+ when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
}
public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
index 3e1183d203..5abbd7352b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
@@ -20,22 +20,23 @@
*/
package org.apache.qpid.server.txn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* A unit test ensuring that AutoCommitTransaction creates a separate transaction for
* each dequeue/enqueue operation that involves enlistable messages. Verifies
@@ -428,6 +429,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
{
BaseQueue queue = mock(BaseQueue.class);
when(queue.isDurable()).thenReturn(durable);
+ when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
return queue;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index 58c7401c60..c905e52715 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -20,22 +20,23 @@
*/
package org.apache.qpid.server.txn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* A unit test ensuring that LocalTransactionTest creates a long-lived store transaction
* that spans many dequeue/enqueue operations of enlistable messages. Verifies
@@ -652,6 +653,7 @@ public class LocalTransactionTest extends QpidTestCase
{
BaseQueue queue = mock(BaseQueue.class);
when(queue.isDurable()).thenReturn(durable);
+ when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
return queue;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
index 685fea207b..9c05ed564a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.NullMessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -379,6 +380,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
{
AMQQueue<?> queue = mock(AMQQueue.class);
final UUID queueId = UUID.randomUUID();
+ when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("test-queue");
when(_virtualHost.getQueue(queueId)).thenReturn(queue);
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index f9bf697a0d..dfdc4e230c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -102,12 +102,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 2a4aeb8b7e..ad99d14170 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -102,12 +102,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index af1bd00ae9..5adeba66b1 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -334,11 +334,7 @@ public class ServerSessionDelegate extends SessionDelegate
int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(enqueues != 0)
- {
- storeMessage.flushToStore();
- }
- else
+ if(enqueues == 0)
{
if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 200be71187..4b37823898 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -34,8 +34,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,7 +91,6 @@ import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -152,9 +149,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
- // Set of messages being acknowledged in the current transaction
- private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
-
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
@@ -422,7 +416,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
else
{
incrementOutstandingTxnsIfNecessary();
- handle.flushToStore();
}
}
}
@@ -1412,7 +1405,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
finally
{
- _acknowledgedMessages.clear();
+ _ackedMessages.clear();
}
}
@@ -1435,7 +1428,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
finally
{
- _acknowledgedMessages.clear();
+ _ackedMessages.clear();
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index 2ad6fc2ca6..b7d7e5b236 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -114,12 +114,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
index 50145e5c6d..c4de7a252b 100755
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
@@ -104,11 +104,6 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
return buf;
}
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
public void remove()
{
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index 0ff22f9d51..e0f0fc98a5 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -20,15 +20,21 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.MessageCounter;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -85,8 +91,9 @@ public class ReferenceCountingTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
- storedMessage.flushToStore();
-
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
+ txn.commitTran();
AMQMessage message = new AMQMessage(storedMessage);
MessageReference ref = message.newReference();
@@ -151,14 +158,13 @@ public class ReferenceCountingTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
- storedMessage.flushToStore();
-
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
+ txn.commitTran();
AMQMessage message = new AMQMessage(storedMessage);
MessageReference ref = message.newReference();
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertEquals(1, getStoreMessageCount());
MessageReference ref2 = message.newReference();
@@ -166,6 +172,54 @@ public class ReferenceCountingTest extends QpidTestCase
assertEquals(1, getStoreMessageCount());
}
+ private TransactionLogResource createTransactionLogResource(final String queueName)
+ {
+ return new TransactionLogResource()
+ {
+ @Override
+ public String getName()
+ {
+ return queueName;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return UUID.nameUUIDFromBytes(queueName.getBytes());
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
+ };
+ }
+
+ private EnqueueableMessage createEnqueueableMessage(final StoredMessage storedMessage)
+ {
+ return new EnqueueableMessage()
+ {
+ @Override
+ public long getMessageNumber()
+ {
+ return storedMessage.getMessageNumber();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public StoredMessage getStoredMessage()
+ {
+ return storedMessage;
+ }
+ };
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ReferenceCountingTest.class);
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index b8157b8f9e..f6d849bf79 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -261,12 +261,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
@Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
index 3944774dfa..2e7e1cf097 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -159,8 +159,6 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv
offset += bareMessageBuf.remaining();
}
- storedMessage.flushToStore();
-
Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
MessageReference<Message_1_0> reference = message.newReference();
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 3974198f62..bc2d3fe375 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -111,12 +111,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 69479b73d6..5412a09b4c 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -210,12 +210,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index b8073d149b..46b7c322e6 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -99,12 +99,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index d6abe94f30..639c16a71a 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -115,12 +115,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index b65181966c..2c40e536be 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -1010,9 +1011,9 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return false;
+ return MessageDurability.NEVER;
}
private class ConsumedMessageInstance implements MessageInstance
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
index 9a24e23407..c32c1d3bba 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
@@ -33,6 +33,17 @@
<td><input type="checkbox" name="durable" id="formAddQueue.durable" value="durable" checked="checked" dojoType="dijit.form.CheckBox" /></td>
</tr>
<tr>
+ <td valign="top"><strong>Persist Messages? </strong></td>
+ <td>
+ <select id="formAddQueue.messageDurability" name="messageDurability" data-dojo-type="dijit.form.FilteringSelect"
+ data-dojo-props="name: 'messageDurability', value: '', searchAttr: 'name', placeHolder: '', value: '', required: false ">
+ <option value="ALWAYS">Always</option>
+ <option value="DEFAULT">Default</option>
+ <option value="NEVER">Never</option>
+ </select>
+ </td>
+ </tr>
+ <tr>
<td valign="top"><strong>Queue Type: </strong></td>
<td>
<input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index 039437a0bf..6c0924d09b 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -277,6 +277,7 @@ define(["dojo/_base/xhr",
storeNodes(["name",
"state",
"durable",
+ "messageDurability",
"exclusive",
"owner",
"lifetimePolicy",
@@ -351,6 +352,7 @@ define(["dojo/_base/xhr",
this.exclusive.innerHTML = entities.encode(String(this.queueData[ "exclusive" ]));
this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ;
this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ]));
+ this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ]));
this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ;
this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 89b7327957..8f2bf3364d 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -33,6 +33,10 @@
<div class="durable" style="float:left;"></div>
</div>
<div style="clear:both">
+ <div class="formLabel-labelCell" style="float:left; width: 150px;">Persist Messages:</div>
+ <div class="messageDurability" style="float:left;"></div>
+ </div>
+ <div style="clear:both">
<div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div>
<div class="exclusive" style="float:left;"></div>
</div>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a37b532617..3316ae801b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -118,7 +118,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- private final boolean _delareQueues =
+ private final boolean _declareQueues =
Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true"));
private final boolean _declareExchanges =
@@ -2871,7 +2871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
declareExchange(amqd, nowait);
}
- if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
+ if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
{
declareQueue(amqd, consumer.isNoLocal(), nowait);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index cb8f81f68f..46473900c0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,6 +38,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Destination;
import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -57,29 +61,9 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.RangeSetFactory;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.*;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This is a 0.10 Session
@@ -362,19 +346,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
final AMQDestination destination, final boolean nowait)
throws AMQException
{
- if (destination.getDestSyntax() == DestSyntax.BURL)
+ if (destination == null || destination.getDestSyntax() == DestSyntax.BURL)
{
Map args = FieldTableSupport.convertToMap(arguments);
- for (AMQShortString rk: destination.getBindingKeys())
+ if(destination != null)
{
- _logger.debug("Binding queue : " + queueName.toString() +
- " exchange: " + exchangeName.toString() +
- " using binding key " + rk.asString());
- getQpidSession().exchangeBind(queueName.toString(),
- exchangeName.toString(),
- rk.toString(),
- args);
+ for (AMQShortString rk: destination.getBindingKeys())
+ {
+ doSendQueueBind(queueName, exchangeName, args, rk);
+ }
+ if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey))
+ {
+ doSendQueueBind(queueName, exchangeName, args, routingKey);
+ }
+ }
+ else
+ {
+ doSendQueueBind(queueName, exchangeName, args, routingKey);
}
}
else
@@ -420,6 +409,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ private void doSendQueueBind(final AMQShortString queueName,
+ final AMQShortString exchangeName,
+ final Map args,
+ final AMQShortString rk)
+ {
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + exchangeName.toString() +
+ " using binding key " + rk.asString());
+ getQpidSession().exchangeBind(queueName.toString(),
+ exchangeName.toString(),
+ rk.toString(),
+ args);
+ }
+
/**
* Close this session.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
new file mode 100644
index 0000000000..fe86e9d41f
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class QueueMessageDurabilityTest extends QpidBrokerTestCase
+{
+
+ private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+ private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST";
+ private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST";
+ private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST";
+ private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST";
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ Connection conn = getConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQSession amqSession = (AMQSession) session;
+
+ Map<String,Object> arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+ amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME),
+ AMQShortString.valueOf("Y.*.*.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME),
+ AMQShortString.valueOf("*.Y.*.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME),
+ AMQShortString.valueOf("*.*.Y.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME),
+ AMQShortString.valueOf("*.*.*.Y"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+ }
+
+ public void testSendPersistentMessageToAll() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ conn.start();
+ producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ session.commit();
+
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ restartBroker();
+
+ conn = getConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+ assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ }
+
+
+ public void testSendNonPersistentMessageToAll() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ session.commit();
+
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ restartBroker();
+
+ conn = getConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+ assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ }
+
+ public void testNonPersistentContentRetained() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2"));
+ session.commit();
+ MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+ Message msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test2", ((TextMessage) msg).getText());
+ session.rollback();
+ restartBroker();
+ conn = getConnection();
+ conn.start();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+ msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test2", ((TextMessage)msg).getText());
+ session.commit();
+ }
+
+ public void testPersistentContentRetainedOnTransientQueue() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ session.commit();
+ MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
+ Message msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test1", ((TextMessage)msg).getText());
+ session.commit();
+ System.gc();
+ consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
+ msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test1", ((TextMessage)msg).getText());
+ session.commit();
+ }
+
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 2c38a04895..8550c804a6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
-import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,6 +33,8 @@ import java.util.Map;
import javax.security.auth.Subject;
+import org.codehaus.jackson.map.ObjectMapper;
+
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -77,7 +78,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
-import org.codehaus.jackson.map.ObjectMapper;
/**
*
@@ -604,7 +604,6 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd);
- storedMessage.flushToStore();
final AMQMessage currentMessage = new AMQMessage(storedMessage);
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 9e9708bf3c..3d87da11c8 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -193,3 +193,7 @@ org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide
// Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths
org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange
+
+// Tests queue message durability settings which are a Java Broker specific feature
+org.apache.qpid.server.queue.QueueMessageDurabilityTest#*
+
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index a50a8bd599..ff4485d599 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -33,6 +33,8 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testRecover
+org.apache.qpid.server.queue.QueueMessageDurabilityTest#*
+
org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence
org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval
org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence