diff options
Diffstat (limited to 'qpid/java')
49 files changed, 672 insertions, 349 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index cf187fe1e9..be592a0d42 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; @@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore long newMessageId = getNextMessageId(); - if (metaData.isPersistent()) - { - return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData); - } - else - { - return new StoredMemoryMessage<T>(newMessageId, metaData); - } + return new StoredBDBMessage<T>(newMessageId, metaData); } public long getNextMessageId() @@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected abstract Logger getLogger(); - private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> { private final long _messageId; @@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - @Override - public synchronized StoreFuture flushToStore() + synchronized StoreFuture flushToStore() { if(!stored()) { @@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private Transaction _txn; private int _storeSizeIncrease; + private final List<Runnable> _onCommitActions = new ArrayList<>(); private BDBTransaction() throws StoreException { @@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + _onCommitActions.add(new Runnable() + { + @Override + public void run() + { + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + }); + } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); @@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void commitTran() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.commitTranImpl(_txn, true); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } + private void doPreCommitActions() + { + for(Runnable action : _onCommitActions) + { + action.run(); + } + _onCommitActions.clear(); + } + @Override public StoreFuture commitTranAsync() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); } @@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void abortTran() throws StoreException { checkMessageStoreOpen(); - + _onCommitActions.clear(); AbstractBDBMessageStore.this.abortTran(_txn); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 8fb011152c..1f4cf45ce1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return true; + return MessageDurability.DEFAULT; } } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index a96dc8b142..bace585e56 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; @@ -51,9 +54,6 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.util.FileUtils; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against * the BDB Store as well as additional tests specific to the BDB store-implementation. @@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase storedMessage_0_8.addContent(0, firstContentBytes_0_8); storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - storedMessage_0_8.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); /* * Create and insert a 0-10 message (metadata and content) @@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase long messageid_0_10 = storedMessage_0_10.getMessageNumber(); storedMessage_0_10.addContent(0, completeContentBody_0_10); - storedMessage_0_10.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore(); /* * reload the store only (read-only) @@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); return storedMessage_0_8; } diff --git a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java index 59fe72e377..c9c3afccba 100644 --- a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java +++ b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java @@ -24,7 +24,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Retention(RetentionPolicy.SOURCE) +@Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) public @interface PluggableService { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index fdc2fa90a5..4f2327adee 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -235,12 +235,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, } @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 21112b6309..db53f840de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.model; import java.util.Collection; import org.apache.qpid.server.queue.QueueEntryVisitor; +import org.apache.qpid.server.store.MessageDurability; @ManagedObject( defaultType = "standard" ) public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @@ -35,6 +36,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages"; String ALTERNATE_EXCHANGE = "alternateExchange"; String EXCLUSIVE = "exclusive"; + String MESSAGE_DURABILITY = "messageDurability"; String MESSAGE_GROUP_KEY = "messageGroupKey"; String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; @@ -130,6 +132,10 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute( defaultValue = "${queue.alertRepeatGap}") long getAlertRepeatGap(); + @ManagedAttribute( defaultValue = "DEFAULT" ) + MessageDurability getMessageDurability(); + + //children Collection<? extends Binding> getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 479029093f..2aeca6f45f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -77,6 +77,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; @@ -175,6 +176,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @ManagedAttributeField private ExclusivityPolicy _exclusive; + @ManagedAttributeField + private MessageDurability _messageDurability; + private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name private final Set<NotificationCheck> _notificationChecks = @@ -245,12 +249,38 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { super.onCreate(); + if(isDurable() && (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)) + { + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<Object>() + { + @Override + public Object run() + { + setAttribute(AbstractConfiguredObject.DURABLE, true, false); + return null; + } + }); + } - if (isDurable() && !(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)) + if (isDurable()) { _virtualHost.getDurableConfigurationStore().create(asObjectRecord()); } + else if(getMessageDurability() != MessageDurability.NEVER) + { + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<Object>() + { + @Override + public Object run() + { + setAttribute(Queue.MESSAGE_DURABILITY, getMessageDurability(), MessageDurability.NEVER); + return null; + } + }); + } _recovering.set(false); } @@ -510,6 +540,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } + @Override + public final MessageDurability getMessageDurability() + { + return _messageDurability; + } @Override public Collection<String> getAvailableAttributes() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 15df952e61..37e82b0771 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -52,6 +52,9 @@ public class QueueArgumentsConverter public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; + + public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability"; + public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude"; public static final String QPID_TRACE_ID = "qpid.trace.id"; @@ -91,6 +94,7 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP); ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); + ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY); } @@ -138,7 +142,12 @@ public class QueueArgumentsConverter { if(modelArguments.containsKey(entry.getValue())) { - wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue())); + Object value = modelArguments.get(entry.getValue()); + if(value instanceof Enum) + { + value = ((Enum) value).name(); + } + wireArguments.put(entry.getKey(), value); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 24866e4e2e..f2f85e1387 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -447,14 +447,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { checkMessageStoreOpen(); - if(metaData.isPersistent()) - { - return new StoredJDBCMessage(getNextMessageId(), metaData); - } - else - { - return new StoredMemoryMessage(getNextMessageId(), metaData); - } + return new StoredJDBCMessage(getNextMessageId(), metaData); + } @Override @@ -970,9 +964,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return true; + return MessageDurability.DEFAULT; } } @@ -1122,7 +1116,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { private final ConnectionWrapper _connWrapper; private int _storeSizeIncrease; - + private final List<Runnable> _onCommitActions = new ArrayList<>(); protected JDBCTransaction() { @@ -1144,16 +1138,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore final StoredMessage storedMessage = message.getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) { - try - { - ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); - } - catch (SQLException e) + _onCommitActions.add(new Runnable() { - throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); - } + @Override + public void run() + { + try + { + ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + catch (SQLException e) + { + throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); + } + } + }); } - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); } @@ -1170,7 +1171,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public void commitTran() { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractJDBCMessageStore.this.commitTran(_connWrapper); storedSizeChange(_storeSizeIncrease); } @@ -1179,17 +1180,26 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public StoreFuture commitTranAsync() { checkMessageStoreOpen(); - + doPreCommitActions(); StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); return storeFuture; } + private void doPreCommitActions() + { + for(Runnable action : _onCommitActions) + { + action.run(); + } + _onCommitActions.clear(); + } + @Override public void abortTran() { checkMessageStoreOpen(); - + _onCommitActions.clear(); AbstractJDBCMessageStore.this.abortTran(_connWrapper); } @@ -1215,7 +1225,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore private final long _messageId; private final boolean _isRecovered; - private StorableMessageMetaData _metaData; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; private byte[] _data; @@ -1320,39 +1329,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public synchronized StoreFuture flushToStore() - { - checkMessageStoreOpen(); - - Connection conn = null; - try - { - if(!stored()) - { - conn = newConnection(); - - store(conn); - - conn.commit(); - storedSizeChange(getMetaData().getContentSize()); - } - } - catch (SQLException e) - { - if(getLogger().isDebugEnabled()) - { - getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e); - } - throw new StoreException(e); - } - finally - { - JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger()); - } - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { checkMessageStoreOpen(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index f4551aae05..f3b2cac52e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -64,6 +64,12 @@ public class MemoryMessageStore implements MessageStore @Override public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { + + if(message.getStoredMessage() instanceof StoredMemoryMessage) + { + _messages.putIfAbsent(message.getMessageNumber(), (StoredMemoryMessage) message.getStoredMessage()); + } + Set<Long> messageIds = _localEnqueueMap.get(queue.getId()); if (messageIds == null) { @@ -196,31 +202,20 @@ public class MemoryMessageStore implements MessageStore { long id = getNextMessageId(); - if(metaData.isPersistent()) + StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData) { - return new StoredMemoryMessage<T>(id, metaData) + + @Override + public void remove() { + _messages.remove(getMessageNumber()); + super.remove(); + } - @Override - public StoreFuture flushToStore() - { - _messages.putIfAbsent(getMessageNumber(), this) ; - return super.flushToStore(); - } + }; - @Override - public void remove() - { - _messages.remove(getMessageNumber()); - super.remove(); - } + return storedMemoryMessage; - }; - } - else - { - return new StoredMemoryMessage<T>(id, metaData); - } } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java new file mode 100644 index 0000000000..4dd621bda0 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public enum MessageDurability +{ + DEFAULT(false,true), + ALWAYS(true,true), + NEVER(false,false); + + private final boolean _nonPersistent; + private final boolean _persistent; + + MessageDurability(final boolean nonPersistent, final boolean persistent) + { + _nonPersistent = nonPersistent; + _persistent = persistent; + } + + public boolean persist(final boolean persistent) + { + return persistent ? _persistent : _nonPersistent; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index f4e1376980..e1043e8807 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -122,12 +122,6 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S return buf; } - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - public T getMetaData() { return _metaData; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 7909003855..6beb74f4ae 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -34,7 +34,5 @@ public interface StoredMessage<M extends StorableMessageMetaData> ByteBuffer getContent(int offsetInMessage, int size); - StoreFuture flushToStore(); - void remove(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java index 18b3125641..f5b1aa6ce7 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java @@ -24,7 +24,9 @@ import java.util.UUID; public interface TransactionLogResource { + String getName(); public UUID getId(); - boolean isDurable(); + //boolean isDurable(); + MessageDurability getMessageDurability(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 013e9f32ed..65064b015c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.txn; +import java.util.Collection; +import java.util.List; + import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; @@ -31,9 +34,6 @@ import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; -import java.util.Collection; -import java.util.List; - /** * An implementation of ServerTransaction where each enqueue/dequeue * operation takes place within it own transaction. @@ -93,7 +93,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { StoreFuture future; - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -162,7 +162,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -205,7 +205,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { StoreFuture future; - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -237,28 +237,24 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { - if(message.isPersistent()) + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if (queue.getMessageDurability().persist(message.isPersistent())) { - if (queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); - } - if (txn == null) - { - txn = _messageStore.newTransaction(); - } - - txn.enqueueMessage(queue, message); + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); + } + if (txn == null) + { + txn = _messageStore.newTransaction(); + } + txn.enqueueMessage(queue, message); - } } - } + StoreFuture future; if (txn != null) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index ec3b6f69fb..2ecd847719 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.txn; +import java.util.Collection; +import java.util.List; + import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; @@ -30,9 +33,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; -import java.util.Collection; -import java.util.List; - /** * An implementation of ServerTransaction where each enqueue/dequeue * operation takes place within it own transaction. @@ -77,7 +77,7 @@ public class AutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -109,7 +109,7 @@ public class AutoCommitTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -146,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -175,25 +175,21 @@ public class AutoCommitTransaction implements ServerTransaction try { - if(message.isPersistent()) + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if (queue.getMessageDurability().persist(message.isPersistent())) { - if (queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); - } - if (txn == null) - { - txn = _messageStore.newTransaction(); - } - - txn.enqueueMessage(queue, message); + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); + } + if (txn == null) + { + txn = _messageStore.newTransaction(); + } + txn.enqueueMessage(queue, message); - } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index 535ad77ea4..4195c72a28 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -381,7 +381,7 @@ public class DtxBranch public boolean isDurable() { - return _message.isPersistent() && _resource.isDurable(); + return _resource.getMessageDurability().persist(_message.isPersistent()); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index b3d013c99f..e371bcdb02 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.txn; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.TransactionLogResource; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import org.apache.qpid.server.store.TransactionLogResource; /** * A concrete implementation of ServerTransaction where enqueue/dequeue @@ -97,7 +97,7 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { try { @@ -129,7 +129,7 @@ public class LocalTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -186,7 +186,7 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { try { @@ -211,29 +211,26 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent()) + try { - try + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if(queue.getMessageDurability().persist(message.isPersistent())) { - if(queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() ); - } + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() ); + } - beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message); + beginTranIfNecessary(); + _transaction.enqueueMessage(queue, message); - } } } - catch(RuntimeException e) - { - tidyUpOnError(e); - } + } + catch(RuntimeException e) + { + tidyUpOnError(e); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java index 9fc71e23d7..becf4a073c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; @@ -220,9 +221,9 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return false; + return MessageDurability.DEFAULT; } }; txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 6b6de5b66a..4ddf421901 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -34,8 +34,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.model.ConfiguredObjectFactory; -import org.apache.qpid.server.model.VirtualHostNode; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -49,6 +47,7 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -56,6 +55,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 51b33276ec..be4505bc80 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.server.store; -import static org.mockito.Mockito.mock; - import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; + import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; @@ -148,12 +146,6 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple return _transactionResource; } - @Override - public boolean isDurable() - { - return true; - } - private static class TestMessage implements EnqueueableMessage { private final StoredMessage<?> _handle; @@ -180,4 +172,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple return _handle; } } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index d4b990da07..758799b81f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -28,13 +28,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import org.hamcrest.Description; +import org.mockito.ArgumentMatcher; + import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.UUIDGenerator; @@ -45,9 +46,6 @@ import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.test.utils.QpidTestCase; -import org.hamcrest.Description; -import org.mockito.ArgumentMatcher; - public abstract class MessageStoreTestCase extends QpidTestCase { private MessageStore _store; @@ -117,8 +115,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); MessageHandler handler = mock(MessageHandler.class); _store.visitMessages(handler); @@ -127,14 +124,60 @@ public abstract class MessageStoreTestCase extends QpidTestCase } + public void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName) + { + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(new TransactionLogResource() + { + private final UUID _id = UUID.nameUUIDFromBytes(queueName.getBytes()); + + @Override + public String getName() + { + return queueName; + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } + }, new EnqueueableMessage() + { + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public StoredMessage getStoredMessage() + { + return message; + } + }); + txn.commitTran(); + } + public void testVisitMessagesAborted() throws Exception { int contentSize = 0; for (int i = 0; i < 3; i++) { final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); } MessageHandler handler = mock(MessageHandler.class); @@ -151,16 +194,16 @@ public abstract class MessageStoreTestCase extends QpidTestCase for (int i = 0; i < 3; i++) { final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); + } reopenStore(); final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); + assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4); } @@ -170,8 +213,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message); @@ -305,8 +346,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); MessageHandler handler = mock(MessageHandler.class); _store.visitMessages(handler); @@ -319,8 +358,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>(); _store.visitMessages(new MessageHandler() @@ -360,7 +398,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase TransactionLogResource queue = mock(TransactionLogResource.class); when(queue.getId()).thenReturn(queueId); when(queue.getName()).thenReturn("testQueue"); - when(queue.isDurable()).thenReturn(true); + when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); return queue; } @@ -391,8 +429,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase private EnqueueableMessage createEnqueueableMessage(long messageId1) { final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)); - StoreFuture flushFuture = message1.flushToStore(); - flushFuture.waitForCompletion(); EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1); return enqueueableMessage1; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java index 8285bdba4c..ec0908efba 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -25,6 +25,7 @@ import java.util.Collections; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; @@ -53,6 +54,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase when(_messageStore.newTransaction()).thenReturn(_storeTransaction); when(_storeTransaction.commitTranAsync()).thenReturn(_future); when(_queue.isDurable()).thenReturn(true); + when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); } public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 3e1183d203..5abbd7352b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.txn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * A unit test ensuring that AutoCommitTransaction creates a separate transaction for * each dequeue/enqueue operation that involves enlistable messages. Verifies @@ -428,6 +429,7 @@ public class AutoCommitTransactionTest extends QpidTestCase { BaseQueue queue = mock(BaseQueue.class); when(queue.isDurable()).thenReturn(durable); + when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER); return queue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index 58c7401c60..c905e52715 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.txn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * A unit test ensuring that LocalTransactionTest creates a long-lived store transaction * that spans many dequeue/enqueue operations of enlistable messages. Verifies @@ -652,6 +653,7 @@ public class LocalTransactionTest extends QpidTestCase { BaseQueue queue = mock(BaseQueue.class); when(queue.isDurable()).thenReturn(durable); + when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER); return queue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java index 685fea207b..9c05ed564a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.NullMessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -379,6 +380,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase { AMQQueue<?> queue = mock(AMQQueue.class); final UUID queueId = UUID.randomUUID(); + when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); when(queue.getId()).thenReturn(queueId); when(queue.getName()).thenReturn("test-queue"); when(_virtualHost.getQueue(queueId)).thenReturn(queue); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index f9bf697a0d..dfdc4e230c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -102,12 +102,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index 2a4aeb8b7e..ad99d14170 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -102,12 +102,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index af1bd00ae9..5adeba66b1 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -334,11 +334,7 @@ public class ServerSessionDelegate extends SessionDelegate int enqueues = serverSession.enqueue(message, instanceProperties, exchange); - if(enqueues != 0) - { - storeMessage.flushToStore(); - } - else + if(enqueues == 0) { if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 200be71187..4b37823898 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -34,8 +34,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -93,7 +91,6 @@ import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -152,9 +149,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); - // Set of messages being acknowledged in the current transaction - private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>(); - private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; @@ -422,7 +416,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> else { incrementOutstandingTxnsIfNecessary(); - handle.flushToStore(); } } } @@ -1412,7 +1405,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } finally { - _acknowledgedMessages.clear(); + _ackedMessages.clear(); } } @@ -1435,7 +1428,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } finally { - _acknowledgedMessages.clear(); + _ackedMessages.clear(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index 2ad6fc2ca6..b7d7e5b236 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -114,12 +114,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java index 50145e5c6d..c4de7a252b 100755 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java @@ -104,11 +104,6 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> return buf; } - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - public void remove() { } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 0ff22f9d51..e0f0fc98a5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -20,15 +20,21 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.MessageCounter; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -85,8 +91,9 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - storedMessage.flushToStore(); - + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); + txn.commitTran(); AMQMessage message = new AMQMessage(storedMessage); MessageReference ref = message.newReference(); @@ -151,14 +158,13 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - storedMessage.flushToStore(); - + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); + txn.commitTran(); AMQMessage message = new AMQMessage(storedMessage); MessageReference ref = message.newReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertEquals(1, getStoreMessageCount()); MessageReference ref2 = message.newReference(); @@ -166,6 +172,54 @@ public class ReferenceCountingTest extends QpidTestCase assertEquals(1, getStoreMessageCount()); } + private TransactionLogResource createTransactionLogResource(final String queueName) + { + return new TransactionLogResource() + { + @Override + public String getName() + { + return queueName; + } + + @Override + public UUID getId() + { + return UUID.nameUUIDFromBytes(queueName.getBytes()); + } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } + }; + } + + private EnqueueableMessage createEnqueueableMessage(final StoredMessage storedMessage) + { + return new EnqueueableMessage() + { + @Override + public long getMessageNumber() + { + return storedMessage.getMessageNumber(); + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public StoredMessage getStoredMessage() + { + return storedMessage; + } + }; + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ReferenceCountingTest.class); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index b8157b8f9e..f6d849bf79 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -261,12 +261,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 3944774dfa..2e7e1cf097 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -159,8 +159,6 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv offset += bareMessageBuf.remaining(); } - storedMessage.flushToStore(); - Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); MessageReference<Message_1_0> reference = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 3974198f62..bc2d3fe375 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -111,12 +111,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java index 69479b73d6..5412a09b4c 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -210,12 +210,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index b8073d149b..46b7c322e6 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -99,12 +99,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index d6abe94f30..639c16a71a 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -115,12 +115,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index b65181966c..2c40e536be 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -1010,9 +1011,9 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return false; + return MessageDurability.NEVER; } private class ConsumedMessageInstance implements MessageInstance diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html index 9a24e23407..c32c1d3bba 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html @@ -33,6 +33,17 @@ <td><input type="checkbox" name="durable" id="formAddQueue.durable" value="durable" checked="checked" dojoType="dijit.form.CheckBox" /></td> </tr> <tr> + <td valign="top"><strong>Persist Messages? </strong></td> + <td> + <select id="formAddQueue.messageDurability" name="messageDurability" data-dojo-type="dijit.form.FilteringSelect" + data-dojo-props="name: 'messageDurability', value: '', searchAttr: 'name', placeHolder: '', value: '', required: false "> + <option value="ALWAYS">Always</option> + <option value="DEFAULT">Default</option> + <option value="NEVER">Never</option> + </select> + </td> + </tr> + <tr> <td valign="top"><strong>Queue Type: </strong></td> <td> <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" /> diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 039437a0bf..6c0924d09b 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -277,6 +277,7 @@ define(["dojo/_base/xhr", storeNodes(["name", "state", "durable", + "messageDurability", "exclusive", "owner", "lifetimePolicy", @@ -351,6 +352,7 @@ define(["dojo/_base/xhr", this.exclusive.innerHTML = entities.encode(String(this.queueData[ "exclusive" ])); this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ; this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ])); + this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ])); this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ; this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"])); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 89b7327957..8f2bf3364d 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -33,6 +33,10 @@ <div class="durable" style="float:left;"></div> </div> <div style="clear:both"> + <div class="formLabel-labelCell" style="float:left; width: 150px;">Persist Messages:</div> + <div class="messageDurability" style="float:left;"></div> + </div> + <div style="clear:both"> <div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div> <div class="exclusive" style="float:left;"></div> </div> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a37b532617..3316ae801b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -118,7 +118,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - private final boolean _delareQueues = + private final boolean _declareQueues = Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true")); private final boolean _declareExchanges = @@ -2871,7 +2871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic declareExchange(amqd, nowait); } - if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) + if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) { declareQueue(amqd, consumer.isNoLocal(), nowait); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index cb8f81f68f..46473900c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -24,6 +24,7 @@ import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -37,6 +38,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; import javax.jms.JMSException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -57,29 +61,9 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ExchangeBoundResult; -import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.transport.ExecutionException; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.QueueQueryResult; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.RangeSetFactory; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.*; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is a 0.10 Session @@ -362,19 +346,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic final AMQDestination destination, final boolean nowait) throws AMQException { - if (destination.getDestSyntax() == DestSyntax.BURL) + if (destination == null || destination.getDestSyntax() == DestSyntax.BURL) { Map args = FieldTableSupport.convertToMap(arguments); - for (AMQShortString rk: destination.getBindingKeys()) + if(destination != null) { - _logger.debug("Binding queue : " + queueName.toString() + - " exchange: " + exchangeName.toString() + - " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), - exchangeName.toString(), - rk.toString(), - args); + for (AMQShortString rk: destination.getBindingKeys()) + { + doSendQueueBind(queueName, exchangeName, args, rk); + } + if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey)) + { + doSendQueueBind(queueName, exchangeName, args, routingKey); + } + } + else + { + doSendQueueBind(queueName, exchangeName, args, routingKey); } } else @@ -420,6 +409,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + private void doSendQueueBind(final AMQShortString queueName, + final AMQShortString exchangeName, + final Map args, + final AMQShortString rk) + { + _logger.debug("Binding queue : " + queueName.toString() + + " exchange: " + exchangeName.toString() + + " using binding key " + rk.asString()); + getQpidSession().exchangeBind(queueName.toString(), + exchangeName.toString(), + rk.toString(), + args); + } + /** * Close this session. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java new file mode 100644 index 0000000000..fe86e9d41f --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class QueueMessageDurabilityTest extends QpidBrokerTestCase +{ + + private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability"; + private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST"; + private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST"; + private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST"; + private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST"; + + @Override + public void setUp() throws Exception + { + super.setUp(); + Connection conn = getConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQSession amqSession = (AMQSession) session; + + Map<String,Object> arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name()); + amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name()); + amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name()); + amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name()); + amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME), + AMQShortString.valueOf("Y.*.*.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME), + AMQShortString.valueOf("*.Y.*.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME), + AMQShortString.valueOf("*.*.Y.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME), + AMQShortString.valueOf("*.*.*.Y"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + } + + public void testSendPersistentMessageToAll() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + conn.start(); + producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test")); + session.commit(); + + AMQSession amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + restartBroker(); + + conn = getConnection(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + + assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + } + + + public void testSendNonPersistentMessageToAll() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + conn.start(); + producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test")); + session.commit(); + + AMQSession amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + restartBroker(); + + conn = getConnection(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + + assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + } + + public void testNonPersistentContentRetained() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + conn.start(); + producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1")); + producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2")); + session.commit(); + MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)); + Message msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test2", ((TextMessage) msg).getText()); + session.rollback(); + restartBroker(); + conn = getConnection(); + conn.start(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + AMQSession amqSession = (AMQSession) session; + assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)); + msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test2", ((TextMessage)msg).getText()); + session.commit(); + } + + public void testPersistentContentRetainedOnTransientQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false"); + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + conn.start(); + producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1")); + session.commit(); + MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)); + Message msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test1", ((TextMessage)msg).getText()); + session.commit(); + System.gc(); + consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)); + msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test1", ((TextMessage)msg).getText()); + session.commit(); + } + + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index 2c38a04895..8550c804a6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; -import java.io.IOException; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; @@ -34,6 +33,8 @@ import java.util.Map; import javax.security.auth.Subject; +import org.codehaus.jackson.map.ObjectMapper; + import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -77,7 +78,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; -import org.codehaus.jackson.map.ObjectMapper; /** * @@ -604,7 +604,6 @@ public class VirtualHostMessageStoreTest extends QpidTestCase MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd); - storedMessage.flushToStore(); final AMQMessage currentMessage = new AMQMessage(storedMessage); diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 9e9708bf3c..3d87da11c8 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -193,3 +193,7 @@ org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide // Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange + +// Tests queue message durability settings which are a Java Broker specific feature +org.apache.qpid.server.queue.QueueMessageDurabilityTest#* + diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index a50a8bd599..ff4485d599 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -33,6 +33,8 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testRecover +org.apache.qpid.server.queue.QueueMessageDurabilityTest#* + org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence |
