From 42bfb186da9e911c208f22dd5f6c794b9bddd859 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 25 Jul 2014 14:24:36 +0000 Subject: QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613440 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 43 +++++++++++++--------- .../tuple/PreparedTransactionBinding.java | 5 ++- .../store/berkeleydb/BDBMessageStoreTest.java | 12 +++--- 3 files changed, 35 insertions(+), 25 deletions(-) (limited to 'qpid/java/bdbstore') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index cf187fe1e9..be592a0d42 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; @@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore long newMessageId = getNextMessageId(); - if (metaData.isPersistent()) - { - return (StoredMessage) new StoredBDBMessage(newMessageId, metaData); - } - else - { - return new StoredMemoryMessage(newMessageId, metaData); - } + return new StoredBDBMessage(newMessageId, metaData); } public long getNextMessageId() @@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected abstract Logger getLogger(); - private class StoredBDBMessage implements StoredMessage + class StoredBDBMessage implements StoredMessage { private final long _messageId; @@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - @Override - public synchronized StoreFuture flushToStore() + synchronized StoreFuture flushToStore() { if(!stored()) { @@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private Transaction _txn; private int _storeSizeIncrease; + private final List _onCommitActions = new ArrayList<>(); private BDBTransaction() throws StoreException { @@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + _onCommitActions.add(new Runnable() + { + @Override + public void run() + { + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + }); + } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); @@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void commitTran() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.commitTranImpl(_txn, true); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } + private void doPreCommitActions() + { + for(Runnable action : _onCommitActions) + { + action.run(); + } + _onCommitActions.clear(); + } + @Override public StoreFuture commitTranAsync() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); } @@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void abortTran() throws StoreException { checkMessageStoreOpen(); - + _onCommitActions.clear(); AbstractBDBMessageStore.this.abortTran(_txn); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 8fb011152c..1f4cf45ce1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding storedMessage_0_8 = store.addMessage(messageMetaData_0_8); storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); return storedMessage_0_8; } -- cgit v1.2.1