diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-25 14:24:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-25 14:24:36 +0000 |
| commit | 42bfb186da9e911c208f22dd5f6c794b9bddd859 (patch) | |
| tree | 5c0d345cd36b6ef2d17f0abf39f247fc7fdc21c9 /qpid/java | |
| parent | 9a08d3ffc0a21d501a33a2b318fca72f85d0c096 (diff) | |
| download | qpid-python-42bfb186da9e911c208f22dd5f6c794b9bddd859.tar.gz | |
QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613440 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
49 files changed, 672 insertions, 349 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index cf187fe1e9..be592a0d42 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; @@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore long newMessageId = getNextMessageId(); - if (metaData.isPersistent()) - { - return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData); - } - else - { - return new StoredMemoryMessage<T>(newMessageId, metaData); - } + return new StoredBDBMessage<T>(newMessageId, metaData); } public long getNextMessageId() @@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected abstract Logger getLogger(); - private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> { private final long _messageId; @@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - @Override - public synchronized StoreFuture flushToStore() + synchronized StoreFuture flushToStore() { if(!stored()) { @@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private Transaction _txn; private int _storeSizeIncrease; + private final List<Runnable> _onCommitActions = new ArrayList<>(); private BDBTransaction() throws StoreException { @@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + _onCommitActions.add(new Runnable() + { + @Override + public void run() + { + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + }); + } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); @@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void commitTran() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.commitTranImpl(_txn, true); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } + private void doPreCommitActions() + { + for(Runnable action : _onCommitActions) + { + action.run(); + } + _onCommitActions.clear(); + } + @Override public StoreFuture commitTranAsync() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); } @@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void abortTran() throws StoreException { checkMessageStoreOpen(); - + _onCommitActions.clear(); AbstractBDBMessageStore.this.abortTran(_txn); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 8fb011152c..1f4cf45ce1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return true; + return MessageDurability.DEFAULT; } } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index a96dc8b142..bace585e56 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; @@ -51,9 +54,6 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.util.FileUtils; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against * the BDB Store as well as additional tests specific to the BDB store-implementation. @@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase storedMessage_0_8.addContent(0, firstContentBytes_0_8); storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - storedMessage_0_8.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); /* * Create and insert a 0-10 message (metadata and content) @@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase long messageid_0_10 = storedMessage_0_10.getMessageNumber(); storedMessage_0_10.addContent(0, completeContentBody_0_10); - storedMessage_0_10.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore(); /* * reload the store only (read-only) @@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); return storedMessage_0_8; } diff --git a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java index 59fe72e377..c9c3afccba 100644 --- a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java +++ b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java @@ -24,7 +24,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Retention(RetentionPolicy.SOURCE) +@Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) public @interface PluggableService { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index fdc2fa90a5..4f2327adee 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -235,12 +235,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, } @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 21112b6309..db53f840de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.model; import java.util.Collection; import org.apache.qpid.server.queue.QueueEntryVisitor; +import org.apache.qpid.server.store.MessageDurability; @ManagedObject( defaultType = "standard" ) public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @@ -35,6 +36,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages"; String ALTERNATE_EXCHANGE = "alternateExchange"; String EXCLUSIVE = "exclusive"; + String MESSAGE_DURABILITY = "messageDurability"; String MESSAGE_GROUP_KEY = "messageGroupKey"; String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; @@ -130,6 +132,10 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute( defaultValue = "${queue.alertRepeatGap}") long getAlertRepeatGap(); + @ManagedAttribute( defaultValue = "DEFAULT" ) + MessageDurability getMessageDurability(); + + //children Collection<? extends Binding> getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 479029093f..2aeca6f45f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -77,6 +77,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; @@ -175,6 +176,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @ManagedAttributeField private ExclusivityPolicy _exclusive; + @ManagedAttributeField + private MessageDurability _messageDurability; + private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name private final Set<NotificationCheck> _notificationChecks = @@ -245,12 +249,38 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { super.onCreate(); + if(isDurable() && (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)) + { + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<Object>() + { + @Override + public Object run() + { + setAttribute(AbstractConfiguredObject.DURABLE, true, false); + return null; + } + }); + } - if (isDurable() && !(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)) + if (isDurable()) { _virtualHost.getDurableConfigurationStore().create(asObjectRecord()); } + else if(getMessageDurability() != MessageDurability.NEVER) + { + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<Object>() + { + @Override + public Object run() + { + setAttribute(Queue.MESSAGE_DURABILITY, getMessageDurability(), MessageDurability.NEVER); + return null; + } + }); + } _recovering.set(false); } @@ -510,6 +540,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } + @Override + public final MessageDurability getMessageDurability() + { + return _messageDurability; + } @Override public Collection<String> getAvailableAttributes() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 15df952e61..37e82b0771 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -52,6 +52,9 @@ public class QueueArgumentsConverter public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; + + public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability"; + public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude"; public static final String QPID_TRACE_ID = "qpid.trace.id"; @@ -91,6 +94,7 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP); ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); + ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY); } @@ -138,7 +142,12 @@ public class QueueArgumentsConverter { if(modelArguments.containsKey(entry.getValue())) { - wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue())); + Object value = modelArguments.get(entry.getValue()); + if(value instanceof Enum) + { + value = ((Enum) value).name(); + } + wireArguments.put(entry.getKey(), value); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 24866e4e2e..f2f85e1387 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -447,14 +447,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { checkMessageStoreOpen(); - if(metaData.isPersistent()) - { - return new StoredJDBCMessage(getNextMessageId(), metaData); - } - else - { - return new StoredMemoryMessage(getNextMessageId(), metaData); - } + return new StoredJDBCMessage(getNextMessageId(), metaData); + } @Override @@ -970,9 +964,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return true; + return MessageDurability.DEFAULT; } } @@ -1122,7 +1116,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { private final ConnectionWrapper _connWrapper; private int _storeSizeIncrease; - + private final List<Runnable> _onCommitActions = new ArrayList<>(); protected JDBCTransaction() { @@ -1144,16 +1138,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore final StoredMessage storedMessage = message.getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) { - try - { - ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); - } - catch (SQLException e) + _onCommitActions.add(new Runnable() { - throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); - } + @Override + public void run() + { + try + { + ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + catch (SQLException e) + { + throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); + } + } + }); } - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); } @@ -1170,7 +1171,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public void commitTran() { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractJDBCMessageStore.this.commitTran(_connWrapper); storedSizeChange(_storeSizeIncrease); } @@ -1179,17 +1180,26 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public StoreFuture commitTranAsync() { checkMessageStoreOpen(); - + doPreCommitActions(); StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); return storeFuture; } + private void doPreCommitActions() + { + for(Runnable action : _onCommitActions) + { + action.run(); + } + _onCommitActions.clear(); + } + @Override public void abortTran() { checkMessageStoreOpen(); - + _onCommitActions.clear(); AbstractJDBCMessageStore.this.abortTran(_connWrapper); } @@ -1215,7 +1225,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore private final long _messageId; private final boolean _isRecovered; - private StorableMessageMetaData _metaData; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; private byte[] _data; @@ -1320,39 +1329,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public synchronized StoreFuture flushToStore() - { - checkMessageStoreOpen(); - - Connection conn = null; - try - { - if(!stored()) - { - conn = newConnection(); - - store(conn); - - conn.commit(); - storedSizeChange(getMetaData().getContentSize()); - } - } - catch (SQLException e) - { - if(getLogger().isDebugEnabled()) - { - getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e); - } - throw new StoreException(e); - } - finally - { - JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger()); - } - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { checkMessageStoreOpen(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index f4551aae05..f3b2cac52e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -64,6 +64,12 @@ public class MemoryMessageStore implements MessageStore @Override public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { + + if(message.getStoredMessage() instanceof StoredMemoryMessage) + { + _messages.putIfAbsent(message.getMessageNumber(), (StoredMemoryMessage) message.getStoredMessage()); + } + Set<Long> messageIds = _localEnqueueMap.get(queue.getId()); if (messageIds == null) { @@ -196,31 +202,20 @@ public class MemoryMessageStore implements MessageStore { long id = getNextMessageId(); - if(metaData.isPersistent()) + StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData) { - return new StoredMemoryMessage<T>(id, metaData) + + @Override + public void remove() { + _messages.remove(getMessageNumber()); + super.remove(); + } - @Override - public StoreFuture flushToStore() - { - _messages.putIfAbsent(getMessageNumber(), this) ; - return super.flushToStore(); - } + }; - @Override - public void remove() - { - _messages.remove(getMessageNumber()); - super.remove(); - } + return storedMemoryMessage; - }; - } - else - { - return new StoredMemoryMessage<T>(id, metaData); - } } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java new file mode 100644 index 0000000000..4dd621bda0 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public enum MessageDurability +{ + DEFAULT(false,true), + ALWAYS(true,true), + NEVER(false,false); + + private final boolean _nonPersistent; + private final boolean _persistent; + + MessageDurability(final boolean nonPersistent, final boolean persistent) + { + _nonPersistent = nonPersistent; + _persistent = persistent; + } + + public boolean persist(final boolean persistent) + { + return persistent ? _persistent : _nonPersistent; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index f4e1376980..e1043e8807 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -122,12 +122,6 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S return buf; } - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - public T getMetaData() { return _metaData; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 7909003855..6beb74f4ae 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -34,7 +34,5 @@ public interface StoredMessage<M extends StorableMessageMetaData> ByteBuffer getContent(int offsetInMessage, int size); - StoreFuture flushToStore(); - void remove(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java index 18b3125641..f5b1aa6ce7 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java @@ -24,7 +24,9 @@ import java.util.UUID; public interface TransactionLogResource { + String getName(); public UUID getId(); - boolean isDurable(); + //boolean isDurable(); + MessageDurability getMessageDurability(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 013e9f32ed..65064b015c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.txn; +import java.util.Collection; +import java.util.List; + import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; @@ -31,9 +34,6 @@ import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; -import java.util.Collection; -import java.util.List; - /** * An implementation of ServerTransaction where each enqueue/dequeue * operation takes place within it own transaction. @@ -93,7 +93,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { StoreFuture future; - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -162,7 +162,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -205,7 +205,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { StoreFuture future; - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -237,28 +237,24 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { - if(message.isPersistent()) + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if (queue.getMessageDurability().persist(message.isPersistent())) { - if (queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); - } - if (txn == null) - { - txn = _messageStore.newTransaction(); - } - - txn.enqueueMessage(queue, message); + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); + } + if (txn == null) + { + txn = _messageStore.newTransaction(); + } + txn.enqueueMessage(queue, message); - } } - } + StoreFuture future; if (txn != null) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index ec3b6f69fb..2ecd847719 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.txn; +import java.util.Collection; +import java.util.List; + import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; @@ -30,9 +33,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; -import java.util.Collection; -import java.util.List; - /** * An implementation of ServerTransaction where each enqueue/dequeue * operation takes place within it own transaction. @@ -77,7 +77,7 @@ public class AutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -109,7 +109,7 @@ public class AutoCommitTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -146,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -175,25 +175,21 @@ public class AutoCommitTransaction implements ServerTransaction try { - if(message.isPersistent()) + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if (queue.getMessageDurability().persist(message.isPersistent())) { - if (queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); - } - if (txn == null) - { - txn = _messageStore.newTransaction(); - } - - txn.enqueueMessage(queue, message); + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); + } + if (txn == null) + { + txn = _messageStore.newTransaction(); + } + txn.enqueueMessage(queue, message); - } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index 535ad77ea4..4195c72a28 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -381,7 +381,7 @@ public class DtxBranch public boolean isDurable() { - return _message.isPersistent() && _resource.isDurable(); + return _resource.getMessageDurability().persist(_message.isPersistent()); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index b3d013c99f..e371bcdb02 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.txn; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.TransactionLogResource; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import org.apache.qpid.server.store.TransactionLogResource; /** * A concrete implementation of ServerTransaction where enqueue/dequeue @@ -97,7 +97,7 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { try { @@ -129,7 +129,7 @@ public class LocalTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -186,7 +186,7 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { try { @@ -211,29 +211,26 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent()) + try { - try + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if(queue.getMessageDurability().persist(message.isPersistent())) { - if(queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() ); - } + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() ); + } - beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message); + beginTranIfNecessary(); + _transaction.enqueueMessage(queue, message); - } } } - catch(RuntimeException e) - { - tidyUpOnError(e); - } + } + catch(RuntimeException e) + { + tidyUpOnError(e); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java index 9fc71e23d7..becf4a073c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; @@ -220,9 +221,9 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return false; + return MessageDurability.DEFAULT; } }; txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 6b6de5b66a..4ddf421901 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -34,8 +34,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.model.ConfiguredObjectFactory; -import org.apache.qpid.server.model.VirtualHostNode; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -49,6 +47,7 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -56,6 +55,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 51b33276ec..be4505bc80 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.server.store; -import static org.mockito.Mockito.mock; - import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; + import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; @@ -148,12 +146,6 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple return _transactionResource; } - @Override - public boolean isDurable() - { - return true; - } - private static class TestMessage implements EnqueueableMessage { private final StoredMessage<?> _handle; @@ -180,4 +172,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple return _handle; } } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index d4b990da07..758799b81f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -28,13 +28,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import org.hamcrest.Description; +import org.mockito.ArgumentMatcher; + import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.UUIDGenerator; @@ -45,9 +46,6 @@ import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.test.utils.QpidTestCase; -import org.hamcrest.Description; -import org.mockito.ArgumentMatcher; - public abstract class MessageStoreTestCase extends QpidTestCase { private MessageStore _store; @@ -117,8 +115,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); MessageHandler handler = mock(MessageHandler.class); _store.visitMessages(handler); @@ -127,14 +124,60 @@ public abstract class MessageStoreTestCase extends QpidTestCase } + public void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName) + { + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(new TransactionLogResource() + { + private final UUID _id = UUID.nameUUIDFromBytes(queueName.getBytes()); + + @Override + public String getName() + { + return queueName; + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } + }, new EnqueueableMessage() + { + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public StoredMessage getStoredMessage() + { + return message; + } + }); + txn.commitTran(); + } + public void testVisitMessagesAborted() throws Exception { int contentSize = 0; for (int i = 0; i < 3; i++) { final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); } MessageHandler handler = mock(MessageHandler.class); @@ -151,16 +194,16 @@ public abstract class MessageStoreTestCase extends QpidTestCase for (int i = 0; i < 3; i++) { final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); + } reopenStore(); final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); + assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4); } @@ -170,8 +213,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message); @@ -305,8 +346,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); MessageHandler handler = mock(MessageHandler.class); _store.visitMessages(handler); @@ -319,8 +358,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>(); _store.visitMessages(new MessageHandler() @@ -360,7 +398,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase TransactionLogResource queue = mock(TransactionLogResource.class); when(queue.getId()).thenReturn(queueId); when(queue.getName()).thenReturn("testQueue"); - when(queue.isDurable()).thenReturn(true); + when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); return queue; } @@ -391,8 +429,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase private EnqueueableMessage createEnqueueableMessage(long messageId1) { final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)); - StoreFuture flushFuture = message1.flushToStore(); - flushFuture.waitForCompletion(); EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1); return enqueueableMessage1; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java index 8285bdba4c..ec0908efba 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -25,6 +25,7 @@ import java.util.Collections; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; @@ -53,6 +54,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase when(_messageStore.newTransaction()).thenReturn(_storeTransaction); when(_storeTransaction.commitTranAsync()).thenReturn(_future); when(_queue.isDurable()).thenReturn(true); + when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); } public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 3e1183d203..5abbd7352b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.txn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * A unit test ensuring that AutoCommitTransaction creates a separate transaction for * each dequeue/enqueue operation that involves enlistable messages. Verifies @@ -428,6 +429,7 @@ public class AutoCommitTransactionTest extends QpidTestCase { BaseQueue queue = mock(BaseQueue.class); when(queue.isDurable()).thenReturn(durable); + when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER); return queue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index 58c7401c60..c905e52715 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.txn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * A unit test ensuring that LocalTransactionTest creates a long-lived store transaction * that spans many dequeue/enqueue operations of enlistable messages. Verifies @@ -652,6 +653,7 @@ public class LocalTransactionTest extends QpidTestCase { BaseQueue queue = mock(BaseQueue.class); when(queue.isDurable()).thenReturn(durable); + when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER); return queue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java index 685fea207b..9c05ed564a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.NullMessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -379,6 +380,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase { AMQQueue<?> queue = mock(AMQQueue.class); final UUID queueId = UUID.randomUUID(); + when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); when(queue.getId()).thenReturn(queueId); when(queue.getName()).thenReturn("test-queue"); when(_virtualHost.getQueue(queueId)).thenReturn(queue); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index f9bf697a0d..dfdc4e230c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -102,12 +102,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index 2a4aeb8b7e..ad99d14170 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -102,12 +102,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index af1bd00ae9..5adeba66b1 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -334,11 +334,7 @@ public class ServerSessionDelegate extends SessionDelegate int enqueues = serverSession.enqueue(message, instanceProperties, exchange); - if(enqueues != 0) - { - storeMessage.flushToStore(); - } - else + if(enqueues == 0) { if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 200be71187..4b37823898 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -34,8 +34,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -93,7 +91,6 @@ import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -152,9 +149,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); - // Set of messages being acknowledged in the current transaction - private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>(); - private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; @@ -422,7 +416,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> else { incrementOutstandingTxnsIfNecessary(); - handle.flushToStore(); } } } @@ -1412,7 +1405,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } finally { - _acknowledgedMessages.clear(); + _ackedMessages.clear(); } } @@ -1435,7 +1428,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } finally { - _acknowledgedMessages.clear(); + _ackedMessages.clear(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index 2ad6fc2ca6..b7d7e5b236 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -114,12 +114,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java index 50145e5c6d..c4de7a252b 100755 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java @@ -104,11 +104,6 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> return buf; } - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - public void remove() { } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 0ff22f9d51..e0f0fc98a5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -20,15 +20,21 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.MessageCounter; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -85,8 +91,9 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - storedMessage.flushToStore(); - + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); + txn.commitTran(); AMQMessage message = new AMQMessage(storedMessage); MessageReference ref = message.newReference(); @@ -151,14 +158,13 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - storedMessage.flushToStore(); - + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); + txn.commitTran(); AMQMessage message = new AMQMessage(storedMessage); MessageReference ref = message.newReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertEquals(1, getStoreMessageCount()); MessageReference ref2 = message.newReference(); @@ -166,6 +172,54 @@ public class ReferenceCountingTest extends QpidTestCase assertEquals(1, getStoreMessageCount()); } + private TransactionLogResource createTransactionLogResource(final String queueName) + { + return new TransactionLogResource() + { + @Override + public String getName() + { + return queueName; + } + + @Override + public UUID getId() + { + return UUID.nameUUIDFromBytes(queueName.getBytes()); + } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } + }; + } + + private EnqueueableMessage createEnqueueableMessage(final StoredMessage storedMessage) + { + return new EnqueueableMessage() + { + @Override + public long getMessageNumber() + { + return storedMessage.getMessageNumber(); + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public StoredMessage getStoredMessage() + { + return storedMessage; + } + }; + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ReferenceCountingTest.class); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index b8157b8f9e..f6d849bf79 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -261,12 +261,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 3944774dfa..2e7e1cf097 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -159,8 +159,6 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv offset += bareMessageBuf.remaining(); } - storedMessage.flushToStore(); - Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); MessageReference<Message_1_0> reference = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 3974198f62..bc2d3fe375 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -111,12 +111,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java index 69479b73d6..5412a09b4c 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -210,12 +210,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index b8073d149b..46b7c322e6 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -99,12 +99,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index d6abe94f30..639c16a71a 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -115,12 +115,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index b65181966c..2c40e536be 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -1010,9 +1011,9 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return false; + return MessageDurability.NEVER; } private class ConsumedMessageInstance implements MessageInstance diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html index 9a24e23407..c32c1d3bba 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html @@ -33,6 +33,17 @@ <td><input type="checkbox" name="durable" id="formAddQueue.durable" value="durable" checked="checked" dojoType="dijit.form.CheckBox" /></td> </tr> <tr> + <td valign="top"><strong>Persist Messages? </strong></td> + <td> + <select id="formAddQueue.messageDurability" name="messageDurability" data-dojo-type="dijit.form.FilteringSelect" + data-dojo-props="name: 'messageDurability', value: '', searchAttr: 'name', placeHolder: '', value: '', required: false "> + <option value="ALWAYS">Always</option> + <option value="DEFAULT">Default</option> + <option value="NEVER">Never</option> + </select> + </td> + </tr> + <tr> <td valign="top"><strong>Queue Type: </strong></td> <td> <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" /> diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 039437a0bf..6c0924d09b 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -277,6 +277,7 @@ define(["dojo/_base/xhr", storeNodes(["name", "state", "durable", + "messageDurability", "exclusive", "owner", "lifetimePolicy", @@ -351,6 +352,7 @@ define(["dojo/_base/xhr", this.exclusive.innerHTML = entities.encode(String(this.queueData[ "exclusive" ])); this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ; this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ])); + this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ])); this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ; this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"])); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 89b7327957..8f2bf3364d 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -33,6 +33,10 @@ <div class="durable" style="float:left;"></div> </div> <div style="clear:both"> + <div class="formLabel-labelCell" style="float:left; width: 150px;">Persist Messages:</div> + <div class="messageDurability" style="float:left;"></div> + </div> + <div style="clear:both"> <div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div> <div class="exclusive" style="float:left;"></div> </div> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a37b532617..3316ae801b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -118,7 +118,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - private final boolean _delareQueues = + private final boolean _declareQueues = Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true")); private final boolean _declareExchanges = @@ -2871,7 +2871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic declareExchange(amqd, nowait); } - if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) + if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) { declareQueue(amqd, consumer.isNoLocal(), nowait); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index cb8f81f68f..46473900c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -24,6 +24,7 @@ import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -37,6 +38,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; import javax.jms.JMSException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -57,29 +61,9 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ExchangeBoundResult; -import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.transport.ExecutionException; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.QueueQueryResult; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.RangeSetFactory; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.*; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is a 0.10 Session @@ -362,19 +346,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic final AMQDestination destination, final boolean nowait) throws AMQException { - if (destination.getDestSyntax() == DestSyntax.BURL) + if (destination == null || destination.getDestSyntax() == DestSyntax.BURL) { Map args = FieldTableSupport.convertToMap(arguments); - for (AMQShortString rk: destination.getBindingKeys()) + if(destination != null) { - _logger.debug("Binding queue : " + queueName.toString() + - " exchange: " + exchangeName.toString() + - " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), - exchangeName.toString(), - rk.toString(), - args); + for (AMQShortString rk: destination.getBindingKeys()) + { + doSendQueueBind(queueName, exchangeName, args, rk); + } + if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey)) + { + doSendQueueBind(queueName, exchangeName, args, routingKey); + } + } + else + { + doSendQueueBind(queueName, exchangeName, args, routingKey); } } else @@ -420,6 +409,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + private void doSendQueueBind(final AMQShortString queueName, + final AMQShortString exchangeName, + final Map args, + final AMQShortString rk) + { + _logger.debug("Binding queue : " + queueName.toString() + + " exchange: " + exchangeName.toString() + + " using binding key " + rk.asString()); + getQpidSession().exchangeBind(queueName.toString(), + exchangeName.toString(), + rk.toString(), + args); + } + /** * Close this session. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java new file mode 100644 index 0000000000..fe86e9d41f --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class QueueMessageDurabilityTest extends QpidBrokerTestCase +{ + + private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability"; + private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST"; + private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST"; + private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST"; + private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST"; + + @Override + public void setUp() throws Exception + { + super.setUp(); + Connection conn = getConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQSession amqSession = (AMQSession) session; + + Map<String,Object> arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name()); + amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name()); + amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name()); + amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name()); + amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME), + AMQShortString.valueOf("Y.*.*.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME), + AMQShortString.valueOf("*.Y.*.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME), + AMQShortString.valueOf("*.*.Y.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME), + AMQShortString.valueOf("*.*.*.Y"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + } + + public void testSendPersistentMessageToAll() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + conn.start(); + producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test")); + session.commit(); + + AMQSession amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + restartBroker(); + + conn = getConnection(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + + assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + } + + + public void testSendNonPersistentMessageToAll() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + conn.start(); + producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test")); + session.commit(); + + AMQSession amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + restartBroker(); + + conn = getConnection(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + + assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + } + + public void testNonPersistentContentRetained() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + conn.start(); + producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1")); + producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2")); + session.commit(); + MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)); + Message msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test2", ((TextMessage) msg).getText()); + session.rollback(); + restartBroker(); + conn = getConnection(); + conn.start(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + AMQSession amqSession = (AMQSession) session; + assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)); + msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test2", ((TextMessage)msg).getText()); + session.commit(); + } + + public void testPersistentContentRetainedOnTransientQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false"); + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + conn.start(); + producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1")); + session.commit(); + MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)); + Message msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test1", ((TextMessage)msg).getText()); + session.commit(); + System.gc(); + consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)); + msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test1", ((TextMessage)msg).getText()); + session.commit(); + } + + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index 2c38a04895..8550c804a6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; -import java.io.IOException; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; @@ -34,6 +33,8 @@ import java.util.Map; import javax.security.auth.Subject; +import org.codehaus.jackson.map.ObjectMapper; + import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -77,7 +78,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; -import org.codehaus.jackson.map.ObjectMapper; /** * @@ -604,7 +604,6 @@ public class VirtualHostMessageStoreTest extends QpidTestCase MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd); - storedMessage.flushToStore(); final AMQMessage currentMessage = new AMQMessage(storedMessage); diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 9e9708bf3c..3d87da11c8 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -193,3 +193,7 @@ org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide // Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange + +// Tests queue message durability settings which are a Java Broker specific feature +org.apache.qpid.server.queue.QueueMessageDurabilityTest#* + diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index a50a8bd599..ff4485d599 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -33,6 +33,8 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testRecover +org.apache.qpid.server.queue.QueueMessageDurabilityTest#* + org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence |
