summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java43
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java5
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java12
-rw-r--r--qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java39
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java88
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java35
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java42
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java6
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java4
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java38
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java36
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java49
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java80
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java11
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java6
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java66
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java6
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java5
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html11
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js2
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java63
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java216
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java5
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes4
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes2
49 files changed, 672 insertions, 349 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index cf187fe1e9..be592a0d42 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long newMessageId = getNextMessageId();
- if (metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
- }
- else
- {
- return new StoredMemoryMessage<T>(newMessageId, metaData);
- }
+ return new StoredBDBMessage<T>(newMessageId, metaData);
}
public long getNextMessageId()
@@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
protected abstract Logger getLogger();
- private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
{
private final long _messageId;
@@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- @Override
- public synchronized StoreFuture flushToStore()
+ synchronized StoreFuture flushToStore()
{
if(!stored())
{
@@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
private Transaction _txn;
private int _storeSizeIncrease;
+ private final List<Runnable> _onCommitActions = new ArrayList<>();
private BDBTransaction() throws StoreException
{
@@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ _onCommitActions.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+ });
+
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void commitTran() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
}
+ private void doPreCommitActions()
+ {
+ for(Runnable action : _onCommitActions)
+ {
+ action.run();
+ }
+ _onCommitActions.clear();
+ }
+
@Override
public StoreFuture commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
}
@@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void abortTran() throws StoreException
{
checkMessageStoreOpen();
-
+ _onCommitActions.clear();
AbstractBDBMessageStore.this.abortTran(_txn);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index 8fb011152c..1f4cf45ce1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return true;
+ return MessageDurability.DEFAULT;
}
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index a96dc8b142..bace585e56 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -51,9 +54,6 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.util.FileUtils;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
storedMessage_0_8.addContent(0, firstContentBytes_0_8);
storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
/*
* Create and insert a 0-10 message (metadata and content)
@@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
long messageid_0_10 = storedMessage_0_10.getMessageNumber();
storedMessage_0_10.addContent(0, completeContentBody_0_10);
- storedMessage_0_10.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore();
/*
* reload the store only (read-only)
@@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
storedMessage_0_8.addContent(0, chunk1);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
return storedMessage_0_8;
}
diff --git a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
index 59fe72e377..c9c3afccba 100644
--- a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
+++ b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
@@ -24,7 +24,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-@Retention(RetentionPolicy.SOURCE)
+@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface PluggableService
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index fdc2fa90a5..4f2327adee 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -235,12 +235,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
@Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 21112b6309..db53f840de 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.store.MessageDurability;
@ManagedObject( defaultType = "standard" )
public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@@ -35,6 +36,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages";
String ALTERNATE_EXCHANGE = "alternateExchange";
String EXCLUSIVE = "exclusive";
+ String MESSAGE_DURABILITY = "messageDurability";
String MESSAGE_GROUP_KEY = "messageGroupKey";
String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
@@ -130,6 +132,10 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute( defaultValue = "${queue.alertRepeatGap}")
long getAlertRepeatGap();
+ @ManagedAttribute( defaultValue = "DEFAULT" )
+ MessageDurability getMessageDurability();
+
+
//children
Collection<? extends Binding> getBindings();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 479029093f..2aeca6f45f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -77,6 +77,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
@@ -175,6 +176,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@ManagedAttributeField
private ExclusivityPolicy _exclusive;
+ @ManagedAttributeField
+ private MessageDurability _messageDurability;
+
private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
private final Set<NotificationCheck> _notificationChecks =
@@ -245,12 +249,38 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
super.onCreate();
+ if(isDurable() && (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+ new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setAttribute(AbstractConfiguredObject.DURABLE, true, false);
+ return null;
+ }
+ });
+ }
- if (isDurable() && !(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+ if (isDurable())
{
_virtualHost.getDurableConfigurationStore().create(asObjectRecord());
}
+ else if(getMessageDurability() != MessageDurability.NEVER)
+ {
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+ new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setAttribute(Queue.MESSAGE_DURABILITY, getMessageDurability(), MessageDurability.NEVER);
+ return null;
+ }
+ });
+ }
_recovering.set(false);
}
@@ -510,6 +540,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
+ @Override
+ public final MessageDurability getMessageDurability()
+ {
+ return _messageDurability;
+ }
@Override
public Collection<String> getAvailableAttributes()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index 15df952e61..37e82b0771 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -52,6 +52,9 @@ public class QueueArgumentsConverter
public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
+
+ public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+
public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude";
public static final String QPID_TRACE_ID = "qpid.trace.id";
@@ -91,6 +94,7 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
+ ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
}
@@ -138,7 +142,12 @@ public class QueueArgumentsConverter
{
if(modelArguments.containsKey(entry.getValue()))
{
- wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue()));
+ Object value = modelArguments.get(entry.getValue());
+ if(value instanceof Enum)
+ {
+ value = ((Enum) value).name();
+ }
+ wireArguments.put(entry.getKey(), value);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 24866e4e2e..f2f85e1387 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -447,14 +447,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
checkMessageStoreOpen();
- if(metaData.isPersistent())
- {
- return new StoredJDBCMessage(getNextMessageId(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(getNextMessageId(), metaData);
- }
+ return new StoredJDBCMessage(getNextMessageId(), metaData);
+
}
@Override
@@ -970,9 +964,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return true;
+ return MessageDurability.DEFAULT;
}
}
@@ -1122,7 +1116,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
private final ConnectionWrapper _connWrapper;
private int _storeSizeIncrease;
-
+ private final List<Runnable> _onCommitActions = new ArrayList<>();
protected JDBCTransaction()
{
@@ -1144,16 +1138,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
- try
- {
- ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
- }
- catch (SQLException e)
+ _onCommitActions.add(new Runnable()
{
- throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
- }
+ @Override
+ public void run()
+ {
+ try
+ {
+ ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+ }
+ }
+ });
}
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1170,7 +1171,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
public void commitTran()
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
}
@@ -1179,17 +1180,26 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
public StoreFuture commitTranAsync()
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
return storeFuture;
}
+ private void doPreCommitActions()
+ {
+ for(Runnable action : _onCommitActions)
+ {
+ action.run();
+ }
+ _onCommitActions.clear();
+ }
+
@Override
public void abortTran()
{
checkMessageStoreOpen();
-
+ _onCommitActions.clear();
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@@ -1215,7 +1225,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private final long _messageId;
private final boolean _isRecovered;
-
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
@@ -1320,39 +1329,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public synchronized StoreFuture flushToStore()
- {
- checkMessageStoreOpen();
-
- Connection conn = null;
- try
- {
- if(!stored())
- {
- conn = newConnection();
-
- store(conn);
-
- conn.commit();
- storedSizeChange(getMetaData().getContentSize());
- }
- }
- catch (SQLException e)
- {
- if(getLogger().isDebugEnabled())
- {
- getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
- }
- throw new StoreException(e);
- }
- finally
- {
- JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger());
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
checkMessageStoreOpen();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index f4551aae05..f3b2cac52e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -64,6 +64,12 @@ public class MemoryMessageStore implements MessageStore
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+
+ if(message.getStoredMessage() instanceof StoredMemoryMessage)
+ {
+ _messages.putIfAbsent(message.getMessageNumber(), (StoredMemoryMessage) message.getStoredMessage());
+ }
+
Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
if (messageIds == null)
{
@@ -196,31 +202,20 @@ public class MemoryMessageStore implements MessageStore
{
long id = getNextMessageId();
- if(metaData.isPersistent())
+ StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData)
{
- return new StoredMemoryMessage<T>(id, metaData)
+
+ @Override
+ public void remove()
{
+ _messages.remove(getMessageNumber());
+ super.remove();
+ }
- @Override
- public StoreFuture flushToStore()
- {
- _messages.putIfAbsent(getMessageNumber(), this) ;
- return super.flushToStore();
- }
+ };
- @Override
- public void remove()
- {
- _messages.remove(getMessageNumber());
- super.remove();
- }
+ return storedMemoryMessage;
- };
- }
- else
- {
- return new StoredMemoryMessage<T>(id, metaData);
- }
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java
new file mode 100644
index 0000000000..4dd621bda0
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public enum MessageDurability
+{
+ DEFAULT(false,true),
+ ALWAYS(true,true),
+ NEVER(false,false);
+
+ private final boolean _nonPersistent;
+ private final boolean _persistent;
+
+ MessageDurability(final boolean nonPersistent, final boolean persistent)
+ {
+ _nonPersistent = nonPersistent;
+ _persistent = persistent;
+ }
+
+ public boolean persist(final boolean persistent)
+ {
+ return persistent ? _persistent : _nonPersistent;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index f4e1376980..e1043e8807 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -122,12 +122,6 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
return buf;
}
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
-
public T getMetaData()
{
return _metaData;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 7909003855..6beb74f4ae 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -34,7 +34,5 @@ public interface StoredMessage<M extends StorableMessageMetaData>
ByteBuffer getContent(int offsetInMessage, int size);
- StoreFuture flushToStore();
-
void remove();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
index 18b3125641..f5b1aa6ce7 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
@@ -24,7 +24,9 @@ import java.util.UUID;
public interface TransactionLogResource
{
+
String getName();
public UUID getId();
- boolean isDurable();
+ //boolean isDurable();
+ MessageDurability getMessageDurability();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 013e9f32ed..65064b015c 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.txn;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -31,9 +34,6 @@ import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
-import java.util.Collection;
-import java.util.List;
-
/**
* An implementation of ServerTransaction where each enqueue/dequeue
* operation takes place within it own transaction.
@@ -93,7 +93,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
try
{
StoreFuture future;
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -162,7 +162,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -205,7 +205,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
try
{
StoreFuture future;
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -237,28 +237,24 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
try
{
- if(message.isPersistent())
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if (queue.getMessageDurability().persist(message.isPersistent()))
{
- if (queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
- }
- if (txn == null)
- {
- txn = _messageStore.newTransaction();
- }
-
- txn.enqueueMessage(queue, message);
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+ txn.enqueueMessage(queue, message);
- }
}
-
}
+
StoreFuture future;
if (txn != null)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index ec3b6f69fb..2ecd847719 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.txn;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -30,9 +33,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
-import java.util.Collection;
-import java.util.List;
-
/**
* An implementation of ServerTransaction where each enqueue/dequeue
* operation takes place within it own transaction.
@@ -77,7 +77,7 @@ public class AutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -109,7 +109,7 @@ public class AutoCommitTransaction implements ServerTransaction
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -146,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -175,25 +175,21 @@ public class AutoCommitTransaction implements ServerTransaction
try
{
- if(message.isPersistent())
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if (queue.getMessageDurability().persist(message.isPersistent()))
{
- if (queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
- }
- if (txn == null)
- {
- txn = _messageStore.newTransaction();
- }
-
- txn.enqueueMessage(queue, message);
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+ txn.enqueueMessage(queue, message);
- }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
index 535ad77ea4..4195c72a28 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
@@ -381,7 +381,7 @@ public class DtxBranch
public boolean isDurable()
{
- return _message.isPersistent() && _resource.isDurable();
+ return _resource.getMessageDurability().persist(_message.isPersistent());
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index b3d013c99f..e371bcdb02 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.txn;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.TransactionLogResource;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -97,7 +97,7 @@ public class LocalTransaction implements ServerTransaction
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
try
{
@@ -129,7 +129,7 @@ public class LocalTransaction implements ServerTransaction
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -186,7 +186,7 @@ public class LocalTransaction implements ServerTransaction
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
try
{
@@ -211,29 +211,26 @@ public class LocalTransaction implements ServerTransaction
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent())
+ try
{
- try
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
- if(queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
- }
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
+ }
- beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message);
+ beginTranIfNecessary();
+ _transaction.enqueueMessage(queue, message);
- }
}
}
- catch(RuntimeException e)
- {
- tidyUpOnError(e);
- }
+ }
+ catch(RuntimeException e)
+ {
+ tidyUpOnError(e);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
index 9fc71e23d7..becf4a073c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -220,9 +221,9 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return false;
+ return MessageDurability.DEFAULT;
}
};
txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 6b6de5b66a..4ddf421901 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -34,8 +34,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
-import org.apache.qpid.server.model.VirtualHostNode;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
@@ -49,6 +47,7 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -56,6 +55,7 @@ import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 51b33276ec..be4505bc80 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.server.store;
-import static org.mockito.Mockito.mock;
-
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
@@ -148,12 +146,6 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
return _transactionResource;
}
- @Override
- public boolean isDurable()
- {
- return true;
- }
-
private static class TestMessage implements EnqueueableMessage
{
private final StoredMessage<?> _handle;
@@ -180,4 +172,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
return _handle;
}
}
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index d4b990da07..758799b81f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -28,13 +28,14 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import org.hamcrest.Description;
+import org.mockito.ArgumentMatcher;
+
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -45,9 +46,6 @@ import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.hamcrest.Description;
-import org.mockito.ArgumentMatcher;
-
public abstract class MessageStoreTestCase extends QpidTestCase
{
private MessageStore _store;
@@ -117,8 +115,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
MessageHandler handler = mock(MessageHandler.class);
_store.visitMessages(handler);
@@ -127,14 +124,60 @@ public abstract class MessageStoreTestCase extends QpidTestCase
}
+ public void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName)
+ {
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(new TransactionLogResource()
+ {
+ private final UUID _id = UUID.nameUUIDFromBytes(queueName.getBytes());
+
+ @Override
+ public String getName()
+ {
+ return queueName;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
+ }, new EnqueueableMessage()
+ {
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public StoredMessage getStoredMessage()
+ {
+ return message;
+ }
+ });
+ txn.commitTran();
+ }
+
public void testVisitMessagesAborted() throws Exception
{
int contentSize = 0;
for (int i = 0; i < 3; i++)
{
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
}
MessageHandler handler = mock(MessageHandler.class);
@@ -151,16 +194,16 @@ public abstract class MessageStoreTestCase extends QpidTestCase
for (int i = 0; i < 3; i++)
{
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
+
}
reopenStore();
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
+
assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4);
}
@@ -170,8 +213,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
@@ -305,8 +346,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
MessageHandler handler = mock(MessageHandler.class);
_store.visitMessages(handler);
@@ -319,8 +358,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
_store.visitMessages(new MessageHandler()
@@ -360,7 +398,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
TransactionLogResource queue = mock(TransactionLogResource.class);
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("testQueue");
- when(queue.isDurable()).thenReturn(true);
+ when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
return queue;
}
@@ -391,8 +429,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase
private EnqueueableMessage createEnqueueableMessage(long messageId1)
{
final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
- StoreFuture flushFuture = message1.flushToStore();
- flushFuture.waitForCompletion();
EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
return enqueueableMessage1;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
index 8285bdba4c..ec0908efba 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
@@ -53,6 +54,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
when(_messageStore.newTransaction()).thenReturn(_storeTransaction);
when(_storeTransaction.commitTranAsync()).thenReturn(_future);
when(_queue.isDurable()).thenReturn(true);
+ when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
}
public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
index 3e1183d203..5abbd7352b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
@@ -20,22 +20,23 @@
*/
package org.apache.qpid.server.txn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* A unit test ensuring that AutoCommitTransaction creates a separate transaction for
* each dequeue/enqueue operation that involves enlistable messages. Verifies
@@ -428,6 +429,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
{
BaseQueue queue = mock(BaseQueue.class);
when(queue.isDurable()).thenReturn(durable);
+ when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
return queue;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index 58c7401c60..c905e52715 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -20,22 +20,23 @@
*/
package org.apache.qpid.server.txn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* A unit test ensuring that LocalTransactionTest creates a long-lived store transaction
* that spans many dequeue/enqueue operations of enlistable messages. Verifies
@@ -652,6 +653,7 @@ public class LocalTransactionTest extends QpidTestCase
{
BaseQueue queue = mock(BaseQueue.class);
when(queue.isDurable()).thenReturn(durable);
+ when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
return queue;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
index 685fea207b..9c05ed564a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.NullMessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -379,6 +380,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
{
AMQQueue<?> queue = mock(AMQQueue.class);
final UUID queueId = UUID.randomUUID();
+ when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("test-queue");
when(_virtualHost.getQueue(queueId)).thenReturn(queue);
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index f9bf697a0d..dfdc4e230c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -102,12 +102,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 2a4aeb8b7e..ad99d14170 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -102,12 +102,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index af1bd00ae9..5adeba66b1 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -334,11 +334,7 @@ public class ServerSessionDelegate extends SessionDelegate
int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(enqueues != 0)
- {
- storeMessage.flushToStore();
- }
- else
+ if(enqueues == 0)
{
if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 200be71187..4b37823898 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -34,8 +34,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,7 +91,6 @@ import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -152,9 +149,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
- // Set of messages being acknowledged in the current transaction
- private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
-
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
@@ -422,7 +416,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
else
{
incrementOutstandingTxnsIfNecessary();
- handle.flushToStore();
}
}
}
@@ -1412,7 +1405,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
finally
{
- _acknowledgedMessages.clear();
+ _ackedMessages.clear();
}
}
@@ -1435,7 +1428,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
finally
{
- _acknowledgedMessages.clear();
+ _ackedMessages.clear();
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index 2ad6fc2ca6..b7d7e5b236 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -114,12 +114,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
index 50145e5c6d..c4de7a252b 100755
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
@@ -104,11 +104,6 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
return buf;
}
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
public void remove()
{
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index 0ff22f9d51..e0f0fc98a5 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -20,15 +20,21 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.MessageCounter;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -85,8 +91,9 @@ public class ReferenceCountingTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
- storedMessage.flushToStore();
-
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
+ txn.commitTran();
AMQMessage message = new AMQMessage(storedMessage);
MessageReference ref = message.newReference();
@@ -151,14 +158,13 @@ public class ReferenceCountingTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
- storedMessage.flushToStore();
-
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
+ txn.commitTran();
AMQMessage message = new AMQMessage(storedMessage);
MessageReference ref = message.newReference();
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertEquals(1, getStoreMessageCount());
MessageReference ref2 = message.newReference();
@@ -166,6 +172,54 @@ public class ReferenceCountingTest extends QpidTestCase
assertEquals(1, getStoreMessageCount());
}
+ private TransactionLogResource createTransactionLogResource(final String queueName)
+ {
+ return new TransactionLogResource()
+ {
+ @Override
+ public String getName()
+ {
+ return queueName;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return UUID.nameUUIDFromBytes(queueName.getBytes());
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
+ };
+ }
+
+ private EnqueueableMessage createEnqueueableMessage(final StoredMessage storedMessage)
+ {
+ return new EnqueueableMessage()
+ {
+ @Override
+ public long getMessageNumber()
+ {
+ return storedMessage.getMessageNumber();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public StoredMessage getStoredMessage()
+ {
+ return storedMessage;
+ }
+ };
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ReferenceCountingTest.class);
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index b8157b8f9e..f6d849bf79 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -261,12 +261,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
@Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
index 3944774dfa..2e7e1cf097 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -159,8 +159,6 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv
offset += bareMessageBuf.remaining();
}
- storedMessage.flushToStore();
-
Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
MessageReference<Message_1_0> reference = message.newReference();
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 3974198f62..bc2d3fe375 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -111,12 +111,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 69479b73d6..5412a09b4c 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -210,12 +210,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index b8073d149b..46b7c322e6 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -99,12 +99,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index d6abe94f30..639c16a71a 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -115,12 +115,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index b65181966c..2c40e536be 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -1010,9 +1011,9 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return false;
+ return MessageDurability.NEVER;
}
private class ConsumedMessageInstance implements MessageInstance
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
index 9a24e23407..c32c1d3bba 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
@@ -33,6 +33,17 @@
<td><input type="checkbox" name="durable" id="formAddQueue.durable" value="durable" checked="checked" dojoType="dijit.form.CheckBox" /></td>
</tr>
<tr>
+ <td valign="top"><strong>Persist Messages? </strong></td>
+ <td>
+ <select id="formAddQueue.messageDurability" name="messageDurability" data-dojo-type="dijit.form.FilteringSelect"
+ data-dojo-props="name: 'messageDurability', value: '', searchAttr: 'name', placeHolder: '', value: '', required: false ">
+ <option value="ALWAYS">Always</option>
+ <option value="DEFAULT">Default</option>
+ <option value="NEVER">Never</option>
+ </select>
+ </td>
+ </tr>
+ <tr>
<td valign="top"><strong>Queue Type: </strong></td>
<td>
<input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index 039437a0bf..6c0924d09b 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -277,6 +277,7 @@ define(["dojo/_base/xhr",
storeNodes(["name",
"state",
"durable",
+ "messageDurability",
"exclusive",
"owner",
"lifetimePolicy",
@@ -351,6 +352,7 @@ define(["dojo/_base/xhr",
this.exclusive.innerHTML = entities.encode(String(this.queueData[ "exclusive" ]));
this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ;
this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ]));
+ this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ]));
this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ;
this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 89b7327957..8f2bf3364d 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -33,6 +33,10 @@
<div class="durable" style="float:left;"></div>
</div>
<div style="clear:both">
+ <div class="formLabel-labelCell" style="float:left; width: 150px;">Persist Messages:</div>
+ <div class="messageDurability" style="float:left;"></div>
+ </div>
+ <div style="clear:both">
<div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div>
<div class="exclusive" style="float:left;"></div>
</div>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a37b532617..3316ae801b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -118,7 +118,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- private final boolean _delareQueues =
+ private final boolean _declareQueues =
Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true"));
private final boolean _declareExchanges =
@@ -2871,7 +2871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
declareExchange(amqd, nowait);
}
- if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
+ if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
{
declareQueue(amqd, consumer.isNoLocal(), nowait);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index cb8f81f68f..46473900c0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,6 +38,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Destination;
import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -57,29 +61,9 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.RangeSetFactory;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.*;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This is a 0.10 Session
@@ -362,19 +346,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
final AMQDestination destination, final boolean nowait)
throws AMQException
{
- if (destination.getDestSyntax() == DestSyntax.BURL)
+ if (destination == null || destination.getDestSyntax() == DestSyntax.BURL)
{
Map args = FieldTableSupport.convertToMap(arguments);
- for (AMQShortString rk: destination.getBindingKeys())
+ if(destination != null)
{
- _logger.debug("Binding queue : " + queueName.toString() +
- " exchange: " + exchangeName.toString() +
- " using binding key " + rk.asString());
- getQpidSession().exchangeBind(queueName.toString(),
- exchangeName.toString(),
- rk.toString(),
- args);
+ for (AMQShortString rk: destination.getBindingKeys())
+ {
+ doSendQueueBind(queueName, exchangeName, args, rk);
+ }
+ if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey))
+ {
+ doSendQueueBind(queueName, exchangeName, args, routingKey);
+ }
+ }
+ else
+ {
+ doSendQueueBind(queueName, exchangeName, args, routingKey);
}
}
else
@@ -420,6 +409,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ private void doSendQueueBind(final AMQShortString queueName,
+ final AMQShortString exchangeName,
+ final Map args,
+ final AMQShortString rk)
+ {
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + exchangeName.toString() +
+ " using binding key " + rk.asString());
+ getQpidSession().exchangeBind(queueName.toString(),
+ exchangeName.toString(),
+ rk.toString(),
+ args);
+ }
+
/**
* Close this session.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
new file mode 100644
index 0000000000..fe86e9d41f
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class QueueMessageDurabilityTest extends QpidBrokerTestCase
+{
+
+ private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+ private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST";
+ private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST";
+ private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST";
+ private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST";
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ Connection conn = getConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQSession amqSession = (AMQSession) session;
+
+ Map<String,Object> arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+ amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME),
+ AMQShortString.valueOf("Y.*.*.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME),
+ AMQShortString.valueOf("*.Y.*.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME),
+ AMQShortString.valueOf("*.*.Y.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME),
+ AMQShortString.valueOf("*.*.*.Y"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+ }
+
+ public void testSendPersistentMessageToAll() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ conn.start();
+ producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ session.commit();
+
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ restartBroker();
+
+ conn = getConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+ assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ }
+
+
+ public void testSendNonPersistentMessageToAll() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ session.commit();
+
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ restartBroker();
+
+ conn = getConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+ assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ }
+
+ public void testNonPersistentContentRetained() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2"));
+ session.commit();
+ MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+ Message msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test2", ((TextMessage) msg).getText());
+ session.rollback();
+ restartBroker();
+ conn = getConnection();
+ conn.start();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+ msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test2", ((TextMessage)msg).getText());
+ session.commit();
+ }
+
+ public void testPersistentContentRetainedOnTransientQueue() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ session.commit();
+ MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
+ Message msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test1", ((TextMessage)msg).getText());
+ session.commit();
+ System.gc();
+ consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
+ msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test1", ((TextMessage)msg).getText());
+ session.commit();
+ }
+
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 2c38a04895..8550c804a6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
-import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,6 +33,8 @@ import java.util.Map;
import javax.security.auth.Subject;
+import org.codehaus.jackson.map.ObjectMapper;
+
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -77,7 +78,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
-import org.codehaus.jackson.map.ObjectMapper;
/**
*
@@ -604,7 +604,6 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd);
- storedMessage.flushToStore();
final AMQMessage currentMessage = new AMQMessage(storedMessage);
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 9e9708bf3c..3d87da11c8 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -193,3 +193,7 @@ org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide
// Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths
org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange
+
+// Tests queue message durability settings which are a Java Broker specific feature
+org.apache.qpid.server.queue.QueueMessageDurabilityTest#*
+
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index a50a8bd599..ff4485d599 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -33,6 +33,8 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testRecover
+org.apache.qpid.server.queue.QueueMessageDurabilityTest#*
+
org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence
org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval
org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence